From 7734fc64b9a88812758806d15a417e16818e5f92 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Thu, 27 Jan 2022 12:29:21 +0000 Subject: [PATCH] Revert "(Un/)Subscribe RtpVideoSender for feedback on the transport queue." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa. Reason for revert: Speculative revert to see if it's the cause of a few perf changes (some bad, some not so bad). Bug: webrtc:13613 Original change's description: > (Un/)Subscribe RtpVideoSender for feedback on the transport queue. > > * RtpVideoSender now registers/unregisters for feedback callback > inside of SetActive(), which runs on the transport queue. > * Transport feedback is given on the transport queue > * Registration/unregistration for feedback is done on the same > * Removed the last mutex from TransportFeedbackDemuxer. > > Ultimately, this work is related to moving state from the Call > class, that's related to network configuration, but due to the code > is currently written is attached to the worker thread, over to the > Transport, where it's used (e.g. suspended_video_send_ssrcs_). > > Bug: webrtc:13517, webrtc:11993 > Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165 > Reviewed-by: Erik Språng > Commit-Queue: Tomas Gunnarsson > Cr-Commit-Position: refs/heads/main@{#35777} # Not skipping CQ checks because original CL landed > 1 day ago. Bug: webrtc:13517, webrtc:11993 Change-Id: I824623b3b1c14f0ca7049a2a0890c6d97b7fb608 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249600 Reviewed-by: Tomas Gunnarsson Reviewed-by: Harald Alvestrand Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35815} --- call/rtp_transport_controller_send.cc | 2 +- call/rtp_video_sender.cc | 17 ++--- call/rtp_video_sender.h | 1 - call/rtp_video_sender_unittest.cc | 66 ++++++------------- .../rtp/transport_feedback_demuxer.cc | 32 +++++---- .../rtp/transport_feedback_demuxer.h | 8 ++- 6 files changed, 51 insertions(+), 75 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 230b048ce4..fc4f483087 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { + feedback_demuxer_.OnTransportFeedback(feedback); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); task_queue_.PostTask([this, feedback, feedback_time]() { RTC_DCHECK_RUN_ON(&task_queue_); - feedback_demuxer_.OnTransportFeedback(feedback); absl::optional feedback_msg = transport_feedback_adapter_.ProcessTransportFeedback(feedback, feedback_time); diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 35e6beeb7c..78cf2817b3 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -445,6 +445,9 @@ RtpVideoSender::RtpVideoSender( fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled()); fec_controller_->SetProtectionCallback(this); + // Signal congestion controller this object is ready for OnPacket* callbacks. + transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver( + rtp_config_.ssrcs, this); // Construction happens on the worker thread (see Call::CreateVideoSendStream) // but subseqeuent calls to the RTP state will happen on one of two threads: @@ -463,8 +466,8 @@ RtpVideoSender::~RtpVideoSender() { SetActiveModulesLocked( std::vector(rtp_streams_.size(), /*active=*/false)); - - RTC_DCHECK(!registered_for_feedback_); + transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver( + this); } void RtpVideoSender::SetActive(bool active) { @@ -472,18 +475,8 @@ void RtpVideoSender::SetActive(bool active) { MutexLock lock(&mutex_); if (active_ == active) return; - const std::vector active_modules(rtp_streams_.size(), active); SetActiveModulesLocked(active_modules); - - auto* feedback_provider = transport_->GetStreamFeedbackProvider(); - if (active && !registered_for_feedback_) { - feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this); - registered_for_feedback_ = true; - } else if (!active && registered_for_feedback_) { - feedback_provider->DeRegisterStreamFeedbackObserver(this); - registered_for_feedback_ = false; - } } void RtpVideoSender::SetActiveModules(const std::vector active_modules) { diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h index 7023804506..ea0d1ee87d 100644 --- a/call/rtp_video_sender.h +++ b/call/rtp_video_sender.h @@ -180,7 +180,6 @@ class RtpVideoSender : public RtpVideoSenderInterface, // transport task queue. mutable Mutex mutex_; bool active_ RTC_GUARDED_BY(mutex_); - bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false; const std::unique_ptr fec_controller_; bool fec_allowed_ RTC_GUARDED_BY(mutex_); diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index c47717da7f..0644556e2f 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -13,7 +13,6 @@ #include #include #include -#include #include "call/rtp_transport_controller_send.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -178,33 +177,10 @@ class RtpVideoSenderTestFixture { /*frame_count_observer=*/nullptr, /*frame_transformer=*/nullptr) {} - ~RtpVideoSenderTestFixture() { SetActive(false); } - RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } - void SetActive(bool active) { - RunOnTransportQueue([&]() { router_->SetActive(active); }); - } - - void SetActiveModules(const std::vector& active_modules) { - RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); - } - - // Several RtpVideoSender methods expect to be called on the task queue as - // owned by the send transport. While the SequenceChecker may pick up the - // default thread as the transport queue, explicit checks for the transport - // queue (not just using a SequenceChecker) aren't possible unless such a - // queue is actually active. So RunOnTransportQueue is a convenience function - // that allow for running a closure on the transport queue, similar to - // SendTask(). - template - void RunOnTransportQueue(Closure&& task) { - transport_controller_.GetWorkerQueue()->PostTask(std::move(task)); - AdvanceTime(TimeDelta::Millis(0)); - } - private: NiceMock transport_; NiceMock encoder_feedback_; @@ -241,15 +217,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) { EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.SetActive(true); + test.router()->SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.SetActive(false); + test.router()->SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.SetActive(true); + test.router()->SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); } @@ -268,7 +244,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { CodecSpecificInfo codec_info; codec_info.codecType = kVideoCodecVP8; - test.SetActive(true); + test.router()->SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); @@ -278,7 +254,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { test.router()->OnEncodedImage(encoded_image_2, &codec_info).error); // Inactive. - test.SetActive(false); + test.router()->SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); EXPECT_NE(EncodedImageCallback::Result::OK, @@ -308,14 +284,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { // Only setting one stream to active will still set the payload router to // active and allow sending data on the active stream. std::vector active_modules({true, false}); - test.SetActiveModules(active_modules); + test.router()->SetActiveModules(active_modules); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); // Setting both streams to inactive will turn the payload router to // inactive. active_modules = {false, false}; - test.SetActiveModules(active_modules); + test.router()->SetActiveModules(active_modules); // An incoming encoded image will not ask the module to send outgoing data // because the payload router is inactive. EXPECT_NE(EncodedImageCallback::Result::OK, @@ -327,7 +303,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -352,7 +328,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, states); - test.SetActive(true); + test.router()->SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -392,7 +368,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { test.router()->OnEncodedImage(encoded_image, nullptr).error); ::testing::Mock::VerifyAndClearExpectations(&callback); - test.SetActive(true); + test.router()->SetActive(true); FrameCounts frame_counts; EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1)) @@ -421,7 +397,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); constexpr uint8_t kPayload = 'a'; EncodedImage encoded_image; @@ -520,8 +496,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { } // This tests that we utilize transport wide feedback to retransmit lost -// packets. This is tested by dropping all ordinary packets from a "lossy" -// stream sent along with a secondary untouched stream. The transport wide +// packets. This is tested by dropping all ordirary packets from a "lossy" +// stream send along with an secondary untouched stream. The transport wide // feedback packets from the secondary stream allows the sending side to // detect and retreansmit the lost packets from the lossy stream. TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { @@ -586,7 +562,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { TEST(RtpVideoSenderTest, EarlyRetransmits) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); const uint8_t kPayload[1] = {'a'}; EncodedImage encoded_image; @@ -681,7 +657,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -741,7 +717,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -797,7 +773,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9NotProvidedByEncoder) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -852,7 +828,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { test::ScopedFieldTrials field_trials( "WebRTC-GenericCodecDependencyDescriptor/Enabled/"); RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -898,7 +874,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -956,7 +932,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {}); - test.SetActive(true); + test.router()->SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -1041,7 +1017,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) { kRtpHeaderSizeBytes + kTransportPacketOverheadBytes; RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes); - test.SetActive(true); + test.router()->SetActive(true); { test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000), diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc index 50987b2302..29accb5be0 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -44,7 +44,11 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( } void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { - RTC_DCHECK_RUN_ON(&observer_checker_); + // Currently called on the send transport queue. + // TODO(tommi): When registration/unregistration as well as + // OnTransportFeedback callbacks occur on the transport queue, we can remove + // this lock. + MutexLock lock(&lock_); StreamFeedbackObserver::StreamPacketInfo info; info.ssrc = packet_info.media_ssrc; @@ -62,22 +66,24 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { void TransportFeedbackDemuxer::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - RTC_DCHECK_RUN_ON(&observer_checker_); - std::vector stream_feedbacks; - for (const auto& packet : feedback.GetAllPackets()) { - int64_t seq_num = - seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); - auto it = history_.find(seq_num); - if (it != history_.end()) { - auto packet_info = it->second; - packet_info.received = packet.received(); - stream_feedbacks.push_back(std::move(packet_info)); - if (packet.received()) - history_.erase(it); + { + MutexLock lock(&lock_); + for (const auto& packet : feedback.GetAllPackets()) { + int64_t seq_num = + seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); + auto it = history_.find(seq_num); + if (it != history_.end()) { + auto packet_info = it->second; + packet_info.received = packet.received(); + stream_feedbacks.push_back(std::move(packet_info)); + if (packet.received()) + history_.erase(it); + } } } + RTC_DCHECK_RUN_ON(&observer_checker_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet_info : stream_feedbacks) { diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h index 7f4f5750d2..895288f776 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -17,6 +17,7 @@ #include "api/sequence_checker.h" #include "modules/include/module_common_types_public.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -44,14 +45,15 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider { void OnTransportFeedback(const rtcp::TransportFeedback& feedback); private: - RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_); + Mutex lock_; + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); std::map history_ - RTC_GUARDED_BY(&observer_checker_); + RTC_GUARDED_BY(&lock_); // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. + RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; std::vector, StreamFeedbackObserver*>> observers_ RTC_GUARDED_BY(&observer_checker_); };