Inject signaling and network threads to DataChannel.

Add a few DCHECKs and comments about upcoming work.

Bug: webrtc:11547
Change-Id: I2d42f48cb93f31e70cf9fe4b3b62241c38bc9d8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177106
Reviewed-by: Taylor <deadbeef@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31530}
This commit is contained in:
Tomas Gunnarsson 2020-06-15 13:47:42 +02:00 committed by Commit Bot
parent 2b7bbd9c5b
commit 7d3cfbf90d
9 changed files with 270 additions and 118 deletions

View File

@ -137,9 +137,12 @@ rtc::scoped_refptr<DataChannel> DataChannel::Create(
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label,
const InternalDataChannelInit& config) {
const InternalDataChannelInit& config,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread) {
rtc::scoped_refptr<DataChannel> channel(
new rtc::RefCountedObject<DataChannel>(config, provider, dct, label));
new rtc::RefCountedObject<DataChannel>(config, provider, dct, label,
signaling_thread, network_thread));
if (!channel->Init()) {
return nullptr;
}
@ -155,8 +158,12 @@ bool DataChannel::IsSctpLike(cricket::DataChannelType type) {
DataChannel::DataChannel(const InternalDataChannelInit& config,
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label)
: internal_id_(GenerateUniqueId()),
const std::string& label,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread)
: signaling_thread_(signaling_thread),
network_thread_(network_thread),
internal_id_(GenerateUniqueId()),
label_(label),
config_(config),
observer_(nullptr),
@ -174,9 +181,12 @@ DataChannel::DataChannel(const InternalDataChannelInit& config,
receive_ssrc_set_(false),
writable_(false),
send_ssrc_(0),
receive_ssrc_(0) {}
receive_ssrc_(0) {
RTC_DCHECK_RUN_ON(signaling_thread_);
}
bool DataChannel::Init() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (data_channel_type_ == cricket::DCT_RTP) {
if (config_.reliable || config_.id != -1 || config_.maxRetransmits ||
config_.maxRetransmitTime) {
@ -229,18 +239,23 @@ bool DataChannel::Init() {
return true;
}
DataChannel::~DataChannel() {}
DataChannel::~DataChannel() {
RTC_DCHECK_RUN_ON(signaling_thread_);
}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
}
void DataChannel::UnregisterObserver() {
observer_ = NULL;
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = nullptr;
}
bool DataChannel::reliable() const {
// May be called on any thread.
if (data_channel_type_ == cricket::DCT_RTP) {
return false;
} else {
@ -249,10 +264,12 @@ bool DataChannel::reliable() const {
}
uint64_t DataChannel::buffered_amount() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return buffered_amount_;
}
void DataChannel::Close() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == kClosed)
return;
send_ssrc_ = 0;
@ -262,11 +279,42 @@ void DataChannel::Close() {
UpdateState();
}
DataChannel::DataState DataChannel::state() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return state_;
}
RTCError DataChannel::error() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return error_;
}
uint32_t DataChannel::messages_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return messages_sent_;
}
uint64_t DataChannel::bytes_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return bytes_sent_;
}
uint32_t DataChannel::messages_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return messages_received_;
}
uint64_t DataChannel::bytes_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return bytes_received_;
}
bool DataChannel::Send(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
// TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
// thread. Bring buffer management etc to the network thread and keep the
// operational state management on the signaling thread.
buffered_amount_ += buffer.size();
if (state_ != kOpen) {
return false;
@ -306,6 +354,7 @@ bool DataChannel::Send(const DataBuffer& buffer) {
}
void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
if (receive_ssrc_set_) {
@ -329,6 +378,7 @@ void DataChannel::SetSctpSid(int sid) {
}
void DataChannel::OnClosingProcedureStartedRemotely(int sid) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (IsSctpLike(data_channel_type_) && sid == config_.id &&
state_ != kClosing && state_ != kClosed) {
// Don't bother sending queued data since the side that initiated the
@ -345,6 +395,7 @@ void DataChannel::OnClosingProcedureStartedRemotely(int sid) {
}
void DataChannel::OnClosingProcedureComplete(int sid) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (IsSctpLike(data_channel_type_) && sid == config_.id) {
// If the closing procedure is complete, we should have finished sending
// all pending data and transitioned to kClosing already.
@ -356,6 +407,7 @@ void DataChannel::OnClosingProcedureComplete(int sid) {
}
void DataChannel::OnTransportChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(IsSctpLike(data_channel_type_));
if (!connected_to_provider_) {
connected_to_provider_ = provider_->ConnectDataChannel(this);
@ -385,6 +437,7 @@ void DataChannel::RemotePeerRequestClose() {
}
void DataChannel::SetSendSsrc(uint32_t send_ssrc) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
if (send_ssrc_set_) {
return;
@ -396,6 +449,7 @@ void DataChannel::SetSendSsrc(uint32_t send_ssrc) {
void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (data_channel_type_ == cricket::DCT_RTP && params.ssrc != receive_ssrc_) {
return;
}
@ -462,6 +516,8 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
}
void DataChannel::OnChannelReady(bool writable) {
RTC_DCHECK_RUN_ON(signaling_thread_);
writable_ = writable;
if (!writable) {
return;
@ -473,6 +529,8 @@ void DataChannel::OnChannelReady(bool writable) {
}
void DataChannel::CloseAbruptlyWithError(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == kClosed) {
return;
}
@ -501,6 +559,7 @@ void DataChannel::CloseAbruptlyWithDataChannelFailure(
}
void DataChannel::UpdateState() {
RTC_DCHECK_RUN_ON(signaling_thread_);
// UpdateState determines what to do from a few state variables. Include
// all conditions required for each state transition here for
// clarity. OnChannelReady(true) will send any queued data and then invoke
@ -568,6 +627,7 @@ void DataChannel::UpdateState() {
}
void DataChannel::SetState(DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == state) {
return;
}
@ -584,6 +644,7 @@ void DataChannel::SetState(DataState state) {
}
void DataChannel::DisconnectFromProvider() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!connected_to_provider_)
return;
@ -592,6 +653,7 @@ void DataChannel::DisconnectFromProvider() {
}
void DataChannel::DeliverQueuedReceivedData() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!observer_) {
return;
}
@ -605,6 +667,7 @@ void DataChannel::DeliverQueuedReceivedData() {
}
void DataChannel::SendQueuedDataMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (queued_send_data_.Empty()) {
return;
}
@ -623,6 +686,7 @@ void DataChannel::SendQueuedDataMessages() {
bool DataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) {
RTC_DCHECK_RUN_ON(signaling_thread_);
cricket::SendDataParams send_params;
if (IsSctpLike(data_channel_type_)) {
@ -681,6 +745,7 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer,
}
bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
size_t start_buffered_amount = queued_send_data_.byte_count();
if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
@ -691,6 +756,7 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
}
void DataChannel::SendQueuedControlMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@ -701,10 +767,12 @@ void DataChannel::SendQueuedControlMessages() {
}
void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
}
bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
RTC_DCHECK(IsSctpLike(data_channel_type_));

View File

@ -117,47 +117,51 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label,
const InternalDataChannelInit& config);
const InternalDataChannelInit& config,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread);
static bool IsSctpLike(cricket::DataChannelType type);
virtual void RegisterObserver(DataChannelObserver* observer);
virtual void UnregisterObserver();
void RegisterObserver(DataChannelObserver* observer) override;
void UnregisterObserver() override;
virtual std::string label() const { return label_; }
virtual bool reliable() const;
virtual bool ordered() const { return config_.ordered; }
std::string label() const override { return label_; }
bool reliable() const override;
bool ordered() const override { return config_.ordered; }
// Backwards compatible accessors
virtual uint16_t maxRetransmitTime() const {
uint16_t maxRetransmitTime() const override {
return config_.maxRetransmitTime ? *config_.maxRetransmitTime
: static_cast<uint16_t>(-1);
}
virtual uint16_t maxRetransmits() const {
uint16_t maxRetransmits() const override {
return config_.maxRetransmits ? *config_.maxRetransmits
: static_cast<uint16_t>(-1);
}
virtual absl::optional<int> maxPacketLifeTime() const {
absl::optional<int> maxPacketLifeTime() const override {
return config_.maxRetransmitTime;
}
virtual absl::optional<int> maxRetransmitsOpt() const {
absl::optional<int> maxRetransmitsOpt() const override {
return config_.maxRetransmits;
}
virtual std::string protocol() const { return config_.protocol; }
virtual bool negotiated() const { return config_.negotiated; }
virtual int id() const { return config_.id; }
virtual Priority priority() const {
std::string protocol() const override { return config_.protocol; }
bool negotiated() const override { return config_.negotiated; }
int id() const override { return config_.id; }
Priority priority() const override {
return config_.priority ? *config_.priority : Priority::kLow;
}
virtual int internal_id() const { return internal_id_; }
virtual uint64_t buffered_amount() const;
virtual void Close();
virtual DataState state() const { return state_; }
virtual RTCError error() const;
virtual uint32_t messages_sent() const { return messages_sent_; }
virtual uint64_t bytes_sent() const { return bytes_sent_; }
virtual uint32_t messages_received() const { return messages_received_; }
virtual uint64_t bytes_received() const { return bytes_received_; }
virtual bool Send(const DataBuffer& buffer);
uint64_t buffered_amount() const override;
void Close() override;
DataState state() const override;
RTCError error() const override;
uint32_t messages_sent() const override;
uint64_t bytes_sent() const override;
uint32_t messages_received() const override;
uint64_t bytes_received() const override;
bool Send(const DataBuffer& buffer) override;
// Close immediately, ignoring any queued data or closing procedure.
// This is called for RTP data channels when SDP indicates a channel should
@ -234,8 +238,10 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
DataChannel(const InternalDataChannelInit& config,
DataChannelProviderInterface* client,
cricket::DataChannelType dct,
const std::string& label);
virtual ~DataChannel();
const std::string& label,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread);
~DataChannel() override;
private:
// A packet queue which tracks the total queued bytes. Queued packets are
@ -284,36 +290,38 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
void QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer);
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_;
const int internal_id_;
const std::string label_;
const InternalDataChannelInit config_;
DataChannelObserver* observer_;
DataState state_;
RTCError error_;
uint32_t messages_sent_;
uint64_t bytes_sent_;
uint32_t messages_received_;
uint64_t bytes_received_;
DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_);
DataState state_ RTC_GUARDED_BY(signaling_thread_);
RTCError error_ RTC_GUARDED_BY(signaling_thread_);
uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_);
uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_);
uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_);
uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_);
// Number of bytes of data that have been queued using Send(). Increased
// before each transport send and decreased after each successful send.
uint64_t buffered_amount_;
uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_);
const cricket::DataChannelType data_channel_type_;
DataChannelProviderInterface* provider_;
HandshakeState handshake_state_;
bool connected_to_provider_;
bool send_ssrc_set_;
bool receive_ssrc_set_;
bool writable_;
DataChannelProviderInterface* const provider_;
HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_);
bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_);
bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
bool writable_ RTC_GUARDED_BY(signaling_thread_);
// Did we already start the graceful SCTP closing procedure?
bool started_closing_procedure_ = false;
uint32_t send_ssrc_;
uint32_t receive_ssrc_;
bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_);
uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_);
// Control messages that always have to get sent out before any queued
// data.
PacketQueue queued_control_data_;
PacketQueue queued_received_data_;
PacketQueue queued_send_data_;
rtc::AsyncInvoker invoker_;
PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
rtc::AsyncInvoker invoker_ RTC_GUARDED_BY(signaling_thread_);
};
// Define proxy for DataChannelInterface.
@ -341,6 +349,7 @@ PROXY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_METHOD0(void, Close)
// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
PROXY_METHOD1(bool, Send, const DataBuffer&)
END_PROXY_MAP()

