Make stopping of the RepeatingTask safer
Previous implementation assumes that though RepeatingTask is owned by the task queue, it will stay alive until RepeatingTaskHandler stops it. That assumption doesn't hold by one of downstream TaskQueue implementaions. That TaskQueue implementation shortly before destruction deletes pending delayed tasks because it doesn't plan to run them, and then runs remaining regular tasks. Bug: None Change-Id: Ic95fec2e9961b3f05727ff6fbdaf0664434a995b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221984 Reviewed-by: Tommi <tommi@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34274}
This commit is contained in:
parent
4bb81aca75
commit
0f9a8e33c0
@ -14,6 +14,7 @@ rtc_library("repeating_task") {
|
|||||||
"repeating_task.h",
|
"repeating_task.h",
|
||||||
]
|
]
|
||||||
deps = [
|
deps = [
|
||||||
|
":pending_task_safety_flag",
|
||||||
":to_queued_task",
|
":to_queued_task",
|
||||||
"..:logging",
|
"..:logging",
|
||||||
"..:timeutils",
|
"..:timeutils",
|
||||||
|
|||||||
@ -12,32 +12,36 @@
|
|||||||
|
|
||||||
#include "absl/memory/memory.h"
|
#include "absl/memory/memory.h"
|
||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
|
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
||||||
#include "rtc_base/task_utils/to_queued_task.h"
|
#include "rtc_base/task_utils/to_queued_task.h"
|
||||||
#include "rtc_base/time_utils.h"
|
#include "rtc_base/time_utils.h"
|
||||||
|
|
||||||
namespace webrtc {
|
namespace webrtc {
|
||||||
namespace webrtc_repeating_task_impl {
|
namespace webrtc_repeating_task_impl {
|
||||||
|
|
||||||
RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
|
RepeatingTaskBase::RepeatingTaskBase(
|
||||||
|
TaskQueueBase* task_queue,
|
||||||
TimeDelta first_delay,
|
TimeDelta first_delay,
|
||||||
Clock* clock)
|
Clock* clock,
|
||||||
|
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
|
||||||
: task_queue_(task_queue),
|
: task_queue_(task_queue),
|
||||||
clock_(clock),
|
clock_(clock),
|
||||||
next_run_time_(clock_->CurrentTime() + first_delay) {}
|
next_run_time_(clock_->CurrentTime() + first_delay),
|
||||||
|
alive_flag_(std::move(alive_flag)) {}
|
||||||
|
|
||||||
RepeatingTaskBase::~RepeatingTaskBase() = default;
|
RepeatingTaskBase::~RepeatingTaskBase() = default;
|
||||||
|
|
||||||
bool RepeatingTaskBase::Run() {
|
bool RepeatingTaskBase::Run() {
|
||||||
RTC_DCHECK_RUN_ON(task_queue_);
|
RTC_DCHECK_RUN_ON(task_queue_);
|
||||||
// Return true to tell the TaskQueue to destruct this object.
|
// Return true to tell the TaskQueue to destruct this object.
|
||||||
if (next_run_time_.IsPlusInfinity())
|
if (!alive_flag_->alive())
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
TimeDelta delay = RunClosure();
|
TimeDelta delay = RunClosure();
|
||||||
|
|
||||||
// The closure might have stopped this task, in which case we return true to
|
// The closure might have stopped this task, in which case we return true to
|
||||||
// destruct this object.
|
// destruct this object.
|
||||||
if (next_run_time_.IsPlusInfinity())
|
if (!alive_flag_->alive())
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
RTC_DCHECK(delay.IsFinite());
|
RTC_DCHECK(delay.IsFinite());
|
||||||
@ -53,33 +57,11 @@ bool RepeatingTaskBase::Run() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RepeatingTaskBase::Stop() {
|
|
||||||
RTC_DCHECK_RUN_ON(task_queue_);
|
|
||||||
RTC_DCHECK(next_run_time_.IsFinite());
|
|
||||||
next_run_time_ = Timestamp::PlusInfinity();
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace webrtc_repeating_task_impl
|
} // namespace webrtc_repeating_task_impl
|
||||||
|
|
||||||
RepeatingTaskHandle::RepeatingTaskHandle(RepeatingTaskHandle&& other)
|
|
||||||
: repeating_task_(other.repeating_task_) {
|
|
||||||
other.repeating_task_ = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
RepeatingTaskHandle& RepeatingTaskHandle::operator=(
|
|
||||||
RepeatingTaskHandle&& other) {
|
|
||||||
repeating_task_ = other.repeating_task_;
|
|
||||||
other.repeating_task_ = nullptr;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
RepeatingTaskHandle::RepeatingTaskHandle(
|
|
||||||
webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task)
|
|
||||||
: repeating_task_(repeating_task) {}
|
|
||||||
|
|
||||||
void RepeatingTaskHandle::Stop() {
|
void RepeatingTaskHandle::Stop() {
|
||||||
if (repeating_task_) {
|
if (repeating_task_) {
|
||||||
repeating_task_->Stop();
|
repeating_task_->SetNotAlive();
|
||||||
repeating_task_ = nullptr;
|
repeating_task_ = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,22 +19,19 @@
|
|||||||
#include "api/task_queue/task_queue_base.h"
|
#include "api/task_queue/task_queue_base.h"
|
||||||
#include "api/units/time_delta.h"
|
#include "api/units/time_delta.h"
|
||||||
#include "api/units/timestamp.h"
|
#include "api/units/timestamp.h"
|
||||||
|
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
||||||
#include "system_wrappers/include/clock.h"
|
#include "system_wrappers/include/clock.h"
|
||||||
|
|
||||||
namespace webrtc {
|
namespace webrtc {
|
||||||
|
|
||||||
class RepeatingTaskHandle;
|
|
||||||
|
|
||||||
namespace webrtc_repeating_task_impl {
|
namespace webrtc_repeating_task_impl {
|
||||||
class RepeatingTaskBase : public QueuedTask {
|
class RepeatingTaskBase : public QueuedTask {
|
||||||
public:
|
public:
|
||||||
RepeatingTaskBase(TaskQueueBase* task_queue,
|
RepeatingTaskBase(TaskQueueBase* task_queue,
|
||||||
TimeDelta first_delay,
|
TimeDelta first_delay,
|
||||||
Clock* clock);
|
Clock* clock,
|
||||||
|
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag);
|
||||||
~RepeatingTaskBase() override;
|
~RepeatingTaskBase() override;
|
||||||
|
|
||||||
void Stop();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
virtual TimeDelta RunClosure() = 0;
|
virtual TimeDelta RunClosure() = 0;
|
||||||
|
|
||||||
@ -42,9 +39,10 @@ class RepeatingTaskBase : public QueuedTask {
|
|||||||
|
|
||||||
TaskQueueBase* const task_queue_;
|
TaskQueueBase* const task_queue_;
|
||||||
Clock* const clock_;
|
Clock* const clock_;
|
||||||
// This is always finite, except for the special case where it's PlusInfinity
|
// This is always finite.
|
||||||
// to signal that the task should stop.
|
|
||||||
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
|
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
|
||||||
|
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag_
|
||||||
|
RTC_GUARDED_BY(task_queue_);
|
||||||
};
|
};
|
||||||
|
|
||||||
// The template closure pattern is based on rtc::ClosureTask.
|
// The template closure pattern is based on rtc::ClosureTask.
|
||||||
@ -54,8 +52,12 @@ class RepeatingTaskImpl final : public RepeatingTaskBase {
|
|||||||
RepeatingTaskImpl(TaskQueueBase* task_queue,
|
RepeatingTaskImpl(TaskQueueBase* task_queue,
|
||||||
TimeDelta first_delay,
|
TimeDelta first_delay,
|
||||||
Closure&& closure,
|
Closure&& closure,
|
||||||
Clock* clock)
|
Clock* clock,
|
||||||
: RepeatingTaskBase(task_queue, first_delay, clock),
|
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
|
||||||
|
: RepeatingTaskBase(task_queue,
|
||||||
|
first_delay,
|
||||||
|
clock,
|
||||||
|
std::move(alive_flag)),
|
||||||
closure_(std::forward<Closure>(closure)) {
|
closure_(std::forward<Closure>(closure)) {
|
||||||
static_assert(
|
static_assert(
|
||||||
std::is_same<TimeDelta,
|
std::is_same<TimeDelta,
|
||||||
@ -81,28 +83,27 @@ class RepeatingTaskHandle {
|
|||||||
public:
|
public:
|
||||||
RepeatingTaskHandle() = default;
|
RepeatingTaskHandle() = default;
|
||||||
~RepeatingTaskHandle() = default;
|
~RepeatingTaskHandle() = default;
|
||||||
RepeatingTaskHandle(RepeatingTaskHandle&& other);
|
RepeatingTaskHandle(RepeatingTaskHandle&& other) = default;
|
||||||
RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other);
|
RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other) = default;
|
||||||
RepeatingTaskHandle(const RepeatingTaskHandle&) = delete;
|
RepeatingTaskHandle(const RepeatingTaskHandle&) = delete;
|
||||||
RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete;
|
RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete;
|
||||||
|
|
||||||
// Start can be used to start a task that will be reposted with a delay
|
// Start can be used to start a task that will be reposted with a delay
|
||||||
// determined by the return value of the provided closure. The actual task is
|
// determined by the return value of the provided closure. The actual task is
|
||||||
// owned by the TaskQueue and will live until it has been stopped or the
|
// owned by the TaskQueue and will live until it has been stopped or the
|
||||||
// TaskQueue is destroyed. Note that this means that trying to stop the
|
// TaskQueue deletes it. It's perfectly fine to destroy the handle while the
|
||||||
// repeating task after the TaskQueue is destroyed is an error. However, it's
|
// task is running, since the repeated task is owned by the TaskQueue.
|
||||||
// perfectly fine to destroy the handle while the task is running, since the
|
|
||||||
// repeated task is owned by the TaskQueue.
|
|
||||||
template <class Closure>
|
template <class Closure>
|
||||||
static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
|
static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
|
||||||
Closure&& closure,
|
Closure&& closure,
|
||||||
Clock* clock = Clock::GetRealTimeClock()) {
|
Clock* clock = Clock::GetRealTimeClock()) {
|
||||||
auto repeating_task = std::make_unique<
|
auto alive_flag = PendingTaskSafetyFlag::CreateDetached();
|
||||||
|
task_queue->PostTask(
|
||||||
|
std::make_unique<
|
||||||
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
|
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
|
||||||
task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), clock);
|
task_queue, TimeDelta::Zero(), std::forward<Closure>(closure),
|
||||||
auto* repeating_task_ptr = repeating_task.get();
|
clock, alive_flag));
|
||||||
task_queue->PostTask(std::move(repeating_task));
|
return RepeatingTaskHandle(std::move(alive_flag));
|
||||||
return RepeatingTaskHandle(repeating_task_ptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelayedStart is equivalent to Start except that the first invocation of the
|
// DelayedStart is equivalent to Start except that the first invocation of the
|
||||||
@ -113,12 +114,14 @@ class RepeatingTaskHandle {
|
|||||||
TimeDelta first_delay,
|
TimeDelta first_delay,
|
||||||
Closure&& closure,
|
Closure&& closure,
|
||||||
Clock* clock = Clock::GetRealTimeClock()) {
|
Clock* clock = Clock::GetRealTimeClock()) {
|
||||||
auto repeating_task = std::make_unique<
|
auto alive_flag = PendingTaskSafetyFlag::CreateDetached();
|
||||||
|
task_queue->PostDelayedTask(
|
||||||
|
std::make_unique<
|
||||||
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
|
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
|
||||||
task_queue, first_delay, std::forward<Closure>(closure), clock);
|
task_queue, first_delay, std::forward<Closure>(closure), clock,
|
||||||
auto* repeating_task_ptr = repeating_task.get();
|
alive_flag),
|
||||||
task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms());
|
first_delay.ms());
|
||||||
return RepeatingTaskHandle(repeating_task_ptr);
|
return RepeatingTaskHandle(std::move(alive_flag));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stops future invocations of the repeating task closure. Can only be called
|
// Stops future invocations of the repeating task closure. Can only be called
|
||||||
@ -127,15 +130,15 @@ class RepeatingTaskHandle {
|
|||||||
// closure itself.
|
// closure itself.
|
||||||
void Stop();
|
void Stop();
|
||||||
|
|
||||||
// Returns true if Start() or DelayedStart() was called most recently. Returns
|
// Returns true until Stop() was called.
|
||||||
// false initially and if Stop() or PostStop() was called most recently.
|
// Can only be called from the TaskQueue where the task is running.
|
||||||
bool Running() const;
|
bool Running() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
explicit RepeatingTaskHandle(
|
explicit RepeatingTaskHandle(
|
||||||
webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task);
|
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
|
||||||
// Owned by the task queue.
|
: repeating_task_(std::move(alive_flag)) {}
|
||||||
webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task_ = nullptr;
|
rtc::scoped_refptr<PendingTaskSafetyFlag> repeating_task_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
|||||||
@ -276,4 +276,22 @@ TEST(RepeatingTaskTest, ClockIntegration) {
|
|||||||
handle.Stop();
|
handle.Stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) {
|
||||||
|
std::unique_ptr<QueuedTask> repeating_task;
|
||||||
|
|
||||||
|
MockTaskQueue task_queue;
|
||||||
|
EXPECT_CALL(task_queue, PostDelayedTask)
|
||||||
|
.WillOnce([&](std::unique_ptr<QueuedTask> task, uint32_t milliseconds) {
|
||||||
|
repeating_task = std::move(task);
|
||||||
|
});
|
||||||
|
|
||||||
|
RepeatingTaskHandle handle =
|
||||||
|
RepeatingTaskHandle::DelayedStart(&task_queue, TimeDelta::Millis(100),
|
||||||
|
[] { return TimeDelta::Millis(100); });
|
||||||
|
|
||||||
|
// shutdown task queue: delete all pending tasks and run 'regular' task.
|
||||||
|
repeating_task = nullptr;
|
||||||
|
handle.Stop();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user