Add QuicDataChannel and QuicDataTransport classes

QuicDataChannel implements DataChannelInterface. It
replaces SCTP data channels by using a QuicTransportChannel
to create a ReliableQuicStream for each message.
QuicDataChannel only implements unordered, reliable delivery
for the initial implementation and does not send a hello message.

QuicDataTransport is a helper class that dispatches each incoming
ReliableQuicStream to a QuicDataChannel when the remote
peer receives a message by parsing the data channel id and message id
from the message header. It is also responsible for encoding the header
before QuicDataChannel sends the message.

Split from CL https://codereview.chromium.org/1844803002/.

BUG=

Review-Url: https://codereview.webrtc.org/1886623002
Cr-Commit-Position: refs/heads/master@{#12574}
This commit is contained in:
mikescarlett 2016-04-29 18:30:55 -07:00 committed by Commit bot
parent 70035cae4d
commit 9bc517f123
8 changed files with 1882 additions and 0 deletions

View File

@ -240,6 +240,20 @@
},
},
}],
['use_quic==1', {
'dependencies': [
'<(DEPTH)/third_party/libquic/libquic.gyp:libquic',
],
'sources': [
'quicdatachannel.cc',
'quicdatachannel.h',
'quicdatatransport.cc',
'quicdatatransport.h',
],
'export_dependent_settings': [
'<(DEPTH)/third_party/libquic/libquic.gyp:libquic',
],
}],
],
}, # target libjingle_peerconnection
], # targets

View File

@ -113,6 +113,18 @@
},
},
}],
['use_quic==1', {
'dependencies': [
'<(DEPTH)/third_party/libquic/libquic.gyp:libquic',
],
'sources': [
'quicdatachannel_unittest.cc',
'quicdatatransport_unittest.cc',
],
'export_dependent_settings': [
'<(DEPTH)/third_party/libquic/libquic.gyp:libquic',
],
}],
], # conditions
}, # target peerconnection_unittests
], # targets

View File

@ -0,0 +1,391 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/api/quicdatachannel.h"
#include "webrtc/base/bind.h"
#include "webrtc/base/bytebuffer.h"
#include "webrtc/base/copyonwritebuffer.h"
#include "webrtc/base/logging.h"
#include "webrtc/p2p/quic/quictransportchannel.h"
#include "webrtc/p2p/quic/reliablequicstream.h"
namespace webrtc {
void WriteQuicDataChannelMessageHeader(int data_channel_id,
uint64_t message_id,
rtc::CopyOnWriteBuffer* header) {
RTC_DCHECK(header);
// 64-bit varints require at most 10 bytes (7*10 == 70), and 32-bit varints
// require at most 5 bytes (7*5 == 35).
size_t max_length = 15;
rtc::ByteBufferWriter byte_buffer(nullptr, max_length,
rtc::ByteBuffer::ByteOrder::ORDER_HOST);
byte_buffer.WriteUVarint(data_channel_id);
byte_buffer.WriteUVarint(message_id);
header->SetData(byte_buffer.Data(), byte_buffer.Length());
}
bool ParseQuicDataMessageHeader(const char* data,
size_t len,
int* data_channel_id,
uint64_t* message_id,
size_t* bytes_read) {
RTC_DCHECK(data_channel_id);
RTC_DCHECK(message_id);
RTC_DCHECK(bytes_read);
rtc::ByteBufferReader byte_buffer(data, len, rtc::ByteBuffer::ORDER_HOST);
uint64_t dcid;
if (!byte_buffer.ReadUVarint(&dcid)) {
LOG(LS_ERROR) << "Could not read the data channel ID";
return false;
}
*data_channel_id = dcid;
if (!byte_buffer.ReadUVarint(message_id)) {
LOG(LS_ERROR) << "Could not read message ID for data channel "
<< *data_channel_id;
return false;
}
size_t remaining_bytes = byte_buffer.Length();
*bytes_read = len - remaining_bytes;
return true;
}
QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread,
rtc::Thread* worker_thread,
const std::string& label,
const DataChannelInit& config)
: signaling_thread_(signaling_thread),
worker_thread_(worker_thread),
id_(config.id),
state_(kConnecting),
buffered_amount_(0),
next_message_id_(0),
label_(label),
protocol_(config.protocol) {}
QuicDataChannel::~QuicDataChannel() {}
void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
RTC_DCHECK(signaling_thread_->IsCurrent());
observer_ = observer;
}
void QuicDataChannel::UnregisterObserver() {
RTC_DCHECK(signaling_thread_->IsCurrent());
observer_ = nullptr;
}
bool QuicDataChannel::Send(const DataBuffer& buffer) {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (state_ != kOpen) {
LOG(LS_ERROR) << "QUIC data channel " << id_
<< " is not open so cannot send.";
return false;
}
return worker_thread_->Invoke<bool>(
rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
}
bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
RTC_DCHECK(worker_thread_->IsCurrent());
// Encode and send the header containing the data channel ID and message ID.
rtc::CopyOnWriteBuffer header;
WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header);
RTC_DCHECK(quic_transport_channel_);
cricket::ReliableQuicStream* stream =
quic_transport_channel_->CreateQuicStream();
RTC_DCHECK(stream);
// Send the header with a FIN if the message is empty.
bool header_fin = (buffer.size() == 0);
rtc::StreamResult header_result =
stream->Write(header.data<char>(), header.size(), header_fin);
if (header_result == rtc::SR_BLOCK) {
// The header is write blocked but we should try sending the message. Since
// the ReliableQuicStream queues data in order, if the header is write
// blocked then the message will be write blocked. Otherwise if the message
// is sent then the header is sent.
LOG(LS_INFO) << "Stream " << stream->id()
<< " header is write blocked for QUIC data channel " << id_;
} else if (header_result != rtc::SR_SUCCESS) {
LOG(LS_ERROR) << "Stream " << stream->id()
<< " failed to write header for QUIC data channel " << id_
<< ". Unexpected error " << header_result;
return false;
}
// If the message is not empty, then send the message with a FIN.
bool message_fin = true;
rtc::StreamResult message_result =
header_fin ? header_result : stream->Write(buffer.data.data<char>(),
buffer.size(), message_fin);
if (message_result == rtc::SR_SUCCESS) {
// The message is sent and we don't need this QUIC stream.
LOG(LS_INFO) << "Stream " << stream->id()
<< " successfully wrote message for QUIC data channel " << id_;
stream->Close();
return true;
}
// TODO(mikescarlett): Register the ReliableQuicStream's priority to the
// QuicWriteBlockedList so that the QUIC session doesn't drop messages when
// the QUIC transport channel becomes unwritable.
if (message_result == rtc::SR_BLOCK) {
// The QUIC stream is write blocked, so the message is queued by the QUIC
// session. If this is due to the QUIC not being writable, it will be sent
// once QUIC becomes writable again. Otherwise it may be due to exceeding
// the QUIC flow control limit, in which case the remote peer's QUIC session
// will tell the QUIC stream to send more data.
LOG(LS_INFO) << "Stream " << stream->id()
<< " message is write blocked for QUIC data channel " << id_;
SetBufferedAmount_w(buffered_amount_ + stream->queued_data_bytes());
stream->SignalQueuedBytesWritten.connect(
this, &QuicDataChannel::OnQueuedBytesWritten);
write_blocked_quic_streams_[stream->id()] = stream;
// The QUIC stream will be removed from |write_blocked_quic_streams_| once
// it closes.
stream->SignalClosed.connect(this,
&QuicDataChannel::OnWriteBlockedStreamClosed);
return true;
}
LOG(LS_ERROR) << "Stream " << stream->id()
<< " failed to write message for QUIC data channel " << id_
<< ". Unexpected error: " << message_result;
return false;
}
void QuicDataChannel::OnQueuedBytesWritten(net::QuicStreamId stream_id,
uint64_t queued_bytes_written) {
RTC_DCHECK(worker_thread_->IsCurrent());
SetBufferedAmount_w(buffered_amount_ - queued_bytes_written);
const auto& kv = write_blocked_quic_streams_.find(stream_id);
if (kv == write_blocked_quic_streams_.end()) {
RTC_DCHECK(false);
return;
}
cricket::ReliableQuicStream* stream = kv->second;
// True if the QUIC stream is done sending data.
if (stream->fin_sent()) {
LOG(LS_INFO) << "Stream " << stream->id()
<< " successfully wrote data for QUIC data channel " << id_;
stream->Close();
}
}
void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) {
RTC_DCHECK(worker_thread_->IsCurrent());
buffered_amount_ = buffered_amount;
invoker_.AsyncInvoke<void>(
signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s,
this, buffered_amount));
}
void QuicDataChannel::Close() {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (state_ == kClosed || state_ == kClosing) {
return;
}
LOG(LS_INFO) << "Closing QUIC data channel.";
SetState_s(kClosing);
worker_thread_->Invoke<void>(rtc::Bind(&QuicDataChannel::Close_w, this));
SetState_s(kClosed);
}
void QuicDataChannel::Close_w() {
RTC_DCHECK(worker_thread_->IsCurrent());
for (auto& kv : incoming_quic_messages_) {
Message& message = kv.second;
cricket::ReliableQuicStream* stream = message.stream;
stream->Close();
}
for (auto& kv : write_blocked_quic_streams_) {
cricket::ReliableQuicStream* stream = kv.second;
stream->Close();
}
}
bool QuicDataChannel::SetTransportChannel(
cricket::QuicTransportChannel* channel) {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (!channel) {
LOG(LS_ERROR) << "|channel| is NULL. Cannot set transport channel.";
return false;
}
if (quic_transport_channel_) {
if (channel == quic_transport_channel_) {
LOG(LS_WARNING) << "Ignoring duplicate transport channel.";
return true;
}
LOG(LS_ERROR) << "|channel| does not match existing transport channel.";
return false;
}
quic_transport_channel_ = channel;
LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_;
DataState data_channel_state = worker_thread_->Invoke<DataState>(
rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this));
SetState_s(data_channel_state);
return true;
}
DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() {
RTC_DCHECK(worker_thread_->IsCurrent());
quic_transport_channel_->SignalReadyToSend.connect(
this, &QuicDataChannel::OnReadyToSend);
quic_transport_channel_->SignalClosed.connect(
this, &QuicDataChannel::OnConnectionClosed);
if (quic_transport_channel_->writable()) {
return kOpen;
}
return kConnecting;
}
void QuicDataChannel::OnIncomingMessage(Message&& message) {
RTC_DCHECK(worker_thread_->IsCurrent());
RTC_DCHECK(message.stream);
if (!observer_) {
LOG(LS_WARNING) << "QUIC data channel " << id_
<< " received a message but has no observer.";
message.stream->Close();
return;
}
// A FIN is received if the message fits into a single QUIC stream frame and
// the remote peer is done sending.
if (message.stream->fin_received()) {
LOG(LS_INFO) << "Stream " << message.stream->id()
<< " has finished receiving data for QUIC data channel "
<< id_;
DataBuffer final_message(message.buffer, false);
invoker_.AsyncInvoke<void>(signaling_thread_,
rtc::Bind(&QuicDataChannel::OnMessage_s, this,
std::move(final_message)));
message.stream->Close();
return;
}
// Otherwise the message is divided across multiple QUIC stream frames, so
// queue the data. OnDataReceived() will be called each time the remaining
// QUIC stream frames arrive.
LOG(LS_INFO) << "QUIC data channel " << id_
<< " is queuing incoming data for stream "
<< message.stream->id();
incoming_quic_messages_[message.stream->id()] = std::move(message);
message.stream->SignalDataReceived.connect(this,
&QuicDataChannel::OnDataReceived);
// The QUIC stream will be removed from |incoming_quic_messages_| once it
// closes.
message.stream->SignalClosed.connect(
this, &QuicDataChannel::OnIncomingQueuedStreamClosed);
}
void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
const char* data,
size_t len) {
RTC_DCHECK(worker_thread_->IsCurrent());
RTC_DCHECK(data);
const auto& kv = incoming_quic_messages_.find(stream_id);
if (kv == incoming_quic_messages_.end()) {
RTC_DCHECK(false);
return;
}
Message& message = kv->second;
cricket::ReliableQuicStream* stream = message.stream;
rtc::CopyOnWriteBuffer& received_data = message.buffer;
// If the QUIC stream has not received a FIN, then the remote peer is not
// finished sending data.
if (!stream->fin_received()) {
received_data.AppendData(data, len);
return;
}
// Otherwise we are done receiving and can provide the data channel observer
// with the message.
LOG(LS_INFO) << "Stream " << stream_id
<< " has finished receiving data for QUIC data channel " << id_;
received_data.AppendData(data, len);
DataBuffer final_message(std::move(received_data), false);
invoker_.AsyncInvoke<void>(
signaling_thread_,
rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
// Once the stream is closed, OnDataReceived will not fire for the stream.
stream->Close();
}
void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) {
RTC_DCHECK(worker_thread_->IsCurrent());
RTC_DCHECK(channel == quic_transport_channel_);
LOG(LS_INFO) << "QuicTransportChannel is ready to send";
invoker_.AsyncInvoke<void>(
signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen));
}
void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id,
int error) {
RTC_DCHECK(worker_thread_->IsCurrent());
LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed.";
write_blocked_quic_streams_.erase(stream_id);
}
void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id,
int error) {
RTC_DCHECK(worker_thread_->IsCurrent());
LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed.";
incoming_quic_messages_.erase(stream_id);
}
void QuicDataChannel::OnConnectionClosed() {
RTC_DCHECK(worker_thread_->IsCurrent());
invoker_.AsyncInvoke<void>(signaling_thread_,
rtc::Bind(&QuicDataChannel::Close, this));
}
void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (observer_) {
observer_->OnMessage(received_data);
}
}
void QuicDataChannel::SetState_s(DataState state) {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (state_ == state || state_ == kClosed) {
return;
}
if (state_ == kClosing && state != kClosed) {
return;
}
LOG(LS_INFO) << "Setting state to " << state << " for QUIC data channel "
<< id_;
state_ = state;
if (observer_) {
observer_->OnStateChange();
}
}
void QuicDataChannel::OnBufferedAmountChange_s(uint64_t buffered_amount) {
RTC_DCHECK(signaling_thread_->IsCurrent());
if (observer_) {
observer_->OnBufferedAmountChange(buffered_amount);
}
}
size_t QuicDataChannel::GetNumWriteBlockedStreams() const {
return write_blocked_quic_streams_.size();
}
size_t QuicDataChannel::GetNumIncomingStreams() const {
return incoming_quic_messages_.size();
}
} // namespace webrtc

