From 73f048daf07b157961c43e7dbc9d2c378e6457d8 Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Wed, 12 Apr 2023 09:41:17 +0000 Subject: [PATCH] Revert "[WebRTC-SendPacketsOnWorkerThread] Cleanup AudioSendStream" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit dd557fdb1e300068c62c870d9dc5273b48c7b79d. Reason for revert: Looks like the Chromium FYI builders are failing. Original change's description: > [WebRTC-SendPacketsOnWorkerThread] Cleanup AudioSendStream > > This remove use of MaybeWorkerThread* rtp_transport_queue_ from > AudioSendStream. The worker queue is alwauys assumed ot be used where > rtp_transport_queue_ was used. > > Bug: webrtc:14502 > Change-Id: Ia516ce7340d712671e0ecb301bba9d66e7216973 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300400 > Reviewed-by: Evan Shrubsole > Reviewed-by: Jakob Ivarsson‎ > Commit-Queue: Per Kjellander > Cr-Commit-Position: refs/heads/main@{#39816} Bug: webrtc:14502 Change-Id: I0547548032756fc579b76b6bb362f576aa06b8f7 No-Presubmit: true No-Tree-Checks: true No-Try: true Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/301020 Commit-Queue: Tomas Gunnarsson Auto-Submit: Tomas Gunnarsson Bot-Commit: rubber-stamper@appspot.gserviceaccount.com Cr-Commit-Position: refs/heads/main@{#39820} --- audio/BUILD.gn | 2 ++ audio/audio_send_stream.cc | 42 ++++++++++++++++++--------- audio/audio_send_stream.h | 8 ++++-- audio/audio_send_stream_unittest.cc | 44 +++++++++++++++++++++-------- 4 files changed, 68 insertions(+), 28 deletions(-) diff --git a/audio/BUILD.gn b/audio/BUILD.gn index d1a42b6d6d..d3f2d87c69 100644 --- a/audio/BUILD.gn +++ b/audio/BUILD.gn @@ -86,6 +86,7 @@ rtc_library("audio") { "../modules/pacing", "../modules/rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", + "../modules/utility:utility", "../rtc_base:audio_format_to_string", "../rtc_base:buffer", "../rtc_base:checks", @@ -195,6 +196,7 @@ if (rtc_include_tests) { "../modules/pacing", "../modules/rtp_rtcp:mock_rtp_rtcp", "../modules/rtp_rtcp:rtp_rtcp_format", + "../modules/utility:utility", "../rtc_base:checks", "../rtc_base:gunit_helpers", "../rtc_base:macromagic", diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc index 19d71c95ad..7d6ec794d4 100644 --- a/audio/audio_send_stream.cc +++ b/audio/audio_send_stream.cc @@ -147,6 +147,7 @@ AudioSendStream::AudioSendStream( const FieldTrialsView& field_trials) : clock_(clock), field_trials_(field_trials), + rtp_transport_queue_(rtp_transport->GetWorkerQueue()), allocate_audio_without_feedback_( field_trials_.IsEnabled("WebRTC-Audio-ABWENoTWCC")), enable_audio_alr_probing_( @@ -163,6 +164,7 @@ AudioSendStream::AudioSendStream( rtp_rtcp_module_(channel_send_->GetRtpRtcp()), suspended_rtp_state_(suspended_rtp_state) { RTC_LOG(LS_INFO) << "AudioSendStream: " << config.rtp.ssrc; + RTC_DCHECK(rtp_transport_queue_); RTC_DCHECK(audio_state_); RTC_DCHECK(channel_send_); RTC_DCHECK(bitrate_allocator_); @@ -180,6 +182,10 @@ AudioSendStream::~AudioSendStream() { RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc; RTC_DCHECK(!sending_); channel_send_->ResetSenderCongestionControlObjects(); + + // Blocking call to synchronize state with worker queue to ensure that there + // are no pending tasks left that keeps references to audio. + rtp_transport_queue_->RunSynchronous([] {}); } const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const { @@ -504,7 +510,7 @@ void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) { } uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) { - RTC_DCHECK_RUN_ON(&worker_thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); // Pick a target bitrate between the constraints. Overrules the allocator if // it 1) allocated a bitrate of zero to disable the stream or 2) allocated a @@ -819,7 +825,6 @@ void AudioSendStream::ReconfigureBitrateObserver( } void AudioSendStream::ConfigureBitrateObserver() { - RTC_DCHECK_RUN_ON(&worker_thread_checker_); // This either updates the current observer or adds a new observer. // TODO(srte): Add overhead compensation here. auto constraints = GetMinMaxBitrateConstraints(); @@ -841,24 +846,30 @@ void AudioSendStream::ConfigureBitrateObserver() { priority_bitrate += min_overhead; } - if (allocation_settings_.priority_bitrate_raw) { + if (allocation_settings_.priority_bitrate_raw) priority_bitrate = *allocation_settings_.priority_bitrate_raw; - } - - bitrate_allocator_->AddObserver( - this, - MediaStreamAllocationConfig{ - constraints->min.bps(), constraints->max.bps(), 0, - priority_bitrate.bps(), true, - allocation_settings_.bitrate_priority.value_or( - config_.bitrate_priority)}); + rtp_transport_queue_->RunOrPost([this, constraints, priority_bitrate, + config_bitrate_priority = + config_.bitrate_priority] { + RTC_DCHECK_RUN_ON(rtp_transport_queue_); + bitrate_allocator_->AddObserver( + this, + MediaStreamAllocationConfig{ + constraints->min.bps(), constraints->max.bps(), + 0, priority_bitrate.bps(), true, + allocation_settings_.bitrate_priority.value_or( + config_bitrate_priority)}); + }); registered_with_allocator_ = true; } void AudioSendStream::RemoveBitrateObserver() { registered_with_allocator_ = false; - bitrate_allocator_->RemoveObserver(this); + rtp_transport_queue_->RunSynchronous([this] { + RTC_DCHECK_RUN_ON(rtp_transport_queue_); + bitrate_allocator_->RemoveObserver(this); + }); } absl::optional @@ -919,7 +930,10 @@ void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() { if (!new_constraints.has_value()) { return; } - cached_constraints_ = new_constraints; + rtp_transport_queue_->RunOrPost([this, new_constraints]() { + RTC_DCHECK_RUN_ON(rtp_transport_queue_); + cached_constraints_ = new_constraints; + }); } } // namespace internal diff --git a/audio/audio_send_stream.h b/audio/audio_send_stream.h index 6cda9c3d52..42be43afb9 100644 --- a/audio/audio_send_stream.h +++ b/audio/audio_send_stream.h @@ -25,6 +25,7 @@ #include "call/audio_state.h" #include "call/bitrate_allocator.h" #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h" +#include "modules/utility/maybe_worker_thread.h" #include "rtc_base/experiments/struct_parameters_parser.h" #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" @@ -172,6 +173,7 @@ class AudioSendStream final : public webrtc::AudioSendStream, SequenceChecker worker_thread_checker_; rtc::RaceChecker audio_capture_race_checker_; + MaybeWorkerThread* rtp_transport_queue_; const bool allocate_audio_without_feedback_; const bool force_no_audio_feedback_ = allocate_audio_without_feedback_; @@ -194,10 +196,10 @@ class AudioSendStream final : public webrtc::AudioSendStream, webrtc::voe::AudioLevel audio_level_ RTC_GUARDED_BY(audio_level_lock_); BitrateAllocatorInterface* const bitrate_allocator_ - RTC_GUARDED_BY(worker_thread_checker_); + RTC_GUARDED_BY(rtp_transport_queue_); + // Constrains cached to be accessed from `rtp_transport_queue_`. absl::optional - cached_constraints_ RTC_GUARDED_BY(worker_thread_checker_) = - absl::nullopt; + cached_constraints_ RTC_GUARDED_BY(rtp_transport_queue_) = absl::nullopt; RtpTransportControllerSendInterface* const rtp_transport_; RtpRtcpInterface* const rtp_rtcp_module_; diff --git a/audio/audio_send_stream_unittest.cc b/audio/audio_send_stream_unittest.cc index a6450d3e89..a81b40cbe7 100644 --- a/audio/audio_send_stream_unittest.cc +++ b/audio/audio_send_stream_unittest.cc @@ -30,6 +30,7 @@ #include "modules/audio_processing/include/mock_audio_processing.h" #include "modules/rtp_rtcp/mocks/mock_rtcp_bandwidth_observer.h" #include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h" +#include "modules/utility/maybe_worker_thread.h" #include "system_wrappers/include/clock.h" #include "test/gtest.h" #include "test/mock_audio_encoder.h" @@ -154,6 +155,9 @@ struct ConfigHelper { ? nullptr : rtc::make_ref_counted>()), bitrate_allocator_(&limit_observer_), + worker_queue_(field_trials, + "ConfigHelper_worker_queue", + time_controller_.GetTaskQueueFactory()), audio_encoder_(nullptr) { using ::testing::Invoke; @@ -184,6 +188,8 @@ struct ConfigHelper { } std::unique_ptr CreateAudioSendStream() { + EXPECT_CALL(rtp_transport_, GetWorkerQueue()) + .WillRepeatedly(Return(&worker_queue_)); return std::unique_ptr( new internal::AudioSendStream( time_controller_.GetClock(), stream_config_, audio_state_, @@ -313,6 +319,8 @@ struct ConfigHelper { } } + MaybeWorkerThread* worker() { return &worker_queue_; } + test::ScopedKeyValueConfig field_trials; private: @@ -328,6 +336,9 @@ struct ConfigHelper { ::testing::NiceMock rtp_rtcp_; ::testing::NiceMock limit_observer_; BitrateAllocator bitrate_allocator_; + // `worker_queue` is defined last to ensure all pending tasks are cancelled + // and deleted before any other members. + MaybeWorkerThread worker_queue_; std::unique_ptr audio_encoder_; }; @@ -625,7 +636,8 @@ TEST(AudioSendStreamTest, DoesNotPassHigherBitrateThanMaxBitrate) { update.packet_loss_ratio = 0; update.round_trip_time = TimeDelta::Millis(50); update.bwe_period = TimeDelta::Millis(6000); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -641,7 +653,8 @@ TEST(AudioSendStreamTest, SSBweTargetInRangeRespected) { BitrateAllocationUpdate update; update.target_bitrate = DataRate::BitsPerSec(helper.config().max_bitrate_bps - 5000); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -657,7 +670,8 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMinRespected) { Eq(DataRate::KilobitsPerSec(6))))); BitrateAllocationUpdate update; update.target_bitrate = DataRate::KilobitsPerSec(1); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -673,7 +687,8 @@ TEST(AudioSendStreamTest, SSBweFieldTrialMaxRespected) { Eq(DataRate::KilobitsPerSec(64))))); BitrateAllocationUpdate update; update.target_bitrate = DataRate::KilobitsPerSec(128); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -693,7 +708,8 @@ TEST(AudioSendStreamTest, SSBweWithOverhead) { &BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); BitrateAllocationUpdate update; update.target_bitrate = bitrate; - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -713,7 +729,8 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMinRespected) { &BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); BitrateAllocationUpdate update; update.target_bitrate = DataRate::KilobitsPerSec(1); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -733,7 +750,8 @@ TEST(AudioSendStreamTest, SSBweWithOverheadMaxRespected) { &BitrateAllocationUpdate::target_bitrate, Eq(bitrate)))); BitrateAllocationUpdate update; update.target_bitrate = DataRate::KilobitsPerSec(128); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -751,7 +769,8 @@ TEST(AudioSendStreamTest, ProbingIntervalOnBitrateUpdated) { update.packet_loss_ratio = 0; update.round_trip_time = TimeDelta::Millis(50); update.bwe_period = TimeDelta::Millis(5000); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); } } @@ -853,7 +872,8 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) { DataRate::BitsPerSec(helper.config().max_bitrate_bps) + kMaxOverheadRate; EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); EXPECT_EQ(audio_overhead_per_packet_bytes, send_stream->TestOnlyGetPerPacketOverheadBytes()); @@ -861,7 +881,8 @@ TEST(AudioSendStreamTest, AudioOverheadChanged) { EXPECT_CALL(*helper.rtp_rtcp(), ExpectedPerPacketOverhead) .WillRepeatedly(Return(audio_overhead_per_packet_bytes + 20)); EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); EXPECT_EQ(audio_overhead_per_packet_bytes + 20, send_stream->TestOnlyGetPerPacketOverheadBytes()); @@ -885,7 +906,8 @@ TEST(AudioSendStreamTest, OnAudioAndTransportOverheadChanged) { DataRate::BitsPerSec(helper.config().max_bitrate_bps) + kMaxOverheadRate; EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation); - send_stream->OnBitrateUpdated(update); + helper.worker()->RunSynchronous( + [&] { send_stream->OnBitrateUpdated(update); }); EXPECT_EQ( transport_overhead_per_packet_bytes + audio_overhead_per_packet_bytes,