diff --git a/pc/BUILD.gn b/pc/BUILD.gn index e096f84039..531db6831e 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -163,10 +163,10 @@ rtc_library("peerconnection") { "audio_rtp_receiver.h", "audio_track.cc", "audio_track.h", - "data_channel.cc", - "data_channel.h", "data_channel_controller.cc", "data_channel_controller.h", + "data_channel_utils.cc", + "data_channel_utils.h", "dtmf_sender.cc", "dtmf_sender.h", "ice_server_parsing.cc", @@ -195,6 +195,8 @@ rtc_library("peerconnection") { "rtc_stats_collector.h", "rtc_stats_traversal.cc", "rtc_stats_traversal.h", + "rtp_data_channel.cc", + "rtp_data_channel.h", "rtp_parameters_conversion.cc", "rtp_parameters_conversion.h", "rtp_receiver.cc", @@ -203,6 +205,8 @@ rtc_library("peerconnection") { "rtp_sender.h", "rtp_transceiver.cc", "rtp_transceiver.h", + "sctp_data_channel.cc", + "sctp_data_channel.h", "sdp_serializer.cc", "sdp_serializer.h", "sdp_utils.cc", diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 710ca8e531..04a4bb6245 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -34,52 +34,65 @@ bool DataChannelController::SendData(const cricket::SendDataParams& params, } bool DataChannelController::ConnectDataChannel( - DataChannel* webrtc_data_channel) { + RtpDataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel() && !data_channel_transport()) { + if (!rtp_data_channel()) { // Don't log an error here, because DataChannels are expected to call // ConnectDataChannel in this state. It's the only way to initially tell // whether or not the underlying transport is ready. return false; } - if (data_channel_transport()) { - SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, - &DataChannel::OnChannelReady); - SignalDataChannelTransportReceivedData_s.connect( - webrtc_data_channel, &DataChannel::OnDataReceived); - SignalDataChannelTransportChannelClosing_s.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); - SignalDataChannelTransportChannelClosed_s.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); - } - if (rtp_data_channel()) { - rtp_data_channel()->SignalReadyToSendData.connect( - webrtc_data_channel, &DataChannel::OnChannelReady); - rtp_data_channel()->SignalDataReceived.connect( - webrtc_data_channel, &DataChannel::OnDataReceived); - } + rtp_data_channel()->SignalReadyToSendData.connect( + webrtc_data_channel, &RtpDataChannel::OnChannelReady); + rtp_data_channel()->SignalDataReceived.connect( + webrtc_data_channel, &RtpDataChannel::OnDataReceived); return true; } void DataChannelController::DisconnectDataChannel( - DataChannel* webrtc_data_channel) { + RtpDataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel() && !data_channel_transport()) { + if (!rtp_data_channel()) { RTC_LOG(LS_ERROR) - << "DisconnectDataChannel called when rtp_data_channel_ and " - "sctp_transport_ are NULL."; + << "DisconnectDataChannel called when rtp_data_channel_ is NULL."; return; } - if (data_channel_transport()) { - SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); + rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel); + rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel); +} + +bool DataChannelController::ConnectDataChannel( + SctpDataChannel* webrtc_data_channel) { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (!data_channel_transport()) { + // Don't log an error here, because DataChannels are expected to call + // ConnectDataChannel in this state. It's the only way to initially tell + // whether or not the underlying transport is ready. + return false; } - if (rtp_data_channel()) { - rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel); - rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel); + SignalDataChannelTransportWritable_s.connect( + webrtc_data_channel, &SctpDataChannel::OnTransportReady); + SignalDataChannelTransportReceivedData_s.connect( + webrtc_data_channel, &SctpDataChannel::OnDataReceived); + SignalDataChannelTransportChannelClosing_s.connect( + webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely); + SignalDataChannelTransportChannelClosed_s.connect( + webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete); + return true; +} + +void DataChannelController::DisconnectDataChannel( + SctpDataChannel* webrtc_data_channel) { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (!data_channel_transport()) { + RTC_LOG(LS_ERROR) + << "DisconnectDataChannel called when sctp_transport_ is NULL."; + return; } + SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); } void DataChannelController::AddSctpDataStream(int sid) { @@ -210,10 +223,10 @@ void DataChannelController::OnTransportChanged( } } -std::vector DataChannelController::GetDataChannelStats() +std::vector DataChannelController::GetDataChannelStats() const { RTC_DCHECK_RUN_ON(signaling_thread()); - std::vector stats; + std::vector stats; stats.reserve(sctp_data_channels_.size()); for (const auto& channel : sctp_data_channels_) stats.push_back(channel->GetStats()); @@ -244,21 +257,19 @@ bool DataChannelController::HandleOpenMessage_s( void DataChannelController::OnDataChannelOpenMessage( const std::string& label, const InternalDataChannelInit& config) { - rtc::scoped_refptr channel( - InternalCreateDataChannel(label, &config)); + rtc::scoped_refptr channel( + InternalCreateDataChannelWithProxy(label, &config)); if (!channel.get()) { RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."; return; } - rtc::scoped_refptr proxy_channel = - DataChannel::CreateProxy(std::move(channel)); - pc_->Observer()->OnDataChannel(std::move(proxy_channel)); + pc_->Observer()->OnDataChannel(std::move(channel)); pc_->NoteDataAddedEvent(); } -rtc::scoped_refptr -DataChannelController::InternalCreateDataChannel( +rtc::scoped_refptr +DataChannelController::InternalCreateDataChannelWithProxy( const std::string& label, const InternalDataChannelInit* config) { RTC_DCHECK_RUN_ON(signaling_thread()); @@ -270,52 +281,78 @@ DataChannelController::InternalCreateDataChannel( << "InternalCreateDataChannel: Data is not supported in this call."; return nullptr; } - InternalDataChannelInit new_config = - config ? (*config) : InternalDataChannelInit(); - if (DataChannel::IsSctpLike(data_channel_type_)) { - if (new_config.id < 0) { - rtc::SSLRole role; - if ((pc_->GetSctpSslRole(&role)) && - !sid_allocator_.AllocateSid(role, &new_config.id)) { - RTC_LOG(LS_ERROR) - << "No id can be allocated for the SCTP data channel."; - return nullptr; - } - } else if (!sid_allocator_.ReserveSid(new_config.id)) { - RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " - "because the id is already in use or out of range."; - return nullptr; + if (IsSctpLike(data_channel_type())) { + rtc::scoped_refptr channel = + InternalCreateSctpDataChannel(label, config); + if (channel) { + return SctpDataChannel::CreateProxy(channel); + } + } else if (data_channel_type() == cricket::DCT_RTP) { + rtc::scoped_refptr channel = + InternalCreateRtpDataChannel(label, config); + if (channel) { + return RtpDataChannel::CreateProxy(channel); } } - rtc::scoped_refptr channel( - DataChannel::Create(this, data_channel_type(), label, new_config, - signaling_thread(), network_thread())); + return nullptr; +} + +rtc::scoped_refptr +DataChannelController::InternalCreateRtpDataChannel( + const std::string& label, + const DataChannelInit* config) { + RTC_DCHECK_RUN_ON(signaling_thread()); + DataChannelInit new_config = config ? (*config) : DataChannelInit(); + rtc::scoped_refptr channel( + RtpDataChannel::Create(this, label, new_config, signaling_thread())); + if (!channel) { + return nullptr; + } + if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) { + RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() + << " already exists."; + return nullptr; + } + rtp_data_channels_[channel->label()] = channel; + SignalRtpDataChannelCreated_(channel.get()); + return channel; +} + +rtc::scoped_refptr +DataChannelController::InternalCreateSctpDataChannel( + const std::string& label, + const InternalDataChannelInit* config) { + RTC_DCHECK_RUN_ON(signaling_thread()); + InternalDataChannelInit new_config = + config ? (*config) : InternalDataChannelInit(); + if (new_config.id < 0) { + rtc::SSLRole role; + if ((pc_->GetSctpSslRole(&role)) && + !sid_allocator_.AllocateSid(role, &new_config.id)) { + RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel."; + return nullptr; + } + } else if (!sid_allocator_.ReserveSid(new_config.id)) { + RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel " + "because the id is already in use or out of range."; + return nullptr; + } + rtc::scoped_refptr channel(SctpDataChannel::Create( + this, label, new_config, signaling_thread(), network_thread())); if (!channel) { sid_allocator_.ReleaseSid(new_config.id); return nullptr; } - - if (channel->data_channel_type() == cricket::DCT_RTP) { - if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) { - RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label() - << " already exists."; - return nullptr; - } - rtp_data_channels_[channel->label()] = channel; - } else { - RTC_DCHECK(DataChannel::IsSctpLike(data_channel_type_)); - sctp_data_channels_.push_back(channel); - channel->SignalClosed.connect(pc_, - &PeerConnection::OnSctpDataChannelClosed); - } - SignalDataChannelCreated_(channel.get()); + sctp_data_channels_.push_back(channel); + channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed); + SignalSctpDataChannelCreated_(channel.get()); return channel; } void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { RTC_DCHECK_RUN_ON(signaling_thread()); - std::vector> channels_to_close; + std::vector> channels_to_close; for (const auto& channel : sctp_data_channels_) { if (channel->id() < 0) { int sid; @@ -334,7 +371,7 @@ void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { } } -void DataChannelController::OnSctpDataChannelClosed(DataChannel* channel) { +void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { RTC_DCHECK_RUN_ON(signaling_thread()); for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end(); ++it) { @@ -364,20 +401,20 @@ void DataChannelController::OnTransportChannelClosed() { RTC_DCHECK_RUN_ON(signaling_thread()); // Use a temporary copy of the RTP/SCTP DataChannel list because the // DataChannel may callback to us and try to modify the list. - std::map> temp_rtp_dcs; + std::map> temp_rtp_dcs; temp_rtp_dcs.swap(rtp_data_channels_); for (const auto& kv : temp_rtp_dcs) { kv.second->OnTransportChannelClosed(); } - std::vector> temp_sctp_dcs; + std::vector> temp_sctp_dcs; temp_sctp_dcs.swap(sctp_data_channels_); for (const auto& channel : temp_sctp_dcs) { channel->OnTransportChannelClosed(); } } -DataChannel* DataChannelController::FindDataChannelBySid(int sid) const { +SctpDataChannel* DataChannelController::FindDataChannelBySid(int sid) const { RTC_DCHECK_RUN_ON(signaling_thread()); for (const auto& channel : sctp_data_channels_) { if (channel->id() == sid) { @@ -465,7 +502,7 @@ void DataChannelController::set_data_channel_transport( data_channel_transport_ = transport; } -const std::map>* +const std::map>* DataChannelController::rtp_data_channels() const { RTC_DCHECK_RUN_ON(signaling_thread()); return &rtp_data_channels_; @@ -476,7 +513,7 @@ void DataChannelController::UpdateClosingRtpDataChannels( bool is_local_update) { auto it = rtp_data_channels_.begin(); while (it != rtp_data_channels_.end()) { - DataChannel* data_channel = it->second; + RtpDataChannel* data_channel = it->second; if (absl::c_linear_search(active_channels, data_channel->label())) { ++it; continue; @@ -488,7 +525,7 @@ void DataChannelController::UpdateClosingRtpDataChannels( data_channel->RemotePeerRequestClose(); } - if (data_channel->state() == DataChannel::kClosed) { + if (data_channel->state() == RtpDataChannel::kClosed) { rtp_data_channels_.erase(it); it = rtp_data_channels_.begin(); } else { @@ -499,8 +536,11 @@ void DataChannelController::UpdateClosingRtpDataChannels( void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label, uint32_t remote_ssrc) { - rtc::scoped_refptr channel( - InternalCreateDataChannel(label, nullptr)); + if (data_channel_type() != cricket::DCT_RTP) { + return; + } + rtc::scoped_refptr channel( + InternalCreateRtpDataChannel(label, nullptr)); if (!channel.get()) { RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but" "CreateDataChannel failed."; @@ -508,7 +548,7 @@ void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label, } channel->SetReceiveSsrc(remote_ssrc); rtc::scoped_refptr proxy_channel = - DataChannel::CreateProxy(std::move(channel)); + RtpDataChannel::CreateProxy(std::move(channel)); pc_->Observer()->OnDataChannel(std::move(proxy_channel)); } diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index c3e64aba95..3daee11381 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -17,14 +17,16 @@ #include #include "pc/channel.h" -#include "pc/data_channel.h" +#include "pc/rtp_data_channel.h" +#include "pc/sctp_data_channel.h" #include "rtc_base/weak_ptr.h" namespace webrtc { class PeerConnection; -class DataChannelController : public DataChannelProviderInterface, +class DataChannelController : public RtpDataChannelProviderInterface, + public SctpDataChannelProviderInterface, public DataChannelSink { public: explicit DataChannelController(PeerConnection* pc) : pc_(pc) {} @@ -35,12 +37,15 @@ class DataChannelController : public DataChannelProviderInterface, DataChannelController(DataChannelController&&) = delete; DataChannelController& operator=(DataChannelController&& other) = delete; - // Implements DataChannelProviderInterface. + // Implements RtpDataChannelProviderInterface/ + // SctpDataChannelProviderInterface. bool SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) override; - bool ConnectDataChannel(DataChannel* webrtc_data_channel) override; - void DisconnectDataChannel(DataChannel* webrtc_data_channel) override; + bool ConnectDataChannel(RtpDataChannel* webrtc_data_channel) override; + void DisconnectDataChannel(RtpDataChannel* webrtc_data_channel) override; + bool ConnectDataChannel(SctpDataChannel* webrtc_data_channel) override; + void DisconnectDataChannel(SctpDataChannel* webrtc_data_channel) override; void AddSctpDataStream(int sid) override; void RemoveSctpDataStream(int sid) override; bool ReadyToSendData() const override; @@ -65,17 +70,17 @@ class DataChannelController : public DataChannelProviderInterface, DataChannelTransportInterface* data_channel_transport); // Called from PeerConnection::GetDataChannelStats on the signaling thread. - std::vector GetDataChannelStats() const; + std::vector GetDataChannelStats() const; // Creates channel and adds it to the collection of DataChannels that will - // be offered in a SessionDescription. - rtc::scoped_refptr InternalCreateDataChannel( + // be offered in a SessionDescription, and wraps it in a proxy object. + rtc::scoped_refptr InternalCreateDataChannelWithProxy( const std::string& label, const InternalDataChannelInit* config) /* RTC_RUN_ON(signaling_thread()) */; void AllocateSctpSids(rtc::SSLRole role); - DataChannel* FindDataChannelBySid(int sid) const; + SctpDataChannel* FindDataChannelBySid(int sid) const; // Checks if any data channel has been added. bool HasDataChannels() const; @@ -102,19 +107,32 @@ class DataChannelController : public DataChannelProviderInterface, } DataChannelTransportInterface* data_channel_transport() const; void set_data_channel_transport(DataChannelTransportInterface* transport); - const std::map>* + const std::map>* rtp_data_channels() const; - sigslot::signal1& SignalDataChannelCreated() { + sigslot::signal1& SignalRtpDataChannelCreated() { RTC_DCHECK_RUN_ON(signaling_thread()); - return SignalDataChannelCreated_; + return SignalRtpDataChannelCreated_; + } + sigslot::signal1& SignalSctpDataChannelCreated() { + RTC_DCHECK_RUN_ON(signaling_thread()); + return SignalSctpDataChannelCreated_; } // Called when the transport for the data channels is closed or destroyed. void OnTransportChannelClosed(); - void OnSctpDataChannelClosed(DataChannel* channel); + void OnSctpDataChannelClosed(SctpDataChannel* channel); private: + rtc::scoped_refptr InternalCreateRtpDataChannel( + const std::string& label, + const DataChannelInit* config) /* RTC_RUN_ON(signaling_thread()) */; + + rtc::scoped_refptr InternalCreateSctpDataChannel( + const std::string& label, + const InternalDataChannelInit* + config) /* RTC_RUN_ON(signaling_thread()) */; + // Parses and handles open messages. Returns true if the message is an open // message, false otherwise. bool HandleOpenMessage_s(const cricket::ReceiveDataParams& params, @@ -174,13 +192,13 @@ class DataChannelController : public DataChannelProviderInterface, // signaling and some other thread. SctpSidAllocator sid_allocator_ /* RTC_GUARDED_BY(signaling_thread()) */; - std::vector> sctp_data_channels_ + std::vector> sctp_data_channels_ RTC_GUARDED_BY(signaling_thread()); - std::vector> sctp_data_channels_to_free_ + std::vector> sctp_data_channels_to_free_ RTC_GUARDED_BY(signaling_thread()); // Map of label -> DataChannel - std::map> rtp_data_channels_ + std::map> rtp_data_channels_ RTC_GUARDED_BY(signaling_thread()); // Signals from |data_channel_transport_|. These are invoked on the @@ -198,7 +216,9 @@ class DataChannelController : public DataChannelProviderInterface, sigslot::signal1 SignalDataChannelTransportChannelClosed_s RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalDataChannelCreated_ + sigslot::signal1 SignalRtpDataChannelCreated_ + RTC_GUARDED_BY(signaling_thread()); + sigslot::signal1 SignalSctpDataChannelCreated_ RTC_GUARDED_BY(signaling_thread()); // Used to invoke data channel transport signals on the signaling thread. diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index 11dfcc4aee..7048dc82b7 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -8,20 +8,20 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "pc/data_channel.h" - #include #include #include +#include "pc/sctp_data_channel.h" #include "pc/sctp_utils.h" #include "pc/test/fake_data_channel_provider.h" #include "rtc_base/gunit.h" #include "rtc_base/numerics/safe_conversions.h" #include "test/gtest.h" -using webrtc::DataChannel; +using webrtc::DataChannelInterface; +using webrtc::SctpDataChannel; using webrtc::SctpSidAllocator; static constexpr int kDefaultTimeout = 10000; @@ -69,12 +69,11 @@ class SctpDataChannelTest : public ::testing::Test { protected: SctpDataChannelTest() : provider_(new FakeDataChannelProvider()), - webrtc_data_channel_(DataChannel::Create(provider_.get(), - cricket::DCT_SCTP, - "test", - init_, - rtc::Thread::Current(), - rtc::Thread::Current())) {} + webrtc_data_channel_(SctpDataChannel::Create(provider_.get(), + "test", + init_, + rtc::Thread::Current(), + rtc::Thread::Current())) {} void SetChannelReady() { provider_->set_transport_available(true); @@ -93,7 +92,7 @@ class SctpDataChannelTest : public ::testing::Test { webrtc::InternalDataChannelInit init_; std::unique_ptr provider_; std::unique_ptr observer_; - rtc::scoped_refptr webrtc_data_channel_; + rtc::scoped_refptr webrtc_data_channel_; }; class StateSignalsListener : public sigslot::has_slots<> { @@ -101,9 +100,9 @@ class StateSignalsListener : public sigslot::has_slots<> { int opened_count() const { return opened_count_; } int closed_count() const { return closed_count_; } - void OnSignalOpened(DataChannel* data_channel) { ++opened_count_; } + void OnSignalOpened(DataChannelInterface* data_channel) { ++opened_count_; } - void OnSignalClosed(DataChannel* data_channel) { ++closed_count_; } + void OnSignalClosed(DataChannelInterface* data_channel) { ++closed_count_; } private: int opened_count_ = 0; @@ -113,9 +112,9 @@ class StateSignalsListener : public sigslot::has_slots<> { // Verifies that the data channel is connected to the transport after creation. TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) { provider_->set_transport_available(true); - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", init_, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_TRUE(provider_->IsConnected(dc.get())); // The sid is not set yet, so it should not have added the streams. @@ -308,9 +307,9 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { SetChannelReady(); webrtc::InternalDataChannelInit init; init.id = 1; - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state()); EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), 1000); } @@ -322,9 +321,9 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) { webrtc::InternalDataChannelInit init; init.id = 1; init.ordered = false; - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); @@ -353,9 +352,9 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { webrtc::InternalDataChannelInit init; init.id = 1; init.ordered = false; - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); @@ -455,9 +454,9 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) { config.open_handshake_role = webrtc::InternalDataChannelInit::kNone; SetChannelReady(); - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", config, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); EXPECT_EQ(0U, provider_->last_send_data_params().ssrc); @@ -519,9 +518,9 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) { config.open_handshake_role = webrtc::InternalDataChannelInit::kAcker; SetChannelReady(); - rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config, - rtc::Thread::Current(), rtc::Thread::Current()); + rtc::scoped_refptr dc = + SctpDataChannel::Create(provider_.get(), "test1", config, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); diff --git a/pc/data_channel_utils.cc b/pc/data_channel_utils.cc new file mode 100644 index 0000000000..21b1573cd7 --- /dev/null +++ b/pc/data_channel_utils.cc @@ -0,0 +1,56 @@ +/* + * Copyright 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pc/data_channel_utils.h" + +namespace webrtc { + +bool PacketQueue::Empty() const { + return packets_.empty(); +} + +std::unique_ptr PacketQueue::PopFront() { + RTC_DCHECK(!packets_.empty()); + byte_count_ -= packets_.front()->size(); + std::unique_ptr packet = std::move(packets_.front()); + packets_.pop_front(); + return packet; +} + +void PacketQueue::PushFront(std::unique_ptr packet) { + byte_count_ += packet->size(); + packets_.push_front(std::move(packet)); +} + +void PacketQueue::PushBack(std::unique_ptr packet) { + byte_count_ += packet->size(); + packets_.push_back(std::move(packet)); +} + +void PacketQueue::Clear() { + packets_.clear(); + byte_count_ = 0; +} + +void PacketQueue::Swap(PacketQueue* other) { + size_t other_byte_count = other->byte_count_; + other->byte_count_ = byte_count_; + byte_count_ = other_byte_count; + + other->packets_.swap(packets_); +} + +bool IsSctpLike(cricket::DataChannelType type) { + return type == cricket::DCT_SCTP || type == cricket::DCT_MEDIA_TRANSPORT || + type == cricket::DCT_DATA_CHANNEL_TRANSPORT || + type == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; +} + +} // namespace webrtc diff --git a/pc/data_channel_utils.h b/pc/data_channel_utils.h new file mode 100644 index 0000000000..13c6620cd8 --- /dev/null +++ b/pc/data_channel_utils.h @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef PC_DATA_CHANNEL_UTILS_H_ +#define PC_DATA_CHANNEL_UTILS_H_ + +#include +#include +#include +#include + +#include "api/data_channel_interface.h" +#include "media/base/media_engine.h" + +namespace webrtc { + +// A packet queue which tracks the total queued bytes. Queued packets are +// owned by this class. +class PacketQueue final { + public: + size_t byte_count() const { return byte_count_; } + + bool Empty() const; + + std::unique_ptr PopFront(); + + void PushFront(std::unique_ptr packet); + void PushBack(std::unique_ptr packet); + + void Clear(); + + void Swap(PacketQueue* other); + + private: + std::deque> packets_; + size_t byte_count_ = 0; +}; + +struct DataChannelStats { + int internal_id; + int id; + std::string label; + std::string protocol; + DataChannelInterface::DataState state; + uint32_t messages_sent; + uint32_t messages_received; + uint64_t bytes_sent; + uint64_t bytes_received; +}; + +bool IsSctpLike(cricket::DataChannelType type); + +} // namespace webrtc + +#endif // PC_DATA_CHANNEL_UTILS_H_ diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 071867db9f..60d6b16895 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -215,7 +215,7 @@ void AddPlanBRtpSenderOptions( // Add options to |session_options| from |rtp_data_channels|. void AddRtpDataChannelOptions( - const std::map>& + const std::map>& rtp_data_channels, cricket::MediaDescriptionOptions* data_media_description_options) { if (!data_media_description_options) { @@ -223,9 +223,9 @@ void AddRtpDataChannelOptions( } // Check for data channels. for (const auto& kv : rtp_data_channels) { - const DataChannel* channel = kv.second; - if (channel->state() == DataChannel::kConnecting || - channel->state() == DataChannel::kOpen) { + const RtpDataChannel* channel = kv.second; + if (channel->state() == RtpDataChannel::kConnecting || + channel->state() == RtpDataChannel::kOpen) { // Legacy RTP data channels are signaled with the track/stream ID set to // the data channel's label. data_media_description_options->AddRtpDataChannel(channel->label(), @@ -2130,8 +2130,8 @@ rtc::scoped_refptr PeerConnection::CreateDataChannel( if (config) { internal_config.reset(new InternalDataChannelInit(*config)); } - rtc::scoped_refptr channel( - data_channel_controller_.InternalCreateDataChannel( + rtc::scoped_refptr channel( + data_channel_controller_.InternalCreateDataChannelWithProxy( label, internal_config.get())); if (!channel.get()) { return nullptr; @@ -2143,7 +2143,7 @@ rtc::scoped_refptr PeerConnection::CreateDataChannel( UpdateNegotiationNeeded(); } NoteUsageEvent(UsageEvent::DATA_ADDED); - return DataChannel::CreateProxy(std::move(channel)); + return channel; } void PeerConnection::RestartIce() { @@ -2731,7 +2731,7 @@ RTCError PeerConnection::ApplyLocalDescription( // If setting the description decided our SSL role, allocate any necessary // SCTP sids. rtc::SSLRole role; - if (DataChannel::IsSctpLike(data_channel_type()) && GetSctpSslRole(&role)) { + if (IsSctpLike(data_channel_type()) && GetSctpSslRole(&role)) { data_channel_controller_.AllocateSctpSids(role); } @@ -3170,7 +3170,7 @@ RTCError PeerConnection::ApplyRemoteDescription( // If setting the description decided our SSL role, allocate any necessary // SCTP sids. rtc::SSLRole role; - if (DataChannel::IsSctpLike(data_channel_type()) && GetSctpSslRole(&role)) { + if (IsSctpLike(data_channel_type()) && GetSctpSslRole(&role)) { data_channel_controller_.AllocateSctpSids(role); } @@ -5541,10 +5541,11 @@ void PeerConnection::OnLocalSenderRemoved(const RtpSenderInfo& sender_info, sender->internal()->SetSsrc(0); } -void PeerConnection::OnSctpDataChannelClosed(DataChannel* channel) { +void PeerConnection::OnSctpDataChannelClosed(DataChannelInterface* channel) { // Since data_channel_controller doesn't do signals, this // signal is relayed here. - data_channel_controller_.OnSctpDataChannelClosed(channel); + data_channel_controller_.OnSctpDataChannelClosed( + static_cast(channel)); } rtc::scoped_refptr> @@ -5656,7 +5657,7 @@ const PeerConnection::RtpSenderInfo* PeerConnection::FindSenderInfo( return nullptr; } -DataChannel* PeerConnection::FindDataChannelBySid(int sid) const { +SctpDataChannel* PeerConnection::FindDataChannelBySid(int sid) const { return data_channel_controller_.FindDataChannelBySid(sid); } @@ -6045,7 +6046,7 @@ cricket::IceConfig PeerConnection::ParseIceConfig( return ice_config; } -std::vector PeerConnection::GetDataChannelStats() const { +std::vector PeerConnection::GetDataChannelStats() const { RTC_DCHECK_RUN_ON(signaling_thread()); return data_channel_controller_.GetDataChannelStats(); } diff --git a/pc/peer_connection.h b/pc/peer_connection.h index a91dc9c042..4351831237 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -272,15 +272,19 @@ class PeerConnection : public PeerConnectionInternal, return transceivers_; } - sigslot::signal1& SignalDataChannelCreated() override { - return data_channel_controller_.SignalDataChannelCreated(); + sigslot::signal1& SignalRtpDataChannelCreated() override { + return data_channel_controller_.SignalRtpDataChannelCreated(); + } + + sigslot::signal1& SignalSctpDataChannelCreated() override { + return data_channel_controller_.SignalSctpDataChannelCreated(); } cricket::RtpDataChannel* rtp_data_channel() const override { return data_channel_controller_.rtp_data_channel(); } - std::vector GetDataChannelStats() const override; + std::vector GetDataChannelStats() const override; absl::optional sctp_transport_name() const override; @@ -310,7 +314,7 @@ class PeerConnection : public PeerConnectionInternal, // Get current SSL role used by SCTP's underlying transport. bool GetSctpSslRole(rtc::SSLRole* role); // Handler for the "channel closed" signal - void OnSctpDataChannelClosed(DataChannel* channel); + void OnSctpDataChannelClosed(DataChannelInterface* channel); // Functions made public for testing. void ReturnHistogramVeryQuicklyForTesting() { @@ -835,7 +839,7 @@ class PeerConnection : public PeerConnectionInternal, // Returns the specified SCTP DataChannel in sctp_data_channels_, // or nullptr if not found. - DataChannel* FindDataChannelBySid(int sid) const + SctpDataChannel* FindDataChannelBySid(int sid) const RTC_RUN_ON(signaling_thread()); // Called when first configuring the port allocator. diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h index 66d585b592..1a78ed204b 100644 --- a/pc/peer_connection_internal.h +++ b/pc/peer_connection_internal.h @@ -19,8 +19,9 @@ #include "api/peer_connection_interface.h" #include "call/call.h" -#include "pc/data_channel.h" +#include "pc/rtp_data_channel.h" #include "pc/rtp_transceiver.h" +#include "pc/sctp_data_channel.h" namespace webrtc { @@ -41,14 +42,16 @@ class PeerConnectionInternal : public PeerConnectionInterface { rtc::scoped_refptr>> GetTransceiversInternal() const = 0; - virtual sigslot::signal1& SignalDataChannelCreated() = 0; + virtual sigslot::signal1& SignalRtpDataChannelCreated() = 0; + virtual sigslot::signal1& + SignalSctpDataChannelCreated() = 0; // Only valid when using deprecated RTP data channels. virtual cricket::RtpDataChannel* rtp_data_channel() const = 0; // Call on the network thread to fetch stats for all the data channels. // TODO(tommi): Make pure virtual after downstream updates. - virtual std::vector GetDataChannelStats() const { + virtual std::vector GetDataChannelStats() const { return {}; } diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index 9e9213eb72..be0bbd6f73 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -1013,8 +1013,10 @@ RTCStatsCollector::RTCStatsCollector(PeerConnectionInternal* pc, RTC_DCHECK(worker_thread_); RTC_DCHECK(network_thread_); RTC_DCHECK_GE(cache_lifetime_us_, 0); - pc_->SignalDataChannelCreated().connect( - this, &RTCStatsCollector::OnDataChannelCreated); + pc_->SignalRtpDataChannelCreated().connect( + this, &RTCStatsCollector::OnRtpDataChannelCreated); + pc_->SignalSctpDataChannelCreated().connect( + this, &RTCStatsCollector::OnSctpDataChannelCreated); } RTCStatsCollector::~RTCStatsCollector() { @@ -1324,8 +1326,7 @@ void RTCStatsCollector::ProduceDataChannelStats_s( RTCStatsReport* report) const { RTC_DCHECK_RUN_ON(signaling_thread_); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; - - std::vector data_stats = pc_->GetDataChannelStats(); + std::vector data_stats = pc_->GetDataChannelStats(); for (const auto& stats : data_stats) { std::unique_ptr data_channel_stats( new RTCDataChannelStats( @@ -2017,12 +2018,17 @@ std::set RTCStatsCollector::PrepareTransportNames_s() const { return transport_names; } -void RTCStatsCollector::OnDataChannelCreated(DataChannel* channel) { +void RTCStatsCollector::OnRtpDataChannelCreated(RtpDataChannel* channel) { channel->SignalOpened.connect(this, &RTCStatsCollector::OnDataChannelOpened); channel->SignalClosed.connect(this, &RTCStatsCollector::OnDataChannelClosed); } -void RTCStatsCollector::OnDataChannelOpened(DataChannel* channel) { +void RTCStatsCollector::OnSctpDataChannelCreated(SctpDataChannel* channel) { + channel->SignalOpened.connect(this, &RTCStatsCollector::OnDataChannelOpened); + channel->SignalClosed.connect(this, &RTCStatsCollector::OnDataChannelClosed); +} + +void RTCStatsCollector::OnDataChannelOpened(DataChannelInterface* channel) { RTC_DCHECK(signaling_thread_->IsCurrent()); bool result = internal_record_.opened_data_channels .insert(reinterpret_cast(channel)) @@ -2031,7 +2037,7 @@ void RTCStatsCollector::OnDataChannelOpened(DataChannel* channel) { RTC_DCHECK(result); } -void RTCStatsCollector::OnDataChannelClosed(DataChannel* channel) { +void RTCStatsCollector::OnDataChannelClosed(DataChannelInterface* channel) { RTC_DCHECK(signaling_thread_->IsCurrent()); // Only channels that have been fully opened (and have increased the // |data_channels_opened_| counter) increase the closed counter. diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h index 24b9ef21cf..e1bc27d9e5 100644 --- a/pc/rtc_stats_collector.h +++ b/pc/rtc_stats_collector.h @@ -24,7 +24,7 @@ #include "api/stats/rtcstats_objects.h" #include "call/call.h" #include "media/base/media_channel.h" -#include "pc/data_channel.h" +#include "pc/data_channel_utils.h" #include "pc/peer_connection_internal.h" #include "pc/track_media_info_map.h" #include "rtc_base/event.h" @@ -226,10 +226,11 @@ class RTCStatsCollector : public virtual rtc::RefCountInterface, void MergeNetworkReport_s(); // Slots for signals (sigslot) that are wired up to |pc_|. - void OnDataChannelCreated(DataChannel* channel); + void OnRtpDataChannelCreated(RtpDataChannel* channel); + void OnSctpDataChannelCreated(SctpDataChannel* channel); // Slots for signals (sigslot) that are wired up to |channel|. - void OnDataChannelOpened(DataChannel* channel); - void OnDataChannelClosed(DataChannel* channel); + void OnDataChannelOpened(DataChannelInterface* channel); + void OnDataChannelClosed(DataChannelInterface* channel); PeerConnectionInternal* const pc_; rtc::Thread* const signaling_thread_; diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index 60b1a78b22..af9e5456d4 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -32,6 +32,7 @@ #include "p2p/base/port.h" #include "pc/media_stream.h" #include "pc/media_stream_track.h" +#include "pc/test/fake_data_channel_provider.h" #include "pc/test/fake_peer_connection_for_stats.h" #include "pc/test/mock_data_channel.h" #include "pc/test/mock_rtp_receiver_internal.h" @@ -976,9 +977,9 @@ TEST_F(RTCStatsCollectorTest, CollectRTCCertificateStatsChain) { TEST_F(RTCStatsCollectorTest, CollectTwoRTCDataChannelStatsWithPendingId) { pc_->AddSctpDataChannel( - new MockDataChannel(/*id=*/-1, DataChannelInterface::kConnecting)); + new MockSctpDataChannel(/*id=*/-1, DataChannelInterface::kConnecting)); pc_->AddSctpDataChannel( - new MockDataChannel(/*id=*/-1, DataChannelInterface::kConnecting)); + new MockSctpDataChannel(/*id=*/-1, DataChannelInterface::kConnecting)); rtc::scoped_refptr report = stats_->GetStatsReport(); } @@ -987,12 +988,12 @@ TEST_F(RTCStatsCollectorTest, CollectRTCDataChannelStats) { // Note: The test assumes data channel IDs are predictable. // This is not a safe assumption, but in order to make it work for // the test, we reset the ID allocator at test start. - DataChannel::ResetInternalIdAllocatorForTesting(-1); - pc_->AddSctpDataChannel(new MockDataChannel(0, "MockDataChannel0", - DataChannelInterface::kConnecting, - "udp", 1, 2, 3, 4)); + SctpDataChannel::ResetInternalIdAllocatorForTesting(-1); + pc_->AddSctpDataChannel(new MockSctpDataChannel( + 0, "MockSctpDataChannel0", DataChannelInterface::kConnecting, "udp", 1, 2, + 3, 4)); RTCDataChannelStats expected_data_channel0("RTCDataChannel_0", 0); - expected_data_channel0.label = "MockDataChannel0"; + expected_data_channel0.label = "MockSctpDataChannel0"; expected_data_channel0.protocol = "udp"; expected_data_channel0.data_channel_identifier = 0; expected_data_channel0.state = "connecting"; @@ -1001,10 +1002,11 @@ TEST_F(RTCStatsCollectorTest, CollectRTCDataChannelStats) { expected_data_channel0.messages_received = 3; expected_data_channel0.bytes_received = 4; - pc_->AddSctpDataChannel(new MockDataChannel( - 1, "MockDataChannel1", DataChannelInterface::kOpen, "tcp", 5, 6, 7, 8)); + pc_->AddSctpDataChannel(new MockSctpDataChannel(1, "MockSctpDataChannel1", + DataChannelInterface::kOpen, + "tcp", 5, 6, 7, 8)); RTCDataChannelStats expected_data_channel1("RTCDataChannel_1", 0); - expected_data_channel1.label = "MockDataChannel1"; + expected_data_channel1.label = "MockSctpDataChannel1"; expected_data_channel1.protocol = "tcp"; expected_data_channel1.data_channel_identifier = 1; expected_data_channel1.state = "open"; @@ -1013,11 +1015,11 @@ TEST_F(RTCStatsCollectorTest, CollectRTCDataChannelStats) { expected_data_channel1.messages_received = 7; expected_data_channel1.bytes_received = 8; - pc_->AddSctpDataChannel(new MockDataChannel(2, "MockDataChannel2", - DataChannelInterface::kClosing, - "udp", 9, 10, 11, 12)); + pc_->AddSctpDataChannel(new MockSctpDataChannel( + 2, "MockSctpDataChannel2", DataChannelInterface::kClosing, "udp", 9, 10, + 11, 12)); RTCDataChannelStats expected_data_channel2("RTCDataChannel_2", 0); - expected_data_channel2.label = "MockDataChannel2"; + expected_data_channel2.label = "MockSctpDataChannel2"; expected_data_channel2.protocol = "udp"; expected_data_channel2.data_channel_identifier = 2; expected_data_channel2.state = "closing"; @@ -1026,11 +1028,11 @@ TEST_F(RTCStatsCollectorTest, CollectRTCDataChannelStats) { expected_data_channel2.messages_received = 11; expected_data_channel2.bytes_received = 12; - pc_->AddSctpDataChannel(new MockDataChannel(3, "MockDataChannel3", - DataChannelInterface::kClosed, - "tcp", 13, 14, 15, 16)); + pc_->AddSctpDataChannel(new MockSctpDataChannel(3, "MockSctpDataChannel3", + DataChannelInterface::kClosed, + "tcp", 13, 14, 15, 16)); RTCDataChannelStats expected_data_channel3("RTCDataChannel_3", 0); - expected_data_channel3.label = "MockDataChannel3"; + expected_data_channel3.label = "MockSctpDataChannel3"; expected_data_channel3.protocol = "tcp"; expected_data_channel3.data_channel_identifier = 3; expected_data_channel3.state = "closed"; @@ -1400,14 +1402,15 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { } // TODO(bugs.webrtc.org/11547): Supply a separate network thread. - rtc::scoped_refptr dummy_channel_a = DataChannel::Create( - nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit(), + FakeDataChannelProvider provider; + rtc::scoped_refptr dummy_channel_a = SctpDataChannel::Create( + &provider, "DummyChannelA", InternalDataChannelInit(), rtc::Thread::Current(), rtc::Thread::Current()); - pc_->SignalDataChannelCreated()(dummy_channel_a.get()); - rtc::scoped_refptr dummy_channel_b = DataChannel::Create( - nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit(), + pc_->SignalSctpDataChannelCreated()(dummy_channel_a.get()); + rtc::scoped_refptr dummy_channel_b = SctpDataChannel::Create( + &provider, "DummyChannelB", InternalDataChannelInit(), rtc::Thread::Current(), rtc::Thread::Current()); - pc_->SignalDataChannelCreated()(dummy_channel_b.get()); + pc_->SignalSctpDataChannelCreated()(dummy_channel_b.get()); dummy_channel_a->SignalOpened(dummy_channel_a.get()); // Closing a channel that is not opened should not affect the counts. diff --git a/pc/rtp_data_channel.cc b/pc/rtp_data_channel.cc new file mode 100644 index 0000000000..b08b2b2ffb --- /dev/null +++ b/pc/rtp_data_channel.cc @@ -0,0 +1,394 @@ +/* + * Copyright 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pc/rtp_data_channel.h" + +#include +#include +#include + +#include "api/proxy.h" +#include "rtc_base/checks.h" +#include "rtc_base/location.h" +#include "rtc_base/logging.h" +#include "rtc_base/ref_counted_object.h" +#include "rtc_base/thread.h" + +namespace webrtc { + +namespace { + +static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024; + +static std::atomic g_unique_id{0}; + +int GenerateUniqueId() { + return ++g_unique_id; +} + +// Define proxy for DataChannelInterface. +BEGIN_SIGNALING_PROXY_MAP(DataChannel) +PROXY_SIGNALING_THREAD_DESTRUCTOR() +PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) +PROXY_METHOD0(void, UnregisterObserver) +BYPASS_PROXY_CONSTMETHOD0(std::string, label) +BYPASS_PROXY_CONSTMETHOD0(bool, reliable) +BYPASS_PROXY_CONSTMETHOD0(bool, ordered) +BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime) +BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits) +BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxRetransmitsOpt) +BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxPacketLifeTime) +BYPASS_PROXY_CONSTMETHOD0(std::string, protocol) +BYPASS_PROXY_CONSTMETHOD0(bool, negotiated) +// Can't bypass the proxy since the id may change. +PROXY_CONSTMETHOD0(int, id) +BYPASS_PROXY_CONSTMETHOD0(Priority, priority) +PROXY_CONSTMETHOD0(DataState, state) +PROXY_CONSTMETHOD0(RTCError, error) +PROXY_CONSTMETHOD0(uint32_t, messages_sent) +PROXY_CONSTMETHOD0(uint64_t, bytes_sent) +PROXY_CONSTMETHOD0(uint32_t, messages_received) +PROXY_CONSTMETHOD0(uint64_t, bytes_received) +PROXY_CONSTMETHOD0(uint64_t, buffered_amount) +PROXY_METHOD0(void, Close) +// TODO(bugs.webrtc.org/11547): Change to run on the network thread. +PROXY_METHOD1(bool, Send, const DataBuffer&) +END_PROXY_MAP() + +} // namespace + +rtc::scoped_refptr RtpDataChannel::Create( + RtpDataChannelProviderInterface* provider, + const std::string& label, + const DataChannelInit& config, + rtc::Thread* signaling_thread) { + rtc::scoped_refptr channel( + new rtc::RefCountedObject(config, provider, label, + signaling_thread)); + if (!channel->Init()) { + return nullptr; + } + return channel; +} + +// static +rtc::scoped_refptr RtpDataChannel::CreateProxy( + rtc::scoped_refptr channel) { + return DataChannelProxy::Create(channel->signaling_thread_, channel.get()); +} + +RtpDataChannel::RtpDataChannel(const DataChannelInit& config, + RtpDataChannelProviderInterface* provider, + const std::string& label, + rtc::Thread* signaling_thread) + : signaling_thread_(signaling_thread), + internal_id_(GenerateUniqueId()), + label_(label), + config_(config), + provider_(provider) { + RTC_DCHECK_RUN_ON(signaling_thread_); +} + +bool RtpDataChannel::Init() { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (config_.reliable || config_.id != -1 || config_.maxRetransmits || + config_.maxRetransmitTime) { + RTC_LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " + "invalid DataChannelInit."; + return false; + } + + return true; +} + +RtpDataChannel::~RtpDataChannel() { + RTC_DCHECK_RUN_ON(signaling_thread_); +} + +void RtpDataChannel::RegisterObserver(DataChannelObserver* observer) { + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = observer; + DeliverQueuedReceivedData(); +} + +void RtpDataChannel::UnregisterObserver() { + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = nullptr; +} + +void RtpDataChannel::Close() { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (state_ == kClosed) + return; + send_ssrc_ = 0; + send_ssrc_set_ = false; + SetState(kClosing); + UpdateState(); +} + +RtpDataChannel::DataState RtpDataChannel::state() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return state_; +} + +RTCError RtpDataChannel::error() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return error_; +} + +uint32_t RtpDataChannel::messages_sent() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return messages_sent_; +} + +uint64_t RtpDataChannel::bytes_sent() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return bytes_sent_; +} + +uint32_t RtpDataChannel::messages_received() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return messages_received_; +} + +uint64_t RtpDataChannel::bytes_received() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return bytes_received_; +} + +bool RtpDataChannel::Send(const DataBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); + + if (state_ != kOpen) { + return false; + } + + // TODO(jiayl): the spec is unclear about if the remote side should get the + // onmessage event. We need to figure out the expected behavior and change the + // code accordingly. + if (buffer.size() == 0) { + return true; + } + + return SendDataMessage(buffer); +} + +void RtpDataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { + RTC_DCHECK_RUN_ON(signaling_thread_); + + if (receive_ssrc_set_) { + return; + } + receive_ssrc_ = receive_ssrc; + receive_ssrc_set_ = true; + UpdateState(); +} + +void RtpDataChannel::OnTransportChannelClosed() { + RTCError error = RTCError(RTCErrorType::OPERATION_ERROR_WITH_DATA, + "Transport channel closed"); + CloseAbruptlyWithError(std::move(error)); +} + +DataChannelStats RtpDataChannel::GetStats() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + DataChannelStats stats{internal_id_, id(), label(), + protocol(), state(), messages_sent(), + messages_received(), bytes_sent(), bytes_received()}; + return stats; +} + +// The remote peer request that this channel shall be closed. +void RtpDataChannel::RemotePeerRequestClose() { + // Close with error code explicitly set to OK. + CloseAbruptlyWithError(RTCError()); +} + +void RtpDataChannel::SetSendSsrc(uint32_t send_ssrc) { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (send_ssrc_set_) { + return; + } + send_ssrc_ = send_ssrc; + send_ssrc_set_ = true; + UpdateState(); +} + +void RtpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& payload) { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (params.ssrc != receive_ssrc_) { + return; + } + + RTC_DCHECK(params.type == cricket::DMT_BINARY || + params.type == cricket::DMT_TEXT); + + RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " + << params.sid; + + bool binary = (params.type == cricket::DMT_BINARY); + auto buffer = std::make_unique(payload, binary); + if (state_ == kOpen && observer_) { + ++messages_received_; + bytes_received_ += buffer->size(); + observer_->OnMessage(*buffer.get()); + } else { + if (queued_received_data_.byte_count() + payload.size() > + kMaxQueuedReceivedDataBytes) { + RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size."; + + queued_received_data_.Clear(); + CloseAbruptlyWithError( + RTCError(RTCErrorType::RESOURCE_EXHAUSTED, + "Queued received data exceeds the max buffer size.")); + + return; + } + queued_received_data_.PushBack(std::move(buffer)); + } +} + +void RtpDataChannel::OnChannelReady(bool writable) { + RTC_DCHECK_RUN_ON(signaling_thread_); + + writable_ = writable; + if (!writable) { + return; + } + + UpdateState(); +} + +void RtpDataChannel::CloseAbruptlyWithError(RTCError error) { + RTC_DCHECK_RUN_ON(signaling_thread_); + + if (state_ == kClosed) { + return; + } + + if (connected_to_provider_) { + DisconnectFromProvider(); + } + + // Still go to "kClosing" before "kClosed", since observers may be expecting + // that. + SetState(kClosing); + error_ = std::move(error); + SetState(kClosed); +} + +void RtpDataChannel::UpdateState() { + RTC_DCHECK_RUN_ON(signaling_thread_); + // UpdateState determines what to do from a few state variables. Include + // all conditions required for each state transition here for + // clarity. + switch (state_) { + case kConnecting: { + if (send_ssrc_set_ == receive_ssrc_set_) { + if (!connected_to_provider_) { + connected_to_provider_ = provider_->ConnectDataChannel(this); + } + if (connected_to_provider_ && writable_) { + SetState(kOpen); + // If we have received buffers before the channel got writable. + // Deliver them now. + DeliverQueuedReceivedData(); + } + } + break; + } + case kOpen: { + break; + } + case kClosing: { + // For RTP data channels, we can go to "closed" after we finish + // sending data and the send/recv SSRCs are unset. + if (connected_to_provider_) { + DisconnectFromProvider(); + } + if (!send_ssrc_set_ && !receive_ssrc_set_) { + SetState(kClosed); + } + break; + } + case kClosed: + break; + } +} + +void RtpDataChannel::SetState(DataState state) { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (state_ == state) { + return; + } + + state_ = state; + if (observer_) { + observer_->OnStateChange(); + } + if (state_ == kOpen) { + SignalOpened(this); + } else if (state_ == kClosed) { + SignalClosed(this); + } +} + +void RtpDataChannel::DisconnectFromProvider() { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (!connected_to_provider_) + return; + + provider_->DisconnectDataChannel(this); + connected_to_provider_ = false; +} + +void RtpDataChannel::DeliverQueuedReceivedData() { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (!observer_) { + return; + } + + while (!queued_received_data_.Empty()) { + std::unique_ptr buffer = queued_received_data_.PopFront(); + ++messages_received_; + bytes_received_ += buffer->size(); + observer_->OnMessage(*buffer); + } +} + +bool RtpDataChannel::SendDataMessage(const DataBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); + cricket::SendDataParams send_params; + + send_params.ssrc = send_ssrc_; + send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; + + cricket::SendDataResult send_result = cricket::SDR_SUCCESS; + bool success = provider_->SendData(send_params, buffer.data, &send_result); + + if (success) { + ++messages_sent_; + bytes_sent_ += buffer.size(); + if (observer_ && buffer.size() > 0) { + observer_->OnBufferedAmountChange(buffer.size()); + } + return true; + } + + return false; +} + +// static +void RtpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) { + g_unique_id = new_value; +} + +} // namespace webrtc diff --git a/pc/rtp_data_channel.h b/pc/rtp_data_channel.h new file mode 100644 index 0000000000..adc724d64b --- /dev/null +++ b/pc/rtp_data_channel.h @@ -0,0 +1,199 @@ +/* + * Copyright 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef PC_RTP_DATA_CHANNEL_H_ +#define PC_RTP_DATA_CHANNEL_H_ + +#include +#include + +#include "api/data_channel_interface.h" +#include "api/priority.h" +#include "api/scoped_refptr.h" +#include "api/transport/data_channel_transport_interface.h" +#include "media/base/media_channel.h" +#include "pc/channel.h" +#include "pc/data_channel_utils.h" +#include "rtc_base/async_invoker.h" +#include "rtc_base/third_party/sigslot/sigslot.h" + +namespace webrtc { + +class RtpDataChannel; + +// TODO(deadbeef): Once RTP data channels go away, get rid of this and have +// DataChannel depend on SctpTransportInternal (pure virtual SctpTransport +// interface) instead. +class RtpDataChannelProviderInterface { + public: + // Sends the data to the transport. + virtual bool SendData(const cricket::SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload, + cricket::SendDataResult* result) = 0; + // Connects to the transport signals. + virtual bool ConnectDataChannel(RtpDataChannel* data_channel) = 0; + // Disconnects from the transport signals. + virtual void DisconnectDataChannel(RtpDataChannel* data_channel) = 0; + // Returns true if the transport channel is ready to send data. + virtual bool ReadyToSendData() const = 0; + + protected: + virtual ~RtpDataChannelProviderInterface() {} +}; + +// RtpDataChannel is an implementation of the DataChannelInterface based on +// libjingle's data engine. It provides an implementation of unreliable data +// channels. + +// DataChannel states: +// kConnecting: The channel has been created the transport might not yet be +// ready. +// kOpen: The channel have a local SSRC set by a call to UpdateSendSsrc +// and a remote SSRC set by call to UpdateReceiveSsrc and the transport +// has been writable once. +// kClosing: DataChannelInterface::Close has been called or UpdateReceiveSsrc +// has been called with SSRC==0 +// kClosed: Both UpdateReceiveSsrc and UpdateSendSsrc has been called with +// SSRC==0. +class RtpDataChannel : public DataChannelInterface, + public sigslot::has_slots<> { + public: + static rtc::scoped_refptr Create( + RtpDataChannelProviderInterface* provider, + const std::string& label, + const DataChannelInit& config, + rtc::Thread* signaling_thread); + + // Instantiates an API proxy for a DataChannel instance that will be handed + // out to external callers. + static rtc::scoped_refptr CreateProxy( + rtc::scoped_refptr channel); + + void RegisterObserver(DataChannelObserver* observer) override; + void UnregisterObserver() override; + + std::string label() const override { return label_; } + bool reliable() const override { return false; } + bool ordered() const override { return config_.ordered; } + // Backwards compatible accessors + uint16_t maxRetransmitTime() const override { + return config_.maxRetransmitTime ? *config_.maxRetransmitTime + : static_cast(-1); + } + uint16_t maxRetransmits() const override { + return config_.maxRetransmits ? *config_.maxRetransmits + : static_cast(-1); + } + absl::optional maxPacketLifeTime() const override { + return config_.maxRetransmitTime; + } + absl::optional maxRetransmitsOpt() const override { + return config_.maxRetransmits; + } + std::string protocol() const override { return config_.protocol; } + bool negotiated() const override { return config_.negotiated; } + int id() const override { return config_.id; } + Priority priority() const override { + return config_.priority ? *config_.priority : Priority::kLow; + } + + virtual int internal_id() const { return internal_id_; } + + uint64_t buffered_amount() const override { return 0; } + void Close() override; + DataState state() const override; + RTCError error() const override; + uint32_t messages_sent() const override; + uint64_t bytes_sent() const override; + uint32_t messages_received() const override; + uint64_t bytes_received() const override; + bool Send(const DataBuffer& buffer) override; + + // Close immediately, ignoring any queued data or closing procedure. + // This is called when SDP indicates a channel should be removed. + void CloseAbruptlyWithError(RTCError error); + + // Called when the channel's ready to use. That can happen when the + // underlying DataMediaChannel becomes ready, or when this channel is a new + // stream on an existing DataMediaChannel, and we've finished negotiation. + void OnChannelReady(bool writable); + + // Slots for provider to connect signals to. + void OnDataReceived(const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& payload); + + // Called when the transport channel is unusable. + // This method makes sure the DataChannel is disconnected and changes state + // to kClosed. + void OnTransportChannelClosed(); + + DataChannelStats GetStats() const; + + // The remote peer requested that this channel should be closed. + void RemotePeerRequestClose(); + // Set the SSRC this channel should use to send data on the + // underlying data engine. |send_ssrc| == 0 means that the channel is no + // longer part of the session negotiation. + void SetSendSsrc(uint32_t send_ssrc); + // Set the SSRC this channel should use to receive data from the + // underlying data engine. + void SetReceiveSsrc(uint32_t receive_ssrc); + + // Emitted when state transitions to kOpen. + sigslot::signal1 SignalOpened; + // Emitted when state transitions to kClosed. + sigslot::signal1 SignalClosed; + + // Reset the allocator for internal ID values for testing, so that + // the internal IDs generated are predictable. Test only. + static void ResetInternalIdAllocatorForTesting(int new_value); + + protected: + RtpDataChannel(const DataChannelInit& config, + RtpDataChannelProviderInterface* client, + const std::string& label, + rtc::Thread* signaling_thread); + ~RtpDataChannel() override; + + private: + bool Init(); + void UpdateState(); + void SetState(DataState state); + void DisconnectFromProvider(); + + void DeliverQueuedReceivedData(); + + bool SendDataMessage(const DataBuffer& buffer); + + rtc::Thread* const signaling_thread_; + const int internal_id_; + const std::string label_; + const DataChannelInit config_; + DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr; + DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting; + RTCError error_ RTC_GUARDED_BY(signaling_thread_); + uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0; + RtpDataChannelProviderInterface* const provider_; + bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_) = false; + bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_) = false; + bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_) = false; + bool writable_ RTC_GUARDED_BY(signaling_thread_) = false; + uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_) = 0; + PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_); + rtc::AsyncInvoker invoker_ RTC_GUARDED_BY(signaling_thread_); +}; + +} // namespace webrtc + +#endif // PC_RTP_DATA_CHANNEL_H_ diff --git a/pc/data_channel.cc b/pc/sctp_data_channel.cc similarity index 59% rename from pc/data_channel.cc rename to pc/sctp_data_channel.cc index fcf38f9574..e603dddd0f 100644 --- a/pc/data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -1,5 +1,5 @@ /* - * Copyright 2012 The WebRTC project authors. All Rights Reserved. + * Copyright 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "pc/data_channel.h" +#include "pc/sctp_data_channel.h" #include #include @@ -25,17 +25,46 @@ namespace webrtc { +namespace { + static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024; static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024; -namespace { - static std::atomic g_unique_id{0}; int GenerateUniqueId() { return ++g_unique_id; } +// Define proxy for DataChannelInterface. +BEGIN_SIGNALING_PROXY_MAP(DataChannel) +PROXY_SIGNALING_THREAD_DESTRUCTOR() +PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) +PROXY_METHOD0(void, UnregisterObserver) +BYPASS_PROXY_CONSTMETHOD0(std::string, label) +BYPASS_PROXY_CONSTMETHOD0(bool, reliable) +BYPASS_PROXY_CONSTMETHOD0(bool, ordered) +BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime) +BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits) +BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxRetransmitsOpt) +BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxPacketLifeTime) +BYPASS_PROXY_CONSTMETHOD0(std::string, protocol) +BYPASS_PROXY_CONSTMETHOD0(bool, negotiated) +// Can't bypass the proxy since the id may change. +PROXY_CONSTMETHOD0(int, id) +BYPASS_PROXY_CONSTMETHOD0(Priority, priority) +PROXY_CONSTMETHOD0(DataState, state) +PROXY_CONSTMETHOD0(RTCError, error) +PROXY_CONSTMETHOD0(uint32_t, messages_sent) +PROXY_CONSTMETHOD0(uint64_t, bytes_sent) +PROXY_CONSTMETHOD0(uint32_t, messages_received) +PROXY_CONSTMETHOD0(uint64_t, bytes_received) +PROXY_CONSTMETHOD0(uint64_t, buffered_amount) +PROXY_METHOD0(void, Close) +// TODO(bugs.webrtc.org/11547): Change to run on the network thread. +PROXY_METHOD1(bool, Send, const DataBuffer&) +END_PROXY_MAP() + } // namespace InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base) @@ -99,258 +128,156 @@ bool SctpSidAllocator::IsSidAvailable(int sid) const { return used_sids_.find(sid) == used_sids_.end(); } -bool DataChannel::PacketQueue::Empty() const { - return packets_.empty(); -} - -std::unique_ptr DataChannel::PacketQueue::PopFront() { - RTC_DCHECK(!packets_.empty()); - byte_count_ -= packets_.front()->size(); - std::unique_ptr packet = std::move(packets_.front()); - packets_.pop_front(); - return packet; -} - -void DataChannel::PacketQueue::PushFront(std::unique_ptr packet) { - byte_count_ += packet->size(); - packets_.push_front(std::move(packet)); -} - -void DataChannel::PacketQueue::PushBack(std::unique_ptr packet) { - byte_count_ += packet->size(); - packets_.push_back(std::move(packet)); -} - -void DataChannel::PacketQueue::Clear() { - packets_.clear(); - byte_count_ = 0; -} - -void DataChannel::PacketQueue::Swap(PacketQueue* other) { - size_t other_byte_count = other->byte_count_; - other->byte_count_ = byte_count_; - byte_count_ = other_byte_count; - - other->packets_.swap(packets_); -} - -rtc::scoped_refptr DataChannel::Create( - DataChannelProviderInterface* provider, - cricket::DataChannelType dct, +rtc::scoped_refptr SctpDataChannel::Create( + SctpDataChannelProviderInterface* provider, const std::string& label, const InternalDataChannelInit& config, rtc::Thread* signaling_thread, rtc::Thread* network_thread) { - rtc::scoped_refptr channel( - new rtc::RefCountedObject(config, provider, dct, label, - signaling_thread, network_thread)); + rtc::scoped_refptr channel( + new rtc::RefCountedObject( + config, provider, label, signaling_thread, network_thread)); if (!channel->Init()) { return nullptr; } return channel; } -// Define proxy for DataChannelInterface. -BEGIN_SIGNALING_PROXY_MAP(DataChannel) -PROXY_SIGNALING_THREAD_DESTRUCTOR() -PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) -PROXY_METHOD0(void, UnregisterObserver) -BYPASS_PROXY_CONSTMETHOD0(std::string, label) -BYPASS_PROXY_CONSTMETHOD0(bool, reliable) -BYPASS_PROXY_CONSTMETHOD0(bool, ordered) -BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime) -BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits) -BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxRetransmitsOpt) -BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxPacketLifeTime) -BYPASS_PROXY_CONSTMETHOD0(std::string, protocol) -BYPASS_PROXY_CONSTMETHOD0(bool, negotiated) -// Can't bypass the proxy since the id may change. -PROXY_CONSTMETHOD0(int, id) -BYPASS_PROXY_CONSTMETHOD0(Priority, priority) -PROXY_CONSTMETHOD0(DataState, state) -PROXY_CONSTMETHOD0(RTCError, error) -PROXY_CONSTMETHOD0(uint32_t, messages_sent) -PROXY_CONSTMETHOD0(uint64_t, bytes_sent) -PROXY_CONSTMETHOD0(uint32_t, messages_received) -PROXY_CONSTMETHOD0(uint64_t, bytes_received) -PROXY_CONSTMETHOD0(uint64_t, buffered_amount) -PROXY_METHOD0(void, Close) -// TODO(bugs.webrtc.org/11547): Change to run on the network thread. -PROXY_METHOD1(bool, Send, const DataBuffer&) -END_PROXY_MAP() - // static -rtc::scoped_refptr DataChannel::CreateProxy( - rtc::scoped_refptr channel) { +rtc::scoped_refptr SctpDataChannel::CreateProxy( + rtc::scoped_refptr channel) { // TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy. // Also, consider allowing the proxy object to own the reference (std::move). // As is, the proxy has a raw pointer and no reference to the channel object // and trusting that the lifetime management aligns with the - // sctp_data_channels_ array in DataChannelController. + // sctp_data_channels_ array in SctpDataChannelController. return DataChannelProxy::Create(channel->signaling_thread_, channel.get()); } -bool DataChannel::IsSctpLike(cricket::DataChannelType type) { - return type == cricket::DCT_SCTP || type == cricket::DCT_MEDIA_TRANSPORT || - type == cricket::DCT_DATA_CHANNEL_TRANSPORT || - type == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; -} - -DataChannel::DataChannel(const InternalDataChannelInit& config, - DataChannelProviderInterface* provider, - cricket::DataChannelType dct, - const std::string& label, - rtc::Thread* signaling_thread, - rtc::Thread* network_thread) +SctpDataChannel::SctpDataChannel(const InternalDataChannelInit& config, + SctpDataChannelProviderInterface* provider, + const std::string& label, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread) : signaling_thread_(signaling_thread), network_thread_(network_thread), internal_id_(GenerateUniqueId()), label_(label), config_(config), observer_(nullptr), - state_(kConnecting), - messages_sent_(0), - bytes_sent_(0), - messages_received_(0), - bytes_received_(0), - buffered_amount_(0), - data_channel_type_(dct), - provider_(provider), - handshake_state_(kHandshakeInit), - connected_to_provider_(false), - send_ssrc_set_(false), - receive_ssrc_set_(false), - writable_(false), - send_ssrc_(0), - receive_ssrc_(0) { + provider_(provider) { RTC_DCHECK_RUN_ON(signaling_thread_); } -bool DataChannel::Init() { +bool SctpDataChannel::Init() { RTC_DCHECK_RUN_ON(signaling_thread_); - if (data_channel_type_ == cricket::DCT_RTP) { - if (config_.reliable || config_.id != -1 || config_.maxRetransmits || - config_.maxRetransmitTime) { - RTC_LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " - "invalid DataChannelInit."; - return false; - } - handshake_state_ = kHandshakeReady; - } else if (IsSctpLike(data_channel_type_)) { - if (config_.id < -1 || - (config_.maxRetransmits && *config_.maxRetransmits < 0) || - (config_.maxRetransmitTime && *config_.maxRetransmitTime < 0)) { - RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " - "invalid DataChannelInit."; - return false; - } - if (config_.maxRetransmits && config_.maxRetransmitTime) { - RTC_LOG(LS_ERROR) - << "maxRetransmits and maxRetransmitTime should not be both set."; - return false; - } + if (config_.id < -1 || + (config_.maxRetransmits && *config_.maxRetransmits < 0) || + (config_.maxRetransmitTime && *config_.maxRetransmitTime < 0)) { + RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " + "invalid DataChannelInit."; + return false; + } + if (config_.maxRetransmits && config_.maxRetransmitTime) { + RTC_LOG(LS_ERROR) + << "maxRetransmits and maxRetransmitTime should not be both set."; + return false; + } - switch (config_.open_handshake_role) { - case webrtc::InternalDataChannelInit::kNone: // pre-negotiated - handshake_state_ = kHandshakeReady; - break; - case webrtc::InternalDataChannelInit::kOpener: - handshake_state_ = kHandshakeShouldSendOpen; - break; - case webrtc::InternalDataChannelInit::kAcker: - handshake_state_ = kHandshakeShouldSendAck; - break; - } + switch (config_.open_handshake_role) { + case webrtc::InternalDataChannelInit::kNone: // pre-negotiated + handshake_state_ = kHandshakeReady; + break; + case webrtc::InternalDataChannelInit::kOpener: + handshake_state_ = kHandshakeShouldSendOpen; + break; + case webrtc::InternalDataChannelInit::kAcker: + handshake_state_ = kHandshakeShouldSendAck; + break; + } - // Try to connect to the transport in case the transport channel already - // exists. - OnTransportChannelCreated(); + // Try to connect to the transport in case the transport channel already + // exists. + OnTransportChannelCreated(); - // Checks if the transport is ready to send because the initial channel - // ready signal may have been sent before the DataChannel creation. - // This has to be done async because the upper layer objects (e.g. - // Chrome glue and WebKit) are not wired up properly until after this - // function returns. - if (provider_->ReadyToSendData()) { - invoker_.AsyncInvoke(RTC_FROM_HERE, rtc::Thread::Current(), - [this] { OnChannelReady(true); }); - } + // Checks if the transport is ready to send because the initial channel + // ready signal may have been sent before the DataChannel creation. + // This has to be done async because the upper layer objects (e.g. + // Chrome glue and WebKit) are not wired up properly until after this + // function returns. + if (provider_->ReadyToSendData()) { + invoker_.AsyncInvoke(RTC_FROM_HERE, rtc::Thread::Current(), + [this] { OnTransportReady(true); }); } return true; } -DataChannel::~DataChannel() { +SctpDataChannel::~SctpDataChannel() { RTC_DCHECK_RUN_ON(signaling_thread_); } -void DataChannel::RegisterObserver(DataChannelObserver* observer) { +void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) { RTC_DCHECK_RUN_ON(signaling_thread_); observer_ = observer; DeliverQueuedReceivedData(); } -void DataChannel::UnregisterObserver() { +void SctpDataChannel::UnregisterObserver() { RTC_DCHECK_RUN_ON(signaling_thread_); observer_ = nullptr; } -bool DataChannel::reliable() const { +bool SctpDataChannel::reliable() const { // May be called on any thread. - if (data_channel_type_ == cricket::DCT_RTP) { - return false; - } else { - return !config_.maxRetransmits && !config_.maxRetransmitTime; - } + return !config_.maxRetransmits && !config_.maxRetransmitTime; } -uint64_t DataChannel::buffered_amount() const { +uint64_t SctpDataChannel::buffered_amount() const { RTC_DCHECK_RUN_ON(signaling_thread_); return buffered_amount_; } -void DataChannel::Close() { +void SctpDataChannel::Close() { RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == kClosed) return; - send_ssrc_ = 0; - send_ssrc_set_ = false; SetState(kClosing); // Will send queued data before beginning the underlying closing procedure. UpdateState(); } -DataChannel::DataState DataChannel::state() const { +SctpDataChannel::DataState SctpDataChannel::state() const { RTC_DCHECK_RUN_ON(signaling_thread_); return state_; } -RTCError DataChannel::error() const { +RTCError SctpDataChannel::error() const { RTC_DCHECK_RUN_ON(signaling_thread_); return error_; } -uint32_t DataChannel::messages_sent() const { +uint32_t SctpDataChannel::messages_sent() const { RTC_DCHECK_RUN_ON(signaling_thread_); return messages_sent_; } -uint64_t DataChannel::bytes_sent() const { +uint64_t SctpDataChannel::bytes_sent() const { RTC_DCHECK_RUN_ON(signaling_thread_); return bytes_sent_; } -uint32_t DataChannel::messages_received() const { +uint32_t SctpDataChannel::messages_received() const { RTC_DCHECK_RUN_ON(signaling_thread_); return messages_received_; } -uint64_t DataChannel::bytes_received() const { +uint64_t SctpDataChannel::bytes_received() const { RTC_DCHECK_RUN_ON(signaling_thread_); return bytes_received_; } -bool DataChannel::Send(const DataBuffer& buffer) { +bool SctpDataChannel::Send(const DataBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network // thread. Bring buffer management etc to the network thread and keep the @@ -371,49 +298,28 @@ bool DataChannel::Send(const DataBuffer& buffer) { // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. - // Only SCTP DataChannel queues the outgoing data when the transport is - // blocked. - if (IsSctpLike(data_channel_type_)) { - if (!queued_send_data_.Empty()) { - if (!QueueSendDataMessage(buffer)) { - RTC_LOG(LS_ERROR) - << "Closing the DataChannel due to a failure to queue " - "additional data."; - // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5 - // Note that the spec doesn't explicitly say to close in this situation. - CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED, - "Unable to queue data for sending")); - } - return true; + if (!queued_send_data_.Empty()) { + if (!QueueSendDataMessage(buffer)) { + RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue " + "additional data."; + // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5 + // Note that the spec doesn't explicitly say to close in this situation. + CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED, + "Unable to queue data for sending")); } + return true; } - bool success = SendDataMessage(buffer, true); - if (data_channel_type_ == cricket::DCT_RTP) { - return success; - } + SendDataMessage(buffer, true); // Always return true for SCTP DataChannel per the spec. return true; } -void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { - RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK_EQ(data_channel_type_, cricket::DCT_RTP); - - if (receive_ssrc_set_) { - return; - } - receive_ssrc_ = receive_ssrc; - receive_ssrc_set_ = true; - UpdateState(); -} - -void DataChannel::SetSctpSid(int sid) { +void SctpDataChannel::SetSctpSid(int sid) { RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK_LT(config_.id, 0); RTC_DCHECK_GE(sid, 0); - RTC_DCHECK(IsSctpLike(data_channel_type_)); RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); RTC_DCHECK_EQ(state_, kConnecting); @@ -425,10 +331,9 @@ void DataChannel::SetSctpSid(int sid) { provider_->AddSctpDataStream(sid); } -void DataChannel::OnClosingProcedureStartedRemotely(int sid) { +void SctpDataChannel::OnClosingProcedureStartedRemotely(int sid) { RTC_DCHECK_RUN_ON(signaling_thread_); - if (IsSctpLike(data_channel_type_) && sid == config_.id && - state_ != kClosing && state_ != kClosed) { + if (sid == config_.id && state_ != kClosing && state_ != kClosed) { // Don't bother sending queued data since the side that initiated the // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy // discussion about this. @@ -442,9 +347,9 @@ void DataChannel::OnClosingProcedureStartedRemotely(int sid) { } } -void DataChannel::OnClosingProcedureComplete(int sid) { +void SctpDataChannel::OnClosingProcedureComplete(int sid) { RTC_DCHECK_RUN_ON(signaling_thread_); - if (IsSctpLike(data_channel_type_) && sid == config_.id) { + if (sid == config_.id) { // If the closing procedure is complete, we should have finished sending // all pending data and transitioned to kClosing already. RTC_DCHECK_EQ(state_, kClosing); @@ -454,9 +359,8 @@ void DataChannel::OnClosingProcedureComplete(int sid) { } } -void DataChannel::OnTransportChannelCreated() { +void SctpDataChannel::OnTransportChannelCreated() { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); if (!connected_to_provider_) { connected_to_provider_ = provider_->ConnectDataChannel(this); } @@ -467,7 +371,7 @@ void DataChannel::OnTransportChannelCreated() { } } -void DataChannel::OnTransportChannelClosed() { +void SctpDataChannel::OnTransportChannelClosed() { // The SctpTransport is unusable (for example, because the SCTP m= section // was rejected, or because the DTLS transport closed), so we need to close // abruptly. @@ -477,44 +381,22 @@ void DataChannel::OnTransportChannelClosed() { CloseAbruptlyWithError(std::move(error)); } -DataChannel::Stats DataChannel::GetStats() const { +DataChannelStats SctpDataChannel::GetStats() const { RTC_DCHECK_RUN_ON(signaling_thread_); - Stats stats{internal_id_, id(), label(), - protocol(), state(), messages_sent(), - messages_received(), bytes_sent(), bytes_received()}; + DataChannelStats stats{internal_id_, id(), label(), + protocol(), state(), messages_sent(), + messages_received(), bytes_sent(), bytes_received()}; return stats; } -// The remote peer request that this channel shall be closed. -void DataChannel::RemotePeerRequestClose() { - RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); - // Close with error code explicitly set to OK. - CloseAbruptlyWithError(RTCError()); -} - -void DataChannel::SetSendSsrc(uint32_t send_ssrc) { +void SctpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& payload) { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); - if (send_ssrc_set_) { - return; - } - send_ssrc_ = send_ssrc; - send_ssrc_set_ = true; - UpdateState(); -} - -void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(signaling_thread_); - if (data_channel_type_ == cricket::DCT_RTP && params.ssrc != receive_ssrc_) { - return; - } - if (IsSctpLike(data_channel_type_) && params.sid != config_.id) { + if (params.sid != config_.id) { return; } if (params.type == cricket::DMT_CONTROL) { - RTC_DCHECK(IsSctpLike(data_channel_type_)); if (handshake_state_ != kHandshakeWaitingForAck) { // Ignore it if we are not expecting an ACK message. RTC_LOG(LS_WARNING) @@ -559,11 +441,9 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size."; queued_received_data_.Clear(); - if (data_channel_type_ != cricket::DCT_RTP) { - CloseAbruptlyWithError( - RTCError(RTCErrorType::RESOURCE_EXHAUSTED, - "Queued received data exceeds the max buffer size.")); - } + CloseAbruptlyWithError( + RTCError(RTCErrorType::RESOURCE_EXHAUSTED, + "Queued received data exceeds the max buffer size.")); return; } @@ -571,7 +451,7 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, } } -void DataChannel::OnChannelReady(bool writable) { +void SctpDataChannel::OnTransportReady(bool writable) { RTC_DCHECK_RUN_ON(signaling_thread_); writable_ = writable; @@ -579,15 +459,13 @@ void DataChannel::OnChannelReady(bool writable) { return; } - if (IsSctpLike(data_channel_type_)) { - SendQueuedControlMessages(); - SendQueuedDataMessages(); - } + SendQueuedControlMessages(); + SendQueuedDataMessages(); UpdateState(); } -void DataChannel::CloseAbruptlyWithError(RTCError error) { +void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == kClosed) { @@ -601,10 +479,8 @@ void DataChannel::CloseAbruptlyWithError(RTCError error) { // Closing abruptly means any queued data gets thrown away. buffered_amount_ = 0; - if (IsSctpLike(data_channel_type_)) { - queued_send_data_.Clear(); - queued_control_data_.Clear(); - } + queued_send_data_.Clear(); + queued_control_data_.Clear(); // Still go to "kClosing" before "kClosed", since observers may be expecting // that. @@ -613,74 +489,19 @@ void DataChannel::CloseAbruptlyWithError(RTCError error) { SetState(kClosed); } -void DataChannel::CloseAbruptlyWithDataChannelFailure( +void SctpDataChannel::CloseAbruptlyWithDataChannelFailure( const std::string& message) { RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message); error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE); CloseAbruptlyWithError(std::move(error)); } -void DataChannel::UpdateState() { +void SctpDataChannel::UpdateState() { RTC_DCHECK_RUN_ON(signaling_thread_); - // UpdateState determines what to do from a few state variables. Include + // UpdateState determines what to do from a few state variables. Include // all conditions required for each state transition here for - // clarity. OnChannelReady(true) will send any queued data and then invoke + // clarity. OnTransportReady(true) will send any queued data and then invoke // UpdateState(). - if (data_channel_type_ == cricket::DCT_RTP) { - UpdateRtpState(); - } else { - UpdateSctpLikeState(); - } -} - -void DataChannel::UpdateRtpState() { - RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK_EQ(data_channel_type_, cricket::DCT_RTP); - - // UpdateState determines what to do from a few state variables. Include - // all conditions required for each state transition here for - // clarity. OnChannelReady(true) will send any queued data and then invoke - // UpdateState(). - switch (state_) { - case kConnecting: { - if (send_ssrc_set_ == receive_ssrc_set_) { - if (!connected_to_provider_) { - connected_to_provider_ = provider_->ConnectDataChannel(this); - } - if (connected_to_provider_) { - if (writable_ && (handshake_state_ == kHandshakeReady || - handshake_state_ == kHandshakeWaitingForAck)) { - SetState(kOpen); - // If we have received buffers before the channel got writable. - // Deliver them now. - DeliverQueuedReceivedData(); - } - } - } - break; - } - case kOpen: { - break; - } - case kClosing: { - // For RTP data channels, we can go to "closed" after we finish - // sending data and the send/recv SSRCs are unset. - if (connected_to_provider_) { - DisconnectFromProvider(); - } - if (!send_ssrc_set_ && !receive_ssrc_set_) { - SetState(kClosed); - } - break; - } - case kClosed: - break; - } -} - -void DataChannel::UpdateSctpLikeState() { - RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); switch (state_) { case kConnecting: { @@ -728,7 +549,7 @@ void DataChannel::UpdateSctpLikeState() { } } -void DataChannel::SetState(DataState state) { +void SctpDataChannel::SetState(DataState state) { RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == state) { return; @@ -745,7 +566,7 @@ void DataChannel::SetState(DataState state) { } } -void DataChannel::DisconnectFromProvider() { +void SctpDataChannel::DisconnectFromProvider() { RTC_DCHECK_RUN_ON(signaling_thread_); if (!connected_to_provider_) return; @@ -754,7 +575,7 @@ void DataChannel::DisconnectFromProvider() { connected_to_provider_ = false; } -void DataChannel::DeliverQueuedReceivedData() { +void SctpDataChannel::DeliverQueuedReceivedData() { RTC_DCHECK_RUN_ON(signaling_thread_); if (!observer_) { return; @@ -768,9 +589,8 @@ void DataChannel::DeliverQueuedReceivedData() { } } -void DataChannel::SendQueuedDataMessages() { +void SctpDataChannel::SendQueuedDataMessages() { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); if (queued_send_data_.Empty()) { return; } @@ -787,29 +607,25 @@ void DataChannel::SendQueuedDataMessages() { } } -bool DataChannel::SendDataMessage(const DataBuffer& buffer, - bool queue_if_blocked) { +bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, + bool queue_if_blocked) { RTC_DCHECK_RUN_ON(signaling_thread_); cricket::SendDataParams send_params; - if (IsSctpLike(data_channel_type_)) { - send_params.ordered = config_.ordered; - // Send as ordered if it is still going through OPEN/ACK signaling. - if (handshake_state_ != kHandshakeReady && !config_.ordered) { - send_params.ordered = true; - RTC_LOG(LS_VERBOSE) - << "Sending data as ordered for unordered DataChannel " - "because the OPEN_ACK message has not been received."; - } - - send_params.max_rtx_count = - config_.maxRetransmits ? *config_.maxRetransmits : -1; - send_params.max_rtx_ms = - config_.maxRetransmitTime ? *config_.maxRetransmitTime : -1; - send_params.sid = config_.id; - } else { - send_params.ssrc = send_ssrc_; + send_params.ordered = config_.ordered; + // Send as ordered if it is still going through OPEN/ACK signaling. + if (handshake_state_ != kHandshakeReady && !config_.ordered) { + send_params.ordered = true; + RTC_LOG(LS_VERBOSE) + << "Sending data as ordered for unordered DataChannel " + "because the OPEN_ACK message has not been received."; } + + send_params.max_rtx_count = + config_.maxRetransmits ? *config_.maxRetransmits : -1; + send_params.max_rtx_ms = + config_.maxRetransmitTime ? *config_.maxRetransmitTime : -1; + send_params.sid = config_.id; send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; cricket::SendDataResult send_result = cricket::SDR_SUCCESS; @@ -827,10 +643,6 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, return true; } - if (!IsSctpLike(data_channel_type_)) { - return false; - } - if (send_result == cricket::SDR_BLOCK) { if (!queue_if_blocked || QueueSendDataMessage(buffer)) { return false; @@ -847,9 +659,8 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, return false; } -bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { +bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); size_t start_buffered_amount = queued_send_data_.byte_count(); if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) { RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; @@ -859,9 +670,8 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { return true; } -void DataChannel::SendQueuedControlMessages() { +void SctpDataChannel::SendQueuedControlMessages() { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -871,15 +681,14 @@ void DataChannel::SendQueuedControlMessages() { } } -void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) { +void SctpDataChannel::QueueControlMessage( + const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); queued_control_data_.PushBack(std::make_unique(buffer, true)); } -bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { +bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(IsSctpLike(data_channel_type_)); RTC_DCHECK(writable_); RTC_DCHECK_GE(config_.id, 0); @@ -917,7 +726,7 @@ bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { } // static -void DataChannel::ResetInternalIdAllocatorForTesting(int new_value) { +void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) { g_unique_id = new_value; } diff --git a/pc/data_channel.h b/pc/sctp_data_channel.h similarity index 60% rename from pc/data_channel.h rename to pc/sctp_data_channel.h index f6c5f819e6..871f18af5c 100644 --- a/pc/data_channel.h +++ b/pc/sctp_data_channel.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 The WebRTC project authors. All Rights Reserved. + * Copyright 2020 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source @@ -8,10 +8,9 @@ * be found in the AUTHORS file in the root of the source tree. */ -#ifndef PC_DATA_CHANNEL_H_ -#define PC_DATA_CHANNEL_H_ +#ifndef PC_SCTP_DATA_CHANNEL_H_ +#define PC_SCTP_DATA_CHANNEL_H_ -#include #include #include #include @@ -21,27 +20,27 @@ #include "api/scoped_refptr.h" #include "api/transport/data_channel_transport_interface.h" #include "media/base/media_channel.h" -#include "pc/channel.h" +#include "pc/data_channel_utils.h" #include "rtc_base/async_invoker.h" +#include "rtc_base/ssl_stream_adapter.h" // For SSLRole #include "rtc_base/third_party/sigslot/sigslot.h" namespace webrtc { -class DataChannel; +class SctpDataChannel; -// TODO(deadbeef): Once RTP data channels go away, get rid of this and have -// DataChannel depend on SctpTransportInternal (pure virtual SctpTransport -// interface) instead. -class DataChannelProviderInterface { +// TODO(deadbeef): Get rid of this and have SctpDataChannel depend on +// SctpTransportInternal (pure virtual SctpTransport interface) instead. +class SctpDataChannelProviderInterface { public: // Sends the data to the transport. virtual bool SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) = 0; // Connects to the transport signals. - virtual bool ConnectDataChannel(DataChannel* data_channel) = 0; + virtual bool ConnectDataChannel(SctpDataChannel* data_channel) = 0; // Disconnects from the transport signals. - virtual void DisconnectDataChannel(DataChannel* data_channel) = 0; + virtual void DisconnectDataChannel(SctpDataChannel* data_channel) = 0; // Adds the data channel SID to the transport for SCTP. virtual void AddSctpDataStream(int sid) = 0; // Begins the closing procedure by sending an outgoing stream reset. Still @@ -51,7 +50,7 @@ class DataChannelProviderInterface { virtual bool ReadyToSendData() const = 0; protected: - virtual ~DataChannelProviderInterface() {} + virtual ~SctpDataChannelProviderInterface() {} }; // TODO(tommi): Change to not inherit from DataChannelInit but to have it as @@ -64,7 +63,7 @@ struct InternalDataChannelInit : public DataChannelInit { OpenHandshakeRole open_handshake_role; }; -// Helper class to allocate unique IDs for SCTP DataChannels +// Helper class to allocate unique IDs for SCTP DataChannels. class SctpSidAllocator { public: // Gets the first unused odd/even id based on the DTLS role. If |role| is @@ -86,21 +85,20 @@ class SctpSidAllocator { std::set used_sids_; }; -// DataChannel is an implementation of the DataChannelInterface based on -// libjingle's data engine. It provides an implementation of unreliable or -// reliabledata channels. Currently this class is specifically designed to use -// both RtpDataChannel and SctpTransport. +// SctpDataChannel is an implementation of the DataChannelInterface based on +// SctpTransport. It provides an implementation of unreliable or +// reliabledata channels. // DataChannel states: // kConnecting: The channel has been created the transport might not yet be // ready. -// kOpen: The channel have a local SSRC set by a call to UpdateSendSsrc -// and a remote SSRC set by call to UpdateReceiveSsrc and the transport -// has been writable once. -// kClosing: DataChannelInterface::Close has been called or UpdateReceiveSsrc -// has been called with SSRC==0 -// kClosed: Both UpdateReceiveSsrc and UpdateSendSsrc has been called with -// SSRC==0. +// kOpen: The open handshake has been performed (if relevant) and the data +// channel is able to send messages. +// kClosing: DataChannelInterface::Close has been called, or the remote side +// initiated the closing procedure, but the closing procedure has not +// yet finished. +// kClosed: The closing handshake is finished (possibly initiated from this, +// side, possibly from the peer). // // How the closing procedure works for SCTP: // 1. Alice calls Close(), state changes to kClosing. @@ -108,37 +106,23 @@ class SctpSidAllocator { // 3. Alice calls RemoveSctpDataStream, sends outgoing stream reset. // 4. Bob receives incoming stream reset; OnClosingProcedureStartedRemotely // called. -// 5. Bob sends outgoing stream reset. 6. Alice receives incoming reset, -// Bob receives acknowledgement. Both receive OnClosingProcedureComplete -// callback and transition to kClosed. -class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { +// 5. Bob sends outgoing stream reset. +// 6. Alice receives incoming reset, Bob receives acknowledgement. Both receive +// OnClosingProcedureComplete callback and transition to kClosed. +class SctpDataChannel : public DataChannelInterface, + public sigslot::has_slots<> { public: - struct Stats { - int internal_id; - int id; - std::string label; - std::string protocol; - DataState state; - uint32_t messages_sent; - uint32_t messages_received; - uint64_t bytes_sent; - uint64_t bytes_received; - }; - - static rtc::scoped_refptr Create( - DataChannelProviderInterface* provider, - cricket::DataChannelType dct, + static rtc::scoped_refptr Create( + SctpDataChannelProviderInterface* provider, const std::string& label, const InternalDataChannelInit& config, rtc::Thread* signaling_thread, rtc::Thread* network_thread); - // Instantiates an API proxy for a DataChannel instance that will be handed - // out to external callers. + // Instantiates an API proxy for a SctpDataChannel instance that will be + // handed out to external callers. static rtc::scoped_refptr CreateProxy( - rtc::scoped_refptr channel); - - static bool IsSctpLike(cricket::DataChannelType type); + rtc::scoped_refptr channel); void RegisterObserver(DataChannelObserver* observer) override; void UnregisterObserver() override; @@ -181,9 +165,7 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { bool Send(const DataBuffer& buffer) override; // Close immediately, ignoring any queued data or closing procedure. - // This is called for RTP data channels when SDP indicates a channel should - // be removed, or SCTP data channels when the underlying SctpTransport is - // being destroyed. + // This is called when the underlying SctpTransport is being destroyed. // It is also called by the PeerConnection if SCTP ID assignment fails. void CloseAbruptlyWithError(RTCError error); // Specializations of CloseAbruptlyWithError @@ -191,19 +173,19 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { void CloseAbruptlyWithSctpCauseCode(const std::string& message, uint16_t cause_code); - // Called when the channel's ready to use. That can happen when the - // underlying DataMediaChannel becomes ready, or when this channel is a new - // stream on an existing DataMediaChannel, and we've finished negotiation. - void OnChannelReady(bool writable); - // Slots for provider to connect signals to. + // + // TODO(deadbeef): Make these private once we're hooking up signals ourselves, + // instead of relying on SctpDataChannelProviderInterface. + + // Called when the SctpTransport's ready to use. That can happen when we've + // finished negotiation, or if the channel was created after negotiation has + // already finished. + void OnTransportReady(bool writable); + void OnDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& payload); - /******************************************** - * The following methods are for SCTP only. * - ********************************************/ - // Sets the SCTP sid and adds to transport layer if not set yet. Should only // be called once. void SetSctpSid(int sid); @@ -222,69 +204,27 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { // to kClosed. void OnTransportChannelClosed(); - Stats GetStats() const; - - /******************************************* - * The following methods are for RTP only. * - *******************************************/ - - // The remote peer requested that this channel should be closed. - void RemotePeerRequestClose(); - // Set the SSRC this channel should use to send data on the - // underlying data engine. |send_ssrc| == 0 means that the channel is no - // longer part of the session negotiation. - void SetSendSsrc(uint32_t send_ssrc); - // Set the SSRC this channel should use to receive data from the - // underlying data engine. - void SetReceiveSsrc(uint32_t receive_ssrc); - - cricket::DataChannelType data_channel_type() const { - return data_channel_type_; - } + DataChannelStats GetStats() const; // Emitted when state transitions to kOpen. - sigslot::signal1 SignalOpened; + sigslot::signal1 SignalOpened; // Emitted when state transitions to kClosed. - // In the case of SCTP channels, this signal can be used to tell when the - // channel's sid is free. - sigslot::signal1 SignalClosed; + // This signal can be used to tell when the channel's sid is free. + sigslot::signal1 SignalClosed; // Reset the allocator for internal ID values for testing, so that // the internal IDs generated are predictable. Test only. static void ResetInternalIdAllocatorForTesting(int new_value); protected: - DataChannel(const InternalDataChannelInit& config, - DataChannelProviderInterface* client, - cricket::DataChannelType dct, - const std::string& label, - rtc::Thread* signaling_thread, - rtc::Thread* network_thread); - ~DataChannel() override; + SctpDataChannel(const InternalDataChannelInit& config, + SctpDataChannelProviderInterface* client, + const std::string& label, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread); + ~SctpDataChannel() override; private: - // A packet queue which tracks the total queued bytes. Queued packets are - // owned by this class. - class PacketQueue final { - public: - size_t byte_count() const { return byte_count_; } - - bool Empty() const; - - std::unique_ptr PopFront(); - - void PushFront(std::unique_ptr packet); - void PushBack(std::unique_ptr packet); - - void Clear(); - - void Swap(PacketQueue* other); - - private: - std::deque> packets_; - size_t byte_count_ = 0; - }; - // The OPEN(_ACK) signaling state. enum HandshakeState { kHandshakeInit, @@ -296,8 +236,6 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { bool Init(); void UpdateState(); - void UpdateRtpState(); - void UpdateSctpLikeState(); void SetState(DataState state); void DisconnectFromProvider(); @@ -316,27 +254,24 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { const int internal_id_; const std::string label_; const InternalDataChannelInit config_; - DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_); - DataState state_ RTC_GUARDED_BY(signaling_thread_); + DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr; + DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting; RTCError error_ RTC_GUARDED_BY(signaling_thread_); - uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_); - uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_); - uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_); - uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_); + uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0; // Number of bytes of data that have been queued using Send(). Increased // before each transport send and decreased after each successful send. - uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_); - const cricket::DataChannelType data_channel_type_; - DataChannelProviderInterface* const provider_; - HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_); - bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_); - bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_); - bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_); - bool writable_ RTC_GUARDED_BY(signaling_thread_); + uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_) = 0; + SctpDataChannelProviderInterface* const provider_ + RTC_GUARDED_BY(signaling_thread_); + HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) = + kHandshakeInit; + bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_) = false; + bool writable_ RTC_GUARDED_BY(signaling_thread_) = false; // Did we already start the graceful SCTP closing procedure? bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false; - uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_); - uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_); // Control messages that always have to get sent out before any queued // data. PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_); @@ -347,4 +282,4 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { } // namespace webrtc -#endif // PC_DATA_CHANNEL_H_ +#endif // PC_SCTP_DATA_CHANNEL_H_ diff --git a/pc/stats_collector.cc b/pc/stats_collector.cc index 317e4443d4..73d4510fa8 100644 --- a/pc/stats_collector.cc +++ b/pc/stats_collector.cc @@ -1146,7 +1146,7 @@ void StatsCollector::ExtractDataInfo() { rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; - std::vector data_stats = pc_->GetDataChannelStats(); + std::vector data_stats = pc_->GetDataChannelStats(); for (const auto& stats : data_stats) { StatsReport::Id id(StatsReport::NewTypedIntId( StatsReport::kStatsReportTypeDataChannel, stats.id)); diff --git a/pc/stats_collector_unittest.cc b/pc/stats_collector_unittest.cc index ab469729ae..a5666ff6b6 100644 --- a/pc/stats_collector_unittest.cc +++ b/pc/stats_collector_unittest.cc @@ -23,9 +23,9 @@ #include "call/call.h" #include "media/base/media_channel.h" #include "modules/audio_processing/include/audio_processing_statistics.h" -#include "pc/data_channel.h" #include "pc/media_stream.h" #include "pc/media_stream_track.h" +#include "pc/sctp_data_channel.h" #include "pc/test/fake_peer_connection_for_stats.h" #include "pc/test/fake_video_track_source.h" #include "pc/test/mock_rtp_receiver_internal.h" diff --git a/pc/test/fake_data_channel_provider.h b/pc/test/fake_data_channel_provider.h index 2ada4a992d..7145225ca6 100644 --- a/pc/test/fake_data_channel_provider.h +++ b/pc/test/fake_data_channel_provider.h @@ -13,10 +13,11 @@ #include -#include "pc/data_channel.h" +#include "pc/sctp_data_channel.h" #include "rtc_base/checks.h" -class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { +class FakeDataChannelProvider + : public webrtc::SctpDataChannelProviderInterface { public: FakeDataChannelProvider() : send_blocked_(false), @@ -44,7 +45,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { return true; } - bool ConnectDataChannel(webrtc::DataChannel* data_channel) override { + bool ConnectDataChannel(webrtc::SctpDataChannel* data_channel) override { RTC_CHECK(connected_channels_.find(data_channel) == connected_channels_.end()); if (!transport_available_) { @@ -55,7 +56,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { return true; } - void DisconnectDataChannel(webrtc::DataChannel* data_channel) override { + void DisconnectDataChannel(webrtc::SctpDataChannel* data_channel) override { RTC_CHECK(connected_channels_.find(data_channel) != connected_channels_.end()); RTC_LOG(LS_INFO) << "DataChannel disconnected " << data_channel; @@ -77,7 +78,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { recv_ssrcs_.erase(sid); // Unlike the real SCTP transport, act like the closing procedure finished // instantly, doing the same snapshot thing as below. - for (webrtc::DataChannel* ch : std::set( + for (webrtc::SctpDataChannel* ch : std::set( connected_channels_.begin(), connected_channels_.end())) { if (connected_channels_.count(ch)) { ch->OnClosingProcedureComplete(sid); @@ -93,12 +94,12 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { if (!blocked) { // Take a snapshot of the connected channels and check to see whether // each value is still in connected_channels_ before calling - // OnChannelReady(). This avoids problems where the set gets modified - // in response to OnChannelReady(). - for (webrtc::DataChannel* ch : std::set( + // OnTransportReady(). This avoids problems where the set gets modified + // in response to OnTransportReady(). + for (webrtc::SctpDataChannel* ch : std::set( connected_channels_.begin(), connected_channels_.end())) { if (connected_channels_.count(ch)) { - ch->OnChannelReady(true); + ch->OnTransportReady(true); } } } @@ -116,10 +117,10 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { RTC_CHECK(transport_available_); ready_to_send_ = ready; if (ready) { - std::set::iterator it; + std::set::iterator it; for (it = connected_channels_.begin(); it != connected_channels_.end(); ++it) { - (*it)->OnChannelReady(true); + (*it)->OnTransportReady(true); } } } @@ -130,7 +131,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { return last_send_data_params_; } - bool IsConnected(webrtc::DataChannel* data_channel) const { + bool IsConnected(webrtc::SctpDataChannel* data_channel) const { return connected_channels_.find(data_channel) != connected_channels_.end(); } @@ -148,7 +149,7 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { bool transport_available_; bool ready_to_send_; bool transport_error_; - std::set connected_channels_; + std::set connected_channels_; std::set send_ssrcs_; std::set recv_ssrcs_; }; diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h index e1663e6d9f..9531c6de5b 100644 --- a/pc/test/fake_peer_connection_base.h +++ b/pc/test/fake_peer_connection_base.h @@ -248,8 +248,12 @@ class FakePeerConnectionBase : public PeerConnectionInternal { return {}; } - sigslot::signal1& SignalDataChannelCreated() override { - return SignalDataChannelCreated_; + sigslot::signal1& SignalRtpDataChannelCreated() override { + return SignalRtpDataChannelCreated_; + } + + sigslot::signal1& SignalSctpDataChannelCreated() override { + return SignalSctpDataChannelCreated_; } cricket::RtpDataChannel* rtp_data_channel() const override { return nullptr; } @@ -294,7 +298,8 @@ class FakePeerConnectionBase : public PeerConnectionInternal { } protected: - sigslot::signal1 SignalDataChannelCreated_; + sigslot::signal1 SignalRtpDataChannelCreated_; + sigslot::signal1 SignalSctpDataChannelCreated_; }; } // namespace webrtc diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index 175a1ede15..1a4c1a05d0 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -175,12 +175,12 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { void AddSctpDataChannel(const std::string& label, const InternalDataChannelInit& init) { // TODO(bugs.webrtc.org/11547): Supply a separate network thread. - AddSctpDataChannel(DataChannel::Create( - &data_channel_provider_, cricket::DCT_SCTP, label, init, - rtc::Thread::Current(), rtc::Thread::Current())); + AddSctpDataChannel(SctpDataChannel::Create(&data_channel_provider_, label, + init, rtc::Thread::Current(), + rtc::Thread::Current())); } - void AddSctpDataChannel(rtc::scoped_refptr data_channel) { + void AddSctpDataChannel(rtc::scoped_refptr data_channel) { sctp_data_channels_.push_back(data_channel); } @@ -259,9 +259,9 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { return transceivers_; } - std::vector GetDataChannelStats() const override { + std::vector GetDataChannelStats() const override { RTC_DCHECK_RUN_ON(signaling_thread()); - std::vector stats; + std::vector stats; for (const auto& channel : sctp_data_channels_) stats.push_back(channel->GetStats()); return stats; @@ -364,7 +364,7 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { std::unique_ptr voice_channel_; std::unique_ptr video_channel_; - std::vector> sctp_data_channels_; + std::vector> sctp_data_channels_; std::map transport_stats_by_name_; diff --git a/pc/test/mock_data_channel.h b/pc/test/mock_data_channel.h index bc5f94da5f..ab4b0073da 100644 --- a/pc/test/mock_data_channel.h +++ b/pc/test/mock_data_channel.h @@ -13,16 +13,23 @@ #include -#include "pc/data_channel.h" +#include "pc/sctp_data_channel.h" #include "test/gmock.h" namespace webrtc { -class MockDataChannel : public rtc::RefCountedObject { +class MockSctpDataChannel : public rtc::RefCountedObject { public: - MockDataChannel(int id, DataState state) - : MockDataChannel(id, "MockDataChannel", state, "udp", 0, 0, 0, 0) {} - MockDataChannel( + MockSctpDataChannel(int id, DataState state) + : MockSctpDataChannel(id, + "MockSctpDataChannel", + state, + "udp", + 0, + 0, + 0, + 0) {} + MockSctpDataChannel( int id, const std::string& label, DataState state, @@ -34,12 +41,11 @@ class MockDataChannel : public rtc::RefCountedObject { const InternalDataChannelInit& config = InternalDataChannelInit(), rtc::Thread* signaling_thread = rtc::Thread::Current(), rtc::Thread* network_thread = rtc::Thread::Current()) - : rtc::RefCountedObject(config, - nullptr, - cricket::DCT_NONE, - label, - signaling_thread, - network_thread) { + : rtc::RefCountedObject(config, + nullptr, + label, + signaling_thread, + network_thread) { EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id)); EXPECT_CALL(*this, state()).WillRepeatedly(::testing::Return(state)); EXPECT_CALL(*this, protocol()).WillRepeatedly(::testing::Return(protocol));