From 8290ddfbce0ecd3d178c1bc280007dd796819538 Mon Sep 17 00:00:00 2001 From: deadbeef Date: Tue, 11 Jul 2017 16:56:05 -0700 Subject: [PATCH] Revert of Delete SignalThread class. (patchset #20 id:380001 of https://codereview.webrtc.org/2915253002/ ) Reason for revert: Seems to be causing new crashes, possibly because of changes to the "Destroy(false)" behavior. Will re-land after investigating these crashes more and addressing the root cause. Original issue's description: > Delete SignalThread class. > > Rewrite AsyncResolver to use PlatformThread directly, not > SignalThread, and update includes of peerconnection client to not > depend on signalthread.h. > > BUG=webrtc:6424,webrtc:7723 > > Review-Url: https://codereview.webrtc.org/2915253002 > Cr-Commit-Position: refs/heads/master@{#18833} > Committed: https://chromium.googlesource.com/external/webrtc/+/bc8feda1db02b2a9b501e4aa43926ca7e861b638 TBR=tommi@webrtc.org,kwiberg@webrtc.org,nisse@webrtc.org NOPRESUBMIT=true NOTRY=true BUG=webrtc:6424,webrtc:7723 Review-Url: https://codereview.webrtc.org/2979733002 Cr-Commit-Position: refs/heads/master@{#18980} --- webrtc/base/signalthread.h | 19 ++ .../client/peer_connection_client.cc | 1 - .../client/peer_connection_client.h | 2 +- webrtc/rtc_base/BUILD.gn | 4 +- webrtc/rtc_base/messagequeue.cc | 18 -- webrtc/rtc_base/messagequeue.h | 33 --- webrtc/rtc_base/nethelpers.cc | 92 ++------ webrtc/rtc_base/nethelpers.h | 46 +--- webrtc/rtc_base/signalthread.cc | 154 +++++++++++++ webrtc/rtc_base/signalthread.h | 161 ++++++++++++++ webrtc/rtc_base/signalthread_unittest.cc | 210 ++++++++++++++++++ 11 files changed, 572 insertions(+), 168 deletions(-) create mode 100644 webrtc/base/signalthread.h create mode 100644 webrtc/rtc_base/signalthread.cc create mode 100644 webrtc/rtc_base/signalthread.h create mode 100644 webrtc/rtc_base/signalthread_unittest.cc diff --git a/webrtc/base/signalthread.h b/webrtc/base/signalthread.h new file mode 100644 index 0000000000..f5fcf2c4a1 --- /dev/null +++ b/webrtc/base/signalthread.h @@ -0,0 +1,19 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_BASE_SIGNALTHREAD_H_ +#define WEBRTC_BASE_SIGNALTHREAD_H_ + + +// This header is deprecated and is just left here temporarily during +// refactoring. See https://bugs.webrtc.org/7634 for more details. +#include "webrtc/rtc_base/signalthread.h" + +#endif // WEBRTC_BASE_SIGNALTHREAD_H_ diff --git a/webrtc/examples/peerconnection/client/peer_connection_client.cc b/webrtc/examples/peerconnection/client/peer_connection_client.cc index 2f5916774c..24ae5e515b 100644 --- a/webrtc/examples/peerconnection/client/peer_connection_client.cc +++ b/webrtc/examples/peerconnection/client/peer_connection_client.cc @@ -15,7 +15,6 @@ #include "webrtc/rtc_base/logging.h" #include "webrtc/rtc_base/nethelpers.h" #include "webrtc/rtc_base/stringutils.h" -#include "webrtc/rtc_base/thread.h" #ifdef WIN32 #include "webrtc/rtc_base/win32socketserver.h" diff --git a/webrtc/examples/peerconnection/client/peer_connection_client.h b/webrtc/examples/peerconnection/client/peer_connection_client.h index 2b4f35c2aa..a4bdf66058 100644 --- a/webrtc/examples/peerconnection/client/peer_connection_client.h +++ b/webrtc/examples/peerconnection/client/peer_connection_client.h @@ -15,9 +15,9 @@ #include #include -#include "webrtc/rtc_base/messagehandler.h" #include "webrtc/rtc_base/nethelpers.h" #include "webrtc/rtc_base/physicalsocketserver.h" +#include "webrtc/rtc_base/signalthread.h" #include "webrtc/rtc_base/sigslot.h" typedef std::map Peers; diff --git a/webrtc/rtc_base/BUILD.gn b/webrtc/rtc_base/BUILD.gn index 9e905820d2..851973b15a 100644 --- a/webrtc/rtc_base/BUILD.gn +++ b/webrtc/rtc_base/BUILD.gn @@ -404,7 +404,6 @@ rtc_static_library("rtc_base") { libs = [] defines = [] deps = [ - ":rtc_task_queue", "..:webrtc_common", ] public_deps = [ @@ -480,6 +479,8 @@ rtc_static_library("rtc_base") { "rtccertificate.h", "rtccertificategenerator.cc", "rtccertificategenerator.h", + "signalthread.cc", + "signalthread.h", "sigslot.cc", "sigslot.h", "socket.h", @@ -988,6 +989,7 @@ if (rtc_include_tests) { "rtccertificate_unittest.cc", "rtccertificategenerator_unittest.cc", "sha1digest_unittest.cc", + "signalthread_unittest.cc", "sigslot_unittest.cc", "sigslottester_unittest.cc", "stream_unittest.cc", diff --git a/webrtc/rtc_base/messagequeue.cc b/webrtc/rtc_base/messagequeue.cc index fac7609cbf..883735c9cc 100644 --- a/webrtc/rtc_base/messagequeue.cc +++ b/webrtc/rtc_base/messagequeue.cc @@ -43,16 +43,6 @@ class SCOPED_LOCKABLE MarkProcessingCritScope { RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); }; - -class FunctorPostMessageHandler : public MessageHandler { - public: - void OnMessage(Message* msg) override { - RunnableData* data = static_cast(msg->pdata); - data->Run(); - delete data; - } -}; - } // namespace //------------------------------------------------------------------ @@ -546,12 +536,4 @@ void MessageQueue::Dispatch(Message *pmsg) { } } -void MessageQueue::PostFunctorInternal(const Location& posted_from, - RunnableData* message_data) { - // Use static to ensure it outlives this scope. Safe since - // FunctorPostMessageHandler keeps no state. - static FunctorPostMessageHandler handler; - Post(posted_from, &handler, 0, message_data); -} - } // namespace rtc diff --git a/webrtc/rtc_base/messagequeue.h b/webrtc/rtc_base/messagequeue.h index 0d0654e2ec..2345dce7db 100644 --- a/webrtc/rtc_base/messagequeue.h +++ b/webrtc/rtc_base/messagequeue.h @@ -149,23 +149,6 @@ class DisposeData : public MessageData { T* data_; }; -// TODO(nisse): Replace RunnableData and FunctorData by a subclass of Message -// owning a QueuedTask. -class RunnableData : public MessageData { - public: - virtual void Run() = 0; -}; - -template -class FunctorData : public RunnableData { - public: - explicit FunctorData(FunctorT functor) : functor_(std::move(functor)) {} - void Run() override { functor_(); } - - private: - FunctorT functor_; -}; - const uint32_t MQID_ANY = static_cast(-1); const uint32_t MQID_DISPOSE = static_cast(-2); @@ -255,19 +238,6 @@ class MessageQueue { uint32_t id = 0, MessageData* pdata = nullptr, bool time_sensitive = false); - - // TODO(nisse): Replace with a method for posting a - // std::unique_ptr, to ease gradual conversion to using TaskQueue. - template ::value>::type* = - nullptr> - void Post(const Location& posted_from, FunctorT functor) { - PostFunctorInternal(posted_from, - new FunctorData(std::move(functor))); - } - virtual void PostDelayed(const Location& posted_from, int cmsDelay, MessageHandler* phandler, @@ -344,9 +314,6 @@ class MessageQueue { bool fDestroyed_; private: - void PostFunctorInternal(const Location& posted_from, - RunnableData* message_data); - volatile int stop_; // The SocketServer might not be owned by MessageQueue. diff --git a/webrtc/rtc_base/nethelpers.cc b/webrtc/rtc_base/nethelpers.cc index 0f56416e22..8489970b39 100644 --- a/webrtc/rtc_base/nethelpers.cc +++ b/webrtc/rtc_base/nethelpers.cc @@ -25,17 +25,13 @@ #endif #endif // defined(WEBRTC_POSIX) && !defined(__native_client__) -#include "webrtc/rtc_base/bind.h" #include "webrtc/rtc_base/byteorder.h" #include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/logging.h" -#include "webrtc/rtc_base/ptr_util.h" -#include "webrtc/rtc_base/task_queue.h" -#include "webrtc/rtc_base/thread.h" +#include "webrtc/rtc_base/signalthread.h" namespace rtc { -namespace { int ResolveHostname(const std::string& hostname, int family, std::vector* addresses) { #ifdef __native_client__ @@ -85,54 +81,17 @@ int ResolveHostname(const std::string& hostname, int family, return 0; #endif // !__native_client__ } -} // namespace // AsyncResolver -AsyncResolver::AsyncResolver() : construction_thread_(Thread::Current()) { - RTC_DCHECK(construction_thread_); -} +AsyncResolver::AsyncResolver() + : SignalThread(false /* use_socket_server */), error_(-1) {} -AsyncResolver::~AsyncResolver() { - RTC_DCHECK(construction_thread_->IsCurrent()); - if (state_) - // It's possible that we have a posted message waiting on the MessageQueue - // refering to this object. Indirection via the ref-counted state_ object - // ensure it doesn't access us after deletion. - - // TODO(nisse): An alternative approach to solve this problem would be to - // extend MessageQueue::Clear in some way to let us selectively cancel posts - // directed to this object. Then we wouldn't need any ref count, but its a - // larger change to the MessageQueue. - state_->resolver = nullptr; -} +AsyncResolver::~AsyncResolver() = default; void AsyncResolver::Start(const SocketAddress& addr) { - RTC_DCHECK_RUN_ON(construction_thread_); - RTC_DCHECK(!resolver_queue_); - RTC_DCHECK(!state_); - // TODO(nisse): Support injection of task queue at construction? - resolver_queue_ = rtc::MakeUnique("AsyncResolverQueue"); addr_ = addr; - state_ = new RefCountedObject(this); - - // These member variables need to be copied to local variables to make it - // possible to capture them, even for capture-by-copy. - scoped_refptr state = state_; - rtc::Thread* construction_thread = construction_thread_; - resolver_queue_->PostTask([state, addr, construction_thread]() { - std::vector addresses; - int error = - ResolveHostname(addr.hostname().c_str(), addr.family(), &addresses); - // Ensure SignalDone is called on the main thread. - // TODO(nisse): Should use move of the address list, but not easy until - // C++17. Since this code isn't performance critical, copy should be fine - // for now. - construction_thread->Post(RTC_FROM_HERE, [state, error, addresses]() { - if (!state->resolver) - return; - state->resolver->ResolveDone(error, std::move(addresses)); - }); - }); + // SignalThred Start will kickoff the resolve process. + SignalThread::Start(); } bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const { @@ -154,41 +113,16 @@ int AsyncResolver::GetError() const { } void AsyncResolver::Destroy(bool wait) { - RTC_DCHECK_RUN_ON(construction_thread_); - RTC_DCHECK(!state_ || state_->resolver); - // If we don't wait here, we will nevertheless wait in the destructor. - if (wait || !state_) { - // Destroy task queue, blocks on any currently running task. If we have a - // pending task, it will post a call to attempt to call ResolveDone before - // finishing, which we will never handle. - delete this; - } else { - destroyed_ = true; - } + SignalThread::Destroy(wait); } -void AsyncResolver::ResolveDone(int error, std::vector addresses) { - RTC_DCHECK_RUN_ON(construction_thread_); - error_ = error; - addresses_ = std::move(addresses); - if (destroyed_) { - delete this; - return; - } else { - // Beware that SignalDone may call Destroy. +void AsyncResolver::DoWork() { + error_ = ResolveHostname(addr_.hostname().c_str(), addr_.family(), + &addresses_); +} - // TODO(nisse): Currently allows only Destroy(false) in this case, - // and that's what all webrtc code is using. With Destroy(true), - // this object would be destructed immediately, and the access - // both to |destroyed_| below as well as the sigslot machinery - // involved in SignalDone implies invalid use-after-free. - SignalDone(this); - if (destroyed_) { - delete this; - return; - } - } - state_ = nullptr; +void AsyncResolver::OnWorkDone() { + SignalDone(this); } const char* inet_ntop(int af, const void *src, char* dst, socklen_t size) { diff --git a/webrtc/rtc_base/nethelpers.h b/webrtc/rtc_base/nethelpers.h index b5f0c01a5c..d25879e785 100644 --- a/webrtc/rtc_base/nethelpers.h +++ b/webrtc/rtc_base/nethelpers.h @@ -19,25 +19,19 @@ #endif #include -#include #include "webrtc/rtc_base/asyncresolverinterface.h" -#include "webrtc/rtc_base/refcount.h" -#include "webrtc/rtc_base/scoped_ref_ptr.h" +#include "webrtc/rtc_base/signalthread.h" #include "webrtc/rtc_base/sigslot.h" #include "webrtc/rtc_base/socketaddress.h" -#include "webrtc/rtc_base/thread_checker.h" namespace rtc { -class Thread; -class TaskQueue; +class AsyncResolverTest; -// AsyncResolver will perform async DNS resolution, signaling the result on the -// SignalDone from AsyncResolverInterface when the operation completes. -// SignalDone is fired on the same thread on which the AsyncResolver was -// constructed. -class AsyncResolver : public AsyncResolverInterface { +// AsyncResolver will perform async DNS resolution, signaling the result on +// the SignalDone from AsyncResolverInterface when the operation completes. +class AsyncResolver : public SignalThread, public AsyncResolverInterface { public: AsyncResolver(); ~AsyncResolver() override; @@ -48,34 +42,16 @@ class AsyncResolver : public AsyncResolverInterface { void Destroy(bool wait) override; const std::vector& addresses() const { return addresses_; } + void set_error(int error) { error_ = error; } + + protected: + void DoWork() override; + void OnWorkDone() override; private: - void ResolveDone(int error, std::vector addresses); - - class Trampoline : public RefCountInterface { - public: - Trampoline(AsyncResolver* resolver) : resolver(resolver) {} - // Points back to the resolver, as long as it is alive. Cleared - // by the AsyncResolver destructor. - AsyncResolver* resolver; - }; - - // |state_| is non-null while resolution is pending, i.e., set - // non-null by Start() and cleared by ResolveDone(). The destructor - // clears state_->resolver (assuming |state_| is non-null), to - // indicate that the resolver can no longer be accessed. - scoped_refptr state_ ACCESS_ON(construction_thread_); - - Thread* const construction_thread_; - // Set to true when Destroy() can't delete the object immediately. - // Indicate that the ResolveDone method is now responsible for - // deletion. method should delete the object. - bool destroyed_ = false; - // Queue used only for a single task. - std::unique_ptr resolver_queue_; SocketAddress addr_; std::vector addresses_; - int error_ = -1; + int error_; }; // rtc namespaced wrappers for inet_ntop and inet_pton so we can avoid diff --git a/webrtc/rtc_base/signalthread.cc b/webrtc/rtc_base/signalthread.cc new file mode 100644 index 0000000000..be2741e764 --- /dev/null +++ b/webrtc/rtc_base/signalthread.cc @@ -0,0 +1,154 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/signalthread.h" + +#include "webrtc/base/checks.h" + +namespace rtc { + +/////////////////////////////////////////////////////////////////////////////// +// SignalThread +/////////////////////////////////////////////////////////////////////////////// + +SignalThread::SignalThread(bool use_socket_server) + : main_(Thread::Current()), + worker_(this, use_socket_server), + state_(kInit), + refcount_(1) { + main_->SignalQueueDestroyed.connect(this, + &SignalThread::OnMainThreadDestroyed); + worker_.SetName("SignalThread", this); +} + +SignalThread::~SignalThread() { + RTC_DCHECK(refcount_ == 0); +} + +bool SignalThread::SetName(const std::string& name, const void* obj) { + EnterExit ee(this); + RTC_DCHECK(main_->IsCurrent()); + RTC_DCHECK(kInit == state_); + return worker_.SetName(name, obj); +} + +void SignalThread::Start() { + EnterExit ee(this); + RTC_DCHECK(main_->IsCurrent()); + if (kInit == state_ || kComplete == state_) { + state_ = kRunning; + OnWorkStart(); + worker_.Start(); + } else { + RTC_NOTREACHED(); + } +} + +void SignalThread::Destroy(bool wait) { + EnterExit ee(this); + RTC_DCHECK(main_->IsCurrent()); + if ((kInit == state_) || (kComplete == state_)) { + refcount_--; + } else if (kRunning == state_ || kReleasing == state_) { + state_ = kStopping; + // OnWorkStop() must follow Quit(), so that when the thread wakes up due to + // OWS(), ContinueWork() will return false. + worker_.Quit(); + OnWorkStop(); + if (wait) { + // Release the thread's lock so that it can return from ::Run. + cs_.Leave(); + worker_.Stop(); + cs_.Enter(); + refcount_--; + } + } else { + RTC_NOTREACHED(); + } +} + +void SignalThread::Release() { + EnterExit ee(this); + RTC_DCHECK(main_->IsCurrent()); + if (kComplete == state_) { + refcount_--; + } else if (kRunning == state_) { + state_ = kReleasing; + } else { + // if (kInit == state_) use Destroy() + RTC_NOTREACHED(); + } +} + +bool SignalThread::ContinueWork() { + EnterExit ee(this); + RTC_DCHECK(worker_.IsCurrent()); + return worker_.ProcessMessages(0); +} + +void SignalThread::OnMessage(Message *msg) { + EnterExit ee(this); + if (ST_MSG_WORKER_DONE == msg->message_id) { + RTC_DCHECK(main_->IsCurrent()); + OnWorkDone(); + bool do_delete = false; + if (kRunning == state_) { + state_ = kComplete; + } else { + do_delete = true; + } + if (kStopping != state_) { + // Before signaling that the work is done, make sure that the worker + // thread actually is done. We got here because DoWork() finished and + // Run() posted the ST_MSG_WORKER_DONE message. This means the worker + // thread is about to go away anyway, but sometimes it doesn't actually + // finish before SignalWorkDone is processed, and for a reusable + // SignalThread this makes an assert in thread.cc fire. + // + // Calling Stop() on the worker ensures that the OS thread that underlies + // the worker will finish, and will be set to null, enabling us to call + // Start() again. + worker_.Stop(); + SignalWorkDone(this); + } + if (do_delete) { + refcount_--; + } + } +} + +SignalThread::Worker::~Worker() { + Stop(); +} + +void SignalThread::Worker::Run() { + parent_->Run(); +} + +void SignalThread::Run() { + DoWork(); + { + EnterExit ee(this); + if (main_) { + main_->Post(RTC_FROM_HERE, this, ST_MSG_WORKER_DONE); + } + } +} + +void SignalThread::OnMainThreadDestroyed() { + EnterExit ee(this); + main_ = nullptr; +} + +bool SignalThread::Worker::IsProcessingMessages() { + return false; +} + +} // namespace rtc diff --git a/webrtc/rtc_base/signalthread.h b/webrtc/rtc_base/signalthread.h new file mode 100644 index 0000000000..bc8c98ea7d --- /dev/null +++ b/webrtc/rtc_base/signalthread.h @@ -0,0 +1,161 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_RTC_BASE_SIGNALTHREAD_H_ +#define WEBRTC_RTC_BASE_SIGNALTHREAD_H_ + +#include + +#include "webrtc/base/checks.h" +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/nullsocketserver.h" +#include "webrtc/base/sigslot.h" +#include "webrtc/base/thread.h" + +namespace rtc { + +/////////////////////////////////////////////////////////////////////////////// +// SignalThread - Base class for worker threads. The main thread should call +// Start() to begin work, and then follow one of these models: +// Normal: Wait for SignalWorkDone, and then call Release to destroy. +// Cancellation: Call Release(true), to abort the worker thread. +// Fire-and-forget: Call Release(false), which allows the thread to run to +// completion, and then self-destruct without further notification. +// Periodic tasks: Wait for SignalWorkDone, then eventually call Start() +// again to repeat the task. When the instance isn't needed anymore, +// call Release. DoWork, OnWorkStart and OnWorkStop are called again, +// on a new thread. +// The subclass should override DoWork() to perform the background task. By +// periodically calling ContinueWork(), it can check for cancellation. +// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work +// tasks in the context of the main thread. +/////////////////////////////////////////////////////////////////////////////// + +class SignalThread + : public sigslot::has_slots<>, + protected MessageHandler { + public: + explicit SignalThread(bool use_socket_server = true); + + // Context: Main Thread. Call before Start to change the worker's name. + bool SetName(const std::string& name, const void* obj); + + // Context: Main Thread. Call to begin the worker thread. + void Start(); + + // Context: Main Thread. If the worker thread is not running, deletes the + // object immediately. Otherwise, asks the worker thread to abort processing, + // and schedules the object to be deleted once the worker exits. + // SignalWorkDone will not be signalled. If wait is true, does not return + // until the thread is deleted. + void Destroy(bool wait); + + // Context: Main Thread. If the worker thread is complete, deletes the + // object immediately. Otherwise, schedules the object to be deleted once + // the worker thread completes. SignalWorkDone will be signalled. + void Release(); + + // Context: Main Thread. Signalled when work is complete. + sigslot::signal1 SignalWorkDone; + + enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE }; + + protected: + ~SignalThread() override; + + Thread* worker() { return &worker_; } + + // Context: Main Thread. Subclass should override to do pre-work setup. + virtual void OnWorkStart() { } + + // Context: Worker Thread. Subclass should override to do work. + virtual void DoWork() = 0; + + // Context: Worker Thread. Subclass should call periodically to + // dispatch messages and determine if the thread should terminate. + bool ContinueWork(); + + // Context: Worker Thread. Subclass should override when extra work is + // needed to abort the worker thread. + virtual void OnWorkStop() { } + + // Context: Main Thread. Subclass should override to do post-work cleanup. + virtual void OnWorkDone() { } + + // Context: Any Thread. If subclass overrides, be sure to call the base + // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE) + void OnMessage(Message* msg) override; + + private: + enum State { + kInit, // Initialized, but not started + kRunning, // Started and doing work + kReleasing, // Same as running, but to be deleted when work is done + kComplete, // Work is done + kStopping, // Work is being interrupted + }; + + class Worker : public Thread { + public: + explicit Worker(SignalThread* parent, bool use_socket_server) + : Thread(use_socket_server + ? SocketServer::CreateDefault() + : std::unique_ptr(new NullSocketServer())), + parent_(parent) {} + ~Worker() override; + void Run() override; + bool IsProcessingMessages() override; + + private: + SignalThread* parent_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker); + }; + + class SCOPED_LOCKABLE EnterExit { + public: + explicit EnterExit(SignalThread* t) EXCLUSIVE_LOCK_FUNCTION(t->cs_) + : t_(t) { + t_->cs_.Enter(); + // If refcount_ is zero then the object has already been deleted and we + // will be double-deleting it in ~EnterExit()! (shouldn't happen) + RTC_DCHECK_NE(0, t_->refcount_); + ++t_->refcount_; + } + ~EnterExit() UNLOCK_FUNCTION() { + bool d = (0 == --t_->refcount_); + t_->cs_.Leave(); + if (d) + delete t_; + } + + private: + SignalThread* t_; + + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit); + }; + + void Run(); + void OnMainThreadDestroyed(); + + Thread* main_; + Worker worker_; + CriticalSection cs_; + State state_; + int refcount_; + + RTC_DISALLOW_COPY_AND_ASSIGN(SignalThread); +}; + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace rtc + +#endif // WEBRTC_RTC_BASE_SIGNALTHREAD_H_ diff --git a/webrtc/rtc_base/signalthread_unittest.cc b/webrtc/rtc_base/signalthread_unittest.cc new file mode 100644 index 0000000000..53bb006102 --- /dev/null +++ b/webrtc/rtc_base/signalthread_unittest.cc @@ -0,0 +1,210 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include + +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/gunit.h" +#include "webrtc/base/signalthread.h" +#include "webrtc/base/thread.h" + +using namespace rtc; + +// 10 seconds. +static const int kTimeout = 10000; + +class SignalThreadTest : public testing::Test, public sigslot::has_slots<> { + public: + class SlowSignalThread : public SignalThread { + public: + SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {} + + virtual ~SlowSignalThread() { + EXPECT_EQ(harness_->main_thread_, Thread::Current()); + ++harness_->thread_deleted_; + } + + const SignalThreadTest* harness() { return harness_; } + + protected: + virtual void OnWorkStart() { + ASSERT_TRUE(harness_ != nullptr); + ++harness_->thread_started_; + EXPECT_EQ(harness_->main_thread_, Thread::Current()); + EXPECT_FALSE(worker()->RunningForTest()); // not started yet + } + + virtual void OnWorkStop() { + ++harness_->thread_stopped_; + EXPECT_EQ(harness_->main_thread_, Thread::Current()); + EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet + } + + virtual void OnWorkDone() { + ++harness_->thread_done_; + EXPECT_EQ(harness_->main_thread_, Thread::Current()); + EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet + } + + virtual void DoWork() { + EXPECT_NE(harness_->main_thread_, Thread::Current()); + EXPECT_EQ(worker(), Thread::Current()); + Thread::Current()->socketserver()->Wait(250, false); + } + + private: + SignalThreadTest* harness_; + RTC_DISALLOW_COPY_AND_ASSIGN(SlowSignalThread); + }; + + void OnWorkComplete(rtc::SignalThread* thread) { + SlowSignalThread* t = static_cast(thread); + EXPECT_EQ(t->harness(), this); + EXPECT_EQ(main_thread_, Thread::Current()); + + ++thread_completed_; + if (!called_release_) { + thread->Release(); + } + } + + virtual void SetUp() { + main_thread_ = Thread::Current(); + thread_ = new SlowSignalThread(this); + thread_->SignalWorkDone.connect(this, &SignalThreadTest::OnWorkComplete); + called_release_ = false; + thread_started_ = 0; + thread_done_ = 0; + thread_completed_ = 0; + thread_stopped_ = 0; + thread_deleted_ = 0; + } + + virtual void TearDown() {} + + void ExpectState(int started, + int done, + int completed, + int stopped, + int deleted) { + EXPECT_EQ(started, thread_started_); + EXPECT_EQ(done, thread_done_); + EXPECT_EQ(completed, thread_completed_); + EXPECT_EQ(stopped, thread_stopped_); + EXPECT_EQ(deleted, thread_deleted_); + } + + void ExpectStateWait(int started, + int done, + int completed, + int stopped, + int deleted, + int timeout) { + EXPECT_EQ_WAIT(started, thread_started_, timeout); + EXPECT_EQ_WAIT(done, thread_done_, timeout); + EXPECT_EQ_WAIT(completed, thread_completed_, timeout); + EXPECT_EQ_WAIT(stopped, thread_stopped_, timeout); + EXPECT_EQ_WAIT(deleted, thread_deleted_, timeout); + } + + Thread* main_thread_; + SlowSignalThread* thread_; + bool called_release_; + + int thread_started_; + int thread_done_; + int thread_completed_; + int thread_stopped_; + int thread_deleted_; +}; + +class OwnerThread : public Thread, public sigslot::has_slots<> { + public: + explicit OwnerThread(SignalThreadTest* harness) + : harness_(harness), has_run_(false) {} + + virtual ~OwnerThread() { Stop(); } + + virtual void Run() { + SignalThreadTest::SlowSignalThread* signal_thread = + new SignalThreadTest::SlowSignalThread(harness_); + signal_thread->SignalWorkDone.connect(this, &OwnerThread::OnWorkDone); + signal_thread->Start(); + Thread::Current()->socketserver()->Wait(100, false); + signal_thread->Release(); + // Delete |signal_thread|. + signal_thread->Destroy(true); + has_run_ = true; + } + + bool has_run() { return has_run_; } + void OnWorkDone(SignalThread* signal_thread) { + FAIL() << " This shouldn't get called."; + } + + private: + SignalThreadTest* harness_; + bool has_run_; + RTC_DISALLOW_COPY_AND_ASSIGN(OwnerThread); +}; + +// Test for when the main thread goes away while the +// signal thread is still working. This may happen +// when shutting down the process. +TEST_F(SignalThreadTest, OwnerThreadGoesAway) { + // We don't use |thread_| for this test, so destroy it. + thread_->Destroy(true); + + { + std::unique_ptr owner(new OwnerThread(this)); + main_thread_ = owner.get(); + owner->Start(); + while (!owner->has_run()) { + Thread::Current()->socketserver()->Wait(10, false); + } + } + // At this point the main thread has gone away. + // Give the SignalThread a little time to do its callback, + // which will crash if the signal thread doesn't handle + // this situation well. + Thread::Current()->socketserver()->Wait(500, false); +} + +TEST_F(SignalThreadTest, ThreadFinishes) { + thread_->Start(); + ExpectState(1, 0, 0, 0, 0); + ExpectStateWait(1, 1, 1, 0, 1, kTimeout); +} + +TEST_F(SignalThreadTest, ReleasedThreadFinishes) { + thread_->Start(); + ExpectState(1, 0, 0, 0, 0); + thread_->Release(); + called_release_ = true; + ExpectState(1, 0, 0, 0, 0); + ExpectStateWait(1, 1, 1, 0, 1, kTimeout); +} + +TEST_F(SignalThreadTest, DestroyedThreadCleansUp) { + thread_->Start(); + ExpectState(1, 0, 0, 0, 0); + thread_->Destroy(true); + ExpectState(1, 0, 0, 1, 1); + Thread::Current()->ProcessMessages(0); + ExpectState(1, 0, 0, 1, 1); +} + +TEST_F(SignalThreadTest, DeferredDestroyedThreadCleansUp) { + thread_->Start(); + ExpectState(1, 0, 0, 0, 0); + thread_->Destroy(false); + ExpectState(1, 0, 0, 1, 0); + ExpectStateWait(1, 1, 0, 1, 1, kTimeout); +}