diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 001bf02512..70becebc99 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -166,6 +166,7 @@ rtc_library("peerconnection") { "audio_track.h", "data_channel.cc", "data_channel.h", + "data_channel_controller.cc", "dtmf_sender.cc", "dtmf_sender.h", "ice_server_parsing.cc", diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc new file mode 100644 index 0000000000..484886b79c --- /dev/null +++ b/pc/data_channel_controller.cc @@ -0,0 +1,363 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// This file contains the implementation of the class +// webrtc::PeerConnection::DataChannelController. +// +// The intent is that this should be webrtc::DataChannelController, but +// as a migration stage, it is simpler to have it as an inner class, +// declared in the header file pc/peer_connection.h + +#include "pc/peer_connection.h" +#include "pc/sctp_utils.h" + +namespace webrtc { + +bool PeerConnection::DataChannelController::SendData( + const cricket::SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload, + cricket::SendDataResult* result) { + // RTC_DCHECK_RUN_ON(signaling_thread()); + if (data_channel_transport()) { + SendDataParams send_params; + send_params.type = ToWebrtcDataMessageType(params.type); + send_params.ordered = params.ordered; + if (params.max_rtx_count >= 0) { + send_params.max_rtx_count = params.max_rtx_count; + } else if (params.max_rtx_ms >= 0) { + send_params.max_rtx_ms = params.max_rtx_ms; + } + + RTCError error = network_thread()->Invoke( + RTC_FROM_HERE, [this, params, send_params, payload] { + return data_channel_transport()->SendData(params.sid, send_params, + payload); + }); + + if (error.ok()) { + *result = cricket::SendDataResult::SDR_SUCCESS; + return true; + } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { + // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. + // TODO(mellem): Stop using RTCError here and get rid of the mapping. + *result = cricket::SendDataResult::SDR_BLOCK; + return false; + } + *result = cricket::SendDataResult::SDR_ERROR; + return false; + } else if (rtp_data_channel()) { + return rtp_data_channel()->SendData(params, payload, result); + } + RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; + return false; +} + +bool PeerConnection::DataChannelController::ConnectDataChannel( + DataChannel* webrtc_data_channel) { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (!rtp_data_channel() && !data_channel_transport()) { + // Don't log an error here, because DataChannels are expected to call + // ConnectDataChannel in this state. It's the only way to initially tell + // whether or not the underlying transport is ready. + return false; + } + if (data_channel_transport()) { + SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, + &DataChannel::OnChannelReady); + SignalDataChannelTransportReceivedData_s.connect( + webrtc_data_channel, &DataChannel::OnDataReceived); + SignalDataChannelTransportChannelClosing_s.connect( + webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); + SignalDataChannelTransportChannelClosed_s.connect( + webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); + } + if (rtp_data_channel()) { + rtp_data_channel()->SignalReadyToSendData.connect( + webrtc_data_channel, &DataChannel::OnChannelReady); + rtp_data_channel()->SignalDataReceived.connect( + webrtc_data_channel, &DataChannel::OnDataReceived); + } + return true; +} + +void PeerConnection::DataChannelController::DisconnectDataChannel( + DataChannel* webrtc_data_channel) { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (!rtp_data_channel() && !data_channel_transport()) { + RTC_LOG(LS_ERROR) + << "DisconnectDataChannel called when rtp_data_channel_ and " + "sctp_transport_ are NULL."; + return; + } + if (data_channel_transport()) { + SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); + } + if (rtp_data_channel()) { + rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel); + rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel); + } +} + +void PeerConnection::DataChannelController::AddSctpDataStream(int sid) { + if (data_channel_transport()) { + network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { + if (data_channel_transport()) { + data_channel_transport()->OpenChannel(sid); + } + }); + } +} + +void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) { + if (data_channel_transport()) { + network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { + if (data_channel_transport()) { + data_channel_transport()->CloseChannel(sid); + } + }); + } +} + +bool PeerConnection::DataChannelController::ReadyToSendData() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) || + (data_channel_transport() && data_channel_transport_ready_to_send_); +} + +void PeerConnection::DataChannelController::OnDataReceived( + int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(network_thread()); + cricket::ReceiveDataParams params; + params.sid = channel_id; + params.type = ToCricketDataMessageType(type); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (!HandleOpenMessage_s(params, buffer)) { + SignalDataChannelTransportReceivedData_s(params, buffer); + } + }); +} + +void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this, channel_id] { + RTC_DCHECK_RUN_ON(signaling_thread()); + SignalDataChannelTransportChannelClosing_s(channel_id); + }); +} + +void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this, channel_id] { + RTC_DCHECK_RUN_ON(signaling_thread()); + SignalDataChannelTransportChannelClosed_s(channel_id); + }); +} + +void PeerConnection::DataChannelController::OnReadyToSend() { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + data_channel_transport_ready_to_send_ = true; + SignalDataChannelTransportWritable_s( + data_channel_transport_ready_to_send_); + }); +} + +void PeerConnection::DataChannelController::SetupDataChannelTransport_n() { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_ = std::make_unique(); +} + +void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_ = nullptr; + if (data_channel_transport()) { + data_channel_transport()->SetDataSink(nullptr); + } + set_data_channel_transport(nullptr); +} + +void PeerConnection::DataChannelController::OnTransportChanged( + DataChannelTransportInterface* new_data_channel_transport) { + RTC_DCHECK_RUN_ON(network_thread()); + if (data_channel_transport() && + data_channel_transport() != new_data_channel_transport) { + // Changed which data channel transport is used for |sctp_mid_| (eg. now + // it's bundled). + data_channel_transport()->SetDataSink(nullptr); + set_data_channel_transport(new_data_channel_transport); + if (new_data_channel_transport) { + new_data_channel_transport->SetDataSink(this); + + // There's a new data channel transport. This needs to be signaled to the + // |sctp_data_channels_| so that they can reopen and reconnect. This is + // necessary when bundling is applied. + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this] { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + for (auto channel : pc_->sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + }); + } + } +} + +bool PeerConnection::DataChannelController::HandleOpenMessage_s( + const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) { + // Received OPEN message; parse and signal that a new data channel should + // be created. + std::string label; + InternalDataChannelInit config; + config.id = params.ssrc; + if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { + RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc " + << params.ssrc; + return true; + } + config.open_handshake_role = InternalDataChannelInit::kAcker; + OnDataChannelOpenMessage(label, config); + return true; + } + return false; +} + +void PeerConnection::DataChannelController::OnDataChannelOpenMessage( + const std::string& label, + const InternalDataChannelInit& config) { + rtc::scoped_refptr channel( + InternalCreateDataChannel(label, &config)); + if (!channel.get()) { + RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."; + return; + } + + rtc::scoped_refptr proxy_channel = + DataChannelProxy::Create(signaling_thread(), channel); + { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + pc_->Observer()->OnDataChannel(std::move(proxy_channel)); + pc_->NoteUsageEvent(UsageEvent::DATA_ADDED); + } +} + +rtc::scoped_refptr +PeerConnection::DataChannelController::InternalCreateDataChannel( + const std::string& label, + const InternalDataChannelInit* config) { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + if (pc_->IsClosed()) { + return nullptr; + } + if (pc_->data_channel_type() == cricket::DCT_NONE) { + RTC_LOG(LS_ERROR) + << "InternalCreateDataChannel: Data is not supported in this call."; + return nullptr; + } + InternalDataChannelInit new_config = + config ? (*config) : InternalDataChannelInit(); + if (DataChannel::IsSctpLike(pc_->data_channel_type_)) { + if (new_config.id < 0) { + rtc::SSLRole role; + if ((pc_->GetSctpSslRole(&role)) && + !sid_allocator_.AllocateSid(role, &new_config.id)) { + RTC_LOG(LS_ERROR) + << "No id can be allocated for the SCTP data channel."; + return nullptr; + } + } else if (!sid_allocator_.ReserveSid(new_config.id)) { + RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " + "because the id is already in use or out of range."; + return nullptr; + } + } + + rtc::scoped_refptr channel( + DataChannel::Create(this, pc_->data_channel_type(), label, new_config)); + if (!channel) { + sid_allocator_.ReleaseSid(new_config.id); + return nullptr; + } + + if (channel->data_channel_type() == cricket::DCT_RTP) { + if (pc_->rtp_data_channels_.find(channel->label()) != + pc_->rtp_data_channels_.end()) { + RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() + << " already exists."; + return nullptr; + } + pc_->rtp_data_channels_[channel->label()] = channel; + } else { + RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_)); + pc_->sctp_data_channels_.push_back(channel); + channel->SignalClosed.connect(pc_, + &PeerConnection::OnSctpDataChannelClosed); + } + + pc_->SignalDataChannelCreated_(channel.get()); + return channel; +} + +void PeerConnection::DataChannelController::AllocateSctpSids( + rtc::SSLRole role) { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + std::vector> channels_to_close; + for (const auto& channel : pc_->sctp_data_channels_) { + if (channel->id() < 0) { + int sid; + if (!sid_allocator_.AllocateSid(role, &sid)) { + RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel."; + channels_to_close.push_back(channel); + continue; + } + channel->SetSctpSid(sid); + } + } + // Since closing modifies the list of channels, we have to do the actual + // closing outside the loop. + for (const auto& channel : channels_to_close) { + channel->CloseAbruptly(); + } +} + +void PeerConnection::DataChannelController::OnSctpDataChannelClosed( + DataChannel* channel) { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); + for (auto it = pc_->sctp_data_channels_.begin(); + it != pc_->sctp_data_channels_.end(); ++it) { + if (it->get() == channel) { + if (channel->id() >= 0) { + // After the closing procedure is done, it's safe to use this ID for + // another data channel. + sid_allocator_.ReleaseSid(channel->id()); + } + // Since this method is triggered by a signal from the DataChannel, + // we can't free it directly here; we need to free it asynchronously. + pc_->sctp_data_channels_to_free_.push_back(*it); + pc_->sctp_data_channels_.erase(it); + pc_->SignalFreeDataChannels(); + return; + } + } +} + +} // namespace webrtc diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index c24bd2e88a..0f632abb69 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -1050,8 +1050,8 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, remote_streams_(StreamCollection::Create()), call_(std::move(call)), call_ptr_(call_.get()), - data_channel_transport_(nullptr), local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()), + data_channel_controller_(this), weak_ptr_factory_(this) {} PeerConnection::~PeerConnection() { @@ -1081,7 +1081,6 @@ PeerConnection::~PeerConnection() { webrtc_session_desc_factory_.reset(); sctp_factory_.reset(); - data_channel_transport_invoker_.reset(); transport_controller_.reset(); // port_allocator_ lives on the network thread and should be destroyed there. @@ -2167,7 +2166,8 @@ rtc::scoped_refptr PeerConnection::CreateDataChannel( internal_config.reset(new InternalDataChannelInit(*config)); } rtc::scoped_refptr channel( - InternalCreateDataChannel(label, internal_config.get())); + data_channel_controller_.InternalCreateDataChannel( + label, internal_config.get())); if (!channel.get()) { return nullptr; } @@ -2767,7 +2767,7 @@ RTCError PeerConnection::ApplyLocalDescription( // SCTP sids. rtc::SSLRole role; if (DataChannel::IsSctpLike(data_channel_type_) && GetSctpSslRole(&role)) { - AllocateSctpSids(role); + data_channel_controller_.AllocateSctpSids(role); } if (IsUnifiedPlan()) { @@ -3205,7 +3205,7 @@ RTCError PeerConnection::ApplyRemoteDescription( // SCTP sids. rtc::SSLRole role; if (DataChannel::IsSctpLike(data_channel_type_) && GetSctpSslRole(&role)) { - AllocateSctpSids(role); + data_channel_controller_.AllocateSctpSids(role); } if (IsUnifiedPlan()) { @@ -3593,7 +3593,8 @@ RTCError PeerConnection::UpdateDataChannel( RTC_LOG(LS_INFO) << "Rejected data channel, mid=" << content.mid(); DestroyDataChannelTransport(); } else { - if (!rtp_data_channel_ && !data_channel_transport_) { + if (!data_channel_controller_.rtp_data_channel() && + !data_channel_controller_.data_channel_transport()) { RTC_LOG(LS_INFO) << "Creating data channel, mid=" << content.mid(); if (!CreateDataChannel(content.name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, @@ -5394,10 +5395,10 @@ PeerConnection::GetMediaDescriptionOptionsForRejectedData( absl::optional PeerConnection::GetDataMid() const { switch (data_channel_type_) { case cricket::DCT_RTP: - if (!rtp_data_channel_) { + if (!data_channel_controller_.rtp_data_channel()) { return absl::nullopt; } - return rtp_data_channel_->content_name(); + return data_channel_controller_.rtp_data_channel()->content_name(); case cricket::DCT_SCTP: case cricket::DCT_DATA_CHANNEL_TRANSPORT: case cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP: @@ -5738,7 +5739,7 @@ void PeerConnection::UpdateClosingRtpDataChannels( void PeerConnection::CreateRemoteRtpDataChannel(const std::string& label, uint32_t remote_ssrc) { rtc::scoped_refptr channel( - InternalCreateDataChannel(label, nullptr)); + data_channel_controller_.InternalCreateDataChannel(label, nullptr)); if (!channel.get()) { RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but" "CreateDataChannel failed."; @@ -5750,103 +5751,18 @@ void PeerConnection::CreateRemoteRtpDataChannel(const std::string& label, Observer()->OnDataChannel(std::move(proxy_channel)); } -rtc::scoped_refptr PeerConnection::InternalCreateDataChannel( - const std::string& label, - const InternalDataChannelInit* config) { - if (IsClosed()) { - return nullptr; - } - if (data_channel_type() == cricket::DCT_NONE) { - RTC_LOG(LS_ERROR) - << "InternalCreateDataChannel: Data is not supported in this call."; - return nullptr; - } - InternalDataChannelInit new_config = - config ? (*config) : InternalDataChannelInit(); - if (DataChannel::IsSctpLike(data_channel_type_)) { - if (new_config.id < 0) { - rtc::SSLRole role; - if ((GetSctpSslRole(&role)) && - !sid_allocator_.AllocateSid(role, &new_config.id)) { - RTC_LOG(LS_ERROR) - << "No id can be allocated for the SCTP data channel."; - return nullptr; - } - } else if (!sid_allocator_.ReserveSid(new_config.id)) { - RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " - "because the id is already in use or out of range."; - return nullptr; - } - } - - rtc::scoped_refptr channel( - DataChannel::Create(this, data_channel_type(), label, new_config)); - if (!channel) { - sid_allocator_.ReleaseSid(new_config.id); - return nullptr; - } - - if (channel->data_channel_type() == cricket::DCT_RTP) { - if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) { - RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() - << " already exists."; - return nullptr; - } - rtp_data_channels_[channel->label()] = channel; - } else { - RTC_DCHECK(DataChannel::IsSctpLike(data_channel_type_)); - sctp_data_channels_.push_back(channel); - channel->SignalClosed.connect(this, - &PeerConnection::OnSctpDataChannelClosed); - } - - SignalDataChannelCreated_(channel.get()); - return channel; -} - bool PeerConnection::HasDataChannels() const { return !rtp_data_channels_.empty() || !sctp_data_channels_.empty(); } -void PeerConnection::AllocateSctpSids(rtc::SSLRole role) { - std::vector> channels_to_close; - for (const auto& channel : sctp_data_channels_) { - if (channel->id() < 0) { - int sid; - if (!sid_allocator_.AllocateSid(role, &sid)) { - RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel."; - channels_to_close.push_back(channel); - continue; - } - channel->SetSctpSid(sid); - } - } - // Since closing modifies the list of channels, we have to do the actual - // closing outside the loop. - for (const auto& channel : channels_to_close) { - channel->CloseAbruptly(); - } +void PeerConnection::OnSctpDataChannelClosed(DataChannel* channel) { + // Since data_channel_controller doesn't do signals, this + // signal is relayed here. + data_channel_controller_.OnSctpDataChannelClosed(channel); } -void PeerConnection::OnSctpDataChannelClosed(DataChannel* channel) { - RTC_DCHECK(signaling_thread()->IsCurrent()); - for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end(); - ++it) { - if (it->get() == channel) { - if (channel->id() >= 0) { - // After the closing procedure is done, it's safe to use this ID for - // another data channel. - sid_allocator_.ReleaseSid(channel->id()); - } - // Since this method is triggered by a signal from the DataChannel, - // we can't free it directly here; we need to free it asynchronously. - sctp_data_channels_to_free_.push_back(*it); - sctp_data_channels_.erase(it); - signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FREE_DATACHANNELS, - nullptr); - return; - } - } +void PeerConnection::SignalFreeDataChannels() { + signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FREE_DATACHANNELS, nullptr); } void PeerConnection::OnTransportChannelClosed() { @@ -5865,43 +5781,6 @@ void PeerConnection::OnTransportChannelClosed() { } } -void PeerConnection::OnDataChannelOpenMessage( - const std::string& label, - const InternalDataChannelInit& config) { - rtc::scoped_refptr channel( - InternalCreateDataChannel(label, &config)); - if (!channel.get()) { - RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."; - return; - } - - rtc::scoped_refptr proxy_channel = - DataChannelProxy::Create(signaling_thread(), channel); - Observer()->OnDataChannel(std::move(proxy_channel)); - NoteUsageEvent(UsageEvent::DATA_ADDED); -} - -bool PeerConnection::HandleOpenMessage_s( - const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& buffer) { - if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) { - // Received OPEN message; parse and signal that a new data channel should - // be created. - std::string label; - InternalDataChannelInit config; - config.id = params.ssrc; - if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { - RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc " - << params.ssrc; - return true; - } - config.open_handshake_role = InternalDataChannelInit::kAcker; - OnDataChannelOpenMessage(label, config); - return true; - } - return false; -} - rtc::scoped_refptr> PeerConnection::GetAudioTransceiver() const { // This method only works with Plan B SDP, where there is a single @@ -6162,7 +6041,7 @@ bool PeerConnection::GetSctpSslRole(rtc::SSLRole* role) { "SSL Role of the SCTP transport."; return false; } - if (!data_channel_transport_) { + if (!data_channel_controller_.data_channel_transport()) { RTC_LOG(LS_INFO) << "Non-rejected SCTP m= section is needed to get the " "SSL Role of the SCTP transport."; return false; @@ -6279,7 +6158,7 @@ RTCError PeerConnection::PushdownMediaDescription( } // If using the RtpDataChannel, push down the new SDP section for it too. - if (rtp_data_channel_) { + if (data_channel_controller_.rtp_data_channel()) { const ContentInfo* data_content = cricket::GetFirstDataContent(sdesc->description()); if (data_content && !data_content->rejected) { @@ -6289,8 +6168,10 @@ RTCError PeerConnection::PushdownMediaDescription( std::string error; bool success = (source == cricket::CS_LOCAL) - ? rtp_data_channel_->SetLocalContent(data_desc, type, &error) - : rtp_data_channel_->SetRemoteContent(data_desc, type, &error); + ? data_channel_controller_.rtp_data_channel()->SetLocalContent( + data_desc, type, &error) + : data_channel_controller_.rtp_data_channel()->SetRemoteContent( + data_desc, type, &error); if (!success) { LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, error); } @@ -6403,162 +6284,6 @@ cricket::IceConfig PeerConnection::ParseIceConfig( return ice_config; } -bool PeerConnection::SendData(const cricket::SendDataParams& params, - const rtc::CopyOnWriteBuffer& payload, - cricket::SendDataResult* result) { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (data_channel_transport_) { - SendDataParams send_params; - send_params.type = ToWebrtcDataMessageType(params.type); - send_params.ordered = params.ordered; - if (params.max_rtx_count >= 0) { - send_params.max_rtx_count = params.max_rtx_count; - } else if (params.max_rtx_ms >= 0) { - send_params.max_rtx_ms = params.max_rtx_ms; - } - - RTCError error = network_thread()->Invoke( - RTC_FROM_HERE, [this, params, send_params, payload] { - return data_channel_transport_->SendData(params.sid, send_params, - payload); - }); - - if (error.ok()) { - *result = cricket::SendDataResult::SDR_SUCCESS; - return true; - } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { - // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. - // TODO(mellem): Stop using RTCError here and get rid of the mapping. - *result = cricket::SendDataResult::SDR_BLOCK; - return false; - } - *result = cricket::SendDataResult::SDR_ERROR; - return false; - } else if (rtp_data_channel_) { - return rtp_data_channel_->SendData(params, payload, result); - } - RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; - return false; -} - -bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !data_channel_transport_) { - // Don't log an error here, because DataChannels are expected to call - // ConnectDataChannel in this state. It's the only way to initially tell - // whether or not the underlying transport is ready. - return false; - } - if (data_channel_transport_) { - SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, - &DataChannel::OnChannelReady); - SignalDataChannelTransportReceivedData_s.connect( - webrtc_data_channel, &DataChannel::OnDataReceived); - SignalDataChannelTransportChannelClosing_s.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); - SignalDataChannelTransportChannelClosed_s.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); - } - if (rtp_data_channel_) { - rtp_data_channel_->SignalReadyToSendData.connect( - webrtc_data_channel, &DataChannel::OnChannelReady); - rtp_data_channel_->SignalDataReceived.connect(webrtc_data_channel, - &DataChannel::OnDataReceived); - } - return true; -} - -void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !data_channel_transport_) { - RTC_LOG(LS_ERROR) - << "DisconnectDataChannel called when rtp_data_channel_ and " - "sctp_transport_ are NULL."; - return; - } - if (data_channel_transport_) { - SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); - } - if (rtp_data_channel_) { - rtp_data_channel_->SignalReadyToSendData.disconnect(webrtc_data_channel); - rtp_data_channel_->SignalDataReceived.disconnect(webrtc_data_channel); - } -} - -void PeerConnection::AddSctpDataStream(int sid) { - if (data_channel_transport_) { - network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { - if (data_channel_transport_) { - data_channel_transport_->OpenChannel(sid); - } - }); - } -} - -void PeerConnection::RemoveSctpDataStream(int sid) { - if (data_channel_transport_) { - network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { - if (data_channel_transport_) { - data_channel_transport_->CloseChannel(sid); - } - }); - } -} - -bool PeerConnection::ReadyToSendData() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return (rtp_data_channel_ && rtp_data_channel_->ready_to_send_data()) || - (data_channel_transport_ && data_channel_transport_ready_to_send_); -} - -void PeerConnection::OnDataReceived(int channel_id, - DataMessageType type, - const rtc::CopyOnWriteBuffer& buffer) { - RTC_DCHECK_RUN_ON(network_thread()); - cricket::ReceiveDataParams params; - params.sid = channel_id; - params.type = ToCricketDataMessageType(type); - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (!HandleOpenMessage_s(params, buffer)) { - SignalDataChannelTransportReceivedData_s(params, buffer); - } - }); -} - -void PeerConnection::OnChannelClosing(int channel_id) { - RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, channel_id] { - RTC_DCHECK_RUN_ON(signaling_thread()); - SignalDataChannelTransportChannelClosing_s(channel_id); - }); -} - -void PeerConnection::OnChannelClosed(int channel_id) { - RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, channel_id] { - RTC_DCHECK_RUN_ON(signaling_thread()); - SignalDataChannelTransportChannelClosed_s(channel_id); - }); -} - -void PeerConnection::OnReadyToSend() { - RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - data_channel_transport_ready_to_send_ = true; - SignalDataChannelTransportWritable_s( - data_channel_transport_ready_to_send_); - }); -} - absl::optional PeerConnection::sctp_transport_name() const { RTC_DCHECK_RUN_ON(signaling_thread()); if (sctp_mid_ && transport_controller_) { @@ -6591,11 +6316,12 @@ std::map PeerConnection::GetTransportNamesByMid() channel->transport_name(); } } - if (rtp_data_channel_) { - transport_names_by_mid[rtp_data_channel_->content_name()] = - rtp_data_channel_->transport_name(); + if (data_channel_controller_.rtp_data_channel()) { + transport_names_by_mid[data_channel_controller_.rtp_data_channel() + ->content_name()] = + data_channel_controller_.rtp_data_channel()->transport_name(); } - if (data_channel_transport_) { + if (data_channel_controller_.data_channel_transport()) { absl::optional transport_name = sctp_transport_name(); RTC_DCHECK(transport_name); transport_names_by_mid[*sctp_mid_] = *transport_name; @@ -6779,8 +6505,9 @@ void PeerConnection::EnableSending() { } } - if (rtp_data_channel_ && !rtp_data_channel_->enabled()) { - rtp_data_channel_->Enable(true); + if (data_channel_controller_.rtp_data_channel() && + !data_channel_controller_.rtp_data_channel()->enabled()) { + data_channel_controller_.rtp_data_channel()->Enable(true); } } @@ -6966,7 +6693,8 @@ RTCError PeerConnection::CreateChannels(const SessionDescription& desc) { const cricket::ContentInfo* data = cricket::GetFirstDataContent(&desc); if (data_channel_type_ != cricket::DCT_NONE && data && !data->rejected && - !rtp_data_channel_ && !data_channel_transport_) { + !data_channel_controller_.rtp_data_channel() && + !data_channel_controller_.data_channel_transport()) { if (!CreateDataChannel(data->name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, "Failed to create data channel."); @@ -7043,17 +6771,20 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) { case cricket::DCT_RTP: default: RtpTransportInternal* rtp_transport = GetRtpTransport(mid); - rtp_data_channel_ = channel_manager()->CreateRtpDataChannel( - configuration_.media_config, rtp_transport, signaling_thread(), mid, - SrtpRequired(), GetCryptoOptions(), &ssrc_generator_); - if (!rtp_data_channel_) { + data_channel_controller_.set_rtp_data_channel( + channel_manager()->CreateRtpDataChannel( + configuration_.media_config, rtp_transport, signaling_thread(), + mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_)); + if (!data_channel_controller_.rtp_data_channel()) { return false; } - rtp_data_channel_->SignalDtlsSrtpSetupFailure.connect( - this, &PeerConnection::OnDtlsSrtpSetupFailure); - rtp_data_channel_->SignalSentPacket.connect( + data_channel_controller_.rtp_data_channel() + ->SignalDtlsSrtpSetupFailure.connect( + this, &PeerConnection::OnDtlsSrtpSetupFailure); + data_channel_controller_.rtp_data_channel()->SignalSentPacket.connect( this, &PeerConnection::OnSentPacket_w); - rtp_data_channel_->SetRtpTransport(rtp_transport); + data_channel_controller_.rtp_data_channel()->SetRtpTransport( + rtp_transport); return true; } return false; @@ -7083,20 +6814,20 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { } RTC_LOG(LS_INFO) << "Setting up data channel transport for mid=" << mid; - data_channel_transport_ = transport; - data_channel_transport_invoker_ = std::make_unique(); + data_channel_controller_.set_data_channel_transport(transport); + data_channel_controller_.SetupDataChannelTransport_n(); sctp_mid_ = mid; // Note: setting the data sink and checking initial state must be done last, // after setting up the data channel. Setting the data sink may trigger // callbacks to PeerConnection which require the transport to be completely // set up (eg. OnReadyToSend()). - transport->SetDataSink(this); + transport->SetDataSink(&data_channel_controller_); return true; } void PeerConnection::TeardownDataChannelTransport_n() { - if (!sctp_mid_ && !data_channel_transport_) { + if (!sctp_mid_ && !data_channel_controller_.data_channel_transport()) { return; } RTC_LOG(LS_INFO) << "Tearing down data channel transport for mid=" @@ -7105,11 +6836,7 @@ void PeerConnection::TeardownDataChannelTransport_n() { // |sctp_mid_| may still be active through an SCTP transport. If not, unset // it. sctp_mid_.reset(); - data_channel_transport_invoker_ = nullptr; - if (data_channel_transport_) { - data_channel_transport_->SetDataSink(nullptr); - } - data_channel_transport_ = nullptr; + data_channel_controller_.TeardownDataChannelTransport_n(); } // Returns false if bundle is enabled and rtcp_mux is disabled. @@ -7628,7 +7355,7 @@ const std::string PeerConnection::GetTransportName( if (channel) { return channel->transport_name(); } - if (data_channel_transport_) { + if (data_channel_controller_.data_channel_transport()) { RTC_DCHECK(sctp_mid_); if (content_name == *sctp_mid_) { return *sctp_transport_name(); @@ -7651,10 +7378,10 @@ void PeerConnection::DestroyTransceiverChannel( } void PeerConnection::DestroyDataChannelTransport() { - if (rtp_data_channel_) { + if (data_channel_controller_.rtp_data_channel()) { OnTransportChannelClosed(); - DestroyChannelInterface(rtp_data_channel_); - rtp_data_channel_ = nullptr; + DestroyChannelInterface(data_channel_controller_.rtp_data_channel()); + data_channel_controller_.set_rtp_data_channel(nullptr); } // Note: Cannot use rtc::Bind to create a functor to invoke because it will @@ -7705,29 +7432,9 @@ bool PeerConnection::OnTransportChanged( if (base_channel) { ret = base_channel->SetRtpTransport(rtp_transport); } - - if (data_channel_transport_ && mid == sctp_mid_ && - data_channel_transport_ != data_channel_transport) { - // Changed which data channel transport is used for |sctp_mid_| (eg. now - // it's bundled). - data_channel_transport_->SetDataSink(nullptr); - data_channel_transport_ = data_channel_transport; - if (data_channel_transport) { - data_channel_transport->SetDataSink(this); - - // There's a new data channel transport. This needs to be signaled to the - // |sctp_data_channels_| so that they can reopen and reconnect. This is - // necessary when bundling is applied. - data_channel_transport_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - for (auto channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - }); - } + if (mid == sctp_mid_) { + data_channel_controller_.OnTransportChanged(data_channel_transport); } - return ret; } diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 9bc6119461..7a1576611b 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -59,8 +59,6 @@ class RtcEventLog; // - The ICE state machine. // - Generating stats. class PeerConnection : public PeerConnectionInternal, - public DataChannelProviderInterface, - public DataChannelSink, public JsepTransportController::Observer, public RtpSenderBase::SetStreamsObserver, public rtc::MessageHandler, @@ -275,7 +273,7 @@ class PeerConnection : public PeerConnectionInternal, } cricket::RtpDataChannel* rtp_data_channel() const override { - return rtp_data_channel_; + return data_channel_controller_.rtp_data_channel(); } std::vector> sctp_data_channels() @@ -397,6 +395,120 @@ class PeerConnection : public PeerConnectionInternal, FieldTrialFlag receive_only; }; + // Controller for datachannels. Intended to be separated out; placed here as a + // first stage in refactoring. + class DataChannelController : public DataChannelProviderInterface, + public DataChannelSink { + public: + explicit DataChannelController(PeerConnection* pc) : pc_(pc) {} + ~DataChannelController() { data_channel_transport_invoker_.reset(); } + + // Implements DataChannelProviderInterface. + bool SendData(const cricket::SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload, + cricket::SendDataResult* result) override; + bool ConnectDataChannel(DataChannel* webrtc_data_channel) override; + void DisconnectDataChannel(DataChannel* webrtc_data_channel) override; + void AddSctpDataStream(int sid) override; + void RemoveSctpDataStream(int sid) override; + bool ReadyToSendData() const override; + + // Implements DataChannelSink. + void OnDataReceived(int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) override; + void OnChannelClosing(int channel_id) override; + void OnChannelClosed(int channel_id) override; + void OnReadyToSend() override; + + // Called from PeerConnection::SetupDataChannelTransport_n + void SetupDataChannelTransport_n(); + // Called from PeerConnection::TeardownDataChannelTransport_n + void TeardownDataChannelTransport_n(); + + // Called from PeerConnection::OnTransportChanged + // to make required changes to datachannels' transports. + void OnTransportChanged( + DataChannelTransportInterface* data_channel_transport); + + // Parses and handles open messages. Returns true if the message is an open + // message, false otherwise. + bool HandleOpenMessage_s(const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) + RTC_RUN_ON(signaling_thread()); + // Called when a valid data channel OPEN message is received. + void OnDataChannelOpenMessage(const std::string& label, + const InternalDataChannelInit& config) + RTC_RUN_ON(signaling_thread()); + + // Creates channel and adds it to the collection of DataChannels that will + // be offered in a SessionDescription. + rtc::scoped_refptr InternalCreateDataChannel( + const std::string& label, + const InternalDataChannelInit* + config) /* RTC_RUN_ON(signaling_thread()) */; + void AllocateSctpSids( + rtc::SSLRole role) /* RTC_RUN_ON(signaling_thread()) */; + void OnSctpDataChannelClosed(DataChannel* channel); + /* RTC_RUN_ON(signaling_thread() */ + + // Accessors + cricket::RtpDataChannel* rtp_data_channel() const { + return rtp_data_channel_; + } + void set_rtp_data_channel(cricket::RtpDataChannel* channel) { + rtp_data_channel_ = channel; + } + DataChannelTransportInterface* data_channel_transport() const { + return data_channel_transport_; + } + void set_data_channel_transport(DataChannelTransportInterface* transport) { + data_channel_transport_ = transport; + } + + private: + rtc::Thread* network_thread() const { return pc_->network_thread(); } + rtc::Thread* signaling_thread() const { return pc_->signaling_thread(); } + // Plugin transport used for data channels. Pointer may be accessed and + // checked from any thread, but the object may only be touched on the + // network thread. + // TODO(bugs.webrtc.org/9987): Accessed on both signaling and network + // thread. + DataChannelTransportInterface* data_channel_transport_ = nullptr; + + // Cached value of whether the data channel transport is ready to send. + bool data_channel_transport_ready_to_send_ + RTC_GUARDED_BY(signaling_thread()) = false; + + // |rtp_data_channel_| is used if in RTP data channel mode, + // |data_channel_transport_| when using SCTP. + cricket::RtpDataChannel* rtp_data_channel_ = nullptr; + // TODO(bugs.webrtc.org/9987): Accessed on both + // signaling and some other thread. + + SctpSidAllocator sid_allocator_ /* RTC_GUARDED_BY(signaling_thread()) */; + + // Signals from |data_channel_transport_|. These are invoked on the + // signaling thread. + sigslot::signal1 SignalDataChannelTransportWritable_s + RTC_GUARDED_BY(signaling_thread()); + sigslot::signal2 + SignalDataChannelTransportReceivedData_s + RTC_GUARDED_BY(signaling_thread()); + sigslot::signal1 SignalDataChannelTransportChannelClosing_s + RTC_GUARDED_BY(signaling_thread()); + sigslot::signal1 SignalDataChannelTransportChannelClosed_s + RTC_GUARDED_BY(signaling_thread()); + + // Used to invoke data channel transport signals on the signaling thread. + std::unique_ptr data_channel_transport_invoker_ + RTC_GUARDED_BY(network_thread()); + + // Owning PeerConnection. + PeerConnection* pc_; + }; + // Captures partial state to be used for rollback. Applicable only in // Unified Plan. class TransceiverStableState { @@ -843,31 +955,15 @@ class PeerConnection : public PeerConnectionInternal, uint32_t remote_ssrc) RTC_RUN_ON(signaling_thread()); - // Creates channel and adds it to the collection of DataChannels that will - // be offered in a SessionDescription. - rtc::scoped_refptr InternalCreateDataChannel( - const std::string& label, - const InternalDataChannelInit* config) RTC_RUN_ON(signaling_thread()); - // Checks if any data channel has been added. bool HasDataChannels() const RTC_RUN_ON(signaling_thread()); - - void AllocateSctpSids(rtc::SSLRole role) RTC_RUN_ON(signaling_thread()); - void OnSctpDataChannelClosed(DataChannel* channel) - RTC_RUN_ON(signaling_thread()); + // Handler for the "channel closed" signal + void OnSctpDataChannelClosed(DataChannel* channel); + // Sends the MSG_FREE_DATACHANNELS signal + void SignalFreeDataChannels(); // Called when the transport for the data channels is closed or destroyed. void OnTransportChannelClosed() RTC_RUN_ON(signaling_thread()); - // Called when a valid data channel OPEN message is received. - void OnDataChannelOpenMessage(const std::string& label, - const InternalDataChannelInit& config) - RTC_RUN_ON(signaling_thread()); - - // Parses and handles open messages. Returns true if the message is an open - // message, false otherwise. - bool HandleOpenMessage_s(const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& buffer) - RTC_RUN_ON(signaling_thread()); // Returns true if the PeerConnection is configured to use Unified Plan // semantics for creating offers/answers and setting local/remote @@ -975,26 +1071,8 @@ class PeerConnection : public PeerConnectionInternal, cricket::IceConfig ParseIceConfig( const PeerConnectionInterface::RTCConfiguration& config) const; - // Implements DataChannelProviderInterface. - bool SendData(const cricket::SendDataParams& params, - const rtc::CopyOnWriteBuffer& payload, - cricket::SendDataResult* result) override; - bool ConnectDataChannel(DataChannel* webrtc_data_channel) override; - void DisconnectDataChannel(DataChannel* webrtc_data_channel) override; - void AddSctpDataStream(int sid) override; - void RemoveSctpDataStream(int sid) override; - bool ReadyToSendData() const override; - cricket::DataChannelType data_channel_type() const; - // Implements DataChannelSink. - void OnDataReceived(int channel_id, - DataMessageType type, - const rtc::CopyOnWriteBuffer& buffer) override; - void OnChannelClosing(int channel_id) override; - void OnChannelClosed(int channel_id) override; - void OnReadyToSend() override; - // Called when an RTCCertificate is generated or retrieved by // WebRTCSessionDescriptionFactory. Should happen before setLocalDescription. void OnCertificateReady( @@ -1328,7 +1406,6 @@ class PeerConnection : public PeerConnectionInternal, std::vector local_video_sender_infos_ RTC_GUARDED_BY(signaling_thread()); - SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(signaling_thread()); // label -> DataChannel std::map> rtp_data_channels_ RTC_GUARDED_BY(signaling_thread()); @@ -1389,11 +1466,6 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr sctp_factory_; // TODO(bugs.webrtc.org/9987): Accessed on both // signaling and network thread. - // |rtp_data_channel_| is used if in RTP data channel mode, |sctp_transport_| - // when using SCTP. - cricket::RtpDataChannel* rtp_data_channel_ = - nullptr; // TODO(bugs.webrtc.org/9987): Accessed on both - // signaling and some other thread. // |sctp_mid_| is the content name (MID) in SDP. // Note: this is used as the data channel MID by both SCTP and data channel @@ -1406,32 +1478,7 @@ class PeerConnection : public PeerConnectionInternal, // Whether this peer is the caller. Set when the local description is applied. absl::optional is_caller_ RTC_GUARDED_BY(signaling_thread()); - // Plugin transport used for data channels. Pointer may be accessed and - // checked from any thread, but the object may only be touched on the - // network thread. - // TODO(bugs.webrtc.org/9987): Accessed on both signaling and network thread. - DataChannelTransportInterface* data_channel_transport_; - // Cached value of whether the data channel transport is ready to send. - bool data_channel_transport_ready_to_send_ - RTC_GUARDED_BY(signaling_thread()) = false; - - // Used to invoke data channel transport signals on the signaling thread. - std::unique_ptr data_channel_transport_invoker_ - RTC_GUARDED_BY(network_thread()); - - // Signals from |data_channel_transport_|. These are invoked on the signaling - // thread. - sigslot::signal1 SignalDataChannelTransportWritable_s - RTC_GUARDED_BY(signaling_thread()); - sigslot::signal2 - SignalDataChannelTransportReceivedData_s - RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalDataChannelTransportChannelClosing_s - RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalDataChannelTransportChannelClosed_s - RTC_GUARDED_BY(signaling_thread()); std::unique_ptr current_local_description_ RTC_GUARDED_BY(signaling_thread()); @@ -1487,6 +1534,7 @@ class PeerConnection : public PeerConnectionInternal, local_ice_credentials_to_replace_ RTC_GUARDED_BY(signaling_thread()); bool is_negotiation_needed_ RTC_GUARDED_BY(signaling_thread()) = false; + DataChannelController data_channel_controller_; rtc::WeakPtrFactory weak_ptr_factory_ RTC_GUARDED_BY(signaling_thread()); };