diff --git a/test/BUILD.gn b/test/BUILD.gn index 7eae9b9817..8c1d25f3d9 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -381,6 +381,7 @@ if (rtc_include_tests) { "../api:create_simulcast_test_fixture_api", "../api:scoped_refptr", "../api:simulcast_test_fixture_api", + "../api/task_queue:task_queue_test", "../api/test/video:function_video_factory", "../api/video:builtin_video_bitrate_allocator_factory", "../api/video:video_frame", @@ -622,9 +623,11 @@ rtc_source_set("single_threaded_task_queue") { "single_threaded_task_queue.h", ] deps = [ + "../api/task_queue", "../rtc_base:checks", "../rtc_base:deprecation", "../rtc_base:rtc_base_approved", + "../rtc_base/task_utils:to_queued_task", ] } diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc index 24b9038ede..9fbb24ac88 100644 --- a/test/single_threaded_task_queue.cc +++ b/test/single_threaded_task_queue.cc @@ -20,15 +20,12 @@ namespace webrtc { namespace test { -DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask( +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask( DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id, - int64_t earliest_execution_time, - DEPRECATED_SingleThreadedTaskQueueForTesting::Task task) - : task_id(task_id), - earliest_execution_time(earliest_execution_time), - task(task) {} + std::unique_ptr task) + : task_id(task_id), task(std::move(task)) {} -DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() = default; DEPRECATED_SingleThreadedTaskQueueForTesting:: @@ -43,13 +40,8 @@ DEPRECATED_SingleThreadedTaskQueueForTesting:: } DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId -DEPRECATED_SingleThreadedTaskQueueForTesting::PostTask(Task task) { - return PostDelayedTask(task, 0); -} - -DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId -DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask( - Task task, +DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed( + std::unique_ptr task, int64_t delay_ms) { int64_t earliest_exec_time = rtc::TimeAfter(delay_ms); @@ -60,13 +52,11 @@ DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask( TaskId id = next_task_id_++; // Insert after any other tasks with an earlier-or-equal target time. - auto it = tasks_.begin(); - for (; it != tasks_.end(); it++) { - if (earliest_exec_time < (*it)->earliest_execution_time) { - break; - } - } - tasks_.insert(it, std::make_unique(id, earliest_exec_time, task)); + // Note: multimap has promise "The order of the key-value pairs whose keys + // compare equivalent is the order of insertion and does not change." + tasks_.emplace(std::piecewise_construct, + std::forward_as_tuple(earliest_exec_time), + std::forward_as_tuple(id, std::move(task))); // This class is optimized for simplicty, not for performance. This will wake // the thread up even if the next task in the queue is only scheduled for @@ -93,7 +83,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::SendTask(Task task) { bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) { rtc::CritScope lock(&cs_); for (auto it = tasks_.begin(); it != tasks_.end(); it++) { - if ((*it)->task_id == task_id) { + if (it->second.task_id == task_id) { tasks_.erase(it); return true; } @@ -136,6 +126,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) { } void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { + CurrentTaskQueueSetter set_current(this); while (true) { std::unique_ptr queued_task; @@ -151,11 +142,13 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { return; } if (!tasks_.empty()) { - int64_t remaining_delay_ms = rtc::TimeDiff( - tasks_.front()->earliest_execution_time, rtc::TimeMillis()); + auto next_delayed_task = tasks_.begin(); + int64_t earliest_exec_time = next_delayed_task->first; + int64_t remaining_delay_ms = + rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis()); if (remaining_delay_ms <= 0) { - queued_task = std::move(tasks_.front()); - tasks_.pop_front(); + queued_task = std::move(next_delayed_task->second.task); + tasks_.erase(next_delayed_task); } else { wait_time = rtc::saturated_cast(remaining_delay_ms); } @@ -163,12 +156,19 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { } if (queued_task) { - queued_task->task(); + if (!queued_task->Run()) { + queued_task.release(); + } } else { wake_up_.Wait(wait_time); } } } +void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() { + Stop(); + delete this; +} + } // namespace test } // namespace webrtc diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h index 00126730fb..52316c66e9 100644 --- a/test/single_threaded_task_queue.h +++ b/test/single_threaded_task_queue.h @@ -11,13 +11,15 @@ #define TEST_SINGLE_THREADED_TASK_QUEUE_H_ #include -#include +#include #include +#include "api/task_queue/task_queue_base.h" #include "rtc_base/critical_section.h" #include "rtc_base/deprecation.h" #include "rtc_base/event.h" #include "rtc_base/platform_thread.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -33,25 +35,29 @@ namespace test { // resemble that of real WebRTC, thereby allowing us to replace some critical // sections by thread-checkers. // This task is NOT tuned for performance, but rather for simplicity. -class DEPRECATED_SingleThreadedTaskQueueForTesting { +class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase { public: using Task = std::function; using TaskId = size_t; constexpr static TaskId kInvalidTaskId = static_cast(-1); explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name); - ~DEPRECATED_SingleThreadedTaskQueueForTesting(); + ~DEPRECATED_SingleThreadedTaskQueueForTesting() override; // Sends one task to the task-queue, and returns a handle by which the // task can be cancelled. // This mimics the behavior of TaskQueue, but only for lambdas, rather than // for both lambdas and QueuedTask objects. - TaskId PostTask(Task task); + TaskId PostTask(Task task) { + return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0); + } // Same as PostTask(), but ensures that the task will not begin execution // less than |delay_ms| milliseconds after being posted; an upper bound // is not provided. - TaskId PostDelayedTask(Task task, int64_t delay_ms); + TaskId PostDelayedTask(Task task, int64_t delay_ms) { + return PostDelayed(ToQueuedTask(std::move(task)), delay_ms); + } // Send one task to the queue. The function does not return until the task // has finished executing. No support for canceling the task. @@ -72,22 +78,36 @@ class DEPRECATED_SingleThreadedTaskQueueForTesting { void Stop(); + // Implements TaskQueueBase. + void Delete() override; + + void PostTask(std::unique_ptr task) override { + PostDelayed(std::move(task), /*delay_ms=*/0); + } + + void PostDelayedTask(std::unique_ptr task, + uint32_t delay_ms) override { + PostDelayed(std::move(task), delay_ms); + } + private: - struct QueuedTask { - QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task); - ~QueuedTask(); + struct StoredTask { + StoredTask(TaskId task_id, std::unique_ptr task); + ~StoredTask(); TaskId task_id; - int64_t earliest_execution_time; - Task task; + std::unique_ptr task; }; + TaskId PostDelayed(std::unique_ptr task, int64_t delay_ms); + static void Run(void* obj); void RunLoop(); rtc::CriticalSection cs_; - std::list> tasks_ RTC_GUARDED_BY(cs_); + // Tasks are ordered by earliest execution time. + std::multimap tasks_ RTC_GUARDED_BY(cs_); rtc::ThreadChecker owner_thread_checker_; rtc::PlatformThread thread_; bool running_ RTC_GUARDED_BY(cs_); diff --git a/test/single_threaded_task_queue_unittest.cc b/test/single_threaded_task_queue_unittest.cc index b945bc0d98..dedc78b6b1 100644 --- a/test/single_threaded_task_queue_unittest.cc +++ b/test/single_threaded_task_queue_unittest.cc @@ -14,6 +14,7 @@ #include #include +#include "api/task_queue/task_queue_test.h" #include "rtc_base/event.h" #include "test/gtest.h" @@ -352,6 +353,22 @@ TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, EXPECT_LT(counter, tasks); } +class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view /* name */, + Priority /*priority*/) const override { + return std::unique_ptr( + new DEPRECATED_SingleThreadedTaskQueueForTesting("noname")); + } +}; + +INSTANTIATE_TEST_SUITE_P( + DeprecatedSingleThreadedTaskQueueForTesting, + TaskQueueTest, + ::testing::Values( + std::make_unique)); + } // namespace } // namespace test } // namespace webrtc