From 48c44e3543416f79d0e9e3173386829dc06ae621 Mon Sep 17 00:00:00 2001 From: Per K Date: Wed, 31 May 2023 15:38:26 +0200 Subject: [PATCH] Ensure RtpSenderEgress run on worker queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VoipCore still use RtpSenderEgress::NonPacedPacketSender, therefore packets sent using NonPacedPacketSender::EnqueuePackets are proxied to the worker thead. When NonPacedPacketSender is used, the Pacer already guarantee that packets are sent on the worker queue. Lock is removed from RtpSenderEgress and instead calls must be made on the worker thread. Bug: webrtc:15209 Change-Id: Iaf03377ad8a037ecedbbe588a4c1e8e4eadacd81 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/306960 Reviewed-by: Erik Språng Reviewed-by: Jakob Ivarsson‎ Commit-Queue: Per Kjellander Cr-Commit-Position: refs/heads/main@{#40252} --- audio/voip/test/BUILD.gn | 6 + audio/voip/test/audio_egress_unittest.cc | 33 +++-- audio/voip/test/audio_ingress_unittest.cc | 36 ++--- modules/rtp_rtcp/source/rtp_rtcp_impl2.cc | 36 +++-- modules/rtp_rtcp/source/rtp_rtcp_impl2.h | 3 +- modules/rtp_rtcp/source/rtp_sender_egress.cc | 136 +++++++----------- modules/rtp_rtcp/source/rtp_sender_egress.h | 64 ++++----- .../source/rtp_sender_egress_unittest.cc | 4 +- 8 files changed, 154 insertions(+), 164 deletions(-) diff --git a/audio/voip/test/BUILD.gn b/audio/voip/test/BUILD.gn index e89f2b001a..00e9bee622 100644 --- a/audio/voip/test/BUILD.gn +++ b/audio/voip/test/BUILD.gn @@ -70,6 +70,8 @@ if (rtc_include_tests) { "../../../api/audio_codecs:builtin_audio_decoder_factory", "../../../api/audio_codecs:builtin_audio_encoder_factory", "../../../api/task_queue:default_task_queue_factory", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../modules/audio_mixer:audio_mixer_test_utils", "../../../modules/rtp_rtcp:rtp_rtcp", "../../../rtc_base:logging", @@ -77,6 +79,7 @@ if (rtc_include_tests) { "../../../test:mock_transport", "../../../test:run_loop", "../../../test:test_support", + "../../../test/time_controller:time_controller", ] } @@ -88,6 +91,8 @@ if (rtc_include_tests) { "../../../api:transport_api", "../../../api/audio_codecs:builtin_audio_encoder_factory", "../../../api/task_queue:default_task_queue_factory", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../modules/audio_mixer:audio_mixer_test_utils", "../../../modules/rtp_rtcp:rtp_rtcp", "../../../modules/rtp_rtcp:rtp_rtcp_format", @@ -96,6 +101,7 @@ if (rtc_include_tests) { "../../../test:mock_transport", "../../../test:run_loop", "../../../test:test_support", + "../../../test/time_controller:time_controller", ] } } diff --git a/audio/voip/test/audio_egress_unittest.cc b/audio/voip/test/audio_egress_unittest.cc index 34c5585347..98679db508 100644 --- a/audio/voip/test/audio_egress_unittest.cc +++ b/audio/voip/test/audio_egress_unittest.cc @@ -13,6 +13,8 @@ #include "api/audio_codecs/builtin_audio_encoder_factory.h" #include "api/call/transport.h" #include "api/task_queue/default_task_queue_factory.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/audio_mixer/sine_wave_generator.h" #include "modules/rtp_rtcp/source/rtp_packet_received.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" @@ -22,6 +24,7 @@ #include "test/gtest.h" #include "test/mock_transport.h" #include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" namespace webrtc { namespace { @@ -57,18 +60,18 @@ class AudioEgressTest : public ::testing::Test { static constexpr uint32_t kRemoteSsrc = 0xDEADBEEF; const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1}; - AudioEgressTest() - : fake_clock_(kStartTime), wave_generator_(1000.0, kAudioLevel) { - task_queue_factory_ = CreateDefaultTaskQueueFactory(); + AudioEgressTest() : wave_generator_(1000.0, kAudioLevel) { encoder_factory_ = CreateBuiltinAudioEncoderFactory(); } // Prepare test on audio egress by using PCMu codec with specific // sequence number and its status to be running. void SetUp() override { - rtp_rtcp_ = CreateRtpStack(&fake_clock_, &transport_, kRemoteSsrc); - egress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - task_queue_factory_.get()); + rtp_rtcp_ = + CreateRtpStack(time_controller_.GetClock(), &transport_, kRemoteSsrc); + egress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), + time_controller_.GetTaskQueueFactory()); constexpr int kPcmuPayload = 0; egress_->SetEncoder(kPcmuPayload, kPcmuFormat, encoder_factory_->MakeAudioEncoder( @@ -100,14 +103,10 @@ class AudioEgressTest : public ::testing::Test { return frame; } - test::RunLoop run_loop_; - // SimulatedClock doesn't directly affect this testcase as the the - // AudioFrame's timestamp is driven by GetAudioFrame. - SimulatedClock fake_clock_; + GlobalSimulatedTimeController time_controller_{Timestamp::Micros(kStartTime)}; NiceMock transport_; SineWaveGenerator wave_generator_; std::unique_ptr rtp_rtcp_; - std::unique_ptr task_queue_factory_; rtc::scoped_refptr encoder_factory_; std::unique_ptr egress_; }; @@ -138,7 +137,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithMute) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -174,7 +173,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithSineWave) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -208,7 +207,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -222,7 +221,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) { // result in crahses or sanitizer errors due to remaining data. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } } @@ -284,7 +283,7 @@ TEST_F(AudioEgressTest, SendDTMF) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -309,7 +308,7 @@ TEST_F(AudioEgressTest, TestAudioInputLevelAndEnergyDuration) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); diff --git a/audio/voip/test/audio_ingress_unittest.cc b/audio/voip/test/audio_ingress_unittest.cc index 3c309dbf82..3a082cdda2 100644 --- a/audio/voip/test/audio_ingress_unittest.cc +++ b/audio/voip/test/audio_ingress_unittest.cc @@ -14,6 +14,7 @@ #include "api/audio_codecs/builtin_audio_encoder_factory.h" #include "api/call/transport.h" #include "api/task_queue/default_task_queue_factory.h" +#include "api/units/time_delta.h" #include "audio/voip/audio_egress.h" #include "modules/audio_mixer/sine_wave_generator.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" @@ -23,6 +24,7 @@ #include "test/gtest.h" #include "test/mock_transport.h" #include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" namespace webrtc { namespace { @@ -37,12 +39,12 @@ class AudioIngressTest : public ::testing::Test { public: const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1}; - AudioIngressTest() - : fake_clock_(123456789), wave_generator_(1000.0, kAudioLevel) { - receive_statistics_ = ReceiveStatistics::Create(&fake_clock_); + AudioIngressTest() : wave_generator_(1000.0, kAudioLevel) { + receive_statistics_ = + ReceiveStatistics::Create(time_controller_.GetClock()); RtpRtcpInterface::Configuration rtp_config; - rtp_config.clock = &fake_clock_; + rtp_config.clock = time_controller_.GetClock(); rtp_config.audio = true; rtp_config.receive_statistics = receive_statistics_.get(); rtp_config.rtcp_report_interval_ms = 5000; @@ -53,20 +55,20 @@ class AudioIngressTest : public ::testing::Test { rtp_rtcp_->SetSendingMediaStatus(false); rtp_rtcp_->SetRTCPStatus(RtcpMode::kCompound); - task_queue_factory_ = CreateDefaultTaskQueueFactory(); encoder_factory_ = CreateBuiltinAudioEncoderFactory(); decoder_factory_ = CreateBuiltinAudioDecoderFactory(); } void SetUp() override { constexpr int kPcmuPayload = 0; - ingress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - receive_statistics_.get(), - decoder_factory_); + ingress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), receive_statistics_.get(), + decoder_factory_); ingress_->SetReceiveCodecs({{kPcmuPayload, kPcmuFormat}}); - egress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - task_queue_factory_.get()); + egress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), + time_controller_.GetTaskQueueFactory()); egress_->SetEncoder(kPcmuPayload, kPcmuFormat, encoder_factory_->MakeAudioEncoder( kPcmuPayload, kPcmuFormat, absl::nullopt)); @@ -93,15 +95,13 @@ class AudioIngressTest : public ::testing::Test { return frame; } - test::RunLoop run_loop_; - SimulatedClock fake_clock_; + GlobalSimulatedTimeController time_controller_{Timestamp::Micros(123456789)}; SineWaveGenerator wave_generator_; NiceMock transport_; std::unique_ptr receive_statistics_; std::unique_ptr rtp_rtcp_; rtc::scoped_refptr encoder_factory_; rtc::scoped_refptr decoder_factory_; - std::unique_ptr task_queue_factory_; std::unique_ptr ingress_; std::unique_ptr egress_; }; @@ -122,7 +122,8 @@ TEST_F(AudioIngressTest, GetAudioFrameAfterRtpReceived) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); egress_->SendAudioData(GetAudioFrame(0)); egress_->SendAudioData(GetAudioFrame(1)); - event.Wait(TimeDelta::Seconds(1)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1))); AudioFrame audio_frame; EXPECT_EQ( @@ -153,7 +154,7 @@ TEST_F(AudioIngressTest, TestSpeechOutputLevelAndEnergyDuration) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); for (int i = 0; i < kNumRtp * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); @@ -182,7 +183,8 @@ TEST_F(AudioIngressTest, PreferredSampleRate) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); egress_->SendAudioData(GetAudioFrame(0)); egress_->SendAudioData(GetAudioFrame(1)); - event.Wait(TimeDelta::Seconds(1)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1))); AudioFrame audio_frame; EXPECT_EQ( @@ -212,7 +214,7 @@ TEST_F(AudioIngressTest, GetMutedAudioFrameAfterRtpReceivedAndStopPlay) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); for (int i = 0; i < kNumRtp * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index 0687f32352..d6eddb21b1 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -64,6 +64,7 @@ RtpPacketHistory::PaddingMode GetPaddingMode( } // namespace ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( + TaskQueueBase& worker_queue, const RtpRtcpInterface::Configuration& config) : packet_history(config.clock, GetPaddingMode(config.field_trials)), sequencer(config.local_media_ssrc, @@ -71,7 +72,7 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( /*require_marker_before_media_padding=*/!config.audio, config.clock), packet_sender(config, &packet_history), - non_paced_sender(&packet_sender, &sequencer), + non_paced_sender(worker_queue, &packet_sender, &sequencer), packet_generator( config, &packet_history, @@ -94,7 +95,8 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) RTC_DCHECK(worker_queue_); rtcp_thread_checker_.Detach(); if (!configuration.receiver_only) { - rtp_sender_ = std::make_unique(configuration); + rtp_sender_ = + std::make_unique(*worker_queue_, configuration); rtp_sender_->sequencing_checker.Detach(); // Make sure rtcp sender use same timestamp offset as rtp sender. rtcp_sender_.SetTimestampOffset( @@ -259,7 +261,9 @@ RTCPSender::FeedbackState ModuleRtpRtcpImpl2::GetFeedbackState() { state.media_bytes_sent = rtp_stats.transmitted.payload_bytes + rtx_stats.transmitted.payload_bytes; state.send_bitrate = - rtp_sender_->packet_sender.GetSendRates().Sum().bps(); + rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime()) + .Sum() + .bps(); } state.receiver = &rtcp_receiver_; @@ -310,9 +314,9 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, int64_t capture_time_ms, int payload_type, bool force_sender_report) { - if (!Sending()) + if (!Sending()) { return false; - + } // TODO(bugs.webrtc.org/12873): Migrate this method and it's users to use // optional Timestamps. absl::optional capture_time; @@ -322,11 +326,20 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, absl::optional payload_type_optional; if (payload_type >= 0) payload_type_optional = payload_type; - rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional); - // Make sure an RTCP report isn't queued behind a key frame. - if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report)) - rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); + auto closure = [this, timestamp, capture_time, payload_type_optional, + force_sender_report] { + RTC_DCHECK_RUN_ON(worker_queue_); + rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional); + // Make sure an RTCP report isn't queued behind a key frame. + if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report)) + rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); + }; + if (worker_queue_->IsCurrent()) { + closure(); + } else { + worker_queue_->PostTask(SafeTask(task_safety_.flag(), std::move(closure))); + } return true; } @@ -645,9 +658,8 @@ void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) { } RtpSendRates ModuleRtpRtcpImpl2::GetSendRates() const { - // Typically called on the `rtp_transport_queue_` owned by an - // RtpTransportControllerSendInterface instance. - return rtp_sender_->packet_sender.GetSendRates(); + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); + return rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime()); } void ModuleRtpRtcpImpl2::OnRequestSendReport() { diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index 1e11713ece..06fe5869b6 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -252,7 +252,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, FRIEND_TEST_ALL_PREFIXES(RtpRtcpImpl2Test, RttForReceiverOnly); struct RtpSenderContext { - explicit RtpSenderContext(const RtpRtcpInterface::Configuration& config); + explicit RtpSenderContext(TaskQueueBase& worker_queue, + const RtpRtcpInterface::Configuration& config); // Storage of packets, for retransmissions and padding, if applicable. RtpPacketHistory packet_history; SequenceChecker sequencing_checker; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index 212bc5260b..0adb436286 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -16,6 +16,7 @@ #include #include "absl/strings/match.h" +#include "api/units/timestamp.h" #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h" #include "rtc_base/logging.h" @@ -30,15 +31,29 @@ constexpr TimeDelta kUpdateInterval = } // namespace RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender( + TaskQueueBase& worker_queue, RtpSenderEgress* sender, PacketSequencer* sequencer) - : transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) { + : worker_queue_(worker_queue), + transport_sequence_number_(0), + sender_(sender), + sequencer_(sequencer) { RTC_DCHECK(sequencer); } -RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default; +RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() { + RTC_DCHECK_RUN_ON(&worker_queue_); +} void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( std::vector> packets) { + if (!worker_queue_.IsCurrent()) { + worker_queue_.PostTask(SafeTask( + task_safety_.flag(), [this, packets = std::move(packets)]() mutable { + EnqueuePackets(std::move(packets)); + })); + return; + } + RTC_DCHECK_RUN_ON(&worker_queue_); for (auto& packet : packets) { PrepareForSend(packet.get()); sender_->SendPacket(std::move(packet), PacedPacketInfo()); @@ -51,6 +66,7 @@ void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( void RtpSenderEgress::NonPacedPacketSender::PrepareForSend( RtpPacketToSend* packet) { + RTC_DCHECK_RUN_ON(&worker_queue_); // Assign sequence numbers, but not for flexfec which is already running on // an internally maintained sequence number series. if (packet->Ssrc() != sender_->FlexFecSsrc()) { @@ -97,7 +113,6 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, kRtpSequenceNumberMapMaxEntries) : nullptr) { RTC_DCHECK(worker_queue_); - pacer_checker_.Detach(); if (bitrate_callback_) { update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, kUpdateInterval, [this]() { @@ -114,7 +129,7 @@ RtpSenderEgress::~RtpSenderEgress() { void RtpSenderEgress::SendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(packet); if (packet->Ssrc() == ssrc_ && @@ -139,30 +154,17 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, } const Timestamp now = clock_->CurrentTime(); - #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE - worker_queue_->PostTask(SafeTask(task_safety_.flag(), - [this, now, packet_ssrc = packet->Ssrc()]() { - BweTestLoggingPlot(now, packet_ssrc); - })); + BweTestLoggingPlot(now, packet->Ssrc()); #endif - if (need_rtp_packet_infos_ && packet->packet_type() == RtpPacketToSend::Type::kVideo) { - worker_queue_->PostTask(SafeTask( - task_safety_.flag(), - [this, packet_timestamp = packet->Timestamp(), - is_first_packet_of_frame = packet->is_first_packet_of_frame(), - is_last_packet_of_frame = packet->Marker(), - sequence_number = packet->SequenceNumber()]() { - RTC_DCHECK_RUN_ON(worker_queue_); - // Last packet of a frame, add it to sequence number info map. - const uint32_t timestamp = packet_timestamp - timestamp_offset_; - rtp_sequence_number_map_->InsertPacket( - sequence_number, - RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame, - is_last_packet_of_frame)); - })); + // Last packet of a frame, add it to sequence number info map. + const uint32_t timestamp = packet->Timestamp() - timestamp_offset_; + rtp_sequence_number_map_->InsertPacket( + packet->SequenceNumber(), + RtpSequenceNumberMap::Info( + timestamp, packet->is_first_packet_of_frame(), packet->Marker())); } if (fec_generator_ && packet->fec_protect_packet()) { @@ -171,10 +173,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo); absl::optional> new_fec_params; - { - MutexLock lock(&lock_); - new_fec_params.swap(pending_fec_params_); - } + new_fec_params.swap(pending_fec_params_); if (new_fec_params) { fec_generator_->SetProtectionParameters(new_fec_params->first, new_fec_params->second); @@ -234,7 +233,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, } void RtpSenderEgress::OnBatchComplete() { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); for (auto& packet : packets_to_send_) { CompleteSendPacket(packet, &packet == &packets_to_send_.back()); } @@ -243,16 +242,14 @@ void RtpSenderEgress::OnBatchComplete() { void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) { + RTC_DCHECK_RUN_ON(worker_queue_); auto& [packet, pacing_info, now] = compound_packet; const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio || packet->packet_type() == RtpPacketMediaType::kVideo; PacketOptions options; - { - MutexLock lock(&lock_); - options.included_in_allocation = force_part_of_allocation_; - } + options.included_in_allocation = force_part_of_allocation_; // Downstream code actually uses this flag to distinguish between media and // everything else. @@ -298,29 +295,12 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, RtpPacketMediaType packet_type = *packet->packet_type(); RtpPacketCounter counter(*packet); size_t size = packet->size(); - // TODO(crbug.com/1373439): clean up task posting when the combined - // network/worker project launches. - if (TaskQueueBase::Current() != worker_queue_) { - worker_queue_->PostTask(SafeTask( - task_safety_.flag(), [this, now = now, packet_ssrc, packet_type, - counter = std::move(counter), size]() { - RTC_DCHECK_RUN_ON(worker_queue_); - UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), - size); - })); - } else { - RTC_DCHECK_RUN_ON(worker_queue_); - UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size); - } + UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size); } } -RtpSendRates RtpSenderEgress::GetSendRates() const { - MutexLock lock(&lock_); - return GetSendRatesLocked(clock_->CurrentTime()); -} - -RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const { +RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const { + RTC_DCHECK_RUN_ON(worker_queue_); RtpSendRates current_rates; for (size_t i = 0; i < kNumMediaTypes; ++i) { RtpPacketMediaType type = static_cast(i); @@ -332,26 +312,24 @@ RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const { void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats, StreamDataCounters* rtx_stats) const { - // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are - // only touched on the worker thread. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); *rtp_stats = rtp_stats_; *rtx_stats = rtx_rtp_stats_; } void RtpSenderEgress::ForceIncludeSendPacketsInAllocation( bool part_of_allocation) { - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); force_part_of_allocation_ = part_of_allocation; } bool RtpSenderEgress::MediaHasBeenSent() const { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); return media_has_been_sent_; } void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); media_has_been_sent_ = media_sent; } @@ -387,15 +365,13 @@ std::vector RtpSenderEgress::GetSentRtpPacketInfos( void RtpSenderEgress::SetFecProtectionParameters( const FecProtectionParams& delta_params, const FecProtectionParams& key_params) { - // TODO(sprang): Post task to pacer queue instead, one pacer is fully - // migrated to a task queue. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); pending_fec_params_.emplace(delta_params, key_params); } std::vector> RtpSenderEgress::FetchFecPackets() { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); if (fec_generator_) { return fec_generator_->GetFecPackets(); } @@ -404,7 +380,7 @@ RtpSenderEgress::FetchFecPackets() { void RtpSenderEgress::OnAbortedRetransmissions( rtc::ArrayView sequence_numbers) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); // Mark aborted retransmissions as sent, rather than leaving them in // a 'pending' state - otherwise they can not be requested again and // will not be cleared until the history has reached its max size. @@ -469,13 +445,13 @@ void RtpSenderEgress::AddPacketToTransportFeedback( void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time, Timestamp now, uint32_t ssrc) { + RTC_DCHECK_RUN_ON(worker_queue_); if (!send_side_delay_observer_ || capture_time.IsInfinite()) return; TimeDelta avg_delay = TimeDelta::Zero(); TimeDelta max_delay = TimeDelta::Zero(); { - MutexLock lock(&lock_); // Compute the max and average of the recent capture-to-send delays. // The time complexity of the current approach depends on the distribution // of the delay values. This could be done more efficiently. @@ -524,6 +500,7 @@ void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time, } void RtpSenderEgress::RecomputeMaxSendDelay() { + RTC_DCHECK_RUN_ON(worker_queue_); max_delay_it_ = send_delays_.begin(); for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) { if (it->second >= max_delay_it_->second) { @@ -545,6 +522,7 @@ void RtpSenderEgress::UpdateOnSendPacket(int packet_id, bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info) { + RTC_DCHECK_RUN_ON(worker_queue_); int bytes_sent = -1; if (transport_) { bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options) @@ -573,32 +551,27 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now, // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the // worker thread. RtpSendRates send_rates; - { - MutexLock lock(&lock_); - // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are - // only touched on the worker thread. - StreamDataCounters* counters = - packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; + StreamDataCounters* counters = + packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; - counters->MaybeSetFirstPacketTime(now); + counters->MaybeSetFirstPacketTime(now); - if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { - counters->fec.Add(counter); - } else if (packet_type == RtpPacketMediaType::kRetransmission) { - counters->retransmitted.Add(counter); - } + if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { + counters->fec.Add(counter); + } else if (packet_type == RtpPacketMediaType::kRetransmission) { + counters->retransmitted.Add(counter); + } counters->transmitted.Add(counter); send_rates_[static_cast(packet_type)].Update(packet_size, now.ms()); if (bitrate_callback_) { - send_rates = GetSendRatesLocked(now); + send_rates = GetSendRates(now); } if (rtp_stats_callback_) { rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc); } - } // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point // to the same object, so these callbacks could be consolidated into one. @@ -612,7 +585,7 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now, void RtpSenderEgress::PeriodicUpdate() { RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(bitrate_callback_); - RtpSendRates send_rates = GetSendRates(); + RtpSendRates send_rates = GetSendRates(clock_->CurrentTime()); bitrate_callback_->Notify( send_rates.Sum().bps(), send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); @@ -622,7 +595,7 @@ void RtpSenderEgress::PeriodicUpdate() { void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) { RTC_DCHECK_RUN_ON(worker_queue_); - const auto rates = GetSendRates(); + const auto rates = GetSendRates(now); if (is_audio_) { BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now.ms(), rates.Sum().kbps(), packet_ssrc); @@ -638,5 +611,4 @@ void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) { } } #endif // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE - } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h index accdeb15d1..7bb8f80efd 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.h +++ b/modules/rtp_rtcp/source/rtp_sender_egress.h @@ -46,7 +46,9 @@ class RtpSenderEgress { // without passing through an actual paced sender. class NonPacedPacketSender : public RtpPacketSender { public: - NonPacedPacketSender(RtpSenderEgress* sender, PacketSequencer* sequencer); + NonPacedPacketSender(TaskQueueBase& worker_queue, + RtpSenderEgress* sender, + PacketSequencer* sequencer); virtual ~NonPacedPacketSender(); void EnqueuePackets( @@ -56,9 +58,11 @@ class RtpSenderEgress { private: void PrepareForSend(RtpPacketToSend* packet); + TaskQueueBase& worker_queue_; uint16_t transport_sequence_number_; RtpSenderEgress* const sender_; PacketSequencer* sequencer_; + ScopedTaskSafety task_safety_; }; RtpSenderEgress(const RtpRtcpInterface::Configuration& config, @@ -66,22 +70,21 @@ class RtpSenderEgress { ~RtpSenderEgress(); void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_); + const PacedPacketInfo& pacing_info); void OnBatchComplete(); uint32_t Ssrc() const { return ssrc_; } absl::optional RtxSsrc() const { return rtx_ssrc_; } absl::optional FlexFecSsrc() const { return flexfec_ssrc_; } - RtpSendRates GetSendRates() const RTC_LOCKS_EXCLUDED(lock_); + RtpSendRates GetSendRates(Timestamp now) const; void GetDataCounters(StreamDataCounters* rtp_stats, - StreamDataCounters* rtx_stats) const - RTC_LOCKS_EXCLUDED(lock_); + StreamDataCounters* rtx_stats) const; - void ForceIncludeSendPacketsInAllocation(bool part_of_allocation) - RTC_LOCKS_EXCLUDED(lock_); - bool MediaHasBeenSent() const RTC_LOCKS_EXCLUDED(lock_); - void SetMediaHasBeenSent(bool media_sent) RTC_LOCKS_EXCLUDED(lock_); - void SetTimestampOffset(uint32_t timestamp) RTC_LOCKS_EXCLUDED(lock_); + void ForceIncludeSendPacketsInAllocation(bool part_of_allocation); + + bool MediaHasBeenSent() const; + void SetMediaHasBeenSent(bool media_sent); + void SetTimestampOffset(uint32_t timestamp); // For each sequence number in `sequence_number`, recall the last RTP packet // which bore it - its timestamp and whether it was the first and/or last @@ -89,8 +92,7 @@ class RtpSenderEgress { // recalled, return a vector with all of them (in corresponding order). // If any could not be recalled, return an empty vector. std::vector GetSentRtpPacketInfos( - rtc::ArrayView sequence_numbers) const - RTC_LOCKS_EXCLUDED(lock_); + rtc::ArrayView sequence_numbers) const; void SetFecProtectionParameters(const FecProtectionParams& delta_params, const FecProtectionParams& key_params); @@ -106,10 +108,7 @@ class RtpSenderEgress { PacedPacketInfo info; Timestamp now; }; - void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) - RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_); - RtpSendRates GetSendRatesLocked(Timestamp now) const - RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch); bool HasCorrectSsrc(const RtpPacketToSend& packet) const; void AddPacketToTransportFeedback(uint16_t packet_id, const RtpPacketToSend& packet, @@ -117,13 +116,12 @@ class RtpSenderEgress { void UpdateDelayStatistics(Timestamp capture_time, Timestamp now, uint32_t ssrc); - void RecomputeMaxSendDelay() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + void RecomputeMaxSendDelay(); void UpdateOnSendPacket(int packet_id, Timestamp capture_time, uint32_t ssrc); // Sends packet on to `transport_`, leaving the RTP module. bool SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info); - void UpdateRtpStats(Timestamp now, uint32_t packet_ssrc, RtpPacketMediaType packet_type, @@ -138,20 +136,19 @@ class RtpSenderEgress { const bool enable_send_packet_batching_; TaskQueueBase* const worker_queue_; - RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_; const uint32_t ssrc_; const absl::optional rtx_ssrc_; const absl::optional flexfec_ssrc_; const bool populate_network2_timestamp_; Clock* const clock_; - RtpPacketHistory* const packet_history_; + RtpPacketHistory* const packet_history_ RTC_GUARDED_BY(worker_queue_); Transport* const transport_; RtcEventLog* const event_log_; const bool is_audio_; const bool need_rtp_packet_infos_; - VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_); - absl::optional last_sent_seq_ RTC_GUARDED_BY(pacer_checker_); - absl::optional last_sent_rtx_seq_ RTC_GUARDED_BY(pacer_checker_); + VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(worker_queue_); + absl::optional last_sent_seq_ RTC_GUARDED_BY(worker_queue_); + absl::optional last_sent_rtx_seq_ RTC_GUARDED_BY(worker_queue_); TransportFeedbackObserver* const transport_feedback_observer_; SendSideDelayObserver* const send_side_delay_observer_; @@ -159,24 +156,23 @@ class RtpSenderEgress { StreamDataCountersCallback* const rtp_stats_callback_; BitrateStatisticsObserver* const bitrate_callback_; - mutable Mutex lock_; - bool media_has_been_sent_ RTC_GUARDED_BY(pacer_checker_); - bool force_part_of_allocation_ RTC_GUARDED_BY(lock_); + bool media_has_been_sent_ RTC_GUARDED_BY(worker_queue_); + bool force_part_of_allocation_ RTC_GUARDED_BY(worker_queue_); uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_); // Maps capture time to send-side delay. Send-side delay is the difference // between transmission time and capture time. - std::map send_delays_ RTC_GUARDED_BY(lock_); + std::map send_delays_ RTC_GUARDED_BY(worker_queue_); std::map::const_iterator max_delay_it_ - RTC_GUARDED_BY(lock_); + RTC_GUARDED_BY(worker_queue_); // The sum of delays over a kSendSideDelayWindowMs sliding window. - TimeDelta sum_delays_ RTC_GUARDED_BY(lock_); - StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_); - StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_); + TimeDelta sum_delays_ RTC_GUARDED_BY(worker_queue_); + StreamDataCounters rtp_stats_ RTC_GUARDED_BY(worker_queue_); + StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(worker_queue_); // One element per value in RtpPacketMediaType, with index matching value. - std::vector send_rates_ RTC_GUARDED_BY(lock_); + std::vector send_rates_ RTC_GUARDED_BY(worker_queue_); absl::optional> - pending_fec_params_ RTC_GUARDED_BY(lock_); + pending_fec_params_ RTC_GUARDED_BY(worker_queue_); // Maps sent packets' sequence numbers to a tuple consisting of: // 1. The timestamp, without the randomizing offset mandated by the RFC. @@ -185,7 +181,7 @@ class RtpSenderEgress { const std::unique_ptr rtp_sequence_number_map_ RTC_GUARDED_BY(worker_queue_); RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_); - std::vector packets_to_send_ RTC_GUARDED_BY(pacer_checker_); + std::vector packets_to_send_ RTC_GUARDED_BY(worker_queue_); ScopedTaskSafety task_safety_; }; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc index 05a079b669..9389dc77cf 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc @@ -465,7 +465,9 @@ TEST_F(RtpSenderEgressTest, ReportsFecRate) { } EXPECT_NEAR( - (sender->GetSendRates()[RtpPacketMediaType::kForwardErrorCorrection]) + (sender->GetSendRates( + time_controller_.GetClock() + ->CurrentTime())[RtpPacketMediaType::kForwardErrorCorrection]) .bps(), (total_fec_data_sent / (kTimeBetweenPackets * kNumPackets)).bps(), 500); }