diff --git a/audio/voip/test/BUILD.gn b/audio/voip/test/BUILD.gn index e89f2b001a..00e9bee622 100644 --- a/audio/voip/test/BUILD.gn +++ b/audio/voip/test/BUILD.gn @@ -70,6 +70,8 @@ if (rtc_include_tests) { "../../../api/audio_codecs:builtin_audio_decoder_factory", "../../../api/audio_codecs:builtin_audio_encoder_factory", "../../../api/task_queue:default_task_queue_factory", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../modules/audio_mixer:audio_mixer_test_utils", "../../../modules/rtp_rtcp:rtp_rtcp", "../../../rtc_base:logging", @@ -77,6 +79,7 @@ if (rtc_include_tests) { "../../../test:mock_transport", "../../../test:run_loop", "../../../test:test_support", + "../../../test/time_controller:time_controller", ] } @@ -88,6 +91,8 @@ if (rtc_include_tests) { "../../../api:transport_api", "../../../api/audio_codecs:builtin_audio_encoder_factory", "../../../api/task_queue:default_task_queue_factory", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../modules/audio_mixer:audio_mixer_test_utils", "../../../modules/rtp_rtcp:rtp_rtcp", "../../../modules/rtp_rtcp:rtp_rtcp_format", @@ -96,6 +101,7 @@ if (rtc_include_tests) { "../../../test:mock_transport", "../../../test:run_loop", "../../../test:test_support", + "../../../test/time_controller:time_controller", ] } } diff --git a/audio/voip/test/audio_egress_unittest.cc b/audio/voip/test/audio_egress_unittest.cc index 34c5585347..98679db508 100644 --- a/audio/voip/test/audio_egress_unittest.cc +++ b/audio/voip/test/audio_egress_unittest.cc @@ -13,6 +13,8 @@ #include "api/audio_codecs/builtin_audio_encoder_factory.h" #include "api/call/transport.h" #include "api/task_queue/default_task_queue_factory.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/audio_mixer/sine_wave_generator.h" #include "modules/rtp_rtcp/source/rtp_packet_received.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" @@ -22,6 +24,7 @@ #include "test/gtest.h" #include "test/mock_transport.h" #include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" namespace webrtc { namespace { @@ -57,18 +60,18 @@ class AudioEgressTest : public ::testing::Test { static constexpr uint32_t kRemoteSsrc = 0xDEADBEEF; const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1}; - AudioEgressTest() - : fake_clock_(kStartTime), wave_generator_(1000.0, kAudioLevel) { - task_queue_factory_ = CreateDefaultTaskQueueFactory(); + AudioEgressTest() : wave_generator_(1000.0, kAudioLevel) { encoder_factory_ = CreateBuiltinAudioEncoderFactory(); } // Prepare test on audio egress by using PCMu codec with specific // sequence number and its status to be running. void SetUp() override { - rtp_rtcp_ = CreateRtpStack(&fake_clock_, &transport_, kRemoteSsrc); - egress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - task_queue_factory_.get()); + rtp_rtcp_ = + CreateRtpStack(time_controller_.GetClock(), &transport_, kRemoteSsrc); + egress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), + time_controller_.GetTaskQueueFactory()); constexpr int kPcmuPayload = 0; egress_->SetEncoder(kPcmuPayload, kPcmuFormat, encoder_factory_->MakeAudioEncoder( @@ -100,14 +103,10 @@ class AudioEgressTest : public ::testing::Test { return frame; } - test::RunLoop run_loop_; - // SimulatedClock doesn't directly affect this testcase as the the - // AudioFrame's timestamp is driven by GetAudioFrame. - SimulatedClock fake_clock_; + GlobalSimulatedTimeController time_controller_{Timestamp::Micros(kStartTime)}; NiceMock transport_; SineWaveGenerator wave_generator_; std::unique_ptr rtp_rtcp_; - std::unique_ptr task_queue_factory_; rtc::scoped_refptr encoder_factory_; std::unique_ptr egress_; }; @@ -138,7 +137,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithMute) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -174,7 +173,7 @@ TEST_F(AudioEgressTest, ProcessAudioWithSineWave) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -208,7 +207,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -222,7 +221,7 @@ TEST_F(AudioEgressTest, SkipAudioEncodingAfterStopSend) { // result in crahses or sanitizer errors due to remaining data. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } } @@ -284,7 +283,7 @@ TEST_F(AudioEgressTest, SendDTMF) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(TimeDelta::Seconds(1)); @@ -309,7 +308,7 @@ TEST_F(AudioEgressTest, TestAudioInputLevelAndEnergyDuration) { // Two 10 ms audio frames will result in rtp packet with ptime 20. for (size_t i = 0; i < kExpected * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); diff --git a/audio/voip/test/audio_ingress_unittest.cc b/audio/voip/test/audio_ingress_unittest.cc index 3c309dbf82..3a082cdda2 100644 --- a/audio/voip/test/audio_ingress_unittest.cc +++ b/audio/voip/test/audio_ingress_unittest.cc @@ -14,6 +14,7 @@ #include "api/audio_codecs/builtin_audio_encoder_factory.h" #include "api/call/transport.h" #include "api/task_queue/default_task_queue_factory.h" +#include "api/units/time_delta.h" #include "audio/voip/audio_egress.h" #include "modules/audio_mixer/sine_wave_generator.h" #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h" @@ -23,6 +24,7 @@ #include "test/gtest.h" #include "test/mock_transport.h" #include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" namespace webrtc { namespace { @@ -37,12 +39,12 @@ class AudioIngressTest : public ::testing::Test { public: const SdpAudioFormat kPcmuFormat = {"pcmu", 8000, 1}; - AudioIngressTest() - : fake_clock_(123456789), wave_generator_(1000.0, kAudioLevel) { - receive_statistics_ = ReceiveStatistics::Create(&fake_clock_); + AudioIngressTest() : wave_generator_(1000.0, kAudioLevel) { + receive_statistics_ = + ReceiveStatistics::Create(time_controller_.GetClock()); RtpRtcpInterface::Configuration rtp_config; - rtp_config.clock = &fake_clock_; + rtp_config.clock = time_controller_.GetClock(); rtp_config.audio = true; rtp_config.receive_statistics = receive_statistics_.get(); rtp_config.rtcp_report_interval_ms = 5000; @@ -53,20 +55,20 @@ class AudioIngressTest : public ::testing::Test { rtp_rtcp_->SetSendingMediaStatus(false); rtp_rtcp_->SetRTCPStatus(RtcpMode::kCompound); - task_queue_factory_ = CreateDefaultTaskQueueFactory(); encoder_factory_ = CreateBuiltinAudioEncoderFactory(); decoder_factory_ = CreateBuiltinAudioDecoderFactory(); } void SetUp() override { constexpr int kPcmuPayload = 0; - ingress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - receive_statistics_.get(), - decoder_factory_); + ingress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), receive_statistics_.get(), + decoder_factory_); ingress_->SetReceiveCodecs({{kPcmuPayload, kPcmuFormat}}); - egress_ = std::make_unique(rtp_rtcp_.get(), &fake_clock_, - task_queue_factory_.get()); + egress_ = std::make_unique( + rtp_rtcp_.get(), time_controller_.GetClock(), + time_controller_.GetTaskQueueFactory()); egress_->SetEncoder(kPcmuPayload, kPcmuFormat, encoder_factory_->MakeAudioEncoder( kPcmuPayload, kPcmuFormat, absl::nullopt)); @@ -93,15 +95,13 @@ class AudioIngressTest : public ::testing::Test { return frame; } - test::RunLoop run_loop_; - SimulatedClock fake_clock_; + GlobalSimulatedTimeController time_controller_{Timestamp::Micros(123456789)}; SineWaveGenerator wave_generator_; NiceMock transport_; std::unique_ptr receive_statistics_; std::unique_ptr rtp_rtcp_; rtc::scoped_refptr encoder_factory_; rtc::scoped_refptr decoder_factory_; - std::unique_ptr task_queue_factory_; std::unique_ptr ingress_; std::unique_ptr egress_; }; @@ -122,7 +122,8 @@ TEST_F(AudioIngressTest, GetAudioFrameAfterRtpReceived) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); egress_->SendAudioData(GetAudioFrame(0)); egress_->SendAudioData(GetAudioFrame(1)); - event.Wait(TimeDelta::Seconds(1)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1))); AudioFrame audio_frame; EXPECT_EQ( @@ -153,7 +154,7 @@ TEST_F(AudioIngressTest, TestSpeechOutputLevelAndEnergyDuration) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); for (int i = 0; i < kNumRtp * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); @@ -182,7 +183,8 @@ TEST_F(AudioIngressTest, PreferredSampleRate) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); egress_->SendAudioData(GetAudioFrame(0)); egress_->SendAudioData(GetAudioFrame(1)); - event.Wait(TimeDelta::Seconds(1)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ASSERT_TRUE(event.Wait(TimeDelta::Seconds(1))); AudioFrame audio_frame; EXPECT_EQ( @@ -212,7 +214,7 @@ TEST_F(AudioIngressTest, GetMutedAudioFrameAfterRtpReceivedAndStopPlay) { EXPECT_CALL(transport_, SendRtp).WillRepeatedly(Invoke(handle_rtp)); for (int i = 0; i < kNumRtp * 2; i++) { egress_->SendAudioData(GetAudioFrame(i)); - fake_clock_.AdvanceTimeMilliseconds(10); + time_controller_.AdvanceTime(TimeDelta::Millis(10)); } event.Wait(/*give_up_after=*/TimeDelta::Seconds(1)); diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index 0687f32352..d6eddb21b1 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -64,6 +64,7 @@ RtpPacketHistory::PaddingMode GetPaddingMode( } // namespace ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( + TaskQueueBase& worker_queue, const RtpRtcpInterface::Configuration& config) : packet_history(config.clock, GetPaddingMode(config.field_trials)), sequencer(config.local_media_ssrc, @@ -71,7 +72,7 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( /*require_marker_before_media_padding=*/!config.audio, config.clock), packet_sender(config, &packet_history), - non_paced_sender(&packet_sender, &sequencer), + non_paced_sender(worker_queue, &packet_sender, &sequencer), packet_generator( config, &packet_history, @@ -94,7 +95,8 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) RTC_DCHECK(worker_queue_); rtcp_thread_checker_.Detach(); if (!configuration.receiver_only) { - rtp_sender_ = std::make_unique(configuration); + rtp_sender_ = + std::make_unique(*worker_queue_, configuration); rtp_sender_->sequencing_checker.Detach(); // Make sure rtcp sender use same timestamp offset as rtp sender. rtcp_sender_.SetTimestampOffset( @@ -259,7 +261,9 @@ RTCPSender::FeedbackState ModuleRtpRtcpImpl2::GetFeedbackState() { state.media_bytes_sent = rtp_stats.transmitted.payload_bytes + rtx_stats.transmitted.payload_bytes; state.send_bitrate = - rtp_sender_->packet_sender.GetSendRates().Sum().bps(); + rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime()) + .Sum() + .bps(); } state.receiver = &rtcp_receiver_; @@ -310,9 +314,9 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, int64_t capture_time_ms, int payload_type, bool force_sender_report) { - if (!Sending()) + if (!Sending()) { return false; - + } // TODO(bugs.webrtc.org/12873): Migrate this method and it's users to use // optional Timestamps. absl::optional capture_time; @@ -322,11 +326,20 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp, absl::optional payload_type_optional; if (payload_type >= 0) payload_type_optional = payload_type; - rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional); - // Make sure an RTCP report isn't queued behind a key frame. - if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report)) - rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); + auto closure = [this, timestamp, capture_time, payload_type_optional, + force_sender_report] { + RTC_DCHECK_RUN_ON(worker_queue_); + rtcp_sender_.SetLastRtpTime(timestamp, capture_time, payload_type_optional); + // Make sure an RTCP report isn't queued behind a key frame. + if (rtcp_sender_.TimeToSendRTCPReport(force_sender_report)) + rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport); + }; + if (worker_queue_->IsCurrent()) { + closure(); + } else { + worker_queue_->PostTask(SafeTask(task_safety_.flag(), std::move(closure))); + } return true; } @@ -645,9 +658,8 @@ void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) { } RtpSendRates ModuleRtpRtcpImpl2::GetSendRates() const { - // Typically called on the `rtp_transport_queue_` owned by an - // RtpTransportControllerSendInterface instance. - return rtp_sender_->packet_sender.GetSendRates(); + RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker); + return rtp_sender_->packet_sender.GetSendRates(clock_->CurrentTime()); } void ModuleRtpRtcpImpl2::OnRequestSendReport() { diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index 1e11713ece..06fe5869b6 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -252,7 +252,8 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, FRIEND_TEST_ALL_PREFIXES(RtpRtcpImpl2Test, RttForReceiverOnly); struct RtpSenderContext { - explicit RtpSenderContext(const RtpRtcpInterface::Configuration& config); + explicit RtpSenderContext(TaskQueueBase& worker_queue, + const RtpRtcpInterface::Configuration& config); // Storage of packets, for retransmissions and padding, if applicable. RtpPacketHistory packet_history; SequenceChecker sequencing_checker; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index 212bc5260b..0adb436286 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -16,6 +16,7 @@ #include #include "absl/strings/match.h" +#include "api/units/timestamp.h" #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h" #include "rtc_base/logging.h" @@ -30,15 +31,29 @@ constexpr TimeDelta kUpdateInterval = } // namespace RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender( + TaskQueueBase& worker_queue, RtpSenderEgress* sender, PacketSequencer* sequencer) - : transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) { + : worker_queue_(worker_queue), + transport_sequence_number_(0), + sender_(sender), + sequencer_(sequencer) { RTC_DCHECK(sequencer); } -RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default; +RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() { + RTC_DCHECK_RUN_ON(&worker_queue_); +} void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( std::vector> packets) { + if (!worker_queue_.IsCurrent()) { + worker_queue_.PostTask(SafeTask( + task_safety_.flag(), [this, packets = std::move(packets)]() mutable { + EnqueuePackets(std::move(packets)); + })); + return; + } + RTC_DCHECK_RUN_ON(&worker_queue_); for (auto& packet : packets) { PrepareForSend(packet.get()); sender_->SendPacket(std::move(packet), PacedPacketInfo()); @@ -51,6 +66,7 @@ void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( void RtpSenderEgress::NonPacedPacketSender::PrepareForSend( RtpPacketToSend* packet) { + RTC_DCHECK_RUN_ON(&worker_queue_); // Assign sequence numbers, but not for flexfec which is already running on // an internally maintained sequence number series. if (packet->Ssrc() != sender_->FlexFecSsrc()) { @@ -97,7 +113,6 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, kRtpSequenceNumberMapMaxEntries) : nullptr) { RTC_DCHECK(worker_queue_); - pacer_checker_.Detach(); if (bitrate_callback_) { update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, kUpdateInterval, [this]() { @@ -114,7 +129,7 @@ RtpSenderEgress::~RtpSenderEgress() { void RtpSenderEgress::SendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(packet); if (packet->Ssrc() == ssrc_ && @@ -139,30 +154,17 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, } const Timestamp now = clock_->CurrentTime(); - #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE - worker_queue_->PostTask(SafeTask(task_safety_.flag(), - [this, now, packet_ssrc = packet->Ssrc()]() { - BweTestLoggingPlot(now, packet_ssrc); - })); + BweTestLoggingPlot(now, packet->Ssrc()); #endif - if (need_rtp_packet_infos_ && packet->packet_type() == RtpPacketToSend::Type::kVideo) { - worker_queue_->PostTask(SafeTask( - task_safety_.flag(), - [this, packet_timestamp = packet->Timestamp(), - is_first_packet_of_frame = packet->is_first_packet_of_frame(), - is_last_packet_of_frame = packet->Marker(), - sequence_number = packet->SequenceNumber()]() { - RTC_DCHECK_RUN_ON(worker_queue_); - // Last packet of a frame, add it to sequence number info map. - const uint32_t timestamp = packet_timestamp - timestamp_offset_; - rtp_sequence_number_map_->InsertPacket( - sequence_number, - RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame, - is_last_packet_of_frame)); - })); + // Last packet of a frame, add it to sequence number info map. + const uint32_t timestamp = packet->Timestamp() - timestamp_offset_; + rtp_sequence_number_map_->InsertPacket( + packet->SequenceNumber(), + RtpSequenceNumberMap::Info( + timestamp, packet->is_first_packet_of_frame(), packet->Marker())); } if (fec_generator_ && packet->fec_protect_packet()) { @@ -171,10 +173,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo); absl::optional> new_fec_params; - { - MutexLock lock(&lock_); - new_fec_params.swap(pending_fec_params_); - } + new_fec_params.swap(pending_fec_params_); if (new_fec_params) { fec_generator_->SetProtectionParameters(new_fec_params->first, new_fec_params->second); @@ -234,7 +233,7 @@ void RtpSenderEgress::SendPacket(std::unique_ptr packet, } void RtpSenderEgress::OnBatchComplete() { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); for (auto& packet : packets_to_send_) { CompleteSendPacket(packet, &packet == &packets_to_send_.back()); } @@ -243,16 +242,14 @@ void RtpSenderEgress::OnBatchComplete() { void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) { + RTC_DCHECK_RUN_ON(worker_queue_); auto& [packet, pacing_info, now] = compound_packet; const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio || packet->packet_type() == RtpPacketMediaType::kVideo; PacketOptions options; - { - MutexLock lock(&lock_); - options.included_in_allocation = force_part_of_allocation_; - } + options.included_in_allocation = force_part_of_allocation_; // Downstream code actually uses this flag to distinguish between media and // everything else. @@ -298,29 +295,12 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, RtpPacketMediaType packet_type = *packet->packet_type(); RtpPacketCounter counter(*packet); size_t size = packet->size(); - // TODO(crbug.com/1373439): clean up task posting when the combined - // network/worker project launches. - if (TaskQueueBase::Current() != worker_queue_) { - worker_queue_->PostTask(SafeTask( - task_safety_.flag(), [this, now = now, packet_ssrc, packet_type, - counter = std::move(counter), size]() { - RTC_DCHECK_RUN_ON(worker_queue_); - UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), - size); - })); - } else { - RTC_DCHECK_RUN_ON(worker_queue_); - UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size); - } + UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter), size); } } -RtpSendRates RtpSenderEgress::GetSendRates() const { - MutexLock lock(&lock_); - return GetSendRatesLocked(clock_->CurrentTime()); -} - -RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const { +RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const { + RTC_DCHECK_RUN_ON(worker_queue_); RtpSendRates current_rates; for (size_t i = 0; i < kNumMediaTypes; ++i) { RtpPacketMediaType type = static_cast(i); @@ -332,26 +312,24 @@ RtpSendRates RtpSenderEgress::GetSendRatesLocked(Timestamp now) const { void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats, StreamDataCounters* rtx_stats) const { - // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are - // only touched on the worker thread. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); *rtp_stats = rtp_stats_; *rtx_stats = rtx_rtp_stats_; } void RtpSenderEgress::ForceIncludeSendPacketsInAllocation( bool part_of_allocation) { - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); force_part_of_allocation_ = part_of_allocation; } bool RtpSenderEgress::MediaHasBeenSent() const { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); return media_has_been_sent_; } void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); media_has_been_sent_ = media_sent; } @@ -387,15 +365,13 @@ std::vector RtpSenderEgress::GetSentRtpPacketInfos( void RtpSenderEgress::SetFecProtectionParameters( const FecProtectionParams& delta_params, const FecProtectionParams& key_params) { - // TODO(sprang): Post task to pacer queue instead, one pacer is fully - // migrated to a task queue. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(worker_queue_); pending_fec_params_.emplace(delta_params, key_params); } std::vector> RtpSenderEgress::FetchFecPackets() { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); if (fec_generator_) { return fec_generator_->GetFecPackets(); } @@ -404,7 +380,7 @@ RtpSenderEgress::FetchFecPackets() { void RtpSenderEgress::OnAbortedRetransmissions( rtc::ArrayView sequence_numbers) { - RTC_DCHECK_RUN_ON(&pacer_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); // Mark aborted retransmissions as sent, rather than leaving them in // a 'pending' state - otherwise they can not be requested again and // will not be cleared until the history has reached its max size. @@ -469,13 +445,13 @@ void RtpSenderEgress::AddPacketToTransportFeedback( void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time, Timestamp now, uint32_t ssrc) { + RTC_DCHECK_RUN_ON(worker_queue_); if (!send_side_delay_observer_ || capture_time.IsInfinite()) return; TimeDelta avg_delay = TimeDelta::Zero(); TimeDelta max_delay = TimeDelta::Zero(); { - MutexLock lock(&lock_); // Compute the max and average of the recent capture-to-send delays. // The time complexity of the current approach depends on the distribution // of the delay values. This could be done more efficiently. @@ -524,6 +500,7 @@ void RtpSenderEgress::UpdateDelayStatistics(Timestamp capture_time, } void RtpSenderEgress::RecomputeMaxSendDelay() { + RTC_DCHECK_RUN_ON(worker_queue_); max_delay_it_ = send_delays_.begin(); for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) { if (it->second >= max_delay_it_->second) { @@ -545,6 +522,7 @@ void RtpSenderEgress::UpdateOnSendPacket(int packet_id, bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info) { + RTC_DCHECK_RUN_ON(worker_queue_); int bytes_sent = -1; if (transport_) { bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options) @@ -573,32 +551,27 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now, // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the // worker thread. RtpSendRates send_rates; - { - MutexLock lock(&lock_); - // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are - // only touched on the worker thread. - StreamDataCounters* counters = - packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; + StreamDataCounters* counters = + packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; - counters->MaybeSetFirstPacketTime(now); + counters->MaybeSetFirstPacketTime(now); - if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { - counters->fec.Add(counter); - } else if (packet_type == RtpPacketMediaType::kRetransmission) { - counters->retransmitted.Add(counter); - } + if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { + counters->fec.Add(counter); + } else if (packet_type == RtpPacketMediaType::kRetransmission) { + counters->retransmitted.Add(counter); + } counters->transmitted.Add(counter); send_rates_[static_cast(packet_type)].Update(packet_size, now.ms()); if (bitrate_callback_) { - send_rates = GetSendRatesLocked(now); + send_rates = GetSendRates(now); } if (rtp_stats_callback_) { rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc); } - } // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point // to the same object, so these callbacks could be consolidated into one. @@ -612,7 +585,7 @@ void RtpSenderEgress::UpdateRtpStats(Timestamp now, void RtpSenderEgress::PeriodicUpdate() { RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(bitrate_callback_); - RtpSendRates send_rates = GetSendRates(); + RtpSendRates send_rates = GetSendRates(clock_->CurrentTime()); bitrate_callback_->Notify( send_rates.Sum().bps(), send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); @@ -622,7 +595,7 @@ void RtpSenderEgress::PeriodicUpdate() { void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) { RTC_DCHECK_RUN_ON(worker_queue_); - const auto rates = GetSendRates(); + const auto rates = GetSendRates(now); if (is_audio_) { BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now.ms(), rates.Sum().kbps(), packet_ssrc); @@ -638,5 +611,4 @@ void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) { } } #endif // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE - } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h index accdeb15d1..7bb8f80efd 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.h +++ b/modules/rtp_rtcp/source/rtp_sender_egress.h @@ -46,7 +46,9 @@ class RtpSenderEgress { // without passing through an actual paced sender. class NonPacedPacketSender : public RtpPacketSender { public: - NonPacedPacketSender(RtpSenderEgress* sender, PacketSequencer* sequencer); + NonPacedPacketSender(TaskQueueBase& worker_queue, + RtpSenderEgress* sender, + PacketSequencer* sequencer); virtual ~NonPacedPacketSender(); void EnqueuePackets( @@ -56,9 +58,11 @@ class RtpSenderEgress { private: void PrepareForSend(RtpPacketToSend* packet); + TaskQueueBase& worker_queue_; uint16_t transport_sequence_number_; RtpSenderEgress* const sender_; PacketSequencer* sequencer_; + ScopedTaskSafety task_safety_; }; RtpSenderEgress(const RtpRtcpInterface::Configuration& config, @@ -66,22 +70,21 @@ class RtpSenderEgress { ~RtpSenderEgress(); void SendPacket(std::unique_ptr packet, - const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_); + const PacedPacketInfo& pacing_info); void OnBatchComplete(); uint32_t Ssrc() const { return ssrc_; } absl::optional RtxSsrc() const { return rtx_ssrc_; } absl::optional FlexFecSsrc() const { return flexfec_ssrc_; } - RtpSendRates GetSendRates() const RTC_LOCKS_EXCLUDED(lock_); + RtpSendRates GetSendRates(Timestamp now) const; void GetDataCounters(StreamDataCounters* rtp_stats, - StreamDataCounters* rtx_stats) const - RTC_LOCKS_EXCLUDED(lock_); + StreamDataCounters* rtx_stats) const; - void ForceIncludeSendPacketsInAllocation(bool part_of_allocation) - RTC_LOCKS_EXCLUDED(lock_); - bool MediaHasBeenSent() const RTC_LOCKS_EXCLUDED(lock_); - void SetMediaHasBeenSent(bool media_sent) RTC_LOCKS_EXCLUDED(lock_); - void SetTimestampOffset(uint32_t timestamp) RTC_LOCKS_EXCLUDED(lock_); + void ForceIncludeSendPacketsInAllocation(bool part_of_allocation); + + bool MediaHasBeenSent() const; + void SetMediaHasBeenSent(bool media_sent); + void SetTimestampOffset(uint32_t timestamp); // For each sequence number in `sequence_number`, recall the last RTP packet // which bore it - its timestamp and whether it was the first and/or last @@ -89,8 +92,7 @@ class RtpSenderEgress { // recalled, return a vector with all of them (in corresponding order). // If any could not be recalled, return an empty vector. std::vector GetSentRtpPacketInfos( - rtc::ArrayView sequence_numbers) const - RTC_LOCKS_EXCLUDED(lock_); + rtc::ArrayView sequence_numbers) const; void SetFecProtectionParameters(const FecProtectionParams& delta_params, const FecProtectionParams& key_params); @@ -106,10 +108,7 @@ class RtpSenderEgress { PacedPacketInfo info; Timestamp now; }; - void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) - RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_); - RtpSendRates GetSendRatesLocked(Timestamp now) const - RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch); bool HasCorrectSsrc(const RtpPacketToSend& packet) const; void AddPacketToTransportFeedback(uint16_t packet_id, const RtpPacketToSend& packet, @@ -117,13 +116,12 @@ class RtpSenderEgress { void UpdateDelayStatistics(Timestamp capture_time, Timestamp now, uint32_t ssrc); - void RecomputeMaxSendDelay() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + void RecomputeMaxSendDelay(); void UpdateOnSendPacket(int packet_id, Timestamp capture_time, uint32_t ssrc); // Sends packet on to `transport_`, leaving the RTP module. bool SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info); - void UpdateRtpStats(Timestamp now, uint32_t packet_ssrc, RtpPacketMediaType packet_type, @@ -138,20 +136,19 @@ class RtpSenderEgress { const bool enable_send_packet_batching_; TaskQueueBase* const worker_queue_; - RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_; const uint32_t ssrc_; const absl::optional rtx_ssrc_; const absl::optional flexfec_ssrc_; const bool populate_network2_timestamp_; Clock* const clock_; - RtpPacketHistory* const packet_history_; + RtpPacketHistory* const packet_history_ RTC_GUARDED_BY(worker_queue_); Transport* const transport_; RtcEventLog* const event_log_; const bool is_audio_; const bool need_rtp_packet_infos_; - VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_); - absl::optional last_sent_seq_ RTC_GUARDED_BY(pacer_checker_); - absl::optional last_sent_rtx_seq_ RTC_GUARDED_BY(pacer_checker_); + VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(worker_queue_); + absl::optional last_sent_seq_ RTC_GUARDED_BY(worker_queue_); + absl::optional last_sent_rtx_seq_ RTC_GUARDED_BY(worker_queue_); TransportFeedbackObserver* const transport_feedback_observer_; SendSideDelayObserver* const send_side_delay_observer_; @@ -159,24 +156,23 @@ class RtpSenderEgress { StreamDataCountersCallback* const rtp_stats_callback_; BitrateStatisticsObserver* const bitrate_callback_; - mutable Mutex lock_; - bool media_has_been_sent_ RTC_GUARDED_BY(pacer_checker_); - bool force_part_of_allocation_ RTC_GUARDED_BY(lock_); + bool media_has_been_sent_ RTC_GUARDED_BY(worker_queue_); + bool force_part_of_allocation_ RTC_GUARDED_BY(worker_queue_); uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_); // Maps capture time to send-side delay. Send-side delay is the difference // between transmission time and capture time. - std::map send_delays_ RTC_GUARDED_BY(lock_); + std::map send_delays_ RTC_GUARDED_BY(worker_queue_); std::map::const_iterator max_delay_it_ - RTC_GUARDED_BY(lock_); + RTC_GUARDED_BY(worker_queue_); // The sum of delays over a kSendSideDelayWindowMs sliding window. - TimeDelta sum_delays_ RTC_GUARDED_BY(lock_); - StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_); - StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_); + TimeDelta sum_delays_ RTC_GUARDED_BY(worker_queue_); + StreamDataCounters rtp_stats_ RTC_GUARDED_BY(worker_queue_); + StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(worker_queue_); // One element per value in RtpPacketMediaType, with index matching value. - std::vector send_rates_ RTC_GUARDED_BY(lock_); + std::vector send_rates_ RTC_GUARDED_BY(worker_queue_); absl::optional> - pending_fec_params_ RTC_GUARDED_BY(lock_); + pending_fec_params_ RTC_GUARDED_BY(worker_queue_); // Maps sent packets' sequence numbers to a tuple consisting of: // 1. The timestamp, without the randomizing offset mandated by the RFC. @@ -185,7 +181,7 @@ class RtpSenderEgress { const std::unique_ptr rtp_sequence_number_map_ RTC_GUARDED_BY(worker_queue_); RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_); - std::vector packets_to_send_ RTC_GUARDED_BY(pacer_checker_); + std::vector packets_to_send_ RTC_GUARDED_BY(worker_queue_); ScopedTaskSafety task_safety_; }; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc index 05a079b669..9389dc77cf 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc @@ -465,7 +465,9 @@ TEST_F(RtpSenderEgressTest, ReportsFecRate) { } EXPECT_NEAR( - (sender->GetSendRates()[RtpPacketMediaType::kForwardErrorCorrection]) + (sender->GetSendRates( + time_controller_.GetClock() + ->CurrentTime())[RtpPacketMediaType::kForwardErrorCorrection]) .bps(), (total_fec_data_sent / (kTimeBetweenPackets * kNumPackets)).bps(), 500); }