diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 91faed5447..a4453205b8 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -54,14 +54,14 @@ RtpTransportControllerSend::RtpTransportControllerSend( const BitrateConstraints& bitrate_config) : clock_(clock), pacer_(clock, &packet_router_, event_log), + bitrate_configurator_(bitrate_config), + process_thread_(ProcessThread::Create("SendControllerThread")), + observer_(nullptr), send_side_cc_(CreateController(clock, event_log, &pacer_, bitrate_config, - TaskQueueExperimentEnabled())), - bitrate_configurator_(bitrate_config), - process_thread_(ProcessThread::Create("SendControllerThread")), - observer_(nullptr) { + TaskQueueExperimentEnabled())) { process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); process_thread_->Start(); diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 84ad424fc2..c9a87eddeb 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -31,8 +31,9 @@ class RtcEventLog; // TODO(nisse): When we get the underlying transports here, we should // have one object implementing RtpTransportControllerSendInterface // per transport, sharing the same congestion controller. -class RtpTransportControllerSend : public RtpTransportControllerSendInterface, - public NetworkChangedObserver { +class RtpTransportControllerSend final + : public RtpTransportControllerSendInterface, + public NetworkChangedObserver { public: RtpTransportControllerSend(Clock* clock, RtcEventLog* event_log, @@ -83,14 +84,15 @@ class RtpTransportControllerSend : public RtpTransportControllerSendInterface, const Clock* const clock_; PacketRouter packet_router_; PacedSender pacer_; - const std::unique_ptr send_side_cc_; RtpKeepAliveConfig keepalive_; RtpBitrateConfigurator bitrate_configurator_; std::map network_routes_; const std::unique_ptr process_thread_; rtc::CriticalSection observer_crit_; TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_); - + // Declared last since it will issue callbacks from a task queue. Declaring it + // last ensures that it is destroyed first. + const std::unique_ptr send_side_cc_; RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend); }; diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index d3f86aa51e..68e4843b54 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -130,19 +130,18 @@ class SendSideCongestionController protected: // TODO(srte): The tests should be rewritten to not depend on internals and // these functions should be removed. - // Post tasks that are normally delayed. This allows unit tests to trigger - // process updates immediately. - void PostDelayedTasksForTest(); + // Post periodic tasks just once. This allows unit tests to trigger process + // updates immediately. + void PostPeriodicTasksForTest(); // Waits for outstanding tasks to be finished. This allos unit tests to ensure - // that expected callbacks has be called. + // that expected callbacks has been called. void WaitOnTasksForTest(); private: void MaybeCreateControllers(); - void StartProcess() RTC_RUN_ON(task_queue_); - void ProcessTask(); - void StartPacerQueueUpdate(); - void PacerQueueUpdateTask(); + void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); + void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); + void UpdatePacerQueue() RTC_RUN_ON(task_queue_); void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 1e4dacb953..7145fe3434 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -18,6 +18,7 @@ #include "modules/congestion_controller/network_control/include/network_types.h" #include "modules/congestion_controller/network_control/include/network_units.h" #include "modules/remote_bitrate_estimator/include/bwe_defines.h" +#include "rtc_base/bind.h" #include "rtc_base/checks.h" #include "rtc_base/format_macros.h" #include "rtc_base/logging.h" @@ -38,6 +39,7 @@ namespace webrtc_cc { namespace { const char kPacerPushbackExperiment[] = "WebRTC-PacerPushbackExperiment"; +const int64_t PacerQueueUpdateIntervalMs = 25; bool IsPacerPushbackExperimentEnabled() { return webrtc::field_trial::IsEnabled(kPacerPushbackExperiment) || @@ -96,6 +98,37 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps, : DataRate::Infinity(); return msg; } + +// TODO(srte): Make sure this is reusable and move it to task_queue.h +// The template closure pattern is based on rtc::ClosureTask. +template +class PeriodicTask : public rtc::QueuedTask { + public: + PeriodicTask(Closure&& closure, int64_t period_ms) + : closure_(std::forward(closure)), period_ms_(period_ms) {} + bool Run() override { + closure_(); + // WrapUnique lets us repost this task on the TaskQueue. + rtc::TaskQueue::Current()->PostDelayedTask(rtc::WrapUnique(this), + period_ms_); + // Return false to tell TaskQueue to not destruct this object, since we have + // taken ownership with WrapUnique. + return false; + } + + private: + typename std::remove_const< + typename std::remove_reference::type>::type closure_; + const int64_t period_ms_; +}; + +template +static std::unique_ptr NewPeriodicTask(Closure&& closure, + int64_t period_ms) { + return rtc::MakeUnique>(std::forward(closure), + period_ms); +} + } // namespace namespace send_side_cc_internal { @@ -314,7 +347,7 @@ void SendSideCongestionController::MaybeCreateControllers() { controller_ = controller_factory_->Create(control_handler_.get(), initial_config_); UpdateStreamsConfig(); - StartProcess(); + StartProcessPeriodicTasks(); } SendSideCongestionController::~SendSideCongestionController() { @@ -506,18 +539,23 @@ void SendSideCongestionController::Process() { // Ignored, using task queue to process. } -void SendSideCongestionController::StartProcess() { +void SendSideCongestionController::StartProcessPeriodicTasks() { task_queue_->PostDelayedTask( - [this]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); - ProcessTask(); - StartProcess(); - }, + NewPeriodicTask( + rtc::Bind( + &SendSideCongestionController::UpdateControllerWithTimeInterval, + this), + process_interval_.ms()), process_interval_.ms()); + + task_queue_->PostDelayedTask( + NewPeriodicTask( + rtc::Bind(&SendSideCongestionController::UpdatePacerQueue, this), + PacerQueueUpdateIntervalMs), + PacerQueueUpdateIntervalMs); } -void SendSideCongestionController::ProcessTask() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +void SendSideCongestionController::UpdateControllerWithTimeInterval() { if (controller_) { ProcessInterval msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); @@ -525,8 +563,7 @@ void SendSideCongestionController::ProcessTask() { } } -void SendSideCongestionController::PacerQueueUpdateTask() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +void SendSideCongestionController::UpdatePacerQueue() { if (control_handler_) { PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); @@ -591,10 +628,11 @@ SendSideCongestionController::GetTransportFeedbackVector() const { return transport_feedback_adapter_.GetTransportFeedbackVector(); } -void SendSideCongestionController::PostDelayedTasksForTest() { +void SendSideCongestionController::PostPeriodicTasksForTest() { task_queue_->PostTask([this]() { - ProcessTask(); - PacerQueueUpdateTask(); + RTC_DCHECK_RUN_ON(task_queue_.get()); + UpdateControllerWithTimeInterval(); + UpdatePacerQueue(); }); } diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index e27c559b8c..ca93b41d17 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -50,7 +50,7 @@ class SendSideCongestionControllerForTest ~SendSideCongestionControllerForTest() {} void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); } void Process() override { - SendSideCongestionController::PostDelayedTasksForTest(); + SendSideCongestionController::PostPeriodicTasksForTest(); SendSideCongestionController::WaitOnTasksForTest(); } };