Reland of "Protect MessageQueue stop field with a critical section to avoid data races." (refs/heads/master@{#13430}).

It was reverted in "refs/heads/master@{#13431}" due to breaking Chrome FYI bots.
Fix for chromium was submmited in https://codereview.chromium.org/2159753002.

This reverts commit a2c900877d8338130210c99fec1c8e8e59defea4.

R=mflodman@webrtc.org

Review URL: https://codereview.webrtc.org/2166493004 .

Cr-Commit-Position: refs/heads/master@{#13508}
This commit is contained in:
André Susano Pinto 2016-07-22 13:30:05 +02:00
parent f52767ca67
commit 02a5797908
4 changed files with 16 additions and 12 deletions

View File

@ -150,8 +150,12 @@ void MessageQueueManager::ProcessAllMessageQueuesInternal() {
//------------------------------------------------------------------
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
: fPeekKeep_(false),
dmsgq_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
ss_(ss) {
RTC_DCHECK(ss);
// Currently, MessageQueue holds a socket server, and is the base class for
// Thread. It seems like it makes more sense for Thread to hold the socket
@ -223,16 +227,16 @@ void MessageQueue::WakeUpSocketServer() {
}
void MessageQueue::Quit() {
fStop_ = true;
AtomicOps::ReleaseStore(&stop_, 1);
WakeUpSocketServer();
}
bool MessageQueue::IsQuitting() {
return fStop_;
return AtomicOps::AcquireLoad(&stop_) != 0;
}
void MessageQueue::Restart() {
fStop_ = false;
AtomicOps::ReleaseStore(&stop_, 0);
}
bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
@ -316,7 +320,7 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
return true;
}
if (fStop_)
if (IsQuitting())
break;
// Which is shorter, the delay wait or the asked wait?
@ -357,7 +361,7 @@ void MessageQueue::Post(const Location& posted_from,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
if (fStop_)
if (IsQuitting())
return;
// Keep thread safe
@ -413,7 +417,7 @@ void MessageQueue::DoDelayPost(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
if (fStop_) {
if (IsQuitting()) {
return;
}

View File

@ -288,7 +288,6 @@ class MessageQueue {
void WakeUpSocketServer();
bool fStop_;
bool fPeekKeep_;
Message msgPeek_;
MessageList msgq_ GUARDED_BY(crit_);
@ -299,6 +298,8 @@ class MessageQueue {
bool fDestroyed_;
private:
volatile int stop_;
// The SocketServer might not be owned by MessageQueue.
SocketServer* ss_ GUARDED_BY(ss_lock_);
// Used if SocketServer ownership lies with |this|.

View File

@ -218,7 +218,7 @@ bool Thread::Start(Runnable* runnable) {
ASSERT(!running());
if (running()) return false;
Restart(); // reset fStop_ if the thread is being restarted
Restart(); // reset IsQuitting() if the thread is being restarted
// Make sure that ThreadManager is created on the main thread before
// we start a new thread.
@ -346,7 +346,7 @@ void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
if (fStop_)
if (IsQuitting())
return;
// Sent messages are sent to the MessageHandler directly, in the context

View File

@ -24,7 +24,6 @@ char kTSanDefaultSuppressions[] =
// WebRTC specific suppressions.
// Split up suppressions covered previously by thread.cc and messagequeue.cc.
"race:rtc::MessageQueue::Quit\n"
"race:vp8cx_remove_encoder_threads\n"
"race:third_party/libvpx/source/libvpx/vp9/common/vp9_scan.h\n"