diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index fc4f483087..230b048ce4 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - feedback_demuxer_.OnTransportFeedback(feedback); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); task_queue_.PostTask([this, feedback, feedback_time]() { RTC_DCHECK_RUN_ON(&task_queue_); + feedback_demuxer_.OnTransportFeedback(feedback); absl::optional feedback_msg = transport_feedback_adapter_.ProcessTransportFeedback(feedback, feedback_time); diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 78cf2817b3..35e6beeb7c 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -445,9 +445,6 @@ RtpVideoSender::RtpVideoSender( fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled()); fec_controller_->SetProtectionCallback(this); - // Signal congestion controller this object is ready for OnPacket* callbacks. - transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver( - rtp_config_.ssrcs, this); // Construction happens on the worker thread (see Call::CreateVideoSendStream) // but subseqeuent calls to the RTP state will happen on one of two threads: @@ -466,8 +463,8 @@ RtpVideoSender::~RtpVideoSender() { SetActiveModulesLocked( std::vector(rtp_streams_.size(), /*active=*/false)); - transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver( - this); + + RTC_DCHECK(!registered_for_feedback_); } void RtpVideoSender::SetActive(bool active) { @@ -475,8 +472,18 @@ void RtpVideoSender::SetActive(bool active) { MutexLock lock(&mutex_); if (active_ == active) return; + const std::vector active_modules(rtp_streams_.size(), active); SetActiveModulesLocked(active_modules); + + auto* feedback_provider = transport_->GetStreamFeedbackProvider(); + if (active && !registered_for_feedback_) { + feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this); + registered_for_feedback_ = true; + } else if (!active && registered_for_feedback_) { + feedback_provider->DeRegisterStreamFeedbackObserver(this); + registered_for_feedback_ = false; + } } void RtpVideoSender::SetActiveModules(const std::vector active_modules) { diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h index ea0d1ee87d..7023804506 100644 --- a/call/rtp_video_sender.h +++ b/call/rtp_video_sender.h @@ -180,6 +180,7 @@ class RtpVideoSender : public RtpVideoSenderInterface, // transport task queue. mutable Mutex mutex_; bool active_ RTC_GUARDED_BY(mutex_); + bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false; const std::unique_ptr fec_controller_; bool fec_allowed_ RTC_GUARDED_BY(mutex_); diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 0644556e2f..c47717da7f 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include "call/rtp_transport_controller_send.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -177,10 +178,33 @@ class RtpVideoSenderTestFixture { /*frame_count_observer=*/nullptr, /*frame_transformer=*/nullptr) {} + ~RtpVideoSenderTestFixture() { SetActive(false); } + RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } + void SetActive(bool active) { + RunOnTransportQueue([&]() { router_->SetActive(active); }); + } + + void SetActiveModules(const std::vector& active_modules) { + RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); + } + + // Several RtpVideoSender methods expect to be called on the task queue as + // owned by the send transport. While the SequenceChecker may pick up the + // default thread as the transport queue, explicit checks for the transport + // queue (not just using a SequenceChecker) aren't possible unless such a + // queue is actually active. So RunOnTransportQueue is a convenience function + // that allow for running a closure on the transport queue, similar to + // SendTask(). + template + void RunOnTransportQueue(Closure&& task) { + transport_controller_.GetWorkerQueue()->PostTask(std::move(task)); + AdvanceTime(TimeDelta::Millis(0)); + } + private: NiceMock transport_; NiceMock encoder_feedback_; @@ -217,15 +241,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) { EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); } @@ -244,7 +268,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { CodecSpecificInfo codec_info; codec_info.codecType = kVideoCodecVP8; - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); @@ -254,7 +278,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { test.router()->OnEncodedImage(encoded_image_2, &codec_info).error); // Inactive. - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); EXPECT_NE(EncodedImageCallback::Result::OK, @@ -284,14 +308,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { // Only setting one stream to active will still set the payload router to // active and allow sending data on the active stream. std::vector active_modules({true, false}); - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); // Setting both streams to inactive will turn the payload router to // inactive. active_modules = {false, false}; - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); // An incoming encoded image will not ask the module to send outgoing data // because the payload router is inactive. EXPECT_NE(EncodedImageCallback::Result::OK, @@ -303,7 +327,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -328,7 +352,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, states); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -368,7 +392,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { test.router()->OnEncodedImage(encoded_image, nullptr).error); ::testing::Mock::VerifyAndClearExpectations(&callback); - test.router()->SetActive(true); + test.SetActive(true); FrameCounts frame_counts; EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1)) @@ -397,7 +421,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); constexpr uint8_t kPayload = 'a'; EncodedImage encoded_image; @@ -496,8 +520,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { } // This tests that we utilize transport wide feedback to retransmit lost -// packets. This is tested by dropping all ordirary packets from a "lossy" -// stream send along with an secondary untouched stream. The transport wide +// packets. This is tested by dropping all ordinary packets from a "lossy" +// stream sent along with a secondary untouched stream. The transport wide // feedback packets from the secondary stream allows the sending side to // detect and retreansmit the lost packets from the lossy stream. TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { @@ -562,7 +586,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { TEST(RtpVideoSenderTest, EarlyRetransmits) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); const uint8_t kPayload[1] = {'a'}; EncodedImage encoded_image; @@ -657,7 +681,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -717,7 +741,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -773,7 +797,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9NotProvidedByEncoder) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -828,7 +852,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { test::ScopedFieldTrials field_trials( "WebRTC-GenericCodecDependencyDescriptor/Enabled/"); RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -874,7 +898,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -932,7 +956,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -1017,7 +1041,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) { kRtpHeaderSizeBytes + kTransportPacketOverheadBytes; RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes); - test.router()->SetActive(true); + test.SetActive(true); { test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000), diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc index 29accb5be0..50987b2302 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -44,11 +44,7 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( } void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { - // Currently called on the send transport queue. - // TODO(tommi): When registration/unregistration as well as - // OnTransportFeedback callbacks occur on the transport queue, we can remove - // this lock. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); StreamFeedbackObserver::StreamPacketInfo info; info.ssrc = packet_info.media_ssrc; @@ -66,24 +62,22 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { void TransportFeedbackDemuxer::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { + RTC_DCHECK_RUN_ON(&observer_checker_); + std::vector stream_feedbacks; - { - MutexLock lock(&lock_); - for (const auto& packet : feedback.GetAllPackets()) { - int64_t seq_num = - seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); - auto it = history_.find(seq_num); - if (it != history_.end()) { - auto packet_info = it->second; - packet_info.received = packet.received(); - stream_feedbacks.push_back(std::move(packet_info)); - if (packet.received()) - history_.erase(it); - } + for (const auto& packet : feedback.GetAllPackets()) { + int64_t seq_num = + seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); + auto it = history_.find(seq_num); + if (it != history_.end()) { + auto packet_info = it->second; + packet_info.received = packet.received(); + stream_feedbacks.push_back(std::move(packet_info)); + if (packet.received()) + history_.erase(it); } } - RTC_DCHECK_RUN_ON(&observer_checker_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet_info : stream_feedbacks) { diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h index 895288f776..7f4f5750d2 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -17,7 +17,6 @@ #include "api/sequence_checker.h" #include "modules/include/module_common_types_public.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -45,15 +44,14 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider { void OnTransportFeedback(const rtcp::TransportFeedback& feedback); private: - Mutex lock_; - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); + RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_); std::map history_ - RTC_GUARDED_BY(&lock_); + RTC_GUARDED_BY(&observer_checker_); // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. - RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; std::vector, StreamFeedbackObserver*>> observers_ RTC_GUARDED_BY(&observer_checker_); };