View File

@ -25,37 +25,10 @@ bool DataChannelController::HasDataChannels() const {
bool DataChannelController::SendData(const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
// RTC_DCHECK_RUN_ON(signaling_thread());
if (data_channel_transport()) {
SendDataParams send_params;
send_params.type = ToWebrtcDataMessageType(params.type);
send_params.ordered = params.ordered;
if (params.max_rtx_count >= 0) {
send_params.max_rtx_count = params.max_rtx_count;
} else if (params.max_rtx_ms >= 0) {
send_params.max_rtx_ms = params.max_rtx_ms;
}
RTCError error = network_thread()->Invoke<RTCError>(
RTC_FROM_HERE, [this, params, send_params, payload] {
return data_channel_transport()->SendData(params.sid, send_params,
payload);
});
if (error.ok()) {
*result = cricket::SendDataResult::SDR_SUCCESS;
return true;
} else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
// SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
// TODO(mellem): Stop using RTCError here and get rid of the mapping.
*result = cricket::SendDataResult::SDR_BLOCK;
return false;
}
*result = cricket::SendDataResult::SDR_ERROR;
return false;
} else if (rtp_data_channel()) {
if (data_channel_transport())
return DataChannelSendData(params, payload, result);
if (rtp_data_channel())
return rtp_data_channel()->SendData(params, payload, result);
}
RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
return false;
}
@ -146,6 +119,14 @@ void DataChannelController::OnDataReceived(
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be
// delivered on the network thread. The way HandleOpenMessage_s works
// right now is that it's called for all types of buffers and operates
// as a selector function. Change this so that it's only called for
// buffers that it should be able to handle. Once we do that, we can
// deliver all other buffers on the network thread (change
// SignalDataChannelTransportReceivedData_s to
// SignalDataChannelTransportReceivedData_n).
if (!HandleOpenMessage_s(params, buffer)) {
SignalDataChannelTransportReceivedData_s(params, buffer);
}
@ -261,6 +242,7 @@ void DataChannelController::OnDataChannelOpenMessage(
return;
}
// TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
@ -299,7 +281,8 @@ DataChannelController::InternalCreateDataChannel(
}
rtc::scoped_refptr<DataChannel> channel(
DataChannel::Create(this, data_channel_type(), label, new_config));
DataChannel::Create(this, data_channel_type(), label, new_config,
signaling_thread(), network_thread()));
if (!channel) {
sid_allocator_.ReleaseSid(new_config.id);
return nullptr;
@ -424,9 +407,10 @@ void DataChannelController::UpdateLocalRtpDataChannels(
void DataChannelController::UpdateRemoteRtpDataChannels(
const cricket::StreamParamsVec& streams) {
RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<std::string> existing_channels;
RTC_DCHECK_RUN_ON(signaling_thread());
// Find new and active data channels.
for (const cricket::StreamParams& params : streams) {
// The data channel label is either the mslabel or the SSRC if the mslabel
@ -447,6 +431,44 @@ void DataChannelController::UpdateRemoteRtpDataChannels(
UpdateClosingRtpDataChannels(existing_channels, false);
}
cricket::DataChannelType DataChannelController::data_channel_type() const {
// TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
// RTC_DCHECK_RUN_ON(signaling_thread());
return data_channel_type_;
}
void DataChannelController::set_data_channel_type(
cricket::DataChannelType type) {
RTC_DCHECK_RUN_ON(signaling_thread());
data_channel_type_ = type;
}
DataChannelTransportInterface* DataChannelController::data_channel_transport()
const {
// TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
// network thread.
// RTC_DCHECK_RUN_ON(network_thread());
return data_channel_transport_;
}
void DataChannelController::set_data_channel_transport(
DataChannelTransportInterface* transport) {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_ = transport;
}
const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
DataChannelController::rtp_data_channels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return &rtp_data_channels_;
}
const std::vector<rtc::scoped_refptr<DataChannel>>*
DataChannelController::sctp_data_channels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return &sctp_data_channels_;
}
void DataChannelController::UpdateClosingRtpDataChannels(
const std::vector<std::string>& active_channels,
bool is_local_update) {
@ -483,11 +505,50 @@ void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
return;
}
channel->SetReceiveSsrc(remote_ssrc);
// TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
}
bool DataChannelController::DataChannelSendData(
const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
// TODO(bugs.webrtc.org/11547): Expect method to be called on the network
// thread instead. Remove the Invoke() below and move assocated state to
// the network thread.
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(data_channel_transport());
SendDataParams send_params;
send_params.type = ToWebrtcDataMessageType(params.type);
send_params.ordered = params.ordered;
if (params.max_rtx_count >= 0) {
send_params.max_rtx_count = params.max_rtx_count;
} else if (params.max_rtx_ms >= 0) {
send_params.max_rtx_ms = params.max_rtx_ms;
}
RTCError error = network_thread()->Invoke<RTCError>(
RTC_FROM_HERE, [this, params, send_params, payload] {
return data_channel_transport()->SendData(params.sid, send_params,
payload);
});
if (error.ok()) {
*result = cricket::SendDataResult::SDR_SUCCESS;
return true;
} else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
// SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
// TODO(mellem): Stop using RTCError here and get rid of the mapping.
*result = cricket::SendDataResult::SDR_BLOCK;
return false;
}
*result = cricket::SendDataResult::SDR_ERROR;
return false;
}
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}

