Stop using and delete DEPRECATED_SingleThreadedTaskQueueForTesting

Bug: webrtc:10933
Change-Id: I8307e2aad06d3f3f367af122e43ecc088b52f2d6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157896
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29713}
This commit is contained in:
Danil Chapovalov 2019-11-06 17:00:00 +01:00 committed by Commit Bot
parent d58fdbedcf
commit b1c1f6907f
6 changed files with 9 additions and 698 deletions

View File

@ -389,7 +389,6 @@ if (rtc_include_tests) {
"../rtc_base:rtc_base_approved",
"../rtc_base:task_queue_for_test",
"../rtc_base/system:file_wrapper",
"../test:single_threaded_task_queue",
"pc/e2e:e2e_unittests",
"peer_scenario/tests",
"scenario:scenario_unittests",
@ -408,7 +407,6 @@ if (rtc_include_tests) {
"frame_generator_unittest.cc",
"rtp_file_reader_unittest.cc",
"rtp_file_writer_unittest.cc",
"single_threaded_task_queue_unittest.cc",
"testsupport/perf_test_unittest.cc",
"testsupport/test_artifacts_unittest.cc",
"testsupport/video_frame_writer_unittest.cc",
@ -627,22 +625,6 @@ rtc_library("direct_transport") {
]
}
rtc_library("single_threaded_task_queue") {
testonly = true
sources = [
"single_threaded_task_queue.cc",
"single_threaded_task_queue.h",
]
deps = [
"../api/task_queue",
"../rtc_base:checks",
"../rtc_base:deprecation",
"../rtc_base:rtc_base_approved",
"../rtc_base:task_queue_for_test",
"../rtc_base/task_utils:to_queued_task",
]
}
rtc_library("fake_video_codecs") {
allow_poison = [ "software_video_codecs" ]
visibility = [ "*" ]
@ -748,7 +730,6 @@ rtc_library("test_common") {
":fake_video_codecs",
":fileutils",
":rtp_test_utils",
":single_threaded_task_queue",
":test_support",
":video_test_common",
"../api:rtp_headers",

View File

@ -56,7 +56,9 @@ CallTest::CallTest()
num_flexfec_streams_(0),
audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()),
audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()),
task_queue_("CallTestTaskQueue") {}
task_queue_(task_queue_factory_->CreateTaskQueue(
"CallTestTaskQueue",
TaskQueueFactory::Priority::NORMAL)) {}
CallTest::~CallTest() = default;
@ -84,7 +86,7 @@ void CallTest::RegisterRtpExtension(const RtpExtension& extension) {
}
void CallTest::RunBaseTest(BaseTest* test) {
SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() {
SendTask(RTC_FROM_HERE, task_queue(), [this, test]() {
num_video_streams_ = test->GetNumVideoStreams();
num_audio_streams_ = test->GetNumAudioStreams();
num_flexfec_streams_ = test->GetNumFlexfecStreams();
@ -123,9 +125,9 @@ void CallTest::RunBaseTest(BaseTest* test) {
CreateReceiverCall(recv_config);
}
test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
receive_transport_ = test->CreateReceiveTransport(&task_queue_);
receive_transport_ = test->CreateReceiveTransport(task_queue());
send_transport_ =
test->CreateSendTransport(&task_queue_, sender_call_.get());
test->CreateSendTransport(task_queue(), sender_call_.get());
if (test->ShouldCreateReceivers()) {
send_transport_->SetReceiver(receiver_call_->Receiver());
@ -184,7 +186,7 @@ void CallTest::RunBaseTest(BaseTest* test) {
test->PerformTest();
SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() {
SendTask(RTC_FROM_HERE, task_queue(), [this, test]() {
Stop();
test->OnStreamsStopped();
DestroyStreams();

View File

@ -30,7 +30,6 @@
#include "test/fake_vp8_encoder.h"
#include "test/frame_generator_capturer.h"
#include "test/rtp_rtcp_observer.h"
#include "test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {
@ -174,7 +173,7 @@ class CallTest : public ::testing::Test {
void SetVideoEncoderConfig(const VideoEncoderConfig& config);
VideoSendStream* GetVideoSendStream();
FlexfecReceiveStream::Config* GetFlexFecConfig();
TaskQueueBase* task_queue() { return &task_queue_; }
TaskQueueBase* task_queue() { return task_queue_.get(); }
Clock* const clock_;
@ -230,7 +229,7 @@ class CallTest : public ::testing::Test {
void AddRtpExtensionByUri(const std::string& uri,
std::vector<RtpExtension>* extensions) const;
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
std::vector<RtpExtension> rtp_extensions_;
rtc::scoped_refptr<AudioProcessing> apm_send_;
rtc::scoped_refptr<AudioProcessing> apm_recv_;

View File

@ -1,161 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/single_threaded_task_queue.h"
#include <memory>
#include <utility>
#include "rtc_base/checks.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
namespace test {
DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask(
DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id,
std::unique_ptr<QueuedTask> task)
: task_id(task_id), task(std::move(task)) {}
DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() =
default;
DEPRECATED_SingleThreadedTaskQueueForTesting::
DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name)
: thread_(Run, this, name), running_(true), next_task_id_(0) {
thread_.Start();
}
DEPRECATED_SingleThreadedTaskQueueForTesting::
~DEPRECATED_SingleThreadedTaskQueueForTesting() {
Stop();
}
DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed(
std::unique_ptr<QueuedTask> task,
int64_t delay_ms) {
int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
rtc::CritScope lock(&cs_);
if (!running_)
return kInvalidTaskId;
TaskId id = next_task_id_++;
// Insert after any other tasks with an earlier-or-equal target time.
// Note: multimap has promise "The order of the key-value pairs whose keys
// compare equivalent is the order of insertion and does not change."
tasks_.emplace(std::piecewise_construct,
std::forward_as_tuple(earliest_exec_time),
std::forward_as_tuple(id, std::move(task)));
// This class is optimized for simplicty, not for performance. This will wake
// the thread up even if the next task in the queue is only scheduled for
// quite some time from now. In that case, the thread will just send itself
// back to sleep.
wake_up_.Set();
return id;
}
bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
rtc::CritScope lock(&cs_);
for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
if (it->second.task_id == task_id) {
tasks_.erase(it);
return true;
}
}
return false;
}
bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsCurrent() {
return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef());
}
bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsRunning() {
RTC_DCHECK_RUN_ON(&owner_thread_checker_);
// We could check the |running_| flag here, but this is equivalent for the
// purposes of this function.
return thread_.IsRunning();
}
bool DEPRECATED_SingleThreadedTaskQueueForTesting::HasPendingTasks() const {
rtc::CritScope lock(&cs_);
return !tasks_.empty();
}
void DEPRECATED_SingleThreadedTaskQueueForTesting::Stop() {
RTC_DCHECK_RUN_ON(&owner_thread_checker_);
if (!thread_.IsRunning())
return;
{
rtc::CritScope lock(&cs_);
running_ = false;
}
wake_up_.Set();
thread_.Stop();
}
void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) {
static_cast<DEPRECATED_SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
}
void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
CurrentTaskQueueSetter set_current(this);
while (true) {
std::unique_ptr<QueuedTask> queued_task;
// An empty queue would lead to sleeping until the queue becoems non-empty.
// A queue where the earliest task is scheduled for later than now, will
// lead to sleeping until the time of the next scheduled task (or until
// more tasks are scheduled).
int wait_time = rtc::Event::kForever;
{
rtc::CritScope lock(&cs_);
if (!running_) {
return;
}
if (!tasks_.empty()) {
auto next_delayed_task = tasks_.begin();
int64_t earliest_exec_time = next_delayed_task->first;
int64_t remaining_delay_ms =
rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis());
if (remaining_delay_ms <= 0) {
queued_task = std::move(next_delayed_task->second.task);
tasks_.erase(next_delayed_task);
} else {
wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
}
}
}
if (queued_task) {
if (!queued_task->Run()) {
queued_task.release();
}
} else {
wake_up_.Wait(wait_time);
}
}
}
void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() {
Stop();
delete this;
}
} // namespace test
} // namespace webrtc

View File

@ -1,135 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_SINGLE_THREADED_TASK_QUEUE_H_
#define TEST_SINGLE_THREADED_TASK_QUEUE_H_
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/deprecation.h"
#include "rtc_base/event.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
namespace test {
// DEPRECATED. This class doesn't striclty follow rtc::TaskQueue semantics,
// which makes it surprising and hard to use correctly.
// Please use TaskQueueForTest instead.
// This class gives capabilities similar to rtc::TaskQueue, but ensures
// everything happens on the same thread. This is intended to make the
// threading model of unit-tests (specifically end-to-end tests) more closely
// resemble that of real WebRTC, thereby allowing us to replace some critical
// sections by thread-checkers.
// This task is NOT tuned for performance, but rather for simplicity.
class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase {
public:
using Task = std::function<void()>;
using TaskId = size_t;
constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1);
explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name);
~DEPRECATED_SingleThreadedTaskQueueForTesting() override;
// Sends one task to the task-queue, and returns a handle by which the
// task can be cancelled.
// This mimics the behavior of TaskQueue, but only for lambdas, rather than
// for both lambdas and QueuedTask objects.
TaskId PostTask(Task task) {
return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0);
}
// Same as PostTask(), but ensures that the task will not begin execution
// less than |delay_ms| milliseconds after being posted; an upper bound
// is not provided.
TaskId PostDelayedTask(Task task, int64_t delay_ms) {
return PostDelayed(ToQueuedTask(std::move(task)), delay_ms);
}
// Given an identifier to the task, attempts to eject it from the queue.
// Returns true if the task was found and cancelled. Failure possible
// only for invalid task IDs, or for tasks which have already been executed.
bool CancelTask(TaskId task_id);
// Returns true iff called on the thread associated with the task queue.
bool IsCurrent();
// Returns true iff the task queue is actively being serviced.
bool IsRunning();
bool HasPendingTasks() const;
void Stop();
// Implements TaskQueueBase.
void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override {
PostDelayed(std::move(task), /*delay_ms=*/0);
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t delay_ms) override {
PostDelayed(std::move(task), delay_ms);
}
private:
struct StoredTask {
StoredTask(TaskId task_id, std::unique_ptr<QueuedTask> task);
~StoredTask();
TaskId task_id;
std::unique_ptr<QueuedTask> task;
};
TaskId PostDelayed(std::unique_ptr<QueuedTask> task, int64_t delay_ms);
static void Run(void* obj);
void RunLoop();
rtc::CriticalSection cs_;
// Tasks are ordered by earliest execution time.
std::multimap<int64_t, StoredTask> tasks_ RTC_GUARDED_BY(cs_);
rtc::ThreadChecker owner_thread_checker_;
rtc::PlatformThread thread_;
bool running_ RTC_GUARDED_BY(cs_);
TaskId next_task_id_;
// The task-queue will sleep when not executing a task. Wake up occurs when:
// * Upon destruction, to make sure that the |thead_| terminates, so that it
// may be joined. [Event will be set.]
// * New task added. Because we optimize for simplicity rahter than for
// performance (this class is a testing facility only), waking up occurs
// when we get a new task even if it is scheduled with a delay. The RunLoop
// is in charge of sending itself back to sleep if the next task is only
// to be executed at a later time. [Event will be set.]
// * When the next task in the queue is a delayed-task, and the time for
// its execution has come. [Event will time-out.]
rtc::Event wake_up_;
};
// Warn if new usage.
typedef DEPRECATED_SingleThreadedTaskQueueForTesting RTC_DEPRECATED
SingleThreadedTaskQueueForTesting;
} // namespace test
} // namespace webrtc
#endif // TEST_SINGLE_THREADED_TASK_QUEUE_H_

