From 0bd166530d5b2e277b89e8edf5499d2b43830c3d Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Wed, 24 Aug 2022 18:35:45 +0200 Subject: [PATCH] in rtc::Thread remove special handling of the Dispose message rtc::Thread::Dispose is only used in test code, but complicates the main thread loop. Bug: webrtc:8324 Change-Id: I2dccdadcdc932b9992958d1e70fb93d1879b7618 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272821 Reviewed-by: Tomas Gunnarsson Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37894} --- rtc_base/BUILD.gn | 1 + rtc_base/test_echo_server.h | 5 ++- rtc_base/thread.cc | 75 +++++++++++++------------------------ rtc_base/thread.h | 7 ++-- rtc_base/thread_message.h | 1 + rtc_base/thread_unittest.cc | 8 +++- 6 files changed, 42 insertions(+), 55 deletions(-) diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 6eb3622eef..80839cf872 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -940,6 +940,7 @@ rtc_library("threading") { "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/cleanup", "//third_party/abseil-cpp/absl/functional:any_invocable", + "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", ] deps = [ diff --git a/rtc_base/test_echo_server.h b/rtc_base/test_echo_server.h index e4f70caee2..8e3c432853 100644 --- a/rtc_base/test_echo_server.h +++ b/rtc_base/test_echo_server.h @@ -18,6 +18,7 @@ #include #include "absl/algorithm/container.h" +#include "absl/memory/memory.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/async_tcp_socket.h" #include "rtc_base/socket.h" @@ -61,7 +62,9 @@ class TestEchoServer : public sigslot::has_slots<> { void OnClose(AsyncPacketSocket* socket, int err) { ClientList::iterator it = absl::c_find(client_sockets_, socket); client_sockets_.erase(it); - Thread::Current()->Dispose(socket); + // `OnClose` is triggered by socket Close callback, deleting `socket` while + // processing that callback might be unsafe. + Thread::Current()->PostTask([socket = absl::WrapUnique(socket)] {}); } typedef std::list ClientList; diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 99469896e6..5246cbe231 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -72,6 +72,8 @@ class ScopedAutoReleasePool { namespace rtc { namespace { +using ::webrtc::TimeDelta; + struct AnyInvocableMessage final : public MessageData { explicit AnyInvocableMessage(absl::AnyInvocable task) : task(std::move(task)) {} @@ -216,19 +218,6 @@ void ThreadManager::ProcessAllMessageQueuesInternal() { // that came before it were also dispatched. std::atomic queues_not_done(0); - // This class is used so that whether the posted message is processed, or the - // message queue is simply cleared, queues_not_done gets decremented. - class ScopedIncrement : public MessageData { - public: - ScopedIncrement(std::atomic* value) : value_(value) { - value_->fetch_add(1); - } - ~ScopedIncrement() override { value_->fetch_sub(1); } - - private: - std::atomic* value_; - }; - { MarkProcessingCritScope cs(&crit_, &processing_); for (Thread* queue : message_queues_) { @@ -238,8 +227,13 @@ void ThreadManager::ProcessAllMessageQueuesInternal() { // or ignored. continue; } - queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, - new ScopedIncrement(&queues_not_done)); + queues_not_done.fetch_add(1); + // Whether the task is processed, or the thread is simply cleared, + // queues_not_done gets decremented. + absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); }; + // Post delayed task instead of regular task to wait for all delayed tasks + // that are ready for processing. + queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero()); } } @@ -459,44 +453,27 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { while (true) { // Check for posted events int64_t cmsDelayNext = kForever; - bool first_pass = true; - while (true) { + { // All queue operations need to be locked, but nothing else in this loop - // (specifically handling disposed message) can happen inside the crit. - // Otherwise, disposed MessageHandlers will cause deadlocks. - { - CritScope cs(&crit_); - // On the first pass, check for delayed messages that have been - // triggered and calculate the next trigger time. - if (first_pass) { - first_pass = false; - while (!delayed_messages_.empty()) { - if (msCurrent < delayed_messages_.top().run_time_ms_) { - cmsDelayNext = - TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); - break; - } - messages_.push_back(delayed_messages_.top().msg_); - delayed_messages_.pop(); - } - } - // Pull a message off the message queue, if available. - if (messages_.empty()) { + // can happen inside the crit. + CritScope cs(&crit_); + // Check for delayed messages that have been triggered and calculate the + // next trigger time. + while (!delayed_messages_.empty()) { + if (msCurrent < delayed_messages_.top().run_time_ms_) { + cmsDelayNext = + TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); break; - } else { - *pmsg = messages_.front(); - messages_.pop_front(); } - } // crit_ is released here. - - // If this was a dispose message, delete it and skip it. - if (MQID_DISPOSE == pmsg->message_id) { - RTC_DCHECK(nullptr == pmsg->phandler); - delete pmsg->pdata; - *pmsg = Message(); - continue; + messages_.push_back(delayed_messages_.top().msg_); + delayed_messages_.pop(); + } + // Pull a message off the message queue, if available. + if (!messages_.empty()) { + *pmsg = messages_.front(); + messages_.pop_front(); + return true; } - return true; } if (IsQuitting()) diff --git a/rtc_base/thread.h b/rtc_base/thread.h index ef43e51a75..34848308c4 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -29,6 +29,7 @@ #endif #include "absl/base/attributes.h" #include "absl/functional/any_invocable.h" +#include "absl/memory/memory.h" #include "api/function_view.h" #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" @@ -298,11 +299,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { } // Internally posts a message which causes the doomed object to be deleted + // TODO(bugs.webrtc.org/8324): Delete when unused by dependencies. template void Dispose(T* doomed) { - if (doomed) { - Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData(doomed)); - } + RTC_DCHECK(doomed); + PostTask([dommed = absl::WrapUnique(doomed)] {}); } bool IsCurrent() const; diff --git a/rtc_base/thread_message.h b/rtc_base/thread_message.h index c610c3b911..cf38338f43 100644 --- a/rtc_base/thread_message.h +++ b/rtc_base/thread_message.h @@ -89,6 +89,7 @@ class DisposeData : public MessageData { }; const uint32_t MQID_ANY = static_cast(-1); +// TODO(bugs.webrtc.org/8324): Delete when unused by dependencies. const uint32_t MQID_DISPOSE = static_cast(-2); // No destructor diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index 68416f5557..32589b3216 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -558,7 +558,11 @@ TEST(ThreadTest, ThreeThreadsInvoke) { class ThreadQueueTest : public ::testing::Test, public Thread { public: - ThreadQueueTest() : Thread(CreateDefaultSocketServer(), true) {} + ThreadQueueTest() : Thread(CreateDefaultSocketServer(), true) { + RTC_DCHECK(Thread::Current() == nullptr); + ThreadManager::Instance()->SetCurrentThread(this); + } + ~ThreadQueueTest() { ThreadManager::Instance()->SetCurrentThread(nullptr); } bool IsLocked_Worker() { if (!CritForTest()->TryEnter()) { return true; @@ -644,7 +648,7 @@ class DeletedMessageHandler : public MessageHandlerAutoCleanup { bool* deleted_; }; -TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) { +TEST_F(ThreadQueueTest, DisposeHandlerWithPostedMessagePending) { bool deleted = false; DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted); // First, post a dispose.