diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 2882f50da3..fbbac28187 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -32,6 +32,7 @@ #include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" #include "rtc_base/deprecated/recursive_critical_section.h" +#include "rtc_base/event.h" #include "rtc_base/logging.h" #include "rtc_base/null_socket_server.h" #include "rtc_base/synchronization/sequence_checker.h" @@ -164,6 +165,9 @@ void ThreadManager::RemoveFromSendGraph(Thread* thread) { void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, Thread* target) { + RTC_DCHECK(source); + RTC_DCHECK(target); + CritScope cs(&crit_); std::deque all_targets({target}); // We check the pre-existing who-sends-to-who graph for any path from target @@ -890,46 +894,62 @@ void Thread::Send(const Location& posted_from, AssertBlockingIsAllowedOnCurrentThread(); - AutoThread thread; Thread* current_thread = Thread::Current(); - RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this - RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); + #if RTC_DCHECK_IS_ON - ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, - this); -#endif - bool ready = false; - PostTask( - webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); }, - [this, &ready, current_thread] { - CritScope cs(&crit_); - ready = true; - current_thread->socketserver()->WakeUp(); - })); - - bool waited = false; - crit_.Enter(); - while (!ready) { - crit_.Leave(); - current_thread->socketserver()->Wait(kForever, false); - waited = true; - crit_.Enter(); + if (current_thread) { + RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); + ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, + this); } - crit_.Leave(); +#endif - // Our Wait loop above may have consumed some WakeUp events for this - // Thread, that weren't relevant to this Send. Losing these WakeUps can - // cause problems for some SocketServers. - // - // Concrete example: - // Win32SocketServer on thread A calls Send on thread B. While processing the - // message, thread B Posts a message to A. We consume the wakeup for that - // Post while waiting for the Send to complete, which means that when we exit - // this loop, we need to issue another WakeUp, or else the Posted message - // won't be processed in a timely manner. + // Perhaps down the line we can get rid of this workaround and always require + // current_thread to be valid when Send() is called. + std::unique_ptr done_event; + if (!current_thread) + done_event.reset(new rtc::Event()); - if (waited) { - current_thread->socketserver()->WakeUp(); + bool ready = false; + PostTask(webrtc::ToQueuedTask( + [&msg]() mutable { msg.phandler->OnMessage(&msg); }, + [this, &ready, current_thread, done = done_event.get()] { + if (current_thread) { + CritScope cs(&crit_); + ready = true; + current_thread->socketserver()->WakeUp(); + } else { + done->Set(); + } + })); + + if (current_thread) { + bool waited = false; + crit_.Enter(); + while (!ready) { + crit_.Leave(); + current_thread->socketserver()->Wait(kForever, false); + waited = true; + crit_.Enter(); + } + crit_.Leave(); + + // Our Wait loop above may have consumed some WakeUp events for this + // Thread, that weren't relevant to this Send. Losing these WakeUps can + // cause problems for some SocketServers. + // + // Concrete example: + // Win32SocketServer on thread A calls Send on thread B. While processing + // the message, thread B Posts a message to A. We consume the wakeup for + // that Post while waiting for the Send to complete, which means that when + // we exit this loop, we need to issue another WakeUp, or else the Posted + // message won't be processed in a timely manner. + + if (waited) { + current_thread->socketserver()->WakeUp(); + } + } else { + done_event->Wait(rtc::Event::kForever); } } diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 353d63032d..dbd693e70b 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -625,7 +625,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // AutoThread automatically installs itself at construction // uninstalls at destruction, if a Thread object is // _not already_ associated with the current OS thread. - +// +// NOTE: *** This class should only be used by tests *** +// class AutoThread : public Thread { public: AutoThread();