From 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 24 Jan 2022 17:56:16 +0100 Subject: [PATCH] (Un/)Subscribe RtpVideoSender for feedback on the transport queue. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * RtpVideoSender now registers/unregisters for feedback callback inside of SetActive(), which runs on the transport queue. * Transport feedback is given on the transport queue * Registration/unregistration for feedback is done on the same * Removed the last mutex from TransportFeedbackDemuxer. Ultimately, this work is related to moving state from the Call class, that's related to network configuration, but due to the code is currently written is attached to the worker thread, over to the Transport, where it's used (e.g. suspended_video_send_ssrcs_). Bug: webrtc:13517, webrtc:11993 Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165 Reviewed-by: Erik Språng Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35777} --- call/rtp_transport_controller_send.cc | 2 +- call/rtp_video_sender.cc | 17 +++-- call/rtp_video_sender.h | 1 + call/rtp_video_sender_unittest.cc | 66 +++++++++++++------ .../rtp/transport_feedback_demuxer.cc | 32 ++++----- .../rtp/transport_feedback_demuxer.h | 8 +-- 6 files changed, 75 insertions(+), 51 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index fc4f483087..230b048ce4 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - feedback_demuxer_.OnTransportFeedback(feedback); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); task_queue_.PostTask([this, feedback, feedback_time]() { RTC_DCHECK_RUN_ON(&task_queue_); + feedback_demuxer_.OnTransportFeedback(feedback); absl::optional feedback_msg = transport_feedback_adapter_.ProcessTransportFeedback(feedback, feedback_time); diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 78cf2817b3..35e6beeb7c 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -445,9 +445,6 @@ RtpVideoSender::RtpVideoSender( fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled()); fec_controller_->SetProtectionCallback(this); - // Signal congestion controller this object is ready for OnPacket* callbacks. - transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver( - rtp_config_.ssrcs, this); // Construction happens on the worker thread (see Call::CreateVideoSendStream) // but subseqeuent calls to the RTP state will happen on one of two threads: @@ -466,8 +463,8 @@ RtpVideoSender::~RtpVideoSender() { SetActiveModulesLocked( std::vector(rtp_streams_.size(), /*active=*/false)); - transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver( - this); + + RTC_DCHECK(!registered_for_feedback_); } void RtpVideoSender::SetActive(bool active) { @@ -475,8 +472,18 @@ void RtpVideoSender::SetActive(bool active) { MutexLock lock(&mutex_); if (active_ == active) return; + const std::vector active_modules(rtp_streams_.size(), active); SetActiveModulesLocked(active_modules); + + auto* feedback_provider = transport_->GetStreamFeedbackProvider(); + if (active && !registered_for_feedback_) { + feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this); + registered_for_feedback_ = true; + } else if (!active && registered_for_feedback_) { + feedback_provider->DeRegisterStreamFeedbackObserver(this); + registered_for_feedback_ = false; + } } void RtpVideoSender::SetActiveModules(const std::vector active_modules) { diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h index 9832246665..378f902d75 100644 --- a/call/rtp_video_sender.h +++ b/call/rtp_video_sender.h @@ -180,6 +180,7 @@ class RtpVideoSender : public RtpVideoSenderInterface, // transport task queue. mutable Mutex mutex_; bool active_ RTC_GUARDED_BY(mutex_); + bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false; const std::unique_ptr fec_controller_; bool fec_allowed_ RTC_GUARDED_BY(mutex_); diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 0644556e2f..c47717da7f 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include "call/rtp_transport_controller_send.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -177,10 +178,33 @@ class RtpVideoSenderTestFixture { /*frame_count_observer=*/nullptr, /*frame_transformer=*/nullptr) {} + ~RtpVideoSenderTestFixture() { SetActive(false); } + RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } + void SetActive(bool active) { + RunOnTransportQueue([&]() { router_->SetActive(active); }); + } + + void SetActiveModules(const std::vector& active_modules) { + RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); + } + + // Several RtpVideoSender methods expect to be called on the task queue as + // owned by the send transport. While the SequenceChecker may pick up the + // default thread as the transport queue, explicit checks for the transport + // queue (not just using a SequenceChecker) aren't possible unless such a + // queue is actually active. So RunOnTransportQueue is a convenience function + // that allow for running a closure on the transport queue, similar to + // SendTask(). + template + void RunOnTransportQueue(Closure&& task) { + transport_controller_.GetWorkerQueue()->PostTask(std::move(task)); + AdvanceTime(TimeDelta::Millis(0)); + } + private: NiceMock transport_; NiceMock encoder_feedback_; @@ -217,15 +241,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) { EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); } @@ -244,7 +268,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { CodecSpecificInfo codec_info; codec_info.codecType = kVideoCodecVP8; - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); @@ -254,7 +278,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { test.router()->OnEncodedImage(encoded_image_2, &codec_info).error); // Inactive. - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); EXPECT_NE(EncodedImageCallback::Result::OK, @@ -284,14 +308,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { // Only setting one stream to active will still set the payload router to // active and allow sending data on the active stream. std::vector active_modules({true, false}); - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); // Setting both streams to inactive will turn the payload router to // inactive. active_modules = {false, false}; - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); // An incoming encoded image will not ask the module to send outgoing data // because the payload router is inactive. EXPECT_NE(EncodedImageCallback::Result::OK, @@ -303,7 +327,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -328,7 +352,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, states); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -368,7 +392,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { test.router()->OnEncodedImage(encoded_image, nullptr).error); ::testing::Mock::VerifyAndClearExpectations(&callback); - test.router()->SetActive(true); + test.SetActive(true); FrameCounts frame_counts; EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1)) @@ -397,7 +421,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); constexpr uint8_t kPayload = 'a'; EncodedImage encoded_image; @@ -496,8 +520,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { } // This tests that we utilize transport wide feedback to retransmit lost -// packets. This is tested by dropping all ordirary packets from a "lossy" -// stream send along with an secondary untouched stream. The transport wide +// packets. This is tested by dropping all ordinary packets from a "lossy" +// stream sent along with a secondary untouched stream. The transport wide // feedback packets from the secondary stream allows the sending side to // detect and retreansmit the lost packets from the lossy stream. TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { @@ -562,7 +586,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { TEST(RtpVideoSenderTest, EarlyRetransmits) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); const uint8_t kPayload[1] = {'a'}; EncodedImage encoded_image; @@ -657,7 +681,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -717,7 +741,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -773,7 +797,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9NotProvidedByEncoder) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -828,7 +852,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { test::ScopedFieldTrials field_trials( "WebRTC-GenericCodecDependencyDescriptor/Enabled/"); RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -874,7 +898,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -932,7 +956,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -1017,7 +1041,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) { kRtpHeaderSizeBytes + kTransportPacketOverheadBytes; RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes); - test.router()->SetActive(true); + test.SetActive(true); { test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000), diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc index 29accb5be0..50987b2302 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -44,11 +44,7 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( } void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { - // Currently called on the send transport queue. - // TODO(tommi): When registration/unregistration as well as - // OnTransportFeedback callbacks occur on the transport queue, we can remove - // this lock. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); StreamFeedbackObserver::StreamPacketInfo info; info.ssrc = packet_info.media_ssrc; @@ -66,24 +62,22 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { void TransportFeedbackDemuxer::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { + RTC_DCHECK_RUN_ON(&observer_checker_); + std::vector stream_feedbacks; - { - MutexLock lock(&lock_); - for (const auto& packet : feedback.GetAllPackets()) { - int64_t seq_num = - seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); - auto it = history_.find(seq_num); - if (it != history_.end()) { - auto packet_info = it->second; - packet_info.received = packet.received(); - stream_feedbacks.push_back(std::move(packet_info)); - if (packet.received()) - history_.erase(it); - } + for (const auto& packet : feedback.GetAllPackets()) { + int64_t seq_num = + seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); + auto it = history_.find(seq_num); + if (it != history_.end()) { + auto packet_info = it->second; + packet_info.received = packet.received(); + stream_feedbacks.push_back(std::move(packet_info)); + if (packet.received()) + history_.erase(it); } } - RTC_DCHECK_RUN_ON(&observer_checker_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet_info : stream_feedbacks) { diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h index 895288f776..7f4f5750d2 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -17,7 +17,6 @@ #include "api/sequence_checker.h" #include "modules/include/module_common_types_public.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -45,15 +44,14 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider { void OnTransportFeedback(const rtcp::TransportFeedback& feedback); private: - Mutex lock_; - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); + RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_); std::map history_ - RTC_GUARDED_BY(&lock_); + RTC_GUARDED_BY(&observer_checker_); // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. - RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; std::vector, StreamFeedbackObserver*>> observers_ RTC_GUARDED_BY(&observer_checker_); };