diff --git a/media/BUILD.gn b/media/BUILD.gn index e4a0d89c3a..b26be0635f 100644 --- a/media/BUILD.gn +++ b/media/BUILD.gn @@ -406,7 +406,6 @@ rtc_source_set("rtc_data_sctp_transport_internal") { "../p2p:rtc_p2p", "../rtc_base:copy_on_write_buffer", "../rtc_base:threading", - "../rtc_base/third_party/sigslot", ] } @@ -432,6 +431,7 @@ if (rtc_build_dcsctp) { "../rtc_base:copy_on_write_buffer", "../rtc_base:event_tracer", "../rtc_base:logging", + "../rtc_base:macromagic", "../rtc_base:random", "../rtc_base:socket", "../rtc_base:stringutils", diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc index 3da6709702..c8e6da4f00 100644 --- a/media/sctp/dcsctp_transport.cc +++ b/media/sctp/dcsctp_transport.cc @@ -151,6 +151,19 @@ DcSctpTransport::~DcSctpTransport() { } } +void DcSctpTransport::SetOnConnectedCallback(std::function callback) { + RTC_DCHECK_RUN_ON(network_thread_); + on_connected_callback_ = std::move(callback); +} + +void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) { + RTC_DCHECK_RUN_ON(network_thread_); + data_channel_sink_ = sink; + if (data_channel_sink_ && ready_to_send_data_) { + data_channel_sink_->OnReadyToSend(); + } +} + void DcSctpTransport::SetDtlsTransport( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(network_thread_); @@ -165,10 +178,9 @@ bool DcSctpTransport::Start(int local_sctp_port, int max_message_size) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK(max_message_size > 0); - - RTC_LOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port - << ", remote=" << remote_sctp_port - << ", max_message_size=" << max_message_size << ")"; + RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port + << ", remote=" << remote_sctp_port + << ", max_message_size=" << max_message_size << ")"; if (!socket_) { dcsctp::DcSctpOptions options; @@ -206,7 +218,7 @@ bool DcSctpTransport::Start(int local_sctp_port, } bool DcSctpTransport::OpenStream(int sid) { - RTC_LOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; + RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid << "): Transport is not started."; @@ -216,7 +228,7 @@ bool DcSctpTransport::OpenStream(int sid) { } bool DcSctpTransport::ResetStream(int sid) { - RTC_LOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; + RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid << "): Transport is not started."; @@ -242,10 +254,9 @@ bool DcSctpTransport::SendData(int sid, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { RTC_DCHECK_RUN_ON(network_thread_); - - RTC_LOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid - << ", type=" << static_cast(params.type) - << ", length=" << payload.size() << ")."; + RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid + << ", type=" << static_cast(params.type) + << ", length=" << payload.size() << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ @@ -308,6 +319,7 @@ bool DcSctpTransport::SendData(int sid, << "->SendData(...): send() failed with error " << dcsctp::ToString(error) << "."; *result = cricket::SDR_ERROR; + break; } return *result == cricket::SDR_SUCCESS; @@ -394,18 +406,21 @@ uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) { } void DcSctpTransport::OnTotalBufferedAmountLow() { + RTC_DCHECK_RUN_ON(network_thread_); if (!ready_to_send_data_) { ready_to_send_data_ = true; - SignalReadyToSendData(); + if (data_channel_sink_) { + data_channel_sink_->OnReadyToSend(); + } } } void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) { RTC_DCHECK_RUN_ON(network_thread_); - RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid=" - << message.stream_id().value() - << ", ppid=" << message.ppid().value() - << ", length=" << message.payload().size() << ")."; + RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid=" + << message.stream_id().value() + << ", ppid=" << message.ppid().value() + << ", length=" << message.payload().size() << ")."; cricket::ReceiveDataParams receive_data_params; receive_data_params.sid = message.stream_id().value(); auto type = ToDataMessageType(message.ppid()); @@ -423,7 +438,10 @@ void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) { receive_buffer_.AppendData(message.payload().data(), message.payload().size()); - SignalDataReceived(receive_data_params, receive_buffer_); + if (data_channel_sink_) { + data_channel_sink_->OnDataReceived( + receive_data_params.sid, receive_data_params.type, receive_buffer_); + } } void DcSctpTransport::OnError(dcsctp::ErrorKind error, @@ -444,6 +462,7 @@ void DcSctpTransport::OnError(dcsctp::ErrorKind error, void DcSctpTransport::OnAborted(dcsctp::ErrorKind error, absl::string_view message) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_LOG(LS_ERROR) << debug_name_ << "->OnAborted(error=" << dcsctp::ToString(error) << ", message=" << message << ")."; @@ -455,23 +474,30 @@ void DcSctpTransport::OnAborted(dcsctp::ErrorKind error, if (code.has_value()) { rtc_error.set_sctp_cause_code(static_cast(*code)); } - SignalClosedAbruptly(rtc_error); + if (data_channel_sink_) { + data_channel_sink_->OnTransportClosed(rtc_error); + } } void DcSctpTransport::OnConnected() { - RTC_LOG(LS_INFO) << debug_name_ << "->OnConnected()."; + RTC_DCHECK_RUN_ON(network_thread_); + RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected()."; ready_to_send_data_ = true; - SignalReadyToSendData(); - SignalAssociationChangeCommunicationUp(); + if (data_channel_sink_) { + data_channel_sink_->OnReadyToSend(); + } + if (on_connected_callback_) { + on_connected_callback_(); + } } void DcSctpTransport::OnClosed() { - RTC_LOG(LS_INFO) << debug_name_ << "->OnClosed()."; + RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed()."; ready_to_send_data_ = false; } void DcSctpTransport::OnConnectionRestarted() { - RTC_LOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted()."; + RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted()."; } void DcSctpTransport::OnStreamsResetFailed( @@ -488,6 +514,7 @@ void DcSctpTransport::OnStreamsResetFailed( void DcSctpTransport::OnStreamsResetPerformed( rtc::ArrayView outgoing_streams) { + RTC_DCHECK_RUN_ON(network_thread_); for (auto& stream_id : outgoing_streams) { RTC_LOG(LS_INFO) << debug_name_ << "->OnStreamsResetPerformed(...): Outgoing stream reset" @@ -498,7 +525,9 @@ void DcSctpTransport::OnStreamsResetPerformed( if (closing_state.incoming_reset_done) { // When the close was not initiated locally, we can signal the end of the // data channel close procedure when the remote ACKs the reset. - SignalClosingProcedureComplete(stream_id.value()); + if (data_channel_sink_) { + data_channel_sink_->OnChannelClosed(stream_id.value()); + } closing_states_.erase(stream_id); } } @@ -506,6 +535,7 @@ void DcSctpTransport::OnStreamsResetPerformed( void DcSctpTransport::OnIncomingStreamsReset( rtc::ArrayView incoming_streams) { + RTC_DCHECK_RUN_ON(network_thread_); for (auto& stream_id : incoming_streams) { RTC_LOG(LS_INFO) << debug_name_ << "->OnIncomingStreamsReset(...): Incoming stream reset" @@ -519,13 +549,17 @@ void DcSctpTransport::OnIncomingStreamsReset( // direction too. dcsctp::StreamID streams[1] = {stream_id}; socket_->ResetStreams(streams); - SignalClosingProcedureStartedRemotely(stream_id.value()); + if (data_channel_sink_) { + data_channel_sink_->OnChannelClosing(stream_id.value()); + } } if (closing_state.outgoing_reset_done) { // The close procedure that was initiated locally is complete when we // receive and incoming reset event. - SignalClosingProcedureComplete(stream_id.value()); + if (data_channel_sink_) { + data_channel_sink_->OnChannelClosed(stream_id.value()); + } closing_states_.erase(stream_id); } } @@ -557,11 +591,9 @@ void DcSctpTransport::OnTransportWritableState( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_EQ(transport_, transport); - - RTC_LOG(LS_VERBOSE) << debug_name_ - << "->OnTransportWritableState(), writable=" - << transport->writable(); - + RTC_DLOG(LS_VERBOSE) << debug_name_ + << "->OnTransportWritableState(), writable=" + << transport->writable(); MaybeConnectSocket(); } @@ -587,8 +619,11 @@ void DcSctpTransport::OnTransportReadPacket( void DcSctpTransport::OnTransportClosed( rtc::PacketTransportInternal* transport) { - RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed()."; - SignalClosedAbruptly({}); + RTC_DCHECK_RUN_ON(network_thread_); + RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed()."; + if (data_channel_sink_) { + data_channel_sink_->OnTransportClosed({}); + } } void DcSctpTransport::MaybeConnectSocket() { diff --git a/media/sctp/dcsctp_transport.h b/media/sctp/dcsctp_transport.h index c62a28f3c5..1f71db87c4 100644 --- a/media/sctp/dcsctp_transport.h +++ b/media/sctp/dcsctp_transport.h @@ -30,6 +30,7 @@ #include "rtc_base/random.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" +#include "rtc_base/thread_annotations.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -48,6 +49,8 @@ class DcSctpTransport : public cricket::SctpTransportInternal, ~DcSctpTransport() override; // cricket::SctpTransportInternal + void SetOnConnectedCallback(std::function callback) override; + void SetDataChannelSink(DataChannelSink* sink) override; void SetDtlsTransport(rtc::PacketTransportInternal* transport) override; bool Start(int local_sctp_port, int remote_sctp_port, @@ -128,6 +131,8 @@ class DcSctpTransport : public cricket::SctpTransportInternal, flat_map closing_states_; bool ready_to_send_data_ = false; + std::function on_connected_callback_ RTC_GUARDED_BY(network_thread_); + DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr; }; } // namespace webrtc diff --git a/media/sctp/dcsctp_transport_unittest.cc b/media/sctp/dcsctp_transport_unittest.cc index 9d78d1d85f..270b06a63f 100644 --- a/media/sctp/dcsctp_transport_unittest.cc +++ b/media/sctp/dcsctp_transport_unittest.cc @@ -29,12 +29,18 @@ using ::testing::Return; namespace webrtc { namespace { -class SctpInternalTransportObserver : public sigslot::has_slots<> { +class MockDataChannelObserver : public DataChannelSink { public: - MOCK_METHOD(void, OnSignalReadyToSendData, ()); - MOCK_METHOD(void, OnSignalAssociationChangeCommunicationUp, ()); - MOCK_METHOD(void, OnSignalClosingProcedureStartedRemotely, (int)); - MOCK_METHOD(void, OnSignalClosingProcedureComplete, (int)); + MOCK_METHOD(void, OnConnected, ()); + + // DataChannelSink + MOCK_METHOD(void, + OnDataReceived, + (int, DataMessageType, const rtc::CopyOnWriteBuffer&)); + MOCK_METHOD(void, OnChannelClosing, (int)); + MOCK_METHOD(void, OnChannelClosed, (int)); + MOCK_METHOD(void, OnReadyToSend, ()); + MOCK_METHOD(void, OnTransportClosed, (RTCError)); }; class Peer { @@ -52,28 +58,16 @@ class Peer { sctp_transport_ = std::make_unique( rtc::Thread::Current(), &fake_packet_transport_, &simulated_clock_, std::move(mock_dcsctp_socket_factory)); - - sctp_transport_->SignalAssociationChangeCommunicationUp.connect( - static_cast(&observer_), - &SctpInternalTransportObserver::OnSignalReadyToSendData); - sctp_transport_->SignalAssociationChangeCommunicationUp.connect( - static_cast(&observer_), - &SctpInternalTransportObserver:: - OnSignalAssociationChangeCommunicationUp); - sctp_transport_->SignalClosingProcedureStartedRemotely.connect( - static_cast(&observer_), - &SctpInternalTransportObserver:: - OnSignalClosingProcedureStartedRemotely); - sctp_transport_->SignalClosingProcedureComplete.connect( - static_cast(&observer_), - &SctpInternalTransportObserver::OnSignalClosingProcedureComplete); + sctp_transport_->SetDataChannelSink(&observer_); + sctp_transport_->SetOnConnectedCallback( + [this]() { observer_.OnConnected(); }); } rtc::FakePacketTransport fake_packet_transport_; webrtc::SimulatedClock simulated_clock_; dcsctp::MockDcSctpSocket* socket_; std::unique_ptr sctp_transport_; - NiceMock observer_; + NiceMock observer_; }; } // namespace @@ -86,8 +80,8 @@ TEST(DcSctpTransportTest, OpenSequence) { .Times(1) .WillOnce(Invoke(peer_a.sctp_transport_.get(), &dcsctp::DcSctpSocketCallbacks::OnConnected)); - EXPECT_CALL(peer_a.observer_, OnSignalReadyToSendData); - EXPECT_CALL(peer_a.observer_, OnSignalAssociationChangeCommunicationUp); + EXPECT_CALL(peer_a.observer_, OnReadyToSend); + EXPECT_CALL(peer_a.observer_, OnConnected); peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); } @@ -109,11 +103,10 @@ TEST(DcSctpTransportTest, CloseSequence) { EXPECT_CALL(*peer_b.socket_, ResetStreams(ElementsAre(dcsctp::StreamID(1)))) .WillOnce(Return(dcsctp::ResetStreamsStatus::kPerformed)); - EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureStartedRemotely(1)) - .Times(0); - EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureStartedRemotely(1)); - EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureComplete(1)); - EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureComplete(1)); + EXPECT_CALL(peer_a.observer_, OnChannelClosing(1)).Times(0); + EXPECT_CALL(peer_b.observer_, OnChannelClosing(1)); + EXPECT_CALL(peer_a.observer_, OnChannelClosed(1)); + EXPECT_CALL(peer_b.observer_, OnChannelClosed(1)); } peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); @@ -135,7 +128,7 @@ TEST(DcSctpTransportTest, CloseSequence) { // Tests that the close sequence initiated from both peers at the same time // terminates properly. Both peers will think they initiated it, so no -// OnSignalClosingProcedureStartedRemotely should be called. +// OnClosingProcedureStartedRemotely should be called. TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { rtc::AutoThread main_thread; Peer peer_a; @@ -151,12 +144,10 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { EXPECT_CALL(*peer_b.socket_, ResetStreams(ElementsAre(dcsctp::StreamID(1)))) .WillOnce(Return(dcsctp::ResetStreamsStatus::kPerformed)); - EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureStartedRemotely(1)) - .Times(0); - EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureStartedRemotely(1)) - .Times(0); - EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureComplete(1)); - EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureComplete(1)); + EXPECT_CALL(peer_a.observer_, OnChannelClosing(1)).Times(0); + EXPECT_CALL(peer_b.observer_, OnChannelClosing(1)).Times(0); + EXPECT_CALL(peer_a.observer_, OnChannelClosed(1)); + EXPECT_CALL(peer_b.observer_, OnChannelClosed(1)); } peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); diff --git a/media/sctp/sctp_transport_internal.h b/media/sctp/sctp_transport_internal.h index 93a59b9dc7..38da554911 100644 --- a/media/sctp/sctp_transport_internal.h +++ b/media/sctp/sctp_transport_internal.h @@ -19,13 +19,13 @@ #include #include "api/transport/data_channel_transport_interface.h" -#include "rtc_base/copy_on_write_buffer.h" -#include "rtc_base/thread.h" // For SendDataParams/ReceiveDataParams. // TODO(deadbeef): Use something else for SCTP. It's confusing that we use an // SSRC field for SID. #include "media/base/media_channel.h" #include "p2p/base/packet_transport_internal.h" +#include "rtc_base/copy_on_write_buffer.h" +#include "rtc_base/thread.h" namespace cricket { @@ -77,6 +77,9 @@ class SctpTransportInternal { public: virtual ~SctpTransportInternal() {} + virtual void SetOnConnectedCallback(std::function callback) = 0; + virtual void SetDataChannelSink(webrtc::DataChannelSink* sink) = 0; + // Changes what underlying DTLS transport is uses. Used when switching which // bundled transport the SctpTransport uses. virtual void SetDtlsTransport(rtc::PacketTransportInternal* transport) = 0; @@ -140,24 +143,6 @@ class SctpTransportInternal { // Returns the current negotiated max # of inbound streams. virtual absl::optional max_inbound_streams() const = 0; - sigslot::signal0<> SignalReadyToSendData; - sigslot::signal0<> SignalAssociationChangeCommunicationUp; - // ReceiveDataParams includes SID, seq num, timestamp, etc. CopyOnWriteBuffer - // contains message payload. - sigslot::signal2 - SignalDataReceived; - // Parameter is SID; fired when we receive an incoming stream reset on an - // open stream, indicating that the other side started the closing procedure. - // After resetting the outgoing stream, SignalClosingProcedureComplete will - // fire too. - sigslot::signal1 SignalClosingProcedureStartedRemotely; - // Parameter is SID; fired when closing procedure is complete (both incoming - // and outgoing streams reset). - sigslot::signal1 SignalClosingProcedureComplete; - // Fired when the underlying DTLS transport has closed due to an error - // or an incoming DTLS disconnect or SCTP transport errors. - sigslot::signal1 SignalClosedAbruptly; - // Helper for debugging. virtual void set_debug_name_for_testing(const char* debug_name) = 0; }; diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 456f267daf..3d34aaec5b 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -204,7 +204,6 @@ rtc_source_set("jsep_transport") { ":rtcp_mux_filter", ":rtp_transport", ":rtp_transport_internal", - ":sctp_data_channel_transport", ":sctp_transport", ":session_description", ":srtp_filter", @@ -466,22 +465,6 @@ rtc_source_set("rtp_transport_internal") { ] } -rtc_source_set("sctp_data_channel_transport") { - visibility = [ ":*" ] - sources = [ - "sctp_data_channel_transport.cc", - "sctp_data_channel_transport.h", - ] - deps = [ - "../api:rtc_error", - "../api/transport:datagram_transport_interface", - "../media:rtc_data_sctp_transport_internal", - "../media:rtc_media_base", - "../rtc_base:copy_on_write_buffer", - "../rtc_base/third_party/sigslot", - ] -} - rtc_source_set("sctp_transport") { visibility = [ ":*" ] sources = [ @@ -494,6 +477,7 @@ rtc_source_set("sctp_transport") { "../api:libjingle_peerconnection_api", "../api:scoped_refptr", "../api:sequence_checker", + "../api/transport:datagram_transport_interface", "../media:rtc_data_sctp_transport_internal", "../p2p:rtc_p2p", "../rtc_base", diff --git a/pc/jsep_transport.cc b/pc/jsep_transport.cc index dad415b93b..ec186626b7 100644 --- a/pc/jsep_transport.cc +++ b/pc/jsep_transport.cc @@ -22,7 +22,6 @@ #include "api/candidate.h" #include "p2p/base/p2p_constants.h" #include "p2p/base/p2p_transport_channel.h" -#include "pc/sctp_data_channel_transport.h" #include "rtc_base/checks.h" #include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/logging.h" @@ -99,10 +98,6 @@ JsepTransport::JsepTransport( ? rtc::make_ref_counted( std::move(rtcp_dtls_transport)) : nullptr), - sctp_data_channel_transport_( - sctp_transport ? std::make_unique( - sctp_transport.get()) - : nullptr), sctp_transport_(sctp_transport ? rtc::make_ref_counted( std::move(sctp_transport)) diff --git a/pc/jsep_transport.h b/pc/jsep_transport.h index 93604a179f..f2643070a1 100644 --- a/pc/jsep_transport.h +++ b/pc/jsep_transport.h @@ -223,10 +223,7 @@ class JsepTransport { // TODO(bugs.webrtc.org/9719): Delete method, update callers to use // SctpTransport() instead. webrtc::DataChannelTransportInterface* data_channel_transport() const { - if (sctp_data_channel_transport_) { - return sctp_data_channel_transport_.get(); - } - return nullptr; + return sctp_transport_.get(); } // TODO(deadbeef): The methods below are only public for testing. Should make @@ -311,8 +308,6 @@ class JsepTransport { rtc::scoped_refptr rtcp_dtls_transport_ RTC_GUARDED_BY(network_thread_); - const std::unique_ptr - sctp_data_channel_transport_; const rtc::scoped_refptr sctp_transport_; SrtpFilter sdes_negotiator_ RTC_GUARDED_BY(network_thread_); diff --git a/pc/sctp_data_channel_transport.cc b/pc/sctp_data_channel_transport.cc deleted file mode 100644 index 626d1757b7..0000000000 --- a/pc/sctp_data_channel_transport.cc +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2019 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "pc/sctp_data_channel_transport.h" - -namespace webrtc { - -SctpDataChannelTransport::SctpDataChannelTransport( - cricket::SctpTransportInternal* sctp_transport) - : sctp_transport_(sctp_transport) { - sctp_transport_->SignalReadyToSendData.connect( - this, &SctpDataChannelTransport::OnReadyToSendData); - sctp_transport_->SignalDataReceived.connect( - this, &SctpDataChannelTransport::OnDataReceived); - sctp_transport_->SignalClosingProcedureStartedRemotely.connect( - this, &SctpDataChannelTransport::OnClosingProcedureStartedRemotely); - sctp_transport_->SignalClosingProcedureComplete.connect( - this, &SctpDataChannelTransport::OnClosingProcedureComplete); - sctp_transport_->SignalClosedAbruptly.connect( - this, &SctpDataChannelTransport::OnClosedAbruptly); -} - -RTCError SctpDataChannelTransport::OpenChannel(int channel_id) { - sctp_transport_->OpenStream(channel_id); - return RTCError::OK(); -} - -RTCError SctpDataChannelTransport::SendData( - int channel_id, - const SendDataParams& params, - const rtc::CopyOnWriteBuffer& buffer) { - cricket::SendDataResult result; - sctp_transport_->SendData(channel_id, params, buffer, &result); - - // TODO(mellem): See about changing the interfaces to not require mapping - // SendDataResult to RTCError and back again. - switch (result) { - case cricket::SendDataResult::SDR_SUCCESS: - return RTCError::OK(); - case cricket::SendDataResult::SDR_BLOCK: { - // Send buffer is full. - ready_to_send_ = false; - return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); - } - case cricket::SendDataResult::SDR_ERROR: - return RTCError(RTCErrorType::NETWORK_ERROR); - } - return RTCError(RTCErrorType::NETWORK_ERROR); -} - -RTCError SctpDataChannelTransport::CloseChannel(int channel_id) { - sctp_transport_->ResetStream(channel_id); - return RTCError::OK(); -} - -void SctpDataChannelTransport::SetDataSink(DataChannelSink* sink) { - sink_ = sink; - if (sink_ && ready_to_send_) { - sink_->OnReadyToSend(); - } -} - -bool SctpDataChannelTransport::IsReadyToSend() const { - return ready_to_send_; -} - -void SctpDataChannelTransport::OnReadyToSendData() { - ready_to_send_ = true; - if (sink_) { - sink_->OnReadyToSend(); - } -} - -void SctpDataChannelTransport::OnDataReceived( - const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& buffer) { - if (sink_) { - sink_->OnDataReceived(params.sid, params.type, buffer); - } -} - -void SctpDataChannelTransport::OnClosingProcedureStartedRemotely( - int channel_id) { - if (sink_) { - sink_->OnChannelClosing(channel_id); - } -} - -void SctpDataChannelTransport::OnClosingProcedureComplete(int channel_id) { - if (sink_) { - sink_->OnChannelClosed(channel_id); - } -} - -void SctpDataChannelTransport::OnClosedAbruptly(RTCError error) { - if (sink_) { - sink_->OnTransportClosed(error); - } -} - -} // namespace webrtc diff --git a/pc/sctp_data_channel_transport.h b/pc/sctp_data_channel_transport.h deleted file mode 100644 index 4b89205ea1..0000000000 --- a/pc/sctp_data_channel_transport.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2019 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ -#define PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ - -#include "api/rtc_error.h" -#include "api/transport/data_channel_transport_interface.h" -#include "media/base/media_channel.h" -#include "media/sctp/sctp_transport_internal.h" -#include "rtc_base/copy_on_write_buffer.h" -#include "rtc_base/third_party/sigslot/sigslot.h" - -namespace webrtc { - -// SCTP implementation of DataChannelTransportInterface. -class SctpDataChannelTransport : public DataChannelTransportInterface, - public sigslot::has_slots<> { - public: - explicit SctpDataChannelTransport( - cricket::SctpTransportInternal* sctp_transport); - - RTCError OpenChannel(int channel_id) override; - RTCError SendData(int channel_id, - const SendDataParams& params, - const rtc::CopyOnWriteBuffer& buffer) override; - RTCError CloseChannel(int channel_id) override; - void SetDataSink(DataChannelSink* sink) override; - bool IsReadyToSend() const override; - - private: - void OnReadyToSendData(); - void OnDataReceived(const cricket::ReceiveDataParams& params, - const rtc::CopyOnWriteBuffer& buffer); - void OnClosingProcedureStartedRemotely(int channel_id); - void OnClosingProcedureComplete(int channel_id); - void OnClosedAbruptly(RTCError error); - - cricket::SctpTransportInternal* const sctp_transport_; - - DataChannelSink* sink_ = nullptr; - bool ready_to_send_ = false; -}; - -} // namespace webrtc - -#endif // PC_SCTP_DATA_CHANNEL_TRANSPORT_H_ diff --git a/pc/sctp_transport.cc b/pc/sctp_transport.cc index 7d4e4551f1..f411883605 100644 --- a/pc/sctp_transport.cc +++ b/pc/sctp_transport.cc @@ -28,9 +28,8 @@ SctpTransport::SctpTransport( info_(SctpTransportState::kNew), internal_sctp_transport_(std::move(internal)) { RTC_DCHECK(internal_sctp_transport_.get()); - internal_sctp_transport_->SignalAssociationChangeCommunicationUp.connect( - this, &SctpTransport::OnAssociationChangeCommunicationUp); - // TODO(https://bugs.webrtc.org/10360): Add handlers for transport closing. + internal_sctp_transport_->SetOnConnectedCallback( + [this]() { OnAssociationChangeCommunicationUp(); }); if (dtls_transport_) { UpdateInformation(SctpTransportState::kConnecting); @@ -70,6 +69,54 @@ void SctpTransport::UnregisterObserver() { observer_ = nullptr; } +RTCError SctpTransport::OpenChannel(int channel_id) { + RTC_DCHECK_RUN_ON(owner_thread_); + RTC_DCHECK(internal_sctp_transport_); + internal_sctp_transport_->OpenStream(channel_id); + return RTCError::OK(); +} + +RTCError SctpTransport::SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(owner_thread_); + RTC_DCHECK(internal_sctp_transport_); + cricket::SendDataResult result; + internal_sctp_transport_->SendData(channel_id, params, buffer, &result); + + // TODO(mellem): See about changing the interfaces to not require mapping + // SendDataResult to RTCError and back again. + switch (result) { + case cricket::SendDataResult::SDR_SUCCESS: + return RTCError::OK(); + case cricket::SendDataResult::SDR_BLOCK: + // Send buffer is full. + return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); + case cricket::SendDataResult::SDR_ERROR: + return RTCError(RTCErrorType::NETWORK_ERROR); + } + return RTCError(RTCErrorType::NETWORK_ERROR); +} + +RTCError SctpTransport::CloseChannel(int channel_id) { + RTC_DCHECK_RUN_ON(owner_thread_); + RTC_DCHECK(internal_sctp_transport_); + internal_sctp_transport_->ResetStream(channel_id); + return RTCError::OK(); +} + +void SctpTransport::SetDataSink(DataChannelSink* sink) { + RTC_DCHECK_RUN_ON(owner_thread_); + RTC_DCHECK(internal_sctp_transport_); + internal_sctp_transport_->SetDataChannelSink(sink); +} + +bool SctpTransport::IsReadyToSend() const { + RTC_DCHECK_RUN_ON(owner_thread_); + RTC_DCHECK(internal_sctp_transport_); + return internal_sctp_transport_->ReadyToSendData(); +} + rtc::scoped_refptr SctpTransport::dtls_transport() const { RTC_DCHECK_RUN_ON(owner_thread_); diff --git a/pc/sctp_transport.h b/pc/sctp_transport.h index 4981db4ede..35e7656100 100644 --- a/pc/sctp_transport.h +++ b/pc/sctp_transport.h @@ -17,11 +17,11 @@ #include "api/scoped_refptr.h" #include "api/sctp_transport_interface.h" #include "api/sequence_checker.h" +#include "api/transport/data_channel_transport_interface.h" #include "media/sctp/sctp_transport_internal.h" #include "p2p/base/dtls_transport_internal.h" #include "pc/dtls_transport.h" #include "rtc_base/checks.h" -#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" @@ -33,16 +33,26 @@ namespace webrtc { // the same thread as the one the cricket::SctpTransportInternal object // lives on. class SctpTransport : public SctpTransportInterface, - public sigslot::has_slots<> { + public DataChannelTransportInterface { public: explicit SctpTransport( std::unique_ptr internal); + // SctpTransportInterface rtc::scoped_refptr dtls_transport() const override; SctpTransportInformation Information() const override; void RegisterObserver(SctpTransportObserverInterface* observer) override; void UnregisterObserver() override; + // DataChannelTransportInterface + RTCError OpenChannel(int channel_id) override; + RTCError SendData(int channel_id, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& buffer) override; + RTCError CloseChannel(int channel_id) override; + void SetDataSink(DataChannelSink* sink) override; + bool IsReadyToSend() const override; + // Internal functions void Clear(); void SetDtlsTransport(rtc::scoped_refptr); diff --git a/pc/sctp_transport_unittest.cc b/pc/sctp_transport_unittest.cc index 1e12a0a9d2..47ed97d291 100644 --- a/pc/sctp_transport_unittest.cc +++ b/pc/sctp_transport_unittest.cc @@ -39,6 +39,10 @@ namespace { class FakeCricketSctpTransport : public cricket::SctpTransportInternal { public: + void SetOnConnectedCallback(std::function callback) override { + on_connected_callback_ = std::move(callback); + } + void SetDataChannelSink(DataChannelSink* sink) override {} void SetDtlsTransport(rtc::PacketTransportInternal* transport) override {} bool Start(int local_port, int remote_port, int max_message_size) override { return true; @@ -60,20 +64,12 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal { absl::optional max_inbound_streams() const override { return max_inbound_streams_; } - // Methods exposed for testing - void SendSignalReadyToSendData() { SignalReadyToSendData(); } void SendSignalAssociationChangeCommunicationUp() { - SignalAssociationChangeCommunicationUp(); + ASSERT_TRUE(on_connected_callback_); + on_connected_callback_(); } - void SendSignalClosingProcedureStartedRemotely() { - SignalClosingProcedureStartedRemotely(1); - } - - void SendSignalClosingProcedureComplete() { - SignalClosingProcedureComplete(1); - } void set_max_outbound_streams(int streams) { max_outbound_streams_ = streams; } @@ -82,6 +78,7 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal { private: absl::optional max_outbound_streams_; absl::optional max_inbound_streams_; + std::function on_connected_callback_; }; } // namespace @@ -134,7 +131,6 @@ class SctpTransportTest : public ::testing::Test { } void CompleteSctpHandshake() { - CricketSctpTransport()->SendSignalReadyToSendData(); // The computed MaxChannels shall be the minimum of the outgoing // and incoming # of streams. CricketSctpTransport()->set_max_outbound_streams(kTestMaxSctpStreams); diff --git a/test/pc/sctp/fake_sctp_transport.h b/test/pc/sctp/fake_sctp_transport.h index 42b978a900..94272346f9 100644 --- a/test/pc/sctp/fake_sctp_transport.h +++ b/test/pc/sctp/fake_sctp_transport.h @@ -20,6 +20,8 @@ // local/remote ports. class FakeSctpTransport : public cricket::SctpTransportInternal { public: + void SetOnConnectedCallback(std::function callback) override {} + void SetDataChannelSink(webrtc::DataChannelSink* sink) override {} void SetDtlsTransport(rtc::PacketTransportInternal* transport) override {} bool Start(int local_port, int remote_port, int max_message_size) override { local_port_.emplace(local_port);