View File

@ -0,0 +1,215 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_API_QUICDATACHANNEL_H_
#define WEBRTC_API_QUICDATACHANNEL_H_
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "webrtc/api/datachannelinterface.h"
#include "webrtc/base/asyncinvoker.h"
#include "webrtc/base/sigslot.h"
#include "webrtc/base/thread.h"
namespace cricket {
class QuicTransportChannel;
class ReliableQuicStream;
class TransportChannel;
} // namepsace cricket
namespace net {
// TODO(mikescarlett): Make this uint64_t once QUIC uses 64-bit ids.
typedef uint32_t QuicStreamId;
} // namespace net
namespace rtc {
class CopyOnWriteBuffer;
} // namespace rtc
namespace webrtc {
// Encodes a QUIC message header with the data channel ID and message ID, then
// stores the result in |header|.
void WriteQuicDataChannelMessageHeader(int data_channel_id,
uint64_t message_id,
rtc::CopyOnWriteBuffer* header);
// Decodes the data channel ID and message ID from the initial data received by
// an incoming QUIC stream. The data channel ID is output to |data_channel_id|,
// the message ID is output to |message_id|, and the number of bytes read is
// output to |bytes_read|. Returns false if either ID cannot be read.
bool ParseQuicDataMessageHeader(const char* data,
size_t len,
int* data_channel_id,
uint64_t* message_id,
size_t* bytes_read);
// QuicDataChannel is an implementation of DataChannelInterface based on the
// QUIC protocol. It uses a QuicTransportChannel to establish encryption and
// transfer data, and a QuicDataTransport to receive incoming messages at
// the correct data channel. Currently this class implements unordered, reliable
// delivery and does not send an "OPEN" message.
//
// Each time a message is sent:
//
// - The QuicDataChannel prepends it with the data channel id and message id.
// The QuicTransportChannel creates a ReliableQuicStream, then the
// ReliableQuicStream sends the message with a FIN.
//
// - The remote QuicSession creates a ReliableQuicStream to receive the data.
// The remote QuicDataTransport dispatches the ReliableQuicStream to the
// QuicDataChannel with the same id as this data channel.
//
// - The remote QuicDataChannel queues data from the ReliableQuicStream. Once
// it receives a QUIC stream frame with a FIN, it provides the message to the
// DataChannelObserver.
//
// TODO(mikescarlett): Implement ordered delivery, unreliable delivery, and
// an OPEN message similar to the one for SCTP.
class QuicDataChannel : public rtc::RefCountedObject<DataChannelInterface>,
public sigslot::has_slots<> {
public:
// Message stores buffered data from the incoming QUIC stream. The QUIC stream
// is provided so that remaining data can be received from the remote peer.
struct Message {
uint64_t id;
rtc::CopyOnWriteBuffer buffer;
cricket::ReliableQuicStream* stream;
};
QuicDataChannel(rtc::Thread* signaling_thread,
rtc::Thread* worker_thread,
const std::string& label,
const DataChannelInit& config);
~QuicDataChannel() override;
// DataChannelInterface overrides.
std::string label() const override { return label_; }
bool reliable() const override { return true; }
bool ordered() const override { return false; }
uint16_t maxRetransmitTime() const override { return -1; }
uint16_t maxRetransmits() const override { return -1; }
bool negotiated() const override { return false; }
int id() const override { return id_; }
DataState state() const override { return state_; }
uint64_t buffered_amount() const override { return buffered_amount_; }
std::string protocol() const override { return protocol_; }
void RegisterObserver(DataChannelObserver* observer) override;
void UnregisterObserver() override;
void Close() override;
bool Send(const DataBuffer& buffer) override;
// Called from QuicDataTransport to set the QUIC transport channel that the
// QuicDataChannel sends messages with. Returns false if a different QUIC
// transport channel is already set or |channel| is NULL.
//
// The QUIC transport channel is not set in the constructor to allow creating
// the QuicDataChannel before the PeerConnection has a QUIC transport channel,
// such as before the session description is not set.
bool SetTransportChannel(cricket::QuicTransportChannel* channel);
// Called from QuicDataTransport when an incoming ReliableQuicStream is
// receiving a message received for this data channel. Once this function is
// called, |message| is owned by the QuicDataChannel and should not be
// accessed by the QuicDataTransport.
void OnIncomingMessage(Message&& message);
// Methods for testing.
// Gets the number of outgoing QUIC streams with write blocked data that are
// currently open for this data channel and are not finished writing a
// message. This is equivalent to the size of |write_blocked_quic_streams_|.
size_t GetNumWriteBlockedStreams() const;
// Gets the number of incoming QUIC streams with buffered data that are
// currently open for this data channel and are not finished receiving a
// message. This is equivalent to the size of |incoming_quic_messages_|.
size_t GetNumIncomingStreams() const;
private:
// Callbacks from ReliableQuicStream.
// Called when an incoming QUIC stream in |incoming_quic_messages_| has
// received a QUIC stream frame.
void OnDataReceived(net::QuicStreamId stream_id,
const char* data,
size_t len);
// Called when a write blocked QUIC stream that has been added to
// |write_blocked_quic_streams_| is closed.
void OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, int error);
// Called when an incoming QUIC stream that has been added to
// |incoming_quic_messages_| is closed.
void OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, int error);
// Called when a write blocked QUIC stream in |write_blocked_quic_streams_|
// has written previously queued data.
void OnQueuedBytesWritten(net::QuicStreamId stream_id,
uint64_t queued_bytes_written);
// Callbacks from |quic_transport_channel_|.
void OnReadyToSend(cricket::TransportChannel* channel);
void OnConnectionClosed();
// Worker thread methods.
// Sends the data buffer to the remote peer using an outgoing QUIC stream.
// Returns true if the data buffer can be successfully sent, or if it is
// queued to be sent later.
bool Send_w(const DataBuffer& buffer);
// Connects the |quic_transport_channel_| signals to this QuicDataChannel,
// then returns the new QuicDataChannel state.
DataState SetTransportChannel_w();
// Closes the QUIC streams associated with this QuicDataChannel.
void Close_w();
// Sets |buffered_amount_|.
void SetBufferedAmount_w(uint64_t buffered_amount);
// Signaling thread methods.
// Triggers QuicDataChannelObserver::OnMessage when a message from the remote
// peer is ready to be read.
void OnMessage_s(const DataBuffer& received_data);
// Triggers QuicDataChannel::OnStateChange if the state change is valid.
// Otherwise does nothing if |state| == |state_| or |state| != kClosed when
// the data channel is closing.
void SetState_s(DataState state);
// Triggers QuicDataChannelObserver::OnBufferedAmountChange when the total
// buffered data changes for a QUIC stream.
void OnBufferedAmountChange_s(uint64_t buffered_amount);
// QUIC transport channel which owns the QUIC session. It is used to create
// a QUIC stream for sending outgoing messages.
cricket::QuicTransportChannel* quic_transport_channel_ = nullptr;
// Signaling thread for DataChannelInterface methods.
rtc::Thread* const signaling_thread_;
// Worker thread for sending data and |quic_transport_channel_| callbacks.
rtc::Thread* const worker_thread_;
rtc::AsyncInvoker invoker_;
// Map of QUIC stream ID => ReliableQuicStream* for write blocked QUIC
// streams.
std::unordered_map<net::QuicStreamId, cricket::ReliableQuicStream*>
write_blocked_quic_streams_;
// Map of QUIC stream ID => Message for each incoming QUIC stream.
std::unordered_map<net::QuicStreamId, Message> incoming_quic_messages_;
// Handles received data from the remote peer and data channel state changes.
DataChannelObserver* observer_ = nullptr;
// QuicDataChannel ID.
int id_;
// Connectivity state of the QuicDataChannel.
DataState state_;
// Total bytes that are buffered among the QUIC streams.
uint64_t buffered_amount_;
// Counter for number of sent messages that is used for message IDs.
uint64_t next_message_id_;
// Variables for application use.
const std::string& label_;
const std::string& protocol_;
};
} // namespace webrtc
#endif // WEBRTC_API_QUICDATACHANNEL_H_

