From 317a5228766ff250fc806b320ac3c9101241c8ed Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 16 Mar 2018 15:36:37 +0100 Subject: [PATCH] Fixes to posting delayed process tasks in SSCC. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The task queue based SendSideCongestionController (SSCC) was accessing a unique pointer to the task queue from the task queue itself. This triggered a tsan check failure when resetting the same unique pointer. Also move declaration of SSCC member in RtpTransportControllerSend last, to ensure that it, and its TaskQueue, are destroyed before other members. Bug: webrtc:8415 Change-Id: I75c93f41deab637f7e4766ac4b61713c86f866e9 Reviewed-on: https://webrtc-review.googlesource.com/62143 Commit-Queue: Sebastian Jansson Reviewed-by: Björn Terelius Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#22478} --- call/rtp_transport_controller_send.cc | 8 +-- call/rtp_transport_controller_send.h | 10 +-- .../include/send_side_congestion_controller.h | 15 ++--- .../rtp/send_side_congestion_controller.cc | 66 +++++++++++++++---- ...end_side_congestion_controller_unittest.cc | 2 +- 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 91faed5447..a4453205b8 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -54,14 +54,14 @@ RtpTransportControllerSend::RtpTransportControllerSend( const BitrateConstraints& bitrate_config) : clock_(clock), pacer_(clock, &packet_router_, event_log), + bitrate_configurator_(bitrate_config), + process_thread_(ProcessThread::Create("SendControllerThread")), + observer_(nullptr), send_side_cc_(CreateController(clock, event_log, &pacer_, bitrate_config, - TaskQueueExperimentEnabled())), - bitrate_configurator_(bitrate_config), - process_thread_(ProcessThread::Create("SendControllerThread")), - observer_(nullptr) { + TaskQueueExperimentEnabled())) { process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); process_thread_->Start(); diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 84ad424fc2..c9a87eddeb 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -31,8 +31,9 @@ class RtcEventLog; // TODO(nisse): When we get the underlying transports here, we should // have one object implementing RtpTransportControllerSendInterface // per transport, sharing the same congestion controller. -class RtpTransportControllerSend : public RtpTransportControllerSendInterface, - public NetworkChangedObserver { +class RtpTransportControllerSend final + : public RtpTransportControllerSendInterface, + public NetworkChangedObserver { public: RtpTransportControllerSend(Clock* clock, RtcEventLog* event_log, @@ -83,14 +84,15 @@ class RtpTransportControllerSend : public RtpTransportControllerSendInterface, const Clock* const clock_; PacketRouter packet_router_; PacedSender pacer_; - const std::unique_ptr send_side_cc_; RtpKeepAliveConfig keepalive_; RtpBitrateConfigurator bitrate_configurator_; std::map network_routes_; const std::unique_ptr process_thread_; rtc::CriticalSection observer_crit_; TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_); - + // Declared last since it will issue callbacks from a task queue. Declaring it + // last ensures that it is destroyed first. + const std::unique_ptr send_side_cc_; RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend); }; diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index d3f86aa51e..68e4843b54 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -130,19 +130,18 @@ class SendSideCongestionController protected: // TODO(srte): The tests should be rewritten to not depend on internals and // these functions should be removed. - // Post tasks that are normally delayed. This allows unit tests to trigger - // process updates immediately. - void PostDelayedTasksForTest(); + // Post periodic tasks just once. This allows unit tests to trigger process + // updates immediately. + void PostPeriodicTasksForTest(); // Waits for outstanding tasks to be finished. This allos unit tests to ensure - // that expected callbacks has be called. + // that expected callbacks has been called. void WaitOnTasksForTest(); private: void MaybeCreateControllers(); - void StartProcess() RTC_RUN_ON(task_queue_); - void ProcessTask(); - void StartPacerQueueUpdate(); - void PacerQueueUpdateTask(); + void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); + void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); + void UpdatePacerQueue() RTC_RUN_ON(task_queue_); void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 1e4dacb953..7145fe3434 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -18,6 +18,7 @@ #include "modules/congestion_controller/network_control/include/network_types.h" #include "modules/congestion_controller/network_control/include/network_units.h" #include "modules/remote_bitrate_estimator/include/bwe_defines.h" +#include "rtc_base/bind.h" #include "rtc_base/checks.h" #include "rtc_base/format_macros.h" #include "rtc_base/logging.h" @@ -38,6 +39,7 @@ namespace webrtc_cc { namespace { const char kPacerPushbackExperiment[] = "WebRTC-PacerPushbackExperiment"; +const int64_t PacerQueueUpdateIntervalMs = 25; bool IsPacerPushbackExperimentEnabled() { return webrtc::field_trial::IsEnabled(kPacerPushbackExperiment) || @@ -96,6 +98,37 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps, : DataRate::Infinity(); return msg; } + +// TODO(srte): Make sure this is reusable and move it to task_queue.h +// The template closure pattern is based on rtc::ClosureTask. +template +class PeriodicTask : public rtc::QueuedTask { + public: + PeriodicTask(Closure&& closure, int64_t period_ms) + : closure_(std::forward(closure)), period_ms_(period_ms) {} + bool Run() override { + closure_(); + // WrapUnique lets us repost this task on the TaskQueue. + rtc::TaskQueue::Current()->PostDelayedTask(rtc::WrapUnique(this), + period_ms_); + // Return false to tell TaskQueue to not destruct this object, since we have + // taken ownership with WrapUnique. + return false; + } + + private: + typename std::remove_const< + typename std::remove_reference::type>::type closure_; + const int64_t period_ms_; +}; + +template +static std::unique_ptr NewPeriodicTask(Closure&& closure, + int64_t period_ms) { + return rtc::MakeUnique>(std::forward(closure), + period_ms); +} + } // namespace namespace send_side_cc_internal { @@ -314,7 +347,7 @@ void SendSideCongestionController::MaybeCreateControllers() { controller_ = controller_factory_->Create(control_handler_.get(), initial_config_); UpdateStreamsConfig(); - StartProcess(); + StartProcessPeriodicTasks(); } SendSideCongestionController::~SendSideCongestionController() { @@ -506,18 +539,23 @@ void SendSideCongestionController::Process() { // Ignored, using task queue to process. } -void SendSideCongestionController::StartProcess() { +void SendSideCongestionController::StartProcessPeriodicTasks() { task_queue_->PostDelayedTask( - [this]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); - ProcessTask(); - StartProcess(); - }, + NewPeriodicTask( + rtc::Bind( + &SendSideCongestionController::UpdateControllerWithTimeInterval, + this), + process_interval_.ms()), process_interval_.ms()); + + task_queue_->PostDelayedTask( + NewPeriodicTask( + rtc::Bind(&SendSideCongestionController::UpdatePacerQueue, this), + PacerQueueUpdateIntervalMs), + PacerQueueUpdateIntervalMs); } -void SendSideCongestionController::ProcessTask() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +void SendSideCongestionController::UpdateControllerWithTimeInterval() { if (controller_) { ProcessInterval msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); @@ -525,8 +563,7 @@ void SendSideCongestionController::ProcessTask() { } } -void SendSideCongestionController::PacerQueueUpdateTask() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +void SendSideCongestionController::UpdatePacerQueue() { if (control_handler_) { PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); @@ -591,10 +628,11 @@ SendSideCongestionController::GetTransportFeedbackVector() const { return transport_feedback_adapter_.GetTransportFeedbackVector(); } -void SendSideCongestionController::PostDelayedTasksForTest() { +void SendSideCongestionController::PostPeriodicTasksForTest() { task_queue_->PostTask([this]() { - ProcessTask(); - PacerQueueUpdateTask(); + RTC_DCHECK_RUN_ON(task_queue_.get()); + UpdateControllerWithTimeInterval(); + UpdatePacerQueue(); }); } diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index e27c559b8c..ca93b41d17 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -50,7 +50,7 @@ class SendSideCongestionControllerForTest ~SendSideCongestionControllerForTest() {} void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); } void Process() override { - SendSideCongestionController::PostDelayedTasksForTest(); + SendSideCongestionController::PostPeriodicTasksForTest(); SendSideCongestionController::WaitOnTasksForTest(); } };