View File

@ -89,34 +89,20 @@ class DataChannelController : public DataChannelProviderInterface,
void UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec& streams);
// Accessors
cricket::DataChannelType data_channel_type() const {
return data_channel_type_;
}
void set_data_channel_type(cricket::DataChannelType type) {
data_channel_type_ = type;
}
cricket::DataChannelType data_channel_type() const;
void set_data_channel_type(cricket::DataChannelType type);
cricket::RtpDataChannel* rtp_data_channel() const {
return rtp_data_channel_;
}
void set_rtp_data_channel(cricket::RtpDataChannel* channel) {
rtp_data_channel_ = channel;
}
DataChannelTransportInterface* data_channel_transport() const {
return data_channel_transport_;
}
void set_data_channel_transport(DataChannelTransportInterface* transport) {
data_channel_transport_ = transport;
}
DataChannelTransportInterface* data_channel_transport() const;
void set_data_channel_transport(DataChannelTransportInterface* transport);
const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
rtp_data_channels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return &rtp_data_channels_;
}
rtp_data_channels() const;
const std::vector<rtc::scoped_refptr<DataChannel>>* sctp_data_channels()
const {
RTC_DCHECK_RUN_ON(signaling_thread());
return &sctp_data_channels_;
}
const;
sigslot::signal1<DataChannel*>& SignalDataChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread());
@ -146,6 +132,11 @@ class DataChannelController : public DataChannelProviderInterface,
const std::vector<std::string>& active_channels,
bool is_local_update) RTC_RUN_ON(signaling_thread());
// Called from SendData when data_channel_transport() is true.
bool DataChannelSendData(const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result);
rtc::Thread* network_thread() const;
rtc::Thread* signaling_thread() const;
@ -189,6 +180,8 @@ class DataChannelController : public DataChannelProviderInterface,
// Signals from |data_channel_transport_|. These are invoked on the
// signaling thread.
// TODO(bugs.webrtc.org/11547): These '_s' signals likely all belong on the
// network thread.
sigslot::signal1<bool> SignalDataChannelTransportWritable_s
RTC_GUARDED_BY(signaling_thread());
sigslot::signal2<const cricket::ReceiveDataParams&,

