Thread: delete racy API (Release()) and fix racy code (started()).

- Thread::Release() wrote a local variable on the calling thread but read it on
  another thread, with no synchronization.  Happily it has no non-test callers
  so deleting it instead of trying to fix it (see bug for details).
- Thread::started_ similarly was racily being written to; replaced with a
  running_ Event, and hid the accessor except for tests & legacy callers,
  with a note about why it's a bad idea.

webrtc/base patched with:
git diff origin --relative=talk/base | patch -p1 -dwebrtc/base
followed by manual merge of 3 thunks that ran afoul of naming differences
between talk/base and webrtc/base.

BUG=3388
R=andrew@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/14589005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@6236 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
fischman@webrtc.org 2014-05-23 17:28:50 +00:00
parent 18f41b8eb4
commit e5063b1733
9 changed files with 80 additions and 104 deletions

View File

@ -50,19 +50,19 @@ class SignalThreadTest : public testing::Test, public sigslot::has_slots<> {
ASSERT_TRUE(harness_ != NULL);
++harness_->thread_started_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_FALSE(worker()->started()); // not started yet
EXPECT_FALSE(worker()->RunningForTest()); // not started yet
}
virtual void OnWorkStop() {
++harness_->thread_stopped_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_TRUE(worker()->started()); // not stopped yet
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
}
virtual void OnWorkDone() {
++harness_->thread_done_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_TRUE(worker()->started()); // not stopped yet
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
}
virtual void DoWork() {

View File

@ -145,13 +145,12 @@ struct ThreadInit {
Thread::Thread(SocketServer* ss)
: MessageQueue(ss),
priority_(PRIORITY_NORMAL),
started_(false),
running_(true, false),
#if defined(WIN32)
thread_(NULL),
thread_id_(0),
#endif
owned_(true),
delete_self_when_complete_(false) {
owned_(true) {
SetName("Thread", this); // default name
}
@ -180,7 +179,7 @@ bool Thread::SleepMs(int milliseconds) {
}
bool Thread::SetName(const std::string& name, const void* obj) {
if (started_) return false;
if (running()) return false;
name_ = name;
if (obj) {
char buf[16];
@ -192,7 +191,7 @@ bool Thread::SetName(const std::string& name, const void* obj) {
bool Thread::SetPriority(ThreadPriority priority) {
#if defined(WIN32)
if (started_) {
if (running()) {
BOOL ret = FALSE;
if (priority == PRIORITY_NORMAL) {
ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL);
@ -211,7 +210,7 @@ bool Thread::SetPriority(ThreadPriority priority) {
return true;
#else
// TODO: Implement for Linux/Mac if possible.
if (started_) return false;
if (running()) return false;
priority_ = priority;
return true;
#endif
@ -220,8 +219,8 @@ bool Thread::SetPriority(ThreadPriority priority) {
bool Thread::Start(Runnable* runnable) {
ASSERT(owned_);
if (!owned_) return false;
ASSERT(!started_);
if (started_) return false;
ASSERT(!running());
if (running()) return false;
Restart(); // reset fStop_ if the thread is being restarted
@ -240,7 +239,7 @@ bool Thread::Start(Runnable* runnable) {
thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags,
&thread_id_);
if (thread_) {
started_ = true;
running_.Set();
if (priority_ != PRIORITY_NORMAL) {
SetPriority(priority_);
::ResumeThread(thread_);
@ -288,13 +287,13 @@ bool Thread::Start(Runnable* runnable) {
LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
return false;
}
started_ = true;
running_.Set();
#endif
return true;
}
void Thread::Join() {
if (started_) {
if (running()) {
ASSERT(!IsCurrent());
#if defined(WIN32)
WaitForSingleObject(thread_, INFINITE);
@ -305,7 +304,7 @@ void Thread::Join() {
void *pv;
pthread_join(thread_, &pv);
#endif
started_ = false;
running_.Reset();
}
}
@ -356,10 +355,6 @@ void* Thread::PreRun(void* pv) {
} else {
init->thread->Run();
}
if (init->thread->delete_self_when_complete_) {
init->thread->started_ = false;
delete init->thread;
}
delete init;
return NULL;
}
@ -521,7 +516,7 @@ bool Thread::WrapCurrent() {
}
bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
if (started_)
if (running())
return false;
#if defined(WIN32)
// We explicitly ask for no rights other than synchronization.
@ -536,7 +531,7 @@ bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
thread_ = pthread_self();
#endif
owned_ = false;
started_ = true;
running_.Set();
thread_manager->SetCurrentThread(this);
return true;
}
@ -549,7 +544,7 @@ void Thread::UnwrapCurrent() {
LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
}
#endif
started_ = false;
running_.Reset();
}

View File

@ -37,6 +37,7 @@
#include <pthread.h>
#endif
#include "talk/base/constructormagic.h"
#include "talk/base/event.h"
#include "talk/base/messagequeue.h"
#ifdef WIN32
@ -143,15 +144,8 @@ class Thread : public MessageQueue {
bool SetPriority(ThreadPriority priority);
// Starts the execution of the thread.
bool started() const { return started_; }
bool Start(Runnable* runnable = NULL);
// Used for fire-and-forget threads. Deletes this thread object when the
// Run method returns.
void Release() {
delete_self_when_complete_ = true;
}
// Tells the thread to stop and waits until it is joined.
// Never call Stop on the current thread. Instead use the inherited Quit
// function which will exit the base MessageQueue without terminating the
@ -218,6 +212,19 @@ class Thread : public MessageQueue {
bool WrapCurrent();
void UnwrapCurrent();
// Expose private method running() for tests.
//
// DANGER: this is a terrible public API. Most callers that might want to
// call this likely do not have enough control/knowledge of the Thread in
// question to guarantee that the returned value remains true for the duration
// of whatever code is conditionally executing because of the return value!
bool RunningForTest() { return running(); }
// This is a legacy call-site that probably doesn't need to exist in the first
// place.
// TODO(fischman): delete once the ASSERT added in channelmanager.cc sticks
// for a month (ETA 2014/06/22).
bool RunningForChannelManager() { return running(); }
protected:
// Blocks the calling thread until this thread has terminated.
void Join();
@ -230,10 +237,13 @@ class Thread : public MessageQueue {
// being created.
bool WrapCurrentWithThreadManager(ThreadManager* thread_manager);
// Return true if the thread was started and hasn't yet stopped.
bool running() { return running_.Wait(0); }
std::list<_SendMessage> sendlist_;
std::string name_;
ThreadPriority priority_;
bool started_;
Event running_; // Signalled means running.
#ifdef POSIX
pthread_t thread_;
@ -245,7 +255,6 @@ class Thread : public MessageQueue {
#endif
bool owned_;
bool delete_self_when_complete_;
friend class ThreadManager;

View File

@ -261,32 +261,14 @@ TEST(ThreadTest, Wrap) {
current_thread->UnwrapCurrent();
CustomThread* cthread = new CustomThread();
EXPECT_TRUE(cthread->WrapCurrent());
EXPECT_TRUE(cthread->started());
EXPECT_TRUE(cthread->RunningForTest());
EXPECT_FALSE(cthread->IsOwned());
cthread->UnwrapCurrent();
EXPECT_FALSE(cthread->started());
EXPECT_FALSE(cthread->RunningForTest());
delete cthread;
current_thread->WrapCurrent();
}
// Test that calling Release on a thread causes it to self-destruct when
// it's finished running
TEST(ThreadTest, Release) {
scoped_ptr<Event> event(new Event(true, false));
// Ensure the event is initialized.
event->Reset();
Thread* thread = new SignalWhenDestroyedThread(event.get());
thread->Start();
thread->Release();
// The event should get signaled when the thread completes, which should
// be nearly instantaneous, since it doesn't do anything. For safety,
// give it 3 seconds in case the machine is under load.
bool signaled = event->Wait(3000);
EXPECT_TRUE(signaled);
}
TEST(ThreadTest, Invoke) {
// Create and start the thread.
Thread thread;

View File

@ -217,7 +217,11 @@ bool ChannelManager::Init() {
}
ASSERT(worker_thread_ != NULL);
if (worker_thread_ && worker_thread_->started()) {
ASSERT(worker_thread_->RunningForChannelManager());
// TODO(fischman): remove the if below (and
// Thread::RunningForChannelManager()) once the ASSERT above has stuck for a
// month (2014/06/22).
if (worker_thread_ && worker_thread_->RunningForChannelManager()) {
if (media_engine_->Init(worker_thread_)) {
initialized_ = true;

View File

@ -33,19 +33,19 @@ class SignalThreadTest : public testing::Test, public sigslot::has_slots<> {
ASSERT_TRUE(harness_ != NULL);
++harness_->thread_started_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_FALSE(worker()->started()); // not started yet
EXPECT_FALSE(worker()->RunningForTest()); // not started yet
}
virtual void OnWorkStop() {
++harness_->thread_stopped_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_TRUE(worker()->started()); // not stopped yet
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
}
virtual void OnWorkDone() {
++harness_->thread_done_;
EXPECT_EQ(harness_->main_thread_, Thread::Current());
EXPECT_TRUE(worker()->started()); // not stopped yet
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
}
virtual void DoWork() {

View File

@ -128,13 +128,12 @@ struct ThreadInit {
Thread::Thread(SocketServer* ss)
: MessageQueue(ss),
priority_(PRIORITY_NORMAL),
started_(false),
running_(true, false),
#if defined(WEBRTC_WIN)
thread_(NULL),
thread_id_(0),
#endif
owned_(true),
delete_self_when_complete_(false) {
owned_(true) {
SetName("Thread", this); // default name
}
@ -163,7 +162,7 @@ bool Thread::SleepMs(int milliseconds) {
}
bool Thread::SetName(const std::string& name, const void* obj) {
if (started_) return false;
if (running()) return false;
name_ = name;
if (obj) {
char buf[16];
@ -175,7 +174,7 @@ bool Thread::SetName(const std::string& name, const void* obj) {
bool Thread::SetPriority(ThreadPriority priority) {
#if defined(WEBRTC_WIN)
if (started_) {
if (running()) {
BOOL ret = FALSE;
if (priority == PRIORITY_NORMAL) {
ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL);
@ -194,7 +193,7 @@ bool Thread::SetPriority(ThreadPriority priority) {
return true;
#else
// TODO: Implement for Linux/Mac if possible.
if (started_) return false;
if (running()) return false;
priority_ = priority;
return true;
#endif
@ -203,8 +202,8 @@ bool Thread::SetPriority(ThreadPriority priority) {
bool Thread::Start(Runnable* runnable) {
ASSERT(owned_);
if (!owned_) return false;
ASSERT(!started_);
if (started_) return false;
ASSERT(!running());
if (running()) return false;
Restart(); // reset fStop_ if the thread is being restarted
@ -223,7 +222,7 @@ bool Thread::Start(Runnable* runnable) {
thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags,
&thread_id_);
if (thread_) {
started_ = true;
running_.Set();
if (priority_ != PRIORITY_NORMAL) {
SetPriority(priority_);
::ResumeThread(thread_);
@ -271,13 +270,13 @@ bool Thread::Start(Runnable* runnable) {
LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
return false;
}
started_ = true;
running_.Set();
#endif
return true;
}
void Thread::Join() {
if (started_) {
if (running()) {
ASSERT(!IsCurrent());
#if defined(WEBRTC_WIN)
WaitForSingleObject(thread_, INFINITE);
@ -288,7 +287,7 @@ void Thread::Join() {
void *pv;
pthread_join(thread_, &pv);
#endif
started_ = false;
running_.Reset();
}
}
@ -317,7 +316,7 @@ void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) {
__except(EXCEPTION_CONTINUE_EXECUTION) {
}
}
#endif // WEBRTC_WIN
#endif // WEBRTC_WIN
void* Thread::PreRun(void* pv) {
ThreadInit* init = static_cast<ThreadInit*>(pv);
@ -339,10 +338,6 @@ void* Thread::PreRun(void* pv) {
} else {
init->thread->Run();
}
if (init->thread->delete_self_when_complete_) {
init->thread->started_ = false;
delete init->thread;
}
delete init;
return NULL;
}
@ -504,7 +499,7 @@ bool Thread::WrapCurrent() {
}
bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
if (started_)
if (running())
return false;
#if defined(WEBRTC_WIN)
// We explicitly ask for no rights other than synchronization.
@ -519,7 +514,7 @@ bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
thread_ = pthread_self();
#endif
owned_ = false;
started_ = true;
running_.Set();
thread_manager->SetCurrentThread(this);
return true;
}
@ -532,7 +527,7 @@ void Thread::UnwrapCurrent() {
LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
}
#endif
started_ = false;
running_.Reset();
}

View File

@ -20,6 +20,7 @@
#include <pthread.h>
#endif
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/event.h"
#include "webrtc/base/messagequeue.h"
#if defined(WEBRTC_WIN)
@ -126,15 +127,8 @@ class Thread : public MessageQueue {
bool SetPriority(ThreadPriority priority);
// Starts the execution of the thread.
bool started() const { return started_; }
bool Start(Runnable* runnable = NULL);
// Used for fire-and-forget threads. Deletes this thread object when the
// Run method returns.
void Release() {
delete_self_when_complete_ = true;
}
// Tells the thread to stop and waits until it is joined.
// Never call Stop on the current thread. Instead use the inherited Quit
// function which will exit the base MessageQueue without terminating the
@ -201,6 +195,19 @@ class Thread : public MessageQueue {
bool WrapCurrent();
void UnwrapCurrent();
// Expose private method running() for tests.
//
// DANGER: this is a terrible public API. Most callers that might want to
// call this likely do not have enough control/knowledge of the Thread in
// question to guarantee that the returned value remains true for the duration
// of whatever code is conditionally executing because of the return value!
bool RunningForTest() { return running(); }
// This is a legacy call-site that probably doesn't need to exist in the first
// place.
// TODO(fischman): delete once the ASSERT added in channelmanager.cc sticks
// for a month (ETA 2014/06/22).
bool RunningForChannelManager() { return running(); }
protected:
// Blocks the calling thread until this thread has terminated.
void Join();
@ -213,10 +220,13 @@ class Thread : public MessageQueue {
// being created.
bool WrapCurrentWithThreadManager(ThreadManager* thread_manager);
// Return true if the thread was started and hasn't yet stopped.
bool running() { return running_.Wait(0); }
std::list<_SendMessage> sendlist_;
std::string name_;
ThreadPriority priority_;
bool started_;
Event running_; // Signalled means running.
#if defined(WEBRTC_POSIX)
pthread_t thread_;
@ -228,7 +238,6 @@ class Thread : public MessageQueue {
#endif
bool owned_;
bool delete_self_when_complete_;
friend class ThreadManager;

View File

@ -244,32 +244,14 @@ TEST(ThreadTest, Wrap) {
current_thread->UnwrapCurrent();
CustomThread* cthread = new CustomThread();
EXPECT_TRUE(cthread->WrapCurrent());
EXPECT_TRUE(cthread->started());
EXPECT_TRUE(cthread->RunningForTest());
EXPECT_FALSE(cthread->IsOwned());
cthread->UnwrapCurrent();
EXPECT_FALSE(cthread->started());
EXPECT_FALSE(cthread->RunningForTest());
delete cthread;
current_thread->WrapCurrent();
}
// Test that calling Release on a thread causes it to self-destruct when
// it's finished running
TEST(ThreadTest, Release) {
scoped_ptr<Event> event(new Event(true, false));
// Ensure the event is initialized.
event->Reset();
Thread* thread = new SignalWhenDestroyedThread(event.get());
thread->Start();
thread->Release();
// The event should get signaled when the thread completes, which should
// be nearly instantaneous, since it doesn't do anything. For safety,
// give it 3 seconds in case the machine is under load.
bool signaled = event->Wait(3000);
EXPECT_TRUE(signaled);
}
TEST(ThreadTest, Invoke) {
// Create and start the thread.
Thread thread;