Making AsyncInvoker destructor thread-safe.
The documentation for AsyncInvoker states that it owns the lifetime of calls, and when its destructor is called, all in-flight calls are cancelled or finish executing. The "cancelled" part is working, but if a call is in the middle of executing, the destructor does *not* wait. This is fixed by keeping a count of pending invocations, which is decremented when a call is either cleared from a message queue or finishes executing. BUG=webrtc:3914, webrtc:3911 Review-Url: https://codereview.webrtc.org/2694723004 Cr-Commit-Position: refs/heads/master@{#16811}
This commit is contained in:
parent
8e1435519c
commit
162cb53e7b
@ -41,12 +41,8 @@ char kTSanDefaultSuppressions[] =
|
||||
|
||||
// rtc_unittests
|
||||
// https://code.google.com/p/webrtc/issues/detail?id=3911 for details.
|
||||
"race:rtc::AsyncInvoker::OnMessage\n"
|
||||
"race:rtc::FireAndForgetAsyncClosure<FunctorB>::Execute\n"
|
||||
"race:rtc::MessageQueueManager::Clear\n"
|
||||
"race:rtc::Thread::Clear\n"
|
||||
// https://code.google.com/p/webrtc/issues/detail?id=3914
|
||||
"race:rtc::AsyncInvoker::~AsyncInvoker\n"
|
||||
// https://code.google.com/p/webrtc/issues/detail?id=2080
|
||||
"race:webrtc/base/logging.cc\n"
|
||||
"race:webrtc/base/sharedexclusivelock_unittest.cc\n"
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
#ifndef WEBRTC_BASE_ASYNCINVOKER_INL_H_
|
||||
#define WEBRTC_BASE_ASYNCINVOKER_INL_H_
|
||||
|
||||
#include "webrtc/base/atomicops.h"
|
||||
#include "webrtc/base/bind.h"
|
||||
#include "webrtc/base/callback.h"
|
||||
#include "webrtc/base/criticalsection.h"
|
||||
@ -26,18 +27,23 @@ class AsyncInvoker;
|
||||
// on the calling thread if necessary.
|
||||
class AsyncClosure {
|
||||
public:
|
||||
virtual ~AsyncClosure() {}
|
||||
explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {}
|
||||
virtual ~AsyncClosure();
|
||||
// Runs the asynchronous task, and triggers a callback to the calling
|
||||
// thread if needed. Should be called from the target thread.
|
||||
virtual void Execute() = 0;
|
||||
|
||||
protected:
|
||||
AsyncInvoker* invoker_;
|
||||
};
|
||||
|
||||
// Simple closure that doesn't trigger a callback for the calling thread.
|
||||
template <class FunctorT>
|
||||
class FireAndForgetAsyncClosure : public AsyncClosure {
|
||||
public:
|
||||
explicit FireAndForgetAsyncClosure(const FunctorT& functor)
|
||||
: functor_(functor) {}
|
||||
explicit FireAndForgetAsyncClosure(AsyncInvoker* invoker,
|
||||
const FunctorT& functor)
|
||||
: AsyncClosure(invoker), functor_(functor) {}
|
||||
virtual void Execute() {
|
||||
functor_();
|
||||
}
|
||||
@ -65,7 +71,6 @@ class NotifyingAsyncClosureBase : public AsyncClosure,
|
||||
bool CallbackCanceled() const { return calling_thread_ == NULL; }
|
||||
|
||||
private:
|
||||
AsyncInvoker* invoker_;
|
||||
Location callback_posted_from_;
|
||||
Callback0<void> callback_;
|
||||
CriticalSection crit_;
|
||||
|
||||
@ -10,18 +10,29 @@
|
||||
|
||||
#include "webrtc/base/asyncinvoker.h"
|
||||
|
||||
#include "webrtc/base/atomicops.h"
|
||||
#include "webrtc/base/checks.h"
|
||||
#include "webrtc/base/logging.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
AsyncInvoker::AsyncInvoker() : destroying_(false) {}
|
||||
AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
|
||||
|
||||
AsyncInvoker::~AsyncInvoker() {
|
||||
destroying_ = true;
|
||||
SignalInvokerDestroyed();
|
||||
// Messages for this need to be cleared *before* our destructor is complete.
|
||||
MessageQueueManager::Clear(this);
|
||||
// And we need to wait for any invocations that are still in progress on
|
||||
// other threads.
|
||||
while (AtomicOps::AcquireLoad(&pending_invocations_)) {
|
||||
// If the destructor was called while AsyncInvoke was being called by
|
||||
// another thread, WITHIN an AsyncInvoked functor, it may do another
|
||||
// Thread::Post even after we called MessageQueueManager::Clear(this). So
|
||||
// we need to keep calling Clear to discard these posts.
|
||||
MessageQueueManager::Clear(this);
|
||||
invocation_complete_.Wait(Event::kForever);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncInvoker::OnMessage(Message* msg) {
|
||||
@ -59,6 +70,7 @@ void AsyncInvoker::DoInvoke(const Location& posted_from,
|
||||
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
||||
return;
|
||||
}
|
||||
AtomicOps::Increment(&pending_invocations_);
|
||||
thread->Post(posted_from, this, id,
|
||||
new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
||||
}
|
||||
@ -72,6 +84,7 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
|
||||
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
||||
return;
|
||||
}
|
||||
AtomicOps::Increment(&pending_invocations_);
|
||||
thread->PostDelayed(posted_from, delay_ms, this, id,
|
||||
new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
||||
}
|
||||
@ -99,11 +112,16 @@ void GuardedAsyncInvoker::ThreadDestroyed() {
|
||||
thread_ = nullptr;
|
||||
}
|
||||
|
||||
AsyncClosure::~AsyncClosure() {
|
||||
AtomicOps::Decrement(&invoker_->pending_invocations_);
|
||||
invoker_->invocation_complete_.Set();
|
||||
}
|
||||
|
||||
NotifyingAsyncClosureBase::NotifyingAsyncClosureBase(
|
||||
AsyncInvoker* invoker,
|
||||
const Location& callback_posted_from,
|
||||
Thread* calling_thread)
|
||||
: invoker_(invoker),
|
||||
: AsyncClosure(invoker),
|
||||
callback_posted_from_(callback_posted_from),
|
||||
calling_thread_(calling_thread) {
|
||||
calling_thread->SignalQueueDestroyed.connect(
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "webrtc/base/asyncinvoker-inl.h"
|
||||
#include "webrtc/base/bind.h"
|
||||
#include "webrtc/base/constructormagic.h"
|
||||
#include "webrtc/base/event.h"
|
||||
#include "webrtc/base/sigslot.h"
|
||||
#include "webrtc/base/thread.h"
|
||||
|
||||
@ -82,7 +83,7 @@ class AsyncInvoker : public MessageHandler {
|
||||
const FunctorT& functor,
|
||||
uint32_t id = 0) {
|
||||
std::unique_ptr<AsyncClosure> closure(
|
||||
new FireAndForgetAsyncClosure<FunctorT>(functor));
|
||||
new FireAndForgetAsyncClosure<FunctorT>(this, functor));
|
||||
DoInvoke(posted_from, thread, std::move(closure), id);
|
||||
}
|
||||
|
||||
@ -95,7 +96,7 @@ class AsyncInvoker : public MessageHandler {
|
||||
uint32_t delay_ms,
|
||||
uint32_t id = 0) {
|
||||
std::unique_ptr<AsyncClosure> closure(
|
||||
new FireAndForgetAsyncClosure<FunctorT>(functor));
|
||||
new FireAndForgetAsyncClosure<FunctorT>(this, functor));
|
||||
DoInvokeDelayed(posted_from, thread, std::move(closure), delay_ms, id);
|
||||
}
|
||||
|
||||
@ -157,7 +158,10 @@ class AsyncInvoker : public MessageHandler {
|
||||
std::unique_ptr<AsyncClosure> closure,
|
||||
uint32_t delay_ms,
|
||||
uint32_t id);
|
||||
bool destroying_;
|
||||
volatile int pending_invocations_ = 0;
|
||||
Event invocation_complete_;
|
||||
bool destroying_ = false;
|
||||
friend class AsyncClosure;
|
||||
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
|
||||
};
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
#ifndef WEBRTC_BASE_EVENT_H__
|
||||
#define WEBRTC_BASE_EVENT_H__
|
||||
|
||||
#include "webrtc/base/constructormagic.h"
|
||||
#if defined(WEBRTC_WIN)
|
||||
#include "webrtc/base/win32.h" // NOLINT: consider this a system header.
|
||||
#elif defined(WEBRTC_POSIX)
|
||||
@ -44,6 +45,8 @@ class Event {
|
||||
const bool is_manual_reset_;
|
||||
bool event_status_;
|
||||
#endif
|
||||
|
||||
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Event);
|
||||
};
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
@ -356,13 +356,14 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
|
||||
|
||||
// Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
|
||||
// |thread1| starts the call.
|
||||
static void AsyncInvokeSetAndWait(
|
||||
Thread* thread1, Thread* thread2, LockedBool* out) {
|
||||
static void AsyncInvokeSetAndWait(AsyncInvoker* invoker,
|
||||
Thread* thread1,
|
||||
Thread* thread2,
|
||||
LockedBool* out) {
|
||||
CriticalSection crit;
|
||||
LockedBool async_invoked(false);
|
||||
|
||||
AsyncInvoker invoker;
|
||||
invoker.AsyncInvoke<void>(
|
||||
invoker->AsyncInvoke<void>(
|
||||
RTC_FROM_HERE, thread1,
|
||||
Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
|
||||
|
||||
@ -370,14 +371,15 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
|
||||
}
|
||||
};
|
||||
|
||||
AsyncInvoker invoker;
|
||||
LockedBool thread_a_called(false);
|
||||
|
||||
// Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
|
||||
// Thread B returns when C receives the call and C should be blocked until A
|
||||
// starts to process messages.
|
||||
thread_b.Invoke<void>(RTC_FROM_HERE,
|
||||
Bind(&LocalFuncs::AsyncInvokeSetAndWait, &thread_c,
|
||||
thread_a, &thread_a_called));
|
||||
Bind(&LocalFuncs::AsyncInvokeSetAndWait, &invoker,
|
||||
&thread_c, thread_a, &thread_a_called));
|
||||
EXPECT_FALSE(thread_a_called.Get());
|
||||
|
||||
EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000);
|
||||
@ -524,6 +526,41 @@ TEST_F(AsyncInvokeTest, KillInvokerBeforeExecute) {
|
||||
EXPECT_EQ(0, int_value_);
|
||||
}
|
||||
|
||||
TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
|
||||
// Use these events to get in a state where the functor is in the middle of
|
||||
// executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE"
|
||||
// is run.
|
||||
Event functor_started(false, false);
|
||||
Event functor_continue(false, false);
|
||||
Event functor_finished(false, false);
|
||||
|
||||
Thread thread;
|
||||
thread.Start();
|
||||
volatile bool invoker_destroyed = false;
|
||||
{
|
||||
AsyncInvoker invoker;
|
||||
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread,
|
||||
[&functor_started, &functor_continue,
|
||||
&functor_finished, &invoker_destroyed] {
|
||||
functor_started.Set();
|
||||
functor_continue.Wait(Event::kForever);
|
||||
rtc::Thread::Current()->SleepMs(kWaitTimeout);
|
||||
EXPECT_FALSE(invoker_destroyed);
|
||||
functor_finished.Set();
|
||||
});
|
||||
functor_started.Wait(Event::kForever);
|
||||
|
||||
// Allow the functor to continue and immediately destroy the invoker.
|
||||
functor_continue.Set();
|
||||
}
|
||||
|
||||
// If the destructor DIDN'T wait for the functor to finish executing, it will
|
||||
// hit the EXPECT_FALSE(invoker_destroyed) after it finishes sleeping for a
|
||||
// second.
|
||||
invoker_destroyed = true;
|
||||
functor_finished.Wait(Event::kForever);
|
||||
}
|
||||
|
||||
TEST_F(AsyncInvokeTest, Flush) {
|
||||
AsyncInvoker invoker;
|
||||
AtomicBool flag1;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user