View File

@ -64,6 +64,7 @@ class FakeDataChannelObserver : public webrtc::DataChannelObserver {
// TODO(deadbeef): The fact that these tests use a fake provider makes them not
// too valuable. Should rewrite using the
// peerconnection_datachannel_unittest.cc infrastructure.
// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
class SctpDataChannelTest : public ::testing::Test {
protected:
SctpDataChannelTest()
@ -71,7 +72,9 @@ class SctpDataChannelTest : public ::testing::Test {
webrtc_data_channel_(DataChannel::Create(provider_.get(),
cricket::DCT_SCTP,
"test",
init_)) {}
init_,
rtc::Thread::Current(),
rtc::Thread::Current())) {}
void SetChannelReady() {
provider_->set_transport_available(true);
@ -111,7 +114,8 @@ class StateSignalsListener : public sigslot::has_slots<> {
TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
provider_->set_transport_available(true);
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_TRUE(provider_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams.
@ -305,7 +309,8 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
webrtc::InternalDataChannelInit init;
init.id = 1;
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state());
EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), 1000);
}
@ -318,7 +323,8 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) {
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
@ -348,7 +354,8 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
@ -449,7 +456,8 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) {
SetChannelReady();
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ(0U, provider_->last_send_data_params().ssrc);
@ -512,7 +520,8 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) {
SetChannelReady();
rtc::scoped_refptr<DataChannel> dc =
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);

