diff --git a/webrtc/rtc_base/asyncinvoker-inl.h b/webrtc/rtc_base/asyncinvoker-inl.h index 5b2cf4e250..7878b15f73 100644 --- a/webrtc/rtc_base/asyncinvoker-inl.h +++ b/webrtc/rtc_base/asyncinvoker-inl.h @@ -11,10 +11,12 @@ #ifndef WEBRTC_RTC_BASE_ASYNCINVOKER_INL_H_ #define WEBRTC_RTC_BASE_ASYNCINVOKER_INL_H_ -#include "webrtc/rtc_base/atomicops.h" #include "webrtc/rtc_base/bind.h" #include "webrtc/rtc_base/criticalsection.h" +#include "webrtc/rtc_base/event.h" #include "webrtc/rtc_base/messagehandler.h" +#include "webrtc/rtc_base/refcountedobject.h" +#include "webrtc/rtc_base/scoped_ref_ptr.h" #include "webrtc/rtc_base/sigslot.h" #include "webrtc/rtc_base/thread.h" #include "webrtc/rtc_base/thread_annotations.h" @@ -27,7 +29,7 @@ class AsyncInvoker; // on the calling thread if necessary. class AsyncClosure { public: - explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {} + explicit AsyncClosure(AsyncInvoker* invoker); virtual ~AsyncClosure(); // Runs the asynchronous task, and triggers a callback to the calling // thread if needed. Should be called from the target thread. @@ -35,6 +37,11 @@ class AsyncClosure { protected: AsyncInvoker* invoker_; + // Reference counted so that if the AsyncInvoker destructor finishes before + // an AsyncClosure's destructor that's about to call + // "invocation_complete_->Set()", it's not dereferenced after being + // destroyed. + scoped_refptr> invocation_complete_; }; // Simple closure that doesn't trigger a callback for the calling thread. diff --git a/webrtc/rtc_base/asyncinvoker.cc b/webrtc/rtc_base/asyncinvoker.cc index 94abfd5d08..89e4e778ea 100644 --- a/webrtc/rtc_base/asyncinvoker.cc +++ b/webrtc/rtc_base/asyncinvoker.cc @@ -10,27 +10,30 @@ #include "webrtc/rtc_base/asyncinvoker.h" -#include "webrtc/rtc_base/atomicops.h" #include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/logging.h" namespace rtc { -AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} +AsyncInvoker::AsyncInvoker() + : pending_invocations_(0), + invocation_complete_(new RefCountedObject(false, false)), + destroying_(false) {} AsyncInvoker::~AsyncInvoker() { - destroying_ = true; + destroying_.store(true, std::memory_order_relaxed); // Messages for this need to be cleared *before* our destructor is complete. MessageQueueManager::Clear(this); // And we need to wait for any invocations that are still in progress on - // other threads. - while (AtomicOps::AcquireLoad(&pending_invocations_)) { + // other threads. Using memory_order_acquire for synchronization with + // AsyncClosure destructors. + while (pending_invocations_.load(std::memory_order_acquire) > 0) { // If the destructor was called while AsyncInvoke was being called by // another thread, WITHIN an AsyncInvoked functor, it may do another // Thread::Post even after we called MessageQueueManager::Clear(this). So // we need to keep calling Clear to discard these posts. - MessageQueueManager::Clear(this); - invocation_complete_.Wait(Event::kForever); + Thread::Current()->Clear(this); + invocation_complete_->Wait(Event::kForever); } } @@ -44,7 +47,10 @@ void AsyncInvoker::OnMessage(Message* msg) { } void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { - if (destroying_) return; + // If the destructor is waiting for invocations to finish, don't start + // running even more tasks. + if (destroying_.load(std::memory_order_relaxed)) + return; // Run this on |thread| to reduce the number of context switches. if (Thread::Current() != thread) { @@ -65,11 +71,14 @@ void AsyncInvoker::DoInvoke(const Location& posted_from, Thread* thread, std::unique_ptr closure, uint32_t id) { - if (destroying_) { + if (destroying_.load(std::memory_order_relaxed)) { + // Note that this may be expected, if the application is AsyncInvoking + // tasks that AsyncInvoke other tasks. But otherwise it indicates a race + // between a thread destroying the AsyncInvoker and a thread still trying + // to use it. LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } - AtomicOps::Increment(&pending_invocations_); thread->Post(posted_from, this, id, new ScopedMessageData(std::move(closure))); } @@ -79,11 +88,11 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, std::unique_ptr closure, uint32_t delay_ms, uint32_t id) { - if (destroying_) { + if (destroying_.load(std::memory_order_relaxed)) { + // See above comment. LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } - AtomicOps::Increment(&pending_invocations_); thread->PostDelayed(posted_from, delay_ms, this, id, new ScopedMessageData(std::move(closure))); } @@ -97,7 +106,7 @@ GuardedAsyncInvoker::~GuardedAsyncInvoker() { } bool GuardedAsyncInvoker::Flush(uint32_t id) { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); if (thread_ == nullptr) return false; invoker_.Flush(thread_, id); @@ -105,15 +114,30 @@ bool GuardedAsyncInvoker::Flush(uint32_t id) { } void GuardedAsyncInvoker::ThreadDestroyed() { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); // We should never get more than one notification about the thread dying. RTC_DCHECK(thread_ != nullptr); thread_ = nullptr; } +AsyncClosure::AsyncClosure(AsyncInvoker* invoker) + : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) { + invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed); +} + AsyncClosure::~AsyncClosure() { - AtomicOps::Decrement(&invoker_->pending_invocations_); - invoker_->invocation_complete_.Set(); + // Using memory_order_release for synchronization with the AsyncInvoker + // destructor. + invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release); + + // After |pending_invocations_| is decremented, we may need to signal + // |invocation_complete_| in case the AsyncInvoker is being destroyed and + // waiting for pending tasks to complete. + // + // It's also possible that the destructor finishes before "Set()" is called, + // which is safe because the event is reference counted (and in a thread-safe + // way). + invocation_complete_->Set(); } } // namespace rtc diff --git a/webrtc/rtc_base/asyncinvoker.h b/webrtc/rtc_base/asyncinvoker.h index 17d702a37b..455ded24dd 100644 --- a/webrtc/rtc_base/asyncinvoker.h +++ b/webrtc/rtc_base/asyncinvoker.h @@ -11,6 +11,7 @@ #ifndef WEBRTC_RTC_BASE_ASYNCINVOKER_H_ #define WEBRTC_RTC_BASE_ASYNCINVOKER_H_ +#include #include #include @@ -18,6 +19,8 @@ #include "webrtc/rtc_base/bind.h" #include "webrtc/rtc_base/constructormagic.h" #include "webrtc/rtc_base/event.h" +#include "webrtc/rtc_base/refcountedobject.h" +#include "webrtc/rtc_base/scoped_ref_ptr.h" #include "webrtc/rtc_base/sigslot.h" #include "webrtc/rtc_base/thread.h" @@ -70,6 +73,20 @@ namespace rtc { // AsyncInvoker invoker_; // int result_; // }; +// +// More details about threading: +// - It's safe to construct/destruct AsyncInvoker on different threads. +// - It's safe to call AsyncInvoke from different threads. +// - It's safe to call AsyncInvoke recursively from *within* a functor that's +// being AsyncInvoked. +// - However, it's *not* safe to call AsyncInvoke from *outside* a functor +// that's being AsyncInvoked while the AsyncInvoker is being destroyed on +// another thread. This is just inherently unsafe and there's no way to +// prevent that. So, the user of this class should ensure that the start of +// each "chain" of invocations is synchronized somehow with the AsyncInvoker's +// destruction. This can be done by starting each chain of invocations on the +// same thread on which it will be destroyed, or by using some other +// synchronization method. class AsyncInvoker : public MessageHandler { public: AsyncInvoker(); @@ -118,9 +135,28 @@ class AsyncInvoker : public MessageHandler { std::unique_ptr closure, uint32_t delay_ms, uint32_t id); - volatile int pending_invocations_ = 0; - Event invocation_complete_; - bool destroying_ = false; + + // Used to keep track of how many invocations (AsyncClosures) are still + // alive, so that the destructor can wait for them to finish, as described in + // the class documentation. + // + // TODO(deadbeef): Using a raw std::atomic like this is prone to error and + // difficult to maintain. We should try to wrap this functionality in a + // separate class to reduce the chance of errors being introduced in the + // future. + std::atomic pending_invocations_; + + // Reference counted so that if the AsyncInvoker destructor finishes before + // an AsyncClosure's destructor that's about to call + // "invocation_complete_->Set()", it's not dereferenced after being + // destroyed. + scoped_refptr> invocation_complete_; + + // This flag is used to ensure that if an application AsyncInvokes tasks that + // recursively AsyncInvoke other tasks ad infinitum, the cycle eventually + // terminates. + std::atomic destroying_; + friend class AsyncClosure; RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); @@ -149,7 +185,7 @@ class GuardedAsyncInvoker : public sigslot::has_slots<> { bool AsyncInvoke(const Location& posted_from, const FunctorT& functor, uint32_t id = 0) { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); if (thread_ == nullptr) return false; invoker_.AsyncInvoke(posted_from, thread_, functor, id); @@ -163,7 +199,7 @@ class GuardedAsyncInvoker : public sigslot::has_slots<> { const FunctorT& functor, uint32_t delay_ms, uint32_t id = 0) { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); if (thread_ == nullptr) return false; invoker_.AsyncInvokeDelayed(posted_from, thread_, @@ -180,7 +216,7 @@ class GuardedAsyncInvoker : public sigslot::has_slots<> { void (HostT::*callback)(ReturnT), HostT* callback_host, uint32_t id = 0) { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); if (thread_ == nullptr) return false; invoker_.AsyncInvoke( @@ -198,7 +234,7 @@ class GuardedAsyncInvoker : public sigslot::has_slots<> { void (HostT::*callback)(), HostT* callback_host, uint32_t id = 0) { - rtc::CritScope cs(&crit_); + CritScope cs(&crit_); if (thread_ == nullptr) return false; invoker_.AsyncInvoke( diff --git a/webrtc/rtc_base/thread_unittest.cc b/webrtc/rtc_base/thread_unittest.cc index a8c20d1b77..e7701e84b3 100644 --- a/webrtc/rtc_base/thread_unittest.cc +++ b/webrtc/rtc_base/thread_unittest.cc @@ -458,19 +458,20 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { thread->Start(); volatile bool invoker_destroyed = false; { + auto functor = [&functor_started, &functor_continue, &functor_finished, + &invoker_destroyed] { + functor_started.Set(); + functor_continue.Wait(Event::kForever); + rtc::Thread::Current()->SleepMs(kWaitTimeout); + EXPECT_FALSE(invoker_destroyed); + functor_finished.Set(); + }; AsyncInvoker invoker; - invoker.AsyncInvoke(RTC_FROM_HERE, thread.get(), - [&functor_started, &functor_continue, - &functor_finished, &invoker_destroyed] { - functor_started.Set(); - functor_continue.Wait(Event::kForever); - rtc::Thread::Current()->SleepMs(kWaitTimeout); - EXPECT_FALSE(invoker_destroyed); - functor_finished.Set(); - }); + invoker.AsyncInvoke(RTC_FROM_HERE, thread.get(), functor); functor_started.Wait(Event::kForever); - // Allow the functor to continue and immediately destroy the invoker. + // Destroy the invoker while the functor is still executing (doing + // SleepMs). functor_continue.Set(); } @@ -481,6 +482,37 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { functor_finished.Wait(Event::kForever); } +// Variant of the above test where the async-invoked task calls AsyncInvoke +// *again*, for the thread on which the AsyncInvoker is currently being +// destroyed. This shouldn't deadlock or crash; this second invocation should +// just be ignored. +TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) { + Event functor_started(false, false); + // Flag used to verify that the recursively invoked task never actually runs. + bool reentrant_functor_run = false; + + Thread* main = Thread::Current(); + Thread thread; + thread.Start(); + { + AsyncInvoker invoker; + auto reentrant_functor = [&reentrant_functor_run] { + reentrant_functor_run = true; + }; + auto functor = [&functor_started, &invoker, main, reentrant_functor] { + functor_started.Set(); + Thread::Current()->SleepMs(kWaitTimeout); + invoker.AsyncInvoke(RTC_FROM_HERE, main, reentrant_functor); + }; + // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a + // task on |main|. But this second queued task should never run, since the + // destructor will be entered before it's even invoked. + invoker.AsyncInvoke(RTC_FROM_HERE, &thread, functor); + functor_started.Wait(Event::kForever); + } + EXPECT_FALSE(reentrant_functor_run); +} + TEST_F(AsyncInvokeTest, Flush) { AsyncInvoker invoker; AtomicBool flag1;