diff --git a/rtc_base/async_invoker.cc b/rtc_base/async_invoker.cc index 26f8c523ab..8b410a4561 100644 --- a/rtc_base/async_invoker.cc +++ b/rtc_base/async_invoker.cc @@ -101,28 +101,6 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, new ScopedMessageData(std::move(closure))); } -GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { - thread_->SignalQueueDestroyed.connect(this, - &GuardedAsyncInvoker::ThreadDestroyed); -} - -GuardedAsyncInvoker::~GuardedAsyncInvoker() {} - -bool GuardedAsyncInvoker::Flush(uint32_t id) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.Flush(thread_, id); - return true; -} - -void GuardedAsyncInvoker::ThreadDestroyed() { - CritScope cs(&crit_); - // We should never get more than one notification about the thread dying. - RTC_DCHECK(thread_ != nullptr); - thread_ = nullptr; -} - AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) { invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed); diff --git a/rtc_base/async_invoker.h b/rtc_base/async_invoker.h index f15955d811..ed2df1cdcb 100644 --- a/rtc_base/async_invoker.h +++ b/rtc_base/async_invoker.h @@ -169,97 +169,6 @@ class AsyncInvoker : public MessageHandler { RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker); }; -// Similar to AsyncInvoker, but guards against the Thread being destroyed while -// there are outstanding dangling pointers to it. It will connect to the current -// thread in the constructor, and will get notified when that thread is -// destroyed. After GuardedAsyncInvoker is constructed, it can be used from -// other threads to post functors to the thread it was constructed on. If that -// thread dies, any further calls to AsyncInvoke() will be safely ignored. -class GuardedAsyncInvoker : public sigslot::has_slots<> { - public: - GuardedAsyncInvoker(); - ~GuardedAsyncInvoker() override; - - // Synchronously execute all outstanding calls we own, and wait for calls to - // complete before returning. Optionally filter by message id. The destructor - // will not wait for outstanding calls, so if that behavior is desired, call - // Flush() first. Returns false if the thread has died. - bool Flush(uint32_t id = MQID_ANY); - - // Call |functor| asynchronously with no callback upon completion. Returns - // immediately. Returns false if the thread has died. - template - bool AsyncInvoke(const Location& posted_from, - FunctorT&& functor, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke( - posted_from, thread_, std::forward(functor), id); - return true; - } - - // Call |functor| asynchronously with |delay_ms|, with no callback upon - // completion. Returns immediately. Returns false if the thread has died. - template - bool AsyncInvokeDelayed(const Location& posted_from, - FunctorT&& functor, - uint32_t delay_ms, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvokeDelayed( - posted_from, thread_, std::forward(functor), delay_ms, id); - return true; - } - - // Call |functor| asynchronously, calling |callback| when done. Returns false - // if the thread has died. - template - bool AsyncInvoke(const Location& posted_from, - const Location& callback_posted_from, - FunctorT&& functor, - void (HostT::*callback)(ReturnT), - HostT* callback_host, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke( - posted_from, callback_posted_from, thread_, - std::forward(functor), callback, callback_host, id); - return true; - } - - // Call |functor| asynchronously calling |callback| when done. Overloaded for - // void return. Returns false if the thread has died. - template - bool AsyncInvoke(const Location& posted_from, - const Location& callback_posted_from, - FunctorT&& functor, - void (HostT::*callback)(), - HostT* callback_host, - uint32_t id = 0) { - CritScope cs(&crit_); - if (thread_ == nullptr) - return false; - invoker_.AsyncInvoke( - posted_from, callback_posted_from, thread_, - std::forward(functor), callback, callback_host, id); - return true; - } - - private: - // Callback when |thread_| is destroyed. - void ThreadDestroyed(); - - CriticalSection crit_; - Thread* thread_ RTC_GUARDED_BY(crit_); - AsyncInvoker invoker_ RTC_GUARDED_BY(crit_); -}; - } // namespace rtc #endif // RTC_BASE_ASYNC_INVOKER_H_ diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index f0eede3550..bc0c3442eb 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -837,105 +837,6 @@ TEST_F(AsyncInvokeTest, FlushWithIds) { EXPECT_TRUE(flag2.get()); } -class GuardedAsyncInvokeTest : public ::testing::Test { - public: - void IntCallback(int value) { - EXPECT_EQ(expected_thread_, Thread::Current()); - int_value_ = value; - } - void SetExpectedThreadForIntCallback(Thread* thread) { - expected_thread_ = thread; - } - - protected: - constexpr static int kWaitTimeout = 1000; - GuardedAsyncInvokeTest() : int_value_(0), expected_thread_(nullptr) {} - - int int_value_; - Thread* expected_thread_; -}; - -// Functor for creating an invoker. -struct CreateInvoker { - CreateInvoker(std::unique_ptr* invoker) - : invoker_(invoker) {} - void operator()() { invoker_->reset(new GuardedAsyncInvoker()); } - std::unique_ptr* invoker_; -}; - -// Test that we can call AsyncInvoke() after the thread died. -TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) { - // Create and start the thread. - std::unique_ptr thread(Thread::Create()); - thread->Start(); - std::unique_ptr invoker; - // Create the invoker on |thread|. - thread->Invoke(RTC_FROM_HERE, CreateInvoker(&invoker)); - // Kill |thread|. - thread = nullptr; - // Try calling functor. - AtomicBool called; - EXPECT_FALSE(invoker->AsyncInvoke(RTC_FROM_HERE, FunctorB(&called))); - // With thread gone, nothing should happen. - WAIT(called.get(), kWaitTimeout); - EXPECT_FALSE(called.get()); -} - -// The remaining tests check that GuardedAsyncInvoker behaves as AsyncInvoker -// when Thread is still alive. -TEST_F(GuardedAsyncInvokeTest, FireAndForget) { - GuardedAsyncInvoker invoker; - // Try calling functor. - AtomicBool called; - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorB(&called))); - EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); -} - -TEST_F(GuardedAsyncInvokeTest, NonCopyableFunctor) { - GuardedAsyncInvoker invoker; - // Try calling functor. - AtomicBool called; - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorD(&called))); - EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); -} - -TEST_F(GuardedAsyncInvokeTest, Flush) { - GuardedAsyncInvoker invoker; - AtomicBool flag1; - AtomicBool flag2; - // Queue two async calls to the current thread. - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorB(&flag1))); - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorB(&flag2))); - // Because we haven't pumped messages, these should not have run yet. - EXPECT_FALSE(flag1.get()); - EXPECT_FALSE(flag2.get()); - // Force them to run now. - EXPECT_TRUE(invoker.Flush()); - EXPECT_TRUE(flag1.get()); - EXPECT_TRUE(flag2.get()); -} - -TEST_F(GuardedAsyncInvokeTest, FlushWithIds) { - GuardedAsyncInvoker invoker; - AtomicBool flag1; - AtomicBool flag2; - // Queue two async calls to the current thread, one with a message id. - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorB(&flag1), 5)); - EXPECT_TRUE(invoker.AsyncInvoke(RTC_FROM_HERE, FunctorB(&flag2))); - // Because we haven't pumped messages, these should not have run yet. - EXPECT_FALSE(flag1.get()); - EXPECT_FALSE(flag2.get()); - // Execute pending calls with id == 5. - EXPECT_TRUE(invoker.Flush(5)); - EXPECT_TRUE(flag1.get()); - EXPECT_FALSE(flag2.get()); - flag1 = false; - // Execute all pending calls. The id == 5 call should not execute again. - EXPECT_TRUE(invoker.Flush()); - EXPECT_FALSE(flag1.get()); - EXPECT_TRUE(flag2.get()); -} - void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { *result = thread->IsCurrent(); event->Set();