From 3987b6de506a7e72a5bdfdf8c8ad9964705c5a28 Mon Sep 17 00:00:00 2001 From: "jiayl@webrtc.org" Date: Wed, 24 Sep 2014 17:14:05 +0000 Subject: [PATCH] Fix a problem in Thread::Send. Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579. The fix is to limit B->ReceiveSends to only process requests from A. Also disallow the worker thread invoking other threads. BUG=3559 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/15089004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@7290 4adac7df-926f-26a2-2b94-8c16560cd09d --- talk/app/webrtc/peerconnection_unittest.cc | 9 +-- talk/app/webrtc/peerconnectionfactory.cc | 1 + .../webrtc/test/peerconnectiontestwrapper.cc | 3 +- .../webrtc/test/peerconnectiontestwrapper.h | 1 - talk/session/media/channelmanager.cc | 6 ++ webrtc/base/thread.cc | 33 +++++++-- webrtc/base/thread.h | 19 +++-- webrtc/base/thread_unittest.cc | 72 +++++++++++++++++++ 8 files changed, 121 insertions(+), 23 deletions(-) diff --git a/talk/app/webrtc/peerconnection_unittest.cc b/talk/app/webrtc/peerconnection_unittest.cc index 0d3e426f70..977fc11dd8 100644 --- a/talk/app/webrtc/peerconnection_unittest.cc +++ b/talk/app/webrtc/peerconnection_unittest.cc @@ -481,9 +481,8 @@ class PeerConnectionTestClientBase if (!allocator_factory_) { return false; } - audio_thread_.Start(); fake_audio_capture_module_ = FakeAudioCaptureModule::Create( - &audio_thread_); + rtc::Thread::Current()); if (fake_audio_capture_module_ == NULL) { return false; @@ -557,12 +556,6 @@ class PeerConnectionTestClientBase } std::string id_; - // Separate thread for executing |fake_audio_capture_module_| tasks. Audio - // processing must not be performed on the same thread as signaling due to - // signaling time constraints and relative complexity of the audio pipeline. - // This is consistent with the video pipeline that us a a separate thread for - // encoding and decoding. - rtc::Thread audio_thread_; rtc::scoped_refptr allocator_factory_; diff --git a/talk/app/webrtc/peerconnectionfactory.cc b/talk/app/webrtc/peerconnectionfactory.cc index 5dccba88d3..862ceda96a 100644 --- a/talk/app/webrtc/peerconnectionfactory.cc +++ b/talk/app/webrtc/peerconnectionfactory.cc @@ -41,6 +41,7 @@ #include "talk/media/webrtc/webrtcmediaengine.h" #include "talk/media/webrtc/webrtcvideodecoderfactory.h" #include "talk/media/webrtc/webrtcvideoencoderfactory.h" +#include "webrtc/base/bind.h" #include "webrtc/modules/audio_device/include/audio_device.h" using rtc::scoped_refptr; diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.cc b/talk/app/webrtc/test/peerconnectiontestwrapper.cc index 8a4f45cc93..24932b89f6 100644 --- a/talk/app/webrtc/test/peerconnectiontestwrapper.cc +++ b/talk/app/webrtc/test/peerconnectiontestwrapper.cc @@ -75,9 +75,8 @@ bool PeerConnectionTestWrapper::CreatePc( return false; } - audio_thread_.Start(); fake_audio_capture_module_ = FakeAudioCaptureModule::Create( - &audio_thread_); + rtc::Thread::Current()); if (fake_audio_capture_module_ == NULL) { return false; } diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.h b/talk/app/webrtc/test/peerconnectiontestwrapper.h index f3477cecdc..d4a0e4ecb1 100644 --- a/talk/app/webrtc/test/peerconnectiontestwrapper.h +++ b/talk/app/webrtc/test/peerconnectiontestwrapper.h @@ -111,7 +111,6 @@ class PeerConnectionTestWrapper bool video, const webrtc::FakeConstraints& video_constraints); std::string name_; - rtc::Thread audio_thread_; rtc::scoped_refptr allocator_factory_; rtc::scoped_refptr peer_connection_; diff --git a/talk/session/media/channelmanager.cc b/talk/session/media/channelmanager.cc index 45e7e4764a..199bc8621d 100644 --- a/talk/session/media/channelmanager.cc +++ b/talk/session/media/channelmanager.cc @@ -137,6 +137,12 @@ void ChannelManager::Construct(MediaEngineInterface* me, this, &ChannelManager::OnVideoCaptureStateChange); capture_manager_->SignalCapturerStateChange.connect( this, &ChannelManager::OnVideoCaptureStateChange); + + if (worker_thread_ != rtc::Thread::Current()) { + // Do not allow invoking calls to other threads on the worker thread. + worker_thread_->Invoke( + rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false)); + } } ChannelManager::~ChannelManager() { diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc index 9d2917d9a0..40257ab88c 100644 --- a/webrtc/base/thread.cc +++ b/webrtc/base/thread.cc @@ -411,15 +411,12 @@ void Thread::Stop() { } void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { - AssertBlockingIsAllowedOnCurrentThread(); - if (fStop_) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. - Message msg; msg.phandler = phandler; msg.message_id = id; @@ -429,6 +426,8 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { return; } + AssertBlockingIsAllowedOnCurrentThread(); + AutoThread thread; Thread *current_thread = Thread::Current(); ASSERT(current_thread != NULL); // AutoThread ensures this @@ -451,7 +450,9 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { crit_.Enter(); while (!ready) { crit_.Leave(); - current_thread->ReceiveSends(); + // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary + // thread invoking calls on the current thread. + current_thread->ReceiveSendsFromThread(this); current_thread->socketserver()->Wait(kForever, false); waited = true; crit_.Enter(); @@ -475,17 +476,23 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { } void Thread::ReceiveSends() { + ReceiveSendsFromThread(NULL); +} + +void Thread::ReceiveSendsFromThread(const Thread* source) { // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit // only via Join, so Send must complete. // - thread receiving exits: Wakeup/set ready in Thread::Clear() // - object target cleared: Wakeup/set ready in Thread::Clear() + _SendMessage smsg; + crit_.Enter(); - while (!sendlist_.empty()) { - _SendMessage smsg = sendlist_.front(); - sendlist_.pop_front(); + while (PopSendMessageFromThread(source, &smsg)) { crit_.Leave(); + smsg.msg.phandler->OnMessage(&smsg.msg); + crit_.Enter(); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); @@ -493,6 +500,18 @@ void Thread::ReceiveSends() { crit_.Leave(); } +bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { + for (std::list<_SendMessage>::iterator it = sendlist_.begin(); + it != sendlist_.end(); ++it) { + if (it->thread == source || source == NULL) { + *msg = *it; + sendlist_.erase(it); + return true; + } + } + return false; +} + void Thread::Clear(MessageHandler *phandler, uint32 id, MessageList* removed) { CritScope cs(&crit_); diff --git a/webrtc/base/thread.h b/webrtc/base/thread.h index 25b0f569fc..34ec45e3b9 100644 --- a/webrtc/base/thread.h +++ b/webrtc/base/thread.h @@ -165,7 +165,6 @@ class Thread : public MessageQueue { // See ScopedDisallowBlockingCalls for details. template ReturnT Invoke(const FunctorT& functor) { - AssertBlockingIsAllowedOnCurrentThread(); FunctorMessageHandler handler(functor); Send(&handler); return handler.result(); @@ -210,6 +209,10 @@ class Thread : public MessageQueue { // of whatever code is conditionally executing because of the return value! bool RunningForTest() { return running(); } + // Sets the per-thread allow-blocking-calls flag and returns the previous + // value. + bool SetAllowBlockingCalls(bool allow); + protected: // This method should be called when thread is created using non standard // method, like derived implementation of rtc::Thread and it can not be @@ -226,10 +229,6 @@ class Thread : public MessageQueue { // Blocks the calling thread until this thread has terminated. void Join(); - // Sets the per-thread allow-blocking-calls flag and returns the previous - // value. - bool SetAllowBlockingCalls(bool allow); - static void AssertBlockingIsAllowedOnCurrentThread(); friend class ScopedDisallowBlockingCalls; @@ -248,6 +247,16 @@ class Thread : public MessageQueue { // Return true if the thread was started and hasn't yet stopped. bool running() { return running_.Wait(0); } + // Processes received "Send" requests. If |source| is not NULL, only requests + // from |source| are processed, otherwise, all requests are processed. + void ReceiveSendsFromThread(const Thread* source); + + // If |source| is not NULL, pops the first "Send" message from |source| in + // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. + // The caller must lock |crit_| before calling. + // Returns true if there is such a message. + bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); + std::list<_SendMessage> sendlist_; std::string name_; ThreadPriority priority_; diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc index 4229df2816..57b6df669f 100644 --- a/webrtc/base/thread_unittest.cc +++ b/webrtc/base/thread_unittest.cc @@ -276,6 +276,78 @@ TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) { thread.Invoke(&LocalFuncs::Func2); } +// Verifies that two threads calling Invoke on each other at the same time does +// not deadlock. +TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { + AutoThread thread; + Thread* current_thread = Thread::Current(); + ASSERT_TRUE(current_thread != NULL); + + Thread other_thread; + other_thread.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke(Bind(&Set, out)); + } + }; + + bool called = false; + other_thread.Invoke( + Bind(&LocalFuncs::InvokeSet, current_thread, &called)); + + EXPECT_TRUE(called); +} + +// Verifies that if thread A invokes a call on thread B and thread C is trying +// to invoke A at the same time, thread A does not handle C's invoke while +// invoking B. +TEST(ThreadTest, ThreeThreadsInvoke) { + AutoThread thread; + Thread* thread_a = Thread::Current(); + Thread thread_b, thread_c; + thread_b.Start(); + thread_c.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke(Bind(&Set, out)); + } + + // Set |out| true and call InvokeSet on |thread|. + static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) { + *out = true; + InvokeSet(thread, out_inner); + } + + // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until + // |thread1| starts the call. + static void AsyncInvokeSetAndWait( + Thread* thread1, Thread* thread2, bool* out) { + bool async_invoked = false; + + AsyncInvoker invoker; + invoker.AsyncInvoke( + thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); + + EXPECT_TRUE_WAIT(async_invoked, 2000); + } + }; + + bool thread_a_called = false; + + // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. + // Thread B returns when C receives the call and C should be blocked until A + // starts to process messages. + thread_b.Invoke(Bind(&LocalFuncs::AsyncInvokeSetAndWait, + &thread_c, thread_a, &thread_a_called)); + EXPECT_FALSE(thread_a_called); + + EXPECT_TRUE_WAIT(thread_a_called, 2000); +} + class AsyncInvokeTest : public testing::Test { public: void IntCallback(int value) {