View File

@ -0,0 +1,659 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/api/quicdatachannel.h"
#include <map>
#include <sstream>
#include <string>
#include <vector>
#include "webrtc/base/bind.h"
#include "webrtc/base/gunit.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/scoped_ref_ptr.h"
#include "webrtc/p2p/base/faketransportcontroller.h"
#include "webrtc/p2p/quic/quictransportchannel.h"
#include "webrtc/p2p/quic/reliablequicstream.h"
using cricket::FakeTransportChannel;
using cricket::QuicTransportChannel;
using cricket::ReliableQuicStream;
using webrtc::DataBuffer;
using webrtc::DataChannelObserver;
using webrtc::DataChannelInit;
using webrtc::QuicDataChannel;
namespace {
// Timeout for asynchronous operations.
static const int kTimeoutMs = 1000; // milliseconds
// Small messages that can be sent within a single QUIC packet.
static const std::string kSmallMessage1 = "Hello, world!";
static const std::string kSmallMessage2 = "WebRTC";
static const std::string kSmallMessage3 = "1";
static const std::string kSmallMessage4 = "abcdefghijklmnopqrstuvwxyz";
static const DataBuffer kSmallBuffer1(kSmallMessage1);
static const DataBuffer kSmallBuffer2(kSmallMessage2);
static const DataBuffer kSmallBuffer3(kSmallMessage3);
static const DataBuffer kSmallBuffer4(kSmallMessage4);
// Large messages (> 1350 bytes) that exceed the max size of a QUIC packet.
// These are < 16 KB so they don't exceed the QUIC stream flow control limit.
static const std::string kLargeMessage1 = std::string("a", 2000);
static const std::string kLargeMessage2 = std::string("a", 4000);
static const std::string kLargeMessage3 = std::string("a", 8000);
static const std::string kLargeMessage4 = std::string("a", 12000);
static const DataBuffer kLargeBuffer1(kLargeMessage1);
static const DataBuffer kLargeBuffer2(kLargeMessage2);
static const DataBuffer kLargeBuffer3(kLargeMessage3);
static const DataBuffer kLargeBuffer4(kLargeMessage4);
// Oversized message (> 16 KB) that violates the QUIC stream flow control limit.
static const std::string kOversizedMessage = std::string("a", 20000);
static const DataBuffer kOversizedBuffer(kOversizedMessage);
// Creates a fingerprint from a certificate.
static rtc::SSLFingerprint* CreateFingerprint(rtc::RTCCertificate* cert) {
std::string digest_algorithm;
cert->ssl_certificate().GetSignatureDigestAlgorithm(&digest_algorithm);
rtc::scoped_ptr<rtc::SSLFingerprint> fingerprint(
rtc::SSLFingerprint::Create(digest_algorithm, cert->identity()));
return fingerprint.release();
}
// FakeObserver receives messages from the QuicDataChannel.
class FakeObserver : public DataChannelObserver {
public:
FakeObserver()
: on_state_change_count_(0), on_buffered_amount_change_count_(0) {}
// DataChannelObserver overrides.
void OnStateChange() override { ++on_state_change_count_; }
void OnBufferedAmountChange(uint64_t previous_amount) override {
++on_buffered_amount_change_count_;
}
void OnMessage(const webrtc::DataBuffer& buffer) override {
messages_.push_back(std::string(buffer.data.data<char>(), buffer.size()));
}
const std::vector<std::string>& messages() const { return messages_; }
size_t messages_received() const { return messages_.size(); }
size_t on_state_change_count() const { return on_state_change_count_; }
size_t on_buffered_amount_change_count() const {
return on_buffered_amount_change_count_;
}
private:
std::vector<std::string> messages_;
size_t on_state_change_count_;
size_t on_buffered_amount_change_count_;
};
// FakeQuicDataTransport simulates QuicDataTransport by dispatching QUIC
// stream messages to data channels and encoding/decoding messages.
class FakeQuicDataTransport : public sigslot::has_slots<> {
public:
FakeQuicDataTransport() {}
void ConnectToTransportChannel(QuicTransportChannel* quic_transport_channel) {
quic_transport_channel->SignalIncomingStream.connect(
this, &FakeQuicDataTransport::OnIncomingStream);
}
rtc::scoped_refptr<QuicDataChannel> CreateDataChannel(
int id,
const std::string& label,
const std::string& protocol) {
DataChannelInit config;
config.id = id;
config.protocol = protocol;
rtc::scoped_refptr<QuicDataChannel> data_channel(new QuicDataChannel(
rtc::Thread::Current(), rtc::Thread::Current(), label, config));
data_channel_by_id_[id] = data_channel;
return data_channel;
}
private:
void OnIncomingStream(cricket::ReliableQuicStream* stream) {
incoming_stream_ = stream;
incoming_stream_->SignalDataReceived.connect(
this, &FakeQuicDataTransport::OnDataReceived);
}
void OnDataReceived(net::QuicStreamId id, const char* data, size_t len) {
ASSERT_EQ(incoming_stream_->id(), id);
incoming_stream_->SignalDataReceived.disconnect(this);
// Retrieve the data channel ID and message ID.
int data_channel_id;
uint64_t message_id;
size_t bytes_read;
ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader(data, len, &data_channel_id,
&message_id, &bytes_read));
data += bytes_read;
len -= bytes_read;
// Dispatch the message to the matching QuicDataChannel.
const auto& kv = data_channel_by_id_.find(data_channel_id);
ASSERT_NE(kv, data_channel_by_id_.end());
QuicDataChannel* data_channel = kv->second;
QuicDataChannel::Message message;
message.id = message_id;
message.buffer = rtc::CopyOnWriteBuffer(data, len);
message.stream = incoming_stream_;
data_channel->OnIncomingMessage(std::move(message));
incoming_stream_ = nullptr;
}
// Map of data channel ID => QuicDataChannel.
std::map<int, rtc::scoped_refptr<QuicDataChannel>> data_channel_by_id_;
// Last incoming QUIC stream which has arrived.
cricket::ReliableQuicStream* incoming_stream_ = nullptr;
};
// A peer who creates a QuicDataChannel to transfer data, and simulates network
// connectivity with a fake ICE channel wrapped by the QUIC transport channel.
class QuicDataChannelPeer {
public:
QuicDataChannelPeer()
: ice_transport_channel_("data", 0),
quic_transport_channel_(&ice_transport_channel_) {
ice_transport_channel_.SetAsync(true);
fake_quic_data_transport_.ConnectToTransportChannel(
&quic_transport_channel_);
}
void GenerateCertificateAndFingerprint() {
rtc::scoped_refptr<rtc::RTCCertificate> local_cert =
rtc::RTCCertificate::Create(rtc::scoped_ptr<rtc::SSLIdentity>(
rtc::SSLIdentity::Generate("cert_name", rtc::KT_DEFAULT)));
quic_transport_channel_.SetLocalCertificate(local_cert);
local_fingerprint_.reset(CreateFingerprint(local_cert.get()));
}
rtc::scoped_refptr<QuicDataChannel> CreateDataChannelWithTransportChannel(
int id,
const std::string& label,
const std::string& protocol) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
fake_quic_data_transport_.CreateDataChannel(id, label, protocol);
data_channel->SetTransportChannel(&quic_transport_channel_);
return data_channel;
}
rtc::scoped_refptr<QuicDataChannel> CreateDataChannelWithoutTransportChannel(
int id,
const std::string& label,
const std::string& protocol) {
return fake_quic_data_transport_.CreateDataChannel(id, label, protocol);
}
// Connects |ice_transport_channel_| to that of the other peer.
void Connect(QuicDataChannelPeer* other_peer) {
ice_transport_channel_.Connect();
other_peer->ice_transport_channel_.Connect();
ice_transport_channel_.SetDestination(&other_peer->ice_transport_channel_);
}
rtc::scoped_ptr<rtc::SSLFingerprint>& local_fingerprint() {
return local_fingerprint_;
}
QuicTransportChannel* quic_transport_channel() {
return &quic_transport_channel_;
}
FakeTransportChannel* ice_transport_channel() {
return &ice_transport_channel_;
}
private:
FakeTransportChannel ice_transport_channel_;
QuicTransportChannel quic_transport_channel_;
rtc::scoped_ptr<rtc::SSLFingerprint> local_fingerprint_;
FakeQuicDataTransport fake_quic_data_transport_;
};
class QuicDataChannelTest : public testing::Test {
public:
QuicDataChannelTest() {}
// Connect the QuicTransportChannels and complete the crypto handshake.
void ConnectTransportChannels() {
SetCryptoParameters();
peer1_.Connect(&peer2_);
ASSERT_TRUE_WAIT(peer1_.quic_transport_channel()->writable() &&
peer2_.quic_transport_channel()->writable(),
kTimeoutMs);
}
// Sets crypto parameters required for the QUIC handshake.
void SetCryptoParameters() {
peer1_.GenerateCertificateAndFingerprint();
peer2_.GenerateCertificateAndFingerprint();
peer1_.quic_transport_channel()->SetSslRole(rtc::SSL_CLIENT);
peer2_.quic_transport_channel()->SetSslRole(rtc::SSL_SERVER);
rtc::scoped_ptr<rtc::SSLFingerprint>& peer1_fingerprint =
peer1_.local_fingerprint();
rtc::scoped_ptr<rtc::SSLFingerprint>& peer2_fingerprint =
peer2_.local_fingerprint();
peer1_.quic_transport_channel()->SetRemoteFingerprint(
peer2_fingerprint->algorithm,
reinterpret_cast<const uint8_t*>(peer2_fingerprint->digest.data()),
peer2_fingerprint->digest.size());
peer2_.quic_transport_channel()->SetRemoteFingerprint(
peer1_fingerprint->algorithm,
reinterpret_cast<const uint8_t*>(peer1_fingerprint->digest.data()),
peer1_fingerprint->digest.size());
}
protected:
QuicDataChannelPeer peer1_;
QuicDataChannelPeer peer2_;
};
// Tests that a QuicDataChannel transitions from connecting to open when
// the QuicTransportChannel becomes writable for the first time.
TEST_F(QuicDataChannelTest, DataChannelOpensWhenTransportChannelConnects) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol");
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state());
ConnectTransportChannels();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel->state(),
kTimeoutMs);
}
// Tests that a QuicDataChannel transitions from connecting to open when
// SetTransportChannel is called with a QuicTransportChannel that is already
// writable.
TEST_F(QuicDataChannelTest, DataChannelOpensWhenTransportChannelWritable) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithoutTransportChannel(4, "label", "protocol");
ConnectTransportChannels();
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state());
data_channel->SetTransportChannel(peer1_.quic_transport_channel());
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state());
}
// Tests that the QuicDataChannel transfers messages small enough to fit into a
// single QUIC stream frame.
TEST_F(QuicDataChannelTest, TransferSmallMessage) {
ConnectTransportChannels();
int data_channel_id = 2;
std::string label = "label";
std::string protocol = "protocol";
rtc::scoped_refptr<QuicDataChannel> peer1_data_channel =
peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer1_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
rtc::scoped_refptr<QuicDataChannel> peer2_data_channel =
peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer2_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
FakeObserver peer1_observer;
peer1_data_channel->RegisterObserver(&peer1_observer);
FakeObserver peer2_observer;
peer2_data_channel->RegisterObserver(&peer2_observer);
// peer1 -> peer2
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer1));
ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kSmallMessage1, peer2_observer.messages()[0]);
// peer2 -> peer1
EXPECT_TRUE(peer2_data_channel->Send(kSmallBuffer2));
ASSERT_EQ_WAIT(1, peer1_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kSmallMessage2, peer1_observer.messages()[0]);
// peer2 -> peer1
EXPECT_TRUE(peer2_data_channel->Send(kSmallBuffer3));
ASSERT_EQ_WAIT(2, peer1_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kSmallMessage3, peer1_observer.messages()[1]);
// peer1 -> peer2
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer4));
ASSERT_EQ_WAIT(2, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kSmallMessage4, peer2_observer.messages()[1]);
}
// Tests that QuicDataChannel transfers messages large enough to fit into
// multiple QUIC stream frames, which don't violate the QUIC flow control limit.
// These require buffering by the QuicDataChannel.
TEST_F(QuicDataChannelTest, TransferLargeMessage) {
ConnectTransportChannels();
int data_channel_id = 347;
std::string label = "label";
std::string protocol = "protocol";
rtc::scoped_refptr<QuicDataChannel> peer1_data_channel =
peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer1_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
rtc::scoped_refptr<QuicDataChannel> peer2_data_channel =
peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer2_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
FakeObserver peer1_observer;
peer1_data_channel->RegisterObserver(&peer1_observer);
FakeObserver peer2_observer;
peer2_data_channel->RegisterObserver(&peer2_observer);
// peer1 -> peer2
EXPECT_TRUE(peer1_data_channel->Send(kLargeBuffer1));
ASSERT_TRUE_WAIT(peer2_observer.messages_received() == 1, kTimeoutMs);
EXPECT_EQ(kLargeMessage1, peer2_observer.messages()[0]);
// peer2 -> peer1
EXPECT_TRUE(peer2_data_channel->Send(kLargeBuffer2));
ASSERT_EQ_WAIT(1, peer1_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kLargeMessage2, peer1_observer.messages()[0]);
// peer2 -> peer1
EXPECT_TRUE(peer2_data_channel->Send(kLargeBuffer3));
ASSERT_EQ_WAIT(2, peer1_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kLargeMessage3, peer1_observer.messages()[1]);
// peer1 -> peer2
EXPECT_TRUE(peer1_data_channel->Send(kLargeBuffer4));
ASSERT_EQ_WAIT(2, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kLargeMessage4, peer2_observer.messages()[1]);
}
// Tests that when a message size exceeds the flow control limit (> 16KB), the
// QuicDataChannel can queue the data and send it after receiving window update
// frames from the remote peer.
TEST_F(QuicDataChannelTest, TransferOversizedMessage) {
ConnectTransportChannels();
int data_channel_id = 189;
std::string label = "label";
std::string protocol = "protocol";
rtc::scoped_refptr<QuicDataChannel> peer1_data_channel =
peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
rtc::scoped_refptr<QuicDataChannel> peer2_data_channel =
peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer2_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
FakeObserver peer1_observer;
peer1_data_channel->RegisterObserver(&peer1_observer);
FakeObserver peer2_observer;
peer2_data_channel->RegisterObserver(&peer2_observer);
EXPECT_TRUE(peer1_data_channel->Send(kOversizedBuffer));
EXPECT_EQ(1, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_EQ_WAIT(1, peer2_data_channel->GetNumIncomingStreams(), kTimeoutMs);
ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(kOversizedMessage, peer2_observer.messages()[0]);
EXPECT_EQ(0, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_EQ(0, peer2_data_channel->GetNumIncomingStreams());
}
// Tests that empty messages can be sent.
TEST_F(QuicDataChannelTest, TransferEmptyMessage) {
ConnectTransportChannels();
int data_channel_id = 69;
std::string label = "label";
std::string protocol = "protocol";
rtc::scoped_refptr<QuicDataChannel> peer1_data_channel =
peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
rtc::scoped_refptr<QuicDataChannel> peer2_data_channel =
peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer2_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
FakeObserver peer1_observer;
peer1_data_channel->RegisterObserver(&peer1_observer);
FakeObserver peer2_observer;
peer2_data_channel->RegisterObserver(&peer2_observer);
EXPECT_TRUE(peer1_data_channel->Send(DataBuffer("")));
ASSERT_EQ_WAIT(1, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ("", peer2_observer.messages()[0]);
}
// Tests that when the QuicDataChannel is open and sends a message while the
// QuicTransportChannel is unwritable, it gets buffered then received once the
// QuicTransportChannel becomes writable again.
TEST_F(QuicDataChannelTest, MessagesReceivedWhenTransportChannelReconnects) {
ConnectTransportChannels();
int data_channel_id = 401;
std::string label = "label";
std::string protocol = "protocol";
rtc::scoped_refptr<QuicDataChannel> peer1_data_channel =
peer1_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer1_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
rtc::scoped_refptr<QuicDataChannel> peer2_data_channel =
peer2_.CreateDataChannelWithTransportChannel(data_channel_id, label,
protocol);
ASSERT_TRUE(peer2_data_channel->state() ==
webrtc::DataChannelInterface::kOpen);
FakeObserver peer1_observer;
peer1_data_channel->RegisterObserver(&peer1_observer);
FakeObserver peer2_observer;
peer2_data_channel->RegisterObserver(&peer2_observer);
// writable => unwritable
peer1_.ice_transport_channel()->SetWritable(false);
ASSERT_FALSE(peer1_.quic_transport_channel()->writable());
// Verify that sent data is buffered.
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer1));
EXPECT_EQ(1, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer2));
EXPECT_EQ(2, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer3));
EXPECT_EQ(3, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_TRUE(peer1_data_channel->Send(kSmallBuffer4));
EXPECT_EQ(4, peer1_data_channel->GetNumWriteBlockedStreams());
// unwritable => writable
peer1_.ice_transport_channel()->SetWritable(true);
ASSERT_TRUE(peer1_.quic_transport_channel()->writable());
ASSERT_EQ_WAIT(4, peer2_observer.messages_received(), kTimeoutMs);
EXPECT_EQ(0, peer1_data_channel->GetNumWriteBlockedStreams());
EXPECT_EQ(0, peer2_data_channel->GetNumIncomingStreams());
}
// Tests that the QuicDataChannel does not send before it is open.
TEST_F(QuicDataChannelTest, TransferMessageBeforeChannelOpens) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(6, "label", "protocol");
ASSERT_TRUE(data_channel->state() ==
webrtc::DataChannelInterface::kConnecting);
EXPECT_FALSE(data_channel->Send(kSmallBuffer1));
}
// Tests that the QuicDataChannel does not send after it is closed.
TEST_F(QuicDataChannelTest, TransferDataAfterChannelClosed) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(42, "label", "protocol");
data_channel->Close();
ASSERT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
EXPECT_FALSE(data_channel->Send(kSmallBuffer1));
}
// Tests that QuicDataChannel state changes fire OnStateChanged() for the
// observer, with the correct data channel states, when the data channel
// transitions from kConnecting => kOpen => kClosing => kClosed.
TEST_F(QuicDataChannelTest, OnStateChangedFired) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(7, "label", "protocol");
FakeObserver observer;
data_channel->RegisterObserver(&observer);
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state());
EXPECT_EQ(0, observer.on_state_change_count());
ConnectTransportChannels();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel->state(),
kTimeoutMs);
EXPECT_EQ(1, observer.on_state_change_count());
data_channel->Close();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
// 2 state changes due to kClosing and kClosed.
EXPECT_EQ(3, observer.on_state_change_count());
}
// Tests that a QuicTransportChannel can be closed without being opened when it
// is connected to a transprot chanenl.
TEST_F(QuicDataChannelTest, NeverOpenedWithTransportChannel) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(7, "label", "protocol");
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state());
data_channel->Close();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
}
// Tests that a QuicTransportChannel can be closed without being opened or
// connected to a transport channel.
TEST_F(QuicDataChannelTest, NeverOpenedWithoutTransportChannel) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithoutTransportChannel(7, "label", "protocol");
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel->state());
data_channel->Close();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
}
// Tests that the QuicDataChannel is closed when the QUIC connection closes.
TEST_F(QuicDataChannelTest, ClosedOnTransportError) {
ConnectTransportChannels();
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(1, "label", "protocol");
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state());
ReliableQuicStream* stream =
peer1_.quic_transport_channel()->CreateQuicStream();
ASSERT_NE(nullptr, stream);
stream->CloseConnectionWithDetails(net::QuicErrorCode::QUIC_NO_ERROR,
"Closing QUIC for testing");
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
}
// Tests that an already closed QuicDataChannel does not fire onStateChange and
// remains closed.
TEST_F(QuicDataChannelTest, DoesNotChangeStateWhenClosed) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol");
FakeObserver observer;
data_channel->RegisterObserver(&observer);
data_channel->Close();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed, data_channel->state(),
kTimeoutMs);
// OnStateChange called for kClosing and kClosed.
EXPECT_EQ(2, observer.on_state_change_count());
// Call Close() again to verify that the state cannot be kClosing.
data_channel->Close();
EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state());
EXPECT_EQ(2, observer.on_state_change_count());
ConnectTransportChannels();
EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state());
EXPECT_EQ(2, observer.on_state_change_count());
// writable => unwritable
peer1_.ice_transport_channel()->SetWritable(false);
ASSERT_FALSE(peer1_.quic_transport_channel()->writable());
EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state());
EXPECT_EQ(2, observer.on_state_change_count());
// unwritable => writable
peer1_.ice_transport_channel()->SetWritable(true);
ASSERT_TRUE(peer1_.quic_transport_channel()->writable());
EXPECT_EQ(webrtc::DataChannelInterface::kClosed, data_channel->state());
EXPECT_EQ(2, observer.on_state_change_count());
}
// Tests that when the QuicDataChannel is open and the QuicTransportChannel
// transitions between writable and unwritable, it does not fire onStateChange
// and remains open.
TEST_F(QuicDataChannelTest, DoesNotChangeStateWhenTransportChannelReconnects) {
ConnectTransportChannels();
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol");
FakeObserver observer;
data_channel->RegisterObserver(&observer);
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state());
EXPECT_EQ(0, observer.on_state_change_count());
// writable => unwritable
peer1_.ice_transport_channel()->SetWritable(false);
ASSERT_FALSE(peer1_.quic_transport_channel()->writable());
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state());
EXPECT_EQ(0, observer.on_state_change_count());
// unwritable => writable
peer1_.ice_transport_channel()->SetWritable(true);
ASSERT_TRUE(peer1_.quic_transport_channel()->writable());
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel->state());
EXPECT_EQ(0, observer.on_state_change_count());
}
// Tests that SetTransportChannel returns false when setting a NULL transport
// channel or a transport channel that is not equivalent to the one already set.
TEST_F(QuicDataChannelTest, SetTransportChannelReturnValue) {
rtc::scoped_refptr<QuicDataChannel> data_channel =
peer1_.CreateDataChannelWithTransportChannel(4, "label", "protocol");
EXPECT_FALSE(data_channel->SetTransportChannel(nullptr));
QuicTransportChannel* transport_channel = peer1_.quic_transport_channel();
EXPECT_TRUE(data_channel->SetTransportChannel(transport_channel));
EXPECT_TRUE(data_channel->SetTransportChannel(transport_channel));
QuicTransportChannel* other_transport_channel =
peer2_.quic_transport_channel();
EXPECT_FALSE(data_channel->SetTransportChannel(other_transport_channel));
}
// Tests that the QUIC message header is encoded with the correct number of
// bytes and is properly decoded.
TEST_F(QuicDataChannelTest, EncodeParseQuicDataMessageHeader) {
int data_channel_id1 = 127; // 1 byte
uint64_t message_id1 = 0; // 1 byte
rtc::CopyOnWriteBuffer header1;
webrtc::WriteQuicDataChannelMessageHeader(data_channel_id1, message_id1,
&header1);
EXPECT_EQ(2u, header1.size());
int decoded_data_channel_id1;
uint64_t decoded_message_id1;
size_t bytes_read1;
ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader(
header1.data<char>(), header1.size(), &decoded_data_channel_id1,
&decoded_message_id1, &bytes_read1));
EXPECT_EQ(data_channel_id1, decoded_data_channel_id1);
EXPECT_EQ(message_id1, decoded_message_id1);
EXPECT_EQ(2u, bytes_read1);
int data_channel_id2 = 4178; // 2 bytes
uint64_t message_id2 = 1324921792003; // 6 bytes
rtc::CopyOnWriteBuffer header2;
webrtc::WriteQuicDataChannelMessageHeader(data_channel_id2, message_id2,
&header2);
EXPECT_EQ(8u, header2.size());
int decoded_data_channel_id2;
uint64_t decoded_message_id2;
size_t bytes_read2;
ASSERT_TRUE(webrtc::ParseQuicDataMessageHeader(
header2.data<char>(), header2.size(), &decoded_data_channel_id2,
&decoded_message_id2, &bytes_read2));
EXPECT_EQ(data_channel_id2, decoded_data_channel_id2);
EXPECT_EQ(message_id2, decoded_message_id2);
EXPECT_EQ(8u, bytes_read2);
}
} // namespace

