From 17b92cb219fe25ea19d23ded9d198d7774551d8e Mon Sep 17 00:00:00 2001 From: Honghai Zhang Date: Thu, 7 Nov 2019 22:58:49 +0000 Subject: [PATCH] Revert "Stop using and delete DEPRECATED_SingleThreadedTaskQueueForTesting" This reverts commit b1c1f6907fec2d18ae8b00ebc44975cb46a95b11. Reason for revert: It may be the cause of iOS64 Debug flakyness. Original change's description: > Stop using and delete DEPRECATED_SingleThreadedTaskQueueForTesting > > Bug: webrtc:10933 > Change-Id: I8307e2aad06d3f3f367af122e43ecc088b52f2d6 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157896 > Commit-Queue: Danil Chapovalov > Reviewed-by: Sebastian Jansson > Cr-Commit-Position: refs/heads/master@{#29713} TBR=danilchap@webrtc.org,srte@webrtc.org # Not skipping CQ checks because original CL landed > 1 day ago. Bug: webrtc:10933 Change-Id: I94c86ebbae414a7569f253d199efbde6ac4c3765 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159101 Reviewed-by: Honghai Zhang Commit-Queue: Honghai Zhang Cr-Commit-Position: refs/heads/master@{#29737} --- test/BUILD.gn | 19 + test/call_test.cc | 12 +- test/call_test.h | 5 +- test/single_threaded_task_queue.cc | 161 +++++++++ test/single_threaded_task_queue.h | 135 +++++++ test/single_threaded_task_queue_unittest.cc | 375 ++++++++++++++++++++ 6 files changed, 698 insertions(+), 9 deletions(-) create mode 100644 test/single_threaded_task_queue.cc create mode 100644 test/single_threaded_task_queue.h create mode 100644 test/single_threaded_task_queue_unittest.cc diff --git a/test/BUILD.gn b/test/BUILD.gn index f4c1fc67dd..49c76b0729 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -389,6 +389,7 @@ if (rtc_include_tests) { "../rtc_base:rtc_base_approved", "../rtc_base:task_queue_for_test", "../rtc_base/system:file_wrapper", + "../test:single_threaded_task_queue", "pc/e2e:e2e_unittests", "peer_scenario/tests", "scenario:scenario_unittests", @@ -407,6 +408,7 @@ if (rtc_include_tests) { "frame_generator_unittest.cc", "rtp_file_reader_unittest.cc", "rtp_file_writer_unittest.cc", + "single_threaded_task_queue_unittest.cc", "testsupport/perf_test_unittest.cc", "testsupport/test_artifacts_unittest.cc", "testsupport/video_frame_writer_unittest.cc", @@ -625,6 +627,22 @@ rtc_library("direct_transport") { ] } +rtc_library("single_threaded_task_queue") { + testonly = true + sources = [ + "single_threaded_task_queue.cc", + "single_threaded_task_queue.h", + ] + deps = [ + "../api/task_queue", + "../rtc_base:checks", + "../rtc_base:deprecation", + "../rtc_base:rtc_base_approved", + "../rtc_base:task_queue_for_test", + "../rtc_base/task_utils:to_queued_task", + ] +} + rtc_library("fake_video_codecs") { allow_poison = [ "software_video_codecs" ] visibility = [ "*" ] @@ -730,6 +748,7 @@ rtc_library("test_common") { ":fake_video_codecs", ":fileutils", ":rtp_test_utils", + ":single_threaded_task_queue", ":test_support", ":video_test_common", "../api:rtp_headers", diff --git a/test/call_test.cc b/test/call_test.cc index 9f26cc679f..d83f87a8c6 100644 --- a/test/call_test.cc +++ b/test/call_test.cc @@ -56,9 +56,7 @@ CallTest::CallTest() num_flexfec_streams_(0), audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()), audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()), - task_queue_(task_queue_factory_->CreateTaskQueue( - "CallTestTaskQueue", - TaskQueueFactory::Priority::NORMAL)) {} + task_queue_("CallTestTaskQueue") {} CallTest::~CallTest() = default; @@ -86,7 +84,7 @@ void CallTest::RegisterRtpExtension(const RtpExtension& extension) { } void CallTest::RunBaseTest(BaseTest* test) { - SendTask(RTC_FROM_HERE, task_queue(), [this, test]() { + SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() { num_video_streams_ = test->GetNumVideoStreams(); num_audio_streams_ = test->GetNumAudioStreams(); num_flexfec_streams_ = test->GetNumFlexfecStreams(); @@ -125,9 +123,9 @@ void CallTest::RunBaseTest(BaseTest* test) { CreateReceiverCall(recv_config); } test->OnCallsCreated(sender_call_.get(), receiver_call_.get()); - receive_transport_ = test->CreateReceiveTransport(task_queue()); + receive_transport_ = test->CreateReceiveTransport(&task_queue_); send_transport_ = - test->CreateSendTransport(task_queue(), sender_call_.get()); + test->CreateSendTransport(&task_queue_, sender_call_.get()); if (test->ShouldCreateReceivers()) { send_transport_->SetReceiver(receiver_call_->Receiver()); @@ -186,7 +184,7 @@ void CallTest::RunBaseTest(BaseTest* test) { test->PerformTest(); - SendTask(RTC_FROM_HERE, task_queue(), [this, test]() { + SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() { Stop(); test->OnStreamsStopped(); DestroyStreams(); diff --git a/test/call_test.h b/test/call_test.h index ba9740df98..6224a6eb45 100644 --- a/test/call_test.h +++ b/test/call_test.h @@ -30,6 +30,7 @@ #include "test/fake_vp8_encoder.h" #include "test/frame_generator_capturer.h" #include "test/rtp_rtcp_observer.h" +#include "test/single_threaded_task_queue.h" namespace webrtc { namespace test { @@ -173,7 +174,7 @@ class CallTest : public ::testing::Test { void SetVideoEncoderConfig(const VideoEncoderConfig& config); VideoSendStream* GetVideoSendStream(); FlexfecReceiveStream::Config* GetFlexFecConfig(); - TaskQueueBase* task_queue() { return task_queue_.get(); } + TaskQueueBase* task_queue() { return &task_queue_; } Clock* const clock_; @@ -229,7 +230,7 @@ class CallTest : public ::testing::Test { void AddRtpExtensionByUri(const std::string& uri, std::vector* extensions) const; - std::unique_ptr task_queue_; + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue_; std::vector rtp_extensions_; rtc::scoped_refptr apm_send_; rtc::scoped_refptr apm_recv_; diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc new file mode 100644 index 0000000000..c3aac1c7ba --- /dev/null +++ b/test/single_threaded_task_queue.cc @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/single_threaded_task_queue.h" + +#include +#include + +#include "rtc_base/checks.h" +#include "rtc_base/numerics/safe_conversions.h" +#include "rtc_base/time_utils.h" + +namespace webrtc { +namespace test { + +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask( + DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id, + std::unique_ptr task) + : task_id(task_id), task(std::move(task)) {} + +DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() = + default; + +DEPRECATED_SingleThreadedTaskQueueForTesting:: + DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name) + : thread_(Run, this, name), running_(true), next_task_id_(0) { + thread_.Start(); +} + +DEPRECATED_SingleThreadedTaskQueueForTesting:: + ~DEPRECATED_SingleThreadedTaskQueueForTesting() { + Stop(); +} + +DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId +DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed( + std::unique_ptr task, + int64_t delay_ms) { + int64_t earliest_exec_time = rtc::TimeAfter(delay_ms); + + rtc::CritScope lock(&cs_); + if (!running_) + return kInvalidTaskId; + + TaskId id = next_task_id_++; + + // Insert after any other tasks with an earlier-or-equal target time. + // Note: multimap has promise "The order of the key-value pairs whose keys + // compare equivalent is the order of insertion and does not change." + tasks_.emplace(std::piecewise_construct, + std::forward_as_tuple(earliest_exec_time), + std::forward_as_tuple(id, std::move(task))); + + // This class is optimized for simplicty, not for performance. This will wake + // the thread up even if the next task in the queue is only scheduled for + // quite some time from now. In that case, the thread will just send itself + // back to sleep. + wake_up_.Set(); + + return id; +} + +bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) { + rtc::CritScope lock(&cs_); + for (auto it = tasks_.begin(); it != tasks_.end(); it++) { + if (it->second.task_id == task_id) { + tasks_.erase(it); + return true; + } + } + return false; +} + +bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsCurrent() { + return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef()); +} + +bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsRunning() { + RTC_DCHECK_RUN_ON(&owner_thread_checker_); + // We could check the |running_| flag here, but this is equivalent for the + // purposes of this function. + return thread_.IsRunning(); +} + +bool DEPRECATED_SingleThreadedTaskQueueForTesting::HasPendingTasks() const { + rtc::CritScope lock(&cs_); + return !tasks_.empty(); +} + +void DEPRECATED_SingleThreadedTaskQueueForTesting::Stop() { + RTC_DCHECK_RUN_ON(&owner_thread_checker_); + if (!thread_.IsRunning()) + return; + + { + rtc::CritScope lock(&cs_); + running_ = false; + } + + wake_up_.Set(); + thread_.Stop(); +} + +void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) { + static_cast(obj)->RunLoop(); +} + +void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { + CurrentTaskQueueSetter set_current(this); + while (true) { + std::unique_ptr queued_task; + + // An empty queue would lead to sleeping until the queue becoems non-empty. + // A queue where the earliest task is scheduled for later than now, will + // lead to sleeping until the time of the next scheduled task (or until + // more tasks are scheduled). + int wait_time = rtc::Event::kForever; + + { + rtc::CritScope lock(&cs_); + if (!running_) { + return; + } + if (!tasks_.empty()) { + auto next_delayed_task = tasks_.begin(); + int64_t earliest_exec_time = next_delayed_task->first; + int64_t remaining_delay_ms = + rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis()); + if (remaining_delay_ms <= 0) { + queued_task = std::move(next_delayed_task->second.task); + tasks_.erase(next_delayed_task); + } else { + wait_time = rtc::saturated_cast(remaining_delay_ms); + } + } + } + + if (queued_task) { + if (!queued_task->Run()) { + queued_task.release(); + } + } else { + wake_up_.Wait(wait_time); + } + } +} + +void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() { + Stop(); + delete this; +} + +} // namespace test +} // namespace webrtc diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h new file mode 100644 index 0000000000..38458294de --- /dev/null +++ b/test/single_threaded_task_queue.h @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef TEST_SINGLE_THREADED_TASK_QUEUE_H_ +#define TEST_SINGLE_THREADED_TASK_QUEUE_H_ + +#include +#include +#include +#include + +#include "api/task_queue/task_queue_base.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/deprecation.h" +#include "rtc_base/event.h" +#include "rtc_base/platform_thread.h" +#include "rtc_base/task_queue_for_test.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/thread_checker.h" + +namespace webrtc { +namespace test { + +// DEPRECATED. This class doesn't striclty follow rtc::TaskQueue semantics, +// which makes it surprising and hard to use correctly. +// Please use TaskQueueForTest instead. + +// This class gives capabilities similar to rtc::TaskQueue, but ensures +// everything happens on the same thread. This is intended to make the +// threading model of unit-tests (specifically end-to-end tests) more closely +// resemble that of real WebRTC, thereby allowing us to replace some critical +// sections by thread-checkers. +// This task is NOT tuned for performance, but rather for simplicity. +class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase { + public: + using Task = std::function; + using TaskId = size_t; + constexpr static TaskId kInvalidTaskId = static_cast(-1); + + explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name); + ~DEPRECATED_SingleThreadedTaskQueueForTesting() override; + + // Sends one task to the task-queue, and returns a handle by which the + // task can be cancelled. + // This mimics the behavior of TaskQueue, but only for lambdas, rather than + // for both lambdas and QueuedTask objects. + TaskId PostTask(Task task) { + return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0); + } + + // Same as PostTask(), but ensures that the task will not begin execution + // less than |delay_ms| milliseconds after being posted; an upper bound + // is not provided. + TaskId PostDelayedTask(Task task, int64_t delay_ms) { + return PostDelayed(ToQueuedTask(std::move(task)), delay_ms); + } + + // Given an identifier to the task, attempts to eject it from the queue. + // Returns true if the task was found and cancelled. Failure possible + // only for invalid task IDs, or for tasks which have already been executed. + bool CancelTask(TaskId task_id); + + // Returns true iff called on the thread associated with the task queue. + bool IsCurrent(); + + // Returns true iff the task queue is actively being serviced. + bool IsRunning(); + + bool HasPendingTasks() const; + + void Stop(); + + // Implements TaskQueueBase. + void Delete() override; + + void PostTask(std::unique_ptr task) override { + PostDelayed(std::move(task), /*delay_ms=*/0); + } + + void PostDelayedTask(std::unique_ptr task, + uint32_t delay_ms) override { + PostDelayed(std::move(task), delay_ms); + } + + private: + struct StoredTask { + StoredTask(TaskId task_id, std::unique_ptr task); + ~StoredTask(); + + TaskId task_id; + std::unique_ptr task; + }; + + TaskId PostDelayed(std::unique_ptr task, int64_t delay_ms); + + static void Run(void* obj); + + void RunLoop(); + + rtc::CriticalSection cs_; + // Tasks are ordered by earliest execution time. + std::multimap tasks_ RTC_GUARDED_BY(cs_); + rtc::ThreadChecker owner_thread_checker_; + rtc::PlatformThread thread_; + bool running_ RTC_GUARDED_BY(cs_); + + TaskId next_task_id_; + + // The task-queue will sleep when not executing a task. Wake up occurs when: + // * Upon destruction, to make sure that the |thead_| terminates, so that it + // may be joined. [Event will be set.] + // * New task added. Because we optimize for simplicity rahter than for + // performance (this class is a testing facility only), waking up occurs + // when we get a new task even if it is scheduled with a delay. The RunLoop + // is in charge of sending itself back to sleep if the next task is only + // to be executed at a later time. [Event will be set.] + // * When the next task in the queue is a delayed-task, and the time for + // its execution has come. [Event will time-out.] + rtc::Event wake_up_; +}; + +// Warn if new usage. +typedef DEPRECATED_SingleThreadedTaskQueueForTesting RTC_DEPRECATED + SingleThreadedTaskQueueForTesting; + +} // namespace test +} // namespace webrtc + +#endif // TEST_SINGLE_THREADED_TASK_QUEUE_H_ diff --git a/test/single_threaded_task_queue_unittest.cc b/test/single_threaded_task_queue_unittest.cc new file mode 100644 index 0000000000..9e2304d6e4 --- /dev/null +++ b/test/single_threaded_task_queue_unittest.cc @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/single_threaded_task_queue.h" + +#include +#include +#include + +#include "api/task_queue/task_queue_test.h" +#include "rtc_base/event.h" +#include "rtc_base/task_queue_for_test.h" +#include "test/gtest.h" + +namespace webrtc { +namespace test { + +namespace { + +using TaskId = DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId; + +// Test should not rely on the object under test not being faulty. If the task +// queue ever blocks forever, we want the tests to fail, rather than hang. +constexpr int kMaxWaitTimeMs = 10000; + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + SanityConstructionDestruction) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, ExecutesPostedTasks) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic executed(false); + rtc::Event done; + + task_queue.PostTask([&executed, &done]() { + executed.store(true); + done.Set(); + }); + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + + EXPECT_TRUE(executed.load()); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + PostMultipleTasksFromSameExternalThread) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + constexpr size_t kCount = 3; + std::atomic executed[kCount]; + for (std::atomic& exec : executed) { + exec.store(false); + } + + std::vector> done_events; + for (size_t i = 0; i < kCount; i++) { + done_events.emplace_back(std::make_unique()); + } + + // To avoid the tasks which comprise the actual test from running before they + // have all be posted, which could result in only one task ever being in the + // queue at any given time, post one waiting task that would block the + // task-queue, and unblock only after all tasks have been posted. + rtc::Event rendezvous; + task_queue.PostTask( + [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); }); + + // Post the tasks which comprise the test. + for (size_t i = 0; i < kCount; i++) { + task_queue.PostTask([&executed, &done_events, i]() { // |i| by value. + executed[i].store(true); + done_events[i]->Set(); + }); + } + + rendezvous.Set(); // Release the task-queue. + + // Wait until the task queue has executed all the tasks. + for (size_t i = 0; i < kCount; i++) { + ASSERT_TRUE(done_events[i]->Wait(kMaxWaitTimeMs)); + } + + for (size_t i = 0; i < kCount; i++) { + EXPECT_TRUE(executed[i].load()); + } +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + PostToTaskQueueFromOwnThread) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic executed(false); + rtc::Event done; + + auto internally_posted_task = [&executed, &done]() { + executed.store(true); + done.Set(); + }; + + auto externally_posted_task = [&task_queue, &internally_posted_task]() { + task_queue.PostTask(internally_posted_task); + }; + + task_queue.PostTask(externally_posted_task); + + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + EXPECT_TRUE(executed.load()); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + TasksExecutedInSequence) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + // The first task would perform: + // accumulator = 10 * accumulator + i + // Where |i| is 1, 2 and 3 for the 1st, 2nd and 3rd tasks, respectively. + // The result would be 123 if and only iff the tasks were executed in order. + size_t accumulator = 0; + size_t expected_value = 0; // Updates to the correct value. + + // Prevent the chain from being set in motion before we've had time to + // schedule it all, lest the queue only contain one task at a time. + rtc::Event rendezvous; + task_queue.PostTask( + [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); }); + + for (size_t i = 0; i < 3; i++) { + task_queue.PostTask([&accumulator, i]() { // |i| passed by value. + accumulator = 10 * accumulator + i; + }); + expected_value = 10 * expected_value + i; + } + + // The test will wait for the task-queue to finish. + rtc::Event done; + task_queue.PostTask([&done]() { done.Set(); }); + + rendezvous.Set(); // Set the chain in motion. + + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + + EXPECT_EQ(accumulator, expected_value); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + ExecutesPostedDelayedTask) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic executed(false); + rtc::Event done; + + constexpr int64_t delay_ms = 20; + static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests."); + + task_queue.PostDelayedTask( + [&executed, &done]() { + executed.store(true); + done.Set(); + }, + delay_ms); + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + + EXPECT_TRUE(executed.load()); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + DoesNotExecuteDelayedTaskTooSoon) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic executed(false); + + constexpr int64_t delay_ms = 2000; + static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests."); + + task_queue.PostDelayedTask([&executed]() { executed.store(true); }, delay_ms); + + // Wait less than is enough, make sure the task was not yet executed. + rtc::Event not_done; + ASSERT_FALSE(not_done.Wait(delay_ms / 2)); + EXPECT_FALSE(executed.load()); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + TaskWithLesserDelayPostedAfterFirstDelayedTaskExectuedBeforeFirst) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic earlier_executed(false); + constexpr int64_t earlier_delay_ms = 500; + + std::atomic later_executed(false); + constexpr int64_t later_delay_ms = 1000; + + static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2, + "Delay too long for tests."); + + rtc::Event done; + + auto earlier_task = [&earlier_executed, &later_executed]() { + EXPECT_FALSE(later_executed.load()); + earlier_executed.store(true); + }; + + auto later_task = [&earlier_executed, &later_executed, &done]() { + EXPECT_TRUE(earlier_executed.load()); + later_executed.store(true); + done.Set(); + }; + + task_queue.PostDelayedTask(later_task, later_delay_ms); + task_queue.PostDelayedTask(earlier_task, earlier_delay_ms); + + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + ASSERT_TRUE(earlier_executed); + ASSERT_TRUE(later_executed); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + TaskWithGreaterDelayPostedAfterFirstDelayedTaskExectuedAfterFirst) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic earlier_executed(false); + constexpr int64_t earlier_delay_ms = 500; + + std::atomic later_executed(false); + constexpr int64_t later_delay_ms = 1000; + + static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2, + "Delay too long for tests."); + + rtc::Event done; + + auto earlier_task = [&earlier_executed, &later_executed]() { + EXPECT_FALSE(later_executed.load()); + earlier_executed.store(true); + }; + + auto later_task = [&earlier_executed, &later_executed, &done]() { + EXPECT_TRUE(earlier_executed.load()); + later_executed.store(true); + done.Set(); + }; + + task_queue.PostDelayedTask(earlier_task, earlier_delay_ms); + task_queue.PostDelayedTask(later_task, later_delay_ms); + + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); + ASSERT_TRUE(earlier_executed); + ASSERT_TRUE(later_executed); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + ExternalThreadCancelsTask) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + rtc::Event done; + + // Prevent the to-be-cancelled task from being executed before we've had + // time to cancel it. + rtc::Event rendezvous; + task_queue.PostTask( + [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); }); + + TaskId cancelled_task_id = task_queue.PostTask([]() { EXPECT_TRUE(false); }); + task_queue.PostTask([&done]() { done.Set(); }); + + task_queue.CancelTask(cancelled_task_id); + + // Set the tasks in motion; the cancelled task does not run (otherwise the + // test would fail). The last task ends the test, showing that the queue + // progressed beyond the cancelled task. + rendezvous.Set(); + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); +} + +// In this test, we'll set off a chain where the first task cancels the second +// task, then a third task runs (showing that we really cancelled the task, +// rather than just halted the task-queue). +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + InternalThreadCancelsTask) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + rtc::Event done; + + // Prevent the chain from being set-off before we've set everything up. + rtc::Event rendezvous; + task_queue.PostTask( + [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); }); + + // This is the canceller-task. It takes cancelled_task_id by reference, + // because the ID will only become known after the cancelled task is + // scheduled. + TaskId cancelled_task_id; + auto canceller_task = [&task_queue, &cancelled_task_id]() { + task_queue.CancelTask(cancelled_task_id); + }; + task_queue.PostTask(canceller_task); + + // This task will be cancelled by the task before it. + auto cancelled_task = []() { EXPECT_TRUE(false); }; + cancelled_task_id = task_queue.PostTask(cancelled_task); + + // When this task runs, it will allow the test to be finished. + auto completion_marker_task = [&done]() { done.Set(); }; + task_queue.PostTask(completion_marker_task); + + rendezvous.Set(); // Set the chain in motion. + + ASSERT_TRUE(done.Wait(kMaxWaitTimeMs)); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, SendTask) { + DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue"); + + std::atomic executed(false); + + SendTask(RTC_FROM_HERE, &task_queue, [&executed]() { + // Intentionally delay, so that if SendTask didn't block, the sender thread + // would have time to read |executed|. + rtc::Event delay; + ASSERT_FALSE(delay.Wait(1000)); + executed.store(true); + }); + + EXPECT_TRUE(executed); +} + +TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, + DestructTaskQueueWhileTasksPending) { + auto task_queue = + std::make_unique( + "task_queue"); + + std::atomic counter(0); + + constexpr size_t tasks = 10; + for (size_t i = 0; i < tasks; i++) { + task_queue->PostTask([&counter]() { + std::atomic_fetch_add(&counter, static_cast(1)); + rtc::Event delay; + ASSERT_FALSE(delay.Wait(500)); + }); + } + + task_queue.reset(); + + EXPECT_LT(counter, tasks); +} + +class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view /* name */, + Priority /*priority*/) const override { + return std::unique_ptr( + new DEPRECATED_SingleThreadedTaskQueueForTesting("noname")); + } +}; + +INSTANTIATE_TEST_SUITE_P( + DeprecatedSingleThreadedTaskQueueForTesting, + TaskQueueTest, + ::testing::Values( + std::make_unique)); + +} // namespace +} // namespace test +} // namespace webrtc