From 71037a8e99a1f9167a7eb28e392ef0a1a274549f Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Wed, 25 Sep 2019 17:21:52 +0200 Subject: [PATCH] Implement TaskQueueBase interface by SingleThreadedTaskQueueForTesting that allows to use SingleThreadedTaskQueueForTesting as regular TaskQueue. which allows components that currently depend on SingleThreadedTaskQueueForTesting to depend on TaskQueueBase interface instead. Those updates can be done one-by-one and in the end would allow to stop using SingleThreadedTaskQueueForTesting in favor of other TaskQueue implementations. Bug: webrtc:10933 Change-Id: I3e642c88c968012588b9d9c09918340f37bbedbd Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/154352 Commit-Queue: Danil Chapovalov Reviewed-by: Elad Alon Reviewed-by: Yves Gerey Reviewed-by: Sebastian Jansson Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#29307} --- test/BUILD.gn | 3 ++ test/single_threaded_task_queue.cc | 54 ++++++++++----------- test/single_threaded_task_queue.h | 42 +++++++++++----- test/single_threaded_task_queue_unittest.cc | 17 +++++++ 4 files changed, 78 insertions(+), 38 deletions(-) diff --git a/test/BUILD.gn b/test/BUILD.gn index 7eae9b9817..8c1d25f3d9 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -381,6 +381,7 @@ if (rtc_include_tests) { "../api:create_simulcast_test_fixture_api", "../api:scoped_refptr", "../api:simulcast_test_fixture_api", + "../api/task_queue:task_queue_test", "../api/test/video:function_video_factory", "../api/video:builtin_video_bitrate_allocator_factory", "../api/video:video_frame", @@ -622,9 +623,11 @@ rtc_source_set("single_threaded_task_queue") { "single_threaded_task_queue.h", ] deps = [ + "../api/task_queue", "../rtc_base:checks", "../rtc_base:deprecation", "../rtc_base:rtc_base_approved", + "../rtc_base/task_utils:to_queued_task", ] } diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc index 24b9038ede..9fbb24ac88 100644 --- a/test/single_threaded_task_queue.cc +++ b/test/single_threaded_task_queue.cc @@ -20,15 +20,12 @@ namespace webrtc { namespace test { -DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask( +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask( DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id, - int64_t earliest_execution_time, - DEPRECATED_SingleThreadedTaskQueueForTesting::Task task) - : task_id(task_id), - earliest_execution_time(earliest_execution_time), - task(task) {} + std::unique_ptr task) + : task_id(task_id), task(std::move(task)) {} -DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() = default; DEPRECATED_SingleThreadedTaskQueueForTesting:: @@ -43,13 +40,8 @@ DEPRECATED_SingleThreadedTaskQueueForTesting:: } DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId -DEPRECATED_SingleThreadedTaskQueueForTesting::PostTask(Task task) { - return PostDelayedTask(task, 0); -} - -DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId -DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask( - Task task, +DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed( + std::unique_ptr task, int64_t delay_ms) { int64_t earliest_exec_time = rtc::TimeAfter(delay_ms); @@ -60,13 +52,11 @@ DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask( TaskId id = next_task_id_++; // Insert after any other tasks with an earlier-or-equal target time. - auto it = tasks_.begin(); - for (; it != tasks_.end(); it++) { - if (earliest_exec_time < (*it)->earliest_execution_time) { - break; - } - } - tasks_.insert(it, std::make_unique(id, earliest_exec_time, task)); + // Note: multimap has promise "The order of the key-value pairs whose keys + // compare equivalent is the order of insertion and does not change." + tasks_.emplace(std::piecewise_construct, + std::forward_as_tuple(earliest_exec_time), + std::forward_as_tuple(id, std::move(task))); // This class is optimized for simplicty, not for performance. This will wake // the thread up even if the next task in the queue is only scheduled for @@ -93,7 +83,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::SendTask(Task task) { bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) { rtc::CritScope lock(&cs_); for (auto it = tasks_.begin(); it != tasks_.end(); it++) { - if ((*it)->task_id == task_id) { + if (it->second.task_id == task_id) { tasks_.erase(it); return true; } @@ -136,6 +126,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) { } void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { + CurrentTaskQueueSetter set_current(this); while (true) { std::unique_ptr queued_task; @@ -151,11 +142,13 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { return; } if (!tasks_.empty()) { - int64_t remaining_delay_ms = rtc::TimeDiff( - tasks_.front()->earliest_execution_time, rtc::TimeMillis()); + auto next_delayed_task = tasks_.begin(); + int64_t earliest_exec_time = next_delayed_task->first; + int64_t remaining_delay_ms = + rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis()); if (remaining_delay_ms <= 0) { - queued_task = std::move(tasks_.front()); - tasks_.pop_front(); + queued_task = std::move(next_delayed_task->second.task); + tasks_.erase(next_delayed_task); } else { wait_time = rtc::saturated_cast(remaining_delay_ms); } @@ -163,12 +156,19 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { } if (queued_task) { - queued_task->task(); + if (!queued_task->Run()) { + queued_task.release(); + } } else { wake_up_.Wait(wait_time); } } } +void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() { + Stop(); + delete this; +} + } // namespace test } // namespace webrtc diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h index 00126730fb..52316c66e9 100644 --- a/test/single_threaded_task_queue.h +++ b/test/single_threaded_task_queue.h @@ -11,13 +11,15 @@ #define TEST_SINGLE_THREADED_TASK_QUEUE_H_ #include -#include +#include #include +#include "api/task_queue/task_queue_base.h" #include "rtc_base/critical_section.h" #include "rtc_base/deprecation.h" #include "rtc_base/event.h" #include "rtc_base/platform_thread.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -33,25 +35,29 @@ namespace test { // resemble that of real WebRTC, thereby allowing us to replace some critical // sections by thread-checkers. // This task is NOT tuned for performance, but rather for simplicity. -class DEPRECATED_SingleThreadedTaskQueueForTesting { +class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase { public: using Task = std::function; using TaskId = size_t; constexpr static TaskId kInvalidTaskId = static_cast(-1); explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name); - ~DEPRECATED_SingleThreadedTaskQueueForTesting(); + ~DEPRECATED_SingleThreadedTaskQueueForTesting() override; // Sends one task to the task-queue, and returns a handle by which the // task can be cancelled. // This mimics the behavior of TaskQueue, but only for lambdas, rather than // for both lambdas and QueuedTask objects. - TaskId PostTask(Task task); + TaskId PostTask(Task task) { + return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0); + } // Same as PostTask(), but ensures that the task will not begin execution // less than |delay_ms| milliseconds after being posted; an upper bound // is not provided. - TaskId PostDelayedTask(Task task, int64_t delay_ms); + TaskId PostDelayedTask(Task task, int64_t delay_ms) { + return PostDelayed(ToQueuedTask(std::move(task)), delay_ms); + } // Send one task to the queue. The function does not return until the task // has finished executing. No support for canceling the task. @@ -72,22 +78,36 @@ class DEPRECATED_SingleThreadedTaskQueueForTesting { void Stop(); + // Implements TaskQueueBase. + void Delete() override; + + void PostTask(std::unique_ptr task) override { + PostDelayed(std::move(task), /*delay_ms=*/0); + } + + void PostDelayedTask(std::unique_ptr task, + uint32_t delay_ms) override { + PostDelayed(std::move(task), delay_ms); + } + private: - struct QueuedTask { - QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task); - ~QueuedTask(); + struct StoredTask { + StoredTask(TaskId task_id, std::unique_ptr task); + ~StoredTask(); TaskId task_id; - int64_t earliest_execution_time; - Task task; + std::unique_ptr task; }; + TaskId PostDelayed(std::unique_ptr task, int64_t delay_ms); + static void Run(void* obj); void RunLoop(); rtc::CriticalSection cs_; - std::list> tasks_ RTC_GUARDED_BY(cs_); + // Tasks are ordered by earliest execution time. + std::multimap tasks_ RTC_GUARDED_BY(cs_); rtc::ThreadChecker owner_thread_checker_; rtc::PlatformThread thread_; bool running_ RTC_GUARDED_BY(cs_); diff --git a/test/single_threaded_task_queue_unittest.cc b/test/single_threaded_task_queue_unittest.cc index b945bc0d98..dedc78b6b1 100644 --- a/test/single_threaded_task_queue_unittest.cc +++ b/test/single_threaded_task_queue_unittest.cc @@ -14,6 +14,7 @@ #include #include +#include "api/task_queue/task_queue_test.h" #include "rtc_base/event.h" #include "test/gtest.h" @@ -352,6 +353,22 @@ TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, EXPECT_LT(counter, tasks); } +class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view /* name */, + Priority /*priority*/) const override { + return std::unique_ptr( + new DEPRECATED_SingleThreadedTaskQueueForTesting("noname")); + } +}; + +INSTANTIATE_TEST_SUITE_P( + DeprecatedSingleThreadedTaskQueueForTesting, + TaskQueueTest, + ::testing::Values( + std::make_unique)); + } // namespace } // namespace test } // namespace webrtc