diff --git a/webrtc/api/api.gyp b/webrtc/api/api.gyp index 1267595327..e705831b0a 100644 --- a/webrtc/api/api.gyp +++ b/webrtc/api/api.gyp @@ -240,6 +240,20 @@ }, }, }], + ['use_quic==1', { + 'dependencies': [ + '<(DEPTH)/third_party/libquic/libquic.gyp:libquic', + ], + 'sources': [ + 'quicdatachannel.cc', + 'quicdatachannel.h', + 'quicdatatransport.cc', + 'quicdatatransport.h', + ], + 'export_dependent_settings': [ + '<(DEPTH)/third_party/libquic/libquic.gyp:libquic', + ], + }], ], }, # target libjingle_peerconnection ], # targets diff --git a/webrtc/api/api_tests.gyp b/webrtc/api/api_tests.gyp index f59eaec0f2..44344201c6 100644 --- a/webrtc/api/api_tests.gyp +++ b/webrtc/api/api_tests.gyp @@ -113,6 +113,18 @@ }, }, }], + ['use_quic==1', { + 'dependencies': [ + '<(DEPTH)/third_party/libquic/libquic.gyp:libquic', + ], + 'sources': [ + 'quicdatachannel_unittest.cc', + 'quicdatatransport_unittest.cc', + ], + 'export_dependent_settings': [ + '<(DEPTH)/third_party/libquic/libquic.gyp:libquic', + ], + }], ], # conditions }, # target peerconnection_unittests ], # targets diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc new file mode 100644 index 0000000000..5420da15ea --- /dev/null +++ b/webrtc/api/quicdatachannel.cc @@ -0,0 +1,391 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/api/quicdatachannel.h" + +#include "webrtc/base/bind.h" +#include "webrtc/base/bytebuffer.h" +#include "webrtc/base/copyonwritebuffer.h" +#include "webrtc/base/logging.h" +#include "webrtc/p2p/quic/quictransportchannel.h" +#include "webrtc/p2p/quic/reliablequicstream.h" + +namespace webrtc { + +void WriteQuicDataChannelMessageHeader(int data_channel_id, + uint64_t message_id, + rtc::CopyOnWriteBuffer* header) { + RTC_DCHECK(header); + // 64-bit varints require at most 10 bytes (7*10 == 70), and 32-bit varints + // require at most 5 bytes (7*5 == 35). + size_t max_length = 15; + rtc::ByteBufferWriter byte_buffer(nullptr, max_length, + rtc::ByteBuffer::ByteOrder::ORDER_HOST); + byte_buffer.WriteUVarint(data_channel_id); + byte_buffer.WriteUVarint(message_id); + header->SetData(byte_buffer.Data(), byte_buffer.Length()); +} + +bool ParseQuicDataMessageHeader(const char* data, + size_t len, + int* data_channel_id, + uint64_t* message_id, + size_t* bytes_read) { + RTC_DCHECK(data_channel_id); + RTC_DCHECK(message_id); + RTC_DCHECK(bytes_read); + + rtc::ByteBufferReader byte_buffer(data, len, rtc::ByteBuffer::ORDER_HOST); + uint64_t dcid; + if (!byte_buffer.ReadUVarint(&dcid)) { + LOG(LS_ERROR) << "Could not read the data channel ID"; + return false; + } + *data_channel_id = dcid; + if (!byte_buffer.ReadUVarint(message_id)) { + LOG(LS_ERROR) << "Could not read message ID for data channel " + << *data_channel_id; + return false; + } + size_t remaining_bytes = byte_buffer.Length(); + *bytes_read = len - remaining_bytes; + return true; +} + +QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, + rtc::Thread* worker_thread, + const std::string& label, + const DataChannelInit& config) + : signaling_thread_(signaling_thread), + worker_thread_(worker_thread), + id_(config.id), + state_(kConnecting), + buffered_amount_(0), + next_message_id_(0), + label_(label), + protocol_(config.protocol) {} + +QuicDataChannel::~QuicDataChannel() {} + +void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + observer_ = observer; +} + +void QuicDataChannel::UnregisterObserver() { + RTC_DCHECK(signaling_thread_->IsCurrent()); + observer_ = nullptr; +} + +bool QuicDataChannel::Send(const DataBuffer& buffer) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + if (state_ != kOpen) { + LOG(LS_ERROR) << "QUIC data channel " << id_ + << " is not open so cannot send."; + return false; + } + return worker_thread_->Invoke( + rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); +} + +bool QuicDataChannel::Send_w(const DataBuffer& buffer) { + RTC_DCHECK(worker_thread_->IsCurrent()); + + // Encode and send the header containing the data channel ID and message ID. + rtc::CopyOnWriteBuffer header; + WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); + RTC_DCHECK(quic_transport_channel_); + cricket::ReliableQuicStream* stream = + quic_transport_channel_->CreateQuicStream(); + RTC_DCHECK(stream); + + // Send the header with a FIN if the message is empty. + bool header_fin = (buffer.size() == 0); + rtc::StreamResult header_result = + stream->Write(header.data(), header.size(), header_fin); + + if (header_result == rtc::SR_BLOCK) { + // The header is write blocked but we should try sending the message. Since + // the ReliableQuicStream queues data in order, if the header is write + // blocked then the message will be write blocked. Otherwise if the message + // is sent then the header is sent. + LOG(LS_INFO) << "Stream " << stream->id() + << " header is write blocked for QUIC data channel " << id_; + } else if (header_result != rtc::SR_SUCCESS) { + LOG(LS_ERROR) << "Stream " << stream->id() + << " failed to write header for QUIC data channel " << id_ + << ". Unexpected error " << header_result; + return false; + } + + // If the message is not empty, then send the message with a FIN. + bool message_fin = true; + rtc::StreamResult message_result = + header_fin ? header_result : stream->Write(buffer.data.data(), + buffer.size(), message_fin); + + if (message_result == rtc::SR_SUCCESS) { + // The message is sent and we don't need this QUIC stream. + LOG(LS_INFO) << "Stream " << stream->id() + << " successfully wrote message for QUIC data channel " << id_; + stream->Close(); + return true; + } + // TODO(mikescarlett): Register the ReliableQuicStream's priority to the + // QuicWriteBlockedList so that the QUIC session doesn't drop messages when + // the QUIC transport channel becomes unwritable. + if (message_result == rtc::SR_BLOCK) { + // The QUIC stream is write blocked, so the message is queued by the QUIC + // session. If this is due to the QUIC not being writable, it will be sent + // once QUIC becomes writable again. Otherwise it may be due to exceeding + // the QUIC flow control limit, in which case the remote peer's QUIC session + // will tell the QUIC stream to send more data. + LOG(LS_INFO) << "Stream " << stream->id() + << " message is write blocked for QUIC data channel " << id_; + SetBufferedAmount_w(buffered_amount_ + stream->queued_data_bytes()); + stream->SignalQueuedBytesWritten.connect( + this, &QuicDataChannel::OnQueuedBytesWritten); + write_blocked_quic_streams_[stream->id()] = stream; + // The QUIC stream will be removed from |write_blocked_quic_streams_| once + // it closes. + stream->SignalClosed.connect(this, + &QuicDataChannel::OnWriteBlockedStreamClosed); + return true; + } + LOG(LS_ERROR) << "Stream " << stream->id() + << " failed to write message for QUIC data channel " << id_ + << ". Unexpected error: " << message_result; + return false; +} + +void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id, + uint64_t queued_bytes_written) { + RTC_DCHECK(worker_thread_->IsCurrent()); + SetBufferedAmount_w(buffered_amount_ - queued_bytes_written); + const auto& kv = write_blocked_quic_streams_.find(stream_id); + if (kv == write_blocked_quic_streams_.end()) { + RTC_DCHECK(false); + return; + } + cricket::ReliableQuicStream* stream = kv->second; + // True if the QUIC stream is done sending data. + if (stream->fin_sent()) { + LOG(LS_INFO) << "Stream " << stream->id() + << " successfully wrote data for QUIC data channel " << id_; + stream->Close(); + } +} + +void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) { + RTC_DCHECK(worker_thread_->IsCurrent()); + buffered_amount_ = buffered_amount; + invoker_.AsyncInvoke( + signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, + this, buffered_amount)); +} + +void QuicDataChannel::Close() { + RTC_DCHECK(signaling_thread_->IsCurrent()); + if (state_ == kClosed || state_ == kClosing) { + return; + } + LOG(LS_INFO) << "Closing QUIC data channel."; + SetState_s(kClosing); + worker_thread_->Invoke(rtc::Bind(&QuicDataChannel::Close_w, this)); + SetState_s(kClosed); +} + +void QuicDataChannel::Close_w() { + RTC_DCHECK(worker_thread_->IsCurrent()); + for (auto& kv : incoming_quic_messages_) { + Message& message = kv.second; + cricket::ReliableQuicStream* stream = message.stream; + stream->Close(); + } + + for (auto& kv : write_blocked_quic_streams_) { + cricket::ReliableQuicStream* stream = kv.second; + stream->Close(); + } +} + +bool QuicDataChannel::SetTransportChannel( + cricket::QuicTransportChannel* channel) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + + if (!channel) { + LOG(LS_ERROR) << "|channel| is NULL. Cannot set transport channel."; + return false; + } + if (quic_transport_channel_) { + if (channel == quic_transport_channel_) { + LOG(LS_WARNING) << "Ignoring duplicate transport channel."; + return true; + } + LOG(LS_ERROR) << "|channel| does not match existing transport channel."; + return false; + } + + quic_transport_channel_ = channel; + LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; + DataState data_channel_state = worker_thread_->Invoke( + rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this)); + SetState_s(data_channel_state); + return true; +} + +DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { + RTC_DCHECK(worker_thread_->IsCurrent()); + quic_transport_channel_->SignalReadyToSend.connect( + this, &QuicDataChannel::OnReadyToSend); + quic_transport_channel_->SignalClosed.connect( + this, &QuicDataChannel::OnConnectionClosed); + if (quic_transport_channel_->writable()) { + return kOpen; + } + return kConnecting; +} + +void QuicDataChannel::OnIncomingMessage(Message&& message) { + RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(message.stream); + if (!observer_) { + LOG(LS_WARNING) << "QUIC data channel " << id_ + << " received a message but has no observer."; + message.stream->Close(); + return; + } + // A FIN is received if the message fits into a single QUIC stream frame and + // the remote peer is done sending. + if (message.stream->fin_received()) { + LOG(LS_INFO) << "Stream " << message.stream->id() + << " has finished receiving data for QUIC data channel " + << id_; + DataBuffer final_message(message.buffer, false); + invoker_.AsyncInvoke(signaling_thread_, + rtc::Bind(&QuicDataChannel::OnMessage_s, this, + std::move(final_message))); + message.stream->Close(); + return; + } + // Otherwise the message is divided across multiple QUIC stream frames, so + // queue the data. OnDataReceived() will be called each time the remaining + // QUIC stream frames arrive. + LOG(LS_INFO) << "QUIC data channel " << id_ + << " is queuing incoming data for stream " + << message.stream->id(); + incoming_quic_messages_[message.stream->id()] = std::move(message); + message.stream->SignalDataReceived.connect(this, + &QuicDataChannel::OnDataReceived); + // The QUIC stream will be removed from |incoming_quic_messages_| once it + // closes. + message.stream->SignalClosed.connect( + this, &QuicDataChannel::OnIncomingQueuedStreamClosed); +} + +void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, + const char* data, + size_t len) { + RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(data); + const auto& kv = incoming_quic_messages_.find(stream_id); + if (kv == incoming_quic_messages_.end()) { + RTC_DCHECK(false); + return; + } + Message& message = kv->second; + cricket::ReliableQuicStream* stream = message.stream; + rtc::CopyOnWriteBuffer& received_data = message.buffer; + // If the QUIC stream has not received a FIN, then the remote peer is not + // finished sending data. + if (!stream->fin_received()) { + received_data.AppendData(data, len); + return; + } + // Otherwise we are done receiving and can provide the data channel observer + // with the message. + LOG(LS_INFO) << "Stream " << stream_id + << " has finished receiving data for QUIC data channel " << id_; + received_data.AppendData(data, len); + DataBuffer final_message(std::move(received_data), false); + invoker_.AsyncInvoke( + signaling_thread_, + rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); + // Once the stream is closed, OnDataReceived will not fire for the stream. + stream->Close(); +} + +void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { + RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(channel == quic_transport_channel_); + LOG(LS_INFO) << "QuicTransportChannel is ready to send"; + invoker_.AsyncInvoke( + signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); +} + +void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, + int error) { + RTC_DCHECK(worker_thread_->IsCurrent()); + LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; + write_blocked_quic_streams_.erase(stream_id); +} + +void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, + int error) { + RTC_DCHECK(worker_thread_->IsCurrent()); + LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; + incoming_quic_messages_.erase(stream_id); +} + +void QuicDataChannel::OnConnectionClosed() { + RTC_DCHECK(worker_thread_->IsCurrent()); + invoker_.AsyncInvoke(signaling_thread_, + rtc::Bind(&QuicDataChannel::Close, this)); +} + +void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + if (observer_) { + observer_->OnMessage(received_data); + } +} + +void QuicDataChannel::SetState_s(DataState state) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + if (state_ == state || state_ == kClosed) { + return; + } + if (state_ == kClosing && state != kClosed) { + return; + } + LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel " + << id_; + state_ = state; + if (observer_) { + observer_->OnStateChange(); + } +} + +void QuicDataChannel::OnBufferedAmountChange_s(uint64_t buffered_amount) { + RTC_DCHECK(signaling_thread_->IsCurrent()); + if (observer_) { + observer_->OnBufferedAmountChange(buffered_amount); + } +} + +size_t QuicDataChannel::GetNumWriteBlockedStreams() const { + return write_blocked_quic_streams_.size(); +} + +size_t QuicDataChannel::GetNumIncomingStreams() const { + return incoming_quic_messages_.size(); +} + +} // namespace webrtc diff --git a/webrtc/api/quicdatachannel.h b/webrtc/api/quicdatachannel.h new file mode 100644 index 0000000000..a6b987b144 --- /dev/null +++ b/webrtc/api/quicdatachannel.h @@ -0,0 +1,215 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_API_QUICDATACHANNEL_H_ +#define WEBRTC_API_QUICDATACHANNEL_H_ + +#include +#include +#include + +#include "webrtc/api/datachannelinterface.h" +#include "webrtc/base/asyncinvoker.h" +#include "webrtc/base/sigslot.h" +#include "webrtc/base/thread.h" + +namespace cricket { +class QuicTransportChannel; +class ReliableQuicStream; +class TransportChannel; +} // namepsace cricket + +namespace net { +// TODO(mikescarlett): Make this uint64_t once QUIC uses 64-bit ids. +typedef uint32_t QuicStreamId; +} // namespace net + +namespace rtc { +class CopyOnWriteBuffer; +} // namespace rtc + +namespace webrtc { + +// Encodes a QUIC message header with the data channel ID and message ID, then +// stores the result in |header|. +void WriteQuicDataChannelMessageHeader(int data_channel_id, + uint64_t message_id, + rtc::CopyOnWriteBuffer* header); + +// Decodes the data channel ID and message ID from the initial data received by +// an incoming QUIC stream. The data channel ID is output to |data_channel_id|, +// the message ID is output to |message_id|, and the number of bytes read is +// output to |bytes_read|. Returns false if either ID cannot be read. +bool ParseQuicDataMessageHeader(const char* data, + size_t len, + int* data_channel_id, + uint64_t* message_id, + size_t* bytes_read); + +// QuicDataChannel is an implementation of DataChannelInterface based on the +// QUIC protocol. It uses a QuicTransportChannel to establish encryption and +// transfer data, and a QuicDataTransport to receive incoming messages at +// the correct data channel. Currently this class implements unordered, reliable +// delivery and does not send an "OPEN" message. +// +// Each time a message is sent: +// +// - The QuicDataChannel prepends it with the data channel id and message id. +// The QuicTransportChannel creates a ReliableQuicStream, then the +// ReliableQuicStream sends the message with a FIN. +// +// - The remote QuicSession creates a ReliableQuicStream to receive the data. +// The remote QuicDataTransport dispatches the ReliableQuicStream to the +// QuicDataChannel with the same id as this data channel. +// +// - The remote QuicDataChannel queues data from the ReliableQuicStream. Once +// it receives a QUIC stream frame with a FIN, it provides the message to the +// DataChannelObserver. +// +// TODO(mikescarlett): Implement ordered delivery, unreliable delivery, and +// an OPEN message similar to the one for SCTP. +class QuicDataChannel : public rtc::RefCountedObject, + public sigslot::has_slots<> { + public: + // Message stores buffered data from the incoming QUIC stream. The QUIC stream + // is provided so that remaining data can be received from the remote peer. + struct Message { + uint64_t id; + rtc::CopyOnWriteBuffer buffer; + cricket::ReliableQuicStream* stream; + }; + + QuicDataChannel(rtc::Thread* signaling_thread, + rtc::Thread* worker_thread, + const std::string& label, + const DataChannelInit& config); + ~QuicDataChannel() override; + + // DataChannelInterface overrides. + std::string label() const override { return label_; } + bool reliable() const override { return true; } + bool ordered() const override { return false; } + uint16_t maxRetransmitTime() const override { return -1; } + uint16_t maxRetransmits() const override { return -1; } + bool negotiated() const override { return false; } + int id() const override { return id_; } + DataState state() const override { return state_; } + uint64_t buffered_amount() const override { return buffered_amount_; } + std::string protocol() const override { return protocol_; } + void RegisterObserver(DataChannelObserver* observer) override; + void UnregisterObserver() override; + void Close() override; + bool Send(const DataBuffer& buffer) override; + + // Called from QuicDataTransport to set the QUIC transport channel that the + // QuicDataChannel sends messages with. Returns false if a different QUIC + // transport channel is already set or |channel| is NULL. + // + // The QUIC transport channel is not set in the constructor to allow creating + // the QuicDataChannel before the PeerConnection has a QUIC transport channel, + // such as before the session description is not set. + bool SetTransportChannel(cricket::QuicTransportChannel* channel); + + // Called from QuicDataTransport when an incoming ReliableQuicStream is + // receiving a message received for this data channel. Once this function is + // called, |message| is owned by the QuicDataChannel and should not be + // accessed by the QuicDataTransport. + void OnIncomingMessage(Message&& message); + + // Methods for testing. + // Gets the number of outgoing QUIC streams with write blocked data that are + // currently open for this data channel and are not finished writing a + // message. This is equivalent to the size of |write_blocked_quic_streams_|. + size_t GetNumWriteBlockedStreams() const; + // Gets the number of incoming QUIC streams with buffered data that are + // currently open for this data channel and are not finished receiving a + // message. This is equivalent to the size of |incoming_quic_messages_|. + size_t GetNumIncomingStreams() const; + + private: + // Callbacks from ReliableQuicStream. + // Called when an incoming QUIC stream in |incoming_quic_messages_| has + // received a QUIC stream frame. + void OnDataReceived(net::QuicStreamId stream_id, + const char* data, + size_t len); + // Called when a write blocked QUIC stream that has been added to + // |write_blocked_quic_streams_| is closed. + void OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, int error); + // Called when an incoming QUIC stream that has been added to + // |incoming_quic_messages_| is closed. + void OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, int error); + // Called when a write blocked QUIC stream in |write_blocked_quic_streams_| + // has written previously queued data. + void OnQueuedBytesWritten(net::QuicStreamId stream_id, + uint64_t queued_bytes_written); + + // Callbacks from |quic_transport_channel_|. + void OnReadyToSend(cricket::TransportChannel* channel); + void OnConnectionClosed(); + + // Worker thread methods. + // Sends the data buffer to the remote peer using an outgoing QUIC stream. + // Returns true if the data buffer can be successfully sent, or if it is + // queued to be sent later. + bool Send_w(const DataBuffer& buffer); + // Connects the |quic_transport_channel_| signals to this QuicDataChannel, + // then returns the new QuicDataChannel state. + DataState SetTransportChannel_w(); + // Closes the QUIC streams associated with this QuicDataChannel. + void Close_w(); + // Sets |buffered_amount_|. + void SetBufferedAmount_w(uint64_t buffered_amount); + + // Signaling thread methods. + // Triggers QuicDataChannelObserver::OnMessage when a message from the remote + // peer is ready to be read. + void OnMessage_s(const DataBuffer& received_data); + // Triggers QuicDataChannel::OnStateChange if the state change is valid. + // Otherwise does nothing if |state| == |state_| or |state| != kClosed when + // the data channel is closing. + void SetState_s(DataState state); + // Triggers QuicDataChannelObserver::OnBufferedAmountChange when the total + // buffered data changes for a QUIC stream. + void OnBufferedAmountChange_s(uint64_t buffered_amount); + + // QUIC transport channel which owns the QUIC session. It is used to create + // a QUIC stream for sending outgoing messages. + cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; + // Signaling thread for DataChannelInterface methods. + rtc::Thread* const signaling_thread_; + // Worker thread for sending data and |quic_transport_channel_| callbacks. + rtc::Thread* const worker_thread_; + rtc::AsyncInvoker invoker_; + // Map of QUIC stream ID => ReliableQuicStream* for write blocked QUIC + // streams. + std::unordered_map + write_blocked_quic_streams_; + // Map of QUIC stream ID => Message for each incoming QUIC stream. + std::unordered_map incoming_quic_messages_; + // Handles received data from the remote peer and data channel state changes. + DataChannelObserver* observer_ = nullptr; + // QuicDataChannel ID. + int id_; + // Connectivity state of the QuicDataChannel. + DataState state_; + // Total bytes that are buffered among the QUIC streams. + uint64_t buffered_amount_; + // Counter for number of sent messages that is used for message IDs. + uint64_t next_message_id_; + + // Variables for application use. + const std::string& label_; + const std::string& protocol_; +}; + +} // namespace webrtc + +#endif // WEBRTC_API_QUICDATACHANNEL_H_ diff --git a/webrtc/api/quicdatachannel_unittest.cc b/webrtc/api/quicdatachannel_unittest.cc new file mode 100644 index 0000000000..2dd91024d0 --- /dev/null +++ b/webrtc/api/quicdatachannel_unittest.cc @@ -0,0 +1,659 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/api/quicdatachannel.h" + +#include +#include +#include +#include + +#include "webrtc/base/bind.h" +#include "webrtc/base/gunit.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/scoped_ref_ptr.h" +#include "webrtc/p2p/base/faketransportcontroller.h" +#include "webrtc/p2p/quic/quictransportchannel.h" +#include "webrtc/p2p/quic/reliablequicstream.h" + +using cricket::FakeTransportChannel; +using cricket::QuicTransportChannel; +using cricket::ReliableQuicStream; + +using webrtc::DataBuffer; +using webrtc::DataChannelObserver; +using webrtc::DataChannelInit; +using webrtc::QuicDataChannel; + +namespace { + +// Timeout for asynchronous operations. +static const int kTimeoutMs = 1000; // milliseconds + +// Small messages that can be sent within a single QUIC packet. +static const std::string kSmallMessage1 = "Hello, world!"; +static const std::string kSmallMessage2 = "WebRTC"; +static const std::string kSmallMessage3 = "1"; +static const std::string kSmallMessage4 = "abcdefghijklmnopqrstuvwxyz"; +static const DataBuffer kSmallBuffer1(kSmallMessage1); +static const DataBuffer kSmallBuffer2(kSmallMessage2); +static const DataBuffer kSmallBuffer3(kSmallMessage3); +static const DataBuffer kSmallBuffer4(kSmallMessage4); + +// Large messages (> 1350 bytes) that exceed the max size of a QUIC packet. +// These are < 16 KB so they don't exceed the QUIC stream flow control limit. +static const std::string kLargeMessage1 = std::string("a", 2000); +static const std::string kLargeMessage2 = std::string("a", 4000); +static const std::string kLargeMessage3 = std::string("a", 8000); +static const std::string kLargeMessage4 = std::string("a", 12000); +static const DataBuffer kLargeBuffer1(kLargeMessage1); +static const DataBuffer kLargeBuffer2(kLargeMessage2); +static const DataBuffer kLargeBuffer3(kLargeMessage3); +static const DataBuffer kLargeBuffer4(kLargeMessage4); + +// Oversized message (> 16 KB) that violates the QUIC stream flow control limit. +static const std::string kOversizedMessage = std::string("a", 20000); +static const DataBuffer kOversizedBuffer(kOversizedMessage); + +// Creates a fingerprint from a certificate. +static rtc::SSLFingerprint* CreateFingerprint(rtc::RTCCertificate* cert) { + std::string digest_algorithm; + cert->ssl_certificate().GetSignatureDigestAlgorithm(&digest_algorithm); + rtc::scoped_ptr fingerprint( + rtc::SSLFingerprint::Create(digest_algorithm, cert->identity())); + return fingerprint.release(); +} + +// FakeObserver receives messages from the QuicDataChannel. +class FakeObserver : public DataChannelObserver { + public: + FakeObserver() + : on_state_change_count_(0), on_buffered_amount_change_count_(0) {} + + // DataChannelObserver overrides. + void OnStateChange() override { ++on_state_change_count_; } + void OnBufferedAmountChange(uint64_t previous_amount) override { + ++on_buffered_amount_change_count_; + } + void OnMessage(const webrtc::DataBuffer& buffer) override { + messages_.push_back(std::string(buffer.data.data(), buffer.size())); + } + + const std::vector& messages() const { return messages_; } + + size_t messages_received() const { return messages_.size(); } + + size_t on_state_change_count() const { return on_state_change_count_; } + + size_t on_buffered_amount_change_count() const { + return on_buffered_amount_change_count_; + } + + private: + std::vector messages_; + size_t on_state_change_count_; + size_t on_buffered_amount_change_count_; +}; + +// FakeQuicDataTransport simulates QuicDataTransport by dispatching QUIC +// stream messages to data channels and encoding/decoding messages. +class FakeQuicDataTransport : public sigslot::has_slots<> { + public: + FakeQuicDataTransport() {} + + void ConnectToTransportChannel(QuicTransportChannel* quic_transport_channel) { + quic_transport_channel->SignalIncomingStream.connect( + this, &FakeQuicDataTransport::OnIncomingStream); + } + + rtc::scoped_refptr CreateDataChannel( + int id, + const std::string& label, + const std::string& protocol) { + DataChannelInit config; + config.id = id; + config.protocol = protocol; + rtc::scoped_refptr data_channel(new QuicDataChannel( + rtc::Thread::Current(), rtc::Thread::Current(), label, config)); + data_channel_by_id_[id] = data_channel; + return data_channel; + } + + private: + void OnIncomingStream(cricket::ReliableQuicStream* stream) { + incoming_stream_ = stream; + incoming_stream_->SignalDataReceived.connect( + this, &FakeQuicDataTransport::OnDataReceived); + } + + void OnDataReceived(net::QuicStreamId id, const char* data, size_t len) { + ASSERT_EQ(incoming_stream_->id(), id); + incoming_stream_->SignalDataReceived.disconnect(this); + // Retrieve the data channel ID and message ID. + int data_channel_id; + uint64_t message_id; + size_t bytes_read; + ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader(data, len, &data_channel_id, + &message_id, &bytes_read)); + data += bytes_read; + len -= bytes_read; + // Dispatch the message to the matching QuicDataChannel. + const auto& kv = data_channel_by_id_.find(data_channel_id); + ASSERT_NE(kv, data_channel_by_id_.end()); + QuicDataChannel* data_channel = kv->second; + QuicDataChannel::Message message; + message.id = message_id; + message.buffer = rtc::CopyOnWriteBuffer(data, len); + message.stream = incoming_stream_; + data_channel->OnIncomingMessage(std::move(message)); + incoming_stream_ = nullptr; + } + + // Map of data channel ID => QuicDataChannel. + std::map> data_channel_by_id_; + // Last incoming QUIC stream which has arrived. + cricket::ReliableQuicStream* incoming_stream_ = nullptr; +}; + +// A peer who creates a QuicDataChannel to transfer data, and simulates network +// connectivity with a fake ICE channel wrapped by the QUIC transport channel. +class QuicDataChannelPeer { + public: + QuicDataChannelPeer() + : ice_transport_channel_("data", 0), + quic_transport_channel_(&ice_transport_channel_) { + ice_transport_channel_.SetAsync(true); + fake_quic_data_transport_.ConnectToTransportChannel( + &quic_transport_channel_); + } + + void GenerateCertificateAndFingerprint() { + rtc::scoped_refptr local_cert = + rtc::RTCCertificate::Create(rtc::scoped_ptr( + rtc::SSLIdentity::Generate("cert_name", rtc::KT_DEFAULT))); + quic_transport_channel_.SetLocalCertificate(local_cert); + local_fingerprint_.reset(CreateFingerprint(local_cert.get())); + } + + rtc::scoped_refptr CreateDataChannelWithTransportChannel( + int id, + const std::string& label, + const std::string& protocol) { + rtc::scoped_refptr data_channel = + fake_quic_data_transport_.CreateDataChannel(id, label, protocol); + data_channel->SetTransportChannel(&quic_transport_channel_); + return data_channel; + } + + rtc::scoped_refptr CreateDataChannelWithoutTransportChannel( + int id, + const std::string& label, + const std::string& protocol) { + return fake_quic_data_transport_.CreateDataChannel(id, label, protocol); + } + + // Connects |ice_transport_channel_| to that of the other peer. + void Connect(QuicDataChannelPeer* other_peer) { + ice_transport_channel_.Connect(); + other_peer->ice_transport_channel_.Connect(); + ice_transport_channel_.SetDestination(&other_peer->ice_transport_channel_); + } + + rtc::scoped_ptr& local_fingerprint() { + return local_fingerprint_; + } + + QuicTransportChannel* quic_transport_channel() { + return &quic_transport_channel_; + } + + FakeTransportChannel* ice_transport_channel() { + return &ice_transport_channel_; + } + + private: + FakeTransportChannel ice_transport_channel_; + QuicTransportChannel quic_transport_channel_; + + rtc::scoped_ptr local_fingerprint_; + + FakeQuicDataTransport fake_quic_data_transport_; +}; + +class QuicDataChannelTest : public testing::Test { + public: + QuicDataChannelTest() {} + + // Connect the QuicTransportChannels and complete the crypto handshake. + void ConnectTransportChannels() { + SetCryptoParameters(); + peer1_.Connect(&peer2_); + ASSERT_TRUE_WAIT(peer1_.quic_transport_channel()->writable() && + peer2_.quic_transport_channel()->writable(), + kTimeoutMs); + } + + // Sets crypto parameters required for the QUIC handshake. + void SetCryptoParameters() { + peer1_.GenerateCertificateAndFingerprint(); + peer2_.GenerateCertificateAndFingerprint(); + + peer1_.quic_transport_channel()->SetSslRole(rtc::SSL_CLIENT); + peer2_.quic_transport_channel()->SetSslRole(rtc::SSL_SERVER); + + rtc::scoped_ptr& peer1_fingerprint = + peer1_.local_fingerprint(); + rtc::scoped_ptr& peer2_fingerprint = + peer2_.local_fingerprint(); + + peer1_.quic_transport_channel()->SetRemoteFingerprint( + peer2_fingerprint->algorithm, + reinterpret_cast(peer2_fingerprint->digest.data()), + peer2_fingerprint->digest.size()); + peer2_.quic_transport_channel()->SetRemoteFingerprint( + peer1_fingerprint->algorithm, + reinterpret_cast(peer1_fingerprint->digest.data()), + peer1_fingerprint->digest.size()); + } + + protected: + QuicDataChannelPeer peer1_; + QuicDataChannelPeer peer2_; +}; + +// Tests that a QuicDataChannel transitions from connecting to open when +// the QuicTransportChannel becomes writable for the first time. +TEST_F(QuicDataChannelTest, DataChannelOpensWhenTransportChannelConnects) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol"); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state()); + ConnectTransportChannels(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel->state(), + kTimeoutMs); +} + +// Tests that a QuicDataChannel transitions from connecting to open when +// SetTransportChannel is called with a QuicTransportChannel that is already +// writable. +TEST_F(QuicDataChannelTest, DataChannelOpensWhenTransportChannelWritable) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithoutTransportChannel(4, "label", "protocol"); + ConnectTransportChannels(); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state()); + data_channel->SetTransportChannel(peer1_.quic_transport_channel()); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state()); +} + +// Tests that the QuicDataChannel transfers messages small enough to fit into a +// single QUIC stream frame. +TEST_F(QuicDataChannelTest, TransferSmallMessage) { + ConnectTransportChannels(); + int data_channel_id = 2; + std::string label = "label"; + std::string protocol = "protocol"; + rtc::scoped_refptr peer1_data_channel = + peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer1_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer2_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + + FakeObserver peer1_observer; + peer1_data_channel->RegisterObserver(&peer1_observer); + FakeObserver peer2_observer; + peer2_data_channel->RegisterObserver(&peer2_observer); + + // peer1 -> peer2 + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer1)); + ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kSmallMessage1, peer2_observer.messages()[0]); + // peer2 -> peer1 + EXPECT_TRUE(peer2_data_channel->Send(kSmallBuffer2)); + ASSERT_EQ_WAIT(1, peer1_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kSmallMessage2, peer1_observer.messages()[0]); + // peer2 -> peer1 + EXPECT_TRUE(peer2_data_channel->Send(kSmallBuffer3)); + ASSERT_EQ_WAIT(2, peer1_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kSmallMessage3, peer1_observer.messages()[1]); + // peer1 -> peer2 + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer4)); + ASSERT_EQ_WAIT(2, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kSmallMessage4, peer2_observer.messages()[1]); +} + +// Tests that QuicDataChannel transfers messages large enough to fit into +// multiple QUIC stream frames, which don't violate the QUIC flow control limit. +// These require buffering by the QuicDataChannel. +TEST_F(QuicDataChannelTest, TransferLargeMessage) { + ConnectTransportChannels(); + int data_channel_id = 347; + std::string label = "label"; + std::string protocol = "protocol"; + rtc::scoped_refptr peer1_data_channel = + peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer1_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer2_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + + FakeObserver peer1_observer; + peer1_data_channel->RegisterObserver(&peer1_observer); + FakeObserver peer2_observer; + peer2_data_channel->RegisterObserver(&peer2_observer); + + // peer1 -> peer2 + EXPECT_TRUE(peer1_data_channel->Send(kLargeBuffer1)); + ASSERT_TRUE_WAIT(peer2_observer.messages_received() == 1, kTimeoutMs); + EXPECT_EQ(kLargeMessage1, peer2_observer.messages()[0]); + // peer2 -> peer1 + EXPECT_TRUE(peer2_data_channel->Send(kLargeBuffer2)); + ASSERT_EQ_WAIT(1, peer1_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kLargeMessage2, peer1_observer.messages()[0]); + // peer2 -> peer1 + EXPECT_TRUE(peer2_data_channel->Send(kLargeBuffer3)); + ASSERT_EQ_WAIT(2, peer1_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kLargeMessage3, peer1_observer.messages()[1]); + // peer1 -> peer2 + EXPECT_TRUE(peer1_data_channel->Send(kLargeBuffer4)); + ASSERT_EQ_WAIT(2, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kLargeMessage4, peer2_observer.messages()[1]); +} + +// Tests that when a message size exceeds the flow control limit (> 16KB), the +// QuicDataChannel can queue the data and send it after receiving window update +// frames from the remote peer. +TEST_F(QuicDataChannelTest, TransferOversizedMessage) { + ConnectTransportChannels(); + int data_channel_id = 189; + std::string label = "label"; + std::string protocol = "protocol"; + rtc::scoped_refptr peer1_data_channel = + peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer2_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + + FakeObserver peer1_observer; + peer1_data_channel->RegisterObserver(&peer1_observer); + FakeObserver peer2_observer; + peer2_data_channel->RegisterObserver(&peer2_observer); + + EXPECT_TRUE(peer1_data_channel->Send(kOversizedBuffer)); + EXPECT_EQ(1, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_EQ_WAIT(1, peer2_data_channel->GetNumIncomingStreams(), kTimeoutMs); + ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(kOversizedMessage, peer2_observer.messages()[0]); + EXPECT_EQ(0, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_EQ(0, peer2_data_channel->GetNumIncomingStreams()); +} + +// Tests that empty messages can be sent. +TEST_F(QuicDataChannelTest, TransferEmptyMessage) { + ConnectTransportChannels(); + int data_channel_id = 69; + std::string label = "label"; + std::string protocol = "protocol"; + rtc::scoped_refptr peer1_data_channel = + peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer2_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + + FakeObserver peer1_observer; + peer1_data_channel->RegisterObserver(&peer1_observer); + FakeObserver peer2_observer; + peer2_data_channel->RegisterObserver(&peer2_observer); + + EXPECT_TRUE(peer1_data_channel->Send(DataBuffer(""))); + ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ("", peer2_observer.messages()[0]); +} + +// Tests that when the QuicDataChannel is open and sends a message while the +// QuicTransportChannel is unwritable, it gets buffered then received once the +// QuicTransportChannel becomes writable again. +TEST_F(QuicDataChannelTest, MessagesReceivedWhenTransportChannelReconnects) { + ConnectTransportChannels(); + int data_channel_id = 401; + std::string label = "label"; + std::string protocol = "protocol"; + rtc::scoped_refptr peer1_data_channel = + peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer1_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label, + protocol); + ASSERT_TRUE(peer2_data_channel->state() == + webrtc::DataChannelInterface::kOpen); + + FakeObserver peer1_observer; + peer1_data_channel->RegisterObserver(&peer1_observer); + FakeObserver peer2_observer; + peer2_data_channel->RegisterObserver(&peer2_observer); + // writable => unwritable + peer1_.ice_transport_channel()->SetWritable(false); + ASSERT_FALSE(peer1_.quic_transport_channel()->writable()); + // Verify that sent data is buffered. + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer1)); + EXPECT_EQ(1, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer2)); + EXPECT_EQ(2, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer3)); + EXPECT_EQ(3, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer4)); + EXPECT_EQ(4, peer1_data_channel->GetNumWriteBlockedStreams()); + // unwritable => writable + peer1_.ice_transport_channel()->SetWritable(true); + ASSERT_TRUE(peer1_.quic_transport_channel()->writable()); + ASSERT_EQ_WAIT(4, peer2_observer.messages_received(), kTimeoutMs); + EXPECT_EQ(0, peer1_data_channel->GetNumWriteBlockedStreams()); + EXPECT_EQ(0, peer2_data_channel->GetNumIncomingStreams()); +} + +// Tests that the QuicDataChannel does not send before it is open. +TEST_F(QuicDataChannelTest, TransferMessageBeforeChannelOpens) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(6, "label", "protocol"); + ASSERT_TRUE(data_channel->state() == + webrtc::DataChannelInterface::kConnecting); + EXPECT_FALSE(data_channel->Send(kSmallBuffer1)); +} + +// Tests that the QuicDataChannel does not send after it is closed. +TEST_F(QuicDataChannelTest, TransferDataAfterChannelClosed) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(42, "label", "protocol"); + data_channel->Close(); + ASSERT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); + EXPECT_FALSE(data_channel->Send(kSmallBuffer1)); +} + +// Tests that QuicDataChannel state changes fire OnStateChanged() for the +// observer, with the correct data channel states, when the data channel +// transitions from kConnecting => kOpen => kClosing => kClosed. +TEST_F(QuicDataChannelTest, OnStateChangedFired) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(7, "label", "protocol"); + FakeObserver observer; + data_channel->RegisterObserver(&observer); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state()); + EXPECT_EQ(0, observer.on_state_change_count()); + ConnectTransportChannels(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel->state(), + kTimeoutMs); + EXPECT_EQ(1, observer.on_state_change_count()); + data_channel->Close(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); + // 2 state changes due to kClosing and kClosed. + EXPECT_EQ(3, observer.on_state_change_count()); +} + +// Tests that a QuicTransportChannel can be closed without being opened when it +// is connected to a transprot chanenl. +TEST_F(QuicDataChannelTest, NeverOpenedWithTransportChannel) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(7, "label", "protocol"); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state()); + data_channel->Close(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); +} + +// Tests that a QuicTransportChannel can be closed without being opened or +// connected to a transport channel. +TEST_F(QuicDataChannelTest, NeverOpenedWithoutTransportChannel) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithoutTransportChannel(7, "label", "protocol"); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state()); + data_channel->Close(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); +} + +// Tests that the QuicDataChannel is closed when the QUIC connection closes. +TEST_F(QuicDataChannelTest, ClosedOnTransportError) { + ConnectTransportChannels(); + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(1, "label", "protocol"); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state()); + ReliableQuicStream* stream = + peer1_.quic_transport_channel()->CreateQuicStream(); + ASSERT_NE(nullptr, stream); + stream->CloseConnectionWithDetails(net::QuicErrorCode::QUIC_NO_ERROR, + "Closing QUIC for testing"); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); +} + +// Tests that an already closed QuicDataChannel does not fire onStateChange and +// remains closed. +TEST_F(QuicDataChannelTest, DoesNotChangeStateWhenClosed) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol"); + FakeObserver observer; + data_channel->RegisterObserver(&observer); + data_channel->Close(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(), + kTimeoutMs); + // OnStateChange called for kClosing and kClosed. + EXPECT_EQ(2, observer.on_state_change_count()); + // Call Close() again to verify that the state cannot be kClosing. + data_channel->Close(); + EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state()); + EXPECT_EQ(2, observer.on_state_change_count()); + ConnectTransportChannels(); + EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state()); + EXPECT_EQ(2, observer.on_state_change_count()); + // writable => unwritable + peer1_.ice_transport_channel()->SetWritable(false); + ASSERT_FALSE(peer1_.quic_transport_channel()->writable()); + EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state()); + EXPECT_EQ(2, observer.on_state_change_count()); + // unwritable => writable + peer1_.ice_transport_channel()->SetWritable(true); + ASSERT_TRUE(peer1_.quic_transport_channel()->writable()); + EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state()); + EXPECT_EQ(2, observer.on_state_change_count()); +} + +// Tests that when the QuicDataChannel is open and the QuicTransportChannel +// transitions between writable and unwritable, it does not fire onStateChange +// and remains open. +TEST_F(QuicDataChannelTest, DoesNotChangeStateWhenTransportChannelReconnects) { + ConnectTransportChannels(); + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol"); + FakeObserver observer; + data_channel->RegisterObserver(&observer); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state()); + EXPECT_EQ(0, observer.on_state_change_count()); + // writable => unwritable + peer1_.ice_transport_channel()->SetWritable(false); + ASSERT_FALSE(peer1_.quic_transport_channel()->writable()); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state()); + EXPECT_EQ(0, observer.on_state_change_count()); + // unwritable => writable + peer1_.ice_transport_channel()->SetWritable(true); + ASSERT_TRUE(peer1_.quic_transport_channel()->writable()); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state()); + EXPECT_EQ(0, observer.on_state_change_count()); +} + +// Tests that SetTransportChannel returns false when setting a NULL transport +// channel or a transport channel that is not equivalent to the one already set. +TEST_F(QuicDataChannelTest, SetTransportChannelReturnValue) { + rtc::scoped_refptr data_channel = + peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol"); + EXPECT_FALSE(data_channel->SetTransportChannel(nullptr)); + QuicTransportChannel* transport_channel = peer1_.quic_transport_channel(); + EXPECT_TRUE(data_channel->SetTransportChannel(transport_channel)); + EXPECT_TRUE(data_channel->SetTransportChannel(transport_channel)); + QuicTransportChannel* other_transport_channel = + peer2_.quic_transport_channel(); + EXPECT_FALSE(data_channel->SetTransportChannel(other_transport_channel)); +} + +// Tests that the QUIC message header is encoded with the correct number of +// bytes and is properly decoded. +TEST_F(QuicDataChannelTest, EncodeParseQuicDataMessageHeader) { + int data_channel_id1 = 127; // 1 byte + uint64_t message_id1 = 0; // 1 byte + rtc::CopyOnWriteBuffer header1; + webrtc::WriteQuicDataChannelMessageHeader(data_channel_id1, message_id1, + &header1); + EXPECT_EQ(2u, header1.size()); + + int decoded_data_channel_id1; + uint64_t decoded_message_id1; + size_t bytes_read1; + ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader( + header1.data(), header1.size(), &decoded_data_channel_id1, + &decoded_message_id1, &bytes_read1)); + EXPECT_EQ(data_channel_id1, decoded_data_channel_id1); + EXPECT_EQ(message_id1, decoded_message_id1); + EXPECT_EQ(2u, bytes_read1); + + int data_channel_id2 = 4178; // 2 bytes + uint64_t message_id2 = 1324921792003; // 6 bytes + rtc::CopyOnWriteBuffer header2; + webrtc::WriteQuicDataChannelMessageHeader(data_channel_id2, message_id2, + &header2); + EXPECT_EQ(8u, header2.size()); + + int decoded_data_channel_id2; + uint64_t decoded_message_id2; + size_t bytes_read2; + ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader( + header2.data(), header2.size(), &decoded_data_channel_id2, + &decoded_message_id2, &bytes_read2)); + EXPECT_EQ(data_channel_id2, decoded_data_channel_id2); + EXPECT_EQ(message_id2, decoded_message_id2); + EXPECT_EQ(8u, bytes_read2); +} + +} // namespace diff --git a/webrtc/api/quicdatatransport.cc b/webrtc/api/quicdatatransport.cc new file mode 100644 index 0000000000..70ad03dbfd --- /dev/null +++ b/webrtc/api/quicdatatransport.cc @@ -0,0 +1,146 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/api/quicdatatransport.h" + +#include "webrtc/base/logging.h" +#include "webrtc/p2p/quic/quictransportchannel.h" +#include "webrtc/p2p/quic/reliablequicstream.h" + +namespace webrtc { + +QuicDataTransport::QuicDataTransport(rtc::Thread* signaling_thread, + rtc::Thread* worker_thread) + : signaling_thread_(signaling_thread), worker_thread_(worker_thread) { + RTC_DCHECK(signaling_thread_); + RTC_DCHECK(worker_thread_); +} + +QuicDataTransport::~QuicDataTransport() {} + +bool QuicDataTransport::SetTransportChannel( + cricket::QuicTransportChannel* channel) { + if (!channel) { + LOG(LS_ERROR) << "|channel| is NULL. Cannot set transport channel."; + return false; + } + if (quic_transport_channel_) { + if (channel == quic_transport_channel_) { + LOG(LS_WARNING) << "Ignoring duplicate transport channel."; + return true; + } + LOG(LS_ERROR) << "|channel| does not match existing transport channel."; + return false; + } + + LOG(LS_INFO) << "Setting QuicTransportChannel for QuicDataTransport"; + quic_transport_channel_ = channel; + quic_transport_channel_->SignalIncomingStream.connect( + this, &QuicDataTransport::OnIncomingStream); + + bool success = true; + for (const auto& kv : data_channel_by_id_) { + rtc::scoped_refptr data_channel = kv.second; + if (!data_channel->SetTransportChannel(quic_transport_channel_)) { + LOG(LS_ERROR) + << "Cannot set QUIC transport channel for QUIC data channel " + << kv.first; + success = false; + } + } + return success; +} + +rtc::scoped_refptr QuicDataTransport::CreateDataChannel( + const std::string& label, + const DataChannelInit* config) { + if (config == nullptr) { + return nullptr; + } + if (data_channel_by_id_.find(config->id) != data_channel_by_id_.end()) { + LOG(LS_ERROR) << "QUIC data channel already exists with id " << config->id; + return nullptr; + } + rtc::scoped_refptr data_channel( + new QuicDataChannel(signaling_thread_, worker_thread_, label, *config)); + if (quic_transport_channel_) { + if (!data_channel->SetTransportChannel(quic_transport_channel_)) { + LOG(LS_ERROR) + << "Cannot set QUIC transport channel for QUIC data channel " + << config->id; + } + } + + data_channel_by_id_[data_channel->id()] = data_channel; + return data_channel; +} + +void QuicDataTransport::DestroyDataChannel(int id) { + data_channel_by_id_.erase(id); +} + +bool QuicDataTransport::HasDataChannel(int id) const { + return data_channel_by_id_.find(id) != data_channel_by_id_.end(); +} + +bool QuicDataTransport::HasDataChannels() const { + return !data_channel_by_id_.empty(); +} + +// Called when a QUIC stream is created for incoming data. +void QuicDataTransport::OnIncomingStream(cricket::ReliableQuicStream* stream) { + RTC_DCHECK(stream != nullptr); + quic_stream_by_id_[stream->id()] = stream; + stream->SignalDataReceived.connect(this, &QuicDataTransport::OnDataReceived); +} + +// Called when the first QUIC stream frame is received for incoming data. +void QuicDataTransport::OnDataReceived(net::QuicStreamId id, + const char* data, + size_t len) { + const auto& quic_stream_kv = quic_stream_by_id_.find(id); + if (quic_stream_kv == quic_stream_by_id_.end()) { + RTC_DCHECK(false); + return; + } + cricket::ReliableQuicStream* stream = quic_stream_kv->second; + stream->SignalDataReceived.disconnect(this); + quic_stream_by_id_.erase(id); + // Read the data channel ID and message ID. + int data_channel_id; + uint64_t message_id; + size_t bytes_read; + if (!ParseQuicDataMessageHeader(data, len, &data_channel_id, &message_id, + &bytes_read)) { + LOG(LS_ERROR) << "Could not read QUIC message header from QUIC stream " + << id; + return; + } + data += bytes_read; + len -= bytes_read; + // Retrieve the data channel which will handle the message. + const auto& data_channel_kv = data_channel_by_id_.find(data_channel_id); + if (data_channel_kv == data_channel_by_id_.end()) { + // TODO(mikescarlett): Implement OPEN message to create a new + // QuicDataChannel when messages are received for a nonexistent ID. + LOG(LS_ERROR) << "Data was received for QUIC data channel " + << data_channel_id + << " but it is not registered to the QuicDataTransport."; + return; + } + QuicDataChannel* data_channel = data_channel_kv->second; + QuicDataChannel::Message message; + message.id = message_id; + message.buffer = rtc::CopyOnWriteBuffer(data, len); + message.stream = stream; + data_channel->OnIncomingMessage(std::move(message)); +} + +} // namespace webrtc diff --git a/webrtc/api/quicdatatransport.h b/webrtc/api/quicdatatransport.h new file mode 100644 index 0000000000..f0c427d1b5 --- /dev/null +++ b/webrtc/api/quicdatatransport.h @@ -0,0 +1,90 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_API_QUICDATATRANSPORT_H_ +#define WEBRTC_API_QUICDATATRANSPORT_H_ + +#include +#include + +#include "webrtc/api/datachannelinterface.h" +#include "webrtc/api/quicdatachannel.h" +#include "webrtc/base/scoped_ref_ptr.h" +#include "webrtc/base/sigslot.h" +#include "webrtc/base/thread.h" + +namespace cricket { +class QuicTransportChannel; +class ReliableQuicStream; +} // namepsace cricket + +namespace webrtc { + +// QuicDataTransport creates QuicDataChannels for the PeerConnection. It also +// handles QUIC stream demuxing by distributing incoming QUIC streams from the +// QuicTransportChannel among the QuicDataChannels that it has created. +// +// QuicDataTransport reads the data channel ID from the incoming QUIC stream, +// then looks it up in a map of ID => QuicDataChannel. If the data channel +// exists, it sends the QUIC stream to the QuicDataChannel. +class QuicDataTransport : public sigslot::has_slots<> { + public: + QuicDataTransport(rtc::Thread* signaling_thread, rtc::Thread* worker_thread); + ~QuicDataTransport() override; + + // Sets the QUIC transport channel for the QuicDataChannels and the + // QuicDataTransport. Returns false if a different QUIC transport channel is + // already set, the QUIC transport channel cannot be set for any of the + // QuicDataChannels, or |channel| is NULL. + bool SetTransportChannel(cricket::QuicTransportChannel* channel); + + // Creates a QuicDataChannel that uses this QuicDataTransport. + rtc::scoped_refptr CreateDataChannel( + const std::string& label, + const DataChannelInit* config); + + // Removes a QuicDataChannel with the given ID from the QuicDataTransport's + // data channel map. + void DestroyDataChannel(int id); + + // True if the QuicDataTransport has a data channel with the given ID. + bool HasDataChannel(int id) const; + + // True if the QuicDataTransport has data channels. + bool HasDataChannels() const; + + private: + // Called from the QuicTransportChannel when a ReliableQuicStream is created + // to receive incoming data. + void OnIncomingStream(cricket::ReliableQuicStream* stream); + // Called from the ReliableQuicStream when the first QUIC stream frame is + // received for incoming data. The QuicDataTransport reads the data channel ID + // and message ID from the incoming data, then dispatches the + // ReliableQuicStream to the QuicDataChannel with the same data channel ID. + void OnDataReceived(net::QuicStreamId stream_id, + const char* data, + size_t len); + + // Map of data channel ID => QUIC data channel values. + std::unordered_map> + data_channel_by_id_; + // Map of QUIC stream ID => ReliableQuicStream* values. + std::unordered_map + quic_stream_by_id_; + // QuicTransportChannel for sending/receiving data. + cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; + // Signaling and worker threads for the QUIC data channel. + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; +}; + +} // namespace webrtc + +#endif // WEBRTC_API_QUICDATATRANSPORT_H_ diff --git a/webrtc/api/quicdatatransport_unittest.cc b/webrtc/api/quicdatatransport_unittest.cc new file mode 100644 index 0000000000..312e825ac4 --- /dev/null +++ b/webrtc/api/quicdatatransport_unittest.cc @@ -0,0 +1,355 @@ +/* + * Copyright 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/api/quicdatatransport.h" + +#include +#include +#include +#include + +#include "webrtc/api/quicdatachannel.h" +#include "webrtc/base/bytebuffer.h" +#include "webrtc/base/gunit.h" +#include "webrtc/p2p/base/faketransportcontroller.h" +#include "webrtc/p2p/quic/quictransportchannel.h" +#include "webrtc/p2p/quic/reliablequicstream.h" + +using webrtc::DataBuffer; +using webrtc::DataChannelInit; +using webrtc::DataChannelInterface; +using webrtc::DataChannelObserver; +using webrtc::QuicDataChannel; +using webrtc::QuicDataTransport; +using cricket::FakeTransportChannel; +using cricket::QuicTransportChannel; +using cricket::ReliableQuicStream; + +namespace { + +// Timeout for asynchronous operations. +static const int kTimeoutMs = 1000; // milliseconds + +// FakeObserver receives messages from the data channel. +class FakeObserver : public DataChannelObserver { + public: + FakeObserver() {} + + void OnStateChange() override {} + + void OnBufferedAmountChange(uint64_t previous_amount) override {} + + void OnMessage(const webrtc::DataBuffer& buffer) override { + messages_.push_back(std::string(buffer.data.data(), buffer.size())); + } + + const std::vector& messages() const { return messages_; } + + size_t messages_received() const { return messages_.size(); } + + private: + std::vector messages_; +}; + +// A peer who uses a QUIC transport channel and fake ICE transport channel to +// send or receive data. +class QuicDataTransportPeer { + public: + QuicDataTransportPeer() + : quic_data_transport_(rtc::Thread::Current(), rtc::Thread::Current()), + ice_transport_channel_("data", 0), + quic_transport_channel_(&ice_transport_channel_) { + ice_transport_channel_.SetAsync(true); + } + + void GenerateCertificateAndFingerprint() { + rtc::scoped_refptr local_cert = + rtc::RTCCertificate::Create(rtc::scoped_ptr( + rtc::SSLIdentity::Generate("cert_name", rtc::KT_DEFAULT))); + quic_transport_channel_.SetLocalCertificate(local_cert); + local_fingerprint_.reset(CreateFingerprint(local_cert.get())); + } + + // Connects |ice_transport_channel_| to that of the other peer. + void Connect(QuicDataTransportPeer* other_peer) { + ice_transport_channel_.Connect(); + other_peer->ice_transport_channel_.Connect(); + ice_transport_channel_.SetDestination(&other_peer->ice_transport_channel_); + } + + rtc::scoped_ptr& local_fingerprint() { + return local_fingerprint_; + } + + QuicTransportChannel* quic_transport_channel() { + return &quic_transport_channel_; + } + + // Write a messge directly to the ReliableQuicStream. + void WriteMessage(int data_channel_id, + uint64_t message_id, + const std::string& message) { + ReliableQuicStream* stream = quic_transport_channel_.CreateQuicStream(); + rtc::CopyOnWriteBuffer payload; + webrtc::WriteQuicDataChannelMessageHeader(data_channel_id, message_id, + &payload); + stream->Write(payload.data(), payload.size(), false); + stream->Write(message.data(), message.size(), true); + } + + rtc::scoped_refptr CreateDataChannel( + const DataChannelInit* config) { + return quic_data_transport_.CreateDataChannel("testing", config); + } + + QuicDataTransport* quic_data_transport() { return &quic_data_transport_; } + + private: + // Creates a fingerprint from a certificate. + rtc::SSLFingerprint* CreateFingerprint(rtc::RTCCertificate* cert) { + std::string digest_algorithm; + cert->ssl_certificate().GetSignatureDigestAlgorithm(&digest_algorithm); + rtc::scoped_ptr fingerprint( + rtc::SSLFingerprint::Create(digest_algorithm, cert->identity())); + return fingerprint.release(); + } + + QuicDataTransport quic_data_transport_; + FakeTransportChannel ice_transport_channel_; + QuicTransportChannel quic_transport_channel_; + rtc::scoped_ptr local_fingerprint_; +}; + +class QuicDataTransportTest : public testing::Test { + public: + QuicDataTransportTest() {} + + void ConnectTransportChannels() { + SetCryptoParameters(); + peer1_.Connect(&peer2_); + ASSERT_TRUE_WAIT(peer1_.quic_transport_channel()->writable() && + peer2_.quic_transport_channel()->writable(), + kTimeoutMs); + } + + void SetTransportChannels() { + ASSERT_TRUE(peer1_.quic_data_transport()->SetTransportChannel( + peer1_.quic_transport_channel())); + ASSERT_TRUE(peer2_.quic_data_transport()->SetTransportChannel( + peer2_.quic_transport_channel())); + } + + // Sets crypto parameters required for the QUIC handshake. + void SetCryptoParameters() { + peer1_.GenerateCertificateAndFingerprint(); + peer2_.GenerateCertificateAndFingerprint(); + + peer1_.quic_transport_channel()->SetSslRole(rtc::SSL_CLIENT); + peer2_.quic_transport_channel()->SetSslRole(rtc::SSL_SERVER); + + rtc::scoped_ptr& peer1_fingerprint = + peer1_.local_fingerprint(); + rtc::scoped_ptr& peer2_fingerprint = + peer2_.local_fingerprint(); + + peer1_.quic_transport_channel()->SetRemoteFingerprint( + peer2_fingerprint->algorithm, + reinterpret_cast(peer2_fingerprint->digest.data()), + peer2_fingerprint->digest.size()); + peer2_.quic_transport_channel()->SetRemoteFingerprint( + peer1_fingerprint->algorithm, + reinterpret_cast(peer1_fingerprint->digest.data()), + peer1_fingerprint->digest.size()); + } + + protected: + QuicDataTransportPeer peer1_; + QuicDataTransportPeer peer2_; +}; + +// Tests creation and destruction of data channels. +TEST_F(QuicDataTransportTest, CreateAndDestroyDataChannels) { + QuicDataTransport* quic_data_transport = peer2_.quic_data_transport(); + EXPECT_FALSE(quic_data_transport->HasDataChannels()); + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + EXPECT_FALSE(quic_data_transport->HasDataChannel(data_channel_id)); + webrtc::DataChannelInit config; + config.id = data_channel_id; + rtc::scoped_refptr data_channel = + peer2_.CreateDataChannel(&config); + EXPECT_NE(nullptr, data_channel); + EXPECT_EQ(data_channel_id, data_channel->id()); + EXPECT_TRUE(quic_data_transport->HasDataChannel(data_channel_id)); + } + EXPECT_TRUE(quic_data_transport->HasDataChannels()); + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + quic_data_transport->DestroyDataChannel(data_channel_id); + EXPECT_FALSE(quic_data_transport->HasDataChannel(data_channel_id)); + } + EXPECT_FALSE(quic_data_transport->HasDataChannels()); +} + +// Tests that the QuicDataTransport does not allow creating multiple +// QuicDataChannels with the same id. +TEST_F(QuicDataTransportTest, CannotCreateDataChannelsWithSameId) { + webrtc::DataChannelInit config; + config.id = 2; + EXPECT_NE(nullptr, peer2_.CreateDataChannel(&config)); + EXPECT_EQ(nullptr, peer2_.CreateDataChannel(&config)); +} + +// Tests that any data channels created by the QuicDataTransport are in state +// kConnecting before the QuicTransportChannel is set, then transiton to state +// kOpen when the transport channel becomes writable. +TEST_F(QuicDataTransportTest, DataChannelsOpenWhenTransportChannelWritable) { + webrtc::DataChannelInit config1; + config1.id = 7; + rtc::scoped_refptr data_channel1 = + peer2_.CreateDataChannel(&config1); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel1->state()); + SetTransportChannels(); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel1->state()); + webrtc::DataChannelInit config2; + config2.id = 14; + rtc::scoped_refptr data_channel2 = + peer2_.CreateDataChannel(&config2); + EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel2->state()); + // Existing data channels should open once the transport channel is writable. + ConnectTransportChannels(); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel1->state(), + kTimeoutMs); + EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel2->state(), + kTimeoutMs); + // Any data channels created afterwards should start in state kOpen. + webrtc::DataChannelInit config3; + config3.id = 21; + rtc::scoped_refptr data_channel3 = + peer2_.CreateDataChannel(&config3); + EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel3->state()); +} + +// Tests that the QuicTransport dispatches messages for one QuicDataChannel. +TEST_F(QuicDataTransportTest, ReceiveMessagesForSingleDataChannel) { + ConnectTransportChannels(); + SetTransportChannels(); + + int data_channel_id = 1337; + webrtc::DataChannelInit config; + config.id = data_channel_id; + rtc::scoped_refptr peer2_data_channel = + peer2_.CreateDataChannel(&config); + FakeObserver observer; + peer2_data_channel->RegisterObserver(&observer); + + uint64_t message1_id = 26u; + peer1_.WriteMessage(data_channel_id, message1_id, "Testing"); + ASSERT_EQ_WAIT(1, observer.messages_received(), kTimeoutMs); + EXPECT_EQ("Testing", observer.messages()[0]); + + uint64_t message2_id = 402u; + peer1_.WriteMessage(data_channel_id, message2_id, "Hello, World!"); + ASSERT_EQ_WAIT(2, observer.messages_received(), kTimeoutMs); + EXPECT_EQ("Hello, World!", observer.messages()[1]); + + uint64_t message3_id = 100260415u; + peer1_.WriteMessage(data_channel_id, message3_id, "Third message"); + ASSERT_EQ_WAIT(3, observer.messages_received(), kTimeoutMs); + EXPECT_EQ("Third message", observer.messages()[2]); +} + +// Tests that the QuicTransport dispatches messages to the correct data channel +// when multiple are in use. +TEST_F(QuicDataTransportTest, ReceiveMessagesForMultipleDataChannels) { + ConnectTransportChannels(); + SetTransportChannels(); + + std::vector> data_channels; + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + webrtc::DataChannelInit config; + config.id = data_channel_id; + data_channels.push_back(peer2_.CreateDataChannel(&config)); + } + + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + uint64_t message1_id = 48023u; + FakeObserver observer; + DataChannelInterface* peer2_data_channel = + data_channels[data_channel_id].get(); + peer2_data_channel->RegisterObserver(&observer); + peer1_.WriteMessage(data_channel_id, message1_id, "Testing"); + ASSERT_EQ_WAIT(1, observer.messages_received(), kTimeoutMs); + EXPECT_EQ("Testing", observer.messages()[0]); + + uint64_t message2_id = 1372643095u; + peer1_.WriteMessage(data_channel_id, message2_id, "Hello, World!"); + ASSERT_EQ_WAIT(2, observer.messages_received(), kTimeoutMs); + EXPECT_EQ("Hello, World!", observer.messages()[1]); + } +} + +// Tests end-to-end that both peers can use multiple QuicDataChannels to +// send/receive messages using a QuicDataTransport. +TEST_F(QuicDataTransportTest, EndToEndSendReceiveMessages) { + ConnectTransportChannels(); + SetTransportChannels(); + + std::vector> peer1_data_channels; + std::vector> peer2_data_channels; + + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + webrtc::DataChannelInit config; + config.id = data_channel_id; + peer1_data_channels.push_back(peer1_.CreateDataChannel(&config)); + peer2_data_channels.push_back(peer2_.CreateDataChannel(&config)); + } + + for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) { + DataChannelInterface* peer1_data_channel = + peer1_data_channels[data_channel_id].get(); + FakeObserver observer1; + peer1_data_channel->RegisterObserver(&observer1); + DataChannelInterface* peer2_data_channel = + peer2_data_channels[data_channel_id].get(); + FakeObserver observer2; + peer2_data_channel->RegisterObserver(&observer2); + + peer1_data_channel->Send(webrtc::DataBuffer("Peer 1 message 1")); + ASSERT_EQ_WAIT(1, observer2.messages_received(), kTimeoutMs); + EXPECT_EQ("Peer 1 message 1", observer2.messages()[0]); + + peer1_data_channel->Send(webrtc::DataBuffer("Peer 1 message 2")); + ASSERT_EQ_WAIT(2, observer2.messages_received(), kTimeoutMs); + EXPECT_EQ("Peer 1 message 2", observer2.messages()[1]); + + peer2_data_channel->Send(webrtc::DataBuffer("Peer 2 message 1")); + ASSERT_EQ_WAIT(1, observer1.messages_received(), kTimeoutMs); + EXPECT_EQ("Peer 2 message 1", observer1.messages()[0]); + + peer2_data_channel->Send(webrtc::DataBuffer("Peer 2 message 2")); + ASSERT_EQ_WAIT(2, observer1.messages_received(), kTimeoutMs); + EXPECT_EQ("Peer 2 message 2", observer1.messages()[1]); + } +} + +// Tests that SetTransportChannel returns false when setting a NULL transport +// channel or a transport channel that is not equivalent to the one already set. +TEST_F(QuicDataTransportTest, SetTransportChannelReturnValue) { + QuicDataTransport* quic_data_transport = peer1_.quic_data_transport(); + EXPECT_FALSE(quic_data_transport->SetTransportChannel(nullptr)); + QuicTransportChannel* transport_channel = peer1_.quic_transport_channel(); + EXPECT_TRUE(quic_data_transport->SetTransportChannel(transport_channel)); + EXPECT_TRUE(quic_data_transport->SetTransportChannel(transport_channel)); + QuicTransportChannel* other_transport_channel = + peer2_.quic_transport_channel(); + EXPECT_FALSE( + quic_data_transport->SetTransportChannel(other_transport_channel)); +} + +} // namespace