From bc3eebc722eec38c7436a2a5ddf2c545c2bea02d Mon Sep 17 00:00:00 2001 From: Bjorn A Mellem Date: Mon, 23 Sep 2019 14:53:54 -0700 Subject: [PATCH] Reland "Reland "Refactor SCTP data channels to use DataChannelTransportInterface."" This is a reland of 487f9a17e426fd14bb06b13e861071b3f15d119b Original change's description: > Reland "Refactor SCTP data channels to use DataChannelTransportInterface." > > Also clears SctpTransport before deleting JsepTransport. > > SctpTransport is ref-counted, but the underlying transport is deleted when > JsepTransport clears the rtp_dtls_transport. This results in crashes when > usrsctp attempts to send outgoing packets through a dangling pointer to the > underlying transport. > > Clearing SctpTransport before DtlsTransport removes the pointer to the > underlying transport before it becomes invalid. > > This fixes a crash in chromium's web platform tests (see > https://chromium-review.googlesource.com/c/chromium/src/+/1776711). > > Original change's description: > > Refactor SCTP data channels to use DataChannelTransportInterface. > > > > This change moves SctpTransport to be owned by JsepTransport, which now > > holds a DataChannelTransport implementation for SCTP when it is used for > > data channels. > > > > This simplifies negotiation and fallback to SCTP. Negotiation can now > > use a composite DataChannelTransport, just as negotiation for RTP uses a > > composite RTP transport. > > > > PeerConnection also has one fewer way it needs to manage data channels. > > It now handles SCTP and datagram- or media-transport-based data channels > > the same way. > > > > There are a few leaky abstractions left. For example, PeerConnection > > calls Start() on the SctpTransport at a particular point in negotiation, > > but does not need to call this for other transports. Similarly, PC > > exposes an interface to the SCTP transport directly to the user; there > > is no equivalent for other transports. > > Bug: webrtc:9719 > Change-Id: I64e94b88afb119fdbf5f22750f88c8a084d53937 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/151981 > Reviewed-by: Benjamin Wright > Reviewed-by: Steve Anton > Commit-Queue: Benjamin Wright > Commit-Queue: Bjorn Mellem > Cr-Commit-Position: refs/heads/master@{#29120} Bug: webrtc:9719 Change-Id: I28481a3de64a3506bc57748106383eeba4ef205c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/152740 Reviewed-by: Artem Titov Reviewed-by: Benjamin Wright Reviewed-by: Seth Hampson Commit-Queue: Bjorn Mellem Cr-Commit-Position: refs/heads/master@{#29290} --- api/test/loopback_media_transport.cc | 22 +- api/test/loopback_media_transport.h | 22 + pc/BUILD.gn | 8 +- pc/composite_data_channel_transport.cc | 117 +++++ pc/composite_data_channel_transport.h | 62 +++ pc/jsep_transport.cc | 68 ++- pc/jsep_transport.h | 39 +- pc/jsep_transport_controller.cc | 50 +- pc/jsep_transport_controller.h | 23 +- pc/jsep_transport_controller_unittest.cc | 3 +- pc/jsep_transport_unittest.cc | 4 +- pc/peer_connection.cc | 536 ++++++-------------- pc/peer_connection.h | 77 +-- pc/peer_connection_data_channel_unittest.cc | 14 + pc/peer_connection_integrationtest.cc | 94 ++++ pc/sctp_data_channel_transport.cc | 112 ++++ pc/sctp_data_channel_transport.h | 50 ++ pc/sctp_utils.cc | 29 ++ pc/sctp_utils.h | 7 + test/fuzzers/BUILD.gn | 2 +- test/peer_scenario/scenario_connection.cc | 6 +- 21 files changed, 835 insertions(+), 510 deletions(-) create mode 100644 pc/composite_data_channel_transport.cc create mode 100644 pc/composite_data_channel_transport.h create mode 100644 pc/sctp_data_channel_transport.cc create mode 100644 pc/sctp_data_channel_transport.h diff --git a/api/test/loopback_media_transport.cc b/api/test/loopback_media_transport.cc index cadcff0e71..e341a38876 100644 --- a/api/test/loopback_media_transport.cc +++ b/api/test/loopback_media_transport.cc @@ -271,7 +271,11 @@ void MediaTransportPair::LoopbackMediaTransport::Connect( } void MediaTransportPair::LoopbackMediaTransport::Connect( - rtc::PacketTransportInternal* packet_transport) {} + rtc::PacketTransportInternal* packet_transport) { + if (state_after_connect_) { + SetState(*state_after_connect_); + } +} absl::optional MediaTransportPair::LoopbackMediaTransport::GetTransportParametersOffer() @@ -504,6 +508,11 @@ void MediaTransportPair::LoopbackMediaTransport::SetState( }); } +void MediaTransportPair::LoopbackMediaTransport::SetStateAfterConnect( + MediaTransportState state) { + state_after_connect_ = state; +} + void MediaTransportPair::LoopbackMediaTransport::FlushAsyncInvokes() { invoker_.Flush(thread_); dc_transport_.FlushAsyncInvokes(); @@ -610,7 +619,11 @@ void MediaTransportPair::LoopbackDatagramTransport::Connect( } void MediaTransportPair::LoopbackDatagramTransport::Connect( - rtc::PacketTransportInternal* packet_transport) {} + rtc::PacketTransportInternal* packet_transport) { + if (state_after_connect_) { + SetState(*state_after_connect_); + } +} CongestionControlInterface* MediaTransportPair::LoopbackDatagramTransport::congestion_control() { @@ -670,6 +683,11 @@ void MediaTransportPair::LoopbackDatagramTransport::SetState( dc_transport_.OnReadyToSend(state == MediaTransportState::kWritable); } +void MediaTransportPair::LoopbackDatagramTransport::SetStateAfterConnect( + MediaTransportState state) { + state_after_connect_ = state; +} + void MediaTransportPair::LoopbackDatagramTransport::FlushAsyncInvokes() { dc_transport_.FlushAsyncInvokes(); } diff --git a/api/test/loopback_media_transport.h b/api/test/loopback_media_transport.h index 1087d9a857..bacfd5ea38 100644 --- a/api/test/loopback_media_transport.h +++ b/api/test/loopback_media_transport.h @@ -111,6 +111,16 @@ class MediaTransportPair { second_datagram_transport_.SetState(state); } + void SetFirstState(MediaTransportState state) { + first_.SetState(state); + first_datagram_transport_.SetState(state); + } + + void SetSecondStateAfterConnect(MediaTransportState state) { + second_.SetState(state); + second_datagram_transport_.SetState(state); + } + void SetFirstDatagramTransportParameters(const std::string& params) { first_datagram_transport_.set_transport_parameters(params); } @@ -214,6 +224,10 @@ class MediaTransportPair { void SetState(MediaTransportState state); + // When Connect() is called, the media transport will enter this state. + // This is useful for mimicking zero-RTT connectivity, for example. + void SetStateAfterConnect(MediaTransportState state); + RTCError OpenChannel(int channel_id) override; RTCError SendData(int channel_id, @@ -270,6 +284,8 @@ class MediaTransportPair { MediaTransportState state_ RTC_GUARDED_BY(thread_) = MediaTransportState::kPending; + absl::optional state_after_connect_; + LoopbackMediaTransport* other_; Stats stats_ RTC_GUARDED_BY(stats_lock_); @@ -306,6 +322,10 @@ class MediaTransportPair { // Loopback-specific functionality. void SetState(MediaTransportState state); + + // When Connect() is called, the datagram transport will enter this state. + // This is useful for mimicking zero-RTT connectivity, for example. + void SetStateAfterConnect(MediaTransportState state); void FlushAsyncInvokes(); void set_transport_parameters(const std::string& value) { @@ -316,6 +336,8 @@ class MediaTransportPair { LoopbackDataChannelTransport dc_transport_; std::string transport_parameters_; + + absl::optional state_after_connect_; }; LoopbackMediaTransport first_; diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 07db208cc4..18025b8299 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -34,6 +34,8 @@ rtc_static_library("rtc_pc_base") { "channel_interface.h", "channel_manager.cc", "channel_manager.h", + "composite_data_channel_transport.cc", + "composite_data_channel_transport.h", "composite_rtp_transport.cc", "composite_rtp_transport.h", "datagram_rtp_transport.cc", @@ -59,8 +61,12 @@ rtc_static_library("rtc_pc_base") { "rtp_transport.cc", "rtp_transport.h", "rtp_transport_internal.h", + "sctp_data_channel_transport.cc", + "sctp_data_channel_transport.h", "sctp_transport.cc", "sctp_transport.h", + "sctp_utils.cc", + "sctp_utils.h", "session_description.cc", "session_description.h", "simulcast_description.cc", @@ -192,8 +198,6 @@ rtc_static_library("peerconnection") { "rtp_sender.h", "rtp_transceiver.cc", "rtp_transceiver.h", - "sctp_utils.cc", - "sctp_utils.h", "sdp_serializer.cc", "sdp_serializer.h", "sdp_utils.cc", diff --git a/pc/composite_data_channel_transport.cc b/pc/composite_data_channel_transport.cc new file mode 100644 index 0000000000..185dd1e23a --- /dev/null +++ b/pc/composite_data_channel_transport.cc @@ -0,0 +1,117 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pc/composite_data_channel_transport.h" + +#include + +#include "absl/algorithm/container.h" + +namespace webrtc { + +CompositeDataChannelTransport::CompositeDataChannelTransport( + std::vector transports) + : transports_(std::move(transports)) { + for (auto transport : transports_) { + transport->SetDataSink(this); + } +} + +void CompositeDataChannelTransport::SetSendTransport( + DataChannelTransportInterface* send_transport) { + if (!absl::c_linear_search(transports_, send_transport)) { + return; + } + send_transport_ = send_transport; + // NB: OnReadyToSend() checks if we're actually ready to send, and signals + // |sink_| if appropriate. This signal is required upon setting the sink. + OnReadyToSend(); +} + +void CompositeDataChannelTransport::RemoveTransport( + DataChannelTransportInterface* transport) { + RTC_DCHECK(transport != send_transport_) << "Cannot remove send transport"; + + auto it = absl::c_find(transports_, transport); + if (it == transports_.end()) { + return; + } + + transport->SetDataSink(nullptr); + transports_.erase(it); +} + +RTCError CompositeDataChannelTransport::OpenChannel(int channel_id) { + RTCError error = RTCError::OK(); + for (auto transport : transports_) { + RTCError e = transport->OpenChannel(channel_id); + if (!e.ok()) { + error = std::move(e); + } + } + return error; +} + +RTCError CompositeDataChannelTransport::SendData( + int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + if (send_transport_) { + return send_transport_->SendData(channel_id, params, buffer); + } + return RTCError(RTCErrorType::NETWORK_ERROR, "Send transport is not ready"); +} + +RTCError CompositeDataChannelTransport::CloseChannel(int channel_id) { + if (send_transport_) { + return send_transport_->CloseChannel(channel_id); + } + return RTCError(RTCErrorType::NETWORK_ERROR, "Send transport is not ready"); +} + +void CompositeDataChannelTransport::SetDataSink(DataChannelSink* sink) { + sink_ = sink; + // NB: OnReadyToSend() checks if we're actually ready to send, and signals + // |sink_| if appropriate. This signal is required upon setting the sink. + OnReadyToSend(); +} + +bool CompositeDataChannelTransport::IsReadyToSend() const { + return send_transport_ && send_transport_->IsReadyToSend(); +} + +void CompositeDataChannelTransport::OnDataReceived( + int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + if (sink_) { + sink_->OnDataReceived(channel_id, type, buffer); + } +} + +void CompositeDataChannelTransport::OnChannelClosing(int channel_id) { + if (sink_) { + sink_->OnChannelClosing(channel_id); + } +} + +void CompositeDataChannelTransport::OnChannelClosed(int channel_id) { + if (sink_) { + sink_->OnChannelClosed(channel_id); + } +} + +void CompositeDataChannelTransport::OnReadyToSend() { + if (sink_ && send_transport_ && send_transport_->IsReadyToSend()) { + sink_->OnReadyToSend(); + } +} + +} // namespace webrtc diff --git a/pc/composite_data_channel_transport.h b/pc/composite_data_channel_transport.h new file mode 100644 index 0000000000..ccff4fe7ab --- /dev/null +++ b/pc/composite_data_channel_transport.h @@ -0,0 +1,62 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef PC_COMPOSITE_DATA_CHANNEL_TRANSPORT_H_ +#define PC_COMPOSITE_DATA_CHANNEL_TRANSPORT_H_ + +#include + +#include "api/data_channel_transport_interface.h" +#include "rtc_base/critical_section.h" + +namespace webrtc { + +// Composite implementation of DataChannelTransportInterface. Allows users to +// receive data channel messages over multiple transports and send over one of +// those transports. +class CompositeDataChannelTransport : public DataChannelTransportInterface, + public DataChannelSink { + public: + explicit CompositeDataChannelTransport( + std::vector transports); + + // Specifies which transport to be used for sending. Must be called before + // sending data. + void SetSendTransport(DataChannelTransportInterface* send_transport); + + // Removes a given transport from the composite, if present. + void RemoveTransport(DataChannelTransportInterface* transport); + + // DataChannelTransportInterface overrides. + RTCError OpenChannel(int channel_id) override; + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override; + RTCError CloseChannel(int channel_id) override; + void SetDataSink(DataChannelSink* sink) override; + bool IsReadyToSend() const override; + + // DataChannelSink overrides. + void OnDataReceived(int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) override; + void OnChannelClosing(int channel_id) override; + void OnChannelClosed(int channel_id) override; + void OnReadyToSend() override; + + private: + std::vector transports_; + DataChannelTransportInterface* send_transport_ = nullptr; + DataChannelSink* sink_ = nullptr; +}; + +} // namespace webrtc + +#endif // PC_COMPOSITE_DATA_CHANNEL_TRANSPORT_H_ diff --git a/pc/jsep_transport.cc b/pc/jsep_transport.cc index 22f4f8d1f5..b95dc22cfb 100644 --- a/pc/jsep_transport.cc +++ b/pc/jsep_transport.cc @@ -21,6 +21,7 @@ #include "api/candidate.h" #include "p2p/base/p2p_constants.h" #include "p2p/base/p2p_transport_channel.h" +#include "pc/sctp_data_channel_transport.h" #include "rtc_base/checks.h" #include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/logging.h" @@ -101,8 +102,10 @@ JsepTransport::JsepTransport( std::unique_ptr datagram_rtp_transport, std::unique_ptr rtp_dtls_transport, std::unique_ptr rtcp_dtls_transport, + std::unique_ptr sctp_transport, std::unique_ptr media_transport, - std::unique_ptr datagram_transport) + std::unique_ptr datagram_transport, + webrtc::DataChannelTransportInterface* data_channel_transport) : network_thread_(rtc::Thread::Current()), mid_(mid), local_certificate_(local_certificate), @@ -121,8 +124,17 @@ JsepTransport::JsepTransport( ? new rtc::RefCountedObject( std::move(rtcp_dtls_transport)) : nullptr), + sctp_data_channel_transport_( + sctp_transport ? std::make_unique( + sctp_transport.get()) + : nullptr), + sctp_transport_(sctp_transport + ? new rtc::RefCountedObject( + std::move(sctp_transport)) + : nullptr), media_transport_(std::move(media_transport)), - datagram_transport_(std::move(datagram_transport)) { + datagram_transport_(std::move(datagram_transport)), + data_channel_transport_(data_channel_transport) { RTC_DCHECK(ice_transport_); RTC_DCHECK(rtp_dtls_transport_); // |rtcp_ice_transport_| must be present iff |rtcp_dtls_transport_| is @@ -143,6 +155,10 @@ JsepTransport::JsepTransport( RTC_DCHECK(!sdes_transport); } + if (sctp_transport_) { + sctp_transport_->SetDtlsTransport(rtp_dtls_transport_); + } + if (datagram_rtp_transport_ && default_rtp_transport()) { composite_rtp_transport_ = std::make_unique( std::vector{ @@ -152,6 +168,13 @@ JsepTransport::JsepTransport( if (media_transport_) { media_transport_->SetMediaTransportStateCallback(this); } + + if (data_channel_transport_ && sctp_data_channel_transport_) { + composite_data_channel_transport_ = + std::make_unique( + std::vector{ + data_channel_transport_, sctp_data_channel_transport_.get()}); + } } JsepTransport::~JsepTransport() { @@ -162,6 +185,10 @@ JsepTransport::~JsepTransport() { media_transport_.reset(); } + if (sctp_transport_) { + sctp_transport_->Clear(); + } + // Clear all DtlsTransports. There may be pointers to these from // other places, so we can't assume they'll be deleted by the destructor. rtp_dtls_transport_->Clear(); @@ -789,26 +816,20 @@ void JsepTransport::NegotiateDatagramTransport(SdpType type) { use_datagram_transport ? datagram_rtp_transport_.get() : default_rtp_transport()); } + if (composite_data_channel_transport_) { + composite_data_channel_transport_->SetSendTransport( + use_datagram_transport ? data_channel_transport_ + : sctp_data_channel_transport_.get()); + } if (type != SdpType::kAnswer) { - // A provisional answer lets the peer start sending on the chosen - // transport, but does not allow it to destroy other transports yet. - SignalDataChannelTransportNegotiated( - this, use_datagram_transport ? datagram_transport_.get() : nullptr, - /*provisional=*/true); return; } - // A full answer lets the peer delete the remaining transports. - // First, signal that the transports will be deleted so the application can - // stop using them. - SignalDataChannelTransportNegotiated( - this, use_datagram_transport ? datagram_transport_.get() : nullptr, - /*provisional=*/false); - if (use_datagram_transport) { if (composite_rtp_transport_) { - // Remove and delete the non-datagram RTP transport. + // Negotiated use of datagram transport for RTP, so remove the + // non-datagram RTP transport. composite_rtp_transport_->RemoveTransport(default_rtp_transport()); if (unencrypted_rtp_transport_) { unencrypted_rtp_transport_ = nullptr; @@ -818,12 +839,29 @@ void JsepTransport::NegotiateDatagramTransport(SdpType type) { dtls_srtp_transport_ = nullptr; } } + if (composite_data_channel_transport_) { + // Negotiated use of datagram transport for data channels, so remove the + // non-datagram data channel transport. + composite_data_channel_transport_->RemoveTransport( + sctp_data_channel_transport_.get()); + sctp_data_channel_transport_ = nullptr; + sctp_transport_ = nullptr; + } } else { // Remove and delete the datagram transport. if (composite_rtp_transport_) { composite_rtp_transport_->RemoveTransport(datagram_rtp_transport_.get()); } + if (composite_data_channel_transport_) { + composite_data_channel_transport_->RemoveTransport( + data_channel_transport_); + } else { + // If there's no composite data channel transport, we need to signal that + // the data channel is about to be deleted. + SignalDataChannelTransportNegotiated(this, nullptr); + } datagram_rtp_transport_ = nullptr; + data_channel_transport_ = nullptr; datagram_transport_ = nullptr; } } diff --git a/pc/jsep_transport.h b/pc/jsep_transport.h index 7bd0b0719e..868f7b92c6 100644 --- a/pc/jsep_transport.h +++ b/pc/jsep_transport.h @@ -21,14 +21,17 @@ #include "api/jsep.h" #include "api/transport/datagram_transport_interface.h" #include "api/transport/media/media_transport_interface.h" +#include "media/sctp/sctp_transport_internal.h" #include "p2p/base/dtls_transport.h" #include "p2p/base/p2p_constants.h" #include "p2p/base/transport_info.h" +#include "pc/composite_data_channel_transport.h" #include "pc/composite_rtp_transport.h" #include "pc/dtls_srtp_transport.h" #include "pc/dtls_transport.h" #include "pc/rtcp_mux_filter.h" #include "pc/rtp_transport.h" +#include "pc/sctp_transport.h" #include "pc/session_description.h" #include "pc/srtp_filter.h" #include "pc/srtp_transport.h" @@ -96,8 +99,10 @@ class JsepTransport : public sigslot::has_slots<>, std::unique_ptr datagram_rtp_transport, std::unique_ptr rtp_dtls_transport, std::unique_ptr rtcp_dtls_transport, + std::unique_ptr sctp_transport, std::unique_ptr media_transport, - std::unique_ptr datagram_transport); + std::unique_ptr datagram_transport, + webrtc::DataChannelTransportInterface* data_channel_transport); ~JsepTransport() override; @@ -215,6 +220,21 @@ class JsepTransport : public sigslot::has_slots<>, return rtp_dtls_transport_; } + rtc::scoped_refptr SctpTransport() const { + rtc::CritScope scope(&accessor_lock_); + return sctp_transport_; + } + + webrtc::DataChannelTransportInterface* data_channel_transport() const { + rtc::CritScope scope(&accessor_lock_); + if (composite_data_channel_transport_) { + return composite_data_channel_transport_.get(); + } else if (sctp_data_channel_transport_) { + return sctp_data_channel_transport_.get(); + } + return data_channel_transport_; + } + // Returns media transport, if available. // Note that media transport is owned by jseptransport and the pointer // to media transport will becomes invalid after destruction of jseptransport. @@ -249,7 +269,7 @@ class JsepTransport : public sigslot::has_slots<>, // channel transport. The third parameter (bool) indicates whether the // negotiation was provisional or final. If true, it is provisional, if // false, it is final. - sigslot::signal3 + sigslot::signal2 SignalDataChannelTransportNegotiated; // TODO(deadbeef): The methods below are only public for testing. Should make @@ -375,6 +395,11 @@ class JsepTransport : public sigslot::has_slots<>, rtc::scoped_refptr datagram_dtls_transport_ RTC_GUARDED_BY(accessor_lock_); + std::unique_ptr + sctp_data_channel_transport_ RTC_GUARDED_BY(accessor_lock_); + rtc::scoped_refptr sctp_transport_ + RTC_GUARDED_BY(accessor_lock_); + SrtpFilter sdes_negotiator_ RTC_GUARDED_BY(network_thread_); RtcpMuxFilter rtcp_mux_negotiator_ RTC_GUARDED_BY(network_thread_); @@ -392,6 +417,16 @@ class JsepTransport : public sigslot::has_slots<>, std::unique_ptr datagram_transport_ RTC_GUARDED_BY(accessor_lock_); + // Non-SCTP data channel transport. Set to one of |media_transport_| or + // |datagram_transport_| if that transport should be used for data chanels. + // Unset if neither should be used for data channels. + webrtc::DataChannelTransportInterface* data_channel_transport_ + RTC_GUARDED_BY(accessor_lock_) = nullptr; + + // Composite data channel transport, used during negotiation. + std::unique_ptr + composite_data_channel_transport_ RTC_GUARDED_BY(accessor_lock_); + // If |media_transport_| is provided, this variable represents the state of // media transport. // diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index c9ed4d573f..75f5d9db8c 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -175,14 +175,7 @@ DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport( if (!jsep_transport) { return nullptr; } - - if (config_.use_media_transport_for_data_channels) { - return jsep_transport->media_transport(); - } else if (config_.use_datagram_transport_for_data_channels) { - return jsep_transport->datagram_transport(); - } - // Not configured to use a data channel transport. - return nullptr; + return jsep_transport->data_channel_transport(); } MediaTransportState JsepTransportController::GetMediaTransportState( @@ -221,6 +214,15 @@ JsepTransportController::LookupDtlsTransportByMid(const std::string& mid) { return jsep_transport->RtpDtlsTransport(); } +rtc::scoped_refptr JsepTransportController::GetSctpTransport( + const std::string& mid) const { + auto jsep_transport = GetJsepTransportForMid(mid); + if (!jsep_transport) { + return nullptr; + } + return jsep_transport->SctpTransport(); +} + void JsepTransportController::SetIceConfig(const cricket::IceConfig& config) { if (!network_thread_->IsCurrent()) { network_thread_->Invoke(RTC_FROM_HERE, [&] { SetIceConfig(config); }); @@ -873,13 +875,13 @@ bool JsepTransportController::SetTransportForMid( mid_to_transport_[mid] = jsep_transport; return config_.transport_observer->OnTransportChanged( mid, jsep_transport->rtp_transport(), jsep_transport->RtpDtlsTransport(), - jsep_transport->media_transport(), jsep_transport->datagram_transport(), - NegotiationState::kInitial); + jsep_transport->media_transport(), + jsep_transport->data_channel_transport()); } void JsepTransportController::RemoveTransportForMid(const std::string& mid) { bool ret = config_.transport_observer->OnTransportChanged( - mid, nullptr, nullptr, nullptr, nullptr, NegotiationState::kFinal); + mid, nullptr, nullptr, nullptr, nullptr); // Calling OnTransportChanged with nullptr should always succeed, since it is // only expected to fail when adding media to a transport (not removing). RTC_DCHECK(ret); @@ -1229,13 +1231,27 @@ RTCError JsepTransportController::MaybeCreateJsepTransport( content_info.name, rtp_dtls_transport.get(), rtcp_dtls_transport.get()); } + std::unique_ptr sctp_transport; + if (config_.sctp_factory) { + sctp_transport = + config_.sctp_factory->CreateSctpTransport(rtp_dtls_transport.get()); + } + + DataChannelTransportInterface* data_channel_transport = nullptr; + if (config_.use_datagram_transport_for_data_channels) { + data_channel_transport = datagram_transport.get(); + } else if (config_.use_media_transport_for_data_channels) { + data_channel_transport = media_transport.get(); + } + std::unique_ptr jsep_transport = std::make_unique( content_info.name, certificate_, std::move(ice), std::move(rtcp_ice), std::move(unencrypted_rtp_transport), std::move(sdes_transport), std::move(dtls_srtp_transport), std::move(datagram_rtp_transport), std::move(rtp_dtls_transport), std::move(rtcp_dtls_transport), - std::move(media_transport), std::move(datagram_transport)); + std::move(sctp_transport), std::move(media_transport), + std::move(datagram_transport), data_channel_transport); jsep_transport->rtp_transport()->SignalRtcpPacketReceived.connect( this, &JsepTransportController::OnRtcpPacketReceived_n); @@ -1277,8 +1293,7 @@ void JsepTransportController::DestroyAllJsepTransports_n() { for (const auto& jsep_transport : jsep_transports_by_name_) { config_.transport_observer->OnTransportChanged( - jsep_transport.first, nullptr, nullptr, nullptr, nullptr, - NegotiationState::kFinal); + jsep_transport.first, nullptr, nullptr, nullptr, nullptr); } jsep_transports_by_name_.clear(); @@ -1455,15 +1470,12 @@ void JsepTransportController::OnMediaTransportStateChanged_n() { void JsepTransportController::OnDataChannelTransportNegotiated_n( cricket::JsepTransport* transport, - DataChannelTransportInterface* data_channel_transport, - bool provisional) { + DataChannelTransportInterface* data_channel_transport) { for (auto it : mid_to_transport_) { if (it.second == transport) { config_.transport_observer->OnTransportChanged( it.first, transport->rtp_transport(), transport->RtpDtlsTransport(), - transport->media_transport(), data_channel_transport, - provisional ? NegotiationState::kProvisional - : NegotiationState::kFinal); + transport->media_transport(), data_channel_transport); } } } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index c95a62d9fd..12bcebc335 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -47,18 +47,6 @@ namespace webrtc { class JsepTransportController : public sigslot::has_slots<> { public: - // State of negotiation for a transport. - enum class NegotiationState { - // Transport is in its initial state, not negotiated at all. - kInitial = 0, - - // Transport is negotiated, but not finalized. - kProvisional = 1, - - // Negotiation has completed for this transport. - kFinal = 2, - }; - // Used when the RtpTransport/DtlsTransport of the m= section is changed // because the section is rejected or BUNDLE is enabled. class Observer { @@ -84,8 +72,7 @@ class JsepTransportController : public sigslot::has_slots<> { RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - NegotiationState negotiation_state) = 0; + DataChannelTransportInterface* data_channel_transport) = 0; }; struct Config { @@ -114,6 +101,9 @@ class JsepTransportController : public sigslot::has_slots<> { bool active_reset_srtp_params = false; RtcEventLog* event_log = nullptr; + // Factory for SCTP transports. + cricket::SctpTransportInternalFactory* sctp_factory = nullptr; + // Whether media transport is used for media. bool use_media_transport_for_media = false; @@ -169,6 +159,8 @@ class JsepTransportController : public sigslot::has_slots<> { // Gets the externally sharable version of the DtlsTransport. rtc::scoped_refptr LookupDtlsTransportByMid( const std::string& mid); + rtc::scoped_refptr GetSctpTransport( + const std::string& mid) const; MediaTransportConfig GetMediaTransportConfig(const std::string& mid) const; @@ -433,8 +425,7 @@ class JsepTransportController : public sigslot::has_slots<> { const cricket::CandidatePairChangeEvent& event); void OnDataChannelTransportNegotiated_n( cricket::JsepTransport* transport, - DataChannelTransportInterface* data_channel_transport, - bool provisional); + DataChannelTransportInterface* data_channel_transport); void UpdateAggregateStates_n(); diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc index ef93898092..8461e86b00 100644 --- a/pc/jsep_transport_controller_unittest.cc +++ b/pc/jsep_transport_controller_unittest.cc @@ -311,8 +311,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - JsepTransportController::NegotiationState negotiation_state) override { + DataChannelTransportInterface* data_channel_transport) override { changed_rtp_transport_by_mid_[mid] = rtp_transport; if (dtls_transport) { changed_dtls_transport_by_mid_[mid] = dtls_transport->internal(); diff --git a/pc/jsep_transport_unittest.cc b/pc/jsep_transport_unittest.cc index 123482c347..00f58f64a0 100644 --- a/pc/jsep_transport_unittest.cc +++ b/pc/jsep_transport_unittest.cc @@ -109,8 +109,10 @@ class JsepTransport2Test : public ::testing::Test, public sigslot::has_slots<> { std::move(sdes_transport), std::move(dtls_srtp_transport), /*datagram_rtp_transport=*/nullptr, std::move(rtp_dtls_transport), std::move(rtcp_dtls_transport), + /*sctp_transport=*/nullptr, /*media_transport=*/nullptr, - /*datagram_transport=*/nullptr); + /*datagram_transport=*/nullptr, + /*data_channel_transport=*/nullptr); signal_rtcp_mux_active_received_ = false; jsep_transport->SignalRtcpMuxActive.connect( diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index a80cf5a991..414908150c 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -610,35 +610,6 @@ absl::optional RTCConfigurationToIceConfigOptionalInt( return rtc_configuration_parameter; } -cricket::DataMessageType ToCricketDataMessageType(DataMessageType type) { - switch (type) { - case DataMessageType::kText: - return cricket::DMT_TEXT; - case DataMessageType::kBinary: - return cricket::DMT_BINARY; - case DataMessageType::kControl: - return cricket::DMT_CONTROL; - default: - return cricket::DMT_NONE; - } - return cricket::DMT_NONE; -} - -DataMessageType ToWebrtcDataMessageType(cricket::DataMessageType type) { - switch (type) { - case cricket::DMT_TEXT: - return DataMessageType::kText; - case cricket::DMT_BINARY: - return DataMessageType::kBinary; - case cricket::DMT_CONTROL: - return DataMessageType::kControl; - case cricket::DMT_NONE: - default: - RTC_NOTREACHED(); - } - return DataMessageType::kControl; -} - void ReportSimulcastApiVersion(const char* name, const SessionDescription& session) { bool has_legacy = false; @@ -923,6 +894,7 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, remote_streams_(StreamCollection::Create()), call_(std::move(call)), call_ptr_(call_.get()), + data_channel_transport_(nullptr), local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {} PeerConnection::~PeerConnection() { @@ -949,7 +921,6 @@ PeerConnection::~PeerConnection() { RTC_LOG(LS_INFO) << "Session: " << session_id() << " is destroyed."; webrtc_session_desc_factory_.reset(); - sctp_invoker_.reset(); sctp_factory_.reset(); data_channel_transport_invoker_.reset(); transport_controller_.reset(); @@ -1146,6 +1117,64 @@ bool PeerConnection::Initialize( config.media_transport_factory = factory_->media_transport_factory(); } + // Obtain a certificate from RTCConfiguration if any were provided (optional). + rtc::scoped_refptr certificate; + if (!configuration.certificates.empty()) { + // TODO(hbos,torbjorng): Decide on certificate-selection strategy instead of + // just picking the first one. The decision should be made based on the DTLS + // handshake. The DTLS negotiations need to know about all certificates. + certificate = configuration.certificates[0]; + } + + if (options.disable_encryption) { + dtls_enabled_ = false; + } else { + // Enable DTLS by default if we have an identity store or a certificate. + dtls_enabled_ = (dependencies.cert_generator || certificate); + // |configuration| can override the default |dtls_enabled_| value. + if (configuration.enable_dtls_srtp) { + dtls_enabled_ = *(configuration.enable_dtls_srtp); + } + } + + sctp_factory_ = factory_->CreateSctpTransportInternalFactory(); + + if (use_datagram_transport_for_data_channels_) { + if (configuration.enable_rtp_data_channel) { + RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " + "use_datagram_transport_for_data_channels are " + "incompatible and cannot both be set to true"; + return false; + } + if (configuration.enable_dtls_srtp && !*configuration.enable_dtls_srtp) { + RTC_LOG(LS_INFO) << "Using data channel transport with no fallback"; + data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT; + } else { + RTC_LOG(LS_INFO) << "Using data channel transport with fallback to SCTP"; + data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; + config.sctp_factory = sctp_factory_.get(); + } + } else if (configuration.use_media_transport_for_data_channels) { + if (configuration.enable_rtp_data_channel) { + RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " + "use_media_transport_for_data_channels are " + "incompatible and cannot both be set to true"; + return false; + } + data_channel_type_ = cricket::DCT_MEDIA_TRANSPORT; + } else if (configuration.enable_rtp_data_channel) { + // Enable creation of RTP data channels if the kEnableRtpDataChannels is + // set. It takes precendence over the disable_sctp_data_channels + // PeerConnectionFactoryInterface::Options. + data_channel_type_ = cricket::DCT_RTP; + } else { + // DTLS has to be enabled to use SCTP. + if (!options.disable_sctp_data_channels && dtls_enabled_) { + data_channel_type_ = cricket::DCT_SCTP; + config.sctp_factory = sctp_factory_.get(); + } + } + transport_controller_.reset(new JsepTransportController( signaling_thread(), network_thread(), port_allocator_.get(), async_resolver_factory_.get(), config)); @@ -1168,70 +1197,14 @@ bool PeerConnection::Initialize( transport_controller_->SignalIceCandidatePairChanged.connect( this, &PeerConnection::OnTransportControllerCandidateChanged); - sctp_factory_ = factory_->CreateSctpTransportInternalFactory(); - stats_.reset(new StatsCollector(this)); stats_collector_ = RTCStatsCollector::Create(this); configuration_ = configuration; use_media_transport_ = configuration.use_media_transport; - // Obtain a certificate from RTCConfiguration if any were provided (optional). - rtc::scoped_refptr certificate; - if (!configuration.certificates.empty()) { - // TODO(hbos,torbjorng): Decide on certificate-selection strategy instead of - // just picking the first one. The decision should be made based on the DTLS - // handshake. The DTLS negotiations need to know about all certificates. - certificate = configuration.certificates[0]; - } - transport_controller_->SetIceConfig(ParseIceConfig(configuration)); - if (options.disable_encryption) { - dtls_enabled_ = false; - } else { - // Enable DTLS by default if we have an identity store or a certificate. - dtls_enabled_ = (dependencies.cert_generator || certificate); - // |configuration| can override the default |dtls_enabled_| value. - if (configuration.enable_dtls_srtp) { - dtls_enabled_ = *(configuration.enable_dtls_srtp); - } - } - - if (use_datagram_transport_for_data_channels_) { - if (configuration.enable_rtp_data_channel) { - RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " - "use_datagram_transport_for_data_channels are " - "incompatible and cannot both be set to true"; - return false; - } - if (configuration.enable_dtls_srtp && !*configuration.enable_dtls_srtp) { - RTC_LOG(LS_INFO) << "Using data channel transport with no fallback"; - data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT; - } else { - RTC_LOG(LS_INFO) << "Using data channel transport with fallback to SCTP"; - data_channel_type_ = cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP; - } - } else if (configuration.use_media_transport_for_data_channels) { - if (configuration.enable_rtp_data_channel) { - RTC_LOG(LS_ERROR) << "enable_rtp_data_channel and " - "use_media_transport_for_data_channels are " - "incompatible and cannot both be set to true"; - return false; - } - data_channel_type_ = cricket::DCT_MEDIA_TRANSPORT; - } else if (configuration.enable_rtp_data_channel) { - // Enable creation of RTP data channels if the kEnableRtpDataChannels is - // set. It takes precendence over the disable_sctp_data_channels - // PeerConnectionFactoryInterface::Options. - data_channel_type_ = cricket::DCT_RTP; - } else { - // DTLS has to be enabled to use SCTP. - if (!options.disable_sctp_data_channels && dtls_enabled_) { - data_channel_type_ = cricket::DCT_SCTP; - } - } - video_options_.screencast_min_bitrate_kbps = configuration.screencast_min_bitrate; audio_options_.combined_audio_video_bwe = @@ -3223,7 +3196,7 @@ RTCError PeerConnection::UpdateDataChannel( RTC_LOG(LS_INFO) << "Rejected data channel, mid=" << content.mid(); DestroyDataChannel(); } else { - if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { + if (!rtp_data_channel_ && !data_channel_transport_) { RTC_LOG(LS_INFO) << "Creating data channel, mid=" << content.mid(); if (!CreateDataChannel(content.name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, @@ -3957,7 +3930,10 @@ PeerConnection::LookupDtlsTransportByMidInternal(const std::string& mid) { rtc::scoped_refptr PeerConnection::GetSctpTransport() const { RTC_DCHECK_RUN_ON(signaling_thread()); - return sctp_transport_; + if (!sctp_mid_) { + return nullptr; + } + return transport_controller_->GetSctpTransport(*sctp_mid_); } const SessionDescriptionInterface* PeerConnection::local_description() const { @@ -5732,19 +5708,18 @@ bool PeerConnection::GetSctpSslRole(rtc::SSLRole* role) { "SSL Role of the SCTP transport."; return false; } - if (!sctp_transport_ && !data_channel_transport_) { + if (!data_channel_transport_) { RTC_LOG(LS_INFO) << "Non-rejected SCTP m= section is needed to get the " "SSL Role of the SCTP transport."; return false; } absl::optional dtls_role; - if (sctp_mid_ && sctp_transport_) { + if (sctp_mid_) { dtls_role = transport_controller_->GetDtlsRole(*sctp_mid_); - } else if (is_caller_) { - dtls_role = *is_caller_ ? rtc::SSL_SERVER : rtc::SSL_CLIENT; - } - if (dtls_role) { + if (!dtls_role && is_caller_.has_value()) { + dtls_role = *is_caller_ ? rtc::SSL_SERVER : rtc::SSL_CLIENT; + } *role = *dtls_role; return true; } @@ -5870,12 +5845,14 @@ RTCError PeerConnection::PushdownMediaDescription( // Need complete offer/answer with an SCTP m= section before starting SCTP, // according to https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-19 - if (sctp_transport_ && local_description() && remote_description()) { + if (sctp_mid_ && local_description() && remote_description()) { + rtc::scoped_refptr sctp_transport = + transport_controller_->GetSctpTransport(*sctp_mid_); auto local_sctp_description = cricket::GetFirstSctpDataContentDescription( local_description()->description()); auto remote_sctp_description = cricket::GetFirstSctpDataContentDescription( remote_description()->description()); - if (local_sctp_description && remote_sctp_description) { + if (sctp_transport && local_sctp_description && remote_sctp_description) { int max_message_size; // A remote max message size of zero means "any size supported". // We configure the connection with our own max message size. @@ -5886,8 +5863,8 @@ RTCError PeerConnection::PushdownMediaDescription( std::min(local_sctp_description->max_message_size(), remote_sctp_description->max_message_size()); } - sctp_transport_->Start(local_sctp_description->port(), - remote_sctp_description->port(), max_message_size); + sctp_transport->Start(local_sctp_description->port(), + remote_sctp_description->port(), max_message_size); } } @@ -5975,7 +5952,7 @@ bool PeerConnection::SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (data_channel_transport_ && data_channel_transport_negotiated_) { + if (data_channel_transport_) { SendDataParams send_params; send_params.type = ToWebrtcDataMessageType(params.type); send_params.ordered = params.ordered; @@ -5984,12 +5961,24 @@ bool PeerConnection::SendData(const cricket::SendDataParams& params, } else if (params.max_rtx_ms >= 0) { send_params.max_rtx_ms = params.max_rtx_ms; } - return data_channel_transport_->SendData(params.sid, send_params, payload) - .ok(); - } else if (sctp_transport_ && sctp_negotiated_) { - return network_thread()->Invoke( - RTC_FROM_HERE, Bind(&cricket::SctpTransportInternal::SendData, - cricket_sctp_transport(), params, payload, result)); + + RTCError error = network_thread()->Invoke( + RTC_FROM_HERE, [this, params, send_params, payload] { + return data_channel_transport_->SendData(params.sid, send_params, + payload); + }); + + if (error.ok()) { + *result = cricket::SendDataResult::SDR_SUCCESS; + return true; + } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { + // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. + // TODO(mellem): Stop using RTCError here and get rid of the mapping. + *result = cricket::SendDataResult::SDR_BLOCK; + return false; + } + *result = cricket::SendDataResult::SDR_ERROR; + return false; } else if (rtp_data_channel_) { return rtp_data_channel_->SendData(params, payload, result); } @@ -5999,7 +5988,7 @@ bool PeerConnection::SendData(const cricket::SendDataParams& params, bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { + if (!rtp_data_channel_ && !data_channel_transport_) { // Don't log an error here, because DataChannels are expected to call // ConnectDataChannel in this state. It's the only way to initially tell // whether or not the underlying transport is ready. @@ -6021,22 +6010,12 @@ bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { rtp_data_channel_->SignalDataReceived.connect(webrtc_data_channel, &DataChannel::OnDataReceived); } - if (sctp_transport_) { - SignalSctpReadyToSendData.connect(webrtc_data_channel, - &DataChannel::OnChannelReady); - SignalSctpDataReceived.connect(webrtc_data_channel, - &DataChannel::OnDataReceived); - SignalSctpClosingProcedureStartedRemotely.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); - SignalSctpClosingProcedureComplete.connect( - webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); - } return true; } void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { RTC_DCHECK_RUN_ON(signaling_thread()); - if (!rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { + if (!rtp_data_channel_ && !data_channel_transport_) { RTC_LOG(LS_ERROR) << "DisconnectDataChannel called when rtp_data_channel_ and " "sctp_transport_ are NULL."; @@ -6052,48 +6031,32 @@ void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { rtp_data_channel_->SignalReadyToSendData.disconnect(webrtc_data_channel); rtp_data_channel_->SignalDataReceived.disconnect(webrtc_data_channel); } - if (sctp_transport_) { - SignalSctpReadyToSendData.disconnect(webrtc_data_channel); - SignalSctpDataReceived.disconnect(webrtc_data_channel); - SignalSctpClosingProcedureStartedRemotely.disconnect(webrtc_data_channel); - SignalSctpClosingProcedureComplete.disconnect(webrtc_data_channel); - } } void PeerConnection::AddSctpDataStream(int sid) { if (data_channel_transport_) { - data_channel_transport_->OpenChannel(sid); + network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { + if (data_channel_transport_) { + data_channel_transport_->OpenChannel(sid); + } + }); } - if (!sctp_transport_) { - RTC_LOG(LS_ERROR) - << "AddSctpDataStream called when sctp_transport_ is NULL."; - return; - } - network_thread()->Invoke( - RTC_FROM_HERE, rtc::Bind(&cricket::SctpTransportInternal::OpenStream, - cricket_sctp_transport(), sid)); } void PeerConnection::RemoveSctpDataStream(int sid) { if (data_channel_transport_) { - data_channel_transport_->CloseChannel(sid); + network_thread()->Invoke(RTC_FROM_HERE, [this, sid] { + if (data_channel_transport_) { + data_channel_transport_->CloseChannel(sid); + } + }); } - if (!sctp_transport_) { - RTC_LOG(LS_ERROR) << "RemoveSctpDataStream called when sctp_transport_ is " - "NULL."; - return; - } - network_thread()->Invoke( - RTC_FROM_HERE, rtc::Bind(&cricket::SctpTransportInternal::ResetStream, - cricket_sctp_transport(), sid)); } bool PeerConnection::ReadyToSendData() const { RTC_DCHECK_RUN_ON(signaling_thread()); return (rtp_data_channel_ && rtp_data_channel_->ready_to_send_data()) || - (data_channel_transport_ && data_channel_transport_ready_to_send_ && - data_channel_transport_negotiated_) || - (sctp_ready_to_send_data_ && sctp_negotiated_); + (data_channel_transport_ && data_channel_transport_ready_to_send_); } void PeerConnection::OnDataReceived(int channel_id, @@ -6136,10 +6099,8 @@ void PeerConnection::OnReadyToSend() { RTC_FROM_HERE, signaling_thread(), [this] { RTC_DCHECK_RUN_ON(signaling_thread()); data_channel_transport_ready_to_send_ = true; - if (data_channel_transport_negotiated_) { - SignalDataChannelTransportWritable_s( - data_channel_transport_ready_to_send_); - } + SignalDataChannelTransportWritable_s( + data_channel_transport_ready_to_send_); }); } @@ -6179,7 +6140,7 @@ std::map PeerConnection::GetTransportNamesByMid() transport_names_by_mid[rtp_data_channel_->content_name()] = rtp_data_channel_->transport_name(); } - if (sctp_transport_) { + if (data_channel_transport_) { absl::optional transport_name = sctp_transport_name(); RTC_DCHECK(transport_name); transport_names_by_mid[*sctp_mid_] = *transport_name; @@ -6550,7 +6511,7 @@ RTCError PeerConnection::CreateChannels(const SessionDescription& desc) { const cricket::ContentInfo* data = cricket::GetFirstDataContent(&desc); if (data_channel_type_ != cricket::DCT_NONE && data && !data->rejected && - !rtp_data_channel_ && !sctp_transport_ && !data_channel_transport_) { + !rtp_data_channel_ && !data_channel_transport_) { if (!CreateDataChannel(data->name)) { LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR, "Failed to create data channel."); @@ -6610,32 +6571,21 @@ cricket::VideoChannel* PeerConnection::CreateVideoChannel( bool PeerConnection::CreateDataChannel(const std::string& mid) { switch (data_channel_type_) { case cricket::DCT_SCTP: - // Only using SCTP transport. No more setup required. Since SCTP is - // the only option, it doesn't need to wait for negotiation. - sctp_negotiated_ = true; - if (!CreateSctpDataChannel(mid)) { - return false; - } - break; case cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP: - // Setup a data channel transport with SCTP as a fallback. Which one is - // used will be determined later in negotiation. - if (!CreateSctpDataChannel(mid)) { - return false; - } - if (!SetupDataChannelTransport(mid)) { - return false; - } - break; case cricket::DCT_DATA_CHANNEL_TRANSPORT: case cricket::DCT_MEDIA_TRANSPORT: - // Using data channel transport without a fallback. It is the only - // option. Data channel transport doesn't need to be negotiated. - data_channel_transport_negotiated_ = true; - if (!SetupDataChannelTransport(mid)) { + if (!network_thread()->Invoke( + RTC_FROM_HERE, + rtc::Bind(&PeerConnection::SetupDataChannelTransport_n, this, + mid))) { return false; } - break; + + // All non-RTP data channels must initialize |sctp_data_channels_|. + for (const auto& channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + return true; case cricket::DCT_RTP: default: RtpTransportInternal* rtp_transport = GetRtpTransport(mid); @@ -6652,36 +6602,7 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) { rtp_data_channel_->SetRtpTransport(rtp_transport); return true; } - - // All non-RTP data channels must initialize |sctp_data_channels_|. - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - return true; -} - -bool PeerConnection::CreateSctpDataChannel(const std::string& mid) { - if (!sctp_factory_) { - RTC_LOG(LS_ERROR) - << "Trying to create SCTP transport, but didn't compile with " - "SCTP support (HAVE_SCTP)"; - return false; - } - if (!network_thread()->Invoke( - RTC_FROM_HERE, - rtc::Bind(&PeerConnection::CreateSctpTransport_n, this, mid))) { - return false; - } - return true; -} - -bool PeerConnection::SetupDataChannelTransport(const std::string& mid) { - if (!network_thread()->Invoke( - RTC_FROM_HERE, - rtc::Bind(&PeerConnection::SetupDataChannelTransport_n, this, mid))) { - return false; - } - return true; + return false; } Call::Stats PeerConnection::GetCallStats() { @@ -6697,124 +6618,10 @@ Call::Stats PeerConnection::GetCallStats() { } } -bool PeerConnection::CreateSctpTransport_n(const std::string& mid) { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(sctp_factory_); - RTC_LOG(LS_INFO) << "Creating SCTP transport for mid=" << mid; - - rtc::scoped_refptr webrtc_dtls_transport = - transport_controller_->LookupDtlsTransportByMid(mid); - cricket::DtlsTransportInternal* dtls_transport = - webrtc_dtls_transport->internal(); - RTC_DCHECK(dtls_transport); - std::unique_ptr cricket_sctp_transport = - sctp_factory_->CreateSctpTransport(dtls_transport); - RTC_DCHECK(cricket_sctp_transport); - sctp_invoker_.reset(new rtc::AsyncInvoker()); - cricket_sctp_transport->SignalReadyToSendData.connect( - this, &PeerConnection::OnSctpTransportReadyToSendData_n); - cricket_sctp_transport->SignalDataReceived.connect( - this, &PeerConnection::OnSctpTransportDataReceived_n); - // TODO(deadbeef): All we do here is AsyncInvoke to fire the signal on - // another thread. Would be nice if there was a helper class similar to - // sigslot::repeater that did this for us, eliminating a bunch of boilerplate - // code. - cricket_sctp_transport->SignalClosingProcedureStartedRemotely.connect( - this, &PeerConnection::OnSctpClosingProcedureStartedRemotely_n); - cricket_sctp_transport->SignalClosingProcedureComplete.connect( - this, &PeerConnection::OnSctpClosingProcedureComplete_n); - sctp_mid_ = mid; - sctp_transport_ = new rtc::RefCountedObject( - std::move(cricket_sctp_transport)); - sctp_transport_->SetDtlsTransport(std::move(webrtc_dtls_transport)); - return true; -} - -void PeerConnection::DestroySctpTransport_n() { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_LOG(LS_INFO) << "Destroying SCTP transport for mid=" << *sctp_mid_; - - sctp_transport_->Clear(); - sctp_transport_ = nullptr; - // |sctp_mid_| may still be active through a data channel transport. If not, - // unset it. - if (!data_channel_transport_) { - sctp_mid_.reset(); - } - sctp_invoker_.reset(nullptr); -} - -void PeerConnection::OnSctpTransportReadyToSendData_n() { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || - data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); - // Note: Cannot use rtc::Bind here because it will grab a reference to - // PeerConnection and potentially cause PeerConnection to live longer than - // expected. It is safe not to grab a reference since the sctp_invoker_ will - // be destroyed before PeerConnection is destroyed, and at that point all - // pending tasks will be cleared. - sctp_invoker_->AsyncInvoke(RTC_FROM_HERE, signaling_thread(), [this] { - OnSctpTransportReadyToSendData_s(true); - }); -} - -void PeerConnection::OnSctpTransportReadyToSendData_s(bool ready) { - RTC_DCHECK_RUN_ON(signaling_thread()); - sctp_ready_to_send_data_ = ready; - if (sctp_negotiated_) { - SignalSctpReadyToSendData(ready); - } -} - -void PeerConnection::OnSctpTransportDataReceived_n( - const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || - data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); - // Note: Cannot use rtc::Bind here because it will grab a reference to - // PeerConnection and potentially cause PeerConnection to live longer than - // expected. It is safe not to grab a reference since the sctp_invoker_ will - // be destroyed before PeerConnection is destroyed, and at that point all - // pending tasks will be cleared. - sctp_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, params, payload] { - OnSctpTransportDataReceived_s(params, payload); - }); -} - -void PeerConnection::OnSctpTransportDataReceived_s( - const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (!HandleOpenMessage_s(params, payload)) { - SignalSctpDataReceived(params, payload); - } -} - -void PeerConnection::OnSctpClosingProcedureStartedRemotely_n(int sid) { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || - data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); - sctp_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), - rtc::Bind(&sigslot::signal1::operator(), - &SignalSctpClosingProcedureStartedRemotely, sid)); -} - -void PeerConnection::OnSctpClosingProcedureComplete_n(int sid) { - RTC_DCHECK_RUN_ON(network_thread()); - RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP || - data_channel_type_ == cricket::DCT_DATA_CHANNEL_TRANSPORT_SCTP); - sctp_invoker_->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), - rtc::Bind(&sigslot::signal1::operator(), - &SignalSctpClosingProcedureComplete, sid)); -} - bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { - data_channel_transport_ = transport_controller_->GetDataChannelTransport(mid); - if (!data_channel_transport_) { + DataChannelTransportInterface* transport = + transport_controller_->GetDataChannelTransport(mid); + if (!transport) { RTC_LOG(LS_ERROR) << "Data channel transport is not available for data channels, mid=" << mid; @@ -6822,14 +6629,20 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { } RTC_LOG(LS_INFO) << "Setting up data channel transport for mid=" << mid; + data_channel_transport_ = transport; data_channel_transport_invoker_ = std::make_unique(); - data_channel_transport_->SetDataSink(this); sctp_mid_ = mid; + + // Note: setting the data sink and checking initial state must be done last, + // after setting up the data channel. Setting the data sink may trigger + // callbacks to PeerConnection which require the transport to be completely + // set up (eg. OnReadyToSend()). + transport->SetDataSink(this); return true; } void PeerConnection::TeardownDataChannelTransport_n() { - if (!data_channel_transport_) { + if (!sctp_mid_ && !data_channel_transport_) { return; } RTC_LOG(LS_INFO) << "Tearing down data channel transport for mid=" @@ -6837,11 +6650,11 @@ void PeerConnection::TeardownDataChannelTransport_n() { // |sctp_mid_| may still be active through an SCTP transport. If not, unset // it. - if (!sctp_transport_) { - sctp_mid_.reset(); - } - data_channel_transport_->SetDataSink(nullptr); + sctp_mid_.reset(); data_channel_transport_invoker_ = nullptr; + if (data_channel_transport_) { + data_channel_transport_->SetDataSink(nullptr); + } data_channel_transport_ = nullptr; } @@ -7361,7 +7174,7 @@ const std::string PeerConnection::GetTransportName( if (channel) { return channel->transport_name(); } - if (sctp_transport_) { + if (data_channel_transport_) { RTC_DCHECK(sctp_mid_); if (content_name == *sctp_mid_) { return *sctp_transport_name(); @@ -7396,14 +7209,7 @@ void PeerConnection::DestroyDataChannel() { // been destroyed (since it is a subclass of PeerConnection) and using // rtc::Bind will cause "Pure virtual function called" error to appear. - if (sctp_transport_) { - OnDataChannelDestroyed(); - network_thread()->Invoke(RTC_FROM_HERE, - [this] { DestroySctpTransport_n(); }); - sctp_ready_to_send_data_ = false; - } - - if (data_channel_transport_) { + if (sctp_mid_) { OnDataChannelDestroyed(); network_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(network_thread()); @@ -7439,8 +7245,7 @@ bool PeerConnection::OnTransportChanged( RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - JsepTransportController::NegotiationState negotiation_state) { + DataChannelTransportInterface* data_channel_transport) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUNS_SERIALIZED(&use_media_transport_race_checker_); bool ret = true; @@ -7448,53 +7253,30 @@ bool PeerConnection::OnTransportChanged( if (base_channel) { ret = base_channel->SetRtpTransport(rtp_transport); } - if (sctp_transport_ && mid == sctp_mid_) { - sctp_transport_->SetDtlsTransport(dtls_transport); - } if (use_media_transport_) { RTC_LOG(LS_ERROR) << "Media transport isn't supported."; } - if (mid == sctp_mid_) { - switch (negotiation_state) { - case JsepTransportController::NegotiationState::kFinal: - if (data_channel_transport) { - if (sctp_transport_) { - DestroySctpTransport_n(); - } - } else { - TeardownDataChannelTransport_n(); - } - // We also need to mark the remaining transport as ready-to-send. - RTC_FALLTHROUGH(); - case JsepTransportController::NegotiationState::kProvisional: { - rtc::AsyncInvoker* invoker = data_channel_transport_invoker_ - ? data_channel_transport_invoker_.get() - : sctp_invoker_.get(); - if (!invoker) { - break; // Have neither SCTP nor DataChannelTransport, nothing to do. - } - invoker->AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, data_channel_transport] { - RTC_DCHECK_RUN_ON(signaling_thread()); - if (data_channel_transport) { - data_channel_transport_negotiated_ = true; - if (data_channel_transport_ready_to_send_) { - SignalDataChannelTransportWritable_s( - data_channel_transport_ready_to_send_); - } - } else { - sctp_negotiated_ = true; - if (sctp_ready_to_send_data_) { - SignalSctpReadyToSendData(sctp_ready_to_send_data_); - } - } - }); - } break; - case JsepTransportController::NegotiationState::kInitial: - // Negotiation isn't finished. Nothing to do here. - break; + if (data_channel_transport_ && mid == sctp_mid_ && + data_channel_transport_ != data_channel_transport) { + // Changed which data channel transport is used for |sctp_mid_| (eg. now + // it's bundled). + data_channel_transport_->SetDataSink(nullptr); + data_channel_transport_ = data_channel_transport; + if (data_channel_transport) { + data_channel_transport->SetDataSink(this); + + // There's a new data channel transport. This needs to be signaled to the + // |sctp_data_channels_| so that they can reopen and reconnect. This is + // necessary when bundling is applied. + data_channel_transport_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + for (auto channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + }); } } diff --git a/pc/peer_connection.h b/pc/peer_connection.h index da72687a70..393beedaa3 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -1024,28 +1024,6 @@ class PeerConnection : public PeerConnectionInternal, cricket::VideoChannel* CreateVideoChannel(const std::string& mid) RTC_RUN_ON(signaling_thread()); bool CreateDataChannel(const std::string& mid) RTC_RUN_ON(signaling_thread()); - bool CreateSctpDataChannel(const std::string& mid) - RTC_RUN_ON(signaling_thread()); - bool SetupDataChannelTransport(const std::string& mid) - RTC_RUN_ON(signaling_thread()); - - bool CreateSctpTransport_n(const std::string& mid); - // For bundling. - void DestroySctpTransport_n(); - // SctpTransport signal handlers. Needed to marshal signals from the network - // to signaling thread. - void OnSctpTransportReadyToSendData_n(); - // This may be called with "false" if the direction of the m= section causes - // us to tear down the SCTP connection. - void OnSctpTransportReadyToSendData_s(bool ready); - void OnSctpTransportDataReceived_n(const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& payload); - // Beyond just firing the signal to the signaling thread, listens to SCTP - // CONTROL messages on unused SIDs and processes them as OPEN messages. - void OnSctpTransportDataReceived_s(const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& payload); - void OnSctpClosingProcedureStartedRemotely_n(int sid); - void OnSctpClosingProcedureComplete_n(int sid); bool SetupDataChannelTransport_n(const std::string& mid) RTC_RUN_ON(network_thread()); @@ -1157,8 +1135,7 @@ class PeerConnection : public PeerConnectionInternal, RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - JsepTransportController::NegotiationState negotiation_state) override; + DataChannelTransportInterface* data_channel_transport) override; // RtpSenderBase::SetStreamsObserver override. void OnSetStreams() override; @@ -1331,13 +1308,6 @@ class PeerConnection : public PeerConnectionInternal, nullptr; // TODO(bugs.webrtc.org/9987): Accessed on both // signaling and some other thread. - cricket::SctpTransportInternal* cricket_sctp_transport() { - return sctp_transport_->internal(); - } - rtc::scoped_refptr - sctp_transport_; // TODO(bugs.webrtc.org/9987): Accessed on both - // signaling and network thread. - // |sctp_mid_| is the content name (MID) in SDP. // Note: this is used as the data channel MID by both SCTP and data channel // transports. It is set when either transport is initialized and unset when @@ -1346,56 +1316,25 @@ class PeerConnection : public PeerConnectionInternal, sctp_mid_; // TODO(bugs.webrtc.org/9987): Accessed on both signaling // and network thread. - // Value cached on signaling thread. Only updated when SctpReadyToSendData - // fires on the signaling thread. - bool sctp_ready_to_send_data_ RTC_GUARDED_BY(signaling_thread()) = false; - - // Whether the use of SCTP has been successfully negotiated. - bool sctp_negotiated_ RTC_GUARDED_BY(signaling_thread()) = false; - - // Same as signals provided by SctpTransport, but these are guaranteed to - // fire on the signaling thread, whereas SctpTransport fires on the networking - // thread. - // |sctp_invoker_| is used so that any signals queued on the signaling thread - // from the network thread are immediately discarded if the SctpTransport is - // destroyed (due to m= section being rejected). - // TODO(deadbeef): Use a proxy object to ensure that method calls/signals - // are marshalled to the right thread. Could almost use proxy.h for this, - // but it doesn't have a mechanism for marshalling sigslot::signals - std::unique_ptr sctp_invoker_ - RTC_GUARDED_BY(network_thread()); - sigslot::signal1 SignalSctpReadyToSendData - RTC_GUARDED_BY(signaling_thread()); - sigslot::signal2 - SignalSctpDataReceived RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalSctpClosingProcedureStartedRemotely - RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalSctpClosingProcedureComplete - RTC_GUARDED_BY(signaling_thread()); - // Whether this peer is the caller. Set when the local description is applied. absl::optional is_caller_ RTC_GUARDED_BY(signaling_thread()); - // Plugin transport used for data channels. Thread-safe. - DataChannelTransportInterface* data_channel_transport_ = - nullptr; // TODO(bugs.webrtc.org/9987): Object is thread safe, but - // pointer accessed on both signaling and network thread. + // Plugin transport used for data channels. Pointer may be accessed and + // checked from any thread, but the object may only be touched on the + // network thread. + // TODO(bugs.webrtc.org/9987): Accessed on both signaling and network thread. + DataChannelTransportInterface* data_channel_transport_; // Cached value of whether the data channel transport is ready to send. bool data_channel_transport_ready_to_send_ RTC_GUARDED_BY(signaling_thread()) = false; - // Whether the use of the data channel transport has been successfully - // negotiated. - bool data_channel_transport_negotiated_ RTC_GUARDED_BY(signaling_thread()) = - false; - // Used to invoke data channel transport signals on the signaling thread. std::unique_ptr data_channel_transport_invoker_ RTC_GUARDED_BY(network_thread()); - // Identical to the signals for SCTP, but from media transport: + // Signals from |data_channel_transport_|. These are invoked on the signaling + // thread. sigslot::signal1 SignalDataChannelTransportWritable_s RTC_GUARDED_BY(signaling_thread()); sigslot::signal2sctp_transport_factory()->last_fake_sctp_transport()); } +TEST_P(PeerConnectionDataChannelTest, InternalSctpTransportDeletedOnTeardown) { + auto caller = CreatePeerConnectionWithDataChannel(); + + ASSERT_TRUE(caller->SetLocalDescription(caller->CreateOffer())); + EXPECT_TRUE(caller->sctp_transport_factory()->last_fake_sctp_transport()); + + rtc::scoped_refptr sctp_transport = + caller->GetInternalPeerConnection()->GetSctpTransport(); + + caller.reset(); + EXPECT_EQ(static_cast(sctp_transport.get())->internal(), + nullptr); +} + // Test that sctp_content_name/sctp_transport_name (used for stats) are correct // before and after BUNDLE is negotiated. TEST_P(PeerConnectionDataChannelTest, SctpContentAndTransportNameSetCorrectly) { diff --git a/pc/peer_connection_integrationtest.cc b/pc/peer_connection_integrationtest.cc index 8798278e1b..33fc9b9b9d 100644 --- a/pc/peer_connection_integrationtest.cc +++ b/pc/peer_connection_integrationtest.cc @@ -3641,6 +3641,53 @@ TEST_P(PeerConnectionIntegrationTest, DatagramTransportDataChannelEndToEnd) { kDefaultTimeout); } +// Tests that 'zero-rtt' data channel transports (which are ready-to-send as +// soon as they're created) work correctly. +TEST_P(PeerConnectionIntegrationTest, DatagramTransportDataChannelZeroRtt) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.rtcp_mux_policy = PeerConnectionInterface::kRtcpMuxPolicyRequire; + rtc_config.bundle_policy = PeerConnectionInterface::kBundlePolicyMaxBundle; + rtc_config.use_datagram_transport_for_data_channels = true; + rtc_config.enable_dtls_srtp = false; // SDES is required for media transport. + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, rtc_config, loopback_media_transports()->first_factory(), + loopback_media_transports()->second_factory())); + ConnectFakeSignaling(); + + // Ensure that the callee's media transport is ready-to-send immediately. + // Note that only the callee can become writable in zero RTTs. The caller + // must wait for the callee's answer. + loopback_media_transports()->SetSecondStateAfterConnect( + webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Expect that data channel created on caller side will show up for callee as + // well. + caller()->CreateDataChannel(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + loopback_media_transports()->SetFirstState( + webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Caller data channel should already exist (it created one). Callee data + // channel may not exist yet, since negotiation happens in-band, not in SDP. + ASSERT_NE(nullptr, caller()->data_channel()); + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + EXPECT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout); + EXPECT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + + // Ensure data can be sent in both directions. + std::string data = "hello world"; + caller()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, callee()->data_observer()->last_message(), + kDefaultTimeout); + callee()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, caller()->data_observer()->last_message(), + kDefaultTimeout); +} + // Ensures that when the callee closes a datagram transport data channel, the // closing procedure results in the data channel being closed for the caller // as well. @@ -3799,6 +3846,53 @@ TEST_P(PeerConnectionIntegrationTest, MediaTransportDataChannelEndToEnd) { kDefaultTimeout); } +// Tests that 'zero-rtt' data channel transports (which are ready-to-send as +// soon as they're created) work correctly. +TEST_P(PeerConnectionIntegrationTest, MediaTransportDataChannelZeroRtt) { + PeerConnectionInterface::RTCConfiguration rtc_config; + rtc_config.rtcp_mux_policy = PeerConnectionInterface::kRtcpMuxPolicyRequire; + rtc_config.bundle_policy = PeerConnectionInterface::kBundlePolicyMaxBundle; + rtc_config.use_media_transport_for_data_channels = true; + rtc_config.enable_dtls_srtp = false; // SDES is required for media transport. + ASSERT_TRUE(CreatePeerConnectionWrappersWithConfigAndMediaTransportFactory( + rtc_config, rtc_config, loopback_media_transports()->first_factory(), + loopback_media_transports()->second_factory())); + ConnectFakeSignaling(); + + // Ensure that the callee's media transport is ready-to-send immediately. + // Note that only the callee can become writable in zero RTTs. The caller + // must wait for the callee's answer. + loopback_media_transports()->SetSecondStateAfterConnect( + webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Expect that data channel created on caller side will show up for callee as + // well. + caller()->CreateDataChannel(); + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + + loopback_media_transports()->SetFirstState( + webrtc::MediaTransportState::kWritable); + loopback_media_transports()->FlushAsyncInvokes(); + + // Caller data channel should already exist (it created one). Callee data + // channel may not exist yet, since negotiation happens in-band, not in SDP. + ASSERT_NE(nullptr, caller()->data_channel()); + ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout); + EXPECT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout); + EXPECT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout); + + // Ensure data can be sent in both directions. + std::string data = "hello world"; + caller()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, callee()->data_observer()->last_message(), + kDefaultTimeout); + callee()->data_channel()->Send(DataBuffer(data)); + EXPECT_EQ_WAIT(data, caller()->data_observer()->last_message(), + kDefaultTimeout); +} + // Ensure that when the callee closes a media transport data channel, the // closing procedure results in the data channel being closed for the caller // as well. diff --git a/pc/sctp_data_channel_transport.cc b/pc/sctp_data_channel_transport.cc new file mode 100644 index 0000000000..d1505f3945 --- /dev/null +++ b/pc/sctp_data_channel_transport.cc @@ -0,0 +1,112 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pc/sctp_data_channel_transport.h" +#include "pc/sctp_utils.h" + +namespace webrtc { + +SctpDataChannelTransport::SctpDataChannelTransport( + cricket::SctpTransportInternal* sctp_transport) + : sctp_transport_(sctp_transport) { + sctp_transport_->SignalReadyToSendData.connect( + this, &SctpDataChannelTransport::OnReadyToSendData); + sctp_transport_->SignalDataReceived.connect( + this, &SctpDataChannelTransport::OnDataReceived); + sctp_transport_->SignalClosingProcedureStartedRemotely.connect( + this, &SctpDataChannelTransport::OnClosingProcedureStartedRemotely); + sctp_transport_->SignalClosingProcedureComplete.connect( + this, &SctpDataChannelTransport::OnClosingProcedureComplete); +} + +RTCError SctpDataChannelTransport::OpenChannel(int channel_id) { + sctp_transport_->OpenStream(channel_id); + return RTCError::OK(); +} + +RTCError SctpDataChannelTransport::SendData( + int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + // Map webrtc::SendDataParams to cricket::SendDataParams. + // TODO(mellem): See about unifying these structs. + cricket::SendDataParams sd_params; + sd_params.sid = channel_id; + sd_params.type = ToCricketDataMessageType(params.type); + sd_params.ordered = params.ordered; + sd_params.reliable = !(params.max_rtx_count || params.max_rtx_ms); + sd_params.max_rtx_count = params.max_rtx_count.value_or(-1); + sd_params.max_rtx_ms = params.max_rtx_ms.value_or(-1); + + cricket::SendDataResult result; + sctp_transport_->SendData(sd_params, buffer, &result); + + // TODO(mellem): See about changing the interfaces to not require mapping + // SendDataResult to RTCError and back again. + switch (result) { + case cricket::SendDataResult::SDR_SUCCESS: + return RTCError::OK(); + case cricket::SendDataResult::SDR_BLOCK: { + // Send buffer is full. + ready_to_send_ = false; + return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); + } + case cricket::SendDataResult::SDR_ERROR: + return RTCError(RTCErrorType::NETWORK_ERROR); + } + return RTCError(RTCErrorType::NETWORK_ERROR); +} + +RTCError SctpDataChannelTransport::CloseChannel(int channel_id) { + sctp_transport_->ResetStream(channel_id); + return RTCError::OK(); +} + +void SctpDataChannelTransport::SetDataSink(DataChannelSink* sink) { + sink_ = sink; + if (sink_ && ready_to_send_) { + sink_->OnReadyToSend(); + } +} + +bool SctpDataChannelTransport::IsReadyToSend() const { + return ready_to_send_; +} + +void SctpDataChannelTransport::OnReadyToSendData() { + ready_to_send_ = true; + if (sink_) { + sink_->OnReadyToSend(); + } +} + +void SctpDataChannelTransport::OnDataReceived( + const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + if (sink_) { + sink_->OnDataReceived(params.sid, ToWebrtcDataMessageType(params.type), + buffer); + } +} + +void SctpDataChannelTransport::OnClosingProcedureStartedRemotely( + int channel_id) { + if (sink_) { + sink_->OnChannelClosing(channel_id); + } +} + +void SctpDataChannelTransport::OnClosingProcedureComplete(int channel_id) { + if (sink_) { + sink_->OnChannelClosed(channel_id); + } +} + +} // namespace webrtc diff --git a/pc/sctp_data_channel_transport.h b/pc/sctp_data_channel_transport.h new file mode 100644 index 0000000000..2d54be9de8 --- /dev/null +++ b/pc/sctp_data_channel_transport.h @@ -0,0 +1,50 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ +#define PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ + +#include "api/data_channel_transport_interface.h" +#include "media/sctp/sctp_transport_internal.h" +#include "rtc_base/third_party/sigslot/sigslot.h" + +namespace webrtc { + +// SCTP implementation of DataChannelTransportInterface. +class SctpDataChannelTransport : public DataChannelTransportInterface, + public sigslot::has_slots<> { + public: + explicit SctpDataChannelTransport( + cricket::SctpTransportInternal* sctp_transport); + + RTCError OpenChannel(int channel_id) override; + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override; + RTCError CloseChannel(int channel_id) override; + void SetDataSink(DataChannelSink* sink) override; + bool IsReadyToSend() const override; + + private: + void OnReadyToSendData(); + void OnDataReceived(const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& buffer); + void OnClosingProcedureStartedRemotely(int channel_id); + void OnClosingProcedureComplete(int channel_id); + + cricket::SctpTransportInternal* const sctp_transport_; + + DataChannelSink* sink_ = nullptr; + bool ready_to_send_ = false; +}; + +} // namespace webrtc + +#endif // PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ diff --git a/pc/sctp_utils.cc b/pc/sctp_utils.cc index 7b67fc1839..129ee07a62 100644 --- a/pc/sctp_utils.cc +++ b/pc/sctp_utils.cc @@ -189,4 +189,33 @@ void WriteDataChannelOpenAckMessage(rtc::CopyOnWriteBuffer* payload) { payload->SetData(&data, sizeof(data)); } +cricket::DataMessageType ToCricketDataMessageType(DataMessageType type) { + switch (type) { + case DataMessageType::kText: + return cricket::DMT_TEXT; + case DataMessageType::kBinary: + return cricket::DMT_BINARY; + case DataMessageType::kControl: + return cricket::DMT_CONTROL; + default: + return cricket::DMT_NONE; + } + return cricket::DMT_NONE; +} + +DataMessageType ToWebrtcDataMessageType(cricket::DataMessageType type) { + switch (type) { + case cricket::DMT_TEXT: + return DataMessageType::kText; + case cricket::DMT_BINARY: + return DataMessageType::kBinary; + case cricket::DMT_CONTROL: + return DataMessageType::kControl; + case cricket::DMT_NONE: + default: + RTC_NOTREACHED(); + } + return DataMessageType::kControl; +} + } // namespace webrtc diff --git a/pc/sctp_utils.h b/pc/sctp_utils.h index 468c960949..6d41eb298c 100644 --- a/pc/sctp_utils.h +++ b/pc/sctp_utils.h @@ -14,6 +14,8 @@ #include #include "api/data_channel_interface.h" +#include "api/data_channel_transport_interface.h" +#include "media/base/media_channel.h" namespace rtc { class CopyOnWriteBuffer; @@ -36,6 +38,11 @@ bool WriteDataChannelOpenMessage(const std::string& label, rtc::CopyOnWriteBuffer* payload); void WriteDataChannelOpenAckMessage(rtc::CopyOnWriteBuffer* payload); + +cricket::DataMessageType ToCricketDataMessageType(DataMessageType type); + +DataMessageType ToWebrtcDataMessageType(cricket::DataMessageType type); + } // namespace webrtc #endif // PC_SCTP_UTILS_H_ diff --git a/test/fuzzers/BUILD.gn b/test/fuzzers/BUILD.gn index 2b272aa368..7e4e54553d 100644 --- a/test/fuzzers/BUILD.gn +++ b/test/fuzzers/BUILD.gn @@ -608,7 +608,7 @@ webrtc_fuzzer_test("sctp_utils_fuzzer") { deps = [ "../../api:libjingle_peerconnection_api", "../../pc:libjingle_peerconnection", - "../../pc:peerconnection", + "../../pc:rtc_pc_base", "../../rtc_base:rtc_base_approved", ] } diff --git a/test/peer_scenario/scenario_connection.cc b/test/peer_scenario/scenario_connection.cc index 6c51397d71..2c0ed36323 100644 --- a/test/peer_scenario/scenario_connection.cc +++ b/test/peer_scenario/scenario_connection.cc @@ -45,8 +45,7 @@ class ScenarioIceConnectionImpl : public ScenarioIceConnection, RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - JsepTransportController::NegotiationState negotiation_state) override; + DataChannelTransportInterface* data_channel_transport) override; void OnRtpPacket(const RtpPacketReceived& packet) override; void OnCandidates(const std::string& mid, @@ -209,8 +208,7 @@ bool ScenarioIceConnectionImpl::OnTransportChanged( RtpTransportInternal* rtp_transport, rtc::scoped_refptr dtls_transport, MediaTransportInterface* media_transport, - DataChannelTransportInterface* data_channel_transport, - JsepTransportController::NegotiationState negotiation_state) { + DataChannelTransportInterface* data_channel_transport) { RTC_DCHECK_RUN_ON(network_thread_); if (rtp_transport == nullptr) { rtp_transport_->UnregisterRtpDemuxerSink(this);