Refactor RepeatingTask to use absl::AnyInvocable functions of TaskQueue

Bug: webrtc:14245
Change-Id: Ie02755a4bb732cc25b3a22511e6d8920fc434c65
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267847
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37461}
This commit is contained in:
Danil Chapovalov 2022-07-05 21:26:06 +02:00 committed by WebRTC LUCI CQ
parent e76daab8b3
commit dde7fe4fc5
3 changed files with 74 additions and 83 deletions

View File

@ -19,15 +19,11 @@ rtc_library("repeating_task") {
"../../api:sequence_checker", "../../api:sequence_checker",
"../../api/task_queue", "../../api/task_queue",
"../../api/task_queue:pending_task_safety_flag", "../../api/task_queue:pending_task_safety_flag",
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta", "../../api/units:time_delta",
"../../api/units:timestamp", "../../api/units:timestamp",
"../../system_wrappers:system_wrappers", "../../system_wrappers:system_wrappers",
] ]
absl_deps = [ absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ]
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
]
} }
rtc_library("pending_task_safety_flag") { rtc_library("pending_task_safety_flag") {
@ -51,9 +47,11 @@ if (rtc_include_tests) {
"..:task_queue_for_test", "..:task_queue_for_test",
"../../api/task_queue", "../../api/task_queue",
"../../api/task_queue:to_queued_task", "../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp", "../../api/units:timestamp",
"../../system_wrappers:system_wrappers", "../../system_wrappers:system_wrappers",
"../../test:test_support", "../../test:test_support",
] ]
absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ]
} }
} }

View File

