Fix a problem in Thread::Send.

Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579.
The fix is to limit B->ReceiveSends to only process requests from A.
Also disallow the worker thread invoking other threads.

BUG=3559
R=juberti@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/15089004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
jiayl@webrtc.org 2014-09-24 17:14:05 +00:00
parent a0ce9fa2a6
commit 3987b6de50
8 changed files with 121 additions and 23 deletions

View File

@ -481,9 +481,8 @@ class PeerConnectionTestClientBase
if (!allocator_factory_) {
return false;
}
audio_thread_.Start();
fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
&audio_thread_);
rtc::Thread::Current());
if (fake_audio_capture_module_ == NULL) {
return false;
@ -557,12 +556,6 @@ class PeerConnectionTestClientBase
}
std::string id_;
// Separate thread for executing |fake_audio_capture_module_| tasks. Audio
// processing must not be performed on the same thread as signaling due to
// signaling time constraints and relative complexity of the audio pipeline.
// This is consistent with the video pipeline that us a a separate thread for
// encoding and decoding.
rtc::Thread audio_thread_;
rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
allocator_factory_;

View File

@ -41,6 +41,7 @@
#include "talk/media/webrtc/webrtcmediaengine.h"
#include "talk/media/webrtc/webrtcvideodecoderfactory.h"
#include "talk/media/webrtc/webrtcvideoencoderfactory.h"
#include "webrtc/base/bind.h"
#include "webrtc/modules/audio_device/include/audio_device.h"
using rtc::scoped_refptr;

View File

@ -75,9 +75,8 @@ bool PeerConnectionTestWrapper::CreatePc(
return false;
}
audio_thread_.Start();
fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
&audio_thread_);
rtc::Thread::Current());
if (fake_audio_capture_module_ == NULL) {
return false;
}

View File

@ -111,7 +111,6 @@ class PeerConnectionTestWrapper
bool video, const webrtc::FakeConstraints& video_constraints);
std::string name_;
rtc::Thread audio_thread_;
rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
allocator_factory_;
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;

View File