View File

@ -0,0 +1,146 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/api/quicdatatransport.h"
#include "webrtc/base/logging.h"
#include "webrtc/p2p/quic/quictransportchannel.h"
#include "webrtc/p2p/quic/reliablequicstream.h"
namespace webrtc {
QuicDataTransport::QuicDataTransport(rtc::Thread* signaling_thread,
rtc::Thread* worker_thread)
: signaling_thread_(signaling_thread), worker_thread_(worker_thread) {
RTC_DCHECK(signaling_thread_);
RTC_DCHECK(worker_thread_);
}
QuicDataTransport::~QuicDataTransport() {}
bool QuicDataTransport::SetTransportChannel(
cricket::QuicTransportChannel* channel) {
if (!channel) {
LOG(LS_ERROR) << "|channel| is NULL. Cannot set transport channel.";
return false;
}
if (quic_transport_channel_) {
if (channel == quic_transport_channel_) {
LOG(LS_WARNING) << "Ignoring duplicate transport channel.";
return true;
}
LOG(LS_ERROR) << "|channel| does not match existing transport channel.";
return false;
}
LOG(LS_INFO) << "Setting QuicTransportChannel for QuicDataTransport";
quic_transport_channel_ = channel;
quic_transport_channel_->SignalIncomingStream.connect(
this, &QuicDataTransport::OnIncomingStream);
bool success = true;
for (const auto& kv : data_channel_by_id_) {
rtc::scoped_refptr<QuicDataChannel> data_channel = kv.second;
if (!data_channel->SetTransportChannel(quic_transport_channel_)) {
LOG(LS_ERROR)
<< "Cannot set QUIC transport channel for QUIC data channel "
<< kv.first;
success = false;
}
}
return success;
}
rtc::scoped_refptr<DataChannelInterface> QuicDataTransport::CreateDataChannel(
const std::string& label,
const DataChannelInit* config) {
if (config == nullptr) {
return nullptr;
}
if (data_channel_by_id_.find(config->id) != data_channel_by_id_.end()) {
LOG(LS_ERROR) << "QUIC data channel already exists with id " << config->id;
return nullptr;
}
rtc::scoped_refptr<QuicDataChannel> data_channel(
new QuicDataChannel(signaling_thread_, worker_thread_, label, *config));
if (quic_transport_channel_) {
if (!data_channel->SetTransportChannel(quic_transport_channel_)) {
LOG(LS_ERROR)
<< "Cannot set QUIC transport channel for QUIC data channel "
<< config->id;
}
}
data_channel_by_id_[data_channel->id()] = data_channel;
return data_channel;
}
void QuicDataTransport::DestroyDataChannel(int id) {
data_channel_by_id_.erase(id);
}
bool QuicDataTransport::HasDataChannel(int id) const {
return data_channel_by_id_.find(id) != data_channel_by_id_.end();
}
bool QuicDataTransport::HasDataChannels() const {
return !data_channel_by_id_.empty();
}
// Called when a QUIC stream is created for incoming data.
void QuicDataTransport::OnIncomingStream(cricket::ReliableQuicStream* stream) {
RTC_DCHECK(stream != nullptr);
quic_stream_by_id_[stream->id()] = stream;
stream->SignalDataReceived.connect(this, &QuicDataTransport::OnDataReceived);
}
// Called when the first QUIC stream frame is received for incoming data.
void QuicDataTransport::OnDataReceived(net::QuicStreamId id,
const char* data,
size_t len) {
const auto& quic_stream_kv = quic_stream_by_id_.find(id);
if (quic_stream_kv == quic_stream_by_id_.end()) {
RTC_DCHECK(false);
return;
}
cricket::ReliableQuicStream* stream = quic_stream_kv->second;
stream->SignalDataReceived.disconnect(this);
quic_stream_by_id_.erase(id);
// Read the data channel ID and message ID.
int data_channel_id;
uint64_t message_id;
size_t bytes_read;
if (!ParseQuicDataMessageHeader(data, len, &data_channel_id, &message_id,
&bytes_read)) {
LOG(LS_ERROR) << "Could not read QUIC message header from QUIC stream "
<< id;
return;
}
data += bytes_read;
len -= bytes_read;
// Retrieve the data channel which will handle the message.
const auto& data_channel_kv = data_channel_by_id_.find(data_channel_id);
if (data_channel_kv == data_channel_by_id_.end()) {
// TODO(mikescarlett): Implement OPEN message to create a new
// QuicDataChannel when messages are received for a nonexistent ID.
LOG(LS_ERROR) << "Data was received for QUIC data channel "
<< data_channel_id
<< " but it is not registered to the QuicDataTransport.";
return;
}
QuicDataChannel* data_channel = data_channel_kv->second;
QuicDataChannel::Message message;
message.id = message_id;
message.buffer = rtc::CopyOnWriteBuffer(data, len);
message.stream = stream;
data_channel->OnIncomingMessage(std::move(message));
}
} // namespace webrtc

