diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index 1394648e5f..ca41452e23 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -37,6 +37,7 @@ rtc_source_set("task_queue_test") { ] deps = [ ":task_queue", + "../../rtc_base:refcount", "../../rtc_base:rtc_event", "../../rtc_base:timeutils", "../../rtc_base/task_utils:to_queued_task", diff --git a/api/task_queue/task_queue_test.cc b/api/task_queue/task_queue_test.cc index 8d02ed6c9e..8cf59ce423 100644 --- a/api/task_queue/task_queue_test.cc +++ b/api/task_queue/task_queue_test.cc @@ -12,6 +12,7 @@ #include "absl/memory/memory.h" #include "absl/strings/string_view.h" #include "rtc_base/event.h" +#include "rtc_base/ref_counter.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" @@ -186,37 +187,53 @@ TEST_P(TaskQueueTest, PostAndReuse) { EXPECT_TRUE(event.Wait(1000)); } -// Tests posting more messages than a queue can queue up. -// In situations like that, tasks will get dropped. TEST_P(TaskQueueTest, PostALot) { + // Waits until DecrementCount called |count| times. Thread safe. + class BlockingCounter { + public: + explicit BlockingCounter(int initial_count) : count_(initial_count) {} + + void DecrementCount() { + if (count_.DecRef() == rtc::RefCountReleaseStatus::kDroppedLastRef) { + event_.Set(); + } + } + bool Wait(int give_up_after_ms) { return event_.Wait(give_up_after_ms); } + + private: + webrtc_impl::RefCounter count_; + rtc::Event event_; + }; + std::unique_ptr factory = GetParam()(); - // To destruct the event after the queue has gone out of scope. - rtc::Event event; + static constexpr int kTaskCount = 0xffff; + rtc::Event posting_done; + BlockingCounter all_destroyed(kTaskCount); int tasks_executed = 0; - int tasks_cleaned_up = 0; - static const int kTaskCount = 0xffff; + auto task_queue = CreateTaskQueue(factory, "PostALot"); - { - auto queue = CreateTaskQueue(factory, "PostALot"); + task_queue->PostTask(ToQueuedTask([&] { + // Post tasks from the queue to guarantee that the 1st task won't be + // executed before the last one is posted. + for (int i = 0; i < kTaskCount; ++i) { + task_queue->PostTask(ToQueuedTask( + [&] { ++tasks_executed; }, [&] { all_destroyed.DecrementCount(); })); + } - // On linux, the limit of pending bytes in the pipe buffer is 0xffff. - // So here we post a total of 0xffff+1 messages, which triggers a failure - // case inside of the libevent queue implementation. + posting_done.Set(); + })); - queue->PostTask(ToQueuedTask([&event] { - rtc::ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives; - event.Wait(rtc::Event::kForever); - })); - for (int i = 0; i < kTaskCount; ++i) - queue->PostTask( - ToQueuedTask([&tasks_executed] { ++tasks_executed; }, - [&tasks_cleaned_up] { ++tasks_cleaned_up; })); - event.Set(); // Unblock the first task. - } + // Before destroying the task queue wait until all child tasks are posted. + EXPECT_TRUE(posting_done.Wait(1000)); + // Destroy the task queue. + task_queue = nullptr; - EXPECT_GE(tasks_cleaned_up, tasks_executed); - EXPECT_EQ(tasks_cleaned_up, kTaskCount); + // Expect all tasks are destroyed eventually. In some task queue + // implementations that might happen on a different thread after task queue is + // destroyed. + EXPECT_TRUE(all_destroyed.Wait(1000)); + EXPECT_LE(tasks_executed, kTaskCount); } // Test posting two tasks that have shared state not protected by a