From 5f22be7cf831e0b81bba8a57bb94e81cc353981d Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Tue, 13 Mar 2018 11:38:45 +0100 Subject: [PATCH] Congestion controller processing using delayed tasks. Replacing Module based mechanism for processing with posting tasks. This prepares for allowing the interval to be changed at runtime and for removing the dependency on Module threads. Bug: webrtc:8415 Change-Id: Iaad50466bec695be4ba26d8bd670a1981f2e0df4 Reviewed-on: https://webrtc-review.googlesource.com/60862 Commit-Queue: Sebastian Jansson Reviewed-by: Stefan Holmer Cr-Commit-Position: refs/heads/master@{#22406} --- .../include/send_side_congestion_controller.h | 20 ++++--- .../rtp/send_side_congestion_controller.cc | 55 ++++++++++++------- ...end_side_congestion_controller_unittest.cc | 6 +- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index 91f7117f7e..1bcc543c2e 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -137,11 +137,21 @@ class SendSideCongestionController void SetPacingFactor(float pacing_factor); protected: - // Waits long enough that any outstanding tasks should be finished. - void WaitOnTasks(); + // TODO(srte): The tests should be rewritten to not depend on internals and + // these functions should be removed. + // Post tasks that are normally delayed. This allows unit tests to trigger + // process updates immediately. + void PostDelayedTasksForTest(); + // Waits for outstanding tasks to be finished. This allos unit tests to ensure + // that expected callbacks has be called. + void WaitOnTasksForTest(); private: void MaybeCreateControllers(); + void StartProcess() RTC_RUN_ON(task_queue_); + void ProcessTask(); + void StartPacerQueueUpdate(); + void PacerQueueUpdateTask(); void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); @@ -166,11 +176,7 @@ class SendSideCongestionController std::unique_ptr controller_ RTC_GUARDED_BY(task_queue_); - // TODO(srte): Review access constraints of these when introducing delayed - // tasks. Only accessed from process threads. - TimeDelta process_interval_; - // Only accessed from process threads. - int64_t last_process_update_ms_ = 0; + TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); std::map last_report_blocks_ RTC_GUARDED_BY(task_queue_); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 2901321048..6b69c9c2f8 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -314,6 +314,7 @@ void SendSideCongestionController::MaybeCreateControllers() { controller_ = controller_factory_->Create(control_handler_.get(), initial_config_); UpdateStreamsConfig(); + StartProcess(); } SendSideCongestionController::~SendSideCongestionController() { @@ -498,33 +499,40 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, } int64_t SendSideCongestionController::TimeUntilNextProcess() { - const int kMaxProcessInterval = 60 * 1000; - if (process_interval_.IsInfinite()) - return kMaxProcessInterval; - int64_t next_process_ms = last_process_update_ms_ + process_interval_.ms(); - int64_t time_until_next_process = - next_process_ms - clock_->TimeInMilliseconds(); - return std::max(time_until_next_process, 0); + // Using task queue to process, just sleep long to avoid wasting resources. + return 60 * 1000; } void SendSideCongestionController::Process() { - int64_t now_ms = clock_->TimeInMilliseconds(); - last_process_update_ms_ = now_ms; - { + // Ignored, using task queue to process. +} + +void SendSideCongestionController::StartProcess() { + task_queue_->PostDelayedTask( + [this]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + ProcessTask(); + StartProcess(); + }, + process_interval_.ms()); +} + +void SendSideCongestionController::ProcessTask() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) { ProcessInterval msg; - msg.at_time = Timestamp::ms(now_ms); - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); - if (controller_) - controller_->OnProcessInterval(msg); - }); + msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + controller_->OnProcessInterval(msg); } - task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +} + +void SendSideCongestionController::PacerQueueUpdateTask() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (control_handler_) { PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); control_handler_->OnPacerQueueUpdate(msg); - }); + } } void SendSideCongestionController::AddPacket( @@ -584,7 +592,14 @@ SendSideCongestionController::GetTransportFeedbackVector() const { return transport_feedback_adapter_.GetTransportFeedbackVector(); } -void SendSideCongestionController::WaitOnTasks() { +void SendSideCongestionController::PostDelayedTasksForTest() { + task_queue_->PostTask([this]() { + ProcessTask(); + PacerQueueUpdateTask(); + }); +} + +void SendSideCongestionController::WaitOnTasksForTest() { rtc::Event event(false, false); task_queue_->PostTask([&event]() { event.Set(); }); event.Wait(rtc::Event::kForever); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index a982c74216..e27c559b8c 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -48,10 +48,10 @@ class SendSideCongestionControllerForTest public: using SendSideCongestionController::SendSideCongestionController; ~SendSideCongestionControllerForTest() {} - using SendSideCongestionController::WaitOnTasks; + void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); } void Process() override { - SendSideCongestionController::Process(); - SendSideCongestionController::WaitOnTasks(); + SendSideCongestionController::PostDelayedTasksForTest(); + SendSideCongestionController::WaitOnTasksForTest(); } }; } // namespace