diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index ca9a14a324..64a041908e 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -14,6 +14,7 @@ rtc_library("repeating_task") { "repeating_task.h", ] deps = [ + ":pending_task_safety_flag", ":to_queued_task", "..:logging", "..:timeutils", diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc index 574e6331f1..9636680cb4 100644 --- a/rtc_base/task_utils/repeating_task.cc +++ b/rtc_base/task_utils/repeating_task.cc @@ -12,32 +12,36 @@ #include "absl/memory/memory.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" namespace webrtc { namespace webrtc_repeating_task_impl { -RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, - TimeDelta first_delay, - Clock* clock) +RepeatingTaskBase::RepeatingTaskBase( + TaskQueueBase* task_queue, + TimeDelta first_delay, + Clock* clock, + rtc::scoped_refptr alive_flag) : task_queue_(task_queue), clock_(clock), - next_run_time_(clock_->CurrentTime() + first_delay) {} + next_run_time_(clock_->CurrentTime() + first_delay), + alive_flag_(std::move(alive_flag)) {} RepeatingTaskBase::~RepeatingTaskBase() = default; bool RepeatingTaskBase::Run() { RTC_DCHECK_RUN_ON(task_queue_); // Return true to tell the TaskQueue to destruct this object. - if (next_run_time_.IsPlusInfinity()) + if (!alive_flag_->alive()) return true; TimeDelta delay = RunClosure(); // The closure might have stopped this task, in which case we return true to // destruct this object. - if (next_run_time_.IsPlusInfinity()) + if (!alive_flag_->alive()) return true; RTC_DCHECK(delay.IsFinite()); @@ -53,33 +57,11 @@ bool RepeatingTaskBase::Run() { return false; } -void RepeatingTaskBase::Stop() { - RTC_DCHECK_RUN_ON(task_queue_); - RTC_DCHECK(next_run_time_.IsFinite()); - next_run_time_ = Timestamp::PlusInfinity(); -} - } // namespace webrtc_repeating_task_impl -RepeatingTaskHandle::RepeatingTaskHandle(RepeatingTaskHandle&& other) - : repeating_task_(other.repeating_task_) { - other.repeating_task_ = nullptr; -} - -RepeatingTaskHandle& RepeatingTaskHandle::operator=( - RepeatingTaskHandle&& other) { - repeating_task_ = other.repeating_task_; - other.repeating_task_ = nullptr; - return *this; -} - -RepeatingTaskHandle::RepeatingTaskHandle( - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task) - : repeating_task_(repeating_task) {} - void RepeatingTaskHandle::Stop() { if (repeating_task_) { - repeating_task_->Stop(); + repeating_task_->SetNotAlive(); repeating_task_ = nullptr; } } diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h index 487b7d19d4..d5066fdb5c 100644 --- a/rtc_base/task_utils/repeating_task.h +++ b/rtc_base/task_utils/repeating_task.h @@ -19,22 +19,19 @@ #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "system_wrappers/include/clock.h" namespace webrtc { - -class RepeatingTaskHandle; - namespace webrtc_repeating_task_impl { class RepeatingTaskBase : public QueuedTask { public: RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay, - Clock* clock); + Clock* clock, + rtc::scoped_refptr alive_flag); ~RepeatingTaskBase() override; - void Stop(); - private: virtual TimeDelta RunClosure() = 0; @@ -42,9 +39,10 @@ class RepeatingTaskBase : public QueuedTask { TaskQueueBase* const task_queue_; Clock* const clock_; - // This is always finite, except for the special case where it's PlusInfinity - // to signal that the task should stop. + // This is always finite. Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); + rtc::scoped_refptr alive_flag_ + RTC_GUARDED_BY(task_queue_); }; // The template closure pattern is based on rtc::ClosureTask. @@ -54,8 +52,12 @@ class RepeatingTaskImpl final : public RepeatingTaskBase { RepeatingTaskImpl(TaskQueueBase* task_queue, TimeDelta first_delay, Closure&& closure, - Clock* clock) - : RepeatingTaskBase(task_queue, first_delay, clock), + Clock* clock, + rtc::scoped_refptr alive_flag) + : RepeatingTaskBase(task_queue, + first_delay, + clock, + std::move(alive_flag)), closure_(std::forward(closure)) { static_assert( std::is_same static RepeatingTaskHandle Start(TaskQueueBase* task_queue, Closure&& closure, Clock* clock = Clock::GetRealTimeClock()) { - auto repeating_task = std::make_unique< - webrtc_repeating_task_impl::RepeatingTaskImpl>( - task_queue, TimeDelta::Zero(), std::forward(closure), clock); - auto* repeating_task_ptr = repeating_task.get(); - task_queue->PostTask(std::move(repeating_task)); - return RepeatingTaskHandle(repeating_task_ptr); + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + task_queue->PostTask( + std::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl>( + task_queue, TimeDelta::Zero(), std::forward(closure), + clock, alive_flag)); + return RepeatingTaskHandle(std::move(alive_flag)); } // DelayedStart is equivalent to Start except that the first invocation of the @@ -113,12 +114,14 @@ class RepeatingTaskHandle { TimeDelta first_delay, Closure&& closure, Clock* clock = Clock::GetRealTimeClock()) { - auto repeating_task = std::make_unique< - webrtc_repeating_task_impl::RepeatingTaskImpl>( - task_queue, first_delay, std::forward(closure), clock); - auto* repeating_task_ptr = repeating_task.get(); - task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms()); - return RepeatingTaskHandle(repeating_task_ptr); + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + task_queue->PostDelayedTask( + std::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl>( + task_queue, first_delay, std::forward(closure), clock, + alive_flag), + first_delay.ms()); + return RepeatingTaskHandle(std::move(alive_flag)); } // Stops future invocations of the repeating task closure. Can only be called @@ -127,15 +130,15 @@ class RepeatingTaskHandle { // closure itself. void Stop(); - // Returns true if Start() or DelayedStart() was called most recently. Returns - // false initially and if Stop() or PostStop() was called most recently. + // Returns true until Stop() was called. + // Can only be called from the TaskQueue where the task is running. bool Running() const; private: explicit RepeatingTaskHandle( - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task); - // Owned by the task queue. - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task_ = nullptr; + rtc::scoped_refptr alive_flag) + : repeating_task_(std::move(alive_flag)) {} + rtc::scoped_refptr repeating_task_; }; } // namespace webrtc diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc index 2fb15d1e5a..b23284f988 100644 --- a/rtc_base/task_utils/repeating_task_unittest.cc +++ b/rtc_base/task_utils/repeating_task_unittest.cc @@ -276,4 +276,22 @@ TEST(RepeatingTaskTest, ClockIntegration) { handle.Stop(); } +TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) { + std::unique_ptr repeating_task; + + MockTaskQueue task_queue; + EXPECT_CALL(task_queue, PostDelayedTask) + .WillOnce([&](std::unique_ptr task, uint32_t milliseconds) { + repeating_task = std::move(task); + }); + + RepeatingTaskHandle handle = + RepeatingTaskHandle::DelayedStart(&task_queue, TimeDelta::Millis(100), + [] { return TimeDelta::Millis(100); }); + + // shutdown task queue: delete all pending tasks and run 'regular' task. + repeating_task = nullptr; + handle.Stop(); +} + } // namespace webrtc