diff --git a/BUILD.gn b/BUILD.gn index 936c89dda2..f5b34591d2 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -492,6 +492,7 @@ if (rtc_include_tests) { "rtc_base:sigslot_unittest", "rtc_base:weak_ptr_unittests", "rtc_base/experiments:experiments_unittests", + "rtc_base/task_utils:to_queued_task_unittests", "sdk:sdk_tests", "test/scenario/network:network_emulation_unittests", ] diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index 333c872540..f6bbbc3e70 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -42,8 +42,8 @@ rtc_source_set("task_queue_test") { deps = [ ":task_queue", "../../rtc_base:rtc_event", - "../../rtc_base:rtc_task_queue_api", "../../rtc_base:timeutils", + "../../rtc_base/task_utils:to_queued_task", "../../test:test_support", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", diff --git a/api/task_queue/task_queue_test.cc b/api/task_queue/task_queue_test.cc index b6f04cd8f6..e78dbd232a 100644 --- a/api/task_queue/task_queue_test.cc +++ b/api/task_queue/task_queue_test.cc @@ -12,7 +12,7 @@ #include "absl/memory/memory.h" #include "absl/strings/string_view.h" #include "rtc_base/event.h" -#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" namespace webrtc { @@ -40,7 +40,7 @@ TEST_P(TaskQueueTest, PostAndCheckCurrent) { EXPECT_FALSE(queue->IsCurrent()); EXPECT_FALSE(TaskQueueBase::Current()); - queue->PostTask(rtc::NewClosure([&event, &queue] { + queue->PostTask(ToQueuedTask([&event, &queue] { EXPECT_TRUE(queue->IsCurrent()); event.Set(); })); @@ -74,7 +74,7 @@ TEST_P(TaskQueueTest, PostDelayedZero) { rtc::Event event; auto queue = CreateTaskQueue(factory, "PostDelayedZero"); - queue->PostDelayedTask(rtc::NewClosure([&event] { event.Set(); }), 0); + queue->PostDelayedTask(ToQueuedTask([&event] { event.Set(); }), 0); EXPECT_TRUE(event.Wait(1000)); } @@ -83,8 +83,8 @@ TEST_P(TaskQueueTest, PostFromQueue) { rtc::Event event; auto queue = CreateTaskQueue(factory, "PostFromQueue"); - queue->PostTask(rtc::NewClosure([&event, &queue] { - queue->PostTask(rtc::NewClosure([&event] { event.Set(); })); + queue->PostTask(ToQueuedTask([&event, &queue] { + queue->PostTask(ToQueuedTask([&event] { event.Set(); })); })); EXPECT_TRUE(event.Wait(1000)); } @@ -96,7 +96,7 @@ TEST_P(TaskQueueTest, PostDelayed) { CreateTaskQueue(factory, "PostDelayed", TaskQueueFactory::Priority::HIGH); int64_t start = rtc::TimeMillis(); - queue->PostDelayedTask(rtc::NewClosure([&event, &queue] { + queue->PostDelayedTask(ToQueuedTask([&event, &queue] { EXPECT_TRUE(queue->IsCurrent()); event.Set(); }), @@ -117,7 +117,7 @@ TEST_P(TaskQueueTest, PostMultipleDelayed) { std::vector events(100); for (int i = 0; i < 100; ++i) { rtc::Event* event = &events[i]; - queue->PostDelayedTask(rtc::NewClosure([event, &queue] { + queue->PostDelayedTask(ToQueuedTask([event, &queue] { EXPECT_TRUE(queue->IsCurrent()); event->Set(); }), @@ -134,8 +134,7 @@ TEST_P(TaskQueueTest, PostDelayedAfterDestruct) { rtc::Event deleted; auto queue = CreateTaskQueue(factory, "PostDelayedAfterDestruct"); queue->PostDelayedTask( - rtc::NewClosure([&run] { run.Set(); }, [&deleted] { deleted.Set(); }), - 100); + ToQueuedTask([&run] { run.Set(); }, [&deleted] { deleted.Set(); }), 100); // Destroy the queue. queue = nullptr; // Task might outlive the TaskQueue, but still should be deleted. @@ -206,11 +205,11 @@ TEST_P(TaskQueueTest, PostALot) { // case inside of the libevent queue implementation. queue->PostTask( - rtc::NewClosure([&event] { event.Wait(rtc::Event::kForever); })); + ToQueuedTask([&event] { event.Wait(rtc::Event::kForever); })); for (int i = 0; i < kTaskCount; ++i) queue->PostTask( - rtc::NewClosure([&tasks_executed] { ++tasks_executed; }, - [&tasks_cleaned_up] { ++tasks_cleaned_up; })); + ToQueuedTask([&tasks_executed] { ++tasks_executed; }, + [&tasks_cleaned_up] { ++tasks_cleaned_up; })); event.Set(); // Unblock the first task. } @@ -237,11 +236,11 @@ TEST_P(TaskQueueTest, PostTwoWithSharedUnprotectedState) { auto queue = CreateTaskQueue(factory, "PostTwoWithSharedUnprotectedState"); rtc::Event done; - queue->PostTask(rtc::NewClosure([&state, &queue, &done] { + queue->PostTask(ToQueuedTask([&state, &queue, &done] { // Post tasks from queue to guarantee, that 1st task won't be // executed before the second one will be posted. - queue->PostTask(rtc::NewClosure([&state] { state.state = 1; })); - queue->PostTask(rtc::NewClosure([&state, &done] { + queue->PostTask(ToQueuedTask([&state] { state.state = 1; })); + queue->PostTask(ToQueuedTask([&state, &done] { EXPECT_EQ(state.state, 1); done.Set(); })); diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 6ebb980f29..0759fde15d 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -500,6 +500,7 @@ rtc_source_set("rtc_task_queue_api") { ":macromagic", "../api/task_queue", "system:rtc_export", + "task_utils:to_queued_task", "//third_party/abseil-cpp/absl/memory", ] } diff --git a/rtc_base/task_queue.h b/rtc_base/task_queue.h index 3bc1e1fdc3..ffb7a11589 100644 --- a/rtc_base/task_queue.h +++ b/rtc_base/task_queue.h @@ -13,7 +13,6 @@ #include #include -#include #include #include "absl/memory/memory.h" @@ -22,6 +21,7 @@ #include "api/task_queue/task_queue_factory.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/system/rtc_export.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/thread_annotations.h" namespace rtc { @@ -29,54 +29,11 @@ namespace rtc { // TODO(danilchap): Remove the alias when all of webrtc is updated to use // webrtc::QueuedTask directly. using ::webrtc::QueuedTask; - -// Simple implementation of QueuedTask for use with rtc::Bind and lambdas. -template -class ClosureTask : public QueuedTask { - public: - explicit ClosureTask(Closure&& closure) - : closure_(std::forward(closure)) {} - - private: - bool Run() override { - closure_(); - return true; - } - - typename std::remove_const< - typename std::remove_reference::type>::type closure_; -}; - -// Extends ClosureTask to also allow specifying cleanup code. -// This is useful when using lambdas if guaranteeing cleanup, even if a task -// was dropped (queue is too full), is required. -template -class ClosureTaskWithCleanup : public ClosureTask { - public: - ClosureTaskWithCleanup(Closure&& closure, Cleanup&& cleanup) - : ClosureTask(std::forward(closure)), - cleanup_(std::forward(cleanup)) {} - ~ClosureTaskWithCleanup() { cleanup_(); } - - private: - typename std::remove_const< - typename std::remove_reference::type>::type cleanup_; -}; - -// Convenience function to construct closures that can be passed directly -// to methods that support std::unique_ptr but not template -// based parameters. -template -static std::unique_ptr NewClosure(Closure&& closure) { - return absl::make_unique>( - std::forward(closure)); -} - -template -static std::unique_ptr NewClosure(Closure&& closure, - Cleanup&& cleanup) { - return absl::make_unique>( - std::forward(closure), std::forward(cleanup)); +// TODO(danilchap): Remove the alias when all of webrtc is updated to use +// webrtc::ToQueuedTask directly. +template +std::unique_ptr NewClosure(Args&&... args) { + return webrtc::ToQueuedTask(std::forward(args)...); } // Implements a task queue that asynchronously executes tasks in a way that diff --git a/rtc_base/task_queue_unittest.cc b/rtc_base/task_queue_unittest.cc index 04861f348e..5c80c4fd54 100644 --- a/rtc_base/task_queue_unittest.cc +++ b/rtc_base/task_queue_unittest.cc @@ -77,112 +77,4 @@ TEST(TaskQueueTest, DISABLED_PostDelayedHighRes) { EXPECT_NEAR(end - start, 3, 3u); } -// TODO(danilchap): Reshape and rename tests below to show they are verifying -// rtc::NewClosure helper rather than TaskQueue implementation. -TEST(TaskQueueTest, PostLambda) { - TaskQueue queue("PostLambda"); - Event ran; - queue.PostTask([&ran] { ran.Set(); }); - EXPECT_TRUE(ran.Wait(1000)); -} - -TEST(TaskQueueTest, PostCopyableClosure) { - struct CopyableClosure { - CopyableClosure(int* num_copies, int* num_moves, Event* event) - : num_copies(num_copies), num_moves(num_moves), event(event) {} - CopyableClosure(const CopyableClosure& other) - : num_copies(other.num_copies), - num_moves(other.num_moves), - event(other.event) { - ++*num_copies; - } - CopyableClosure(CopyableClosure&& other) - : num_copies(other.num_copies), - num_moves(other.num_moves), - event(other.event) { - ++*num_moves; - } - void operator()() { event->Set(); } - - int* num_copies; - int* num_moves; - Event* event; - }; - - int num_copies = 0; - int num_moves = 0; - Event event; - - static const char kPostQueue[] = "PostCopyableClosure"; - TaskQueue post_queue(kPostQueue); - { - CopyableClosure closure(&num_copies, &num_moves, &event); - post_queue.PostTask(closure); - // Destroy closure to check with msan and tsan posted task has own copy. - } - - EXPECT_TRUE(event.Wait(1000)); - EXPECT_EQ(num_copies, 1); - EXPECT_EQ(num_moves, 0); -} - -TEST(TaskQueueTest, PostMoveOnlyClosure) { - struct SomeState { - explicit SomeState(Event* event) : event(event) {} - ~SomeState() { event->Set(); } - Event* event; - }; - struct MoveOnlyClosure { - MoveOnlyClosure(int* num_moves, std::unique_ptr state) - : num_moves(num_moves), state(std::move(state)) {} - MoveOnlyClosure(const MoveOnlyClosure&) = delete; - MoveOnlyClosure(MoveOnlyClosure&& other) - : num_moves(other.num_moves), state(std::move(other.state)) { - ++*num_moves; - } - void operator()() { state.reset(); } - - int* num_moves; - std::unique_ptr state; - }; - - int num_moves = 0; - Event event; - std::unique_ptr state(new SomeState(&event)); - - static const char kPostQueue[] = "PostMoveOnlyClosure"; - TaskQueue post_queue(kPostQueue); - post_queue.PostTask(MoveOnlyClosure(&num_moves, std::move(state))); - - EXPECT_TRUE(event.Wait(1000)); - EXPECT_EQ(num_moves, 1); -} - -TEST(TaskQueueTest, PostMoveOnlyCleanup) { - struct SomeState { - explicit SomeState(Event* event) : event(event) {} - ~SomeState() { event->Set(); } - Event* event; - }; - struct MoveOnlyClosure { - void operator()() { state.reset(); } - - std::unique_ptr state; - }; - - Event event_run; - Event event_cleanup; - std::unique_ptr state_run(new SomeState(&event_run)); - std::unique_ptr state_cleanup(new SomeState(&event_cleanup)); - - static const char kPostQueue[] = "PostMoveOnlyCleanup"; - TaskQueue post_queue(kPostQueue); - post_queue.PostTask(NewClosure(MoveOnlyClosure{std::move(state_run)}, - MoveOnlyClosure{std::move(state_cleanup)})); - - EXPECT_TRUE(event_cleanup.Wait(1000)); - // Expect run closure to complete before cleanup closure. - EXPECT_TRUE(event_run.Wait(0)); -} - } // namespace rtc diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 860bc84a32..126bff8b65 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -25,6 +25,16 @@ rtc_source_set("repeating_task") { ] } +rtc_source_set("to_queued_task") { + sources = [ + "to_queued_task.h", + ] + deps = [ + "../../api/task_queue", + "//third_party/abseil-cpp/absl/memory", + ] +} + if (rtc_include_tests) { rtc_source_set("repeating_task_unittests") { testonly = true @@ -38,4 +48,17 @@ if (rtc_include_tests) { "//third_party/abseil-cpp/absl/memory", ] } + + rtc_source_set("to_queued_task_unittests") { + testonly = true + sources = [ + "to_queued_task_unittest.cc", + ] + deps = [ + ":to_queued_task", + "../../api/task_queue", + "../../test:test_support", + "//third_party/abseil-cpp/absl/memory", + ] + } } diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h new file mode 100644 index 0000000000..5088af91ee --- /dev/null +++ b/rtc_base/task_utils/to_queued_task.h @@ -0,0 +1,73 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_UTILS_TO_QUEUED_TASK_H_ +#define RTC_BASE_TASK_UTILS_TO_QUEUED_TASK_H_ + +#include +#include +#include + +#include "absl/memory/memory.h" +#include "api/task_queue/queued_task.h" + +namespace webrtc { +namespace webrtc_new_closure_impl { +// Simple implementation of QueuedTask for use with rtc::Bind and lambdas. +template +class ClosureTask : public QueuedTask { + public: + explicit ClosureTask(Closure&& closure) + : closure_(std::forward(closure)) {} + + private: + bool Run() override { + closure_(); + return true; + } + + typename std::decay::type closure_; +}; + +// Extends ClosureTask to also allow specifying cleanup code. +// This is useful when using lambdas if guaranteeing cleanup, even if a task +// was dropped (queue is too full), is required. +template +class ClosureTaskWithCleanup : public ClosureTask { + public: + ClosureTaskWithCleanup(Closure&& closure, Cleanup&& cleanup) + : ClosureTask(std::forward(closure)), + cleanup_(std::forward(cleanup)) {} + ~ClosureTaskWithCleanup() override { cleanup_(); } + + private: + typename std::decay::type cleanup_; +}; +} // namespace webrtc_new_closure_impl + +// Convenience function to construct closures that can be passed directly +// to methods that support std::unique_ptr but not template +// based parameters. +template +std::unique_ptr ToQueuedTask(Closure&& closure) { + return absl::make_unique>( + std::forward(closure)); +} + +template +std::unique_ptr ToQueuedTask(Closure&& closure, Cleanup&& cleanup) { + return absl::make_unique< + webrtc_new_closure_impl::ClosureTaskWithCleanup>( + std::forward(closure), std::forward(cleanup)); +} + +} // namespace webrtc + +#endif // RTC_BASE_TASK_UTILS_TO_QUEUED_TASK_H_ diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc new file mode 100644 index 0000000000..45dec773fc --- /dev/null +++ b/rtc_base/task_utils/to_queued_task_unittest.cc @@ -0,0 +1,130 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/task_utils/to_queued_task.h" + +#include + +#include "absl/memory/memory.h" +#include "api/task_queue/queued_task.h" +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +using ::testing::InSequence; +using ::testing::MockFunction; + +void RunTask(std::unique_ptr task) { + // Simulate how task queue suppose to run tasks. + QueuedTask* raw = task.release(); + if (raw->Run()) + delete raw; +} + +TEST(ToQueuedTaskTest, AcceptsLambda) { + bool run = false; + std::unique_ptr task = ToQueuedTask([&run] { run = true; }); + EXPECT_FALSE(run); + RunTask(std::move(task)); + EXPECT_TRUE(run); +} + +TEST(ToQueuedTaskTest, AcceptsCopyableClosure) { + struct CopyableClosure { + CopyableClosure(int* num_copies, int* num_moves, int* num_runs) + : num_copies(num_copies), num_moves(num_moves), num_runs(num_runs) {} + CopyableClosure(const CopyableClosure& other) + : num_copies(other.num_copies), + num_moves(other.num_moves), + num_runs(other.num_runs) { + ++*num_copies; + } + CopyableClosure(CopyableClosure&& other) + : num_copies(other.num_copies), + num_moves(other.num_moves), + num_runs(other.num_runs) { + ++*num_moves; + } + void operator()() { ++*num_runs; } + + int* num_copies; + int* num_moves; + int* num_runs; + }; + + int num_copies = 0; + int num_moves = 0; + int num_runs = 0; + + std::unique_ptr task; + { + CopyableClosure closure(&num_copies, &num_moves, &num_runs); + task = ToQueuedTask(closure); + // Destroy closure to check with msan task has own copy. + } + EXPECT_EQ(num_copies, 1); + EXPECT_EQ(num_runs, 0); + RunTask(std::move(task)); + EXPECT_EQ(num_copies, 1); + EXPECT_EQ(num_moves, 0); + EXPECT_EQ(num_runs, 1); +} + +TEST(ToQueuedTaskTest, AcceptsMoveOnlyClosure) { + struct MoveOnlyClosure { + MoveOnlyClosure(int* num_moves, std::function trigger) + : num_moves(num_moves), trigger(std::move(trigger)) {} + MoveOnlyClosure(const MoveOnlyClosure&) = delete; + MoveOnlyClosure(MoveOnlyClosure&& other) + : num_moves(other.num_moves), trigger(std::move(other.trigger)) { + ++*num_moves; + } + void operator()() { trigger(); } + + int* num_moves; + std::function trigger; + }; + + int num_moves = 0; + MockFunction run; + + auto task = ToQueuedTask(MoveOnlyClosure(&num_moves, run.AsStdFunction())); + EXPECT_EQ(num_moves, 1); + EXPECT_CALL(run, Call); + RunTask(std::move(task)); + EXPECT_EQ(num_moves, 1); +} + +TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) { + struct MoveOnlyClosure { + MoveOnlyClosure(const MoveOnlyClosure&) = delete; + MoveOnlyClosure(MoveOnlyClosure&&) = default; + void operator()() { trigger(); } + + std::function trigger; + }; + + MockFunction run; + MockFunction cleanup; + + auto task = ToQueuedTask(MoveOnlyClosure{run.AsStdFunction()}, + MoveOnlyClosure{cleanup.AsStdFunction()}); + + // Expect run closure to complete before cleanup closure. + InSequence in_sequence; + EXPECT_CALL(run, Call); + EXPECT_CALL(cleanup, Call); + RunTask(std::move(task)); +} + +} // namespace +} // namespace webrtc