diff --git a/webrtc/base/asyncinvoker-inl.h b/webrtc/base/asyncinvoker-inl.h index 5f7cd4959a..a1c2c8e0cf 100644 --- a/webrtc/base/asyncinvoker-inl.h +++ b/webrtc/base/asyncinvoker-inl.h @@ -27,7 +27,7 @@ class AsyncInvoker; // on the calling thread if necessary. class AsyncClosure { public: - explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {} + explicit AsyncClosure(AsyncInvoker* invoker); virtual ~AsyncClosure(); // Runs the asynchronous task, and triggers a callback to the calling // thread if needed. Should be called from the target thread. diff --git a/webrtc/base/asyncinvoker.cc b/webrtc/base/asyncinvoker.cc index bfd13172f7..d2583091cc 100644 --- a/webrtc/base/asyncinvoker.cc +++ b/webrtc/base/asyncinvoker.cc @@ -19,7 +19,7 @@ namespace rtc { AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} AsyncInvoker::~AsyncInvoker() { - destroying_ = true; + AtomicOps::Increment(&destroying_); // Messages for this need to be cleared *before* our destructor is complete. MessageQueueManager::Clear(this); // And we need to wait for any invocations that are still in progress on @@ -44,7 +44,8 @@ void AsyncInvoker::OnMessage(Message* msg) { } void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { - if (destroying_) return; + if (AtomicOps::AcquireLoad(&destroying_)) + return; // Run this on |thread| to reduce the number of context switches. if (Thread::Current() != thread) { @@ -65,11 +66,10 @@ void AsyncInvoker::DoInvoke(const Location& posted_from, Thread* thread, std::unique_ptr closure, uint32_t id) { - if (destroying_) { + if (AtomicOps::AcquireLoad(&destroying_)) { LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } - AtomicOps::Increment(&pending_invocations_); thread->Post(posted_from, this, id, new ScopedMessageData(std::move(closure))); } @@ -79,11 +79,10 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, std::unique_ptr closure, uint32_t delay_ms, uint32_t id) { - if (destroying_) { + if (AtomicOps::AcquireLoad(&destroying_)) { LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } - AtomicOps::Increment(&pending_invocations_); thread->PostDelayed(posted_from, delay_ms, this, id, new ScopedMessageData(std::move(closure))); } @@ -111,6 +110,10 @@ void GuardedAsyncInvoker::ThreadDestroyed() { thread_ = nullptr; } +AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) { + AtomicOps::Increment(&invoker_->pending_invocations_); +} + AsyncClosure::~AsyncClosure() { AtomicOps::Decrement(&invoker_->pending_invocations_); invoker_->invocation_complete_.Set(); diff --git a/webrtc/base/asyncinvoker.h b/webrtc/base/asyncinvoker.h index 5414867418..0684f4cfee 100644 --- a/webrtc/base/asyncinvoker.h +++ b/webrtc/base/asyncinvoker.h @@ -120,7 +120,7 @@ class AsyncInvoker : public MessageHandler { uint32_t id); volatile int pending_invocations_ = 0; Event invocation_complete_; - bool destroying_ = false; + int destroying_ = 0; friend class AsyncClosure; RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc index c143120fbc..c8f9e35321 100644 --- a/webrtc/base/thread_unittest.cc +++ b/webrtc/base/thread_unittest.cc @@ -104,7 +104,7 @@ class MessageClient : public MessageHandler, public TestGenerator { Socket* socket_; }; -class CustomThread : public rtc::Thread { +class CustomThread : public Thread { public: CustomThread() {} virtual ~CustomThread() { Stop(); } @@ -150,7 +150,7 @@ class SignalWhenDestroyedThread : public Thread { // Using std::atomic or std::atomic_flag in C++11 is probably // the right thing to do, but those features are not yet allowed. Or -// rtc::AtomicInt, if/when that is added. Since the use isn't +// AtomicInt, if/when that is added. Since the use isn't // performance critical, use a plain critical section for the time // being. @@ -451,27 +451,23 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { // executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE" // is run. Event functor_started(false, false); - Event functor_continue(false, false); Event functor_finished(false, false); Thread thread; thread.Start(); volatile bool invoker_destroyed = false; { + auto functor = [&functor_started, &functor_finished, &invoker_destroyed] { + functor_started.Set(); + Thread::Current()->SleepMs(kWaitTimeout); + EXPECT_FALSE(invoker_destroyed); + functor_finished.Set(); + }; AsyncInvoker invoker; - invoker.AsyncInvoke(RTC_FROM_HERE, &thread, - [&functor_started, &functor_continue, - &functor_finished, &invoker_destroyed] { - functor_started.Set(); - functor_continue.Wait(Event::kForever); - rtc::Thread::Current()->SleepMs(kWaitTimeout); - EXPECT_FALSE(invoker_destroyed); - functor_finished.Set(); - }); + invoker.AsyncInvoke(RTC_FROM_HERE, &thread, functor); functor_started.Wait(Event::kForever); - - // Allow the functor to continue and immediately destroy the invoker. - functor_continue.Set(); + // Destroy the invoker while the functor is still executing (doing + // SleepMs). } // If the destructor DIDN'T wait for the functor to finish executing, it will @@ -481,6 +477,35 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { functor_finished.Wait(Event::kForever); } +// Variant of the above test where the async-invoked task calls AsyncInvoke +// again, for the thread on which the AsyncInvoker is currently being +// destroyed. This shouldn't deadlock or crash; this second invocation should +// just be ignored. +TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) { + Event functor_started(false, false); + bool reentrant_functor_run = false; + + Thread* main = Thread::Current(); + Thread thread; + thread.Start(); + { + AsyncInvoker invoker; + auto reentrant_functor = [&reentrant_functor_run] { + reentrant_functor_run = true; + }; + auto functor = [&functor_started, &invoker, main, reentrant_functor] { + functor_started.Set(); + Thread::Current()->SleepMs(kWaitTimeout); + invoker.AsyncInvoke(RTC_FROM_HERE, main, reentrant_functor); + }; + // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a + // task on |main|. But this second queued task should never run. + invoker.AsyncInvoke(RTC_FROM_HERE, &thread, functor); + functor_started.Wait(Event::kForever); + } + EXPECT_FALSE(reentrant_functor_run); +} + TEST_F(AsyncInvokeTest, Flush) { AsyncInvoker invoker; AtomicBool flag1;