diff --git a/pc/BUILD.gn b/pc/BUILD.gn index d63d98e483..e8d1e8622b 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -475,9 +475,9 @@ rtc_source_set("rtp_transport_internal") { ":session_description", "../call:rtp_receiver", "../p2p:rtc_p2p", + "../rtc_base:callback_list", "../rtc_base:network_route", "../rtc_base:ssl", - "../rtc_base/third_party/sigslot", ] } diff --git a/pc/channel.cc b/pc/channel.cc index afe9d034fd..82ca1a389e 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -166,14 +166,17 @@ bool BaseChannel::ConnectToRtpTransport_n() { if (!rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this)) { return false; } - rtp_transport_->SignalReadyToSend.connect( - this, &BaseChannel::OnTransportReadyToSend); - rtp_transport_->SignalNetworkRouteChanged.connect( - this, &BaseChannel::OnNetworkRouteChanged); - rtp_transport_->SignalWritableState.connect(this, - &BaseChannel::OnWritableState); - rtp_transport_->SignalSentPacket.connect(this, - &BaseChannel::SignalSentPacket_n); + rtp_transport_->SubscribeReadyToSend( + this, [this](bool ready) { OnTransportReadyToSend(ready); }); + rtp_transport_->SubscribeNetworkRouteChanged( + this, [this](absl::optional route) { + OnNetworkRouteChanged(route); + }); + rtp_transport_->SubscribeWritableState( + this, [this](bool state) { OnWritableState(state); }); + rtp_transport_->SubscribeSentPacket( + this, + [this](const rtc::SentPacket& packet) { SignalSentPacket_n(packet); }); return true; } @@ -181,10 +184,10 @@ void BaseChannel::DisconnectFromRtpTransport_n() { RTC_DCHECK(rtp_transport_); RTC_DCHECK(media_send_channel()); rtp_transport_->UnregisterRtpDemuxerSink(this); - rtp_transport_->SignalReadyToSend.disconnect(this); - rtp_transport_->SignalNetworkRouteChanged.disconnect(this); - rtp_transport_->SignalWritableState.disconnect(this); - rtp_transport_->SignalSentPacket.disconnect(this); + rtp_transport_->UnsubscribeReadyToSend(this); + rtp_transport_->UnsubscribeNetworkRouteChanged(this); + rtp_transport_->UnsubscribeWritableState(this); + rtp_transport_->UnsubscribeSentPacket(this); rtp_transport_ = nullptr; media_send_channel()->SetInterface(nullptr); media_receive_channel()->SetInterface(nullptr); diff --git a/pc/channel.h b/pc/channel.h index 23ae69fa5d..d3a7e89366 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -45,7 +45,6 @@ #include "rtc_base/network/sent_packet.h" #include "rtc_base/network_route.h" #include "rtc_base/socket.h" -#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/unique_id_generator.h" @@ -69,8 +68,6 @@ class VideoChannel; class VoiceChannel; class BaseChannel : public ChannelInterface, - // TODO(tommi): Remove has_slots inheritance. - public sigslot::has_slots<>, // TODO(tommi): Consider implementing these interfaces // via composition. public MediaChannelNetworkInterface, diff --git a/pc/dtls_srtp_transport_unittest.cc b/pc/dtls_srtp_transport_unittest.cc index 5c4eac0050..bf0676c324 100644 --- a/pc/dtls_srtp_transport_unittest.cc +++ b/pc/dtls_srtp_transport_unittest.cc @@ -77,17 +77,23 @@ class DtlsSrtpTransportTest : public ::testing::Test, dtls_srtp_transport2_ = MakeDtlsSrtpTransport(rtp_dtls2, rtcp_dtls2, rtcp_mux_enabled); - dtls_srtp_transport1_->SignalRtcpPacketReceived.connect( + dtls_srtp_transport1_->SubscribeRtcpPacketReceived( &transport_observer1_, - &webrtc::TransportObserver::OnRtcpPacketReceived); - dtls_srtp_transport1_->SignalReadyToSend.connect( - &transport_observer1_, &webrtc::TransportObserver::OnReadyToSend); + [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + transport_observer1_.OnRtcpPacketReceived(buffer, packet_time_ms); + }); + dtls_srtp_transport1_->SubscribeReadyToSend( + &transport_observer1_, + [this](bool ready) { transport_observer1_.OnReadyToSend(ready); }); - dtls_srtp_transport2_->SignalRtcpPacketReceived.connect( + dtls_srtp_transport2_->SubscribeRtcpPacketReceived( &transport_observer2_, - &webrtc::TransportObserver::OnRtcpPacketReceived); - dtls_srtp_transport2_->SignalReadyToSend.connect( - &transport_observer2_, &webrtc::TransportObserver::OnReadyToSend); + [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + transport_observer2_.OnRtcpPacketReceived(buffer, packet_time_ms); + }); + dtls_srtp_transport2_->SubscribeReadyToSend( + &transport_observer2_, + [this](bool ready) { transport_observer2_.OnReadyToSend(ready); }); webrtc::RtpDemuxerCriteria demuxer_criteria; // 0x00 is the payload type used in kPcmuFrame. demuxer_criteria.payload_types() = {0x00}; diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 667668053d..792365b521 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -1090,10 +1090,16 @@ RTCError JsepTransportController::MaybeCreateJsepTransport( UpdateAggregateStates_n(); }); - jsep_transport->rtp_transport()->SignalRtcpPacketReceived.connect( - this, &JsepTransportController::OnRtcpPacketReceived_n); - jsep_transport->rtp_transport()->SignalUnDemuxableRtpPacketReceived.connect( - this, &JsepTransportController::OnUnDemuxableRtpPacketReceived_n); + jsep_transport->rtp_transport()->SubscribeRtcpPacketReceived( + this, [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + RTC_DCHECK_RUN_ON(network_thread_); + OnRtcpPacketReceived_n(buffer, packet_time_ms); + }); + jsep_transport->rtp_transport()->SetUnDemuxableRtpPacketReceivedHandler( + [this](webrtc::RtpPacketReceived& packet) { + RTC_DCHECK_RUN_ON(network_thread_); + OnUnDemuxableRtpPacketReceived_n(packet); + }); transports_.RegisterTransport(content_info.name, std::move(jsep_transport)); UpdateAggregateStates_n(); diff --git a/pc/rtp_transport.cc b/pc/rtp_transport.cc index 8ef597f58f..32bdf99ec4 100644 --- a/pc/rtp_transport.cc +++ b/pc/rtp_transport.cc @@ -59,7 +59,7 @@ void RtpTransport::SetRtpPacketTransport( rtp_packet_transport_->SignalWritableState.disconnect(this); rtp_packet_transport_->SignalSentPacket.disconnect(this); // Reset the network route of the old transport. - SignalNetworkRouteChanged(absl::optional()); + SendNetworkRouteChanged(absl::optional()); } if (new_packet_transport) { new_packet_transport->SignalReadyToSend.connect( @@ -73,7 +73,7 @@ void RtpTransport::SetRtpPacketTransport( new_packet_transport->SignalSentPacket.connect(this, &RtpTransport::OnSentPacket); // Set the network route for the new transport. - SignalNetworkRouteChanged(new_packet_transport->network_route()); + SendNetworkRouteChanged(new_packet_transport->network_route()); } rtp_packet_transport_ = new_packet_transport; @@ -95,7 +95,7 @@ void RtpTransport::SetRtcpPacketTransport( rtcp_packet_transport_->SignalWritableState.disconnect(this); rtcp_packet_transport_->SignalSentPacket.disconnect(this); // Reset the network route of the old transport. - SignalNetworkRouteChanged(absl::optional()); + SendNetworkRouteChanged(absl::optional()); } if (new_packet_transport) { new_packet_transport->SignalReadyToSend.connect( @@ -109,7 +109,7 @@ void RtpTransport::SetRtcpPacketTransport( new_packet_transport->SignalSentPacket.connect(this, &RtpTransport::OnSentPacket); // Set the network route for the new transport. - SignalNetworkRouteChanged(new_packet_transport->network_route()); + SendNetworkRouteChanged(new_packet_transport->network_route()); } rtcp_packet_transport_ = new_packet_transport; @@ -195,7 +195,7 @@ void RtpTransport::DemuxPacket(rtc::CopyOnWriteBuffer packet, if (!rtp_demuxer_.OnRtpPacket(parsed_packet)) { RTC_LOG(LS_VERBOSE) << "Failed to demux RTP packet: " << RtpDemuxer::DescribePacket(parsed_packet); - SignalUnDemuxableRtpPacketReceived(parsed_packet); + NotifyUnDemuxableRtpPacketReceived(parsed_packet); } } @@ -212,21 +212,21 @@ void RtpTransport::OnReadyToSend(rtc::PacketTransportInternal* transport) { void RtpTransport::OnNetworkRouteChanged( absl::optional network_route) { - SignalNetworkRouteChanged(network_route); + SendNetworkRouteChanged(network_route); } void RtpTransport::OnWritableState( rtc::PacketTransportInternal* packet_transport) { RTC_DCHECK(packet_transport == rtp_packet_transport_ || packet_transport == rtcp_packet_transport_); - SignalWritableState(IsTransportWritable()); + SendWritableState(IsTransportWritable()); } void RtpTransport::OnSentPacket(rtc::PacketTransportInternal* packet_transport, const rtc::SentPacket& sent_packet) { RTC_DCHECK(packet_transport == rtp_packet_transport_ || packet_transport == rtcp_packet_transport_); - SignalSentPacket(sent_packet); + SendSentPacket(sent_packet); } void RtpTransport::OnRtpPacketReceived(rtc::CopyOnWriteBuffer packet, @@ -236,7 +236,7 @@ void RtpTransport::OnRtpPacketReceived(rtc::CopyOnWriteBuffer packet, void RtpTransport::OnRtcpPacketReceived(rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { - SignalRtcpPacketReceived(&packet, packet_time_us); + SendRtcpPacketReceived(&packet, packet_time_us); } void RtpTransport::OnReadPacket(rtc::PacketTransportInternal* transport, @@ -286,7 +286,7 @@ void RtpTransport::MaybeSignalReadyToSend() { rtp_ready_to_send_ && (rtcp_ready_to_send_ || rtcp_mux_enabled_); if (ready_to_send != ready_to_send_) { ready_to_send_ = ready_to_send; - SignalReadyToSend(ready_to_send); + SendReadyToSend(ready_to_send); } } diff --git a/pc/rtp_transport_internal.h b/pc/rtp_transport_internal.h index 264be134a4..4114fa9340 100644 --- a/pc/rtp_transport_internal.h +++ b/pc/rtp_transport_internal.h @@ -12,13 +12,14 @@ #define PC_RTP_TRANSPORT_INTERNAL_H_ #include +#include #include "call/rtp_demuxer.h" #include "p2p/base/ice_transport_internal.h" #include "pc/session_description.h" +#include "rtc_base/callback_list.h" #include "rtc_base/network_route.h" #include "rtc_base/ssl_stream_adapter.h" -#include "rtc_base/third_party/sigslot/sigslot.h" namespace rtc { class CopyOnWriteBuffer; @@ -50,27 +51,59 @@ class RtpTransportInternal : public sigslot::has_slots<> { // Called whenever a transport's ready-to-send state changes. The argument // is true if all used transports are ready to send. This is more specific // than just "writable"; it means the last send didn't return ENOTCONN. - sigslot::signal1 SignalReadyToSend; + void SubscribeReadyToSend(const void* tag, + absl::AnyInvocable callback) { + callback_list_ready_to_send_.AddReceiver(tag, std::move(callback)); + } + void UnsubscribeReadyToSend(const void* tag) { + callback_list_ready_to_send_.RemoveReceivers(tag); + } // Called whenever an RTCP packet is received. There is no equivalent signal // for demuxable RTP packets because they would be forwarded to the // BaseChannel through the RtpDemuxer callback. - sigslot::signal2 SignalRtcpPacketReceived; + void SubscribeRtcpPacketReceived( + const void* tag, + absl::AnyInvocable callback) { + callback_list_rtcp_packet_received_.AddReceiver(tag, std::move(callback)); + } + // There doesn't seem to be a need to unsubscribe from this signal. // Called whenever a RTP packet that can not be demuxed by the transport is // received. - sigslot::signal - SignalUnDemuxableRtpPacketReceived; + void SetUnDemuxableRtpPacketReceivedHandler( + absl::AnyInvocable callback) { + callback_undemuxable_rtp_packet_received_ = std::move(callback); + } // Called whenever the network route of the P2P layer transport changes. // The argument is an optional network route. - sigslot::signal1> SignalNetworkRouteChanged; + void SubscribeNetworkRouteChanged( + const void* tag, + absl::AnyInvocable)> callback) { + callback_list_network_route_changed_.AddReceiver(tag, std::move(callback)); + } + void UnsubscribeNetworkRouteChanged(const void* tag) { + callback_list_network_route_changed_.RemoveReceivers(tag); + } // Called whenever a transport's writable state might change. The argument is // true if the transport is writable, otherwise it is false. - sigslot::signal1 SignalWritableState; - - sigslot::signal1 SignalSentPacket; + void SubscribeWritableState(const void* tag, + absl::AnyInvocable callback) { + callback_list_writable_state_.AddReceiver(tag, std::move(callback)); + } + void UnsubscribeWritableState(const void* tag) { + callback_list_writable_state_.RemoveReceivers(tag); + } + void SubscribeSentPacket( + const void* tag, + absl::AnyInvocable callback) { + callback_list_sent_packet_.AddReceiver(tag, std::move(callback)); + } + void UnsubscribeSentPacket(const void* tag) { + callback_list_sent_packet_.RemoveReceivers(tag); + } virtual bool IsWritable(bool rtcp) const = 0; @@ -103,6 +136,37 @@ class RtpTransportInternal : public sigslot::has_slots<> { RtpPacketSinkInterface* sink) = 0; virtual bool UnregisterRtpDemuxerSink(RtpPacketSinkInterface* sink) = 0; + + protected: + void SendReadyToSend(bool arg) { callback_list_ready_to_send_.Send(arg); } + void SendRtcpPacketReceived(rtc::CopyOnWriteBuffer* buffer, + int64_t packet_time_us) { + callback_list_rtcp_packet_received_.Send(buffer, packet_time_us); + } + void NotifyUnDemuxableRtpPacketReceived(RtpPacketReceived& packet) { + callback_undemuxable_rtp_packet_received_(packet); + } + void SendNetworkRouteChanged(absl::optional route) { + callback_list_network_route_changed_.Send(route); + } + void SendWritableState(bool state) { + callback_list_writable_state_.Send(state); + } + void SendSentPacket(const rtc::SentPacket& packet) { + callback_list_sent_packet_.Send(packet); + } + + private: + CallbackList callback_list_ready_to_send_; + CallbackList + callback_list_rtcp_packet_received_; + absl::AnyInvocable + callback_undemuxable_rtp_packet_received_ = + [](RtpPacketReceived& packet) {}; + CallbackList> + callback_list_network_route_changed_; + CallbackList callback_list_writable_state_; + CallbackList callback_list_sent_packet_; }; } // namespace webrtc diff --git a/pc/rtp_transport_unittest.cc b/pc/rtp_transport_unittest.cc index e3d4101037..ca748ae9a6 100644 --- a/pc/rtp_transport_unittest.cc +++ b/pc/rtp_transport_unittest.cc @@ -30,9 +30,12 @@ class SignalObserver : public sigslot::has_slots<> { public: explicit SignalObserver(RtpTransport* transport) { transport_ = transport; - transport->SignalReadyToSend.connect(this, &SignalObserver::OnReadyToSend); - transport->SignalNetworkRouteChanged.connect( - this, &SignalObserver::OnNetworkRouteChanged); + transport->SubscribeReadyToSend( + this, [this](bool ready) { OnReadyToSend(ready); }); + transport->SubscribeNetworkRouteChanged( + this, [this](absl::optional route) { + OnNetworkRouteChanged(route); + }); if (transport->rtp_packet_transport()) { transport->rtp_packet_transport()->SignalSentPacket.connect( this, &SignalObserver::OnSentPacket); diff --git a/pc/srtp_transport.cc b/pc/srtp_transport.cc index 1698b2128f..cc20216672 100644 --- a/pc/srtp_transport.cc +++ b/pc/srtp_transport.cc @@ -244,7 +244,7 @@ void SrtpTransport::OnRtcpPacketReceived(rtc::CopyOnWriteBuffer packet, return; } packet.SetSize(len); - SignalRtcpPacketReceived(&packet, packet_time_us); + SendRtcpPacketReceived(&packet, packet_time_us); } void SrtpTransport::OnNetworkRouteChanged( @@ -257,12 +257,12 @@ void SrtpTransport::OnNetworkRouteChanged( } network_route->packet_overhead += srtp_overhead; } - SignalNetworkRouteChanged(network_route); + SendNetworkRouteChanged(network_route); } void SrtpTransport::OnWritableState( rtc::PacketTransportInternal* packet_transport) { - SignalWritableState(IsWritable(/*rtcp=*/false) && IsWritable(/*rtcp=*/true)); + SendWritableState(IsWritable(/*rtcp=*/false) && IsWritable(/*rtcp=*/true)); } bool SrtpTransport::SetRtpParams(int send_crypto_suite, @@ -515,7 +515,7 @@ void SrtpTransport::MaybeUpdateWritableState() { // Only fire the signal if the writable state changes. if (writable_ != writable) { writable_ = writable; - SignalWritableState(writable_); + SendWritableState(writable_); } } diff --git a/pc/srtp_transport_unittest.cc b/pc/srtp_transport_unittest.cc index 4e643935a9..ac8be8762b 100644 --- a/pc/srtp_transport_unittest.cc +++ b/pc/srtp_transport_unittest.cc @@ -66,10 +66,16 @@ class SrtpTransportTest : public ::testing::Test, public sigslot::has_slots<> { srtp_transport1_->SetRtpPacketTransport(rtp_packet_transport1_.get()); srtp_transport2_->SetRtpPacketTransport(rtp_packet_transport2_.get()); - srtp_transport1_->SignalRtcpPacketReceived.connect( - &rtp_sink1_, &TransportObserver::OnRtcpPacketReceived); - srtp_transport2_->SignalRtcpPacketReceived.connect( - &rtp_sink2_, &TransportObserver::OnRtcpPacketReceived); + srtp_transport1_->SubscribeRtcpPacketReceived( + &rtp_sink1_, + [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + rtp_sink1_.OnRtcpPacketReceived(buffer, packet_time_ms); + }); + srtp_transport2_->SubscribeRtcpPacketReceived( + &rtp_sink2_, + [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + rtp_sink2_.OnRtcpPacketReceived(buffer, packet_time_ms); + }); RtpDemuxerCriteria demuxer_criteria; // 0x00 is the payload type used in kPcmuFrame. diff --git a/pc/test/rtp_transport_test_util.h b/pc/test/rtp_transport_test_util.h index e93dbdb3c2..29ffad8539 100644 --- a/pc/test/rtp_transport_test_util.h +++ b/pc/test/rtp_transport_test_util.h @@ -14,24 +14,26 @@ #include "call/rtp_packet_sink_interface.h" #include "modules/rtp_rtcp/source/rtp_packet_received.h" #include "pc/rtp_transport_internal.h" -#include "rtc_base/third_party/sigslot/sigslot.h" namespace webrtc { // Used to handle the signals when the RtpTransport receives an RTP/RTCP packet. // Used in Rtp/Srtp/DtlsTransport unit tests. -class TransportObserver : public RtpPacketSinkInterface, - public sigslot::has_slots<> { +class TransportObserver : public RtpPacketSinkInterface { public: TransportObserver() {} explicit TransportObserver(RtpTransportInternal* rtp_transport) { - rtp_transport->SignalRtcpPacketReceived.connect( - this, &TransportObserver::OnRtcpPacketReceived); - rtp_transport->SignalReadyToSend.connect(this, - &TransportObserver::OnReadyToSend); - rtp_transport->SignalUnDemuxableRtpPacketReceived.connect( - this, &TransportObserver::OnUndemuxableRtpPacket); + rtp_transport->SubscribeRtcpPacketReceived( + this, [this](rtc::CopyOnWriteBuffer* buffer, int64_t packet_time_ms) { + OnRtcpPacketReceived(buffer, packet_time_ms); + }); + rtp_transport->SubscribeReadyToSend( + this, [this](bool arg) { OnReadyToSend(arg); }); + rtp_transport->SetUnDemuxableRtpPacketReceivedHandler( + [this](webrtc::RtpPacketReceived& packet) { + OnUndemuxableRtpPacket(packet); + }); } // RtpPacketInterface override.