View File

@ -2211,6 +2211,7 @@ rtc::scoped_refptr<DataChannelInterface> PeerConnection::CreateDataChannel(
UpdateNegotiationNeeded();
}
NoteUsageEvent(UsageEvent::DATA_ADDED);
// TODO(bugs.webrtc.org/11547): Inject the network thread as well.
return DataChannelProxy::Create(signaling_thread(), channel.get());
}
@ -6714,6 +6715,8 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) {
case cricket::DCT_RTP:
default:
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
// TODO(bugs.webrtc.org/9987): set_rtp_data_channel() should be called on
// the network thread like set_data_channel_transport is.
data_channel_controller_.set_rtp_data_channel(
channel_manager()->CreateRtpDataChannel(
configuration_.media_config, rtp_transport, signaling_thread(),

View File

@ -1398,11 +1398,14 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
report->Get("RTCPeerConnection")->cast_to<RTCPeerConnectionStats>());
}
// TODO(bugs.webrtc.org/11547): Supply a separate network thread.
rtc::scoped_refptr<DataChannel> dummy_channel_a = DataChannel::Create(
nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit());
nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalDataChannelCreated()(dummy_channel_a.get());
rtc::scoped_refptr<DataChannel> dummy_channel_b = DataChannel::Create(
nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit());
nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalDataChannelCreated()(dummy_channel_b.get());
dummy_channel_a->SignalOpened(dummy_channel_a.get());

View File

@ -174,8 +174,10 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase {
void AddSctpDataChannel(const std::string& label,
const InternalDataChannelInit& init) {
AddSctpDataChannel(DataChannel::Create(&data_channel_provider_,
cricket::DCT_SCTP, label, init));
// TODO(bugs.webrtc.org/11547): Supply a separate network thread.
AddSctpDataChannel(DataChannel::Create(
&data_channel_provider_, cricket::DCT_SCTP, label, init,
rtc::Thread::Current(), rtc::Thread::Current()));
}
void AddSctpDataChannel(rtc::scoped_refptr<DataChannel> data_channel) {

View File

@ -31,11 +31,15 @@ class MockDataChannel : public rtc::RefCountedObject<DataChannel> {
uint64_t bytes_sent,
uint32_t messages_received,
uint64_t bytes_received,
const InternalDataChannelInit& config = InternalDataChannelInit())
const InternalDataChannelInit& config = InternalDataChannelInit(),
rtc::Thread* signaling_thread = rtc::Thread::Current(),
rtc::Thread* network_thread = rtc::Thread::Current())
: rtc::RefCountedObject<DataChannel>(config,
nullptr,
cricket::DCT_NONE,
label) {
label,
signaling_thread,
network_thread) {
EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id));
EXPECT_CALL(*this, state()).WillRepeatedly(::testing::Return(state));
EXPECT_CALL(*this, protocol()).WillRepeatedly(::testing::Return(protocol));