From 675dfb4a1f3cd1c53e88cba0f223d18d55c31b1f Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Mon, 20 Jun 2022 12:46:30 +0200 Subject: [PATCH] Move receive side congestion controller periodic task to worker thread This way call no longer needs dedicated process thread Bug: webrtc:7219 Change-Id: I8ab677b1e6b909eeb726aefed5e6d10ce4bc43b7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265921 Commit-Queue: Danil Chapovalov Reviewed-by: Philip Eliasson Cr-Commit-Position: refs/heads/main@{#37279} --- call/BUILD.gn | 1 + call/call.cc | 28 ++++++++----------- .../receive_side_congestion_controller.h | 12 ++++---- .../receive_side_congestion_controller.cc | 15 ++++++++++ 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/call/BUILD.gn b/call/BUILD.gn index 1f5e707c29..7e8eb6549d 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -332,6 +332,7 @@ rtc_library("call") { "../rtc_base/experiments:field_trial_parser", "../rtc_base/network:sent_packet", "../rtc_base/system:no_unique_address", + "../rtc_base/task_utils:repeating_task", "../system_wrappers", "../system_wrappers:field_trial", "../system_wrappers:metrics", diff --git a/call/call.cc b/call/call.cc index d84964dd1c..b86cc10ac2 100644 --- a/call/call.cc +++ b/call/call.cc @@ -57,6 +57,7 @@ #include "rtc_base/logging.h" #include "rtc_base/strings/string_builder.h" #include "rtc_base/system/no_unique_address.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -203,7 +204,6 @@ class Call final : public webrtc::Call, Call(Clock* clock, const Call::Config& config, std::unique_ptr transport_send, - rtc::scoped_refptr module_process_thread, TaskQueueFactory* task_queue_factory); ~Call() override; @@ -378,7 +378,6 @@ class Call final : public webrtc::Call, RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_; const int num_cpu_cores_; - const rtc::scoped_refptr module_process_thread_; const std::unique_ptr call_stats_; const std::unique_ptr bitrate_allocator_; const Call::Config config_ RTC_GUARDED_BY(worker_thread_); @@ -456,6 +455,7 @@ class Call final : public webrtc::Call, std::atomic configured_max_padding_bitrate_bps_{0}; ReceiveSideCongestionController receive_side_cc_; + RepeatingTaskHandle receive_side_cc_periodic_task_; const std::unique_ptr receive_time_calculator_; @@ -513,7 +513,7 @@ Call* Call::Create(const Call::Config& config) { Call* Call::Create(const Call::Config& config, Clock* clock, - rtc::scoped_refptr call_thread, + rtc::scoped_refptr /*call_thread*/, std::unique_ptr pacer_thread) { RTC_DCHECK(config.task_queue_factory); @@ -524,17 +524,17 @@ Call* Call::Create(const Call::Config& config, return new internal::Call( clock, config, transport_controller_factory_.Create(transportConfig, clock), - std::move(call_thread), config.task_queue_factory); + config.task_queue_factory); } Call* Call::Create(const Call::Config& config, Clock* clock, - rtc::scoped_refptr call_thread, + rtc::scoped_refptr /*call_thread*/, std::unique_ptr transportControllerSend) { RTC_DCHECK(config.task_queue_factory); return new internal::Call(clock, config, std::move(transportControllerSend), - std::move(call_thread), config.task_queue_factory); + config.task_queue_factory); } class SharedModuleThread::Impl { @@ -796,7 +796,6 @@ void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) { Call::Call(Clock* clock, const Call::Config& config, std::unique_ptr transport_send, - rtc::scoped_refptr module_process_thread, TaskQueueFactory* task_queue_factory) : clock_(clock), task_queue_factory_(task_queue_factory), @@ -811,7 +810,6 @@ Call::Call(Clock* clock, worker_thread_) : nullptr), num_cpu_cores_(CpuInfo::DetectNumberOfCores()), - module_process_thread_(std::move(module_process_thread)), call_stats_(new CallStats(clock_, worker_thread_)), bitrate_allocator_(new BitrateAllocator(this)), config_(config), @@ -849,10 +847,11 @@ Call::Call(Clock* clock, call_stats_->RegisterStatsObserver(&receive_side_cc_); - module_process_thread_->process_thread()->RegisterModule( - receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); - module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_, - RTC_FROM_HERE); + ReceiveSideCongestionController* receive_side_cc = &receive_side_cc_; + receive_side_cc_periodic_task_ = RepeatingTaskHandle::Start( + worker_thread_, + [receive_side_cc] { return receive_side_cc->MaybeProcess(); }, + TaskQueueBase::DelayPrecision::kLow, clock_); } Call::~Call() { @@ -864,9 +863,7 @@ Call::~Call() { RTC_CHECK(audio_receive_streams_.empty()); RTC_CHECK(video_receive_streams_.empty()); - module_process_thread_->process_thread()->DeRegisterModule( - receive_side_cc_.GetRemoteBitrateEstimator(true)); - module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_); + receive_side_cc_periodic_task_.Stop(); call_stats_->DeregisterStatsObserver(&receive_side_cc_); send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime()); @@ -887,7 +884,6 @@ void Call::EnsureStarted() { // off being kicked off on request rather than in the ctor. transport_send_->RegisterTargetTransferRateObserver(this); - module_process_thread_->EnsureStarted(); transport_send_->EnsureStarted(); } diff --git a/modules/congestion_controller/include/receive_side_congestion_controller.h b/modules/congestion_controller/include/receive_side_congestion_controller.h index 1dcb468569..53a0de21c7 100644 --- a/modules/congestion_controller/include/receive_side_congestion_controller.h +++ b/modules/congestion_controller/include/receive_side_congestion_controller.h @@ -32,8 +32,7 @@ class RemoteBitrateObserver; // relaying for each received RTP packet back to the sender. While for // receive side bandwidth estimation, we do the estimation locally and // send our results back to the sender. -class ReceiveSideCongestionController : public CallStatsObserver, - public Module { +class ReceiveSideCongestionController : public CallStatsObserver { public: ReceiveSideCongestionController( Clock* clock, @@ -65,9 +64,12 @@ class ReceiveSideCongestionController : public CallStatsObserver, void SetTransportOverhead(DataSize overhead_per_packet); - // Implements Module. - int64_t TimeUntilNextProcess() override; - void Process() override; + [[deprecated]] int64_t TimeUntilNextProcess(); + [[deprecated]] void Process(); + + // Runs periodic tasks if it is time to run them, returns time until next + // call to `MaybeProcess` should be non idle. + TimeDelta MaybeProcess(); private: class WrappingBitrateEstimator : public RemoteBitrateEstimator { diff --git a/modules/congestion_controller/receive_side_congestion_controller.cc b/modules/congestion_controller/receive_side_congestion_controller.cc index 57be0d22fd..e495a4dbc8 100644 --- a/modules/congestion_controller/receive_side_congestion_controller.cc +++ b/modules/congestion_controller/receive_side_congestion_controller.cc @@ -184,6 +184,21 @@ void ReceiveSideCongestionController::Process() { remote_bitrate_estimator_.Process(); } +TimeDelta ReceiveSideCongestionController::MaybeProcess() { + int64_t time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess(); + if (time_until_rbe_ms <= 0) { + remote_bitrate_estimator_.Process(); + time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess(); + } + int64_t time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess(); + if (time_until_rep_ms <= 0) { + remote_estimator_proxy_.Process(); + time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess(); + } + int64_t time_until_next_ms = std::min(time_until_rbe_ms, time_until_rep_ms); + return TimeDelta::Millis(std::max(time_until_next_ms, 0)); +} + void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate( DataRate bitrate) { remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);