Introduce new interface for TaskQueueBase using absl::AnyInvocable
Bug: webrtc:14245 Change-Id: Ie4f47ea9753d6644aec2e95f531b521cc119a6c8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267402 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37439}
This commit is contained in:
parent
4f1af1156f
commit
8feb6fd1e9
@ -21,10 +21,12 @@ rtc_library("task_queue") {
|
||||
"../../rtc_base:checks",
|
||||
"../../rtc_base:macromagic",
|
||||
"../../rtc_base/system:rtc_export",
|
||||
"../units:time_delta",
|
||||
]
|
||||
absl_deps = [
|
||||
"//third_party/abseil-cpp/absl/base:config",
|
||||
"//third_party/abseil-cpp/absl/base:core_headers",
|
||||
"//third_party/abseil-cpp/absl/functional:any_invocable",
|
||||
"//third_party/abseil-cpp/absl/strings",
|
||||
]
|
||||
}
|
||||
@ -55,20 +57,20 @@ rtc_library("task_queue_test") {
|
||||
"../../test:test_support",
|
||||
]
|
||||
absl_deps = [
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/cleanup",
|
||||
"//third_party/abseil-cpp/absl/strings",
|
||||
]
|
||||
} else {
|
||||
deps = [
|
||||
":task_queue",
|
||||
"../../api/task_queue:to_queued_task",
|
||||
"../../api/units:time_delta",
|
||||
"../../rtc_base:refcount",
|
||||
"../../rtc_base:rtc_event",
|
||||
"../../rtc_base:timeutils",
|
||||
"../../test:test_support",
|
||||
]
|
||||
absl_deps = [
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/cleanup",
|
||||
"//third_party/abseil-cpp/absl/strings",
|
||||
]
|
||||
}
|
||||
|
||||
@ -11,6 +11,8 @@
|
||||
|
||||
#include "absl/base/attributes.h"
|
||||
#include "absl/base/config.h"
|
||||
#include "absl/functional/any_invocable.h"
|
||||
#include "api/units/time_delta.h"
|
||||
#include "rtc_base/checks.h"
|
||||
|
||||
#if defined(ABSL_HAVE_THREAD_LOCAL)
|
||||
@ -77,3 +79,69 @@ TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() {
|
||||
#else
|
||||
#error Unsupported platform
|
||||
#endif
|
||||
|
||||
// Functions to support transition from std::unique_ptr<QueuedTask>
|
||||
// representation of a task to absl::AnyInvocable<void() &&> representation. In
|
||||
// the base interface older and newer functions call each other. This way
|
||||
// TaskQueue would work when classes derived from TaskQueueBase implements
|
||||
// either old set of Post functions (before the transition) or new set of Post
|
||||
// functions. Thus callers of the interface can be updated independently of all
|
||||
// of the implementations of that TaskQueueBase interface.
|
||||
|
||||
// TODO(bugs.webrtc.org/14245): Delete these functions when transition is
|
||||
// complete.
|
||||
namespace webrtc {
|
||||
namespace {
|
||||
class Task : public QueuedTask {
|
||||
public:
|
||||
explicit Task(absl::AnyInvocable<void() &&> task) : task_(std::move(task)) {}
|
||||
~Task() override = default;
|
||||
|
||||
bool Run() override {
|
||||
std::move(task_)();
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
absl::AnyInvocable<void() &&> task_;
|
||||
};
|
||||
|
||||
std::unique_ptr<QueuedTask> ToLegacy(absl::AnyInvocable<void() &&> task) {
|
||||
return std::make_unique<Task>(std::move(task));
|
||||
}
|
||||
|
||||
absl::AnyInvocable<void() &&> FromLegacy(std::unique_ptr<QueuedTask> task) {
|
||||
return [task = std::move(task)]() mutable {
|
||||
if (!task->Run()) {
|
||||
task.release();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void TaskQueueBase::PostTask(std::unique_ptr<QueuedTask> task) {
|
||||
PostTask(FromLegacy(std::move(task)));
|
||||
}
|
||||
|
||||
void TaskQueueBase::PostTask(absl::AnyInvocable<void() &&> task) {
|
||||
PostTask(ToLegacy(std::move(task)));
|
||||
}
|
||||
|
||||
void TaskQueueBase::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
||||
uint32_t milliseconds) {
|
||||
PostDelayedTask(FromLegacy(std::move(task)), TimeDelta::Millis(milliseconds));
|
||||
}
|
||||
|
||||
void TaskQueueBase::PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay) {
|
||||
PostDelayedTask(ToLegacy(std::move(task)), delay.ms());
|
||||
}
|
||||
|
||||
void TaskQueueBase::PostDelayedHighPrecisionTask(
|
||||
absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay) {
|
||||
PostDelayedHighPrecisionTask(ToLegacy(std::move(task)), delay.ms());
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
@ -13,7 +13,9 @@
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/functional/any_invocable.h"
|
||||
#include "api/task_queue/queued_task.h"
|
||||
#include "api/units/time_delta.h"
|
||||
#include "rtc_base/system/rtc_export.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
|
||||
@ -48,27 +50,32 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
|
||||
// was created on.
|
||||
virtual void Delete() = 0;
|
||||
|
||||
// Schedules a task to execute. Tasks are executed in FIFO order.
|
||||
// If `task->Run()` returns true, task is deleted on the task queue
|
||||
// before next QueuedTask starts executing.
|
||||
// Schedules a `task` to execute. Tasks are executed in FIFO order.
|
||||
// When a TaskQueue is deleted, pending tasks will not be executed but they
|
||||
// will be deleted. The deletion of tasks may happen synchronously on the
|
||||
// TaskQueue or it may happen asynchronously after TaskQueue is deleted.
|
||||
// This may vary from one implementation to the next so assumptions about
|
||||
// lifetimes of pending tasks should not be made.
|
||||
// May be called on any thread or task queue, including this task queue.
|
||||
virtual void PostTask(std::unique_ptr<QueuedTask> task) = 0;
|
||||
// TODO(bugs.webrtc.org/14245): Make pure virtual when implemented in all
|
||||
// derived classes.
|
||||
virtual void PostTask(absl::AnyInvocable<void() &&> task);
|
||||
|
||||
// Deprecated, use PostTask variant above in new code.
|
||||
// TODO(bugs.webrtc.org/14245): Delete when all usage is updated to the
|
||||
// function above.
|
||||
virtual void PostTask(std::unique_ptr<QueuedTask> task);
|
||||
|
||||
// Prefer PostDelayedTask() over PostDelayedHighPrecisionTask() whenever
|
||||
// possible.
|
||||
//
|
||||
// Schedules a task to execute a specified number of milliseconds from when
|
||||
// the call is made, using "low" precision. All scheduling is affected by
|
||||
// OS-specific leeway and current workloads which means that in terms of
|
||||
// precision there are no hard guarantees, but in addition to the OS induced
|
||||
// leeway, "low" precision adds up to a 17 ms additional leeway. The purpose
|
||||
// of this leeway is to achieve more efficient CPU scheduling and reduce Idle
|
||||
// Wake Up frequency.
|
||||
// Schedules a `task` to execute a specified `delay` from when the call is
|
||||
// made, using "low" precision. All scheduling is affected by OS-specific
|
||||
// leeway and current workloads which means that in terms of precision there
|
||||
// are no hard guarantees, but in addition to the OS induced leeway, "low"
|
||||
// precision adds up to a 17 ms additional leeway. The purpose of this leeway
|
||||
// is to achieve more efficient CPU scheduling and reduce Idle Wake Up
|
||||
// frequency.
|
||||
//
|
||||
// The task may execute with [-1, 17 + OS induced leeway) ms additional delay.
|
||||
//
|
||||
@ -82,16 +89,24 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
|
||||
// https://crbug.com/webrtc/13583 for more information.
|
||||
//
|
||||
// May be called on any thread or task queue, including this task queue.
|
||||
// TODO(bugs.webrtc.org/14245): Make pure virtual when implemented in all
|
||||
// derived classes.
|
||||
virtual void PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay);
|
||||
|
||||
// Deprecated, use PostDelayedTask variant above in new code.
|
||||
// TODO(bugs.webrtc.org/14245): Delete when all usage is updated to the
|
||||
// function above.
|
||||
virtual void PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
||||
uint32_t milliseconds) = 0;
|
||||
uint32_t milliseconds);
|
||||
|
||||
// Prefer PostDelayedTask() over PostDelayedHighPrecisionTask() whenever
|
||||
// possible.
|
||||
//
|
||||
// Schedules a task to execute a specified number of milliseconds from when
|
||||
// the call is made, using "high" precision. All scheduling is affected by
|
||||
// OS-specific leeway and current workloads which means that in terms of
|
||||
// precision there are no hard guarantees.
|
||||
// Schedules a `task` to execute a specified `delay` from when the call is
|
||||
// made, using "high" precision. All scheduling is affected by OS-specific
|
||||
// leeway and current workloads which means that in terms of precision there
|
||||
// are no hard guarantees.
|
||||
//
|
||||
// The task may execute with [-1, OS induced leeway] ms additional delay.
|
||||
//
|
||||
@ -101,15 +116,37 @@ class RTC_LOCKABLE RTC_EXPORT TaskQueueBase {
|
||||
// battery, when the timer precision can be as poor as 15 ms.
|
||||
//
|
||||
// May be called on any thread or task queue, including this task queue.
|
||||
// TODO(bugs.webrtc.org/14245): Make pure virtual when implemented in all
|
||||
// derived classes.
|
||||
virtual void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay);
|
||||
|
||||
// Deprecated, use `PostDelayedHighPrecisionTask` variant above in new code.
|
||||
// TODO(bugs.webrtc.org/14245): Delete when all usage is updated to the
|
||||
// function above.
|
||||
virtual void PostDelayedHighPrecisionTask(std::unique_ptr<QueuedTask> task,
|
||||
uint32_t milliseconds) {
|
||||
// Remove default implementation when dependencies have implemented this
|
||||
// method.
|
||||
PostDelayedTask(std::move(task), milliseconds);
|
||||
}
|
||||
|
||||
// As specified by |precision|, calls either PostDelayedTask() or
|
||||
// As specified by `precision`, calls either PostDelayedTask() or
|
||||
// PostDelayedHighPrecisionTask().
|
||||
void PostDelayedTaskWithPrecision(DelayPrecision precision,
|
||||
absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay) {
|
||||
switch (precision) {
|
||||
case DelayPrecision::kLow:
|
||||
PostDelayedTask(std::move(task), delay);
|
||||
break;
|
||||
case DelayPrecision::kHigh:
|
||||
PostDelayedHighPrecisionTask(std::move(task), delay);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated, use `PostDelayedTaskWithPrecision` variant above in new code.
|
||||
// TODO(bugs.webrtc.org/14245): Delete when all usage is updated to the
|
||||
// function above.
|
||||
void PostDelayedTaskWithPrecision(DelayPrecision precision,
|
||||
std::unique_ptr<QueuedTask> task,
|
||||
uint32_t milliseconds) {
|
||||
|
||||
@ -9,9 +9,11 @@
|
||||
*/
|
||||
#include "api/task_queue/task_queue_test.h"
|
||||
|
||||
#include "absl/memory/memory.h"
|
||||
#include <memory>
|
||||
|
||||
#include "absl/cleanup/cleanup.h"
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "api/task_queue/to_queued_task.h"
|
||||
#include "api/units/time_delta.h"
|
||||
#include "rtc_base/event.h"
|
||||
#include "rtc_base/ref_counter.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
@ -43,10 +45,10 @@ TEST_P(TaskQueueTest, PostAndCheckCurrent) {
|
||||
// means that TaskQueueBase::Current() will still return a valid value.
|
||||
EXPECT_FALSE(queue->IsCurrent());
|
||||
|
||||
queue->PostTask(ToQueuedTask([&event, &queue] {
|
||||
queue->PostTask([&event, &queue] {
|
||||
EXPECT_TRUE(queue->IsCurrent());
|
||||
event.Set();
|
||||
}));
|
||||
});
|
||||
EXPECT_TRUE(event.Wait(1000));
|
||||
}
|
||||
|
||||
@ -55,20 +57,17 @@ TEST_P(TaskQueueTest, PostCustomTask) {
|
||||
rtc::Event ran;
|
||||
auto queue = CreateTaskQueue(factory, "PostCustomImplementation");
|
||||
|
||||
class CustomTask : public QueuedTask {
|
||||
class CustomTask {
|
||||
public:
|
||||
explicit CustomTask(rtc::Event* ran) : ran_(ran) {}
|
||||
|
||||
private:
|
||||
bool Run() override {
|
||||
ran_->Set();
|
||||
return false; // Do not allow the task to be deleted by the queue.
|
||||
}
|
||||
void operator()() { ran_->Set(); }
|
||||
|
||||
private:
|
||||
rtc::Event* const ran_;
|
||||
} my_task(&ran);
|
||||
|
||||
queue->PostTask(absl::WrapUnique(&my_task));
|
||||
queue->PostTask(my_task);
|
||||
EXPECT_TRUE(ran.Wait(1000));
|
||||
}
|
||||
|
||||
@ -77,7 +76,7 @@ TEST_P(TaskQueueTest, PostDelayedZero) {
|
||||
rtc::Event event;
|
||||
auto queue = CreateTaskQueue(factory, "PostDelayedZero");
|
||||
|
||||
queue->PostDelayedTask(ToQueuedTask([&event] { event.Set(); }), 0);
|
||||
queue->PostDelayedTask([&event] { event.Set(); }, TimeDelta::Zero());
|
||||
EXPECT_TRUE(event.Wait(1000));
|
||||
}
|
||||
|
||||
@ -86,9 +85,8 @@ TEST_P(TaskQueueTest, PostFromQueue) {
|
||||
rtc::Event event;
|
||||
auto queue = CreateTaskQueue(factory, "PostFromQueue");
|
||||
|
||||
queue->PostTask(ToQueuedTask([&event, &queue] {
|
||||
queue->PostTask(ToQueuedTask([&event] { event.Set(); }));
|
||||
}));
|
||||
queue->PostTask(
|
||||
[&event, &queue] { queue->PostTask([&event] { event.Set(); }); });
|
||||
EXPECT_TRUE(event.Wait(1000));
|
||||
}
|
||||
|
||||
@ -99,11 +97,12 @@ TEST_P(TaskQueueTest, PostDelayed) {
|
||||
CreateTaskQueue(factory, "PostDelayed", TaskQueueFactory::Priority::HIGH);
|
||||
|
||||
int64_t start = rtc::TimeMillis();
|
||||
queue->PostDelayedTask(ToQueuedTask([&event, &queue] {
|
||||
EXPECT_TRUE(queue->IsCurrent());
|
||||
event.Set();
|
||||
}),
|
||||
100);
|
||||
queue->PostDelayedTask(
|
||||
[&event, &queue] {
|
||||
EXPECT_TRUE(queue->IsCurrent());
|
||||
event.Set();
|
||||
},
|
||||
TimeDelta::Millis(100));
|
||||
EXPECT_TRUE(event.Wait(1000));
|
||||
int64_t end = rtc::TimeMillis();
|
||||
// These tests are a little relaxed due to how "powerful" our test bots can
|
||||
@ -120,11 +119,12 @@ TEST_P(TaskQueueTest, PostMultipleDelayed) {
|
||||
std::vector<rtc::Event> events(100);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
rtc::Event* event = &events[i];
|
||||
queue->PostDelayedTask(ToQueuedTask([event, &queue] {
|
||||
EXPECT_TRUE(queue->IsCurrent());
|
||||
event->Set();
|
||||
}),
|
||||
i);
|
||||
queue->PostDelayedTask(
|
||||
[event, &queue] {
|
||||
EXPECT_TRUE(queue->IsCurrent());
|
||||
event->Set();
|
||||
},
|
||||
TimeDelta::Millis(i));
|
||||
}
|
||||
|
||||
for (rtc::Event& e : events)
|
||||
@ -136,8 +136,9 @@ TEST_P(TaskQueueTest, PostDelayedAfterDestruct) {
|
||||
rtc::Event run;
|
||||
rtc::Event deleted;
|
||||
auto queue = CreateTaskQueue(factory, "PostDelayedAfterDestruct");
|
||||
queue->PostDelayedTask(
|
||||
ToQueuedTask([&run] { run.Set(); }, [&deleted] { deleted.Set(); }), 100);
|
||||
absl::Cleanup cleanup = [&deleted] { deleted.Set(); };
|
||||
queue->PostDelayedTask([&run, cleanup = std::move(cleanup)] { run.Set(); },
|
||||
TimeDelta::Millis(100));
|
||||
// Destroy the queue.
|
||||
queue = nullptr;
|
||||
// Task might outlive the TaskQueue, but still should be deleted.
|
||||
@ -153,38 +154,33 @@ TEST_P(TaskQueueTest, PostAndReuse) {
|
||||
|
||||
int call_count = 0;
|
||||
|
||||
class ReusedTask : public QueuedTask {
|
||||
class ReusedTask {
|
||||
public:
|
||||
ReusedTask(int* counter, TaskQueueBase* reply_queue, rtc::Event* event)
|
||||
: counter_(*counter), reply_queue_(reply_queue), event_(*event) {
|
||||
EXPECT_EQ(counter_, 0);
|
||||
}
|
||||
ReusedTask(ReusedTask&&) = default;
|
||||
ReusedTask& operator=(ReusedTask&&) = delete;
|
||||
|
||||
private:
|
||||
bool Run() override {
|
||||
void operator()() && {
|
||||
if (++counter_ == 1) {
|
||||
reply_queue_->PostTask(absl::WrapUnique(this));
|
||||
// At this point, the object is owned by reply_queue_ and it's
|
||||
// theoratically possible that the object has been deleted (e.g. if
|
||||
// posting wasn't possible). So, don't touch any member variables here.
|
||||
|
||||
// Indicate to the current queue that ownership has been transferred.
|
||||
return false;
|
||||
reply_queue_->PostTask(std::move(*this));
|
||||
// At this point, the object is in the moved-from state.
|
||||
} else {
|
||||
EXPECT_EQ(counter_, 2);
|
||||
EXPECT_TRUE(reply_queue_->IsCurrent());
|
||||
event_.Set();
|
||||
return true; // Indicate that the object should be deleted.
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
int& counter_;
|
||||
TaskQueueBase* const reply_queue_;
|
||||
rtc::Event& event_;
|
||||
};
|
||||
|
||||
auto task =
|
||||
std::make_unique<ReusedTask>(&call_count, reply_queue.get(), &event);
|
||||
ReusedTask task(&call_count, reply_queue.get(), &event);
|
||||
post_queue->PostTask(std::move(task));
|
||||
EXPECT_TRUE(event.Wait(1000));
|
||||
}
|
||||
@ -215,16 +211,18 @@ TEST_P(TaskQueueTest, PostALot) {
|
||||
int tasks_executed = 0;
|
||||
auto task_queue = CreateTaskQueue(factory, "PostALot");
|
||||
|
||||
task_queue->PostTask(ToQueuedTask([&] {
|
||||
task_queue->PostTask([&] {
|
||||
// Post tasks from the queue to guarantee that the 1st task won't be
|
||||
// executed before the last one is posted.
|
||||
for (int i = 0; i < kTaskCount; ++i) {
|
||||
task_queue->PostTask(ToQueuedTask(
|
||||
[&] { ++tasks_executed; }, [&] { all_destroyed.DecrementCount(); }));
|
||||
absl::Cleanup cleanup = [&] { all_destroyed.DecrementCount(); };
|
||||
task_queue->PostTask([&tasks_executed, cleanup = std::move(cleanup)] {
|
||||
++tasks_executed;
|
||||
});
|
||||
}
|
||||
|
||||
posting_done.Set();
|
||||
}));
|
||||
});
|
||||
|
||||
// Before destroying the task queue wait until all child tasks are posted.
|
||||
posting_done.Wait(rtc::Event::kForever);
|
||||
@ -257,17 +255,17 @@ TEST_P(TaskQueueTest, PostTwoWithSharedUnprotectedState) {
|
||||
|
||||
auto queue = CreateTaskQueue(factory, "PostTwoWithSharedUnprotectedState");
|
||||
rtc::Event done;
|
||||
queue->PostTask(ToQueuedTask([&state, &queue, &done] {
|
||||
queue->PostTask([&state, &queue, &done] {
|
||||
// Post tasks from queue to guarantee, that 1st task won't be
|
||||
// executed before the second one will be posted.
|
||||
queue->PostTask(ToQueuedTask([&state] { state.state = 1; }));
|
||||
queue->PostTask(ToQueuedTask([&state, &done] {
|
||||
queue->PostTask([&state] { state.state = 1; });
|
||||
queue->PostTask([&state, &done] {
|
||||
EXPECT_EQ(state.state, 1);
|
||||
done.Set();
|
||||
}));
|
||||
});
|
||||
// Check, that state changing tasks didn't start yet.
|
||||
EXPECT_EQ(state.state, 0);
|
||||
}));
|
||||
});
|
||||
EXPECT_TRUE(done.Wait(1000));
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user