diff --git a/BUILD.gn b/BUILD.gn index 98dae4e18a..d9c9d39686 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -517,6 +517,7 @@ if (rtc_include_tests) { testonly = true deps = [ "modules/congestion_controller/goog_cc:goog_cc_slow_tests", + "rtc_base/task_utils:repeating_task_unittests", "test:test_main", ] } diff --git a/call/BUILD.gn b/call/BUILD.gn index 6da29fdd91..4362fd18f0 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -146,6 +146,7 @@ rtc_source_set("rtp_sender") { "../rtc_base:rtc_base", "../rtc_base:rtc_base_approved", "../rtc_base:rtc_task_queue", + "../rtc_base/task_utils:repeating_task", "../system_wrappers:field_trial", "//third_party/abseil-cpp/absl/container:inlined_vector", "//third_party/abseil-cpp/absl/memory", diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 9f173b1ad1..a5fe7d513c 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -26,16 +26,11 @@ #include "system_wrappers/include/field_trial.h" namespace webrtc { -class RtpTransportControllerSend::PeriodicTask : public rtc::QueuedTask { - public: - virtual void Stop() = 0; -}; - namespace { static const int64_t kRetransmitWindowSizeMs = 500; static const size_t kMaxOverheadBytes = 500; -const int64_t PacerQueueUpdateIntervalMs = 25; +constexpr TimeDelta kPacerQueueUpdateInterval = TimeDelta::Millis<25>(); TargetRateConstraints ConvertConstraints(int min_bitrate_bps, int max_bitrate_bps, @@ -58,56 +53,6 @@ TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints, contraints.max_bitrate_bps, contraints.start_bitrate_bps, clock); } - -// The template closure pattern is based on rtc::ClosureTask. -template -class PeriodicTaskImpl final : public RtpTransportControllerSend::PeriodicTask { - public: - PeriodicTaskImpl(rtc::TaskQueue* task_queue, - int64_t period_ms, - Closure&& closure) - : task_queue_(task_queue), - period_ms_(period_ms), - closure_(std::forward(closure)) {} - bool Run() override { - if (!running_) - return true; - closure_(); - // absl::WrapUnique lets us repost this task on the TaskQueue. - task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_); - // Return false to tell TaskQueue to not destruct this object, since we have - // taken ownership with absl::WrapUnique. - return false; - } - void Stop() override { - if (task_queue_->IsCurrent()) { - RTC_DCHECK(running_); - running_ = false; - } else { - task_queue_->PostTask([this] { Stop(); }); - } - } - - private: - rtc::TaskQueue* const task_queue_; - const int64_t period_ms_; - typename std::remove_const< - typename std::remove_reference::type>::type closure_; - bool running_ = true; -}; - -template -static RtpTransportControllerSend::PeriodicTask* StartPeriodicTask( - rtc::TaskQueue* task_queue, - int64_t period_ms, - Closure&& closure) { - auto periodic_task = absl::make_unique>( - task_queue, period_ms, std::forward(closure)); - RtpTransportControllerSend::PeriodicTask* periodic_task_ptr = - periodic_task.get(); - task_queue->PostDelayedTask(std::move(periodic_task), period_ms); - return periodic_task_ptr; -} } // namespace RtpTransportControllerSend::RtpTransportControllerSend( @@ -135,8 +80,6 @@ RtpTransportControllerSend::RtpTransportControllerSend( transport_overhead_bytes_per_packet_(0), network_available_(false), packet_feedback_available_(false), - pacer_queue_update_task_(nullptr), - controller_task_(nullptr), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), task_queue_("rtp_send_controller") { initial_config_.constraints = ConvertConstraints(bitrate_config, clock_); @@ -570,30 +513,24 @@ void RtpTransportControllerSend::UpdateInitialConstraints( } void RtpTransportControllerSend::StartProcessPeriodicTasks() { - if (!pacer_queue_update_task_) { - pacer_queue_update_task_ = - StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() { + if (!pacer_queue_update_task_.Running()) { + pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( + &task_queue_, kPacerQueueUpdateInterval, [this]() { RTC_DCHECK_RUN_ON(&task_queue_); TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs()); control_handler_->SetPacerQueue(expected_queue_time); UpdateControlState(); + return kPacerQueueUpdateInterval; }); } - if (controller_task_) { - // Stop is not synchronous, but is guaranteed to occur before the first - // invocation of the new controller task started below. - controller_task_->Stop(); - controller_task_ = nullptr; - } + controller_task_.Stop(); if (process_interval_.IsFinite()) { - // The controller task is owned by the task queue and lives until the task - // queue is destroyed or some time after Stop() is called, whichever comes - // first. - controller_task_ = - StartPeriodicTask(&task_queue_, process_interval_.ms(), [this]() { + controller_task_ = RepeatingTaskHandle::DelayedStart( + &task_queue_, process_interval_, [this]() { RTC_DCHECK_RUN_ON(&task_queue_); UpdateControllerWithTimeInterval(); + return process_interval_; }); } } diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 2581a774e3..6a15648738 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -29,6 +29,7 @@ #include "rtc_base/network_route.h" #include "rtc_base/race_checker.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" namespace webrtc { class Clock; @@ -120,9 +121,6 @@ class RtpTransportControllerSend final // Implements CallStatsObserver interface void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override; - - class PeriodicTask; - private: void MaybeCreateControllers() RTC_RUN_ON(task_queue_); void UpdateInitialConstraints(TargetRateConstraints new_contraints) @@ -181,10 +179,8 @@ class RtpTransportControllerSend final std::atomic transport_overhead_bytes_per_packet_; bool network_available_ RTC_GUARDED_BY(task_queue_); bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_); - PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_) - RTC_PT_GUARDED_BY(task_queue_); - PeriodicTask* controller_task_ RTC_GUARDED_BY(task_queue_) - RTC_PT_GUARDED_BY(task_queue_); + RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); + RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); // Protects access to last_packet_feedback_vector_ in feedback adapter. // TODO(srte): Remove this checker when feedback adapter runs on task queue. rtc::RaceChecker worker_race_; diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index 3524b44c7f..3ea8666c22 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -252,8 +252,8 @@ rtc_source_set("rtcp_transceiver") { "../../api/video:video_bitrate_allocation", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", - "../../rtc_base:rtc_cancelable_task", "../../rtc_base:rtc_task_queue", + "../../rtc_base/task_utils:repeating_task", "../../system_wrappers", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index 5acad08db9..5d2cd6ea22 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -29,10 +29,10 @@ #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h" #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h" #include "modules/rtp_rtcp/source/time_util.h" -#include "rtc_base/cancelable_periodic_task.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/time_utils.h" namespace webrtc { @@ -90,15 +90,18 @@ class RtcpTransceiverImpl::PacketSender { RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) : config_(config), ready_to_send_(config.initial_ready_to_send) { RTC_CHECK(config_.Validate()); - if (ready_to_send_ && config_.schedule_periodic_compound_packets) - SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms); + if (ready_to_send_ && config_.schedule_periodic_compound_packets) { + config_.task_queue->PostTask([this] { + SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms); + }); + } } RtcpTransceiverImpl::~RtcpTransceiverImpl() { // If RtcpTransceiverImpl is destroyed off task queue, assume it is destroyed // after TaskQueue. In that case there is no need to Cancel periodic task. if (config_.task_queue == rtc::TaskQueue::Current()) { - periodic_task_handle_.Cancel(); + periodic_task_handle_.Stop(); } } @@ -126,7 +129,7 @@ void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver( void RtcpTransceiverImpl::SetReadyToSend(bool ready) { if (config_.schedule_periodic_compound_packets) { if (ready_to_send_ && !ready) - periodic_task_handle_.Cancel(); + periodic_task_handle_.Stop(); if (!ready_to_send_ && ready) // Restart periodic sending. SchedulePeriodicCompoundPackets(config_.report_period_ms / 2); @@ -323,24 +326,19 @@ void RtcpTransceiverImpl::HandleTargetBitrate( void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() { if (!config_.schedule_periodic_compound_packets) return; - periodic_task_handle_.Cancel(); + periodic_task_handle_.Stop(); RTC_DCHECK(ready_to_send_); SchedulePeriodicCompoundPackets(config_.report_period_ms); } void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) { - auto task = rtc::CreateCancelablePeriodicTask([this] { - RTC_DCHECK(config_.schedule_periodic_compound_packets); - RTC_DCHECK(ready_to_send_); - SendPeriodicCompoundPacket(); - return config_.report_period_ms; - }); - periodic_task_handle_ = task->GetCancellationHandle(); - - if (delay_ms > 0) - config_.task_queue->PostDelayedTask(std::move(task), delay_ms); - else - config_.task_queue->PostTask(std::move(task)); + periodic_task_handle_ = RepeatingTaskHandle::DelayedStart( + config_.task_queue, TimeDelta::ms(delay_ms), [this] { + RTC_DCHECK(config_.schedule_periodic_compound_packets); + RTC_DCHECK(ready_to_send_); + SendPeriodicCompoundPacket(); + return TimeDelta::ms(config_.report_period_ms); + }); } void RtcpTransceiverImpl::CreateCompoundPacket(PacketSender* sender) { diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h index eb9086f53e..b08dd56b8b 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h @@ -24,7 +24,7 @@ #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" #include "modules/rtp_rtcp/source/rtcp_packet/target_bitrate.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" -#include "rtc_base/cancelable_task_handle.h" +#include "rtc_base/task_utils/repeating_task.h" #include "system_wrappers/include/ntp_time.h" namespace webrtc { @@ -96,7 +96,7 @@ class RtcpTransceiverImpl { // TODO(danilchap): Remove entries from remote_senders_ that are no longer // needed. std::map remote_senders_; - rtc::CancelableTaskHandle periodic_task_handle_; + RepeatingTaskHandle periodic_task_handle_; }; } // namespace webrtc diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn index f3eff5fd99..cce28dc734 100644 --- a/modules/video_coding/BUILD.gn +++ b/modules/video_coding/BUILD.gn @@ -293,6 +293,7 @@ rtc_source_set("video_coding_utility") { "../../rtc_base:sequenced_task_checker", "../../rtc_base/experiments:quality_scaling_experiment", "../../rtc_base/system:arch", + "../../rtc_base/task_utils:repeating_task", "../../system_wrappers", "../../system_wrappers:field_trial", "../rtp_rtcp:rtp_rtcp_format", diff --git a/modules/video_coding/utility/quality_scaler.cc b/modules/video_coding/utility/quality_scaler.cc index e41a010d5e..c55ee2f3b2 100644 --- a/modules/video_coding/utility/quality_scaler.cc +++ b/modules/video_coding/utility/quality_scaler.cc @@ -11,6 +11,7 @@ #include "modules/video_coding/utility/quality_scaler.h" #include +#include #include "absl/types/optional.h" #include "rtc_base/checks.h" @@ -64,34 +65,6 @@ class QualityScaler::QpSmoother { rtc::ExpFilter smoother_; }; -class QualityScaler::CheckQpTask : public rtc::QueuedTask { - public: - explicit CheckQpTask(QualityScaler* scaler) : scaler_(scaler) { - RTC_LOG(LS_INFO) << "Created CheckQpTask. Scheduling on queue..."; - rtc::TaskQueue::Current()->PostDelayedTask( - std::unique_ptr(this), scaler_->GetSamplingPeriodMs()); - } - void Stop() { - RTC_DCHECK_CALLED_SEQUENTIALLY(&task_checker_); - RTC_LOG(LS_INFO) << "Stopping QP Check task."; - stop_ = true; - } - - private: - bool Run() override { - RTC_DCHECK_CALLED_SEQUENTIALLY(&task_checker_); - if (stop_) - return true; // TaskQueue will free this task. - scaler_->CheckQp(); - rtc::TaskQueue::Current()->PostDelayedTask( - std::unique_ptr(this), scaler_->GetSamplingPeriodMs()); - return false; // Retain the task in order to reuse it. - } - - QualityScaler* const scaler_; - bool stop_ = false; - rtc::SequencedTaskChecker task_checker_; -}; QualityScaler::QualityScaler(AdaptationObserverInterface* observer, VideoEncoder::QpThresholds thresholds) @@ -101,8 +74,7 @@ QualityScaler::QualityScaler(AdaptationObserverInterface* observer, QualityScaler::QualityScaler(AdaptationObserverInterface* observer, VideoEncoder::QpThresholds thresholds, int64_t sampling_period_ms) - : check_qp_task_(nullptr), - observer_(observer), + : observer_(observer), thresholds_(thresholds), sampling_period_ms_(sampling_period_ms), fast_rampup_(true), @@ -119,14 +91,18 @@ QualityScaler::QualityScaler(AdaptationObserverInterface* observer, qp_smoother_low_.reset(new QpSmoother(config_.alpha_low)); } RTC_DCHECK(observer_ != nullptr); - check_qp_task_ = new CheckQpTask(this); + check_qp_task_ = RepeatingTaskHandle::DelayedStart( + TimeDelta::ms(GetSamplingPeriodMs()), [this]() { + CheckQp(); + return TimeDelta::ms(GetSamplingPeriodMs()); + }); RTC_LOG(LS_INFO) << "QP thresholds: low: " << thresholds_.low << ", high: " << thresholds_.high; } QualityScaler::~QualityScaler() { RTC_DCHECK_CALLED_SEQUENTIALLY(&task_checker_); - check_qp_task_->Stop(); + check_qp_task_.Stop(); } int64_t QualityScaler::GetSamplingPeriodMs() const { diff --git a/modules/video_coding/utility/quality_scaler.h b/modules/video_coding/utility/quality_scaler.h index d9df2b86d6..1eb145addd 100644 --- a/modules/video_coding/utility/quality_scaler.h +++ b/modules/video_coding/utility/quality_scaler.h @@ -19,7 +19,7 @@ #include "rtc_base/experiments/quality_scaling_experiment.h" #include "rtc_base/numerics/moving_average.h" #include "rtc_base/sequenced_task_checker.h" -#include "rtc_base/thread_annotations.h" +#include "rtc_base/task_utils/repeating_task.h" namespace webrtc { @@ -64,7 +64,6 @@ class QualityScaler { int64_t sampling_period_ms); private: - class CheckQpTask; class QpSmoother; void CheckQp(); @@ -73,7 +72,7 @@ class QualityScaler { void ReportQpHigh(); int64_t GetSamplingPeriodMs() const; - CheckQpTask* check_qp_task_ RTC_GUARDED_BY(&task_checker_); + RepeatingTaskHandle check_qp_task_ RTC_GUARDED_BY(&task_checker_); AdaptationObserverInterface* const observer_ RTC_GUARDED_BY(&task_checker_); rtc::SequencedTaskChecker task_checker_; diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 5c5f78cc00..f8fd591aa4 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -498,6 +498,7 @@ if (rtc_include_tests) { "../rtc_base:rtc_base", "../rtc_base:rtc_base_approved", "../rtc_base:rtc_task_queue", + "../rtc_base/task_utils:repeating_task", "../rtc_base/third_party/sigslot", "../test:test_support", "../test:video_test_common", diff --git a/pc/test/fake_periodic_video_source.h b/pc/test/fake_periodic_video_source.h index 597f0ee8bd..923f10ae4d 100644 --- a/pc/test/fake_periodic_video_source.h +++ b/pc/test/fake_periodic_video_source.h @@ -18,6 +18,7 @@ #include "media/base/fake_frame_source.h" #include "media/base/video_broadcaster.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" namespace webrtc { @@ -38,10 +39,25 @@ class FakePeriodicVideoSource final FakePeriodicVideoSource() : FakePeriodicVideoSource(Config()) {} explicit FakePeriodicVideoSource(Config config) - : task_queue_( + : frame_source_( + config.width, + config.height, + config.frame_interval_ms * rtc::kNumMicrosecsPerMillisec, + config.timestamp_offset_ms * rtc::kNumMicrosecsPerMillisec), + task_queue_( absl::make_unique("FakePeriodicVideoTrackSource")) { thread_checker_.DetachFromThread(); - task_queue_->PostTask(absl::make_unique(config, &broadcaster_)); + frame_source_.SetRotation(config.rotation); + + TimeDelta frame_interval = TimeDelta::ms(config.frame_interval_ms); + RepeatingTaskHandle::Start(task_queue_.get(), [this, frame_interval] { + if (broadcaster_.wants().rotation_applied) { + broadcaster_.OnFrame(frame_source_.GetFrameRotationApplied()); + } else { + broadcaster_.OnFrame(frame_source_.GetFrame()); + } + return frame_interval; + }); } void RemoveSink(rtc::VideoSinkInterface* sink) override { @@ -61,38 +77,10 @@ class FakePeriodicVideoSource final } private: - class FrameTask : public rtc::QueuedTask { - public: - FrameTask(Config config, rtc::VideoBroadcaster* broadcaster) - : frame_interval_ms_(config.frame_interval_ms), - frame_source_( - config.width, - config.height, - config.frame_interval_ms * rtc::kNumMicrosecsPerMillisec, - config.timestamp_offset_ms * rtc::kNumMicrosecsPerMillisec), - broadcaster_(broadcaster) { - frame_source_.SetRotation(config.rotation); - } - - bool Run() override { - if (broadcaster_->wants().rotation_applied) { - broadcaster_->OnFrame(frame_source_.GetFrameRotationApplied()); - } else { - broadcaster_->OnFrame(frame_source_.GetFrame()); - } - - rtc::TaskQueue::Current()->PostDelayedTask(absl::WrapUnique(this), - frame_interval_ms_); - return false; - } - int frame_interval_ms_; - cricket::FakeFrameSource frame_source_; - rtc::VideoBroadcaster* broadcaster_; - }; - rtc::ThreadChecker thread_checker_; rtc::VideoBroadcaster broadcaster_; + cricket::FakeFrameSource frame_source_; std::unique_ptr task_queue_; }; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index cd9d1799f4..d5fcd2c65c 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -539,26 +539,6 @@ rtc_source_set("rtc_task_queue_api") { ] } -rtc_source_set("rtc_cancelable_task") { - sources = [ - "cancelable_periodic_task.h", - "cancelable_task_handle.cc", - "cancelable_task_handle.h", - ] - deps = [ - ":checks", - ":logging", - ":macromagic", - ":ptr_util", - ":refcount", - ":rtc_task_queue", - ":safe_conversions", - ":sequenced_task_checker", - ":thread_checker", - "//third_party/abseil-cpp/absl/memory", - ] -} - if (rtc_enable_libevent) { rtc_source_set("rtc_task_queue_libevent") { visibility = [ ":rtc_task_queue_impl" ] @@ -1430,7 +1410,6 @@ if (rtc_include_tests) { testonly = true sources = [ - "cancelable_periodic_task_unittest.cc", "task_queue_unittest.cc", ] deps = [ @@ -1438,7 +1417,6 @@ if (rtc_include_tests) { ":rtc_base_approved", ":rtc_base_tests_main", ":rtc_base_tests_utils", - ":rtc_cancelable_task", ":rtc_task_queue", ":rtc_task_queue_for_test", "../test:test_support", diff --git a/rtc_base/cancelable_periodic_task.h b/rtc_base/cancelable_periodic_task.h deleted file mode 100644 index d11301597a..0000000000 --- a/rtc_base/cancelable_periodic_task.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2018 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_CANCELABLE_PERIODIC_TASK_H_ -#define RTC_BASE_CANCELABLE_PERIODIC_TASK_H_ - -#include -#include -#include - -#include "absl/memory/memory.h" -#include "rtc_base/cancelable_task_handle.h" -#include "rtc_base/checks.h" -#include "rtc_base/logging.h" -#include "rtc_base/numerics/safe_conversions.h" - -namespace rtc { -namespace cancelable_periodic_task_internal { -// CancelablePeriodicTask runs a closure multiple times with delay decided -// by the return value of the closure itself. -// The task can be canceled with the handle returned by GetCancelationHandle(). -// Note that the task can only be canceled on the task queue where it runs. -template -class CancelablePeriodicTask final : public BaseCancelableTask { - public: - // |closure| should return time in ms until next run. - explicit CancelablePeriodicTask(Closure&& closure) - : closure_(std::forward(closure)) {} - CancelablePeriodicTask(const CancelablePeriodicTask&) = delete; - CancelablePeriodicTask& operator=(const CancelablePeriodicTask&) = delete; - ~CancelablePeriodicTask() override = default; - - private: - bool Run() override { - // See QueuedTask::Run documentaion for return values meaning. - if (BaseCancelableTask::Canceled()) - return true; // Caller retains ownership of `this`, and will destroy it. - // Run the actual task. - auto delay = closure_(); - // Convert closure_() return type into uint32_t. - uint32_t delay_ms = 0; - if (rtc::IsValueInRangeForNumericType(delay)) { - delay_ms = static_cast(delay); - } else { - // Log and recover in production. - RTC_LOG(LS_ERROR) << "Invalid delay until next run: " << delay; - delay_ms = rtc::saturated_cast(delay); - // But crash in debug. - RTC_DCHECK(false); - } - // Reschedule. - auto owned_task = absl::WrapUnique(this); - if (delay_ms == 0) - TaskQueue::Current()->PostTask(std::move(owned_task)); - else - TaskQueue::Current()->PostDelayedTask(std::move(owned_task), delay_ms); - return false; // Caller will release ownership of `this`. - } - - Closure closure_; -}; -} // namespace cancelable_periodic_task_internal - -template -std::unique_ptr CreateCancelablePeriodicTask( - Closure&& closure) { - using CleanedClosure = typename std::remove_cv< - typename std::remove_reference::type>::type; - return absl::make_unique>( - std::forward(closure)); -} - -} // namespace rtc - -#endif // RTC_BASE_CANCELABLE_PERIODIC_TASK_H_ diff --git a/rtc_base/cancelable_periodic_task_unittest.cc b/rtc_base/cancelable_periodic_task_unittest.cc deleted file mode 100644 index 9f644ea91c..0000000000 --- a/rtc_base/cancelable_periodic_task_unittest.cc +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 2018 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/cancelable_periodic_task.h" - -#include "absl/memory/memory.h" -#include "rtc_base/event.h" -#include "test/gmock.h" - -namespace { - -using ::testing::AtLeast; -using ::testing::Invoke; -using ::testing::NiceMock; -using ::testing::MockFunction; -using ::testing::Return; - -constexpr int kTimeoutMs = 1000; - -class MockClosure { - public: - MOCK_METHOD0(Call, int()); - MOCK_METHOD0(Delete, void()); -}; - -class MoveOnlyClosure { - public: - explicit MoveOnlyClosure(MockClosure* mock) : mock_(mock) {} - MoveOnlyClosure(const MoveOnlyClosure&) = delete; - MoveOnlyClosure(MoveOnlyClosure&& other) : mock_(other.mock_) { - other.mock_ = nullptr; - } - ~MoveOnlyClosure() { - if (mock_) - mock_->Delete(); - } - int operator()() { return mock_->Call(); } - - private: - MockClosure* mock_; -}; - -class CopyableClosure { - public: - explicit CopyableClosure(MockClosure* mock) : mock_(mock) {} - CopyableClosure(const CopyableClosure& other) : mock_(other.mock_) {} - ~CopyableClosure() { - if (mock_) { - mock_->Delete(); - mock_ = nullptr; - } - } - int operator()() { return mock_->Call(); } - - private: - MockClosure* mock_; -}; - -TEST(CancelablePeriodicTaskTest, CanCallCancelOnEmptyHandle) { - rtc::CancelableTaskHandle handle; - handle.Cancel(); -} - -TEST(CancelablePeriodicTaskTest, CancelTaskBeforeItRuns) { - rtc::Event done; - MockClosure mock; - EXPECT_CALL(mock, Call).Times(0); - EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); - - rtc::TaskQueue task_queue("queue"); - - auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock)); - rtc::CancelableTaskHandle handle = task->GetCancellationHandle(); - task_queue.PostTask([handle] { handle.Cancel(); }); - task_queue.PostTask(std::move(task)); - - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -TEST(CancelablePeriodicTaskTest, CancelDelayedTaskBeforeItRuns) { - rtc::Event done; - MockClosure mock; - EXPECT_CALL(mock, Call).Times(0); - EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); - - rtc::TaskQueue task_queue("queue"); - - auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock)); - rtc::CancelableTaskHandle handle = task->GetCancellationHandle(); - task_queue.PostDelayedTask(std::move(task), 100); - task_queue.PostTask([handle] { handle.Cancel(); }); - - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -TEST(CancelablePeriodicTaskTest, CancelTaskAfterItRuns) { - rtc::Event done; - MockClosure mock; - EXPECT_CALL(mock, Call).WillOnce(Return(100)); - EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); - - rtc::TaskQueue task_queue("queue"); - - auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock)); - rtc::CancelableTaskHandle handle = task->GetCancellationHandle(); - task_queue.PostTask(std::move(task)); - task_queue.PostTask([handle] { handle.Cancel(); }); - - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -TEST(CancelablePeriodicTaskTest, ZeroReturnValueRepostsTheTask) { - NiceMock closure; - rtc::Event done; - EXPECT_CALL(closure, Call()).WillOnce(Return(0)).WillOnce(Invoke([&done] { - done.Set(); - return kTimeoutMs; - })); - rtc::TaskQueue task_queue("queue"); - task_queue.PostTask( - rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&closure))); - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -TEST(CancelablePeriodicTaskTest, StartPeriodicTask) { - MockFunction closure; - rtc::Event done; - EXPECT_CALL(closure, Call()) - .WillOnce(Return(20)) - .WillOnce(Return(20)) - .WillOnce(Invoke([&done] { - done.Set(); - return kTimeoutMs; - })); - rtc::TaskQueue task_queue("queue"); - task_queue.PostTask( - rtc::CreateCancelablePeriodicTask(closure.AsStdFunction())); - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -// Validates perfect forwarding doesn't keep reference to deleted copy. -TEST(CancelablePeriodicTaskTest, CreateWithCopyOfAClosure) { - rtc::Event done; - MockClosure mock; - EXPECT_CALL(mock, Call).WillOnce(Invoke([&done] { - done.Set(); - return kTimeoutMs; - })); - EXPECT_CALL(mock, Delete).Times(AtLeast(2)); - CopyableClosure closure(&mock); - std::unique_ptr task; - { - CopyableClosure copy = closure; - task = rtc::CreateCancelablePeriodicTask(copy); - } - - rtc::TaskQueue task_queue("queue"); - task_queue.PostTask(std::move(task)); - EXPECT_TRUE(done.Wait(kTimeoutMs)); -} - -TEST(CancelablePeriodicTaskTest, DeletingHandleDoesntStopTheTask) { - rtc::Event run; - rtc::TaskQueue task_queue("queue"); - auto task = rtc::CreateCancelablePeriodicTask(([&] { - run.Set(); - return kTimeoutMs; - })); - rtc::CancelableTaskHandle handle = task->GetCancellationHandle(); - handle = {}; // delete the handle. - task_queue.PostTask(std::move(task)); - EXPECT_TRUE(run.Wait(kTimeoutMs)); -} - -// Example to test there are no thread races and use after free for suggested -// typical usage of the CancelablePeriodicTask -TEST(CancelablePeriodicTaskTest, Example) { - class ObjectOnTaskQueue { - public: - void DoPeriodicTask() {} - int TimeUntilNextRunMs() { return 100; } - - rtc::CancelableTaskHandle StartPeriodicTask(rtc::TaskQueue* task_queue) { - auto periodic_task = rtc::CreateCancelablePeriodicTask([this] { - DoPeriodicTask(); - return TimeUntilNextRunMs(); - }); - rtc::CancelableTaskHandle handle = periodic_task->GetCancellationHandle(); - task_queue->PostTask(std::move(periodic_task)); - return handle; - } - }; - - rtc::TaskQueue task_queue("queue"); - - auto object = absl::make_unique(); - // Create and start the periodic task. - rtc::CancelableTaskHandle handle = object->StartPeriodicTask(&task_queue); - - // Restart the task - task_queue.PostTask([handle] { handle.Cancel(); }); - handle = object->StartPeriodicTask(&task_queue); - - // Stop the task and destroy the object. - struct Destructor { - void operator()() { - // Cancel must be run on the task_queue, but if task failed to start - // because of task queue destruction, there is no need to run Cancel. - handle.Cancel(); - } - // Destruction will happen either on the task queue or because task - // queue is destroyed. - - std::unique_ptr object; - rtc::CancelableTaskHandle handle; - }; - task_queue.PostTask(Destructor{std::move(object), std::move(handle)}); - // Do not wait for the Destructor closure in order to create a race between - // task queue destruction and running the Desctructor closure. -} - -} // namespace diff --git a/rtc_base/cancelable_task_handle.cc b/rtc_base/cancelable_task_handle.cc deleted file mode 100644 index 06c3e80e92..0000000000 --- a/rtc_base/cancelable_task_handle.cc +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2018 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/cancelable_task_handle.h" - -#include - -#include "rtc_base/checks.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counter.h" -#include "rtc_base/sequenced_task_checker.h" -#include "rtc_base/thread_annotations.h" -#include "rtc_base/thread_checker.h" - -namespace rtc { - -class CancelableTaskHandle::CancellationToken { - public: - CancellationToken() : canceled_(false), ref_count_(0) { checker_.Detach(); } - CancellationToken(const CancellationToken&) = delete; - CancellationToken& operator=(const CancellationToken&) = delete; - - void Cancel() { - RTC_DCHECK_RUN_ON(&checker_); - canceled_ = true; - } - - bool Canceled() { - RTC_DCHECK_RUN_ON(&checker_); - return canceled_; - } - - void AddRef() { ref_count_.IncRef(); } - - void Release() { - if (ref_count_.DecRef() == rtc::RefCountReleaseStatus::kDroppedLastRef) - delete this; - } - - private: - ~CancellationToken() = default; - - rtc::SequencedTaskChecker checker_; - bool canceled_ RTC_GUARDED_BY(checker_); - webrtc::webrtc_impl::RefCounter ref_count_; -}; - -CancelableTaskHandle::CancelableTaskHandle() = default; -CancelableTaskHandle::CancelableTaskHandle(const CancelableTaskHandle&) = - default; -CancelableTaskHandle::CancelableTaskHandle(CancelableTaskHandle&&) = default; -CancelableTaskHandle& CancelableTaskHandle::operator=( - const CancelableTaskHandle&) = default; -CancelableTaskHandle& CancelableTaskHandle::operator=(CancelableTaskHandle&&) = - default; -CancelableTaskHandle::~CancelableTaskHandle() = default; - -void CancelableTaskHandle::Cancel() const { - if (cancellation_token_.get() != nullptr) - cancellation_token_->Cancel(); -} - -CancelableTaskHandle::CancelableTaskHandle( - rtc::scoped_refptr cancellation_token) - : cancellation_token_(std::move(cancellation_token)) {} - -BaseCancelableTask::~BaseCancelableTask() = default; - -CancelableTaskHandle BaseCancelableTask::GetCancellationHandle() const { - return CancelableTaskHandle(cancellation_token_); -} - -BaseCancelableTask::BaseCancelableTask() - : cancellation_token_(new CancelableTaskHandle::CancellationToken) {} - -bool BaseCancelableTask::Canceled() const { - return cancellation_token_->Canceled(); -} - -} // namespace rtc diff --git a/rtc_base/cancelable_task_handle.h b/rtc_base/cancelable_task_handle.h deleted file mode 100644 index 3b1f0d5a6b..0000000000 --- a/rtc_base/cancelable_task_handle.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2018 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_CANCELABLE_TASK_HANDLE_H_ -#define RTC_BASE_CANCELABLE_TASK_HANDLE_H_ - -#include "rtc_base/scoped_ref_ptr.h" -#include "rtc_base/task_queue.h" - -namespace rtc { - -class BaseCancelableTask; - -// Allows to cancel a cancelable task. Non-empty handle can be acquired by -// calling GetCancellationHandle() on a cancelable task. -class CancelableTaskHandle { - public: - // This class is copyable and cheaply movable. - CancelableTaskHandle(); - CancelableTaskHandle(const CancelableTaskHandle&); - CancelableTaskHandle(CancelableTaskHandle&&); - CancelableTaskHandle& operator=(const CancelableTaskHandle&); - CancelableTaskHandle& operator=(CancelableTaskHandle&&); - // Deleting the handler doesn't Cancel the task. - ~CancelableTaskHandle(); - - // Prevents the cancelable task to run. - // Must be executed on the same task queue as the task itself. - void Cancel() const; - - private: - friend class BaseCancelableTask; - class CancellationToken; - explicit CancelableTaskHandle( - rtc::scoped_refptr cancelation_token); - - rtc::scoped_refptr cancellation_token_; -}; - -class BaseCancelableTask : public QueuedTask { - public: - ~BaseCancelableTask() override; - - CancelableTaskHandle GetCancellationHandle() const; - - protected: - BaseCancelableTask(); - - bool Canceled() const; - - private: - rtc::scoped_refptr - cancellation_token_; -}; - -} // namespace rtc - -#endif // RTC_BASE_CANCELABLE_TASK_HANDLE_H_ diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn new file mode 100644 index 0000000000..860bc84a32 --- /dev/null +++ b/rtc_base/task_utils/BUILD.gn @@ -0,0 +1,41 @@ +# Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../webrtc.gni") + +rtc_source_set("repeating_task") { + sources = [ + "repeating_task.cc", + "repeating_task.h", + ] + deps = [ + "..:logging", + "..:rtc_task_queue_api", + "..:sequenced_task_checker", + "..:thread_checker", + "..:timeutils", + "../../api/units:time_delta", + "../../api/units:timestamp", + "//third_party/abseil-cpp/absl/memory", + ] +} + +if (rtc_include_tests) { + rtc_source_set("repeating_task_unittests") { + testonly = true + sources = [ + "repeating_task_unittest.cc", + ] + deps = [ + ":repeating_task", + "..:rtc_base_approved", + "../../test:test_support", + "//third_party/abseil-cpp/absl/memory", + ] + } +} diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc new file mode 100644 index 0000000000..874dcc4013 --- /dev/null +++ b/rtc_base/task_utils/repeating_task.cc @@ -0,0 +1,119 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/logging.h" +#include "rtc_base/timeutils.h" + +namespace webrtc { +namespace webrtc_repeating_task_impl { +RepeatingTaskBase::RepeatingTaskBase(rtc::TaskQueue* task_queue, + TimeDelta first_delay) + : task_queue_(task_queue), + next_run_time_(Timestamp::us(rtc::TimeMicros()) + first_delay) {} + +RepeatingTaskBase::~RepeatingTaskBase() = default; + +bool RepeatingTaskBase::Run() { + RTC_DCHECK_RUN_ON(task_queue_); + // Return true to tell the TaskQueue to destruct this object. + if (next_run_time_.IsPlusInfinity()) + return true; + + TimeDelta delay = RunClosure(); + RTC_DCHECK(delay.IsFinite()); + + // The closure might have stopped this task, in which case we return true to + // destruct this object. + if (next_run_time_.IsPlusInfinity()) + return true; + + TimeDelta lost_time = Timestamp::us(rtc::TimeMicros()) - next_run_time_; + next_run_time_ += delay; + delay -= lost_time; + + if (delay <= TimeDelta::Zero()) { + task_queue_->PostTask(absl::WrapUnique(this)); + } else { + task_queue_->PostDelayedTask(absl::WrapUnique(this), delay.ms()); + } + // Return false to tell the TaskQueue to not destruct this object since we + // have taken ownership with absl::WrapUnique. + return false; +} + +void RepeatingTaskBase::Stop() { + RTC_DCHECK(next_run_time_.IsFinite()); + next_run_time_ = Timestamp::PlusInfinity(); +} + +void RepeatingTaskBase::PostStop() { + if (task_queue_->IsCurrent()) { + RTC_DLOG(LS_INFO) << "Using PostStop() from the task queue running the " + "repeated task. Consider calling Stop() instead."; + } + task_queue_->PostTask([this] { + RTC_DCHECK_RUN_ON(task_queue_); + Stop(); + }); +} + +} // namespace webrtc_repeating_task_impl +RepeatingTaskHandle::RepeatingTaskHandle() { + sequence_checker_.Detach(); +} +RepeatingTaskHandle::~RepeatingTaskHandle() { + sequence_checker_.Detach(); +} + +RepeatingTaskHandle::RepeatingTaskHandle(RepeatingTaskHandle&& other) + : repeating_task_(other.repeating_task_) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + other.repeating_task_ = nullptr; +} + +RepeatingTaskHandle& RepeatingTaskHandle::operator=( + RepeatingTaskHandle&& other) { + RTC_DCHECK_RUN_ON(&other.sequence_checker_); + { + RTC_DCHECK_RUN_ON(&sequence_checker_); + repeating_task_ = other.repeating_task_; + } + other.repeating_task_ = nullptr; + return *this; +} + +RepeatingTaskHandle::RepeatingTaskHandle( + webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task) + : repeating_task_(repeating_task) {} + +void RepeatingTaskHandle::Stop() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (repeating_task_) { + RTC_DCHECK_RUN_ON(repeating_task_->task_queue_); + repeating_task_->Stop(); + repeating_task_ = nullptr; + } +} + +void RepeatingTaskHandle::PostStop() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (repeating_task_) { + repeating_task_->PostStop(); + repeating_task_ = nullptr; + } +} + +bool RepeatingTaskHandle::Running() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + return repeating_task_ != nullptr; +} + +} // namespace webrtc diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h new file mode 100644 index 0000000000..c4e760a55d --- /dev/null +++ b/rtc_base/task_utils/repeating_task.h @@ -0,0 +1,153 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ +#define RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ + +#include +#include + +#include "absl/memory/memory.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "rtc_base/sequenced_task_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/thread_checker.h" + +namespace webrtc { + +class RepeatingTaskHandle; + +namespace webrtc_repeating_task_impl { +class RepeatingTaskBase : public rtc::QueuedTask { + public: + RepeatingTaskBase(rtc::TaskQueue* task_queue, TimeDelta first_delay); + ~RepeatingTaskBase() override; + virtual TimeDelta RunClosure() = 0; + + private: + friend class ::webrtc::RepeatingTaskHandle; + + bool Run() final; + void Stop() RTC_RUN_ON(task_queue_); + void PostStop(); + + rtc::TaskQueue* const task_queue_; + // This is always finite, except for the special case where it's PlusInfinity + // to signal that the task should stop. + Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); +}; + +// The template closure pattern is based on rtc::ClosureTask. +template +class RepeatingTaskImpl final : public RepeatingTaskBase { + public: + RepeatingTaskImpl(rtc::TaskQueue* task_queue, + TimeDelta first_delay, + Closure&& closure) + : RepeatingTaskBase(task_queue, first_delay), + closure_(std::forward(closure)) { + static_assert( + std::is_same::type>::value, + ""); + } + + TimeDelta RunClosure() override { return closure_(); } + + private: + typename std::remove_const< + typename std::remove_reference::type>::type closure_; +}; +} // namespace webrtc_repeating_task_impl + +// Allows starting tasks that repeat themselves on a TaskQueue indefinately +// until they are stopped or the TaskQueue is destroyed. It allows starting and +// stopping multiple times, but you must stop one task before starting another +// and it can only be stopped when in the running state. The public interface is +// not thread safe. +class RepeatingTaskHandle { + public: + RepeatingTaskHandle(); + ~RepeatingTaskHandle(); + RepeatingTaskHandle(RepeatingTaskHandle&& other); + RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other); + RepeatingTaskHandle(const RepeatingTaskHandle&) = delete; + RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete; + + // Start can be used to start a task that will be reposted with a delay + // determined by the return value of the provided closure. The actual task is + // owned by the TaskQueue and will live until it has been stopped or the + // TaskQueue is destroyed. Note that this means that trying to stop the + // repeating task after the TaskQueue is destroyed is an error. However, it's + // perfectly fine to destroy the handle while the task is running, since the + // repeated task is owned by the TaskQueue. + template + static RepeatingTaskHandle Start(rtc::TaskQueue* task_queue, + Closure&& closure) { + auto repeating_task = absl::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl>( + task_queue, TimeDelta::Zero(), std::forward(closure)); + auto* repeating_task_ptr = repeating_task.get(); + task_queue->PostTask(std::move(repeating_task)); + return RepeatingTaskHandle(repeating_task_ptr); + } + template + static RepeatingTaskHandle Start(Closure&& closure) { + return Start(rtc::TaskQueue::Current(), std::forward(closure)); + } + + // DelayedStart is equivalent to Start except that the first invocation of the + // closure will be delayed by the given amount. + template + static RepeatingTaskHandle DelayedStart(rtc::TaskQueue* task_queue, + TimeDelta first_delay, + Closure&& closure) { + auto repeating_task = absl::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl>( + task_queue, first_delay, std::forward(closure)); + auto* repeating_task_ptr = repeating_task.get(); + task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms()); + return RepeatingTaskHandle(repeating_task_ptr); + } + template + static RepeatingTaskHandle DelayedStart(TimeDelta first_delay, + Closure&& closure) { + return DelayedStart(rtc::TaskQueue::Current(), first_delay, + std::forward(closure)); + } + + // Stops future invocations of the repeating task closure. Can only be called + // from the TaskQueue where the task is running. The closure is guaranteed to + // not be running after Stop() returns unless Stop() is called from the + // closure itself. + void Stop(); + + // Stops future invocations of the repeating task closure. The closure might + // still be running when PostStop() returns, but there will be no future + // invocation. + void PostStop(); + + // Returns true if Start() or DelayedStart() was called most recently. Returns + // false initially and if Stop() or PostStop() was called most recently. + bool Running() const; + + private: + explicit RepeatingTaskHandle( + webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task); + rtc::SequencedTaskChecker sequence_checker_; + // Owned by the task queue. + webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task_ + RTC_GUARDED_BY(sequence_checker_) = nullptr; +}; + +} // namespace webrtc +#endif // RTC_BASE_TASK_UTILS_REPEATING_TASK_H_ diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc new file mode 100644 index 0000000000..79bce0616a --- /dev/null +++ b/rtc_base/task_utils/repeating_task_unittest.cc @@ -0,0 +1,223 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include // Not allowed in production per Chromium style guide. +#include +#include // Not allowed in production per Chromium style guide. + +#include "absl/memory/memory.h" +#include "rtc_base/event.h" +#include "rtc_base/task_utils/repeating_task.h" +#include "test/gmock.h" +#include "test/gtest.h" + +// NOTE: Since these tests rely on real time behavior, they will be flaky +// if run on heavily loaded systems. +namespace webrtc { +namespace { +using ::testing::AtLeast; +using ::testing::Invoke; +using ::testing::MockFunction; +using ::testing::NiceMock; +using ::testing::Return; + +constexpr TimeDelta kTimeout = TimeDelta::Millis<1000>(); + +void Sleep(TimeDelta time_delta) { + // Note that Chromium style guide prohibits use of and in + // production code, used here since webrtc::SleepMs may return early. + std::this_thread::sleep_for(std::chrono::microseconds(time_delta.us())); +} + +class MockClosure { + public: + MOCK_METHOD0(Call, TimeDelta()); + MOCK_METHOD0(Delete, void()); +}; + +class MoveOnlyClosure { + public: + explicit MoveOnlyClosure(MockClosure* mock) : mock_(mock) {} + MoveOnlyClosure(const MoveOnlyClosure&) = delete; + MoveOnlyClosure(MoveOnlyClosure&& other) : mock_(other.mock_) { + other.mock_ = nullptr; + } + ~MoveOnlyClosure() { + if (mock_) + mock_->Delete(); + } + TimeDelta operator()() { return mock_->Call(); } + + private: + MockClosure* mock_; +}; +} // namespace + +TEST(RepeatingTaskTest, TaskIsStoppedOnStop) { + const TimeDelta kShortInterval = TimeDelta::ms(5); + const TimeDelta kLongInterval = TimeDelta::ms(20); + const int kShortIntervalCount = 4; + const int kMargin = 1; + + rtc::TaskQueue task_queue("TestQueue"); + std::atomic_int counter(0); + auto handle = RepeatingTaskHandle::Start(&task_queue, [&] { + if (++counter >= kShortIntervalCount) + return kLongInterval; + return kShortInterval; + }); + // Sleep long enough to go through the initial phase. + Sleep(kShortInterval * (kShortIntervalCount + kMargin)); + EXPECT_EQ(counter.load(), kShortIntervalCount); + + handle.PostStop(); + // Sleep long enough that the task would run at least once more if not + // stopped. + Sleep(kLongInterval * 2); + EXPECT_EQ(counter.load(), kShortIntervalCount); +} + +TEST(RepeatingTaskTest, CompensatesForLongRunTime) { + const int kTargetCount = 20; + const int kTargetCountMargin = 2; + const TimeDelta kRepeatInterval = TimeDelta::ms(2); + // Sleeping inside the task for longer than the repeat interval once, should + // be compensated for by repeating the task faster to catch up. + const TimeDelta kSleepDuration = TimeDelta::ms(20); + const int kSleepAtCount = 3; + + std::atomic_int counter(0); + rtc::TaskQueue task_queue("TestQueue"); + RepeatingTaskHandle::Start(&task_queue, [&] { + if (++counter == kSleepAtCount) + Sleep(kSleepDuration); + return kRepeatInterval; + }); + Sleep(kRepeatInterval * kTargetCount); + // Execution time should not have affected the run count, + // but we allow some margin to reduce flakiness. + EXPECT_GE(counter.load(), kTargetCount - kTargetCountMargin); +} + +TEST(RepeatingTaskTest, CompensatesForShortRunTime) { + std::atomic_int counter(0); + rtc::TaskQueue task_queue("TestQueue"); + RepeatingTaskHandle::Start(&task_queue, [&] { + ++counter; + // Sleeping for the 5 ms should be compensated. + Sleep(TimeDelta::ms(5)); + return TimeDelta::ms(10); + }); + Sleep(TimeDelta::ms(15)); + // We expect that the task have been called twice, once directly at Start and + // once after 10 ms has passed. + EXPECT_EQ(counter.load(), 2); +} + +TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) { + rtc::Event done; + MockClosure mock; + EXPECT_CALL(mock, Call).Times(0); + EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); + rtc::TaskQueue task_queue("queue"); + auto handle = RepeatingTaskHandle::DelayedStart( + &task_queue, TimeDelta::ms(100), MoveOnlyClosure(&mock)); + handle.PostStop(); + EXPECT_TRUE(done.Wait(kTimeout.ms())); +} + +TEST(RepeatingTaskTest, CancelTaskAfterItRuns) { + rtc::Event done; + MockClosure mock; + EXPECT_CALL(mock, Call).WillOnce(Return(TimeDelta::ms(100))); + EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); + rtc::TaskQueue task_queue("queue"); + auto handle = RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&mock)); + handle.PostStop(); + EXPECT_TRUE(done.Wait(kTimeout.ms())); +} + +TEST(RepeatingTaskTest, TaskCanStopItself) { + std::atomic_int counter(0); + rtc::TaskQueue task_queue("TestQueue"); + RepeatingTaskHandle handle; + task_queue.PostTask([&] { + handle = RepeatingTaskHandle::Start(&task_queue, [&] { + ++counter; + handle.Stop(); + return TimeDelta::ms(2); + }); + }); + Sleep(TimeDelta::ms(10)); + EXPECT_EQ(counter.load(), 1); +} + +TEST(RepeatingTaskTest, ZeroReturnValueRepostsTheTask) { + NiceMock closure; + rtc::Event done; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::Zero())) + .WillOnce(Invoke([&done] { + done.Set(); + return kTimeout; + })); + rtc::TaskQueue task_queue("queue"); + RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&closure)); + EXPECT_TRUE(done.Wait(kTimeout.ms())); +} + +TEST(RepeatingTaskTest, StartPeriodicTask) { + MockFunction closure; + rtc::Event done; + EXPECT_CALL(closure, Call()) + .WillOnce(Return(TimeDelta::ms(20))) + .WillOnce(Return(TimeDelta::ms(20))) + .WillOnce(Invoke([&done] { + done.Set(); + return kTimeout; + })); + rtc::TaskQueue task_queue("queue"); + RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction()); + EXPECT_TRUE(done.Wait(kTimeout.ms())); +} + +TEST(RepeatingTaskTest, Example) { + class ObjectOnTaskQueue { + public: + void DoPeriodicTask() {} + TimeDelta TimeUntilNextRun() { return TimeDelta::ms(100); } + void StartPeriodicTask(RepeatingTaskHandle* handle, + rtc::TaskQueue* task_queue) { + *handle = RepeatingTaskHandle::Start(task_queue, [this] { + DoPeriodicTask(); + return TimeUntilNextRun(); + }); + } + }; + rtc::TaskQueue task_queue("queue"); + auto object = absl::make_unique(); + // Create and start the periodic task. + RepeatingTaskHandle handle; + object->StartPeriodicTask(&handle, &task_queue); + // Restart the task + handle.PostStop(); + object->StartPeriodicTask(&handle, &task_queue); + handle.PostStop(); + struct Destructor { + void operator()() { object.reset(); } + std::unique_ptr object; + }; + task_queue.PostTask(Destructor{std::move(object)}); + // Do not wait for the destructor closure in order to create a race between + // task queue destruction and running the desctructor closure. +} + +} // namespace webrtc diff --git a/test/BUILD.gn b/test/BUILD.gn index 70e554f9c6..fad00e26c2 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -73,6 +73,7 @@ rtc_source_set("video_test_common") { "../rtc_base:checks", "../rtc_base:rtc_base", "../rtc_base:rtc_task_queue", + "../rtc_base/task_utils:repeating_task", "../system_wrappers", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/test/frame_generator_capturer.cc b/test/frame_generator_capturer.cc index e1b5c80624..aa3f944e13 100644 --- a/test/frame_generator_capturer.cc +++ b/test/frame_generator_capturer.cc @@ -21,56 +21,13 @@ #include "rtc_base/critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" namespace webrtc { namespace test { -class FrameGeneratorCapturer::InsertFrameTask : public rtc::QueuedTask { - public: - explicit InsertFrameTask(FrameGeneratorCapturer* frame_generator_capturer) - : frame_generator_capturer_(frame_generator_capturer), - repeat_interval_ms_(-1), - next_run_time_ms_(-1) {} - - private: - bool Run() override { - // Check if the frame interval for this - // task queue is the same same as the current configured frame rate. - int interval_ms = - 1000 / frame_generator_capturer_->GetCurrentConfiguredFramerate(); - if (repeat_interval_ms_ != interval_ms) { - // Restart the timer if frame rate has changed since task was started. - next_run_time_ms_ = rtc::TimeMillis(); - repeat_interval_ms_ = interval_ms; - } - // Schedule the next frame capture event to happen at approximately the - // correct absolute time point. - next_run_time_ms_ += interval_ms; - - frame_generator_capturer_->InsertFrame(); - - int64_t now_ms = rtc::TimeMillis(); - if (next_run_time_ms_ < now_ms) { - RTC_LOG(LS_ERROR) << "Frame Generator Capturer can't keep up with " - "requested fps."; - rtc::TaskQueue::Current()->PostTask(absl::WrapUnique(this)); - } else { - int64_t delay_ms = next_run_time_ms_ - now_ms; - RTC_DCHECK_GE(delay_ms, 0); - RTC_DCHECK_LE(delay_ms, interval_ms); - rtc::TaskQueue::Current()->PostDelayedTask(absl::WrapUnique(this), - delay_ms); - } - return false; - } - - webrtc::test::FrameGeneratorCapturer* const frame_generator_capturer_; - int repeat_interval_ms_; - int64_t next_run_time_ms_; -}; - FrameGeneratorCapturer* FrameGeneratorCapturer::Create( int width, int height, @@ -158,10 +115,12 @@ bool FrameGeneratorCapturer::Init() { if (frame_generator_.get() == nullptr) return false; - int framerate_fps = GetCurrentConfiguredFramerate(); - task_queue_.PostDelayedTask(absl::make_unique(this), - 1000 / framerate_fps); - + RepeatingTaskHandle::DelayedStart( + &task_queue_, TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(), + [this] { + InsertFrame(); + return TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(); + }); return true; } diff --git a/test/frame_generator_capturer.h b/test/frame_generator_capturer.h index 217858bfad..4e156545f0 100644 --- a/test/frame_generator_capturer.h +++ b/test/frame_generator_capturer.h @@ -84,8 +84,6 @@ class FrameGeneratorCapturer : public TestVideoCapturer { bool Init(); private: - class InsertFrameTask; - void InsertFrame(); static bool Run(void* obj); int GetCurrentConfiguredFramerate(); diff --git a/video/BUILD.gn b/video/BUILD.gn index f82668aeeb..1b3e698ea6 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -87,6 +87,7 @@ rtc_static_library("video") { "../rtc_base/experiments:alr_experiment", "../rtc_base/experiments:quality_scaling_experiment", "../rtc_base/system:fallthrough", + "../rtc_base/task_utils:repeating_task", "../system_wrappers:field_trial", "../system_wrappers:metrics", "//third_party/abseil-cpp/absl/memory", @@ -201,6 +202,7 @@ rtc_source_set("video_stream_encoder_impl") { "../rtc_base:timeutils", "../rtc_base/experiments:quality_scaling_experiment", "../rtc_base/system:fallthrough", + "../rtc_base/task_utils:repeating_task", "../system_wrappers:field_trial", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", diff --git a/video/overuse_frame_detector.cc b/video/overuse_frame_detector.cc index b214b39ad4..510d5e0250 100644 --- a/video/overuse_frame_detector.cc +++ b/video/overuse_frame_detector.cc @@ -518,43 +518,9 @@ OveruseFrameDetector::CreateProcessingUsage(const CpuOveruseOptions& options) { return instance; } -class OveruseFrameDetector::CheckOveruseTask : public rtc::QueuedTask { - public: - CheckOveruseTask(OveruseFrameDetector* overuse_detector, - AdaptationObserverInterface* observer) - : overuse_detector_(overuse_detector), observer_(observer) { - rtc::TaskQueue::Current()->PostDelayedTask( - std::unique_ptr(this), kTimeToFirstCheckForOveruseMs); - } - - void Stop() { - RTC_CHECK(task_checker_.CalledSequentially()); - overuse_detector_ = nullptr; - } - - private: - bool Run() override { - RTC_CHECK(task_checker_.CalledSequentially()); - if (!overuse_detector_) - return true; // This will make the task queue delete this task. - overuse_detector_->CheckForOveruse(observer_); - - rtc::TaskQueue::Current()->PostDelayedTask( - std::unique_ptr(this), kCheckForOveruseIntervalMs); - // Return false to prevent this task from being deleted. Ownership has been - // transferred to the task queue when PostDelayedTask was called. - return false; - } - rtc::SequencedTaskChecker task_checker_; - OveruseFrameDetector* overuse_detector_; - // Observer getting overuse reports. - AdaptationObserverInterface* observer_; -}; - OveruseFrameDetector::OveruseFrameDetector( CpuOveruseMetricsObserver* metrics_observer) - : check_overuse_task_(nullptr), - metrics_observer_(metrics_observer), + : metrics_observer_(metrics_observer), num_process_times_(0), // TODO(nisse): Use absl::optional last_capture_time_us_(-1), @@ -569,26 +535,25 @@ OveruseFrameDetector::OveruseFrameDetector( task_checker_.Detach(); } -OveruseFrameDetector::~OveruseFrameDetector() { - RTC_DCHECK(!check_overuse_task_) << "StopCheckForOverUse must be called."; -} +OveruseFrameDetector::~OveruseFrameDetector() {} void OveruseFrameDetector::StartCheckForOveruse( const CpuOveruseOptions& options, AdaptationObserverInterface* overuse_observer) { RTC_DCHECK_CALLED_SEQUENTIALLY(&task_checker_); - RTC_DCHECK(!check_overuse_task_); + RTC_DCHECK(!check_overuse_task_.Running()); RTC_DCHECK(overuse_observer != nullptr); SetOptions(options); - check_overuse_task_ = new CheckOveruseTask(this, overuse_observer); + check_overuse_task_ = RepeatingTaskHandle::DelayedStart( + TimeDelta::ms(kTimeToFirstCheckForOveruseMs), [this, overuse_observer] { + CheckForOveruse(overuse_observer); + return TimeDelta::ms(kCheckForOveruseIntervalMs); + }); } void OveruseFrameDetector::StopCheckForOveruse() { RTC_DCHECK_CALLED_SEQUENTIALLY(&task_checker_); - if (check_overuse_task_) { - check_overuse_task_->Stop(); - check_overuse_task_ = nullptr; - } + check_overuse_task_.Stop(); } void OveruseFrameDetector::EncodedFrameTimeMeasured(int encode_duration_ms) { diff --git a/video/overuse_frame_detector.h b/video/overuse_frame_detector.h index 3db8643500..d4a2c627e4 100644 --- a/video/overuse_frame_detector.h +++ b/video/overuse_frame_detector.h @@ -21,6 +21,7 @@ #include "rtc_base/numerics/exp_filter.h" #include "rtc_base/sequenced_task_checker.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -108,8 +109,6 @@ class OveruseFrameDetector { CpuOveruseOptions options_; private: - class CheckOveruseTask; - void EncodedFrameTimeMeasured(int encode_duration_ms); bool IsOverusing(int encode_usage_percent); bool IsUnderusing(int encode_usage_percent, int64_t time_now); @@ -124,7 +123,7 @@ class OveruseFrameDetector { rtc::SequencedTaskChecker task_checker_; // Owned by the task queue from where StartCheckForOveruse is called. - CheckOveruseTask* check_overuse_task_; + RepeatingTaskHandle check_overuse_task_ RTC_GUARDED_BY(task_checker_); // Stats metrics. CpuOveruseMetricsObserver* const metrics_observer_; diff --git a/video/video_send_stream_impl.cc b/video/video_send_stream_impl.cc index aa7696728b..f455d18de2 100644 --- a/video/video_send_stream_impl.cc +++ b/video/video_send_stream_impl.cc @@ -44,6 +44,8 @@ static constexpr int kMaxVbaSizeDifferencePercent = 10; // Max time we will throttle similar video bitrate allocations. static constexpr int64_t kMaxVbaThrottleTimeMs = 500; +constexpr TimeDelta kEncoderTimeOut = TimeDelta::Seconds<2>(); + bool TransportSeqNumExtensionConfigured(const VideoSendStream::Config& config) { const std::vector& extensions = config.rtp.extensions; return std::find_if( @@ -179,58 +181,6 @@ PacingConfig::PacingConfig() PacingConfig::PacingConfig(const PacingConfig&) = default; PacingConfig::~PacingConfig() = default; -// CheckEncoderActivityTask is used for tracking when the encoder last produced -// and encoded video frame. If the encoder has not produced anything the last -// kEncoderTimeOutMs we also want to stop sending padding. -class VideoSendStreamImpl::CheckEncoderActivityTask : public rtc::QueuedTask { - public: - static const int kEncoderTimeOutMs = 2000; - explicit CheckEncoderActivityTask( - const rtc::WeakPtr& send_stream) - : activity_(0), send_stream_(std::move(send_stream)), timed_out_(false) {} - - void Stop() { - RTC_CHECK(task_checker_.CalledSequentially()); - send_stream_.reset(); - } - - void UpdateEncoderActivity() { - // UpdateEncoderActivity is called from VideoSendStreamImpl::Encoded on - // whatever thread the real encoder implementation run on. In the case of - // hardware encoders, there might be several encoders - // running in parallel on different threads. - rtc::AtomicOps::ReleaseStore(&activity_, 1); - } - - private: - bool Run() override { - RTC_CHECK(task_checker_.CalledSequentially()); - if (!send_stream_) - return true; - if (!rtc::AtomicOps::AcquireLoad(&activity_)) { - if (!timed_out_) { - send_stream_->SignalEncoderTimedOut(); - } - timed_out_ = true; - } else if (timed_out_) { - send_stream_->SignalEncoderActive(); - timed_out_ = false; - } - rtc::AtomicOps::ReleaseStore(&activity_, 0); - - rtc::TaskQueue::Current()->PostDelayedTask( - std::unique_ptr(this), kEncoderTimeOutMs); - // Return false to prevent this task from being deleted. Ownership has been - // transferred to the task queue when PostDelayedTask was called. - return false; - } - volatile int activity_; - - rtc::SequencedTaskChecker task_checker_; - rtc::WeakPtr send_stream_; - bool timed_out_; -}; - VideoSendStreamImpl::VideoSendStreamImpl( SendStatisticsProxy* stats_proxy, rtc::TaskQueue* worker_queue, @@ -254,7 +204,6 @@ VideoSendStreamImpl::VideoSendStreamImpl( stats_proxy_(stats_proxy), config_(config), worker_queue_(worker_queue), - check_encoder_activity_task_(nullptr), call_stats_(call_stats), transport_(transport), bitrate_allocator_(bitrate_allocator), @@ -420,12 +369,25 @@ void VideoSendStreamImpl::StartupVideoSendStream() { encoder_bitrate_priority_, has_packet_feedback_}); // Start monitoring encoder activity. { - rtc::CritScope lock(&encoder_activity_crit_sect_); - RTC_DCHECK(!check_encoder_activity_task_); - check_encoder_activity_task_ = new CheckEncoderActivityTask(weak_ptr_); - worker_queue_->PostDelayedTask( - std::unique_ptr(check_encoder_activity_task_), - CheckEncoderActivityTask::kEncoderTimeOutMs); + RTC_DCHECK(!check_encoder_activity_task_.Running()); + + activity_ = false; + timed_out_ = false; + check_encoder_activity_task_ = + RepeatingTaskHandle::DelayedStart(kEncoderTimeOut, [this] { + RTC_DCHECK_RUN_ON(worker_queue_); + if (!activity_) { + if (!timed_out_) { + SignalEncoderTimedOut(); + } + timed_out_ = true; + } else if (timed_out_) { + SignalEncoderActive(); + timed_out_ = false; + } + activity_ = false; + return kEncoderTimeOut; + }); } video_stream_encoder_->SendKeyFrame(); @@ -443,18 +405,14 @@ void VideoSendStreamImpl::Stop() { void VideoSendStreamImpl::StopVideoSendStream() { bitrate_allocator_->RemoveObserver(this); - { - rtc::CritScope lock(&encoder_activity_crit_sect_); - check_encoder_activity_task_->Stop(); - check_encoder_activity_task_ = nullptr; - } + check_encoder_activity_task_.Stop(); video_stream_encoder_->OnBitrateUpdated(0, 0, 0); stats_proxy_->OnSetEncoderTargetRate(0); } void VideoSendStreamImpl::SignalEncoderTimedOut() { RTC_DCHECK_RUN_ON(worker_queue_); - // If the encoder has not produced anything the last kEncoderTimeOutMs and it + // If the encoder has not produced anything the last kEncoderTimeOut and it // is supposed to, deregister as BitrateAllocatorObserver. This can happen // if a camera stops producing frames. if (encoder_target_rate_bps_ > 0) { @@ -601,11 +559,9 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage( // Encoded is called on whatever thread the real encoder implementation run // on. In the case of hardware encoders, there might be several encoders // running in parallel on different threads. - { - rtc::CritScope lock(&encoder_activity_crit_sect_); - if (check_encoder_activity_task_) - check_encoder_activity_task_->UpdateEncoderActivity(); - } + + // Indicate that there still is activity going on. + activity_ = true; EncodedImageCallback::Result result(EncodedImageCallback::Result::OK); if (media_transport_) { diff --git a/video/video_send_stream_impl.h b/video/video_send_stream_impl.h index cfc332849a..a453228595 100644 --- a/video/video_send_stream_impl.h +++ b/video/video_send_stream_impl.h @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include "modules/video_coding/include/video_codec_interface.h" #include "rtc_base/critical_section.h" #include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/weak_ptr.h" #include "video/call_stats.h" @@ -107,8 +109,6 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, absl::optional configured_pacing_factor_; private: - class CheckEncoderActivityTask; - // Implements BitrateAllocatorObserver. uint32_t OnBitrateUpdated(BitrateAllocationUpdate update) override; @@ -131,7 +131,7 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, void StartupVideoSendStream(); // Removes the bitrate observer, stops monitoring and notifies the video // encoder of the bitrate update. - void StopVideoSendStream(); + void StopVideoSendStream() RTC_RUN_ON(worker_queue_); void ConfigureProtection(); void ConfigureSsrcs(); @@ -146,9 +146,11 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, rtc::TaskQueue* const worker_queue_; - rtc::CriticalSection encoder_activity_crit_sect_; - CheckEncoderActivityTask* check_encoder_activity_task_ - RTC_GUARDED_BY(encoder_activity_crit_sect_); + RepeatingTaskHandle check_encoder_activity_task_ + RTC_GUARDED_BY(worker_queue_); + + std::atomic_bool activity_; + bool timed_out_ RTC_GUARDED_BY(worker_queue_); CallStats* const call_stats_; RtpTransportControllerSendInterface* const transport_;