diff --git a/webrtc/p2p/quic/quicsession.cc b/webrtc/p2p/quic/quicsession.cc index 281af5e3a7..a70aa0b01d 100644 --- a/webrtc/p2p/quic/quicsession.cc +++ b/webrtc/p2p/quic/quicsession.cc @@ -75,7 +75,7 @@ ReliableQuicStream* QuicSession::CreateOutgoingDynamicStream( net::SpdyPriority priority) { ReliableQuicStream* stream = CreateDataStream(GetNextOutgoingStreamId()); if (stream) { - ActivateStream(stream); + ActivateStream(stream); // QuicSession owns the stream. } return stream; } diff --git a/webrtc/p2p/quic/quicsession_unittest.cc b/webrtc/p2p/quic/quicsession_unittest.cc index 2e2d6ae0a6..04b7d1ee7f 100644 --- a/webrtc/p2p/quic/quicsession_unittest.cc +++ b/webrtc/p2p/quic/quicsession_unittest.cc @@ -450,3 +450,17 @@ TEST_F(QuicSessionTest, CannotCreateDataStreamBeforeHandshake) { EXPECT_EQ(nullptr, server_peer_->CreateOutgoingDynamicStream(5)); EXPECT_EQ(nullptr, client_peer_->CreateOutgoingDynamicStream(5)); } + +// Test that closing a QUIC stream causes the QuicSession to remove it. +TEST_F(QuicSessionTest, CloseQuicStream) { + CreateClientAndServerSessions(); + StartHandshake(true, true); + ASSERT_TRUE_WAIT(client_peer_->IsCryptoHandshakeConfirmed() && + server_peer_->IsCryptoHandshakeConfirmed(), + kTimeoutMs); + ReliableQuicStream* stream = client_peer_->CreateOutgoingDynamicStream(5); + ASSERT_NE(nullptr, stream); + EXPECT_FALSE(client_peer_->IsClosedStream(stream->id())); + stream->Close(); + EXPECT_TRUE(client_peer_->IsClosedStream(stream->id())); +} diff --git a/webrtc/p2p/quic/quictransportchannel.cc b/webrtc/p2p/quic/quictransportchannel.cc index cc0576ddb2..446fd4201c 100644 --- a/webrtc/p2p/quic/quictransportchannel.cc +++ b/webrtc/p2p/quic/quictransportchannel.cc @@ -443,6 +443,8 @@ bool QuicTransportChannel::CreateQuicSession() { this, &QuicTransportChannel::OnHandshakeComplete); quic_->SignalConnectionClosed.connect( this, &QuicTransportChannel::OnConnectionClosed); + quic_->SignalIncomingStream.connect(this, + &QuicTransportChannel::OnIncomingStream); return true; } @@ -541,6 +543,7 @@ void QuicTransportChannel::OnConnectionClosed(net::QuicErrorCode error, // does not close due to failure. set_quic_state(QUIC_TRANSPORT_CLOSED); set_writable(false); + SignalClosed(); } void QuicTransportChannel::OnProofValid( @@ -569,4 +572,16 @@ void QuicTransportChannel::set_quic_state(QuicTransportState state) { quic_state_ = state; } +ReliableQuicStream* QuicTransportChannel::CreateQuicStream() { + if (quic_) { + net::SpdyPriority priority = 0; // Priority of the QUIC stream (not used) + return quic_->CreateOutgoingDynamicStream(priority); + } + return nullptr; +} + +void QuicTransportChannel::OnIncomingStream(ReliableQuicStream* stream) { + SignalIncomingStream(stream); +} + } // namespace cricket diff --git a/webrtc/p2p/quic/quictransportchannel.h b/webrtc/p2p/quic/quictransportchannel.h index ab02c77b95..430ed8d9c4 100644 --- a/webrtc/p2p/quic/quictransportchannel.h +++ b/webrtc/p2p/quic/quictransportchannel.h @@ -48,7 +48,7 @@ enum QuicTransportState { // TransportChannelImpl* channel_; // } // -// - Data written to SendPacket() is passed directly to |channel_| if it is +// - Data written to SendPacket() is passed directly to |channel_| if it is // an SRTP packet with the PF_SRTP_BYPASS flag. // // - |quic_| passes outgoing packets to WritePacket(), which transfers them @@ -61,8 +61,11 @@ enum QuicTransportState { // - When the QUIC handshake is completed, quic_state() returns // QUIC_TRANSPORT_CONNECTED and SRTP keying material can be exported. // -// TODO(mikescarlett): Implement secure QUIC handshake, 0-RTT handshakes, and -// QUIC data streams. +// - CreateQuicStream() creates an outgoing QUIC stream. Once the local peer +// sends data from this stream, the remote peer emits SignalIncomingStream +// with a QUIC stream of the same id to handle received data. +// +// TODO(mikescarlett): Implement secure QUIC handshake and 0-RTT handshakes. class QuicTransportChannel : public TransportChannelImpl, public net::QuicPacketWriter, public net::QuicCryptoClientStream::ProofHandler { @@ -113,8 +116,9 @@ class QuicTransportChannel : public TransportChannelImpl, size_t result_len) override; // TODO(mikescarlett): Remove this method once TransportChannel does not // require defining it. - bool GetRemoteSSLCertificate(rtc::SSLCertificate** cert) const override { - return false; + rtc::scoped_ptr GetRemoteSSLCertificate() + const override { + return nullptr; } // TransportChannelImpl overrides that we forward to the wrapped transport. @@ -206,6 +210,14 @@ class QuicTransportChannel : public TransportChannelImpl, void OnCanWrite(); // Connectivity state of QuicTransportChannel. QuicTransportState quic_state() const { return quic_state_; } + // Creates a new QUIC stream that can send data. + ReliableQuicStream* CreateQuicStream(); + + // Emitted when |quic_| creates a QUIC stream to receive data from the remote + // peer, when the stream did not exist previously. + sigslot::signal1 SignalIncomingStream; + // Emitted when the QuicTransportChannel state becomes QUIC_TRANSPORT_CLOSED. + sigslot::signal0<> SignalClosed; private: // Fingerprint of remote peer. @@ -241,6 +253,8 @@ class QuicTransportChannel : public TransportChannelImpl, void OnHandshakeComplete(); // Called when |quic_| has closed the connection. void OnConnectionClosed(net::QuicErrorCode error, bool from_peer); + // Called when |quic_| has created a new QUIC stream for incoming data. + void OnIncomingStream(ReliableQuicStream* stream); // Called by OnReadPacket() when a QUIC packet is received. bool HandleQuicPacket(const char* data, size_t size); diff --git a/webrtc/p2p/quic/quictransportchannel_unittest.cc b/webrtc/p2p/quic/quictransportchannel_unittest.cc index 7b5b51848a..c64aa40f94 100644 --- a/webrtc/p2p/quic/quictransportchannel_unittest.cc +++ b/webrtc/p2p/quic/quictransportchannel_unittest.cc @@ -23,6 +23,7 @@ using cricket::ConnectionRole; using cricket::IceRole; using cricket::QuicTransportChannel; +using cricket::ReliableQuicStream; using cricket::TransportChannel; using cricket::TransportDescription; @@ -97,6 +98,9 @@ class QuicTestPeer : public sigslot::has_slots<> { quic_channel_(&ice_channel_) { quic_channel_.SignalReadPacket.connect( this, &QuicTestPeer::OnTransportChannelReadPacket); + quic_channel_.SignalIncomingStream.connect(this, + &QuicTestPeer::OnIncomingStream); + quic_channel_.SignalClosed.connect(this, &QuicTestPeer::OnClosed); ice_channel_.SetAsync(true); rtc::scoped_refptr local_cert = rtc::RTCCertificate::Create(rtc::scoped_ptr( @@ -198,8 +202,12 @@ class QuicTestPeer : public sigslot::has_slots<> { return local_fingerprint_; } + ReliableQuicStream* incoming_quic_stream() { return incoming_quic_stream_; } + + bool signal_closed_emitted() const { return signal_closed_emitted_; } + private: - // QUIC channel callback. + // QuicTransportChannel callbacks. void OnTransportChannelReadPacket(TransportChannel* channel, const char* data, size_t size, @@ -210,6 +218,10 @@ class QuicTestPeer : public sigslot::has_slots<> { int expected_flags = IsRtpLeadByte(data[0]) ? cricket::PF_SRTP_BYPASS : 0; ASSERT_EQ(expected_flags, flags); } + void OnIncomingStream(ReliableQuicStream* stream) { + incoming_quic_stream_ = stream; + } + void OnClosed() { signal_closed_emitted_ = true; } std::string name_; // Channel name. size_t bytes_sent_; // Bytes sent by QUIC channel. @@ -217,6 +229,8 @@ class QuicTestPeer : public sigslot::has_slots<> { FailableTransportChannel ice_channel_; // Simulates an ICE channel. QuicTransportChannel quic_channel_; // QUIC channel to test. rtc::scoped_ptr local_fingerprint_; + ReliableQuicStream* incoming_quic_stream_ = nullptr; + bool signal_closed_emitted_ = false; }; class QuicTransportChannelTest : public testing::Test { @@ -486,3 +500,29 @@ TEST_F(QuicTransportChannelTest, IceReceivingBeforeConnected) { ASSERT_TRUE_WAIT(quic_connected(), kTimeoutMs); EXPECT_TRUE(peer1_.quic_channel()->receiving()); } + +// Test that when peer 1 creates an outgoing stream, peer 2 creates an incoming +// QUIC stream with the same ID and fires OnIncomingStream. +TEST_F(QuicTransportChannelTest, CreateOutgoingAndIncomingQuicStream) { + Connect(); + EXPECT_EQ(nullptr, peer1_.quic_channel()->CreateQuicStream()); + ASSERT_TRUE_WAIT(quic_connected(), kTimeoutMs); + ReliableQuicStream* stream = peer1_.quic_channel()->CreateQuicStream(); + ASSERT_NE(nullptr, stream); + stream->Write("Hi", 2); + EXPECT_TRUE_WAIT(peer2_.incoming_quic_stream() != nullptr, kTimeoutMs); + EXPECT_EQ(stream->id(), peer2_.incoming_quic_stream()->id()); +} + +// Test that SignalClosed is emitted when the QuicConnection closes. +TEST_F(QuicTransportChannelTest, SignalClosedEmitted) { + Connect(); + ASSERT_TRUE_WAIT(quic_connected(), kTimeoutMs); + ASSERT_FALSE(peer1_.signal_closed_emitted()); + ReliableQuicStream* stream = peer1_.quic_channel()->CreateQuicStream(); + ASSERT_NE(nullptr, stream); + stream->CloseConnectionWithDetails(net::QuicErrorCode::QUIC_NO_ERROR, + "Closing QUIC for testing"); + EXPECT_TRUE(peer1_.signal_closed_emitted()); + EXPECT_TRUE_WAIT(peer2_.signal_closed_emitted(), kTimeoutMs); +} diff --git a/webrtc/p2p/quic/reliablequicstream.cc b/webrtc/p2p/quic/reliablequicstream.cc index ca2e3f0d3b..ed44388edb 100644 --- a/webrtc/p2p/quic/reliablequicstream.cc +++ b/webrtc/p2p/quic/reliablequicstream.cc @@ -12,6 +12,7 @@ #include +#include "net/quic/quic_session.h" #include "webrtc/base/checks.h" namespace cricket { @@ -38,14 +39,26 @@ void ReliableQuicStream::OnClose() { SignalClosed(id(), connection_error()); } -rtc::StreamResult ReliableQuicStream::Write(const char* data, size_t len) { +rtc::StreamResult ReliableQuicStream::Write(const char* data, + size_t len, + bool fin) { // Writes the data, or buffers it. - WriteOrBufferData(std::string(data, len), false, nullptr); + WriteOrBufferData(std::string(data, len), fin, nullptr); if (HasBufferedData()) { return rtc::StreamResult(rtc::SR_BLOCK); } - return rtc::StreamResult(rtc::SR_SUCCESS); } +void ReliableQuicStream::Close() { + net::ReliableQuicStream::session()->CloseStream(id()); +} + +void ReliableQuicStream::OnCanWrite() { + uint64_t prev_queued_bytes = queued_data_bytes(); + net::ReliableQuicStream::OnCanWrite(); + uint64_t queued_bytes_written = prev_queued_bytes - queued_data_bytes(); + SignalQueuedBytesWritten(id(), queued_bytes_written); +} + } // namespace cricket diff --git a/webrtc/p2p/quic/reliablequicstream.h b/webrtc/p2p/quic/reliablequicstream.h index 61d060f850..9493a3fcc1 100644 --- a/webrtc/p2p/quic/reliablequicstream.h +++ b/webrtc/p2p/quic/reliablequicstream.h @@ -29,16 +29,23 @@ class ReliableQuicStream : public net::ReliableQuicStream, // ReliableQuicStream overrides. void OnDataAvailable() override; void OnClose() override; + void OnCanWrite() override; // Process decrypted data into encrypted QUIC packets, which get sent to the // QuicPacketWriter. rtc::SR_BLOCK is returned if the operation blocks instead // of writing, in which case the data is queued until OnCanWrite() is called. - rtc::StreamResult Write(const char* data, size_t len); + // If |fin| == true, then this stream closes after sending data. + rtc::StreamResult Write(const char* data, size_t len, bool fin = false); + // Removes this stream from the QuicSession's stream map. + void Close(); // Called when decrypted data is ready to be read. sigslot::signal3 SignalDataReceived; - // Called when stream closed. - sigslot::signal2 SignalClosed; + // Called when the stream is closed. + sigslot::signal2 SignalClosed; + // Emits the number of queued bytes that were written by OnCanWrite(), after + // the stream was previously write blocked. + sigslot::signal2 SignalQueuedBytesWritten; private: RTC_DISALLOW_COPY_AND_ASSIGN(ReliableQuicStream); diff --git a/webrtc/p2p/quic/reliablequicstream_unittest.cc b/webrtc/p2p/quic/reliablequicstream_unittest.cc index aeb3e1af87..b887589457 100644 --- a/webrtc/p2p/quic/reliablequicstream_unittest.cc +++ b/webrtc/p2p/quic/reliablequicstream_unittest.cc @@ -163,6 +163,8 @@ class ReliableQuicStreamTest : public ::testing::Test, stream_->SignalDataReceived.connect( this, &ReliableQuicStreamTest::OnDataReceived); stream_->SignalClosed.connect(this, &ReliableQuicStreamTest::OnClosed); + stream_->SignalQueuedBytesWritten.connect( + this, &ReliableQuicStreamTest::OnQueuedBytesWritten); session_->register_write_blocked_stream(stream_->id(), kDefaultPriority); } @@ -172,7 +174,11 @@ class ReliableQuicStreamTest : public ::testing::Test, read_buffer_.append(data, length); } - void OnClosed(QuicStreamId id, QuicErrorCode err) { closed_ = true; } + void OnClosed(QuicStreamId id, int err) { closed_ = true; } + + void OnQueuedBytesWritten(QuicStreamId id, uint64_t queued_bytes_written) { + queued_bytes_written_ = queued_bytes_written; + } protected: rtc::scoped_ptr stream_; @@ -184,6 +190,8 @@ class ReliableQuicStreamTest : public ::testing::Test, std::string read_buffer_; // Whether the ReliableQuicStream is closed. bool closed_ = false; + // Bytes written by OnCanWrite(). + uint64_t queued_bytes_written_; }; // Write an entire string. @@ -213,6 +221,7 @@ TEST_F(ReliableQuicStreamTest, BufferData) { session_->set_writable(true); stream_->OnCanWrite(); + EXPECT_EQ(7ul, queued_bytes_written_); EXPECT_FALSE(stream_->HasBufferedData()); EXPECT_EQ("Foo bar", write_buffer_);