@ -11,16 +11,13 @@
#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/task_utils/repeating_task.h"
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
#include "absl/memory/memory.h"
#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
namespace webrtc { namespace webrtc {
namespace { namespace {
class RepeatingTask : public QueuedTask { class RepeatingTask {
public: public:
RepeatingTask(TaskQueueBase* task_queue, RepeatingTask(TaskQueueBase* task_queue,
TaskQueueBase::DelayPrecision precision, TaskQueueBase::DelayPrecision precision,
@ -28,11 +25,13 @@ class RepeatingTask : public QueuedTask {
absl::AnyInvocable<TimeDelta()> task, absl::AnyInvocable<TimeDelta()> task,
Clock* clock, Clock* clock,
rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag); rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag);
~RepeatingTask() override = default; RepeatingTask(RepeatingTask&&) = default;
RepeatingTask& operator=(RepeatingTask&&) = delete;
~RepeatingTask() = default;
void operator()() &&;
private: private:
bool Run() final;
TaskQueueBase* const task_queue_; TaskQueueBase* const task_queue_;
const TaskQueueBase::DelayPrecision precision_; const TaskQueueBase::DelayPrecision precision_;
Clock* const clock_; Clock* const clock_;
@ -57,33 +56,27 @@ RepeatingTask::RepeatingTask(
next_run_time_(clock_->CurrentTime() + first_delay), next_run_time_(clock_->CurrentTime() + first_delay),
alive_flag_(std::move(alive_flag)) {} alive_flag_(std::move(alive_flag)) {}
bool RepeatingTask::Run() { void RepeatingTask::operator()() && {
RTC_DCHECK_RUN_ON(task_queue_); RTC_DCHECK_RUN_ON(task_queue_);
// Return true to tell the TaskQueue to destruct this object.
if (!alive_flag_->alive()) if (!alive_flag_->alive())
return true; return;
webrtc_repeating_task_impl::RepeatingTaskImplDTraceProbeRun(); webrtc_repeating_task_impl::RepeatingTaskImplDTraceProbeRun();
TimeDelta delay = task_(); TimeDelta delay = task_();
RTC_DCHECK_GE(delay, TimeDelta::Zero()); RTC_DCHECK_GE(delay, TimeDelta::Zero());
// A delay of +infinity means that the task should not be run again. // A delay of +infinity means that the task should not be run again.
// Alternatively, the closure might have stopped this task. In either which // Alternatively, the closure might have stopped this task.
// case we return true to destruct this object.
if (delay.IsPlusInfinity() || !alive_flag_->alive()) if (delay.IsPlusInfinity() || !alive_flag_->alive())
return true; return;
TimeDelta lost_time = clock_->CurrentTime() - next_run_time_; TimeDelta lost_time = clock_->CurrentTime() - next_run_time_;
next_run_time_ += delay; next_run_time_ += delay;
delay -= lost_time; delay -= lost_time;
delay = std::max(delay, TimeDelta::Zero()); delay = std::max(delay, TimeDelta::Zero());
task_queue_->PostDelayedTaskWithPrecision(precision_, absl::WrapUnique(this), task_queue_->PostDelayedTaskWithPrecision(precision_, std::move(*this),
delay.ms()); delay);
// Return false to tell the TaskQueue to not destruct this object since we
// have taken ownership with absl::WrapUnique.
return false;
} }
} // namespace } // namespace
@ -95,8 +88,7 @@ RepeatingTaskHandle RepeatingTaskHandle::Start(
Clock* clock) { Clock* clock) {
auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); auto alive_flag = PendingTaskSafetyFlag::CreateDetached();
webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeStart(); webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeStart();
task_queue->PostTask( task_queue->PostTask(RepeatingTask(task_queue, precision, TimeDelta::Zero(),
std::make_unique<RepeatingTask>(task_queue, precision, TimeDelta::Zero(),
std::move(closure), clock, alive_flag)); std::move(closure), clock, alive_flag));
return RepeatingTaskHandle(std::move(alive_flag)); return RepeatingTaskHandle(std::move(alive_flag));
} }
@ -113,9 +105,9 @@ RepeatingTaskHandle RepeatingTaskHandle::DelayedStart(
webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeDelayedStart(); webrtc_repeating_task_impl::RepeatingTaskHandleDTraceProbeDelayedStart();
task_queue->PostDelayedTaskWithPrecision( task_queue->PostDelayedTaskWithPrecision(
precision, precision,
std::make_unique<RepeatingTask>(task_queue, precision, first_delay, RepeatingTask(task_queue, precision, first_delay, std::move(closure),
std::move(closure), clock, alive_flag), clock, alive_flag),
first_delay.ms()); first_delay);
return RepeatingTaskHandle(std::move(alive_flag)); return RepeatingTaskHandle(std::move(alive_flag));
} }

View File

@ -13,9 +13,9 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include "api/task_queue/queued_task.h" #include "absl/functional/any_invocable.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h" #include "api/units/time_delta.h"
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h" #include "rtc_base/task_queue_for_test.h"
@ -46,10 +46,14 @@ class MockTaskQueue : public TaskQueueBase {
MockTaskQueue() : task_queue_setter_(this) {} MockTaskQueue() : task_queue_setter_(this) {}
MOCK_METHOD(void, Delete, (), (override)); MOCK_METHOD(void, Delete, (), (override));
MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask> task), (override)); MOCK_METHOD(void, PostTask, (absl::AnyInvocable<void() &&>), (override));
MOCK_METHOD(void, MOCK_METHOD(void,
PostDelayedTask, PostDelayedTask,
(std::unique_ptr<QueuedTask> task, uint32_t milliseconds), (absl::AnyInvocable<void() &&>, TimeDelta),
(override));
MOCK_METHOD(void,
PostDelayedHighPrecisionTask,
(absl::AnyInvocable<void() &&>, TimeDelta),
(override)); (override));
private: private:
@ -63,45 +67,41 @@ class FakeTaskQueue : public TaskQueueBase {
void Delete() override {} void Delete() override {}
void PostTask(std::unique_ptr<QueuedTask> task) override { void PostTask(absl::AnyInvocable<void() &&> task) override {
last_task_ = std::move(task); last_task_ = std::move(task);
last_precision_ = absl::nullopt; last_precision_ = absl::nullopt;
last_delay_ = 0; last_delay_ = TimeDelta::Zero();
} }
void PostDelayedTask(std::unique_ptr<QueuedTask> task, void PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override { TimeDelta delay) override {
last_task_ = std::move(task); last_task_ = std::move(task);
last_precision_ = TaskQueueBase::DelayPrecision::kLow; last_precision_ = TaskQueueBase::DelayPrecision::kLow;
last_delay_ = milliseconds; last_delay_ = delay;
} }
void PostDelayedHighPrecisionTask(std::unique_ptr<QueuedTask> task, void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override { TimeDelta delay) override {
last_task_ = std::move(task); last_task_ = std::move(task);
last_precision_ = TaskQueueBase::DelayPrecision::kHigh; last_precision_ = TaskQueueBase::DelayPrecision::kHigh;
last_delay_ = milliseconds; last_delay_ = delay;
} }
bool AdvanceTimeAndRunLastTask() { bool AdvanceTimeAndRunLastTask() {
EXPECT_TRUE(last_task_); EXPECT_TRUE(last_task_);
EXPECT_TRUE(last_delay_); EXPECT_TRUE(last_delay_.IsFinite());
clock_->AdvanceTimeMilliseconds(last_delay_.value_or(0)); clock_->AdvanceTime(last_delay_);
last_delay_.reset(); last_delay_ = TimeDelta::MinusInfinity();
auto task = std::move(last_task_); auto task = std::move(last_task_);
bool delete_task = task->Run(); std::move(task)();
if (!delete_task) { return last_task_ == nullptr;
// If the task should not be deleted then just release it.
task.release();
}
return delete_task;
} }
bool IsTaskQueued() { return !!last_task_; } bool IsTaskQueued() { return !!last_task_; }
uint32_t last_delay() const { TimeDelta last_delay() const {
EXPECT_TRUE(last_delay_.has_value()); EXPECT_TRUE(last_delay_.IsFinite());
return last_delay_.value_or(-1); return last_delay_;
} }
absl::optional<TaskQueueBase::DelayPrecision> last_precision() const { absl::optional<TaskQueueBase::DelayPrecision> last_precision() const {
@ -111,8 +111,8 @@ class FakeTaskQueue : public TaskQueueBase {
private: private:
CurrentTaskQueueSetter task_queue_setter_; CurrentTaskQueueSetter task_queue_setter_;
SimulatedClock* clock_; SimulatedClock* clock_;
std::unique_ptr<QueuedTask> last_task_; absl::AnyInvocable<void() &&> last_task_;
absl::optional<uint32_t> last_delay_; TimeDelta last_delay_ = TimeDelta::MinusInfinity();
absl::optional<TaskQueueBase::DelayPrecision> last_precision_; absl::optional<TaskQueueBase::DelayPrecision> last_precision_;
}; };
@ -146,16 +146,19 @@ TEST(RepeatingTaskTest, TaskIsStoppedOnStop) {
SimulatedClock clock(Timestamp::Zero()); SimulatedClock clock(Timestamp::Zero());
FakeTaskQueue task_queue(&clock); FakeTaskQueue task_queue(&clock);
std::atomic_int counter(0); std::atomic_int counter(0);
auto handle = RepeatingTaskHandle::Start(&task_queue, [&] { auto handle = RepeatingTaskHandle::Start(
&task_queue,
[&] {
counter++; counter++;
return kShortInterval; return kShortInterval;
}); },
EXPECT_EQ(task_queue.last_delay(), 0u); TaskQueueBase::DelayPrecision::kLow, &clock);
EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask());
EXPECT_EQ(counter.load(), 1); EXPECT_EQ(counter.load(), 1);
// The handle reposted at the short interval. // The handle reposted at the short interval.
EXPECT_EQ(task_queue.last_delay(), kShortInterval.ms()); EXPECT_EQ(task_queue.last_delay(), kShortInterval);
// Stop the handle. This prevernts the counter from incrementing. // Stop the handle. This prevernts the counter from incrementing.
handle.Stop(); handle.Stop();
@ -182,12 +185,12 @@ TEST(RepeatingTaskTest, CompensatesForLongRunTime) {
}, },
TaskQueueBase::DelayPrecision::kLow, &clock); TaskQueueBase::DelayPrecision::kLow, &clock);
EXPECT_EQ(task_queue.last_delay(), 0u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask());
// Task is posted right away since it took longer to run then the repeat // Task is posted right away since it took longer to run then the repeat
// interval. // interval.
EXPECT_EQ(task_queue.last_delay(), 0u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
EXPECT_EQ(counter.load(), 1); EXPECT_EQ(counter.load(), 1);
} }
@ -206,11 +209,11 @@ TEST(RepeatingTaskTest, CompensatesForShortRunTime) {
TaskQueueBase::DelayPrecision::kLow, &clock); TaskQueueBase::DelayPrecision::kLow, &clock);
// Expect instant post task. // Expect instant post task.
EXPECT_EQ(task_queue.last_delay(), 0u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
// Task should be retained by the handler since it is not cancelled. // Task should be retained by the handler since it is not cancelled.
EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask()); EXPECT_FALSE(task_queue.AdvanceTimeAndRunLastTask());
// New delay should be 200ms since repeat delay was 300ms but task took 100ms. // New delay should be 200ms since repeat delay was 300ms but task took 100ms.
EXPECT_EQ(task_queue.last_delay(), 200u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Millis(200));
} }
TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) { TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) {
@ -248,7 +251,7 @@ TEST(RepeatingTaskTest, TaskCanStopItself) {
handle.Stop(); handle.Stop();
return TimeDelta::Millis(2); return TimeDelta::Millis(2);
}); });
EXPECT_EQ(task_queue.last_delay(), 0u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
// Task cancelled itself so wants to be released. // Task cancelled itself so wants to be released.
EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask());
EXPECT_EQ(counter.load(), 1); EXPECT_EQ(counter.load(), 1);
@ -262,7 +265,7 @@ TEST(RepeatingTaskTest, TaskCanStopItselfByReturningInfinity) {
++counter; ++counter;
return TimeDelta::PlusInfinity(); return TimeDelta::PlusInfinity();
}); });
EXPECT_EQ(task_queue.last_delay(), 0u); EXPECT_EQ(task_queue.last_delay(), TimeDelta::Zero());
// Task cancelled itself so wants to be released. // Task cancelled itself so wants to be released.
EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask()); EXPECT_TRUE(task_queue.AdvanceTimeAndRunLastTask());
EXPECT_EQ(counter.load(), 1); EXPECT_EQ(counter.load(), 1);
@ -331,20 +334,18 @@ TEST(RepeatingTaskTest, Example) {
} }
TEST(RepeatingTaskTest, ClockIntegration) { TEST(RepeatingTaskTest, ClockIntegration) {
std::unique_ptr<QueuedTask> delayed_task; absl::AnyInvocable<void() &&> delayed_task;
uint32_t expected_ms = 0; TimeDelta expected_delay = TimeDelta::Zero();
SimulatedClock clock(Timestamp::Millis(0)); SimulatedClock clock(Timestamp::Millis(0));
NiceMock<MockTaskQueue> task_queue; NiceMock<MockTaskQueue> task_queue;
ON_CALL(task_queue, PostDelayedTask) ON_CALL(task_queue, PostDelayedTask)
.WillByDefault( .WillByDefault([&](absl::AnyInvocable<void() &&> task, TimeDelta delay) {
Invoke([&delayed_task, &expected_ms](std::unique_ptr<QueuedTask> task, EXPECT_EQ(delay, expected_delay);
uint32_t milliseconds) {
EXPECT_EQ(milliseconds, expected_ms);
delayed_task = std::move(task); delayed_task = std::move(task);
})); });
expected_ms = 100; expected_delay = TimeDelta::Millis(100);
RepeatingTaskHandle handle = RepeatingTaskHandle::DelayedStart( RepeatingTaskHandle handle = RepeatingTaskHandle::DelayedStart(
&task_queue, TimeDelta::Millis(100), &task_queue, TimeDelta::Millis(100),
[&clock]() { [&clock]() {
@ -356,19 +357,19 @@ TEST(RepeatingTaskTest, ClockIntegration) {
TaskQueueBase::DelayPrecision::kLow, &clock); TaskQueueBase::DelayPrecision::kLow, &clock);
clock.AdvanceTimeMilliseconds(100); clock.AdvanceTimeMilliseconds(100);
QueuedTask* task_to_run = delayed_task.release(); absl::AnyInvocable<void()&&> task_to_run = std::move(delayed_task);
expected_ms = 90; expected_delay = TimeDelta::Millis(90);
EXPECT_FALSE(task_to_run->Run()); std::move(task_to_run)();
EXPECT_NE(nullptr, delayed_task.get()); EXPECT_NE(delayed_task, nullptr);
handle.Stop(); handle.Stop();
} }
TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) { TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) {
std::unique_ptr<QueuedTask> repeating_task; absl::AnyInvocable<void() &&> repeating_task;
MockTaskQueue task_queue; MockTaskQueue task_queue;
EXPECT_CALL(task_queue, PostDelayedTask) EXPECT_CALL(task_queue, PostDelayedTask)
.WillOnce([&](std::unique_ptr<QueuedTask> task, uint32_t milliseconds) { .WillOnce([&](absl::AnyInvocable<void() &&> task, TimeDelta delay) {
repeating_task = std::move(task); repeating_task = std::move(task);
}); });