diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc index f4f573276c..5493382e1a 100644 --- a/webrtc/api/quicdatachannel.cc +++ b/webrtc/api/quicdatachannel.cc @@ -61,10 +61,12 @@ bool ParseQuicDataMessageHeader(const char* data, QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, + rtc::Thread* network_thread, const std::string& label, const DataChannelInit& config) : signaling_thread_(signaling_thread), worker_thread_(worker_thread), + network_thread_(network_thread), id_(config.id), state_(kConnecting), buffered_amount_(0), @@ -91,12 +93,12 @@ bool QuicDataChannel::Send(const DataBuffer& buffer) { << " is not open so cannot send."; return false; } - return worker_thread_->Invoke( - RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); + return network_thread_->Invoke( + RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer)); } -bool QuicDataChannel::Send_w(const DataBuffer& buffer) { - RTC_DCHECK(worker_thread_->IsCurrent()); +bool QuicDataChannel::Send_n(const DataBuffer& buffer) { + RTC_DCHECK(network_thread_->IsCurrent()); // Encode and send the header containing the data channel ID and message ID. rtc::CopyOnWriteBuffer header; @@ -256,7 +258,7 @@ DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { } void QuicDataChannel::OnIncomingMessage(Message&& message) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(message.stream); if (!observer_) { LOG(LS_WARNING) << "QUIC data channel " << id_ @@ -295,7 +297,7 @@ void QuicDataChannel::OnIncomingMessage(Message&& message) { void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, const char* data, size_t len) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(data); const auto& kv = incoming_quic_messages_.find(stream_id); if (kv == incoming_quic_messages_.end()) { @@ -325,7 +327,7 @@ void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, } void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(channel == quic_transport_channel_); LOG(LS_INFO) << "QuicTransportChannel is ready to send"; invoker_.AsyncInvoke( @@ -342,7 +344,7 @@ void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, int error) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; incoming_quic_messages_.erase(stream_id); } diff --git a/webrtc/api/quicdatachannel.h b/webrtc/api/quicdatachannel.h index a6b987b144..18a10acbdf 100644 --- a/webrtc/api/quicdatachannel.h +++ b/webrtc/api/quicdatachannel.h @@ -88,6 +88,7 @@ class QuicDataChannel : public rtc::RefCountedObject, QuicDataChannel(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, + rtc::Thread* network_thread, const std::string& label, const DataChannelInit& config); ~QuicDataChannel() override; @@ -155,11 +156,13 @@ class QuicDataChannel : public rtc::RefCountedObject, void OnReadyToSend(cricket::TransportChannel* channel); void OnConnectionClosed(); - // Worker thread methods. + // Network thread methods. // Sends the data buffer to the remote peer using an outgoing QUIC stream. // Returns true if the data buffer can be successfully sent, or if it is // queued to be sent later. - bool Send_w(const DataBuffer& buffer); + bool Send_n(const DataBuffer& buffer); + + // Worker thread methods. // Connects the |quic_transport_channel_| signals to this QuicDataChannel, // then returns the new QuicDataChannel state. DataState SetTransportChannel_w(); @@ -185,8 +188,10 @@ class QuicDataChannel : public rtc::RefCountedObject, cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; // Signaling thread for DataChannelInterface methods. rtc::Thread* const signaling_thread_; - // Worker thread for sending data and |quic_transport_channel_| callbacks. + // Worker thread for |quic_transport_channel_| callbacks. rtc::Thread* const worker_thread_; + // Network thread for sending data and |quic_transport_channel_| callbacks. + rtc::Thread* const network_thread_; rtc::AsyncInvoker invoker_; // Map of QUIC stream ID => ReliableQuicStream* for write blocked QUIC // streams. diff --git a/webrtc/api/quicdatachannel_unittest.cc b/webrtc/api/quicdatachannel_unittest.cc index e701c29b4f..7245ccfa21 100644 --- a/webrtc/api/quicdatachannel_unittest.cc +++ b/webrtc/api/quicdatachannel_unittest.cc @@ -120,8 +120,9 @@ class FakeQuicDataTransport : public sigslot::has_slots<> { DataChannelInit config; config.id = id; config.protocol = protocol; - rtc::scoped_refptr data_channel(new QuicDataChannel( - rtc::Thread::Current(), rtc::Thread::Current(), label, config)); + rtc::scoped_refptr data_channel( + new QuicDataChannel(rtc::Thread::Current(), rtc::Thread::Current(), + rtc::Thread::Current(), label, config)); data_channel_by_id_[id] = data_channel; return data_channel; } @@ -201,8 +202,6 @@ class QuicDataChannelPeer { // Connects |ice_transport_channel_| to that of the other peer. void Connect(QuicDataChannelPeer* other_peer) { - ice_transport_channel_->Connect(); - other_peer->ice_transport_channel_->Connect(); ice_transport_channel_->SetDestination(other_peer->ice_transport_channel_); } diff --git a/webrtc/api/quicdatatransport.cc b/webrtc/api/quicdatatransport.cc index 70ad03dbfd..c1caf54067 100644 --- a/webrtc/api/quicdatatransport.cc +++ b/webrtc/api/quicdatatransport.cc @@ -17,10 +17,14 @@ namespace webrtc { QuicDataTransport::QuicDataTransport(rtc::Thread* signaling_thread, - rtc::Thread* worker_thread) - : signaling_thread_(signaling_thread), worker_thread_(worker_thread) { + rtc::Thread* worker_thread, + rtc::Thread* network_thread) + : signaling_thread_(signaling_thread), + worker_thread_(worker_thread), + network_thread_(network_thread) { RTC_DCHECK(signaling_thread_); RTC_DCHECK(worker_thread_); + RTC_DCHECK(network_thread_); } QuicDataTransport::~QuicDataTransport() {} @@ -68,8 +72,8 @@ rtc::scoped_refptr QuicDataTransport::CreateDataChannel( LOG(LS_ERROR) << "QUIC data channel already exists with id " << config->id; return nullptr; } - rtc::scoped_refptr data_channel( - new QuicDataChannel(signaling_thread_, worker_thread_, label, *config)); + rtc::scoped_refptr data_channel(new QuicDataChannel( + signaling_thread_, worker_thread_, network_thread_, label, *config)); if (quic_transport_channel_) { if (!data_channel->SetTransportChannel(quic_transport_channel_)) { LOG(LS_ERROR) diff --git a/webrtc/api/quicdatatransport.h b/webrtc/api/quicdatatransport.h index f0c427d1b5..96fe2a0ad7 100644 --- a/webrtc/api/quicdatatransport.h +++ b/webrtc/api/quicdatatransport.h @@ -36,7 +36,9 @@ namespace webrtc { // exists, it sends the QUIC stream to the QuicDataChannel. class QuicDataTransport : public sigslot::has_slots<> { public: - QuicDataTransport(rtc::Thread* signaling_thread, rtc::Thread* worker_thread); + QuicDataTransport(rtc::Thread* signaling_thread, + rtc::Thread* worker_thread, + rtc::Thread* network_thread); ~QuicDataTransport() override; // Sets the QUIC transport channel for the QuicDataChannels and the @@ -80,9 +82,10 @@ class QuicDataTransport : public sigslot::has_slots<> { quic_stream_by_id_; // QuicTransportChannel for sending/receiving data. cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; - // Signaling and worker threads for the QUIC data channel. + // Threads for the QUIC data channel. rtc::Thread* const signaling_thread_; rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; }; } // namespace webrtc diff --git a/webrtc/api/quicdatatransport_unittest.cc b/webrtc/api/quicdatatransport_unittest.cc index d668c55b0b..975898ef1f 100644 --- a/webrtc/api/quicdatatransport_unittest.cc +++ b/webrtc/api/quicdatatransport_unittest.cc @@ -64,7 +64,9 @@ class FakeObserver : public DataChannelObserver { class QuicDataTransportPeer { public: QuicDataTransportPeer() - : quic_data_transport_(rtc::Thread::Current(), rtc::Thread::Current()), + : quic_data_transport_(rtc::Thread::Current(), + rtc::Thread::Current(), + rtc::Thread::Current()), ice_transport_channel_(new FakeTransportChannel("data", 0)), quic_transport_channel_(ice_transport_channel_) { ice_transport_channel_->SetAsync(true); @@ -80,8 +82,6 @@ class QuicDataTransportPeer { // Connects |ice_transport_channel_| to that of the other peer. void Connect(QuicDataTransportPeer* other_peer) { - ice_transport_channel_->Connect(); - other_peer->ice_transport_channel_->Connect(); ice_transport_channel_->SetDestination(other_peer->ice_transport_channel_); } diff --git a/webrtc/p2p/quic/quicsession_unittest.cc b/webrtc/p2p/quic/quicsession_unittest.cc index 2f3aaae332..65996e5c1a 100644 --- a/webrtc/p2p/quic/quicsession_unittest.cc +++ b/webrtc/p2p/quic/quicsession_unittest.cc @@ -295,8 +295,6 @@ void QuicSessionTest::CreateClientAndServerSessions() { channel2->SetAsync(true); // Configure peers to send packets to each other. - channel1->Connect(); - channel2->Connect(); channel1->SetDestination(channel2.get()); client_peer_ = CreateSession(std::move(channel1), Perspective::IS_CLIENT); diff --git a/webrtc/p2p/quic/quictransportchannel.cc b/webrtc/p2p/quic/quictransportchannel.cc index e86fe6a0b2..29819c6269 100644 --- a/webrtc/p2p/quic/quictransportchannel.cc +++ b/webrtc/p2p/quic/quictransportchannel.cc @@ -395,7 +395,8 @@ void QuicTransportChannel::OnRouteChange(TransportChannel* channel, void QuicTransportChannel::OnSelectedCandidatePairChanged( TransportChannel* channel, CandidatePairInterface* selected_candidate_pair, - int last_sent_packet_id bool ready_to_send) { + int last_sent_packet_id, + bool ready_to_send) { ASSERT(channel == channel_.get()); SignalSelectedCandidatePairChanged(this, selected_candidate_pair, last_sent_packet_id, ready_to_send); diff --git a/webrtc/p2p/quic/quictransportchannel.h b/webrtc/p2p/quic/quictransportchannel.h index 22a33eaf0f..1ab13fa0b2 100644 --- a/webrtc/p2p/quic/quictransportchannel.h +++ b/webrtc/p2p/quic/quictransportchannel.h @@ -166,9 +166,6 @@ class QuicTransportChannel : public TransportChannelImpl, void SetIceConfig(const IceConfig& config) override { channel_->SetIceConfig(config); } - void Connect() override { - channel_->Connect(); - } // QuicPacketWriter overrides. // Called from net::QuicConnection when |quic_| has packets to write. diff --git a/webrtc/p2p/quic/quictransportchannel_unittest.cc b/webrtc/p2p/quic/quictransportchannel_unittest.cc index 0e16390a89..49ca29cd14 100644 --- a/webrtc/p2p/quic/quictransportchannel_unittest.cc +++ b/webrtc/p2p/quic/quictransportchannel_unittest.cc @@ -112,8 +112,6 @@ class QuicTestPeer : public sigslot::has_slots<> { // Connects |ice_channel_| to that of the other peer. void Connect(QuicTestPeer* other_peer) { - ice_channel_->Connect(); - other_peer->ice_channel_->Connect(); ice_channel_->SetDestination(other_peer->ice_channel_); } @@ -419,8 +417,6 @@ TEST_F(QuicTransportChannelTest, TransferInvalidSrtp) { // Test that QuicTransportChannel::WritePacket blocks when the ICE // channel is not writable, and otherwise succeeds. TEST_F(QuicTransportChannelTest, QuicWritePacket) { - peer1_.ice_channel()->Connect(); - peer2_.ice_channel()->Connect(); peer1_.ice_channel()->SetDestination(peer2_.ice_channel()); std::string packet = "FAKEQUICPACKET"; diff --git a/webrtc/p2p/quic/reliablequicstream_unittest.cc b/webrtc/p2p/quic/reliablequicstream_unittest.cc index cf9f5e92dd..ff517afb58 100644 --- a/webrtc/p2p/quic/reliablequicstream_unittest.cc +++ b/webrtc/p2p/quic/reliablequicstream_unittest.cc @@ -49,6 +49,7 @@ using rtc::SR_BLOCK; // Arbitrary number for a stream's write blocked priority. static const SpdyPriority kDefaultPriority = 3; +static const net::QuicStreamId kStreamId = 5; // QuicSession that does not create streams and writes data from // ReliableQuicStream to a string. @@ -78,7 +79,7 @@ class MockQuicSession : public QuicSession { net::ReliableQuicStream* CreateIncomingDynamicStream( QuicStreamId id) override { - return nullptr; + return new ReliableQuicStream(kStreamId, this); } net::ReliableQuicStream* CreateOutgoingDynamicStream( @@ -142,7 +143,6 @@ class ReliableQuicStreamTest : public ::testing::Test, ReliableQuicStreamTest() {} void CreateReliableQuicStream() { - const net::QuicStreamId kStreamId = 5; // Arbitrary values for QuicConnection. QuicConnectionHelper* quic_helper = @@ -232,7 +232,7 @@ TEST_F(ReliableQuicStreamTest, BufferData) { // Read an entire string. TEST_F(ReliableQuicStreamTest, ReadDataWhole) { CreateReliableQuicStream(); - net::QuicStreamFrame frame(-1, false, 0, "Hello, World!"); + net::QuicStreamFrame frame(kStreamId, false, 0, "Hello, World!"); stream_->OnStreamFrame(frame); EXPECT_EQ("Hello, World!", read_buffer_); @@ -241,7 +241,7 @@ TEST_F(ReliableQuicStreamTest, ReadDataWhole) { // Read part of a string. TEST_F(ReliableQuicStreamTest, ReadDataPartial) { CreateReliableQuicStream(); - net::QuicStreamFrame frame(-1, false, 0, "Hello, World!"); + net::QuicStreamFrame frame(kStreamId, false, 0, "Hello, World!"); frame.frame_length = 5; stream_->OnStreamFrame(frame);