diff --git a/api/data_channel_transport_interface.cc b/api/data_channel_transport_interface.cc index e5d8fdda9d..d9947e2c97 100644 --- a/api/data_channel_transport_interface.cc +++ b/api/data_channel_transport_interface.cc @@ -31,4 +31,10 @@ RTCError DataChannelTransportInterface::CloseChannel(int channel_id) { void DataChannelTransportInterface::SetDataSink(DataChannelSink* /*sink*/) {} +bool DataChannelTransportInterface::IsReadyToSend() const { + return false; +} + +void DataChannelSink::OnReadyToSend() {} + } // namespace webrtc diff --git a/api/data_channel_transport_interface.h b/api/data_channel_transport_interface.h index a63abe0d89..a6825f69b9 100644 --- a/api/data_channel_transport_interface.h +++ b/api/data_channel_transport_interface.h @@ -77,6 +77,14 @@ class DataChannelSink { // procedure. Closing channels become closed after all pending data has been // transmitted. virtual void OnChannelClosed(int channel_id) = 0; + + // Callback issued when the data channel becomes ready to send. + // This callback will be issued immediately when the data channel sink is + // registered if the transport is ready at that time. This callback may be + // invoked again following send errors (eg. due to the transport being + // temporarily blocked or unavailable). + // TODO(mellem): Make pure virtual when downstream sinks override this. + virtual void OnReadyToSend(); }; // Transport for data channels. @@ -104,6 +112,12 @@ class DataChannelTransportInterface { // transport is destroyed, the sink must be unregistered by setting it to // nullptr. virtual void SetDataSink(DataChannelSink* sink); + + // Returns whether this data channel transport is ready to send. + // Note: the default implementation always returns false (as it assumes no one + // has implemented the interface). This default implementation is temporary. + // TODO(mellem): Change this to pure virtual. + virtual bool IsReadyToSend() const; }; } // namespace webrtc diff --git a/api/peer_connection_interface.h b/api/peer_connection_interface.h index c6f68da0ad..4ade0b3e2a 100644 --- a/api/peer_connection_interface.h +++ b/api/peer_connection_interface.h @@ -631,6 +631,12 @@ class RTC_EXPORT PeerConnectionInterface : public rtc::RefCountInterface { // MediaTransportFactory wasn't provided. absl::optional use_datagram_transport; + // If MediaTransportFactory is provided in PeerConnectionFactory, this flag + // informs PeerConnection that it should use the DatagramTransport's + // implementation of DataChannelTransportInterface for data channels instead + // of SCTP-DTLS. + absl::optional use_datagram_transport_for_data_channels; + // Defines advanced optional cryptographic settings related to SRTP and // frame encryption for native WebRTC. Setting this will overwrite any // settings set in PeerConnectionFactory (which is deprecated). diff --git a/api/test/fake_datagram_transport.h b/api/test/fake_datagram_transport.h index a73a7e8d1c..9a1ddef12e 100644 --- a/api/test/fake_datagram_transport.h +++ b/api/test/fake_datagram_transport.h @@ -55,6 +55,8 @@ class FakeDatagramTransport : public DatagramTransportInterface { void SetDatagramSink(DatagramSinkInterface* sink) override {} + bool IsReadyToSend() const override { return false; } + std::string GetTransportParameters() const override { if (settings_.remote_transport_parameters) { return *settings_.remote_transport_parameters; diff --git a/api/test/fake_media_transport.h b/api/test/fake_media_transport.h index 38b94c9143..025965b737 100644 --- a/api/test/fake_media_transport.h +++ b/api/test/fake_media_transport.h @@ -74,6 +74,8 @@ class FakeMediaTransport : public MediaTransportInterface { void SetDataSink(DataChannelSink* sink) override {} + bool IsReadyToSend() const override { return false; } + void SetMediaTransportStateCallback( MediaTransportStateCallback* callback) override { state_callback_ = callback; diff --git a/api/test/loopback_media_transport.cc b/api/test/loopback_media_transport.cc index 4e8fb0e099..8c7f240ee4 100644 --- a/api/test/loopback_media_transport.cc +++ b/api/test/loopback_media_transport.cc @@ -86,6 +86,8 @@ class WrapperMediaTransport : public MediaTransportInterface { wrapped_->SetDataSink(sink); } + bool IsReadyToSend() const override { return wrapped_->IsReadyToSend(); } + void SetAllocatedBitrateLimits( const MediaTransportAllocatedBitrateLimits& limits) override {} @@ -97,11 +99,74 @@ class WrapperMediaTransport : public MediaTransportInterface { MediaTransportInterface* wrapped_; }; +class WrapperDatagramTransport : public DatagramTransportInterface { + public: + explicit WrapperDatagramTransport(DatagramTransportInterface* wrapped) + : wrapped_(wrapped) {} + + // Datagram transport overrides. + void Connect(rtc::PacketTransportInternal* packet_transport) override { + return wrapped_->Connect(packet_transport); + } + + CongestionControlInterface* congestion_control() override { + return wrapped_->congestion_control(); + } + + void SetTransportStateCallback( + MediaTransportStateCallback* callback) override { + return wrapped_->SetTransportStateCallback(callback); + } + + RTCError SendDatagram(rtc::ArrayView data, + DatagramId datagram_id) override { + return wrapped_->SendDatagram(data, datagram_id); + } + + size_t GetLargestDatagramSize() const override { + return wrapped_->GetLargestDatagramSize(); + } + + void SetDatagramSink(DatagramSinkInterface* sink) override { + return wrapped_->SetDatagramSink(sink); + } + + std::string GetTransportParameters() const override { + return wrapped_->GetTransportParameters(); + } + + // Data channel overrides. + RTCError OpenChannel(int channel_id) override { + return wrapped_->OpenChannel(channel_id); + } + + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override { + return wrapped_->SendData(channel_id, params, buffer); + } + + RTCError CloseChannel(int channel_id) override { + return wrapped_->CloseChannel(channel_id); + } + + void SetDataSink(DataChannelSink* sink) override { + wrapped_->SetDataSink(sink); + } + + bool IsReadyToSend() const override { return wrapped_->IsReadyToSend(); } + + private: + DatagramTransportInterface* wrapped_; +}; + } // namespace WrapperMediaTransportFactory::WrapperMediaTransportFactory( - MediaTransportInterface* wrapped) - : wrapped_(wrapped) {} + MediaTransportInterface* wrapped_media_transport, + DatagramTransportInterface* wrapped_datagram_transport) + : wrapped_media_transport_(wrapped_media_transport), + wrapped_datagram_transport_(wrapped_datagram_transport) {} WrapperMediaTransportFactory::WrapperMediaTransportFactory( MediaTransportFactory* wrapped) @@ -117,7 +182,19 @@ WrapperMediaTransportFactory::CreateMediaTransport( return wrapped_factory_->CreateMediaTransport(packet_transport, network_thread, settings); } - return {absl::make_unique(wrapped_)}; + return {absl::make_unique(wrapped_media_transport_)}; +} + +RTCErrorOr> +WrapperMediaTransportFactory::CreateDatagramTransport( + rtc::Thread* network_thread, + const MediaTransportSettings& settings) { + created_transport_count_++; + if (wrapped_factory_) { + return wrapped_factory_->CreateDatagramTransport(network_thread, settings); + } + return { + absl::make_unique(wrapped_datagram_transport_)}; } std::string WrapperMediaTransportFactory::GetTransportName() const { @@ -139,21 +216,41 @@ WrapperMediaTransportFactory::CreateMediaTransport( if (wrapped_factory_) { return wrapped_factory_->CreateMediaTransport(network_thread, settings); } - return {absl::make_unique(wrapped_)}; + return {absl::make_unique(wrapped_media_transport_)}; } MediaTransportPair::MediaTransportPair(rtc::Thread* thread) - : first_(thread, &second_), - second_(thread, &first_), - first_factory_(&first_), - second_factory_(&second_) {} + : first_(thread), + second_(thread), + first_datagram_transport_(thread), + second_datagram_transport_(thread), + first_factory_(&first_, &first_datagram_transport_), + second_factory_(&second_, &second_datagram_transport_) { + first_.Connect(&second_); + second_.Connect(&first_); + first_datagram_transport_.Connect(&second_datagram_transport_); + second_datagram_transport_.Connect(&first_datagram_transport_); +} MediaTransportPair::~MediaTransportPair() = default; +MediaTransportPair::LoopbackDataChannelTransport::LoopbackDataChannelTransport( + rtc::Thread* thread) + : thread_(thread) {} + +MediaTransportPair::LoopbackDataChannelTransport:: + ~LoopbackDataChannelTransport() { + RTC_CHECK(data_sink_ == nullptr); +} + +void MediaTransportPair::LoopbackDataChannelTransport::Connect( + LoopbackDataChannelTransport* other) { + other_ = other; +} + MediaTransportPair::LoopbackMediaTransport::LoopbackMediaTransport( - rtc::Thread* thread, - LoopbackMediaTransport* other) - : thread_(thread), other_(other) { + rtc::Thread* thread) + : dc_transport_(thread), thread_(thread), other_(nullptr) { RTC_LOG(LS_INFO) << "LoopbackMediaTransport"; } @@ -162,11 +259,19 @@ MediaTransportPair::LoopbackMediaTransport::~LoopbackMediaTransport() { rtc::CritScope lock(&sink_lock_); RTC_CHECK(audio_sink_ == nullptr); RTC_CHECK(video_sink_ == nullptr); - RTC_CHECK(data_sink_ == nullptr); RTC_CHECK(target_transfer_rate_observers_.empty()); RTC_CHECK(rtt_observers_.empty()); } +void MediaTransportPair::LoopbackMediaTransport::Connect( + LoopbackMediaTransport* other) { + other_ = other; + dc_transport_.Connect(&other->dc_transport_); +} + +void MediaTransportPair::LoopbackMediaTransport::Connect( + rtc::PacketTransportInternal* packet_transport) {} + absl::optional MediaTransportPair::LoopbackMediaTransport::GetTransportParametersOffer() const { @@ -322,6 +427,12 @@ void MediaTransportPair::LoopbackMediaTransport::SetMediaTransportStateCallback( RTCError MediaTransportPair::LoopbackMediaTransport::OpenChannel( int channel_id) { // No-op. No need to open channels for the loopback. + return dc_transport_.OpenChannel(channel_id); +} + +RTCError MediaTransportPair::LoopbackDataChannelTransport::OpenChannel( + int channel_id) { + // No-op. No need to open channels for the loopback. return RTCError::OK(); } @@ -329,6 +440,13 @@ RTCError MediaTransportPair::LoopbackMediaTransport::SendData( int channel_id, const SendDataParams& params, const rtc::CopyOnWriteBuffer& buffer) { + return dc_transport_.SendData(channel_id, params, buffer); +} + +RTCError MediaTransportPair::LoopbackDataChannelTransport::SendData( + int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, channel_id, params, buffer] { other_->OnData(channel_id, params.type, buffer); @@ -338,6 +456,11 @@ RTCError MediaTransportPair::LoopbackMediaTransport::SendData( RTCError MediaTransportPair::LoopbackMediaTransport::CloseChannel( int channel_id) { + return dc_transport_.CloseChannel(channel_id); +} + +RTCError MediaTransportPair::LoopbackDataChannelTransport::CloseChannel( + int channel_id) { invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, channel_id] { other_->OnRemoteCloseChannel(channel_id); rtc::CritScope lock(&sink_lock_); @@ -350,9 +473,27 @@ RTCError MediaTransportPair::LoopbackMediaTransport::CloseChannel( void MediaTransportPair::LoopbackMediaTransport::SetDataSink( DataChannelSink* sink) { + dc_transport_.SetDataSink(sink); +} + +bool MediaTransportPair::LoopbackMediaTransport::IsReadyToSend() const { + return dc_transport_.IsReadyToSend(); +} + +void MediaTransportPair::LoopbackDataChannelTransport::SetDataSink( + DataChannelSink* sink) { rtc::CritScope lock(&sink_lock_); data_sink_ = sink; + if (data_sink_ && ready_to_send_) { + data_sink_->OnReadyToSend(); + } } + +bool MediaTransportPair::LoopbackDataChannelTransport::IsReadyToSend() const { + rtc::CritScope lock(&sink_lock_); + return ready_to_send_; +} + void MediaTransportPair::LoopbackMediaTransport::SetState( MediaTransportState state) { invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, state] { @@ -364,6 +505,11 @@ void MediaTransportPair::LoopbackMediaTransport::SetState( void MediaTransportPair::LoopbackMediaTransport::FlushAsyncInvokes() { invoker_.Flush(thread_); + dc_transport_.FlushAsyncInvokes(); +} + +void MediaTransportPair::LoopbackDataChannelTransport::FlushAsyncInvokes() { + invoker_.Flush(thread_); } MediaTransportPair::Stats @@ -402,7 +548,7 @@ void MediaTransportPair::LoopbackMediaTransport::OnData( } } -void MediaTransportPair::LoopbackMediaTransport::OnData( +void MediaTransportPair::LoopbackDataChannelTransport::OnData( int channel_id, DataMessageType type, const rtc::CopyOnWriteBuffer& buffer) { @@ -420,7 +566,7 @@ void MediaTransportPair::LoopbackMediaTransport::OnKeyFrameRequested( } } -void MediaTransportPair::LoopbackMediaTransport::OnRemoteCloseChannel( +void MediaTransportPair::LoopbackDataChannelTransport::OnRemoteCloseChannel( int channel_id) { rtc::CritScope lock(&sink_lock_); if (data_sink_) { @@ -434,9 +580,97 @@ void MediaTransportPair::LoopbackMediaTransport::OnStateChanged() { if (state_callback_) { state_callback_->OnStateChanged(state_); } + + dc_transport_.OnReadyToSend(state_ == MediaTransportState::kWritable); +} + +void MediaTransportPair::LoopbackDataChannelTransport::OnReadyToSend( + bool ready_to_send) { + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, [this, ready_to_send] { + rtc::CritScope lock(&sink_lock_); + ready_to_send_ = ready_to_send; + // Propagate state to data channel sink, if present. + if (data_sink_ && ready_to_send_) { + data_sink_->OnReadyToSend(); + } + }); } void MediaTransportPair::LoopbackMediaTransport::SetAllocatedBitrateLimits( const MediaTransportAllocatedBitrateLimits& limits) {} +MediaTransportPair::LoopbackDatagramTransport::LoopbackDatagramTransport( + rtc::Thread* thread) + : dc_transport_(thread) {} + +void MediaTransportPair::LoopbackDatagramTransport::Connect( + LoopbackDatagramTransport* other) { + dc_transport_.Connect(&other->dc_transport_); +} + +void MediaTransportPair::LoopbackDatagramTransport::Connect( + rtc::PacketTransportInternal* packet_transport) {} + +CongestionControlInterface* +MediaTransportPair::LoopbackDatagramTransport::congestion_control() { + return nullptr; +} + +void MediaTransportPair::LoopbackDatagramTransport::SetTransportStateCallback( + MediaTransportStateCallback* callback) {} + +RTCError MediaTransportPair::LoopbackDatagramTransport::SendDatagram( + rtc::ArrayView data, + DatagramId datagram_id) { + return RTCError::OK(); +} + +size_t MediaTransportPair::LoopbackDatagramTransport::GetLargestDatagramSize() + const { + return 0; +} + +void MediaTransportPair::LoopbackDatagramTransport::SetDatagramSink( + DatagramSinkInterface* sink) {} + +std::string +MediaTransportPair::LoopbackDatagramTransport::GetTransportParameters() const { + return transport_parameters_; +} + +RTCError MediaTransportPair::LoopbackDatagramTransport::OpenChannel( + int channel_id) { + return dc_transport_.OpenChannel(channel_id); +} + +RTCError MediaTransportPair::LoopbackDatagramTransport::SendData( + int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + return dc_transport_.SendData(channel_id, params, buffer); +} + +RTCError MediaTransportPair::LoopbackDatagramTransport::CloseChannel( + int channel_id) { + return dc_transport_.CloseChannel(channel_id); +} + +void MediaTransportPair::LoopbackDatagramTransport::SetDataSink( + DataChannelSink* sink) { + dc_transport_.SetDataSink(sink); +} + +bool MediaTransportPair::LoopbackDatagramTransport::IsReadyToSend() const { + return dc_transport_.IsReadyToSend(); +} + +void MediaTransportPair::LoopbackDatagramTransport::SetState( + MediaTransportState state) { + dc_transport_.OnReadyToSend(state == MediaTransportState::kWritable); +} + +void MediaTransportPair::LoopbackDatagramTransport::FlushAsyncInvokes() { + dc_transport_.FlushAsyncInvokes(); +} + } // namespace webrtc diff --git a/api/test/loopback_media_transport.h b/api/test/loopback_media_transport.h index 2972b49e0e..cc66d627ae 100644 --- a/api/test/loopback_media_transport.h +++ b/api/test/loopback_media_transport.h @@ -17,6 +17,7 @@ #include #include "absl/memory/memory.h" +#include "api/datagram_transport_interface.h" #include "api/media_transport_interface.h" #include "rtc_base/async_invoker.h" #include "rtc_base/critical_section.h" @@ -42,7 +43,9 @@ namespace webrtc { // CreateMediaTransport(); class WrapperMediaTransportFactory : public MediaTransportFactory { public: - explicit WrapperMediaTransportFactory(MediaTransportInterface* wrapped); + WrapperMediaTransportFactory( + MediaTransportInterface* wrapped_media_transport, + DatagramTransportInterface* wrapped_datagram_transport); explicit WrapperMediaTransportFactory(MediaTransportFactory* wrapped); RTCErrorOr> CreateMediaTransport( @@ -54,12 +57,17 @@ class WrapperMediaTransportFactory : public MediaTransportFactory { rtc::Thread* network_thread, const MediaTransportSettings& settings) override; + RTCErrorOr> + CreateDatagramTransport(rtc::Thread* network_thread, + const MediaTransportSettings& settings) override; + std::string GetTransportName() const override; int created_transport_count() const; private: - MediaTransportInterface* wrapped_; + MediaTransportInterface* wrapped_media_transport_ = nullptr; + DatagramTransportInterface* wrapped_datagram_transport_ = nullptr; MediaTransportFactory* wrapped_factory_ = nullptr; int created_transport_count_ = 0; }; @@ -82,6 +90,13 @@ class MediaTransportPair { MediaTransportInterface* first() { return &first_; } MediaTransportInterface* second() { return &second_; } + DatagramTransportInterface* first_datagram_transport() { + return &first_datagram_transport_; + } + DatagramTransportInterface* second_datagram_transport() { + return &second_datagram_transport_; + } + std::unique_ptr first_factory() { return absl::make_unique(&first_factory_); } @@ -93,6 +108,12 @@ class MediaTransportPair { void SetState(MediaTransportState state) { first_.SetState(state); second_.SetState(state); + first_datagram_transport_.SetState(state); + second_datagram_transport_.SetState(state); + } + + void SetFirstDatagramTransportParameters(const std::string& params) { + first_datagram_transport_.set_transport_parameters(params); } void FlushAsyncInvokes() { @@ -112,12 +133,58 @@ class MediaTransportPair { } private: + class LoopbackDataChannelTransport : public DataChannelTransportInterface { + public: + explicit LoopbackDataChannelTransport(rtc::Thread* thread); + ~LoopbackDataChannelTransport() override; + + void Connect(LoopbackDataChannelTransport* other); + + RTCError OpenChannel(int channel_id) override; + + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override; + + RTCError CloseChannel(int channel_id) override; + + bool IsReadyToSend() const override; + + void SetDataSink(DataChannelSink* sink) override; + + void OnReadyToSend(bool ready_to_send); + + void FlushAsyncInvokes(); + + private: + void OnData(int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer); + + void OnRemoteCloseChannel(int channel_id); + + rtc::Thread* const thread_; + rtc::CriticalSection sink_lock_; + DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr; + + bool ready_to_send_ RTC_GUARDED_BY(sink_lock_) = false; + + LoopbackDataChannelTransport* other_; + + rtc::AsyncInvoker invoker_; + }; + class LoopbackMediaTransport : public MediaTransportInterface { public: - LoopbackMediaTransport(rtc::Thread* thread, LoopbackMediaTransport* other); + explicit LoopbackMediaTransport(rtc::Thread* thread); ~LoopbackMediaTransport() override; + // Connects this loopback transport to another loopback transport. + void Connect(LoopbackMediaTransport* other); + + void Connect(rtc::PacketTransportInternal* transport) override; + RTCError SendAudioFrame(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) override; @@ -146,6 +213,8 @@ class MediaTransportPair { void SetMediaTransportStateCallback( MediaTransportStateCallback* callback) override; + void SetState(MediaTransportState state); + RTCError OpenChannel(int channel_id) override; RTCError SendData(int channel_id, @@ -156,7 +225,7 @@ class MediaTransportPair { void SetDataSink(DataChannelSink* sink) override; - void SetState(MediaTransportState state); + bool IsReadyToSend() const override; void FlushAsyncInvokes(); @@ -172,16 +241,13 @@ class MediaTransportPair { void OnData(uint64_t channel_id, MediaTransportEncodedVideoFrame frame); - void OnData(int channel_id, - DataMessageType type, - const rtc::CopyOnWriteBuffer& buffer); - void OnKeyFrameRequested(int channel_id); - void OnRemoteCloseChannel(int channel_id); - void OnStateChanged() RTC_RUN_ON(thread_); + // Implementation of the data channel transport. + LoopbackDataChannelTransport dc_transport_; + rtc::Thread* const thread_; rtc::CriticalSection sink_lock_; rtc::CriticalSection stats_lock_; @@ -190,7 +256,6 @@ class MediaTransportPair { nullptr; MediaTransportVideoSinkInterface* video_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr; - DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr; MediaTransportKeyFrameRequestCallback* key_frame_callback_ RTC_GUARDED_BY(sink_lock_) = nullptr; @@ -206,15 +271,58 @@ class MediaTransportPair { MediaTransportState state_ RTC_GUARDED_BY(thread_) = MediaTransportState::kPending; - LoopbackMediaTransport* const other_; + LoopbackMediaTransport* other_; Stats stats_ RTC_GUARDED_BY(stats_lock_); rtc::AsyncInvoker invoker_; }; + class LoopbackDatagramTransport : public DatagramTransportInterface { + public: + explicit LoopbackDatagramTransport(rtc::Thread* thread); + + void Connect(LoopbackDatagramTransport* other); + + // Datagram transport overrides. + // TODO(mellem): Implement these when tests actually need to use them. + void Connect(rtc::PacketTransportInternal* packet_transport) override; + CongestionControlInterface* congestion_control() override; + void SetTransportStateCallback( + MediaTransportStateCallback* callback) override; + RTCError SendDatagram(rtc::ArrayView data, + DatagramId datagram_id) override; + size_t GetLargestDatagramSize() const override; + void SetDatagramSink(DatagramSinkInterface* sink) override; + std::string GetTransportParameters() const override; + + // Data channel overrides. + RTCError OpenChannel(int channel_id) override; + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override; + RTCError CloseChannel(int channel_id) override; + void SetDataSink(DataChannelSink* sink) override; + bool IsReadyToSend() const override; + + // Loopback-specific functionality. + void SetState(MediaTransportState state); + void FlushAsyncInvokes(); + + void set_transport_parameters(const std::string& value) { + transport_parameters_ = value; + } + + private: + LoopbackDataChannelTransport dc_transport_; + + std::string transport_parameters_; + }; + LoopbackMediaTransport first_; LoopbackMediaTransport second_; + LoopbackDatagramTransport first_datagram_transport_; + LoopbackDatagramTransport second_datagram_transport_; WrapperMediaTransportFactory first_factory_; WrapperMediaTransportFactory second_factory_; }; diff --git a/api/test/loopback_media_transport_unittest.cc b/api/test/loopback_media_transport_unittest.cc index d1351c5935..346ac5faeb 100644 --- a/api/test/loopback_media_transport_unittest.cc +++ b/api/test/loopback_media_transport_unittest.cc @@ -44,6 +44,7 @@ class MockDataChannelSink : public DataChannelSink { void(int, DataMessageType, const rtc::CopyOnWriteBuffer&)); MOCK_METHOD1(OnChannelClosing, void(int)); MOCK_METHOD1(OnChannelClosed, void(int)); + MOCK_METHOD0(OnReadyToSend, void()); }; class MockStateCallback : public MediaTransportStateCallback { @@ -203,8 +204,8 @@ TEST(LoopbackMediaTransport, InitialStateDeliveredWhenCallbackSet) { MediaTransportPair transport_pair(thread.get()); MockStateCallback state_callback; - EXPECT_CALL(state_callback, OnStateChanged(MediaTransportState::kPending)); + transport_pair.first()->SetMediaTransportStateCallback(&state_callback); transport_pair.FlushAsyncInvokes(); } @@ -238,4 +239,47 @@ TEST(LoopbackMediaTransport, StateChangeDeliveredToCallback) { transport_pair.FlushAsyncInvokes(); } +TEST(LoopbackMediaTransport, NotReadyToSendWhenDataSinkSet) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink data_channel_sink; + EXPECT_CALL(data_channel_sink, OnReadyToSend()).Times(0); + + transport_pair.first()->SetDataSink(&data_channel_sink); + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); +} + +TEST(LoopbackMediaTransport, ReadyToSendWhenDataSinkSet) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + transport_pair.SetState(MediaTransportState::kWritable); + transport_pair.FlushAsyncInvokes(); + + MockDataChannelSink data_channel_sink; + EXPECT_CALL(data_channel_sink, OnReadyToSend()); + + transport_pair.first()->SetDataSink(&data_channel_sink); + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); +} + +TEST(LoopbackMediaTransport, StateChangeDeliveredToDataSink) { + std::unique_ptr thread = rtc::Thread::Create(); + thread->Start(); + MediaTransportPair transport_pair(thread.get()); + + MockDataChannelSink data_channel_sink; + EXPECT_CALL(data_channel_sink, OnReadyToSend()); + + transport_pair.first()->SetDataSink(&data_channel_sink); + transport_pair.SetState(MediaTransportState::kWritable); + transport_pair.FlushAsyncInvokes(); + transport_pair.first()->SetDataSink(nullptr); +} + } // namespace webrtc diff --git a/media/base/media_engine.h b/media/base/media_engine.h index bf4fd3cc25..e53c89d1a5 100644 --- a/media/base/media_engine.h +++ b/media/base/media_engine.h @@ -147,7 +147,18 @@ enum DataChannelType { DCT_NONE = 0, DCT_RTP = 1, DCT_SCTP = 2, - DCT_MEDIA_TRANSPORT = 3 + + // Data channel transport over media transport. + DCT_MEDIA_TRANSPORT = 3, + + // Data channel transport over datagram transport (with no fallback). This is + // the same behavior as data channel transport over media transport, and is + // usable without DTLS. + DCT_DATA_CHANNEL_TRANSPORT = 4, + + // Data channel transport over datagram transport (with SCTP negotiation + // semantics and a fallback to SCTP). Only usable with DTLS. + DCT_DATA_CHANNEL_TRANSPORT_SCTP = 5, }; class DataEngineInterface { diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 33b6fa2753..c3d8d4bd17 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -238,6 +238,7 @@ rtc_static_library("peerconnection") { "../rtc_base:rtc_base_approved", "../rtc_base:safe_minmax", "../rtc_base/experiments:field_trial_parser", + "../rtc_base/system:fallthrough", "../rtc_base/system:file_wrapper", "../rtc_base/system:rtc_export", "../rtc_base/third_party/base64", diff --git a/pc/data_channel.cc b/pc/data_channel.cc index f4a3818624..586520b6e0 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -148,7 +148,9 @@ rtc::scoped_refptr DataChannel::Create( } bool DataChannel::IsSctpLike(cricket::DataChannelType type) { - return type == cricket::DCT_SCTP || type == cricket::DCT_MEDIA_TRANSPORT; + return type == cricket::DCT_SCTP || type == cricket::DCT_MEDIA_TRANSPORT || + type == cricket::DCT_DATA_CHANNEL_TRANSPORT || + type == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; } DataChannel::DataChannel(DataChannelProviderInterface* provider, diff --git a/pc/jsep_transport.cc b/pc/jsep_transport.cc index e710f09fa9..519c6fcfd1 100644 --- a/pc/jsep_transport.cc +++ b/pc/jsep_transport.cc @@ -246,7 +246,7 @@ webrtc::RTCError JsepTransport::SetLocalJsepTransportDescription( // If PRANSWER/ANSWER is set, we should decide transport protocol type. if (type == SdpType::kPrAnswer || type == SdpType::kAnswer) { error = NegotiateAndSetDtlsParameters(type); - NegotiateRtpTransport(type); + NegotiateDatagramTransport(type); } if (!error.ok()) { local_description_.reset(); @@ -317,7 +317,7 @@ webrtc::RTCError JsepTransport::SetRemoteJsepTransportDescription( // If PRANSWER/ANSWER is set, we should decide transport protocol type. if (type == SdpType::kPrAnswer || type == SdpType::kAnswer) { error = NegotiateAndSetDtlsParameters(SdpType::kOffer); - NegotiateRtpTransport(type); + NegotiateDatagramTransport(type); } if (!error.ok()) { remote_description_.reset(); @@ -766,7 +766,7 @@ void JsepTransport::OnStateChanged(webrtc::MediaTransportState state) { SignalMediaTransportStateChanged(); } -void JsepTransport::NegotiateRtpTransport(SdpType type) { +void JsepTransport::NegotiateDatagramTransport(SdpType type) { RTC_DCHECK(type == SdpType::kAnswer || type == SdpType::kPrAnswer); rtc::CritScope lock(&accessor_lock_); if (!composite_rtp_transport_) { @@ -778,6 +778,8 @@ void JsepTransport::NegotiateRtpTransport(SdpType type) { remote_description_->transport_desc.opaque_parameters == local_description_->transport_desc.opaque_parameters; + // A provisional or full or answer lets the peer start sending on one of the + // transports. if (use_datagram_transport) { RTC_LOG(INFO) << "Datagram transport provisionally activated"; composite_rtp_transport_->SetSendTransport(datagram_rtp_transport_.get()); @@ -789,11 +791,19 @@ void JsepTransport::NegotiateRtpTransport(SdpType type) { if (type != SdpType::kAnswer) { // A provisional answer lets the peer start sending on the chosen // transport, but does not allow it to destroy other transports yet. + SignalDataChannelTransportNegotiated( + this, use_datagram_transport ? datagram_transport_.get() : nullptr, + /*provisional=*/true); return; } - // A full answer lets the peer send on the chosen transport and delete the - // rest. + // A full answer lets the peer delete the remaining transports. + // First, signal that the transports will be deleted so the application can + // stop using them. + SignalDataChannelTransportNegotiated( + this, use_datagram_transport ? datagram_transport_.get() : nullptr, + /*provisional=*/false); + if (use_datagram_transport) { RTC_LOG(INFO) << "Datagram transport activated"; composite_rtp_transport_->RemoveTransport(default_rtp_transport()); diff --git a/pc/jsep_transport.h b/pc/jsep_transport.h index 59b227606b..1a0e7b499a 100644 --- a/pc/jsep_transport.h +++ b/pc/jsep_transport.h @@ -243,6 +243,15 @@ class JsepTransport : public sigslot::has_slots<>, // This is signaled for changes in |media_transport_| state. sigslot::signal<> SignalMediaTransportStateChanged; + // Signals that a data channel transport was negotiated and may be used to + // send data. The first parameter is |this|. The second parameter is the + // transport that was negotiated, or null if negotiation rejected the data + // channel transport. The third parameter (bool) indicates whether the + // negotiation was provisional or final. If true, it is provisional, if + // false, it is final. + sigslot::signal3 + SignalDataChannelTransportNegotiated; + // TODO(deadbeef): The methods below are only public for testing. Should make // them utility functions or objects so they can be tested independently from // this class. @@ -303,8 +312,9 @@ class JsepTransport : public sigslot::has_slots<>, // Deactivates, signals removal, and deletes |composite_rtp_transport_| if the // current state of negotiation is sufficient to determine which rtp_transport - // to use. - void NegotiateRtpTransport(webrtc::SdpType type) RTC_RUN_ON(network_thread_); + // and data channel transport to use. + void NegotiateDatagramTransport(webrtc::SdpType type) + RTC_RUN_ON(network_thread_); // Returns the default (non-datagram) rtp transport, if any. webrtc::RtpTransportInternal* default_rtp_transport() const diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index ea14523ddb..1818858942 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -151,8 +151,10 @@ MediaTransportConfig JsepTransportController::GetMediaTransportConfig( media_transport = jsep_transport->media_transport(); } - DatagramTransportInterface* datagram_transport = - jsep_transport->datagram_transport(); + DatagramTransportInterface* datagram_transport = nullptr; + if (config_.use_datagram_transport) { + datagram_transport = jsep_transport->datagram_transport(); + } // Media transport and datagram transports can not be used together. RTC_DCHECK(!media_transport || !datagram_transport); @@ -167,15 +169,20 @@ MediaTransportConfig JsepTransportController::GetMediaTransportConfig( } } -MediaTransportInterface* -JsepTransportController::GetMediaTransportForDataChannel( +DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport( const std::string& mid) const { auto jsep_transport = GetJsepTransportForMid(mid); - if (!jsep_transport || !config_.use_media_transport_for_data_channels) { + if (!jsep_transport) { return nullptr; } - return jsep_transport->media_transport(); + if (config_.use_media_transport_for_data_channels) { + return jsep_transport->media_transport(); + } else if (config_.use_datagram_transport_for_data_channels) { + return jsep_transport->datagram_transport(); + } + // Not configured to use a data channel transport. + return nullptr; } MediaTransportState JsepTransportController::GetMediaTransportState( @@ -437,7 +444,8 @@ void JsepTransportController::SetActiveResetSrtpParams( void JsepTransportController::SetMediaTransportSettings( bool use_media_transport_for_media, bool use_media_transport_for_data_channels, - bool use_datagram_transport) { + bool use_datagram_transport, + bool use_datagram_transport_for_data_channels) { RTC_DCHECK(use_media_transport_for_media == config_.use_media_transport_for_media || jsep_transports_by_name_.empty()) @@ -454,6 +462,8 @@ void JsepTransportController::SetMediaTransportSettings( config_.use_media_transport_for_data_channels = use_media_transport_for_data_channels; config_.use_datagram_transport = use_datagram_transport; + config_.use_datagram_transport_for_data_channels = + use_datagram_transport_for_data_channels; } std::unique_ptr @@ -482,7 +492,8 @@ JsepTransportController::CreateDtlsTransport( std::unique_ptr dtls; if (datagram_transport) { - RTC_DCHECK(config_.use_datagram_transport); + RTC_DCHECK(config_.use_datagram_transport || + config_.use_datagram_transport_for_data_channels); } else if (config_.media_transport_factory && config_.use_media_transport_for_media && config_.use_media_transport_for_data_channels) { @@ -862,12 +873,13 @@ bool JsepTransportController::SetTransportForMid( mid_to_transport_[mid] = jsep_transport; return config_.transport_observer->OnTransportChanged( mid, jsep_transport->rtp_transport(), jsep_transport->RtpDtlsTransport(), - jsep_transport->media_transport()); + jsep_transport->media_transport(), jsep_transport->datagram_transport(), + NegotiationState::kInitial); } void JsepTransportController::RemoveTransportForMid(const std::string& mid) { - bool ret = config_.transport_observer->OnTransportChanged(mid, nullptr, - nullptr, nullptr); + bool ret = config_.transport_observer->OnTransportChanged( + mid, nullptr, nullptr, nullptr, nullptr, NegotiationState::kFinal); // Calling OnTransportChanged with nullptr should always succeed, since it is // only expected to fail when adding media to a transport (not removing). RTC_DCHECK(ret); @@ -1076,7 +1088,8 @@ JsepTransportController::MaybeCreateDatagramTransport( return nullptr; } - if (!config_.use_datagram_transport) { + if (!(config_.use_datagram_transport || + config_.use_datagram_transport_for_data_channels)) { return nullptr; } @@ -1226,6 +1239,8 @@ RTCError JsepTransportController::MaybeCreateJsepTransport( this, &JsepTransportController::UpdateAggregateStates_n); jsep_transport->SignalMediaTransportStateChanged.connect( this, &JsepTransportController::OnMediaTransportStateChanged_n); + jsep_transport->SignalDataChannelTransportNegotiated.connect( + this, &JsepTransportController::OnDataChannelTransportNegotiated_n); SetTransportForMid(content_info.name, jsep_transport.get()); jsep_transports_by_name_[content_info.name] = std::move(jsep_transport); @@ -1256,8 +1271,9 @@ void JsepTransportController::DestroyAllJsepTransports_n() { RTC_DCHECK(network_thread_->IsCurrent()); for (const auto& jsep_transport : jsep_transports_by_name_) { - config_.transport_observer->OnTransportChanged(jsep_transport.first, - nullptr, nullptr, nullptr); + config_.transport_observer->OnTransportChanged( + jsep_transport.first, nullptr, nullptr, nullptr, nullptr, + NegotiationState::kFinal); } jsep_transports_by_name_.clear(); @@ -1433,6 +1449,21 @@ void JsepTransportController::OnMediaTransportStateChanged_n() { UpdateAggregateStates_n(); } +void JsepTransportController::OnDataChannelTransportNegotiated_n( + cricket::JsepTransport* transport, + DataChannelTransportInterface* data_channel_transport, + bool provisional) { + for (auto it : mid_to_transport_) { + if (it.second == transport) { + config_.transport_observer->OnTransportChanged( + it.first, transport->rtp_transport(), transport->RtpDtlsTransport(), + transport->media_transport(), data_channel_transport, + provisional ? NegotiationState::kProvisional + : NegotiationState::kFinal); + } + } +} + void JsepTransportController::UpdateAggregateStates_n() { RTC_DCHECK(network_thread_->IsCurrent()); @@ -1723,7 +1754,8 @@ JsepTransportController::GenerateOrGetLastMediaTransportOffer() { absl::optional JsepTransportController::GetTransportParameters(const std::string& mid) { - if (!config_.use_datagram_transport) { + if (!(config_.use_datagram_transport || + config_.use_datagram_transport_for_data_channels)) { return absl::nullopt; } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 69af4574e3..de75db9432 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -47,6 +47,18 @@ namespace webrtc { class JsepTransportController : public sigslot::has_slots<> { public: + // State of negotiation for a transport. + enum class NegotiationState { + // Transport is in its initial state, not negotiated at all. + kInitial = 0, + + // Transport is negotiated, but not finalized. + kProvisional = 1, + + // Negotiation has completed for this transport. + kFinal = 2, + }; + // Used when the RtpTransport/DtlsTransport of the m= section is changed // because the section is rejected or BUNDLE is enabled. class Observer { @@ -56,11 +68,24 @@ class JsepTransportController : public sigslot::has_slots<> { // Returns true if media associated with |mid| was successfully set up to be // demultiplexed on |rtp_transport|. Could return false if two bundled m= // sections use the same SSRC, for example. + // + // If a data channel transport must be negotiated, |data_channel_transport| + // and |negotiation_state| indicate negotiation status. If + // |data_channel_transport| is null, the data channel transport should not + // be used. Otherwise, the value is a pointer to the transport to be used + // for data channels on |mid|, if any. + // + // The observer should not send data on |data_channel_transport| until + // |negotiation_state| is provisional or final. It should not delete + // |data_channel_transport| or any fallback transport until + // |negotiation_state| is final. virtual bool OnTransportChanged( const std::string& mid, RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, - MediaTransportInterface* media_transport) = 0; + MediaTransportInterface* media_transport, + DataChannelTransportInterface* data_channel_transport, + NegotiationState negotiation_state) = 0; }; struct Config { @@ -97,6 +122,9 @@ class JsepTransportController : public sigslot::has_slots<> { // Use encrypted datagram transport to send packets. bool use_datagram_transport = false; + // Use datagram transport's implementation of data channels instead of SCTP. + bool use_datagram_transport_for_data_channels = false; + // Optional media transport factory (experimental). If provided it will be // used to create media_transport (as long as either // |use_media_transport_for_media| or @@ -139,7 +167,7 @@ class JsepTransportController : public sigslot::has_slots<> { MediaTransportConfig GetMediaTransportConfig(const std::string& mid) const; - MediaTransportInterface* GetMediaTransportForDataChannel( + DataChannelTransportInterface* GetDataChannelTransport( const std::string& mid) const; // TODO(sukhanov): Deprecate, return only config. @@ -204,7 +232,8 @@ class JsepTransportController : public sigslot::has_slots<> { // Jsep transport is created, you can't change this setting. void SetMediaTransportSettings(bool use_media_transport_for_media, bool use_media_transport_for_data_channels, - bool use_datagram_transport); + bool use_datagram_transport, + bool use_datagram_transport_for_data_channels); // If media transport is present enabled and supported, // when this method is called, it creates a media transport and generates its @@ -253,6 +282,8 @@ class JsepTransportController : public sigslot::has_slots<> { sigslot::signal1 SignalDtlsHandshakeError; + // TODO(mellem): Delete this signal once PeerConnection no longer + // uses it to determine data channel state. sigslot::signal<> SignalMediaTransportStateChanged; private: @@ -399,6 +430,10 @@ class JsepTransportController : public sigslot::has_slots<> { void OnMediaTransportStateChanged_n(); void OnTransportCandidatePairChanged_n( const cricket::CandidatePairChangeEvent& event); + void OnDataChannelTransportNegotiated_n( + cricket::JsepTransport* transport, + DataChannelTransportInterface* data_channel_transport, + bool provisional); void UpdateAggregateStates_n(); diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc index c4fd8d277e..458e09c38a 100644 --- a/pc/jsep_transport_controller_unittest.cc +++ b/pc/jsep_transport_controller_unittest.cc @@ -305,10 +305,13 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, } // JsepTransportController::Observer overrides. - bool OnTransportChanged(const std::string& mid, - RtpTransportInternal* rtp_transport, - rtc::scoped_refptr dtls_transport, - MediaTransportInterface* media_transport) override { + bool OnTransportChanged( + const std::string& mid, + RtpTransportInternal* rtp_transport, + rtc::scoped_refptr dtls_transport, + MediaTransportInterface* media_transport, + DataChannelTransportInterface* data_channel_transport, + JsepTransportController::NegotiationState negotiation_state) override { changed_rtp_transport_by_mid_[mid] = rtp_transport; if (dtls_transport) { changed_dtls_transport_by_mid_[mid] = dtls_transport->internal(); @@ -442,7 +445,7 @@ TEST_F(JsepTransportControllerTest, .ok()); FakeMediaTransport* media_transport = static_cast( - transport_controller_->GetMediaTransportForDataChannel(kAudioMid1)); + transport_controller_->GetDataChannelTransport(kAudioMid1)); ASSERT_NE(nullptr, media_transport); @@ -452,7 +455,7 @@ TEST_F(JsepTransportControllerTest, // Return nullptr for non-existing mids. EXPECT_EQ(nullptr, - transport_controller_->GetMediaTransportForDataChannel(kVideoMid2)); + transport_controller_->GetDataChannelTransport(kVideoMid2)); EXPECT_EQ(cricket::ICE_CANDIDATE_COMPONENT_RTP, transport_controller_->GetDtlsTransport(kAudioMid1)->component()) diff --git a/pc/media_session.cc b/pc/media_session.cc index f4f1554a62..e229ed6758 100644 --- a/pc/media_session.cc +++ b/pc/media_session.cc @@ -2255,7 +2255,9 @@ bool MediaSessionDescriptionFactory::AddDataContentForOffer( StreamParamsVec* current_streams, SessionDescription* desc, IceCredentialsIterator* ice_credentials) const { - bool is_sctp = (session_options.data_channel_type == DCT_SCTP); + bool is_sctp = + (session_options.data_channel_type == DCT_SCTP || + session_options.data_channel_type == DCT_DATA_CHANNEL_TRANSPORT_SCTP); // If the DataChannel type is not specified, use the DataChannel type in // the current description. if (session_options.data_channel_type == DCT_NONE && current_content) { diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index cc334b1c0e..0f7970c536 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -56,6 +56,7 @@ #include "rtc_base/logging.h" #include "rtc_base/string_encode.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/system/fallthrough.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" @@ -110,6 +111,9 @@ namespace { // Field trials. // Controls datagram transport support. const char kDatagramTransportFieldTrial[] = "WebRTC-DatagramTransport"; +// Controls datagram transport data channel support. +const char kDatagramTransportDataChannelFieldTrial[] = + "WebRTC-DatagramTransportDataChannels"; // UMA metric names. const char kSimulcastVersionApplyLocalDescription[] = @@ -802,6 +806,7 @@ bool PeerConnectionInterface::RTCConfiguration::operator==( bool use_media_transport; bool use_media_transport_for_data_channels; absl::optional use_datagram_transport; + absl::optional use_datagram_transport_for_data_channels; absl::optional crypto_options; bool offer_extmap_allow_mixed; }; @@ -863,6 +868,8 @@ bool PeerConnectionInterface::RTCConfiguration::operator==( use_media_transport_for_data_channels == o.use_media_transport_for_data_channels && use_datagram_transport == o.use_datagram_transport && + use_datagram_transport_for_data_channels == + o.use_datagram_transport_for_data_channels && crypto_options == o.crypto_options && offer_extmap_allow_mixed == o.offer_extmap_allow_mixed; } @@ -907,6 +914,8 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, event_log_ptr_(event_log_.get()), datagram_transport_config_( field_trial::FindFullName(kDatagramTransportFieldTrial)), + datagram_transport_data_channel_config_( + field_trial::FindFullName(kDatagramTransportDataChannelFieldTrial)), rtcp_cname_(GenerateRtcpCname()), local_streams_(StreamCollection::Create()), remote_streams_(StreamCollection::Create()), @@ -940,7 +949,7 @@ PeerConnection::~PeerConnection() { webrtc_session_desc_factory_.reset(); sctp_invoker_.reset(); sctp_factory_.reset(); - media_transport_invoker_.reset(); + data_channel_transport_invoker_.reset(); transport_controller_.reset(); // port_allocator_ lives on the network thread and should be destroyed there. @@ -1070,7 +1079,12 @@ bool PeerConnection::Initialize( use_datagram_transport_ = datagram_transport_config_.enabled && configuration.use_datagram_transport.value_or( datagram_transport_config_.default_value); - if (use_datagram_transport_ || configuration.use_media_transport || + use_datagram_transport_for_data_channels_ = + datagram_transport_data_channel_config_.enabled && + configuration.use_datagram_transport_for_data_channels.value_or( + datagram_transport_data_channel_config_.default_value); + if (use_datagram_transport_ || use_datagram_transport_for_data_channels_ || + configuration.use_media_transport || configuration.use_media_transport_for_data_channels) { if (!factory_->media_transport_factory()) { RTC_DCHECK(false) @@ -1101,6 +1115,8 @@ bool PeerConnection::Initialize( config.use_media_transport_for_data_channels = configuration.use_media_transport_for_data_channels; config.use_datagram_transport = use_datagram_transport_; + config.use_datagram_transport_for_data_channels = + use_datagram_transport_for_data_channels_; config.media_transport_factory = factory_->media_transport_factory(); } @@ -1156,7 +1172,21 @@ bool PeerConnection::Initialize( } } - if (configuration.use_media_transport_for_data_channels) { + if (use_datagram_transport_for_data_channels_) { + if (configuration.enable_rtp_data_channel) { + RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " + "use_datagram_transport_for_data_channels are " + "incompatible and cannot both be set to true"; + return false; + } + if (configuration.enable_dtls_srtp && !*configuration.enable_dtls_srtp) { + RTC_LOG(LS_INFO) << "Using data channel transport with no fallback"; + data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT; + } else { + RTC_LOG(LS_INFO) << "Using data channel transport with fallback to SCTP"; + data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; + } + } else if (configuration.use_media_transport_for_data_channels) { if (configuration.enable_rtp_data_channel) { RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " "use_media_transport_for_data_channels are " @@ -3162,9 +3192,11 @@ RTCError PeerConnection::UpdateDataChannel( return RTCError::OK(); } if (content.rejected) { + RTC_LOG(LS_INFO) << "Rejected data channel, mid=" << content.mid(); DestroyDataChannel(); } else { - if (!rtp_data_channel_ && !sctp_transport_ && !media_transport_) { + if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { + RTC_LOG(LS_INFO) << "Creating data channel, mid=" << content.mid(); if (!CreateDataChannel(content.name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, "Failed to create data channel."); @@ -3532,10 +3564,30 @@ RTCError PeerConnection::SetConfiguration( "after calling SetRemoteDescription."); } + if (local_description() && + configuration.use_datagram_transport_for_data_channels != + configuration_.use_datagram_transport_for_data_channels) { + LOG_AND_RETURN_ERROR( + RTCErrorType::INVALID_MODIFICATION, + "Can't change use_datagram_transport_for_data_channels " + "after calling SetLocalDescription."); + } + + if (remote_description() && + configuration.use_datagram_transport_for_data_channels != + configuration_.use_datagram_transport_for_data_channels) { + LOG_AND_RETURN_ERROR( + RTCErrorType::INVALID_MODIFICATION, + "Can't change use_datagram_transport_for_data_channels " + "after calling SetRemoteDescription."); + } + if (configuration.use_media_transport_for_data_channels || configuration.use_media_transport || (configuration.use_datagram_transport && - *configuration.use_datagram_transport)) { + *configuration.use_datagram_transport) || + (configuration.use_datagram_transport_for_data_channels && + *configuration.use_datagram_transport_for_data_channels)) { RTC_CHECK(configuration.bundle_policy == kBundlePolicyMaxBundle) << "Media transport requires MaxBundle policy."; } @@ -3571,6 +3623,8 @@ RTCError PeerConnection::SetConfiguration( modified_config.use_media_transport_for_data_channels = configuration.use_media_transport_for_data_channels; modified_config.use_datagram_transport = configuration.use_datagram_transport; + modified_config.use_datagram_transport_for_data_channels = + configuration.use_datagram_transport_for_data_channels; if (configuration != modified_config) { LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_MODIFICATION, "Modifying the configuration in an unsupported way."); @@ -3633,10 +3687,14 @@ RTCError PeerConnection::SetConfiguration( use_datagram_transport_ = datagram_transport_config_.enabled && modified_config.use_datagram_transport.value_or( datagram_transport_config_.default_value); + use_datagram_transport_for_data_channels_ = + datagram_transport_data_channel_config_.enabled && + modified_config.use_datagram_transport_for_data_channels.value_or( + datagram_transport_data_channel_config_.default_value); transport_controller_->SetMediaTransportSettings( modified_config.use_media_transport, modified_config.use_media_transport_for_data_channels, - use_datagram_transport_); + use_datagram_transport_, use_datagram_transport_for_data_channels_); if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { @@ -4398,7 +4456,7 @@ void PeerConnection::GetOptionsForOffer( } // If datagram transport is in use, add opaque transport parameters. - if (use_datagram_transport_) { + if (use_datagram_transport_ || use_datagram_transport_for_data_channels_) { for (auto& options : session_options->media_description_options) { options.transport_options.opaque_parameters = transport_controller_->GetTransportParameters(options.mid); @@ -4704,7 +4762,7 @@ void PeerConnection::GetOptionsForAnswer( port_allocator_.get())); // If datagram transport is in use, add opaque transport parameters. - if (use_datagram_transport_) { + if (use_datagram_transport_ || use_datagram_transport_for_data_channels_) { for (auto& options : session_options->media_description_options) { options.transport_options.opaque_parameters = transport_controller_->GetTransportParameters(options.mid); @@ -4878,9 +4936,10 @@ absl::optional PeerConnection::GetDataMid() const { } return rtp_data_channel_->content_name(); case cricket::DCT_SCTP: - return sctp_mid_; case cricket::DCT_MEDIA_TRANSPORT: - return media_transport_data_mid_; + case cricket::DCT_DATA_CHANNEL_TRANSPORT: + case cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP: + return sctp_mid_; default: return absl::nullopt; } @@ -5640,14 +5699,14 @@ bool PeerConnection::GetSctpSslRole(rtc::SSLRole* role) { "SSL Role of the SCTP transport."; return false; } - if (!sctp_transport_ && !media_transport_) { + if (!sctp_transport_ && !data_channel_transport_) { RTC_LOG(LS_INFO) << "Non-rejected SCTP m= section is needed to get the " "SSL Role of the SCTP transport."; return false; } absl::optional dtls_role; - if (sctp_mid_) { + if (sctp_mid_ && sctp_transport_) { dtls_role = transport_controller_->GetDtlsRole(*sctp_mid_); } else if (is_caller_) { dtls_role = *is_caller_ ? rtc::SSL_SERVER : rtc::SSL_CLIENT; @@ -5883,12 +5942,7 @@ bool PeerConnection::SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !sctp_transport_ && !media_transport_) { - RTC_LOG(LS_ERROR) << "SendData called when rtp_data_channel_, " - "sctp_transport_, and media_transport_ are NULL."; - return false; - } - if (media_transport_) { + if (data_channel_transport_ && data_channel_transport_negotiated_) { SendDataParams send_params; send_params.type = ToWebrtcDataMessageType(params.type); send_params.ordered = params.ordered; @@ -5897,39 +5951,44 @@ bool PeerConnection::SendData(const cricket::SendDataParams& params, } else if (params.max_rtx_ms >= 0) { send_params.max_rtx_ms = params.max_rtx_ms; } - return media_transport_->SendData(params.sid, send_params, payload).ok(); + return data_channel_transport_->SendData(params.sid, send_params, payload) + .ok(); + } else if (sctp_transport_ && sctp_negotiated_) { + return network_thread()->Invoke( + RTC_FROM_HERE, Bind(&cricket::SctpTransportInternal::SendData, + cricket_sctp_transport(), params, payload, result)); + } else if (rtp_data_channel_) { + return rtp_data_channel_->SendData(params, payload, result); } - return rtp_data_channel_ - ? rtp_data_channel_->SendData(params, payload, result) - : network_thread()->Invoke( - RTC_FROM_HERE, - Bind(&cricket::SctpTransportInternal::SendData, - cricket_sctp_transport(), params, payload, result)); + RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; + return false; } bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !sctp_transport_ && !media_transport_) { + if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { // Don't log an error here, because DataChannels are expected to call // ConnectDataChannel in this state. It's the only way to initially tell // whether or not the underlying transport is ready. return false; } - if (media_transport_) { - SignalMediaTransportWritable_s.connect(webrtc_data_channel, - &DataChannel::OnChannelReady); - SignalMediaTransportReceivedData_s.connect(webrtc_data_channel, - &DataChannel::OnDataReceived); - SignalMediaTransportChannelClosing_s.connect( + if (data_channel_transport_) { + SignalDataChannelTransportWritable_s.connect(webrtc_data_channel, + &DataChannel::OnChannelReady); + SignalDataChannelTransportReceivedData_s.connect( + webrtc_data_channel, &DataChannel::OnDataReceived); + SignalDataChannelTransportChannelClosing_s.connect( webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); - SignalMediaTransportChannelClosed_s.connect( + SignalDataChannelTransportChannelClosed_s.connect( webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); - } else if (rtp_data_channel_) { + } + if (rtp_data_channel_) { rtp_data_channel_->SignalReadyToSendData.connect( webrtc_data_channel, &DataChannel::OnChannelReady); rtp_data_channel_->SignalDataReceived.connect(webrtc_data_channel, &DataChannel::OnDataReceived); - } else { + } + if (sctp_transport_) { SignalSctpReadyToSendData.connect(webrtc_data_channel, &DataChannel::OnChannelReady); SignalSctpDataReceived.connect(webrtc_data_channel, @@ -5944,21 +6003,23 @@ bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !sctp_transport_ && !media_transport_) { + if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { RTC_LOG(LS_ERROR) << "DisconnectDataChannel called when rtp_data_channel_ and " "sctp_transport_ are NULL."; return; } - if (media_transport_) { - SignalMediaTransportWritable_s.disconnect(webrtc_data_channel); - SignalMediaTransportReceivedData_s.disconnect(webrtc_data_channel); - SignalMediaTransportChannelClosing_s.disconnect(webrtc_data_channel); - SignalMediaTransportChannelClosed_s.disconnect(webrtc_data_channel); - } else if (rtp_data_channel_) { + if (data_channel_transport_) { + SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); + SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); + } + if (rtp_data_channel_) { rtp_data_channel_->SignalReadyToSendData.disconnect(webrtc_data_channel); rtp_data_channel_->SignalDataReceived.disconnect(webrtc_data_channel); - } else { + } + if (sctp_transport_) { SignalSctpReadyToSendData.disconnect(webrtc_data_channel); SignalSctpDataReceived.disconnect(webrtc_data_channel); SignalSctpClosingProcedureStartedRemotely.disconnect(webrtc_data_channel); @@ -5967,9 +6028,8 @@ void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { } void PeerConnection::AddSctpDataStream(int sid) { - if (media_transport_) { - media_transport_->OpenChannel(sid); - return; + if (data_channel_transport_) { + data_channel_transport_->OpenChannel(sid); } if (!sctp_transport_) { RTC_LOG(LS_ERROR) @@ -5982,9 +6042,8 @@ void PeerConnection::AddSctpDataStream(int sid) { } void PeerConnection::RemoveSctpDataStream(int sid) { - if (media_transport_) { - media_transport_->CloseChannel(sid); - return; + if (data_channel_transport_) { + data_channel_transport_->CloseChannel(sid); } if (!sctp_transport_) { RTC_LOG(LS_ERROR) << "RemoveSctpDataStream called when sctp_transport_ is " @@ -5999,8 +6058,9 @@ void PeerConnection::RemoveSctpDataStream(int sid) { bool PeerConnection::ReadyToSendData() const { RTC_DCHECK_RUN_ON(signaling_thread()); return (rtp_data_channel_ && rtp_data_channel_->ready_to_send_data()) || - (media_transport_ && media_transport_ready_to_send_data_) || - sctp_ready_to_send_data_; + (data_channel_transport_ && data_channel_transport_ready_to_send_ && + data_channel_transport_negotiated_) || + (sctp_ready_to_send_data_ && sctp_negotiated_); } void PeerConnection::OnDataReceived(int channel_id, @@ -6010,30 +6070,43 @@ void PeerConnection::OnDataReceived(int channel_id, cricket::ReceiveDataParams params; params.sid = channel_id; params.type = ToCricketDataMessageType(type); - media_transport_invoker_->AsyncInvoke( + data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { RTC_DCHECK_RUN_ON(signaling_thread()); if (!HandleOpenMessage_s(params, buffer)) { - SignalMediaTransportReceivedData_s(params, buffer); + SignalDataChannelTransportReceivedData_s(params, buffer); } }); } void PeerConnection::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - media_transport_invoker_->AsyncInvoke( + data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, channel_id] { RTC_DCHECK_RUN_ON(signaling_thread()); - SignalMediaTransportChannelClosing_s(channel_id); + SignalDataChannelTransportChannelClosing_s(channel_id); }); } void PeerConnection::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - media_transport_invoker_->AsyncInvoke( + data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, channel_id] { RTC_DCHECK_RUN_ON(signaling_thread()); - SignalMediaTransportChannelClosed_s(channel_id); + SignalDataChannelTransportChannelClosed_s(channel_id); + }); +} + +void PeerConnection::OnReadyToSend() { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + data_channel_transport_ready_to_send_ = true; + if (data_channel_transport_negotiated_) { + SignalDataChannelTransportWritable_s( + data_channel_transport_ready_to_send_); + } }); } @@ -6444,7 +6517,7 @@ RTCError PeerConnection::CreateChannels(const SessionDescription& desc) { const cricket::ContentInfo* data = cricket::GetFirstDataContent(&desc); if (data_channel_type_ != cricket::DCT_NONE && data && !data->rejected && - !rtp_data_channel_ && !sctp_transport_ && !media_transport_) { + !rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { if (!CreateDataChannel(data->name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, "Failed to create data channel."); @@ -6503,33 +6576,33 @@ cricket::VideoChannel* PeerConnection::CreateVideoChannel( bool PeerConnection::CreateDataChannel(const std::string& mid) { switch (data_channel_type_) { - case cricket::DCT_MEDIA_TRANSPORT: - if (network_thread()->Invoke( - RTC_FROM_HERE, - rtc::Bind(&PeerConnection::SetupMediaTransportForDataChannels_n, - this, mid))) { - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - return true; - } - return false; case cricket::DCT_SCTP: - if (!sctp_factory_) { - RTC_LOG(LS_ERROR) - << "Trying to create SCTP transport, but didn't compile with " - "SCTP support (HAVE_SCTP)"; + // Only using SCTP transport. No more setup required. Since SCTP is + // the only option, it doesn't need to wait for negotiation. + sctp_negotiated_ = true; + if (!CreateSctpDataChannel(mid)) { return false; } - if (!network_thread()->Invoke( - RTC_FROM_HERE, - rtc::Bind(&PeerConnection::CreateSctpTransport_n, this, mid))) { + break; + case cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP: + // Setup a data channel transport with SCTP as a fallback. Which one is + // used will be determined later in negotiation. + if (!CreateSctpDataChannel(mid)) { return false; } - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); + if (!SetupDataChannelTransport(mid)) { + return false; } - return true; + break; + case cricket::DCT_DATA_CHANNEL_TRANSPORT: + case cricket::DCT_MEDIA_TRANSPORT: + // Using data channel transport without a fallback. It is the only + // option. Data channel transport doesn't need to be negotiated. + data_channel_transport_negotiated_ = true; + if (!SetupDataChannelTransport(mid)) { + return false; + } + break; case cricket::DCT_RTP: default: RtpTransportInternal* rtp_transport = GetRtpTransport(mid); @@ -6547,6 +6620,34 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) { return true; } + // All non-RTP data channels must initialize |sctp_data_channels_|. + for (const auto& channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + return true; +} + +bool PeerConnection::CreateSctpDataChannel(const std::string& mid) { + if (!sctp_factory_) { + RTC_LOG(LS_ERROR) + << "Trying to create SCTP transport, but didn't compile with " + "SCTP support (HAVE_SCTP)"; + return false; + } + if (!network_thread()->Invoke( + RTC_FROM_HERE, + rtc::Bind(&PeerConnection::CreateSctpTransport_n, this, mid))) { + return false; + } + return true; +} + +bool PeerConnection::SetupDataChannelTransport(const std::string& mid) { + if (!network_thread()->Invoke( + RTC_FROM_HERE, + rtc::Bind(&PeerConnection::SetupDataChannelTransport_n, this, mid))) { + return false; + } return true; } @@ -6566,6 +6667,8 @@ Call::Stats PeerConnection::GetCallStats() { bool PeerConnection::CreateSctpTransport_n(const std::string& mid) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(sctp_factory_); + RTC_LOG(LS_INFO) << "Creating SCTP transport for mid=" << mid; + rtc::scoped_refptr webrtc_dtls_transport = transport_controller_->LookupDtlsTransportByMid(mid); cricket::DtlsTransportInternal* dtls_transport = @@ -6596,15 +6699,22 @@ bool PeerConnection::CreateSctpTransport_n(const std::string& mid) { void PeerConnection::DestroySctpTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); + RTC_LOG(LS_INFO) << "Destroying SCTP transport for mid=" << *sctp_mid_; + sctp_transport_->Clear(); sctp_transport_ = nullptr; - sctp_mid_.reset(); + // |sctp_mid_| may still be active through a data channel transport. If not, + // unset it. + if (!data_channel_transport_) { + sctp_mid_.reset(); + } sctp_invoker_.reset(nullptr); } void PeerConnection::OnSctpTransportReadyToSendData_n() { RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); + RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || + data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); // Note: Cannot use rtc::Bind here because it will grab a reference to // PeerConnection and potentially cause PeerConnection to live longer than // expected. It is safe not to grab a reference since the sctp_invoker_ will @@ -6618,14 +6728,17 @@ void PeerConnection::OnSctpTransportReadyToSendData_n() { void PeerConnection::OnSctpTransportReadyToSendData_s(bool ready) { RTC_DCHECK_RUN_ON(signaling_thread()); sctp_ready_to_send_data_ = ready; - SignalSctpReadyToSendData(ready); + if (sctp_negotiated_) { + SignalSctpReadyToSendData(ready); + } } void PeerConnection::OnSctpTransportDataReceived_n( const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& payload) { RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); + RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || + data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); // Note: Cannot use rtc::Bind here because it will grab a reference to // PeerConnection and potentially cause PeerConnection to live longer than // expected. It is safe not to grab a reference since the sctp_invoker_ will @@ -6648,7 +6761,8 @@ void PeerConnection::OnSctpTransportDataReceived_s( void PeerConnection::OnSctpClosingProcedureStartedRemotely_n(int sid) { RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); + RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || + data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); sctp_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), rtc::Bind(&sigslot::signal1::operator(), @@ -6657,26 +6771,30 @@ void PeerConnection::OnSctpClosingProcedureStartedRemotely_n(int sid) { void PeerConnection::OnSctpClosingProcedureComplete_n(int sid) { RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); + RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || + data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); sctp_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), rtc::Bind(&sigslot::signal1::operator(), &SignalSctpClosingProcedureComplete, sid)); } -bool PeerConnection::SetupMediaTransportForDataChannels_n( - const std::string& mid) { - media_transport_ = - transport_controller_->GetMediaTransportForDataChannel(mid); - if (!media_transport_) { +bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { + data_channel_transport_ = transport_controller_->GetDataChannelTransport(mid); + if (!data_channel_transport_) { RTC_LOG(LS_ERROR) - << "Media transport is not available for data channels, mid=" << mid; + << "Data channel transport is not available for data channels, mid=" + << mid; return false; } + RTC_LOG(LS_INFO) << "Setting up data channel transport for mid=" << mid; - media_transport_invoker_ = absl::make_unique(); - media_transport_->SetDataSink(this); - media_transport_data_mid_ = mid; + data_channel_transport_invoker_ = absl::make_unique(); + data_channel_transport_->SetDataSink(this); + sctp_mid_ = mid; + // TODO(mellem): Handling data channel state through media transport is + // deprecated. Delete these lines when downstream implementations call + // DataChannelSink::OnStateChanged(). transport_controller_->SignalMediaTransportStateChanged.connect( this, &PeerConnection::OnMediaTransportStateChanged_n); // Check the initial state right away, in case transport is already writable. @@ -6684,28 +6802,42 @@ bool PeerConnection::SetupMediaTransportForDataChannels_n( return true; } -void PeerConnection::TeardownMediaTransportForDataChannels_n() { - if (!media_transport_) { +void PeerConnection::TeardownDataChannelTransport_n() { + if (!data_channel_transport_) { return; } + RTC_LOG(LS_INFO) << "Tearing down data channel transport for mid=" + << *sctp_mid_; + + // TODO(mellem): Delete this line when downstream implementations call + // DataChannelSink::OnStateChanged(). transport_controller_->SignalMediaTransportStateChanged.disconnect(this); - media_transport_data_mid_.reset(); - media_transport_->SetDataSink(nullptr); - media_transport_invoker_ = nullptr; - media_transport_ = nullptr; + // |sctp_mid_| may still be active through an SCTP transport. If not, unset + // it. + if (!sctp_transport_) { + sctp_mid_.reset(); + } + data_channel_transport_->SetDataSink(nullptr); + data_channel_transport_invoker_ = nullptr; + data_channel_transport_ = nullptr; } +// TODO(mellem): Handling of data channel state through the media transport +// callback is deprecated. This function should be deleted once downstream +// implementations call DataChannelSink::OnStateChanged(). void PeerConnection::OnMediaTransportStateChanged_n() { - if (!media_transport_data_mid_ || - transport_controller_->GetMediaTransportState( - *media_transport_data_mid_) != MediaTransportState::kWritable) { + if (!sctp_mid_ || transport_controller_->GetMediaTransportState(*sctp_mid_) != + MediaTransportState::kWritable) { return; } - media_transport_invoker_->AsyncInvoke( + data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this] { RTC_DCHECK_RUN_ON(signaling_thread()); - media_transport_ready_to_send_data_ = true; - SignalMediaTransportWritable_s(media_transport_ready_to_send_data_); + data_channel_transport_ready_to_send_ = true; + if (data_channel_transport_negotiated_) { + SignalDataChannelTransportWritable_s( + data_channel_transport_ready_to_send_); + } }); } @@ -7267,11 +7399,11 @@ void PeerConnection::DestroyDataChannel() { sctp_ready_to_send_data_ = false; } - if (media_transport_) { + if (data_channel_transport_) { OnDataChannelDestroyed(); network_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(network_thread()); - TeardownMediaTransportForDataChannels_n(); + TeardownDataChannelTransport_n(); }); } } @@ -7302,7 +7434,9 @@ bool PeerConnection::OnTransportChanged( const std::string& mid, RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, - MediaTransportInterface* media_transport) { + MediaTransportInterface* media_transport, + DataChannelTransportInterface* data_channel_transport, + JsepTransportController::NegotiationState negotiation_state) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUNS_SERIALIZED(&use_media_transport_race_checker_); bool ret = true; @@ -7318,6 +7452,48 @@ bool PeerConnection::OnTransportChanged( RTC_LOG(LS_ERROR) << "Media transport isn't supported."; } + if (mid == sctp_mid_) { + switch (negotiation_state) { + case JsepTransportController::NegotiationState::kFinal: + if (data_channel_transport) { + if (sctp_transport_) { + DestroySctpTransport_n(); + } + } else { + TeardownDataChannelTransport_n(); + } + // We also need to mark the remaining transport as ready-to-send. + RTC_FALLTHROUGH(); + case JsepTransportController::NegotiationState::kProvisional: { + rtc::AsyncInvoker* invoker = data_channel_transport_invoker_ + ? data_channel_transport_invoker_.get() + : sctp_invoker_.get(); + if (!invoker) { + break; // Have neither SCTP nor DataChannelTransport, nothing to do. + } + invoker->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this, data_channel_transport] { + RTC_DCHECK_RUN_ON(signaling_thread()); + if (data_channel_transport) { + data_channel_transport_negotiated_ = true; + if (data_channel_transport_ready_to_send_) { + SignalDataChannelTransportWritable_s( + data_channel_transport_ready_to_send_); + } + } else { + sctp_negotiated_ = true; + if (sctp_ready_to_send_data_) { + SignalSctpReadyToSendData(sctp_ready_to_send_data_); + } + } + }); + } break; + case JsepTransportController::NegotiationState::kInitial: + // Negotiation isn't finished. Nothing to do here. + break; + } + } + return ret; } diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 82d2a70c08..3328a921ef 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -18,6 +18,7 @@ #include #include +#include "api/data_channel_transport_interface.h" #include "api/media_transport_interface.h" #include "api/peer_connection_interface.h" #include "api/turn_customizer.h" @@ -358,6 +359,28 @@ class PeerConnection : public PeerConnectionInternal, FieldTrialFlag default_value; }; + // Field-trial based configuration for datagram transport data channels. + struct DatagramTransportDataChannelConfig { + explicit DatagramTransportDataChannelConfig(const std::string& field_trial) + : enabled("enabled", true), default_value("default_value", false) { + ParseFieldTrial({&enabled, &default_value}, field_trial); + } + + // Whether datagram transport data channel support is enabled at all. + // Defaults to true, allowing datagram transport to be used if (a) the + // application provides a factory for it and (b) the configuration specifies + // its use. This flag provides a kill-switch to force-disable datagram + // transport across all applications, without code changes. + FieldTrialFlag enabled; + + // Whether the datagram transport data channels are enabled or disabled by + // default. Defaults to false, meaning that applications must configure use + // of datagram transport through RTCConfiguration. If set to true, + // applications will use the datagram transport by default (but may still + // explicitly configure themselves not to use it through RTCConfiguration). + FieldTrialFlag default_value; + }; + // Implements MessageHandler. void OnMessage(rtc::Message* msg) override; @@ -906,6 +929,7 @@ class PeerConnection : public PeerConnectionInternal, const rtc::CopyOnWriteBuffer& buffer) override; void OnChannelClosing(int channel_id) override; void OnChannelClosed(int channel_id) override; + void OnReadyToSend() override; // Called when an RTCCertificate is generated or retrieved by // WebRTCSessionDescriptionFactory. Should happen before setLocalDescription. @@ -997,6 +1021,10 @@ class PeerConnection : public PeerConnectionInternal, cricket::VideoChannel* CreateVideoChannel(const std::string& mid) RTC_RUN_ON(signaling_thread()); bool CreateDataChannel(const std::string& mid) RTC_RUN_ON(signaling_thread()); + bool CreateSctpDataChannel(const std::string& mid) + RTC_RUN_ON(signaling_thread()); + bool SetupDataChannelTransport(const std::string& mid) + RTC_RUN_ON(signaling_thread()); bool CreateSctpTransport_n(const std::string& mid); // For bundling. @@ -1016,10 +1044,10 @@ class PeerConnection : public PeerConnectionInternal, void OnSctpClosingProcedureStartedRemotely_n(int sid); void OnSctpClosingProcedureComplete_n(int sid); - bool SetupMediaTransportForDataChannels_n(const std::string& mid) + bool SetupDataChannelTransport_n(const std::string& mid) RTC_RUN_ON(network_thread()); void OnMediaTransportStateChanged_n() RTC_RUN_ON(network_thread()); - void TeardownMediaTransportForDataChannels_n() RTC_RUN_ON(network_thread()); + void TeardownDataChannelTransport_n() RTC_RUN_ON(network_thread()); bool ValidateBundleSettings(const cricket::SessionDescription* desc); bool HasRtcpMuxEnabled(const cricket::ContentInfo* content); @@ -1122,10 +1150,13 @@ class PeerConnection : public PeerConnectionInternal, // from a session description, and the mapping from m= sections to transports // changed (as a result of BUNDLE negotiation, or m= sections being // rejected). - bool OnTransportChanged(const std::string& mid, - RtpTransportInternal* rtp_transport, - rtc::scoped_refptr dtls_transport, - MediaTransportInterface* media_transport) override; + bool OnTransportChanged( + const std::string& mid, + RtpTransportInternal* rtp_transport, + rtc::scoped_refptr dtls_transport, + MediaTransportInterface* media_transport, + DataChannelTransportInterface* data_channel_transport, + JsepTransportController::NegotiationState negotiation_state) override; // RtpSenderBase::SetStreamsObserver override. void OnSetStreams() override; @@ -1185,9 +1216,17 @@ class PeerConnection : public PeerConnectionInternal, // Field-trial based configuration for datagram transport. const DatagramTransportConfig datagram_transport_config_; + // Field-trial based configuration for datagram transport data channels. + const DatagramTransportConfig datagram_transport_data_channel_config_; + // Final, resolved value for whether datagram transport is in use. bool use_datagram_transport_ RTC_GUARDED_BY(signaling_thread()) = false; + // Equivalent of |use_datagram_transport_|, but for its use with data + // channels. + bool use_datagram_transport_for_data_channels_ + RTC_GUARDED_BY(signaling_thread()) = false; + // Cache configuration_.use_media_transport so that we can access it from // other threads. // TODO(bugs.webrtc.org/9987): Caching just this bool and allowing the data @@ -1296,6 +1335,9 @@ class PeerConnection : public PeerConnectionInternal, // signaling and network thread. // |sctp_mid_| is the content name (MID) in SDP. + // Note: this is used as the data channel MID by both SCTP and data channel + // transports. It is set when either transport is initialized and unset when + // both transports are deleted. absl::optional sctp_mid_; // TODO(bugs.webrtc.org/9987): Accessed on both signaling // and network thread. @@ -1304,6 +1346,9 @@ class PeerConnection : public PeerConnectionInternal, // fires on the signaling thread. bool sctp_ready_to_send_data_ RTC_GUARDED_BY(signaling_thread()) = false; + // Whether the use of SCTP has been successfully negotiated. + bool sctp_negotiated_ RTC_GUARDED_BY(signaling_thread()) = false; + // Same as signals provided by SctpTransport, but these are guaranteed to // fire on the signaling thread, whereas SctpTransport fires on the networking // thread. @@ -1328,33 +1373,34 @@ class PeerConnection : public PeerConnectionInternal, // Whether this peer is the caller. Set when the local description is applied. absl::optional is_caller_ RTC_GUARDED_BY(signaling_thread()); - // Content name (MID) for media transport data channels in SDP. - absl::optional - media_transport_data_mid_; // TODO(bugs.webrtc.org/9987): Accessed on - // both signaling and network thread. - - // Media transport used for data channels. Thread-safe. - MediaTransportInterface* media_transport_ = + // Plugin transport used for data channels. Thread-safe. + DataChannelTransportInterface* data_channel_transport_ = nullptr; // TODO(bugs.webrtc.org/9987): Object is thread safe, but // pointer accessed on both signaling and network thread. - // Cached value of whether the media transport is ready to send. - bool media_transport_ready_to_send_data_ RTC_GUARDED_BY(signaling_thread()) = + // Cached value of whether the data channel transport is ready to send. + bool data_channel_transport_ready_to_send_ + RTC_GUARDED_BY(signaling_thread()) = false; + + // Whether the use of the data channel transport has been successfully + // negotiated. + bool data_channel_transport_negotiated_ RTC_GUARDED_BY(signaling_thread()) = false; - // Used to invoke media transport signals on the signaling thread. - std::unique_ptr media_transport_invoker_ + // Used to invoke data channel transport signals on the signaling thread. + std::unique_ptr data_channel_transport_invoker_ RTC_GUARDED_BY(network_thread()); // Identical to the signals for SCTP, but from media transport: - sigslot::signal1 SignalMediaTransportWritable_s + sigslot::signal1 SignalDataChannelTransportWritable_s RTC_GUARDED_BY(signaling_thread()); sigslot::signal2 - SignalMediaTransportReceivedData_s RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalMediaTransportChannelClosing_s + SignalDataChannelTransportReceivedData_s + RTC_GUARDED_BY(signaling_thread()); + sigslot::signal1 SignalDataChannelTransportChannelClosing_s RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalMediaTransportChannelClosed_s + sigslot::signal1 SignalDataChannelTransportChannelClosed_s RTC_GUARDED_BY(signaling_thread()); std::unique_ptr current_local_description_ diff --git a/pc/peer_connection_integrationtest.cc b/pc/peer_connection_integrationtest.cc index 59bfb0483a..2151b5ef18 100644 --- a/pc/peer_connection_integrationtest.cc +++ b/pc/peer_connection_integrationtest.cc @@ -774,6 +774,7 @@ class PeerConnectionWrapper : public webrtc::PeerConnectionObserver, SdpType type = desc->GetType(); std::string sdp; EXPECT_TRUE(desc->ToString(&sdp)); + RTC_LOG(LS_INFO) << debug_name_ << ": local SDP contents=\n" << sdp; pc()->SetLocalDescription(observer, desc.release()); if (sdp_semantics_ == SdpSemantics::kUnifiedPlan) { RemoveUnusedVideoRenderers(); @@ -3554,8 +3555,211 @@ TEST_P(PeerConnectionIntegrationTest, kDefaultTimeout); } +// Tests that the datagram transport to SCTP fallback works correctly when +// datagram transport negotiation fails. +TEST_P(PeerConnectionIntegrationTest, + DatagramTransportDataChannelFallbackToSctp) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.rtcp_mux_policy = PeerConnectionInterface::kRtcpMuxPolicyRequire; + rtc_config.bundle_policy = PeerConnectionInterface::kBundlePolicyMaxBundle; + rtc_config.use_datagram_transport_for_data_channels = true; + + // Configure one endpoint to use datagram transport for data channels while + // the other does not. + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, RTCConfiguration(), + loopback_media_transports()->first_factory(), nullptr)); + ConnectFakeSignaling(); + + // The caller offers a data channel using either datagram transport or SCTP. + caller()->CreateDataChannel(); + caller()->AddAudioVideoTracks(); + callee()->AddAudioVideoTracks(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + // Negotiation should fallback to SCTP, allowing the data channel to be + // established. + ASSERT_NE(nullptr, caller()->data_channel()); + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + EXPECT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout); + EXPECT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + + // Ensure data can be sent in both directions. + std::string data = "hello world"; + caller()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, callee()->data_observer()->last_message(), + kDefaultTimeout); + callee()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, caller()->data_observer()->last_message(), + kDefaultTimeout); + + // Ensure that failure of the datagram negotiation doesn't impede media flow. + MediaExpectations media_expectations; + media_expectations.ExpectBidirectionalAudioAndVideo(); + ASSERT_TRUE(ExpectNewFrames(media_expectations)); +} + #endif // HAVE_SCTP +// This test sets up a call between two parties with a datagram transport data +// channel. +TEST_P(PeerConnectionIntegrationTest, DatagramTransportDataChannelEndToEnd) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.rtcp_mux_policy = PeerConnectionInterface::kRtcpMuxPolicyRequire; + rtc_config.bundle_policy = PeerConnectionInterface::kBundlePolicyMaxBundle; + rtc_config.use_datagram_transport_for_data_channels = true; + rtc_config.enable_dtls_srtp = false; + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, rtc_config, loopback_media_transports()->first_factory(), + loopback_media_transports()->second_factory())); + ConnectFakeSignaling(); + + // Expect that data channel created on caller side will show up for callee as + // well. + caller()->CreateDataChannel(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + // Ensure that the media transport is ready. + loopback_media_transports()->SetState(webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Caller data channel should already exist (it created one). Callee data + // channel may not exist yet, since negotiation happens in-band, not in SDP. + ASSERT_NE(nullptr, caller()->data_channel()); + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + EXPECT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout); + EXPECT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + + // Ensure data can be sent in both directions. + std::string data = "hello world"; + caller()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, callee()->data_observer()->last_message(), + kDefaultTimeout); + callee()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, caller()->data_observer()->last_message(), + kDefaultTimeout); +} + +// Ensures that when the callee closes a datagram transport data channel, the +// closing procedure results in the data channel being closed for the caller +// as well. +TEST_P(PeerConnectionIntegrationTest, + DatagramTransportDataChannelCalleeCloses) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.use_datagram_transport_for_data_channels = true; + rtc_config.enable_dtls_srtp = false; + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, rtc_config, loopback_media_transports()->first_factory(), + loopback_media_transports()->second_factory())); + ConnectFakeSignaling(); + + // Create a data channel on the caller and signal it to the callee. + caller()->CreateDataChannel(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + // Ensure that the media transport is ready. + loopback_media_transports()->SetState(webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Data channels exist and open on both ends of the connection. + ASSERT_NE(nullptr, caller()->data_channel()); + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + ASSERT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout); + ASSERT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + + // Close the data channel on the callee side, and wait for it to reach the + // "closed" state on both sides. + callee()->data_channel()->Close(); + EXPECT_TRUE_WAIT(!caller()->data_observer()->IsOpen(), kDefaultTimeout); + EXPECT_TRUE_WAIT(!callee()->data_observer()->IsOpen(), kDefaultTimeout); +} + +// Tests that datagram transport data channels can do in-band negotiation. +TEST_P(PeerConnectionIntegrationTest, + DatagramTransportDataChannelConfigSentToOtherSide) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.use_datagram_transport_for_data_channels = true; + rtc_config.enable_dtls_srtp = false; + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, rtc_config, loopback_media_transports()->first_factory(), + loopback_media_transports()->second_factory())); + ConnectFakeSignaling(); + + // Create a data channel with a non-default configuration and signal it to the + // callee. + webrtc::DataChannelInit init; + init.id = 53; + init.maxRetransmits = 52; + caller()->CreateDataChannel("data-channel", &init); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + // Ensure that the media transport is ready. + loopback_media_transports()->SetState(webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Ensure that the data channel exists on the callee with the correct + // configuration. + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + ASSERT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + // Since "negotiate" is false, the "id" parameter is ignored. + EXPECT_NE(init.id, callee()->data_channel()->id()); + EXPECT_EQ("data-channel", callee()->data_channel()->label()); + EXPECT_EQ(init.maxRetransmits, callee()->data_channel()->maxRetransmits()); + EXPECT_FALSE(callee()->data_channel()->negotiated()); +} + +TEST_P(PeerConnectionIntegrationTest, + DatagramTransportDataChannelRejectedWithNoFallback) { + PeerConnectionInterface::RTCConfiguration offerer_config; + offerer_config.rtcp_mux_policy = + PeerConnectionInterface::kRtcpMuxPolicyRequire; + offerer_config.bundle_policy = + PeerConnectionInterface::kBundlePolicyMaxBundle; + offerer_config.use_datagram_transport_for_data_channels = true; + // Disabling DTLS precludes a fallback to SCTP. + offerer_config.enable_dtls_srtp = false; + + PeerConnectionInterface::RTCConfiguration answerer_config; + answerer_config.rtcp_mux_policy = + PeerConnectionInterface::kRtcpMuxPolicyRequire; + answerer_config.bundle_policy = + PeerConnectionInterface::kBundlePolicyMaxBundle; + // Both endpoints must disable DTLS or SetRemoteDescription will fail. + answerer_config.enable_dtls_srtp = false; + + // Configure one endpoint to use datagram transport for data channels while + // the other does not. + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + offerer_config, answerer_config, + loopback_media_transports()->first_factory(), nullptr)); + ConnectFakeSignaling(); + + // The caller offers a data channel using either datagram transport or SCTP. + caller()->CreateDataChannel(); + caller()->AddAudioVideoTracks(); + callee()->AddAudioVideoTracks(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + // Caller data channel should already exist (it created one). Callee data + // channel should not exist, since negotiation happens in-band, not in SDP. + EXPECT_NE(nullptr, caller()->data_channel()); + EXPECT_EQ(nullptr, callee()->data_channel()); + + // The caller's data channel should close when the datagram transport is + // rejected. + EXPECT_FALSE(caller()->data_observer()->IsOpen()); + + // Media flow should not be impacted by the failed data channel. + MediaExpectations media_expectations; + media_expectations.ExpectBidirectionalAudioAndVideo(); + ASSERT_TRUE(ExpectNewFrames(media_expectations)); +} + // This test sets up a call between two parties with a media transport data // channel. TEST_P(PeerConnectionIntegrationTest, MediaTransportDataChannelEndToEnd) {