View File

@ -0,0 +1,90 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_API_QUICDATATRANSPORT_H_
#define WEBRTC_API_QUICDATATRANSPORT_H_
#include <string>
#include <unordered_map>
#include "webrtc/api/datachannelinterface.h"
#include "webrtc/api/quicdatachannel.h"
#include "webrtc/base/scoped_ref_ptr.h"
#include "webrtc/base/sigslot.h"
#include "webrtc/base/thread.h"
namespace cricket {
class QuicTransportChannel;
class ReliableQuicStream;
} // namepsace cricket
namespace webrtc {
// QuicDataTransport creates QuicDataChannels for the PeerConnection. It also
// handles QUIC stream demuxing by distributing incoming QUIC streams from the
// QuicTransportChannel among the QuicDataChannels that it has created.
//
// QuicDataTransport reads the data channel ID from the incoming QUIC stream,
// then looks it up in a map of ID => QuicDataChannel. If the data channel
// exists, it sends the QUIC stream to the QuicDataChannel.
class QuicDataTransport : public sigslot::has_slots<> {
public:
QuicDataTransport(rtc::Thread* signaling_thread, rtc::Thread* worker_thread);
~QuicDataTransport() override;
// Sets the QUIC transport channel for the QuicDataChannels and the
// QuicDataTransport. Returns false if a different QUIC transport channel is
// already set, the QUIC transport channel cannot be set for any of the
// QuicDataChannels, or |channel| is NULL.
bool SetTransportChannel(cricket::QuicTransportChannel* channel);
// Creates a QuicDataChannel that uses this QuicDataTransport.
rtc::scoped_refptr<DataChannelInterface> CreateDataChannel(
const std::string& label,
const DataChannelInit* config);
// Removes a QuicDataChannel with the given ID from the QuicDataTransport's
// data channel map.
void DestroyDataChannel(int id);
// True if the QuicDataTransport has a data channel with the given ID.
bool HasDataChannel(int id) const;
// True if the QuicDataTransport has data channels.
bool HasDataChannels() const;
private:
// Called from the QuicTransportChannel when a ReliableQuicStream is created
// to receive incoming data.
void OnIncomingStream(cricket::ReliableQuicStream* stream);
// Called from the ReliableQuicStream when the first QUIC stream frame is
// received for incoming data. The QuicDataTransport reads the data channel ID
// and message ID from the incoming data, then dispatches the
// ReliableQuicStream to the QuicDataChannel with the same data channel ID.
void OnDataReceived(net::QuicStreamId stream_id,
const char* data,
size_t len);
// Map of data channel ID => QUIC data channel values.
std::unordered_map<int, rtc::scoped_refptr<QuicDataChannel>>
data_channel_by_id_;
// Map of QUIC stream ID => ReliableQuicStream* values.
std::unordered_map<net::QuicStreamId, cricket::ReliableQuicStream*>
quic_stream_by_id_;
// QuicTransportChannel for sending/receiving data.
cricket::QuicTransportChannel* quic_transport_channel_ = nullptr;
// Signaling and worker threads for the QUIC data channel.
rtc::Thread* const signaling_thread_;
rtc::Thread* const worker_thread_;
};
} // namespace webrtc
#endif // WEBRTC_API_QUICDATATRANSPORT_H_

