diff --git a/tools-webrtc/sanitizers/tsan_suppressions_webrtc.cc b/tools-webrtc/sanitizers/tsan_suppressions_webrtc.cc index 999a3d0e89..b439322a54 100644 --- a/tools-webrtc/sanitizers/tsan_suppressions_webrtc.cc +++ b/tools-webrtc/sanitizers/tsan_suppressions_webrtc.cc @@ -41,12 +41,8 @@ char kTSanDefaultSuppressions[] = // rtc_unittests // https://code.google.com/p/webrtc/issues/detail?id=3911 for details. -"race:rtc::AsyncInvoker::OnMessage\n" -"race:rtc::FireAndForgetAsyncClosure::Execute\n" "race:rtc::MessageQueueManager::Clear\n" "race:rtc::Thread::Clear\n" -// https://code.google.com/p/webrtc/issues/detail?id=3914 -"race:rtc::AsyncInvoker::~AsyncInvoker\n" // https://code.google.com/p/webrtc/issues/detail?id=2080 "race:webrtc/base/logging.cc\n" "race:webrtc/base/sharedexclusivelock_unittest.cc\n" diff --git a/webrtc/base/asyncinvoker-inl.h b/webrtc/base/asyncinvoker-inl.h index c3691b5659..d172eee0a6 100644 --- a/webrtc/base/asyncinvoker-inl.h +++ b/webrtc/base/asyncinvoker-inl.h @@ -11,6 +11,7 @@ #ifndef WEBRTC_BASE_ASYNCINVOKER_INL_H_ #define WEBRTC_BASE_ASYNCINVOKER_INL_H_ +#include "webrtc/base/atomicops.h" #include "webrtc/base/bind.h" #include "webrtc/base/callback.h" #include "webrtc/base/criticalsection.h" @@ -26,18 +27,23 @@ class AsyncInvoker; // on the calling thread if necessary. class AsyncClosure { public: - virtual ~AsyncClosure() {} + explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {} + virtual ~AsyncClosure(); // Runs the asynchronous task, and triggers a callback to the calling // thread if needed. Should be called from the target thread. virtual void Execute() = 0; + + protected: + AsyncInvoker* invoker_; }; // Simple closure that doesn't trigger a callback for the calling thread. template class FireAndForgetAsyncClosure : public AsyncClosure { public: - explicit FireAndForgetAsyncClosure(const FunctorT& functor) - : functor_(functor) {} + explicit FireAndForgetAsyncClosure(AsyncInvoker* invoker, + const FunctorT& functor) + : AsyncClosure(invoker), functor_(functor) {} virtual void Execute() { functor_(); } @@ -65,7 +71,6 @@ class NotifyingAsyncClosureBase : public AsyncClosure, bool CallbackCanceled() const { return calling_thread_ == NULL; } private: - AsyncInvoker* invoker_; Location callback_posted_from_; Callback0 callback_; CriticalSection crit_; diff --git a/webrtc/base/asyncinvoker.cc b/webrtc/base/asyncinvoker.cc index 77b63a4e66..aa65499c96 100644 --- a/webrtc/base/asyncinvoker.cc +++ b/webrtc/base/asyncinvoker.cc @@ -10,18 +10,29 @@ #include "webrtc/base/asyncinvoker.h" +#include "webrtc/base/atomicops.h" #include "webrtc/base/checks.h" #include "webrtc/base/logging.h" namespace rtc { -AsyncInvoker::AsyncInvoker() : destroying_(false) {} +AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} AsyncInvoker::~AsyncInvoker() { destroying_ = true; SignalInvokerDestroyed(); // Messages for this need to be cleared *before* our destructor is complete. MessageQueueManager::Clear(this); + // And we need to wait for any invocations that are still in progress on + // other threads. + while (AtomicOps::AcquireLoad(&pending_invocations_)) { + // If the destructor was called while AsyncInvoke was being called by + // another thread, WITHIN an AsyncInvoked functor, it may do another + // Thread::Post even after we called MessageQueueManager::Clear(this). So + // we need to keep calling Clear to discard these posts. + MessageQueueManager::Clear(this); + invocation_complete_.Wait(Event::kForever); + } } void AsyncInvoker::OnMessage(Message* msg) { @@ -59,6 +70,7 @@ void AsyncInvoker::DoInvoke(const Location& posted_from, LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } + AtomicOps::Increment(&pending_invocations_); thread->Post(posted_from, this, id, new ScopedMessageData(std::move(closure))); } @@ -72,6 +84,7 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; return; } + AtomicOps::Increment(&pending_invocations_); thread->PostDelayed(posted_from, delay_ms, this, id, new ScopedMessageData(std::move(closure))); } @@ -99,11 +112,16 @@ void GuardedAsyncInvoker::ThreadDestroyed() { thread_ = nullptr; } +AsyncClosure::~AsyncClosure() { + AtomicOps::Decrement(&invoker_->pending_invocations_); + invoker_->invocation_complete_.Set(); +} + NotifyingAsyncClosureBase::NotifyingAsyncClosureBase( AsyncInvoker* invoker, const Location& callback_posted_from, Thread* calling_thread) - : invoker_(invoker), + : AsyncClosure(invoker), callback_posted_from_(callback_posted_from), calling_thread_(calling_thread) { calling_thread->SignalQueueDestroyed.connect( diff --git a/webrtc/base/asyncinvoker.h b/webrtc/base/asyncinvoker.h index 43fffeac10..ed61243974 100644 --- a/webrtc/base/asyncinvoker.h +++ b/webrtc/base/asyncinvoker.h @@ -17,6 +17,7 @@ #include "webrtc/base/asyncinvoker-inl.h" #include "webrtc/base/bind.h" #include "webrtc/base/constructormagic.h" +#include "webrtc/base/event.h" #include "webrtc/base/sigslot.h" #include "webrtc/base/thread.h" @@ -82,7 +83,7 @@ class AsyncInvoker : public MessageHandler { const FunctorT& functor, uint32_t id = 0) { std::unique_ptr closure( - new FireAndForgetAsyncClosure(functor)); + new FireAndForgetAsyncClosure(this, functor)); DoInvoke(posted_from, thread, std::move(closure), id); } @@ -95,7 +96,7 @@ class AsyncInvoker : public MessageHandler { uint32_t delay_ms, uint32_t id = 0) { std::unique_ptr closure( - new FireAndForgetAsyncClosure(functor)); + new FireAndForgetAsyncClosure(this, functor)); DoInvokeDelayed(posted_from, thread, std::move(closure), delay_ms, id); } @@ -157,7 +158,10 @@ class AsyncInvoker : public MessageHandler { std::unique_ptr closure, uint32_t delay_ms, uint32_t id); - bool destroying_; + volatile int pending_invocations_ = 0; + Event invocation_complete_; + bool destroying_ = false; + friend class AsyncClosure; RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); }; diff --git a/webrtc/base/event.h b/webrtc/base/event.h index 7686d30af2..d4b58724f0 100644 --- a/webrtc/base/event.h +++ b/webrtc/base/event.h @@ -11,6 +11,7 @@ #ifndef WEBRTC_BASE_EVENT_H__ #define WEBRTC_BASE_EVENT_H__ +#include "webrtc/base/constructormagic.h" #if defined(WEBRTC_WIN) #include "webrtc/base/win32.h" // NOLINT: consider this a system header. #elif defined(WEBRTC_POSIX) @@ -44,6 +45,8 @@ class Event { const bool is_manual_reset_; bool event_status_; #endif + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Event); }; } // namespace rtc diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc index b0c8cb59af..e4e74d6db0 100644 --- a/webrtc/base/thread_unittest.cc +++ b/webrtc/base/thread_unittest.cc @@ -356,13 +356,14 @@ TEST(ThreadTest, ThreeThreadsInvoke) { // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until // |thread1| starts the call. - static void AsyncInvokeSetAndWait( - Thread* thread1, Thread* thread2, LockedBool* out) { + static void AsyncInvokeSetAndWait(AsyncInvoker* invoker, + Thread* thread1, + Thread* thread2, + LockedBool* out) { CriticalSection crit; LockedBool async_invoked(false); - AsyncInvoker invoker; - invoker.AsyncInvoke( + invoker->AsyncInvoke( RTC_FROM_HERE, thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); @@ -370,14 +371,15 @@ TEST(ThreadTest, ThreeThreadsInvoke) { } }; + AsyncInvoker invoker; LockedBool thread_a_called(false); // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. // Thread B returns when C receives the call and C should be blocked until A // starts to process messages. thread_b.Invoke(RTC_FROM_HERE, - Bind(&LocalFuncs::AsyncInvokeSetAndWait, &thread_c, - thread_a, &thread_a_called)); + Bind(&LocalFuncs::AsyncInvokeSetAndWait, &invoker, + &thread_c, thread_a, &thread_a_called)); EXPECT_FALSE(thread_a_called.Get()); EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000); @@ -524,6 +526,41 @@ TEST_F(AsyncInvokeTest, KillInvokerBeforeExecute) { EXPECT_EQ(0, int_value_); } +TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { + // Use these events to get in a state where the functor is in the middle of + // executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE" + // is run. + Event functor_started(false, false); + Event functor_continue(false, false); + Event functor_finished(false, false); + + Thread thread; + thread.Start(); + volatile bool invoker_destroyed = false; + { + AsyncInvoker invoker; + invoker.AsyncInvoke(RTC_FROM_HERE, &thread, + [&functor_started, &functor_continue, + &functor_finished, &invoker_destroyed] { + functor_started.Set(); + functor_continue.Wait(Event::kForever); + rtc::Thread::Current()->SleepMs(kWaitTimeout); + EXPECT_FALSE(invoker_destroyed); + functor_finished.Set(); + }); + functor_started.Wait(Event::kForever); + + // Allow the functor to continue and immediately destroy the invoker. + functor_continue.Set(); + } + + // If the destructor DIDN'T wait for the functor to finish executing, it will + // hit the EXPECT_FALSE(invoker_destroyed) after it finishes sleeping for a + // second. + invoker_destroyed = true; + functor_finished.Wait(Event::kForever); +} + TEST_F(AsyncInvokeTest, Flush) { AsyncInvoker invoker; AtomicBool flag1;