From 8efc914cf353cea138a453c45e970e589bec0834 Mon Sep 17 00:00:00 2001 From: Harald Alvestrand Date: Mon, 7 Feb 2022 10:21:51 +0000 Subject: [PATCH] Replace use of sigslot with CallbackList in data_channel_controller This is a straightforward replacement; simplifications due to the ability to inline functions in the lambdas are for a later CL. Bug: webrtc:11943 Change-Id: I7274cedde507b954f1d8aa8bc560861102eeb264 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/250540 Reviewed-by: Niels Moller Commit-Queue: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#35936} --- pc/data_channel_controller.cc | 73 ++++++++++++++++++++++------- pc/data_channel_controller.h | 29 ++++++------ pc/peer_connection.cc | 5 ++ pc/peer_connection.h | 15 ++---- pc/peer_connection_internal.h | 14 ++++-- pc/rtc_stats_collector.cc | 4 +- pc/rtc_stats_collector.h | 6 ++- pc/rtc_stats_collector_unittest.cc | 4 +- pc/test/fake_peer_connection_base.h | 9 ++-- 9 files changed, 102 insertions(+), 57 deletions(-) diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index e11647f2ca..a0df4091ba 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -49,14 +49,33 @@ bool DataChannelController::ConnectDataChannel( // whether or not the underlying transport is ready. return false; } - SignalDataChannelTransportWritable_s.connect( - webrtc_data_channel, &SctpDataChannel::OnTransportReady); - SignalDataChannelTransportReceivedData_s.connect( - webrtc_data_channel, &SctpDataChannel::OnDataReceived); - SignalDataChannelTransportChannelClosing_s.connect( - webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely); - SignalDataChannelTransportChannelClosed_s.connect( - webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete); + data_transport_writable_callbacks_.AddReceiver( + webrtc_data_channel, [webrtc_data_channel](bool ready) { + webrtc_data_channel->OnTransportReady(ready); + }); + data_channel_transport_received_data_callbacks_.AddReceiver( + webrtc_data_channel, + [webrtc_data_channel](const cricket::ReceiveDataParams& params, + const rtc::CopyOnWriteBuffer& data) { + webrtc_data_channel->OnDataReceived(params, data); + }); + data_channel_transport_channel_closing_callbacks_.AddReceiver( + webrtc_data_channel, [webrtc_data_channel](int num) { + webrtc_data_channel->OnClosingProcedureStartedRemotely(num); + }); + // When a datachannel is closed, it may get deleted, so we have to make + // sure the closed callback isn't called again. + // This takes advantage of the fact that a channel is never closed twice. + // Unfortunately it doesn't work for pre-opened datachannels, since these + // have id = -1 (unassigned) at registration time, so they must be called + // upon anyway. + int channel_id = webrtc_data_channel->id(); + data_channel_transport_channel_closed_callbacks_.AddReceiver( + webrtc_data_channel, [webrtc_data_channel, channel_id](int num) { + if (num == channel_id || channel_id < 0) { + webrtc_data_channel->OnClosingProcedureComplete(num); + } + }); return true; } @@ -68,10 +87,24 @@ void DataChannelController::DisconnectDataChannel( << "DisconnectDataChannel called when sctp_transport_ is NULL."; return; } - SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel); - SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel); + data_transport_writable_callbacks_.RemoveReceivers(webrtc_data_channel); + data_channel_transport_received_data_callbacks_.RemoveReceivers( + webrtc_data_channel); + data_channel_transport_channel_closing_callbacks_.RemoveReceivers( + webrtc_data_channel); + // This function is being called from the OnClosingProcedureComplete + // function, which is called from the data_channel_transport_channel_closed + // CallbackList. + // Since CallbackList does not permit removing receivers in a callback, + // schedule the disconnect to happen later. + signaling_thread()->PostTask(ToQueuedTask([self = weak_factory_.GetWeakPtr(), + webrtc_data_channel]() { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->data_channel_transport_channel_closed_callbacks_.RemoveReceivers( + webrtc_data_channel); + } + })); } void DataChannelController::AddSctpDataStream(int sid) { @@ -120,7 +153,8 @@ void DataChannelController::OnDataReceived( // SignalDataChannelTransportReceivedData_s to // SignalDataChannelTransportReceivedData_n). if (!self->HandleOpenMessage_s(params, buffer)) { - self->SignalDataChannelTransportReceivedData_s(params, buffer); + self->data_channel_transport_received_data_callbacks_.Send(params, + buffer); } } })); @@ -132,7 +166,8 @@ void DataChannelController::OnChannelClosing(int channel_id) { ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosing_s(channel_id); + self->data_channel_transport_channel_closing_callbacks_.Send( + channel_id); } })); } @@ -143,7 +178,8 @@ void DataChannelController::OnChannelClosed(int channel_id) { ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosed_s(channel_id); + self->data_channel_transport_channel_closed_callbacks_.Send( + channel_id); } })); } @@ -155,7 +191,7 @@ void DataChannelController::OnReadyToSend() { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); self->data_channel_transport_ready_to_send_ = true; - self->SignalDataChannelTransportWritable_s( + self->data_transport_writable_callbacks_.Send( self->data_channel_transport_ready_to_send_); } })); @@ -298,8 +334,9 @@ DataChannelController::InternalCreateSctpDataChannel( return nullptr; } sctp_data_channels_.push_back(channel); - channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed); - SignalSctpDataChannelCreated_(channel.get()); + channel->SignalClosed.connect( + pc_, &PeerConnectionInternal::OnSctpDataChannelClosed); + sctp_data_channel_created_callbacks_.Send(channel.get()); return channel; } diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index af0e06353f..6b3cd1b46b 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -27,23 +27,21 @@ #include "media/base/stream_params.h" #include "pc/channel.h" #include "pc/data_channel_utils.h" +#include "pc/peer_connection_internal.h" #include "pc/sctp_data_channel.h" #include "rtc_base/checks.h" #include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/ssl_stream_adapter.h" -#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/weak_ptr.h" namespace webrtc { -class PeerConnection; - class DataChannelController : public SctpDataChannelProviderInterface, public DataChannelSink { public: - explicit DataChannelController(PeerConnection* pc) : pc_(pc) {} + explicit DataChannelController(PeerConnectionInternal* pc) : pc_(pc) {} // Not copyable or movable. DataChannelController(DataChannelController&) = delete; @@ -106,10 +104,12 @@ class DataChannelController : public SctpDataChannelProviderInterface, DataChannelTransportInterface* data_channel_transport() const; void set_data_channel_transport(DataChannelTransportInterface* transport); - sigslot::signal1& SignalSctpDataChannelCreated() { + template + void SubscribeDataChannelCreated(F&& callback) { RTC_DCHECK_RUN_ON(signaling_thread()); - return SignalSctpDataChannelCreated_; + sctp_data_channel_created_callbacks_.AddReceiver(callback); } + // Called when the transport for the data channels is closed or destroyed. void OnTransportChannelClosed(RTCError error); @@ -165,22 +165,21 @@ class DataChannelController : public SctpDataChannelProviderInterface, // signaling thread. // TODO(bugs.webrtc.org/11547): These '_s' signals likely all belong on the // network thread. - sigslot::signal1 SignalDataChannelTransportWritable_s + CallbackList data_transport_writable_callbacks_ RTC_GUARDED_BY(signaling_thread()); - sigslot::signal2 - SignalDataChannelTransportReceivedData_s + CallbackList + data_channel_transport_received_data_callbacks_ RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalDataChannelTransportChannelClosing_s + CallbackList data_channel_transport_channel_closing_callbacks_ RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalDataChannelTransportChannelClosed_s + CallbackList data_channel_transport_channel_closed_callbacks_ RTC_GUARDED_BY(signaling_thread()); - sigslot::signal1 SignalSctpDataChannelCreated_ + // Callback listened to for data channel creation. + CallbackList sctp_data_channel_created_callbacks_ RTC_GUARDED_BY(signaling_thread()); - // Owning PeerConnection. - PeerConnection* const pc_; + PeerConnectionInternal* const pc_; // The weak pointers must be dereferenced and invalidated on the signalling // thread only. rtc::WeakPtrFactory weak_factory_{this}; diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 4e951a534b..f6cf5447a4 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2175,6 +2175,11 @@ bool PeerConnection::GetSslRole(const std::string& content_name, return false; } +void PeerConnection::SubscribeDataChannelCreated( + std::function callback) { + data_channel_controller()->SubscribeDataChannelCreated(callback); +} + bool PeerConnection::GetTransportDescription( const SessionDescription* description, const std::string& content_name, diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 3f3dda601e..4cafe23dca 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -117,8 +117,7 @@ namespace webrtc { // - The ICE state machine. // - Generating stats. class PeerConnection : public PeerConnectionInternal, - public JsepTransportController::Observer, - public sigslot::has_slots<> { + public JsepTransportController::Observer { public: // Creates a PeerConnection and initializes it with the given values. // If the initialization fails, the function releases the PeerConnection @@ -289,11 +288,6 @@ class PeerConnection : public PeerConnectionInternal, return rtp_manager()->transceivers()->List(); } - sigslot::signal1& SignalSctpDataChannelCreated() override { - RTC_DCHECK_RUN_ON(signaling_thread()); - return data_channel_controller_.SignalSctpDataChannelCreated(); - } - std::vector GetDataChannelStats() const override; absl::optional sctp_transport_name() const override; @@ -312,9 +306,10 @@ class PeerConnection : public PeerConnectionInternal, bool IceRestartPending(const std::string& content_name) const override; bool NeedsIceRestart(const std::string& content_name) const override; bool GetSslRole(const std::string& content_name, rtc::SSLRole* role) override; - + void SubscribeDataChannelCreated( + std::function callback) override; // Functions needed by DataChannelController - void NoteDataAddedEvent() { NoteUsageEvent(UsageEvent::DATA_ADDED); } + void NoteDataAddedEvent() override { NoteUsageEvent(UsageEvent::DATA_ADDED); } // Returns the observer. Will crash on CHECK if the observer is removed. PeerConnectionObserver* Observer() const override; bool IsClosed() const override { @@ -325,7 +320,7 @@ class PeerConnection : public PeerConnectionInternal, // Get current SSL role used by SCTP's underlying transport. bool GetSctpSslRole(rtc::SSLRole* role) override; // Handler for the "channel closed" signal - void OnSctpDataChannelClosed(DataChannelInterface* channel); + void OnSctpDataChannelClosed(DataChannelInterface* channel) override; bool ShouldFireNegotiationNeededEvent(uint32_t event_id) override; diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h index 3b82dc5271..e10468d53c 100644 --- a/pc/peer_connection_internal.h +++ b/pc/peer_connection_internal.h @@ -127,7 +127,8 @@ class PeerConnectionSdpMethods { // Functions defined in this class are called by other objects, // but not by SdpOfferAnswerHandler. class PeerConnectionInternal : public PeerConnectionInterface, - public PeerConnectionSdpMethods { + public PeerConnectionSdpMethods, + public sigslot::has_slots<> { public: virtual rtc::Thread* network_thread() const = 0; virtual rtc::Thread* worker_thread() const = 0; @@ -139,9 +140,6 @@ class PeerConnectionInternal : public PeerConnectionInterface, rtc::scoped_refptr>> GetTransceiversInternal() const = 0; - virtual sigslot::signal1& - SignalSctpDataChannelCreated() = 0; - // Call on the network thread to fetch stats for all the data channels. // TODO(tommi): Make pure virtual after downstream updates. virtual std::vector GetDataChannelStats() const { @@ -172,6 +170,14 @@ class PeerConnectionInternal : public PeerConnectionInterface, // Get SSL role for an arbitrary m= section (handles bundling correctly). virtual bool GetSslRole(const std::string& content_name, rtc::SSLRole* role) = 0; + + // Subscribe to the creation of datachannels. Used by the rtc-stats module. + virtual void SubscribeDataChannelCreated( + std::function callback) = 0; + // Functions needed by DataChannelController + virtual void NoteDataAddedEvent() = 0; + // Handler for the "channel closed" signal + virtual void OnSctpDataChannelClosed(DataChannelInterface* channel) = 0; }; } // namespace webrtc diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index 7e9807e449..bc24d5b853 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -1173,8 +1173,8 @@ RTCStatsCollector::RTCStatsCollector(PeerConnectionInternal* pc, RTC_DCHECK(worker_thread_); RTC_DCHECK(network_thread_); RTC_DCHECK_GE(cache_lifetime_us_, 0); - pc_->SignalSctpDataChannelCreated().connect( - this, &RTCStatsCollector::OnSctpDataChannelCreated); + pc_->SubscribeDataChannelCreated( + [this](SctpDataChannel* channel) { OnSctpDataChannelCreated(channel); }); } RTCStatsCollector::~RTCStatsCollector() { diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h index c84e6d3fef..e75e82b6c2 100644 --- a/pc/rtc_stats_collector.h +++ b/pc/rtc_stats_collector.h @@ -84,6 +84,10 @@ class RTCStatsCollector : public rtc::RefCountInterface, // completed. Must be called on the signaling thread. void WaitForPendingRequest(); + // Callback that is called on data channel creation. + // Exposed for testing. + void OnSctpDataChannelCreated(SctpDataChannel* channel); + protected: RTCStatsCollector(PeerConnectionInternal* pc, int64_t cache_lifetime_us); ~RTCStatsCollector(); @@ -238,8 +242,6 @@ class RTCStatsCollector : public rtc::RefCountInterface, // This is a NO-OP if `network_report_` is null. void MergeNetworkReport_s(); - // Slots for signals (sigslot) that are wired up to `pc_`. - void OnSctpDataChannelCreated(SctpDataChannel* channel); // Slots for signals (sigslot) that are wired up to `channel`. void OnDataChannelOpened(DataChannelInterface* channel); void OnDataChannelClosed(DataChannelInterface* channel); diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index 8f0936c26c..ac0253c85a 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -1602,11 +1602,11 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { rtc::scoped_refptr dummy_channel_a = SctpDataChannel::Create( &provider, "DummyChannelA", InternalDataChannelInit(), rtc::Thread::Current(), rtc::Thread::Current()); - pc_->SignalSctpDataChannelCreated()(dummy_channel_a.get()); + stats_->stats_collector()->OnSctpDataChannelCreated(dummy_channel_a.get()); rtc::scoped_refptr dummy_channel_b = SctpDataChannel::Create( &provider, "DummyChannelB", InternalDataChannelInit(), rtc::Thread::Current(), rtc::Thread::Current()); - pc_->SignalSctpDataChannelCreated()(dummy_channel_b.get()); + stats_->stats_collector()->OnSctpDataChannelCreated(dummy_channel_b.get()); dummy_channel_a->SignalOpened(dummy_channel_a.get()); // Closing a channel that is not opened should not affect the counts. diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h index 3462c8c78e..ff4f33d629 100644 --- a/pc/test/fake_peer_connection_base.h +++ b/pc/test/fake_peer_connection_base.h @@ -247,10 +247,6 @@ class FakePeerConnectionBase : public PeerConnectionInternal { return {}; } - sigslot::signal1& SignalSctpDataChannelCreated() override { - return SignalSctpDataChannelCreated_; - } - absl::optional sctp_transport_name() const override { return absl::nullopt; } @@ -289,6 +285,9 @@ class FakePeerConnectionBase : public PeerConnectionInternal { rtc::SSLRole* role) override { return false; } + void SubscribeDataChannelCreated( + std::function callback) override {} + const PeerConnectionInterface::RTCConfiguration* configuration() const override { return nullptr; @@ -356,6 +355,8 @@ class FakePeerConnectionBase : public PeerConnectionInternal { void TeardownDataChannelTransport_n() override {} void SetSctpDataMid(const std::string& mid) override {} void ResetSctpDataMid() override {} + void NoteDataAddedEvent() override {} + void OnSctpDataChannelClosed(DataChannelInterface* channel) override {} protected: sigslot::signal1 SignalSctpDataChannelCreated_;