View File

@ -1,375 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/single_threaded_task_queue.h"
#include <atomic>
#include <memory>
#include <vector>
#include "api/task_queue/task_queue_test.h"
#include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h"
#include "test/gtest.h"
namespace webrtc {
namespace test {
namespace {
using TaskId = DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId;
// Test should not rely on the object under test not being faulty. If the task
// queue ever blocks forever, we want the tests to fail, rather than hang.
constexpr int kMaxWaitTimeMs = 10000;
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
SanityConstructionDestruction) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, ExecutesPostedTasks) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> executed(false);
rtc::Event done;
task_queue.PostTask([&executed, &done]() {
executed.store(true);
done.Set();
});
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
EXPECT_TRUE(executed.load());
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
PostMultipleTasksFromSameExternalThread) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
constexpr size_t kCount = 3;
std::atomic<bool> executed[kCount];
for (std::atomic<bool>& exec : executed) {
exec.store(false);
}
std::vector<std::unique_ptr<rtc::Event>> done_events;
for (size_t i = 0; i < kCount; i++) {
done_events.emplace_back(std::make_unique<rtc::Event>());
}
// To avoid the tasks which comprise the actual test from running before they
// have all be posted, which could result in only one task ever being in the
// queue at any given time, post one waiting task that would block the
// task-queue, and unblock only after all tasks have been posted.
rtc::Event rendezvous;
task_queue.PostTask(
[&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
// Post the tasks which comprise the test.
for (size_t i = 0; i < kCount; i++) {
task_queue.PostTask([&executed, &done_events, i]() { // |i| by value.
executed[i].store(true);
done_events[i]->Set();
});
}
rendezvous.Set(); // Release the task-queue.
// Wait until the task queue has executed all the tasks.
for (size_t i = 0; i < kCount; i++) {
ASSERT_TRUE(done_events[i]->Wait(kMaxWaitTimeMs));
}
for (size_t i = 0; i < kCount; i++) {
EXPECT_TRUE(executed[i].load());
}
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
PostToTaskQueueFromOwnThread) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> executed(false);
rtc::Event done;
auto internally_posted_task = [&executed, &done]() {
executed.store(true);
done.Set();
};
auto externally_posted_task = [&task_queue, &internally_posted_task]() {
task_queue.PostTask(internally_posted_task);
};
task_queue.PostTask(externally_posted_task);
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
EXPECT_TRUE(executed.load());
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
TasksExecutedInSequence) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
// The first task would perform:
// accumulator = 10 * accumulator + i
// Where |i| is 1, 2 and 3 for the 1st, 2nd and 3rd tasks, respectively.
// The result would be 123 if and only iff the tasks were executed in order.
size_t accumulator = 0;
size_t expected_value = 0; // Updates to the correct value.
// Prevent the chain from being set in motion before we've had time to
// schedule it all, lest the queue only contain one task at a time.
rtc::Event rendezvous;
task_queue.PostTask(
[&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
for (size_t i = 0; i < 3; i++) {
task_queue.PostTask([&accumulator, i]() { // |i| passed by value.
accumulator = 10 * accumulator + i;
});
expected_value = 10 * expected_value + i;
}
// The test will wait for the task-queue to finish.
rtc::Event done;
task_queue.PostTask([&done]() { done.Set(); });
rendezvous.Set(); // Set the chain in motion.
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
EXPECT_EQ(accumulator, expected_value);
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
ExecutesPostedDelayedTask) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> executed(false);
rtc::Event done;
constexpr int64_t delay_ms = 20;
static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
task_queue.PostDelayedTask(
[&executed, &done]() {
executed.store(true);
done.Set();
},
delay_ms);
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
EXPECT_TRUE(executed.load());
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
DoesNotExecuteDelayedTaskTooSoon) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> executed(false);
constexpr int64_t delay_ms = 2000;
static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
task_queue.PostDelayedTask([&executed]() { executed.store(true); }, delay_ms);
// Wait less than is enough, make sure the task was not yet executed.
rtc::Event not_done;
ASSERT_FALSE(not_done.Wait(delay_ms / 2));
EXPECT_FALSE(executed.load());
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
TaskWithLesserDelayPostedAfterFirstDelayedTaskExectuedBeforeFirst) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> earlier_executed(false);
constexpr int64_t earlier_delay_ms = 500;
std::atomic<bool> later_executed(false);
constexpr int64_t later_delay_ms = 1000;
static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
"Delay too long for tests.");
rtc::Event done;
auto earlier_task = [&earlier_executed, &later_executed]() {
EXPECT_FALSE(later_executed.load());
earlier_executed.store(true);
};
auto later_task = [&earlier_executed, &later_executed, &done]() {
EXPECT_TRUE(earlier_executed.load());
later_executed.store(true);
done.Set();
};
task_queue.PostDelayedTask(later_task, later_delay_ms);
task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
ASSERT_TRUE(earlier_executed);
ASSERT_TRUE(later_executed);
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
TaskWithGreaterDelayPostedAfterFirstDelayedTaskExectuedAfterFirst) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> earlier_executed(false);
constexpr int64_t earlier_delay_ms = 500;
std::atomic<bool> later_executed(false);
constexpr int64_t later_delay_ms = 1000;
static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
"Delay too long for tests.");
rtc::Event done;
auto earlier_task = [&earlier_executed, &later_executed]() {
EXPECT_FALSE(later_executed.load());
earlier_executed.store(true);
};
auto later_task = [&earlier_executed, &later_executed, &done]() {
EXPECT_TRUE(earlier_executed.load());
later_executed.store(true);
done.Set();
};
task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
task_queue.PostDelayedTask(later_task, later_delay_ms);
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
ASSERT_TRUE(earlier_executed);
ASSERT_TRUE(later_executed);
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
ExternalThreadCancelsTask) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
rtc::Event done;
// Prevent the to-be-cancelled task from being executed before we've had
// time to cancel it.
rtc::Event rendezvous;
task_queue.PostTask(
[&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
TaskId cancelled_task_id = task_queue.PostTask([]() { EXPECT_TRUE(false); });
task_queue.PostTask([&done]() { done.Set(); });
task_queue.CancelTask(cancelled_task_id);
// Set the tasks in motion; the cancelled task does not run (otherwise the
// test would fail). The last task ends the test, showing that the queue
// progressed beyond the cancelled task.
rendezvous.Set();
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
}
// In this test, we'll set off a chain where the first task cancels the second
// task, then a third task runs (showing that we really cancelled the task,
// rather than just halted the task-queue).
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
InternalThreadCancelsTask) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
rtc::Event done;
// Prevent the chain from being set-off before we've set everything up.
rtc::Event rendezvous;
task_queue.PostTask(
[&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
// This is the canceller-task. It takes cancelled_task_id by reference,
// because the ID will only become known after the cancelled task is
// scheduled.
TaskId cancelled_task_id;
auto canceller_task = [&task_queue, &cancelled_task_id]() {
task_queue.CancelTask(cancelled_task_id);
};
task_queue.PostTask(canceller_task);
// This task will be cancelled by the task before it.
auto cancelled_task = []() { EXPECT_TRUE(false); };
cancelled_task_id = task_queue.PostTask(cancelled_task);
// When this task runs, it will allow the test to be finished.
auto completion_marker_task = [&done]() { done.Set(); };
task_queue.PostTask(completion_marker_task);
rendezvous.Set(); // Set the chain in motion.
ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, SendTask) {
DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
std::atomic<bool> executed(false);
SendTask(RTC_FROM_HERE, &task_queue, [&executed]() {
// Intentionally delay, so that if SendTask didn't block, the sender thread
// would have time to read |executed|.
rtc::Event delay;
ASSERT_FALSE(delay.Wait(1000));
executed.store(true);
});
EXPECT_TRUE(executed);
}
TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
DestructTaskQueueWhileTasksPending) {
auto task_queue =
std::make_unique<DEPRECATED_SingleThreadedTaskQueueForTesting>(
"task_queue");
std::atomic<size_t> counter(0);
constexpr size_t tasks = 10;
for (size_t i = 0; i < tasks; i++) {
task_queue->PostTask([&counter]() {
std::atomic_fetch_add(&counter, static_cast<size_t>(1));
rtc::Event delay;
ASSERT_FALSE(delay.Wait(500));
});
}
task_queue.reset();
EXPECT_LT(counter, tasks);
}
class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view /* name */,
Priority /*priority*/) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new DEPRECATED_SingleThreadedTaskQueueForTesting("noname"));
}
};
INSTANTIATE_TEST_SUITE_P(
DeprecatedSingleThreadedTaskQueueForTesting,
TaskQueueTest,
::testing::Values(
std::make_unique<SingleThreadedTaskQueueForTestingFactory>));
} // namespace
} // namespace test
} // namespace webrtc