diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index 452afb6f12..5d2ae0238a 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -52,6 +52,13 @@ namespace send_side_cc_internal { // in SendSideCongestionController, which would risk causing data race on // destruction unless members are properly ordered. class ControlHandler; + +// TODO(srte): Make sure the PeriodicTask implementation is reusable and move it +// to task_queue.h. +class PeriodicTask : public rtc::QueuedTask { + public: + virtual void Stop() = 0; +}; } // namespace send_side_cc_internal class SendSideCongestionController @@ -188,6 +195,10 @@ class SendSideCongestionController std::atomic transport_overhead_bytes_per_packet_; bool network_available_ RTC_GUARDED_BY(task_queue_ptr_); bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_ptr_); + send_side_cc_internal::PeriodicTask* pacer_queue_update_task_ + RTC_GUARDED_BY(task_queue_ptr_); + send_side_cc_internal::PeriodicTask* controller_task_ + RTC_GUARDED_BY(task_queue_ptr_); // Protects access to last_packet_feedback_vector_ in feedback adapter. // TODO(srte): Remove this checker when feedback adapter runs on task queue. diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index e1defdc140..898a9dd7d0 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -37,6 +37,7 @@ using rtc::MakeUnique; namespace webrtc { namespace webrtc_cc { namespace { +using send_side_cc_internal::PeriodicTask; const char kPacerPushbackExperiment[] = "WebRTC-PacerPushbackExperiment"; const int64_t PacerQueueUpdateIntervalMs = 25; @@ -99,39 +100,58 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps, return msg; } -// TODO(srte): Make sure this is reusable and move it to task_queue.h // The template closure pattern is based on rtc::ClosureTask. template -class PeriodicTask : public rtc::QueuedTask { +class PeriodicTaskImpl final : public PeriodicTask { public: - PeriodicTask(Closure&& closure, int64_t period_ms) - : closure_(std::forward(closure)), period_ms_(period_ms) {} + PeriodicTaskImpl(rtc::TaskQueue* task_queue, + int64_t period_ms, + Closure&& closure) + : task_queue_(task_queue), + period_ms_(period_ms), + closure_(std::forward(closure)) {} bool Run() override { + if (!running_) + return true; closure_(); // WrapUnique lets us repost this task on the TaskQueue. - rtc::TaskQueue::Current()->PostDelayedTask(rtc::WrapUnique(this), - period_ms_); + task_queue_->PostDelayedTask(rtc::WrapUnique(this), period_ms_); // Return false to tell TaskQueue to not destruct this object, since we have // taken ownership with WrapUnique. return false; } + void Stop() override { + if (task_queue_->IsCurrent()) { + RTC_DCHECK(running_); + running_ = false; + } else { + task_queue_->PostTask([this] { Stop(); }); + } + } private: + rtc::TaskQueue* const task_queue_; + const int64_t period_ms_; typename std::remove_const< typename std::remove_reference::type>::type closure_; - const int64_t period_ms_; + bool running_ = true; }; template -static std::unique_ptr NewPeriodicTask(Closure&& closure, - int64_t period_ms) { - return rtc::MakeUnique>(std::forward(closure), - period_ms); +static PeriodicTask* StartPeriodicTask(rtc::TaskQueue* task_queue, + int64_t period_ms, + Closure&& closure) { + auto periodic_task = rtc::MakeUnique>( + task_queue, period_ms, std::forward(closure)); + PeriodicTask* periodic_task_ptr = periodic_task.get(); + task_queue->PostDelayedTask(std::move(periodic_task), period_ms); + return periodic_task_ptr; } } // namespace namespace send_side_cc_internal { + class ControlHandler : public NetworkControllerObserver { public: ControlHandler(NetworkChangedObserver* observer, @@ -315,6 +335,8 @@ SendSideCongestionController::SendSideCongestionController( transport_overhead_bytes_per_packet_(0), network_available_(false), periodic_tasks_enabled_(true), + pacer_queue_update_task_(nullptr), + controller_task_(nullptr), task_queue_(MakeUnique("SendSideCCQueue")) { task_queue_ptr_ = task_queue_.get(); initial_config_.constraints = @@ -550,19 +572,29 @@ void SendSideCongestionController::Process() { void SendSideCongestionController::StartProcessPeriodicTasks() { if (!periodic_tasks_enabled_) return; - task_queue_ptr_->PostDelayedTask( - NewPeriodicTask( - rtc::Bind( - &SendSideCongestionController::UpdateControllerWithTimeInterval, - this), - process_interval_.ms()), - process_interval_.ms()); - - task_queue_ptr_->PostDelayedTask( - NewPeriodicTask( - rtc::Bind(&SendSideCongestionController::UpdatePacerQueue, this), - PacerQueueUpdateIntervalMs), - PacerQueueUpdateIntervalMs); + if (!pacer_queue_update_task_) { + pacer_queue_update_task_ = StartPeriodicTask( + task_queue_ptr_, PacerQueueUpdateIntervalMs, [this]() { + RTC_DCHECK_RUN_ON(task_queue_ptr_); + UpdatePacerQueue(); + }); + } + if (controller_task_) { + // Stop is not synchronous, but is guaranteed to occur before the first + // invocation of the new controller task started below. + controller_task_->Stop(); + controller_task_ = nullptr; + } + if (process_interval_.IsFinite()) { + // The controller task is owned by the task queue and lives until the task + // queue is destroyed or some time after Stop() is called, whichever comes + // first. + controller_task_ = + StartPeriodicTask(task_queue_ptr_, process_interval_.ms(), [this]() { + RTC_DCHECK_RUN_ON(task_queue_ptr_); + UpdateControllerWithTimeInterval(); + }); + } } void SendSideCongestionController::UpdateControllerWithTimeInterval() {