From 70035cae4d4cd2906a244e420830bb9f7b6e3391 Mon Sep 17 00:00:00 2001 From: mikescarlett Date: Fri, 29 Apr 2016 18:14:37 -0700 Subject: [PATCH] Fix QuicSession to unbuffer data when the QuicTransportChannel reconnects The QuicWriteBlockedList needs to register outgoing QUIC streams so that when the QuicTransportChannel becomes unwritable and QUIC streams have buffered data, they can send data once the QuicTransportChannel becomes writable. Otherwise the QUIC streams will remain write blocked after the QuicTransportChannel is writable. BUG= Review-Url: https://codereview.webrtc.org/1888903002 Cr-Commit-Position: refs/heads/master@{#12573} --- webrtc/p2p/quic/quicsession.cc | 28 ++++++++++++++--- webrtc/p2p/quic/quicsession.h | 4 ++- webrtc/p2p/quic/quictransportchannel.cc | 2 +- .../p2p/quic/quictransportchannel_unittest.cc | 30 ++++++++++++++++++- 4 files changed, 57 insertions(+), 7 deletions(-) diff --git a/webrtc/p2p/quic/quicsession.cc b/webrtc/p2p/quic/quicsession.cc index d451f891ba..65ed135754 100644 --- a/webrtc/p2p/quic/quicsession.cc +++ b/webrtc/p2p/quic/quicsession.cc @@ -20,6 +20,10 @@ namespace cricket { +// Default priority for incoming QUIC streams. +// TODO(mikescarlett): Determine if this value is correct. +static const net::SpdyPriority kDefaultPriority = 3; + QuicSession::QuicSession(std::unique_ptr connection, const net::QuicConfig& config) : net::QuicSession(connection.release(), config) {} @@ -62,9 +66,19 @@ void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { } } +void QuicSession::CloseStream(net::QuicStreamId stream_id) { + if (IsClosedStream(stream_id)) { + // When CloseStream has been called recursively (via + // ReliableQuicStream::OnClose), the stream is already closed so return. + return; + } + write_blocked_streams()->UnregisterStream(stream_id); + net::QuicSession::CloseStream(stream_id); +} + ReliableQuicStream* QuicSession::CreateIncomingDynamicStream( net::QuicStreamId id) { - ReliableQuicStream* stream = CreateDataStream(id); + ReliableQuicStream* stream = CreateDataStream(id, kDefaultPriority); if (stream) { SignalIncomingStream(stream); } @@ -73,17 +87,23 @@ ReliableQuicStream* QuicSession::CreateIncomingDynamicStream( ReliableQuicStream* QuicSession::CreateOutgoingDynamicStream( net::SpdyPriority priority) { - return CreateDataStream(GetNextOutgoingStreamId()); + return CreateDataStream(GetNextOutgoingStreamId(), priority); } -ReliableQuicStream* QuicSession::CreateDataStream(net::QuicStreamId id) { +ReliableQuicStream* QuicSession::CreateDataStream(net::QuicStreamId id, + net::SpdyPriority priority) { if (crypto_stream_ == nullptr || !crypto_stream_->encryption_established()) { // Encryption not active so no stream created return nullptr; } ReliableQuicStream* stream = new ReliableQuicStream(id, this); if (stream) { - ActivateStream(stream); // QuicSession owns the stream. + // Make QuicSession take ownership of the stream. + ActivateStream(stream); + // Register the stream to the QuicWriteBlockedList. |priority| is clamped + // between 0 and 7, with 0 being the highest priority and 7 the lowest + // priority. + write_blocked_streams()->RegisterStream(stream->id(), priority); } return stream; } diff --git a/webrtc/p2p/quic/quicsession.h b/webrtc/p2p/quic/quicsession.h index a50eb6bb2c..f8b27e055e 100644 --- a/webrtc/p2p/quic/quicsession.h +++ b/webrtc/p2p/quic/quicsession.h @@ -49,6 +49,7 @@ class QuicSession : public net::QuicSession, public sigslot::has_slots<> { // QuicSession optional overrides. void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; + void CloseStream(net::QuicStreamId stream_id) override; // QuicConnectionVisitorInterface overrides. void OnConnectionClosed(net::QuicErrorCode error, @@ -81,7 +82,8 @@ class QuicSession : public net::QuicSession, public sigslot::has_slots<> { ReliableQuicStream* CreateIncomingDynamicStream( net::QuicStreamId id) override; - virtual ReliableQuicStream* CreateDataStream(net::QuicStreamId id); + virtual ReliableQuicStream* CreateDataStream(net::QuicStreamId id, + net::SpdyPriority priority); private: std::unique_ptr crypto_stream_; diff --git a/webrtc/p2p/quic/quictransportchannel.cc b/webrtc/p2p/quic/quictransportchannel.cc index aa6d037f75..b3f918071c 100644 --- a/webrtc/p2p/quic/quictransportchannel.cc +++ b/webrtc/p2p/quic/quictransportchannel.cc @@ -583,7 +583,7 @@ void QuicTransportChannel::set_quic_state(QuicTransportState state) { ReliableQuicStream* QuicTransportChannel::CreateQuicStream() { if (quic_) { - net::SpdyPriority priority = 0; // Priority of the QUIC stream (not used) + net::SpdyPriority priority = 0; // Priority of the QUIC stream return quic_->CreateOutgoingDynamicStream(priority); } return nullptr; diff --git a/webrtc/p2p/quic/quictransportchannel_unittest.cc b/webrtc/p2p/quic/quictransportchannel_unittest.cc index 45d3db87f3..dba07ebf2b 100644 --- a/webrtc/p2p/quic/quictransportchannel_unittest.cc +++ b/webrtc/p2p/quic/quictransportchannel_unittest.cc @@ -95,7 +95,8 @@ class QuicTestPeer : public sigslot::has_slots<> { : name_(name), bytes_sent_(0), ice_channel_(name_, 0), - quic_channel_(&ice_channel_) { + quic_channel_(&ice_channel_), + incoming_stream_count_(0) { quic_channel_.SignalReadPacket.connect( this, &QuicTestPeer::OnTransportChannelReadPacket); quic_channel_.SignalIncomingStream.connect(this, @@ -204,6 +205,8 @@ class QuicTestPeer : public sigslot::has_slots<> { ReliableQuicStream* incoming_quic_stream() { return incoming_quic_stream_; } + size_t incoming_stream_count() const { return incoming_stream_count_; } + bool signal_closed_emitted() const { return signal_closed_emitted_; } private: @@ -220,6 +223,7 @@ class QuicTestPeer : public sigslot::has_slots<> { } void OnIncomingStream(ReliableQuicStream* stream) { incoming_quic_stream_ = stream; + ++incoming_stream_count_; } void OnClosed() { signal_closed_emitted_ = true; } @@ -230,6 +234,7 @@ class QuicTestPeer : public sigslot::has_slots<> { QuicTransportChannel quic_channel_; // QUIC channel to test. std::unique_ptr local_fingerprint_; ReliableQuicStream* incoming_quic_stream_ = nullptr; + size_t incoming_stream_count_; bool signal_closed_emitted_ = false; }; @@ -514,6 +519,29 @@ TEST_F(QuicTransportChannelTest, CreateOutgoingAndIncomingQuicStream) { EXPECT_EQ(stream->id(), peer2_.incoming_quic_stream()->id()); } +// Test that if the QuicTransportChannel is unwritable, then all outgoing QUIC +// streams can send data once the QuicTransprotChannel becomes writable again. +TEST_F(QuicTransportChannelTest, OutgoingQuicStreamSendsDataAfterReconnect) { + Connect(); + ASSERT_TRUE_WAIT(quic_connected(), kTimeoutMs); + ReliableQuicStream* stream1 = peer1_.quic_channel()->CreateQuicStream(); + ASSERT_NE(nullptr, stream1); + ReliableQuicStream* stream2 = peer1_.quic_channel()->CreateQuicStream(); + ASSERT_NE(nullptr, stream2); + + peer1_.ice_channel()->SetWritable(false); + stream1->Write("First", 5); + EXPECT_EQ(5u, stream1->queued_data_bytes()); + stream2->Write("Second", 6); + EXPECT_EQ(6u, stream2->queued_data_bytes()); + EXPECT_EQ(0u, peer2_.incoming_stream_count()); + + peer1_.ice_channel()->SetWritable(true); + EXPECT_EQ_WAIT(0u, stream1->queued_data_bytes(), kTimeoutMs); + EXPECT_EQ_WAIT(0u, stream2->queued_data_bytes(), kTimeoutMs); + EXPECT_EQ_WAIT(2u, peer2_.incoming_stream_count(), kTimeoutMs); +} + // Test that SignalClosed is emitted when the QuicConnection closes. TEST_F(QuicTransportChannelTest, SignalClosedEmitted) { Connect();