From 73387823a7151c69bc914324ad5b724c97074677 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Thu, 16 Jan 2020 11:15:35 +0100 Subject: [PATCH] Cleanup: Removes MessageQueue header and alias Bug: webrtc:9883 Change-Id: I31aac563e54d61f03ff76ea1e9d284602a633252 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166170 Reviewed-by: Steve Anton Commit-Queue: Sebastian Jansson Cr-Commit-Position: refs/heads/master@{#30314} --- api/DEPS | 1 - rtc_base/BUILD.gn | 4 - rtc_base/message_queue.h | 18 --- rtc_base/message_queue_unittest.cc | 222 ----------------------------- rtc_base/thread.h | 5 - rtc_base/thread_unittest.cc | 192 +++++++++++++++++++++++++ 6 files changed, 192 insertions(+), 250 deletions(-) delete mode 100644 rtc_base/message_queue.h delete mode 100644 rtc_base/message_queue_unittest.cc diff --git a/api/DEPS b/api/DEPS index bac4232b3e..ef9db30804 100644 --- a/api/DEPS +++ b/api/DEPS @@ -140,7 +140,6 @@ specific_include_rules = { "proxy\.h": [ "+rtc_base/event.h", "+rtc_base/message_handler.h", # Inherits from it. - "+rtc_base/message_queue.h", # Inherits from MessageData. "+rtc_base/ref_counted_object.h", "+rtc_base/thread.h", ], diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 0c8a477134..b4f4ad670c 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -713,8 +713,6 @@ rtc_source_set("threading") { # "defaultsocketserver.h", # "message_handler.cc", # "message_handler.h", - # "message_queue.cc", - # "message_queue.h", # "network_monitor.cc", # "network_monitor.h", # "physical_socket_server.cc", @@ -845,7 +843,6 @@ rtc_library("rtc_base") { "message_digest.h", "message_handler.cc", "message_handler.h", - "message_queue.h", "net_helper.cc", "net_helper.h", "net_helpers.cc", @@ -1344,7 +1341,6 @@ if (rtc_include_tests) { "ip_address_unittest.cc", "memory_usage_unittest.cc", "message_digest_unittest.cc", - "message_queue_unittest.cc", "nat_unittest.cc", "network_unittest.cc", "proxy_unittest.cc", diff --git a/rtc_base/message_queue.h b/rtc_base/message_queue.h deleted file mode 100644 index ffad9fe852..0000000000 --- a/rtc_base/message_queue.h +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2004 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_MESSAGE_QUEUE_H_ -#define RTC_BASE_MESSAGE_QUEUE_H_ - -// TODO(srte): Remove this file when all dependencies has been updated. - -#include "rtc_base/thread.h" - -#endif // RTC_BASE_MESSAGE_QUEUE_H_ diff --git a/rtc_base/message_queue_unittest.cc b/rtc_base/message_queue_unittest.cc deleted file mode 100644 index 4d3ea95a80..0000000000 --- a/rtc_base/message_queue_unittest.cc +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright 2004 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/thread.h" - -#include - -#include "rtc_base/atomic_ops.h" -#include "rtc_base/bind.h" -#include "rtc_base/event.h" -#include "rtc_base/gunit.h" -#include "rtc_base/logging.h" -#include "rtc_base/null_socket_server.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counted_object.h" -#include "rtc_base/task_utils/to_queued_task.h" -#include "rtc_base/thread.h" -#include "rtc_base/time_utils.h" - -namespace rtc { -namespace { - -using ::webrtc::ToQueuedTask; - -class MessageQueueTest : public ::testing::Test, public Thread { - public: - MessageQueueTest() : Thread(SocketServer::CreateDefault(), true) {} - bool IsLocked_Worker() { - if (!CritForTest()->TryEnter()) { - return true; - } - CritForTest()->Leave(); - return false; - } - bool IsLocked() { - // We have to do this on a worker thread, or else the TryEnter will - // succeed, since our critical sections are reentrant. - std::unique_ptr worker(Thread::CreateWithSocketServer()); - worker->Start(); - return worker->Invoke( - RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this)); - } -}; - -struct DeletedLockChecker { - DeletedLockChecker(MessageQueueTest* test, bool* was_locked, bool* deleted) - : test(test), was_locked(was_locked), deleted(deleted) {} - ~DeletedLockChecker() { - *deleted = true; - *was_locked = test->IsLocked(); - } - MessageQueueTest* test; - bool* was_locked; - bool* deleted; -}; - -static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) { - EXPECT_TRUE(q != nullptr); - int64_t now = TimeMillis(); - q->PostAt(RTC_FROM_HERE, now, nullptr, 3); - q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0); - q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1); - q->PostAt(RTC_FROM_HERE, now, nullptr, 4); - q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2); - - Message msg; - for (size_t i = 0; i < 5; ++i) { - memset(&msg, 0, sizeof(msg)); - EXPECT_TRUE(q->Get(&msg, 0)); - EXPECT_EQ(i, msg.message_id); - } - - EXPECT_FALSE(q->Get(&msg, 0)); // No more messages -} - -TEST_F(MessageQueueTest, - DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) { - Thread q(SocketServer::CreateDefault(), true); - DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q); - - NullSocketServer nullss; - Thread q_nullss(&nullss, true); - DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss); -} - -TEST_F(MessageQueueTest, DisposeNotLocked) { - bool was_locked = true; - bool deleted = false; - DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted); - Dispose(d); - Message msg; - EXPECT_FALSE(Get(&msg, 0)); - EXPECT_TRUE(deleted); - EXPECT_FALSE(was_locked); -} - -class DeletedMessageHandler : public MessageHandler { - public: - explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {} - ~DeletedMessageHandler() override { *deleted_ = true; } - void OnMessage(Message* msg) override {} - - private: - bool* deleted_; -}; - -TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) { - bool deleted = false; - DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted); - // First, post a dispose. - Dispose(handler); - // Now, post a message, which should *not* be returned by Get(). - Post(RTC_FROM_HERE, handler, 1); - Message msg; - EXPECT_FALSE(Get(&msg, 0)); - EXPECT_TRUE(deleted); -} - -// Ensure that ProcessAllMessageQueues does its essential function; process -// all messages (both delayed and non delayed) up until the current time, on -// all registered message queues. -TEST(ThreadManager, ProcessAllMessageQueues) { - Event entered_process_all_message_queues(true, false); - auto a = Thread::CreateWithSocketServer(); - auto b = Thread::CreateWithSocketServer(); - a->Start(); - b->Start(); - - volatile int messages_processed = 0; - auto incrementer = [&messages_processed, - &entered_process_all_message_queues] { - // Wait for event as a means to ensure Increment doesn't occur outside - // of ProcessAllMessageQueues. The event is set by a message posted to - // the main thread, which is guaranteed to be handled inside - // ProcessAllMessageQueues. - entered_process_all_message_queues.Wait(Event::kForever); - AtomicOps::Increment(&messages_processed); - }; - auto event_signaler = [&entered_process_all_message_queues] { - entered_process_all_message_queues.Set(); - }; - - // Post messages (both delayed and non delayed) to both threads. - a->PostTask(ToQueuedTask(incrementer)); - b->PostTask(ToQueuedTask(incrementer)); - a->PostDelayedTask(ToQueuedTask(incrementer), 0); - b->PostDelayedTask(ToQueuedTask(incrementer), 0); - rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler)); - - ThreadManager::ProcessAllMessageQueuesForTesting(); - EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed)); -} - -// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting. -TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) { - auto t = Thread::CreateWithSocketServer(); - t->Start(); - t->Quit(); - ThreadManager::ProcessAllMessageQueuesForTesting(); -} - -// Test that ProcessAllMessageQueues doesn't hang if a queue clears its -// messages. -TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) { - Event entered_process_all_message_queues(true, false); - auto t = Thread::CreateWithSocketServer(); - t->Start(); - - auto clearer = [&entered_process_all_message_queues] { - // Wait for event as a means to ensure Clear doesn't occur outside of - // ProcessAllMessageQueues. The event is set by a message posted to the - // main thread, which is guaranteed to be handled inside - // ProcessAllMessageQueues. - entered_process_all_message_queues.Wait(Event::kForever); - rtc::Thread::Current()->Clear(nullptr); - }; - auto event_signaler = [&entered_process_all_message_queues] { - entered_process_all_message_queues.Set(); - }; - - // Post messages (both delayed and non delayed) to both threads. - t->PostTask(RTC_FROM_HERE, clearer); - rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler); - ThreadManager::ProcessAllMessageQueuesForTesting(); -} - -class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface { - public: - void OnMessage(Message* msg) override {} -}; - -class EmptyHandler : public MessageHandler { - public: - void OnMessage(Message* msg) override {} -}; - -TEST(ThreadManager, ClearReentrant) { - std::unique_ptr t(Thread::Create()); - EmptyHandler handler; - RefCountedHandler* inner_handler( - new rtc::RefCountedObject()); - // When the empty handler is destroyed, it will clear messages queued for - // itself. The message to be cleared itself wraps a MessageHandler object - // (RefCountedHandler) so this will cause the message queue to be cleared - // again in a re-entrant fashion, which previously triggered a DCHECK. - // The inner handler will be removed in a re-entrant fashion from the - // message queue of the thread while the outer handler is removed, verifying - // that the iterator is not invalidated in "MessageQueue::Clear". - t->Post(RTC_FROM_HERE, inner_handler, 0); - t->Post(RTC_FROM_HERE, &handler, 0, - new ScopedRefMessageData(inner_handler)); -} - -} // namespace -} // namespace rtc diff --git a/rtc_base/thread.h b/rtc_base/thread.h index f8b41d16b6..8b853a85d0 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -593,11 +593,6 @@ class AutoSocketServerThread : public Thread { RTC_DISALLOW_COPY_AND_ASSIGN(AutoSocketServerThread); }; - -// TODO(srte): Remove these when all dependencies has been updated. -using MessageQueue = Thread; -using MessageQueueManager = ThreadManager; - } // namespace rtc #endif // RTC_BASE_THREAD_H_ diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index 0e04fae3f8..2cd21de0e8 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -16,11 +16,13 @@ #include "api/task_queue/task_queue_test.h" #include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" +#include "rtc_base/atomic_ops.h" #include "rtc_base/event.h" #include "rtc_base/gunit.h" #include "rtc_base/null_socket_server.h" #include "rtc_base/physical_socket_server.h" #include "rtc_base/socket_address.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/sigslot/sigslot.h" #if defined(WEBRTC_WIN) @@ -30,6 +32,8 @@ namespace rtc { namespace { +using ::webrtc::ToQueuedTask; + // Generates a sequence of numbers (collaboratively). class TestGenerator { public: @@ -431,6 +435,194 @@ TEST(ThreadTest, SetNameOnSignalQueueDestroyed) { delete thread2; } +class ThreadQueueTest : public ::testing::Test, public Thread { + public: + ThreadQueueTest() : Thread(SocketServer::CreateDefault(), true) {} + bool IsLocked_Worker() { + if (!CritForTest()->TryEnter()) { + return true; + } + CritForTest()->Leave(); + return false; + } + bool IsLocked() { + // We have to do this on a worker thread, or else the TryEnter will + // succeed, since our critical sections are reentrant. + std::unique_ptr worker(Thread::CreateWithSocketServer()); + worker->Start(); + return worker->Invoke( + RTC_FROM_HERE, rtc::Bind(&ThreadQueueTest::IsLocked_Worker, this)); + } +}; + +struct DeletedLockChecker { + DeletedLockChecker(ThreadQueueTest* test, bool* was_locked, bool* deleted) + : test(test), was_locked(was_locked), deleted(deleted) {} + ~DeletedLockChecker() { + *deleted = true; + *was_locked = test->IsLocked(); + } + ThreadQueueTest* test; + bool* was_locked; + bool* deleted; +}; + +static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) { + EXPECT_TRUE(q != nullptr); + int64_t now = TimeMillis(); + q->PostAt(RTC_FROM_HERE, now, nullptr, 3); + q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0); + q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1); + q->PostAt(RTC_FROM_HERE, now, nullptr, 4); + q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2); + + Message msg; + for (size_t i = 0; i < 5; ++i) { + memset(&msg, 0, sizeof(msg)); + EXPECT_TRUE(q->Get(&msg, 0)); + EXPECT_EQ(i, msg.message_id); + } + + EXPECT_FALSE(q->Get(&msg, 0)); // No more messages +} + +TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) { + Thread q(SocketServer::CreateDefault(), true); + DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q); + + NullSocketServer nullss; + Thread q_nullss(&nullss, true); + DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss); +} + +TEST_F(ThreadQueueTest, DisposeNotLocked) { + bool was_locked = true; + bool deleted = false; + DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted); + Dispose(d); + Message msg; + EXPECT_FALSE(Get(&msg, 0)); + EXPECT_TRUE(deleted); + EXPECT_FALSE(was_locked); +} + +class DeletedMessageHandler : public MessageHandler { + public: + explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {} + ~DeletedMessageHandler() override { *deleted_ = true; } + void OnMessage(Message* msg) override {} + + private: + bool* deleted_; +}; + +TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) { + bool deleted = false; + DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted); + // First, post a dispose. + Dispose(handler); + // Now, post a message, which should *not* be returned by Get(). + Post(RTC_FROM_HERE, handler, 1); + Message msg; + EXPECT_FALSE(Get(&msg, 0)); + EXPECT_TRUE(deleted); +} + +// Ensure that ProcessAllMessageQueues does its essential function; process +// all messages (both delayed and non delayed) up until the current time, on +// all registered message queues. +TEST(ThreadManager, ProcessAllMessageQueues) { + Event entered_process_all_message_queues(true, false); + auto a = Thread::CreateWithSocketServer(); + auto b = Thread::CreateWithSocketServer(); + a->Start(); + b->Start(); + + volatile int messages_processed = 0; + auto incrementer = [&messages_processed, + &entered_process_all_message_queues] { + // Wait for event as a means to ensure Increment doesn't occur outside + // of ProcessAllMessageQueues. The event is set by a message posted to + // the main thread, which is guaranteed to be handled inside + // ProcessAllMessageQueues. + entered_process_all_message_queues.Wait(Event::kForever); + AtomicOps::Increment(&messages_processed); + }; + auto event_signaler = [&entered_process_all_message_queues] { + entered_process_all_message_queues.Set(); + }; + + // Post messages (both delayed and non delayed) to both threads. + a->PostTask(ToQueuedTask(incrementer)); + b->PostTask(ToQueuedTask(incrementer)); + a->PostDelayedTask(ToQueuedTask(incrementer), 0); + b->PostDelayedTask(ToQueuedTask(incrementer), 0); + rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler)); + + ThreadManager::ProcessAllMessageQueuesForTesting(); + EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed)); +} + +// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting. +TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) { + auto t = Thread::CreateWithSocketServer(); + t->Start(); + t->Quit(); + ThreadManager::ProcessAllMessageQueuesForTesting(); +} + +// Test that ProcessAllMessageQueues doesn't hang if a queue clears its +// messages. +TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) { + Event entered_process_all_message_queues(true, false); + auto t = Thread::CreateWithSocketServer(); + t->Start(); + + auto clearer = [&entered_process_all_message_queues] { + // Wait for event as a means to ensure Clear doesn't occur outside of + // ProcessAllMessageQueues. The event is set by a message posted to the + // main thread, which is guaranteed to be handled inside + // ProcessAllMessageQueues. + entered_process_all_message_queues.Wait(Event::kForever); + rtc::Thread::Current()->Clear(nullptr); + }; + auto event_signaler = [&entered_process_all_message_queues] { + entered_process_all_message_queues.Set(); + }; + + // Post messages (both delayed and non delayed) to both threads. + t->PostTask(RTC_FROM_HERE, clearer); + rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler); + ThreadManager::ProcessAllMessageQueuesForTesting(); +} + +class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface { + public: + void OnMessage(Message* msg) override {} +}; + +class EmptyHandler : public MessageHandler { + public: + void OnMessage(Message* msg) override {} +}; + +TEST(ThreadManager, ClearReentrant) { + std::unique_ptr t(Thread::Create()); + EmptyHandler handler; + RefCountedHandler* inner_handler( + new rtc::RefCountedObject()); + // When the empty handler is destroyed, it will clear messages queued for + // itself. The message to be cleared itself wraps a MessageHandler object + // (RefCountedHandler) so this will cause the message queue to be cleared + // again in a re-entrant fashion, which previously triggered a DCHECK. + // The inner handler will be removed in a re-entrant fashion from the + // message queue of the thread while the outer handler is removed, verifying + // that the iterator is not invalidated in "MessageQueue::Clear". + t->Post(RTC_FROM_HERE, inner_handler, 0); + t->Post(RTC_FROM_HERE, &handler, 0, + new ScopedRefMessageData(inner_handler)); +} + class AsyncInvokeTest : public ::testing::Test { public: void IntCallback(int value) {