@ -137,6 +137,12 @@ void ChannelManager::Construct(MediaEngineInterface* me,
this, &ChannelManager::OnVideoCaptureStateChange);
capture_manager_->SignalCapturerStateChange.connect(
this, &ChannelManager::OnVideoCaptureStateChange);
if (worker_thread_ != rtc::Thread::Current()) {
// Do not allow invoking calls to other threads on the worker thread.
worker_thread_->Invoke<bool>(
rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false));
}
}
ChannelManager::~ChannelManager() {

View File

@ -411,15 +411,12 @@ void Thread::Stop() {
}
void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
AssertBlockingIsAllowedOnCurrentThread();
if (fStop_)
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
Message msg;
msg.phandler = phandler;
msg.message_id = id;
@ -429,6 +426,8 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
return;
}
AssertBlockingIsAllowedOnCurrentThread();
AutoThread thread;
Thread *current_thread = Thread::Current();
ASSERT(current_thread != NULL); // AutoThread ensures this
@ -451,7 +450,9 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
crit_.Enter();
while (!ready) {
crit_.Leave();
current_thread->ReceiveSends();
// We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
// thread invoking calls on the current thread.
current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
@ -475,17 +476,23 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
}
void Thread::ReceiveSends() {
ReceiveSendsFromThread(NULL);
}
void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
// only via Join, so Send must complete.
// - thread receiving exits: Wakeup/set ready in Thread::Clear()
// - object target cleared: Wakeup/set ready in Thread::Clear()
_SendMessage smsg;
crit_.Enter();
while (!sendlist_.empty()) {
_SendMessage smsg = sendlist_.front();
sendlist_.pop_front();
while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();
smsg.msg.phandler->OnMessage(&smsg.msg);
crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
@ -493,6 +500,18 @@ void Thread::ReceiveSends() {
crit_.Leave();
}
bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
for (std::list<_SendMessage>::iterator it = sendlist_.begin();
it != sendlist_.end(); ++it) {
if (it->thread == source || source == NULL) {
*msg = *it;
sendlist_.erase(it);
return true;
}
}
return false;
}
void Thread::Clear(MessageHandler *phandler, uint32 id,
MessageList* removed) {
CritScope cs(&crit_);

View File

@ -165,7 +165,6 @@ class Thread : public MessageQueue {
// See ScopedDisallowBlockingCalls for details.
template <class ReturnT, class FunctorT>
ReturnT Invoke(const FunctorT& functor) {
AssertBlockingIsAllowedOnCurrentThread();
FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
Send(&handler);
return handler.result();
@ -210,6 +209,10 @@ class Thread : public MessageQueue {
// of whatever code is conditionally executing because of the return value!
bool RunningForTest() { return running(); }
// Sets the per-thread allow-blocking-calls flag and returns the previous
// value.
bool SetAllowBlockingCalls(bool allow);
protected:
// This method should be called when thread is created using non standard
// method, like derived implementation of rtc::Thread and it can not be
@ -226,10 +229,6 @@ class Thread : public MessageQueue {
// Blocks the calling thread until this thread has terminated.
void Join();
// Sets the per-thread allow-blocking-calls flag and returns the previous
// value.
bool SetAllowBlockingCalls(bool allow);
static void AssertBlockingIsAllowedOnCurrentThread();
friend class ScopedDisallowBlockingCalls;
@ -248,6 +247,16 @@ class Thread : public MessageQueue {
// Return true if the thread was started and hasn't yet stopped.
bool running() { return running_.Wait(0); }
// Processes received "Send" requests. If |source| is not NULL, only requests
// from |source| are processed, otherwise, all requests are processed.
void ReceiveSendsFromThread(const Thread* source);
// If |source| is not NULL, pops the first "Send" message from |source| in
// |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
// The caller must lock |crit_| before calling.
// Returns true if there is such a message.
bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
std::list<_SendMessage> sendlist_;
std::string name_;
ThreadPriority priority_;

View File

@ -276,6 +276,78 @@ TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) {
thread.Invoke<void>(&LocalFuncs::Func2);
}
// Verifies that two threads calling Invoke on each other at the same time does
// not deadlock.
TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
AutoThread thread;
Thread* current_thread = Thread::Current();
ASSERT_TRUE(current_thread != NULL);
Thread other_thread;
other_thread.Start();
struct LocalFuncs {
static void Set(bool* out) { *out = true; }
static void InvokeSet(Thread* thread, bool* out) {
thread->Invoke<void>(Bind(&Set, out));
}
};
bool called = false;
other_thread.Invoke<void>(
Bind(&LocalFuncs::InvokeSet, current_thread, &called));
EXPECT_TRUE(called);
}
// Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while
// invoking B.
TEST(ThreadTest, ThreeThreadsInvoke) {
AutoThread thread;
Thread* thread_a = Thread::Current();
Thread thread_b, thread_c;
thread_b.Start();
thread_c.Start();
struct LocalFuncs {
static void Set(bool* out) { *out = true; }
static void InvokeSet(Thread* thread, bool* out) {
thread->Invoke<void>(Bind(&Set, out));
}
// Set |out| true and call InvokeSet on |thread|.
static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
*out = true;
InvokeSet(thread, out_inner);
}
// Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
// |thread1| starts the call.
static void AsyncInvokeSetAndWait(
Thread* thread1, Thread* thread2, bool* out) {
bool async_invoked = false;
AsyncInvoker invoker;
invoker.AsyncInvoke<void>(
thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
EXPECT_TRUE_WAIT(async_invoked, 2000);
}
};
bool thread_a_called = false;
// Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
// Thread B returns when C receives the call and C should be blocked until A
// starts to process messages.
thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
&thread_c, thread_a, &thread_a_called));
EXPECT_FALSE(thread_a_called);
EXPECT_TRUE_WAIT(thread_a_called, 2000);
}
class AsyncInvokeTest : public testing::Test {
public:
void IntCallback(int value) {