diff --git a/api/rtp_sender_interface.h b/api/rtp_sender_interface.h index 9afe3e318d..36b261044b 100644 --- a/api/rtp_sender_interface.h +++ b/api/rtp_sender_interface.h @@ -36,6 +36,16 @@ namespace webrtc { +class RtpSenderObserverInterface { + public: + // The observer is called when the first media packet is sent for the observed + // sender. It is called immediately if the first packet was already sent. + virtual void OnFirstPacketSent(cricket::MediaType media_type) = 0; + + protected: + virtual ~RtpSenderObserverInterface() {} +}; + using SetParametersCallback = absl::AnyInvocable; class RTC_EXPORT RtpSenderInterface : public webrtc::RefCountInterface, @@ -88,6 +98,12 @@ class RTC_EXPORT RtpSenderInterface : public webrtc::RefCountInterface, virtual void SetParametersAsync(const RtpParameters& parameters, SetParametersCallback callback); + // Sets an observer which gets a callback when the first media packet is sent + // for this sender. + // Does not take ownership of observer. + // Must call SetObserver(nullptr) before the observer is destroyed. + virtual void SetObserver(RtpSenderObserverInterface* observer) {} + // Returns null for a video sender. virtual rtc::scoped_refptr GetDtmfSender() const = 0; diff --git a/api/test/mock_rtpsender.h b/api/test/mock_rtpsender.h index 4552281f9d..3ee7b84e3e 100644 --- a/api/test/mock_rtpsender.h +++ b/api/test/mock_rtpsender.h @@ -82,6 +82,7 @@ class MockRtpSender : public RtpSenderInterface { SetEncoderSelector, (std::unique_ptr), (override)); + MOCK_METHOD(void, SetObserver, (RtpSenderObserverInterface*), (override)); }; static_assert(!std::is_abstract_v>, ""); diff --git a/media/base/media_channel_impl.cc b/media/base/media_channel_impl.cc index 1c08382969..e3ae4a2878 100644 --- a/media/base/media_channel_impl.cc +++ b/media/base/media_channel_impl.cc @@ -207,6 +207,7 @@ bool MediaChannelUtil::TransportForMediaChannels::SendRtp( included_in_allocation = options.included_in_allocation, batchable = options.batchable, last_packet_in_batch = options.last_packet_in_batch, + is_retransmit = options.is_retransmit, packet = rtc::CopyOnWriteBuffer(packet, kMaxRtpPacketLen)]() mutable { rtc::PacketOptions rtc_options; rtc_options.packet_id = packet_id; @@ -217,6 +218,7 @@ bool MediaChannelUtil::TransportForMediaChannels::SendRtp( included_in_feedback; rtc_options.info_signaled_after_sent.included_in_allocation = included_in_allocation; + rtc_options.info_signaled_after_sent.is_media = !is_retransmit; rtc_options.batchable = batchable; rtc_options.last_packet_in_batch = last_packet_in_batch; DoSendPacket(&packet, false, rtc_options); diff --git a/pc/channel.cc b/pc/channel.cc index 6e4a80be3c..9156ad1d61 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -382,6 +382,13 @@ void BaseChannel::SetFirstPacketReceivedCallback( on_first_packet_received_ = std::move(callback); } +void BaseChannel::SetFirstPacketSentCallback(std::function callback) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(!on_first_packet_sent_ || !callback); + + on_first_packet_sent_ = std::move(callback); +} + void BaseChannel::OnTransportReadyToSend(bool ready) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(network_initialized()); @@ -430,6 +437,11 @@ bool BaseChannel::SendPacket(bool rtcp, << "."; } + if (on_first_packet_sent_ && options.info_signaled_after_sent.is_media) { + on_first_packet_sent_(); + on_first_packet_sent_ = nullptr; + } + return rtcp ? rtp_transport_->SendRtcpPacket(packet, options, PF_SRTP_BYPASS) : rtp_transport_->SendRtpPacket(packet, options, PF_SRTP_BYPASS); } diff --git a/pc/channel.h b/pc/channel.h index 9a1b0a7583..a69137c41a 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -149,6 +149,7 @@ class BaseChannel : public ChannelInterface, // Used for latency measurements. void SetFirstPacketReceivedCallback(std::function callback) override; + void SetFirstPacketSentCallback(std::function callback) override; // From RtpTransport - public for testing only void OnTransportReadyToSend(bool ready); @@ -316,8 +317,10 @@ class BaseChannel : public ChannelInterface, webrtc::TaskQueueBase* const signaling_thread_; rtc::scoped_refptr alive_; + // The functions are deleted after they have been called. std::function on_first_packet_received_ RTC_GUARDED_BY(network_thread()); + std::function on_first_packet_sent_ RTC_GUARDED_BY(network_thread()); webrtc::RtpTransportInternal* rtp_transport_ RTC_GUARDED_BY(network_thread()) = nullptr; diff --git a/pc/channel_interface.h b/pc/channel_interface.h index 8d6a9fe745..b4106daa58 100644 --- a/pc/channel_interface.h +++ b/pc/channel_interface.h @@ -11,6 +11,7 @@ #ifndef PC_CHANNEL_INTERFACE_H_ #define PC_CHANNEL_INTERFACE_H_ +#include #include #include #include @@ -78,6 +79,7 @@ class ChannelInterface { // Used for latency measurements. virtual void SetFirstPacketReceivedCallback( std::function callback) = 0; + virtual void SetFirstPacketSentCallback(std::function callback) = 0; // Channel control virtual bool SetLocalContent(const MediaContentDescription* content, diff --git a/pc/peer_connection_integrationtest.cc b/pc/peer_connection_integrationtest.cc index 4e501e0160..694e13538e 100644 --- a/pc/peer_connection_integrationtest.cc +++ b/pc/peer_connection_integrationtest.cc @@ -201,6 +201,48 @@ TEST_P(PeerConnectionIntegrationTest, })); } +TEST_P(PeerConnectionIntegrationTest, RtpSenderObserverOnFirstPacketSent) { + ASSERT_TRUE(CreatePeerConnectionWrappers()); + ConnectFakeSignaling(); + caller()->AddAudioVideoTracks(); + callee()->AddAudioVideoTracks(); + // Start offer/answer exchange and wait for it to complete. + caller()->CreateAndSetAndSignalOffer(); + ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout); + // Should be one sender each for audio/video. + EXPECT_EQ(2U, caller()->rtp_sender_observers().size()); + EXPECT_EQ(2U, callee()->rtp_sender_observers().size()); + // Wait for all "first packet sent" callbacks to be fired. + EXPECT_TRUE_WAIT( + absl::c_all_of(caller()->rtp_sender_observers(), + [](const std::unique_ptr& o) { + return o->first_packet_sent(); + }), + kMaxWaitForFramesMs); + EXPECT_TRUE_WAIT( + absl::c_all_of(callee()->rtp_sender_observers(), + [](const std::unique_ptr& o) { + return o->first_packet_sent(); + }), + kMaxWaitForFramesMs); + // If new observers are set after the first packet was already sent, the + // callback should still be invoked. + caller()->ResetRtpSenderObservers(); + callee()->ResetRtpSenderObservers(); + EXPECT_EQ(2U, caller()->rtp_sender_observers().size()); + EXPECT_EQ(2U, callee()->rtp_sender_observers().size()); + EXPECT_TRUE( + absl::c_all_of(caller()->rtp_sender_observers(), + [](const std::unique_ptr& o) { + return o->first_packet_sent(); + })); + EXPECT_TRUE( + absl::c_all_of(callee()->rtp_sender_observers(), + [](const std::unique_ptr& o) { + return o->first_packet_sent(); + })); +} + class DummyDtmfObserver : public DtmfSenderObserverInterface { public: DummyDtmfObserver() : completed_(false) {} diff --git a/pc/rtp_sender.cc b/pc/rtp_sender.cc index 2a38fcaf3e..de24df5c9a 100644 --- a/pc/rtp_sender.cc +++ b/pc/rtp_sender.cc @@ -407,6 +407,23 @@ void RtpSenderBase::SetParametersAsync(const RtpParameters& parameters, false); } +void RtpSenderBase::SetObserver(RtpSenderObserverInterface* observer) { + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = observer; + // Deliver any notifications the observer may have missed by being set late. + if (sent_first_packet_ && observer_) { + observer_->OnFirstPacketSent(media_type()); + } +} + +void RtpSenderBase::NotifyFirstPacketSent() { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (observer_) { + observer_->OnFirstPacketSent(media_type()); + } + sent_first_packet_ = true; +} + void RtpSenderBase::set_stream_ids(const std::vector& stream_ids) { stream_ids_.clear(); absl::c_copy_if(stream_ids, std::back_inserter(stream_ids_), diff --git a/pc/rtp_sender.h b/pc/rtp_sender.h index d8a4d8281a..010e230328 100644 --- a/pc/rtp_sender.h +++ b/pc/rtp_sender.h @@ -106,6 +106,8 @@ class RtpSenderInternal : public RtpSenderInterface { // selected codecs. virtual void SetSendCodecs(std::vector send_codecs) = 0; virtual std::vector GetSendCodecs() const = 0; + + virtual void NotifyFirstPacketSent() = 0; }; // Shared implementation for RtpSenderInternal interface. @@ -230,6 +232,9 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { return send_codecs_; } + void NotifyFirstPacketSent() override; + void SetObserver(RtpSenderObserverInterface* observer) override; + protected: // If `set_streams_observer` is not null, it is invoked when SetStreams() // is called. `set_streams_observer` is not owned by this object. If not @@ -288,6 +293,8 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { std::vector disabled_rids_; SetStreamsObserver* set_streams_observer_ = nullptr; + RtpSenderObserverInterface* observer_ = nullptr; + bool sent_first_packet_ = false; rtc::scoped_refptr frame_transformer_; std::unique_ptr diff --git a/pc/rtp_sender_proxy.h b/pc/rtp_sender_proxy.h index 8ed32f63da..69e2e865ca 100644 --- a/pc/rtp_sender_proxy.h +++ b/pc/rtp_sender_proxy.h @@ -43,6 +43,7 @@ PROXY_CONSTMETHOD0(rtc::scoped_refptr, GetDtmfSender) PROXY_METHOD1(void, SetFrameEncryptor, rtc::scoped_refptr) +PROXY_METHOD1(void, SetObserver, RtpSenderObserverInterface*) PROXY_CONSTMETHOD0(rtc::scoped_refptr, GetFrameEncryptor) PROXY_METHOD1(void, SetStreams, const std::vector&) diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index c2755041af..18a81c67b7 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -356,6 +356,7 @@ void RtpTransceiver::SetChannel( context()->network_thread()->BlockingCall([&]() { if (channel_) { channel_->SetFirstPacketReceivedCallback(nullptr); + channel_->SetFirstPacketSentCallback(nullptr); channel_->SetRtpTransport(nullptr); channel_to_delete = std::move(channel_); } @@ -368,6 +369,11 @@ void RtpTransceiver::SetChannel( thread->PostTask( SafeTask(std::move(flag), [this]() { OnFirstPacketReceived(); })); }); + channel_->SetFirstPacketSentCallback( + [thread = thread_, flag = signaling_thread_safety_, this]() mutable { + thread->PostTask( + SafeTask(std::move(flag), [this]() { OnFirstPacketSent(); })); + }); }); PushNewMediaChannelAndDeleteChannel(nullptr); @@ -392,6 +398,7 @@ void RtpTransceiver::ClearChannel() { context()->network_thread()->BlockingCall([&]() { if (channel_) { channel_->SetFirstPacketReceivedCallback(nullptr); + channel_->SetFirstPacketSentCallback(nullptr); channel_->SetRtpTransport(nullptr); channel_to_delete = std::move(channel_); } @@ -523,6 +530,12 @@ void RtpTransceiver::OnFirstPacketReceived() { } } +void RtpTransceiver::OnFirstPacketSent() { + for (const auto& sender : senders_) { + sender->internal()->NotifyFirstPacketSent(); + } +} + rtc::scoped_refptr RtpTransceiver::sender() const { RTC_DCHECK(unified_plan_); RTC_CHECK_EQ(1u, senders_.size()); diff --git a/pc/rtp_transceiver.h b/pc/rtp_transceiver.h index 610b842db7..d9c38c1fce 100644 --- a/pc/rtp_transceiver.h +++ b/pc/rtp_transceiver.h @@ -302,6 +302,7 @@ class RtpTransceiver : public RtpTransceiverInterface { } ConnectionContext* context() const { return context_; } void OnFirstPacketReceived(); + void OnFirstPacketSent(); void StopSendingAndReceiving(); // Delete a channel, and ensure that references to its media channel // are updated before deleting it. diff --git a/pc/test/integration_test_helpers.h b/pc/test/integration_test_helpers.h index 15bbc1ab60..9a218d9893 100644 --- a/pc/test/integration_test_helpers.h +++ b/pc/test/integration_test_helpers.h @@ -230,6 +230,25 @@ class MockRtpReceiverObserver : public RtpReceiverObserverInterface { cricket::MediaType expected_media_type_; }; +class MockRtpSenderObserver : public RtpSenderObserverInterface { + public: + explicit MockRtpSenderObserver(cricket::MediaType media_type) + : expected_media_type_(media_type) {} + + void OnFirstPacketSent(cricket::MediaType media_type) override { + ASSERT_EQ(expected_media_type_, media_type); + first_packet_sent_ = true; + } + + bool first_packet_sent() const { return first_packet_sent_; } + + virtual ~MockRtpSenderObserver() {} + + private: + bool first_packet_sent_ = false; + cricket::MediaType expected_media_type_; +}; + // Helper class that wraps a peer connection, observes it, and can accept // signaling messages from another wrapper. // @@ -335,6 +354,7 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver, void AddAudioVideoTracks() { AddAudioTrack(); AddVideoTrack(); + ResetRtpSenderObservers(); } rtc::scoped_refptr AddAudioTrack() { @@ -618,6 +638,22 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver, } } + const std::vector>& + rtp_sender_observers() { + return rtp_sender_observers_; + } + + void ResetRtpSenderObservers() { + rtp_sender_observers_.clear(); + for (const rtc::scoped_refptr& sender : + pc()->GetSenders()) { + std::unique_ptr observer( + new MockRtpSenderObserver(sender->media_type())); + sender->SetObserver(observer.get()); + rtp_sender_observers_.push_back(std::move(observer)); + } + } + rtc::FakeNetworkManager* network_manager() const { return fake_network_manager_.get(); } @@ -1126,6 +1162,7 @@ class PeerConnectionIntegrationWrapper : public PeerConnectionObserver, std::vector> data_observers_; std::vector> rtp_receiver_observers_; + std::vector> rtp_sender_observers_; std::vector ice_connection_state_history_; diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h index 6b85ed8d11..8a66365e86 100644 --- a/pc/test/mock_channel_interface.h +++ b/pc/test/mock_channel_interface.h @@ -56,6 +56,10 @@ class MockChannelInterface : public cricket::ChannelInterface { SetFirstPacketReceivedCallback, (std::function), (override)); + MOCK_METHOD(void, + SetFirstPacketSentCallback, + (std::function), + (override)); MOCK_METHOD(bool, SetLocalContent, (const cricket::MediaContentDescription*, diff --git a/pc/test/mock_rtp_sender_internal.h b/pc/test/mock_rtp_sender_internal.h index a8ef817ed5..7ad8fbc2c7 100644 --- a/pc/test/mock_rtp_sender_internal.h +++ b/pc/test/mock_rtp_sender_internal.h @@ -93,6 +93,7 @@ class MockRtpSenderInternal : public RtpSenderInternal { SetEncoderSelector, (std::unique_ptr), (override)); + MOCK_METHOD(void, SetObserver, (RtpSenderObserverInterface*), (override)); // RtpSenderInternal methods. MOCK_METHOD1(SetMediaChannel, void(cricket::MediaSendChannelInterface*)); @@ -106,6 +107,7 @@ class MockRtpSenderInternal : public RtpSenderInternal { MOCK_METHOD1(DisableEncodingLayers, RTCError(const std::vector&)); MOCK_METHOD0(SetTransceiverAsStopped, void()); + MOCK_METHOD(void, NotifyFirstPacketSent, (), (override)); }; } // namespace webrtc diff --git a/rtc_base/network/sent_packet.h b/rtc_base/network/sent_packet.h index 3e6f0d0a70..75b09c4646 100644 --- a/rtc_base/network/sent_packet.h +++ b/rtc_base/network/sent_packet.h @@ -44,6 +44,9 @@ struct RTC_EXPORT PacketInfo { bool included_in_feedback = false; bool included_in_allocation = false; + // `is_media` is true if this is an audio or video packet, excluding + // retransmissions. + bool is_media = false; PacketType packet_type = PacketType::kUnknown; PacketInfoProtocolType protocol = PacketInfoProtocolType::kUnknown; // A unique id assigned by the network manager, and std::nullopt if not set. diff --git a/test/peer_scenario/tests/bwe_ramp_up_test.cc b/test/peer_scenario/tests/bwe_ramp_up_test.cc index 5b7c763035..53ed51367f 100644 --- a/test/peer_scenario/tests/bwe_ramp_up_test.cc +++ b/test/peer_scenario/tests/bwe_ramp_up_test.cc @@ -207,6 +207,11 @@ INSTANTIATE_TEST_SUITE_P( .expected_bwe_min = webrtc::DataRate::KilobitsPerSec(400), }})); +class MockRtpSenderObserver : public RtpSenderObserverInterface { + public: + MOCK_METHOD(void, OnFirstPacketSent, (cricket::MediaType)); +}; + // Test that caller and callee BWE rampup even if no media packets are sent. // - BandWidthEstimationSettings.allow_probe_without_media must be set. // - A Video RtpTransceiver with RTX support needs to be negotiated. @@ -217,8 +222,12 @@ TEST_P(BweRampupWithInitialProbeTest, BweRampUpBothDirectionsWithoutMedia) { PeerScenarioClient* caller = s.CreateClient({}); PeerScenarioClient* callee = s.CreateClient({}); - auto video_result = caller->pc()->AddTransceiver(cricket::MEDIA_TYPE_VIDEO); - ASSERT_EQ(video_result.error().type(), RTCErrorType::NONE); + auto transceiver = caller->pc()->AddTransceiver(cricket::MEDIA_TYPE_VIDEO); + ASSERT_TRUE(transceiver.error().ok()); + + MockRtpSenderObserver observer; + EXPECT_CALL(observer, OnFirstPacketSent).Times(0); + transceiver.value()->sender()->SetObserver(&observer); caller->pc()->ReconfigureBandwidthEstimation( {.allow_probe_without_media = true});