View File

@ -0,0 +1,355 @@
/*
* Copyright 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/api/quicdatatransport.h"
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "webrtc/api/quicdatachannel.h"
#include "webrtc/base/bytebuffer.h"
#include "webrtc/base/gunit.h"
#include "webrtc/p2p/base/faketransportcontroller.h"
#include "webrtc/p2p/quic/quictransportchannel.h"
#include "webrtc/p2p/quic/reliablequicstream.h"
using webrtc::DataBuffer;
using webrtc::DataChannelInit;
using webrtc::DataChannelInterface;
using webrtc::DataChannelObserver;
using webrtc::QuicDataChannel;
using webrtc::QuicDataTransport;
using cricket::FakeTransportChannel;
using cricket::QuicTransportChannel;
using cricket::ReliableQuicStream;
namespace {
// Timeout for asynchronous operations.
static const int kTimeoutMs = 1000; // milliseconds
// FakeObserver receives messages from the data channel.
class FakeObserver : public DataChannelObserver {
public:
FakeObserver() {}
void OnStateChange() override {}
void OnBufferedAmountChange(uint64_t previous_amount) override {}
void OnMessage(const webrtc::DataBuffer& buffer) override {
messages_.push_back(std::string(buffer.data.data<char>(), buffer.size()));
}
const std::vector<std::string>& messages() const { return messages_; }
size_t messages_received() const { return messages_.size(); }
private:
std::vector<std::string> messages_;
};
// A peer who uses a QUIC transport channel and fake ICE transport channel to
// send or receive data.
class QuicDataTransportPeer {
public:
QuicDataTransportPeer()
: quic_data_transport_(rtc::Thread::Current(), rtc::Thread::Current()),
ice_transport_channel_("data", 0),
quic_transport_channel_(&ice_transport_channel_) {
ice_transport_channel_.SetAsync(true);
}
void GenerateCertificateAndFingerprint() {
rtc::scoped_refptr<rtc::RTCCertificate> local_cert =
rtc::RTCCertificate::Create(rtc::scoped_ptr<rtc::SSLIdentity>(
rtc::SSLIdentity::Generate("cert_name", rtc::KT_DEFAULT)));
quic_transport_channel_.SetLocalCertificate(local_cert);
local_fingerprint_.reset(CreateFingerprint(local_cert.get()));
}
// Connects |ice_transport_channel_| to that of the other peer.
void Connect(QuicDataTransportPeer* other_peer) {
ice_transport_channel_.Connect();
other_peer->ice_transport_channel_.Connect();
ice_transport_channel_.SetDestination(&other_peer->ice_transport_channel_);
}
rtc::scoped_ptr<rtc::SSLFingerprint>& local_fingerprint() {
return local_fingerprint_;
}
QuicTransportChannel* quic_transport_channel() {
return &quic_transport_channel_;
}
// Write a messge directly to the ReliableQuicStream.
void WriteMessage(int data_channel_id,
uint64_t message_id,
const std::string& message) {
ReliableQuicStream* stream = quic_transport_channel_.CreateQuicStream();
rtc::CopyOnWriteBuffer payload;
webrtc::WriteQuicDataChannelMessageHeader(data_channel_id, message_id,
&payload);
stream->Write(payload.data<char>(), payload.size(), false);
stream->Write(message.data(), message.size(), true);
}
rtc::scoped_refptr<DataChannelInterface> CreateDataChannel(
const DataChannelInit* config) {
return quic_data_transport_.CreateDataChannel("testing", config);
}
QuicDataTransport* quic_data_transport() { return &quic_data_transport_; }
private:
// Creates a fingerprint from a certificate.
rtc::SSLFingerprint* CreateFingerprint(rtc::RTCCertificate* cert) {
std::string digest_algorithm;
cert->ssl_certificate().GetSignatureDigestAlgorithm(&digest_algorithm);
rtc::scoped_ptr<rtc::SSLFingerprint> fingerprint(
rtc::SSLFingerprint::Create(digest_algorithm, cert->identity()));
return fingerprint.release();
}
QuicDataTransport quic_data_transport_;
FakeTransportChannel ice_transport_channel_;
QuicTransportChannel quic_transport_channel_;
rtc::scoped_ptr<rtc::SSLFingerprint> local_fingerprint_;
};
class QuicDataTransportTest : public testing::Test {
public:
QuicDataTransportTest() {}
void ConnectTransportChannels() {
SetCryptoParameters();
peer1_.Connect(&peer2_);
ASSERT_TRUE_WAIT(peer1_.quic_transport_channel()->writable() &&
peer2_.quic_transport_channel()->writable(),
kTimeoutMs);
}
void SetTransportChannels() {
ASSERT_TRUE(peer1_.quic_data_transport()->SetTransportChannel(
peer1_.quic_transport_channel()));
ASSERT_TRUE(peer2_.quic_data_transport()->SetTransportChannel(
peer2_.quic_transport_channel()));
}
// Sets crypto parameters required for the QUIC handshake.
void SetCryptoParameters() {
peer1_.GenerateCertificateAndFingerprint();
peer2_.GenerateCertificateAndFingerprint();
peer1_.quic_transport_channel()->SetSslRole(rtc::SSL_CLIENT);
peer2_.quic_transport_channel()->SetSslRole(rtc::SSL_SERVER);
rtc::scoped_ptr<rtc::SSLFingerprint>& peer1_fingerprint =
peer1_.local_fingerprint();
rtc::scoped_ptr<rtc::SSLFingerprint>& peer2_fingerprint =
peer2_.local_fingerprint();
peer1_.quic_transport_channel()->SetRemoteFingerprint(
peer2_fingerprint->algorithm,
reinterpret_cast<const uint8_t*>(peer2_fingerprint->digest.data()),
peer2_fingerprint->digest.size());
peer2_.quic_transport_channel()->SetRemoteFingerprint(
peer1_fingerprint->algorithm,
reinterpret_cast<const uint8_t*>(peer1_fingerprint->digest.data()),
peer1_fingerprint->digest.size());
}
protected:
QuicDataTransportPeer peer1_;
QuicDataTransportPeer peer2_;
};
// Tests creation and destruction of data channels.
TEST_F(QuicDataTransportTest, CreateAndDestroyDataChannels) {
QuicDataTransport* quic_data_transport = peer2_.quic_data_transport();
EXPECT_FALSE(quic_data_transport->HasDataChannels());
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
EXPECT_FALSE(quic_data_transport->HasDataChannel(data_channel_id));
webrtc::DataChannelInit config;
config.id = data_channel_id;
rtc::scoped_refptr<DataChannelInterface> data_channel =
peer2_.CreateDataChannel(&config);
EXPECT_NE(nullptr, data_channel);
EXPECT_EQ(data_channel_id, data_channel->id());
EXPECT_TRUE(quic_data_transport->HasDataChannel(data_channel_id));
}
EXPECT_TRUE(quic_data_transport->HasDataChannels());
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
quic_data_transport->DestroyDataChannel(data_channel_id);
EXPECT_FALSE(quic_data_transport->HasDataChannel(data_channel_id));
}
EXPECT_FALSE(quic_data_transport->HasDataChannels());
}
// Tests that the QuicDataTransport does not allow creating multiple
// QuicDataChannels with the same id.
TEST_F(QuicDataTransportTest, CannotCreateDataChannelsWithSameId) {
webrtc::DataChannelInit config;
config.id = 2;
EXPECT_NE(nullptr, peer2_.CreateDataChannel(&config));
EXPECT_EQ(nullptr, peer2_.CreateDataChannel(&config));
}
// Tests that any data channels created by the QuicDataTransport are in state
// kConnecting before the QuicTransportChannel is set, then transiton to state
// kOpen when the transport channel becomes writable.
TEST_F(QuicDataTransportTest, DataChannelsOpenWhenTransportChannelWritable) {
webrtc::DataChannelInit config1;
config1.id = 7;
rtc::scoped_refptr<DataChannelInterface> data_channel1 =
peer2_.CreateDataChannel(&config1);
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel1->state());
SetTransportChannels();
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel1->state());
webrtc::DataChannelInit config2;
config2.id = 14;
rtc::scoped_refptr<DataChannelInterface> data_channel2 =
peer2_.CreateDataChannel(&config2);
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, data_channel2->state());
// Existing data channels should open once the transport channel is writable.
ConnectTransportChannels();
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel1->state(),
kTimeoutMs);
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, data_channel2->state(),
kTimeoutMs);
// Any data channels created afterwards should start in state kOpen.
webrtc::DataChannelInit config3;
config3.id = 21;
rtc::scoped_refptr<DataChannelInterface> data_channel3 =
peer2_.CreateDataChannel(&config3);
EXPECT_EQ(webrtc::DataChannelInterface::kOpen, data_channel3->state());
}
// Tests that the QuicTransport dispatches messages for one QuicDataChannel.
TEST_F(QuicDataTransportTest, ReceiveMessagesForSingleDataChannel) {
ConnectTransportChannels();
SetTransportChannels();
int data_channel_id = 1337;
webrtc::DataChannelInit config;
config.id = data_channel_id;
rtc::scoped_refptr<DataChannelInterface> peer2_data_channel =
peer2_.CreateDataChannel(&config);
FakeObserver observer;
peer2_data_channel->RegisterObserver(&observer);
uint64_t message1_id = 26u;
peer1_.WriteMessage(data_channel_id, message1_id, "Testing");
ASSERT_EQ_WAIT(1, observer.messages_received(), kTimeoutMs);
EXPECT_EQ("Testing", observer.messages()[0]);
uint64_t message2_id = 402u;
peer1_.WriteMessage(data_channel_id, message2_id, "Hello, World!");
ASSERT_EQ_WAIT(2, observer.messages_received(), kTimeoutMs);
EXPECT_EQ("Hello, World!", observer.messages()[1]);
uint64_t message3_id = 100260415u;
peer1_.WriteMessage(data_channel_id, message3_id, "Third message");
ASSERT_EQ_WAIT(3, observer.messages_received(), kTimeoutMs);
EXPECT_EQ("Third message", observer.messages()[2]);
}
// Tests that the QuicTransport dispatches messages to the correct data channel
// when multiple are in use.
TEST_F(QuicDataTransportTest, ReceiveMessagesForMultipleDataChannels) {
ConnectTransportChannels();
SetTransportChannels();
std::vector<rtc::scoped_refptr<DataChannelInterface>> data_channels;
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
webrtc::DataChannelInit config;
config.id = data_channel_id;
data_channels.push_back(peer2_.CreateDataChannel(&config));
}
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
uint64_t message1_id = 48023u;
FakeObserver observer;
DataChannelInterface* peer2_data_channel =
data_channels[data_channel_id].get();
peer2_data_channel->RegisterObserver(&observer);
peer1_.WriteMessage(data_channel_id, message1_id, "Testing");
ASSERT_EQ_WAIT(1, observer.messages_received(), kTimeoutMs);
EXPECT_EQ("Testing", observer.messages()[0]);
uint64_t message2_id = 1372643095u;
peer1_.WriteMessage(data_channel_id, message2_id, "Hello, World!");
ASSERT_EQ_WAIT(2, observer.messages_received(), kTimeoutMs);
EXPECT_EQ("Hello, World!", observer.messages()[1]);
}
}
// Tests end-to-end that both peers can use multiple QuicDataChannels to
// send/receive messages using a QuicDataTransport.
TEST_F(QuicDataTransportTest, EndToEndSendReceiveMessages) {
ConnectTransportChannels();
SetTransportChannels();
std::vector<rtc::scoped_refptr<DataChannelInterface>> peer1_data_channels;
std::vector<rtc::scoped_refptr<DataChannelInterface>> peer2_data_channels;
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
webrtc::DataChannelInit config;
config.id = data_channel_id;
peer1_data_channels.push_back(peer1_.CreateDataChannel(&config));
peer2_data_channels.push_back(peer2_.CreateDataChannel(&config));
}
for (int data_channel_id = 0; data_channel_id < 5; ++data_channel_id) {
DataChannelInterface* peer1_data_channel =
peer1_data_channels[data_channel_id].get();
FakeObserver observer1;
peer1_data_channel->RegisterObserver(&observer1);
DataChannelInterface* peer2_data_channel =
peer2_data_channels[data_channel_id].get();
FakeObserver observer2;
peer2_data_channel->RegisterObserver(&observer2);
peer1_data_channel->Send(webrtc::DataBuffer("Peer 1 message 1"));
ASSERT_EQ_WAIT(1, observer2.messages_received(), kTimeoutMs);
EXPECT_EQ("Peer 1 message 1", observer2.messages()[0]);
peer1_data_channel->Send(webrtc::DataBuffer("Peer 1 message 2"));
ASSERT_EQ_WAIT(2, observer2.messages_received(), kTimeoutMs);
EXPECT_EQ("Peer 1 message 2", observer2.messages()[1]);
peer2_data_channel->Send(webrtc::DataBuffer("Peer 2 message 1"));
ASSERT_EQ_WAIT(1, observer1.messages_received(), kTimeoutMs);
EXPECT_EQ("Peer 2 message 1", observer1.messages()[0]);
peer2_data_channel->Send(webrtc::DataBuffer("Peer 2 message 2"));
ASSERT_EQ_WAIT(2, observer1.messages_received(), kTimeoutMs);
EXPECT_EQ("Peer 2 message 2", observer1.messages()[1]);
}
}
// Tests that SetTransportChannel returns false when setting a NULL transport
// channel or a transport channel that is not equivalent to the one already set.
TEST_F(QuicDataTransportTest, SetTransportChannelReturnValue) {
QuicDataTransport* quic_data_transport = peer1_.quic_data_transport();
EXPECT_FALSE(quic_data_transport->SetTransportChannel(nullptr));
QuicTransportChannel* transport_channel = peer1_.quic_transport_channel();
EXPECT_TRUE(quic_data_transport->SetTransportChannel(transport_channel));
EXPECT_TRUE(quic_data_transport->SetTransportChannel(transport_channel));
QuicTransportChannel* other_transport_channel =
peer2_.quic_transport_channel();
EXPECT_FALSE(
quic_data_transport->SetTransportChannel(other_transport_channel));
}
} // namespace