Revert of Prevent data race in MessageQueue. (patchset #3 id:40001 of https://codereview.webrtc.org/1675923002/ )

Reason for revert:
Broke chromium.webrtc.fyi bots:
https://build.chromium.org/p/chromium.webrtc.fyi/builders/Mac%20Builder/builds/9891
https://build.chromium.org/p/chromium.webrtc.fyi/builders/Mac%20GN/builds/11416

Fails with
-----
Undefined symbols for architecture x86_64:
  "rtc::SharedExclusiveLock::LockShared()", referenced from:
      rtc::MessageQueue::DoDestroy() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::socketserver() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::WakeUpSocketServer() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Quit() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Get(rtc::Message*, int, bool) in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Post(rtc::MessageHandler*, unsigned int, rtc::MessageData*, bool) in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::DoDelayPost(int, unsigned int, rtc::MessageHandler*, unsigned int, rtc::MessageData*) in librtc_base.a(messagequeue.o)
      ...
  "rtc::SharedExclusiveLock::UnlockShared()", referenced from:
      rtc::MessageQueue::DoDestroy() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::socketserver() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::WakeUpSocketServer() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Quit() in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Get(rtc::Message*, int, bool) in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::Post(rtc::MessageHandler*, unsigned int, rtc::MessageData*, bool) in librtc_base.a(messagequeue.o)
      rtc::MessageQueue::DoDelayPost(int, unsigned int, rtc::MessageHandler*, unsigned int, rtc::MessageData*) in librtc_base.a(messagequeue.o)
      ...
  "rtc::SharedExclusiveLock::SharedExclusiveLock()", referenced from:
      rtc::MessageQueue::MessageQueue(rtc::SocketServer*, bool) in librtc_base.a(messagequeue.o)
ld: symbol(s) not found for architecture x86_64
-----

Looks like these are compiling without "webrtc/base/sharedexclusivelock.cc".

Original issue's description:
> Prevent data race in MessageQueue.
>
> The CL prevents a data race in MessageQueue where the variable "ss_" is
> modified without a lock while sometimes read inside a lock.
>
> Also thread annotations have been added to the MessageQueue class.
>
> BUG=webrtc:5496
>
> Committed: https://crrev.com/df88460372e7ce78c871a87774d7e6d82aac6ee3
> Cr-Commit-Position: refs/heads/master@{#11683}

TBR=ivoc@webrtc.org,pthatcher@webrtc.org,deadbeef@webrtc.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=webrtc:5496

Review URL: https://codereview.webrtc.org/1714463003

Cr-Commit-Position: refs/heads/master@{#11686}
This commit is contained in:
jbauch 2016-02-19 07:16:16 -08:00 committed by Commit bot
parent fc968a283c
commit 9674d7cb89
3 changed files with 37 additions and 67 deletions

View File

@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) {
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
: ss_(ss), fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
if (!ss_) {
// Currently, MessageQueue holds a socket server, and is the base class for
// Thread. It seems like it makes more sense for Thread to hold the socket
@ -159,37 +159,19 @@ void MessageQueue::DoDestroy() {
SignalQueueDestroyed();
MessageQueueManager::Remove(this);
Clear(NULL);
SharedScope ss(&ss_lock_);
if (ss_) {
ss_->SetMessageQueue(NULL);
}
}
SocketServer* MessageQueue::socketserver() {
SharedScope ss(&ss_lock_);
return ss_;
}
void MessageQueue::set_socketserver(SocketServer* ss) {
// Need to lock exclusively here to prevent simultaneous modifications from
// other threads. Can't be a shared lock to prevent races with other reading
// threads.
// Other places that only read "ss_" can use a shared lock as simultaneous
// read access is allowed.
ExclusiveScope es(&ss_lock_);
ss_ = ss ? ss : default_ss_.get();
ss_->SetMessageQueue(this);
}
void MessageQueue::WakeUpSocketServer() {
SharedScope ss(&ss_lock_);
ss_->WakeUp();
}
void MessageQueue::Quit() {
fStop_ = true;
WakeUpSocketServer();
ss_->WakeUp();
}
bool MessageQueue::IsQuitting() {
@ -295,12 +277,9 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
cmsNext = cmsDelayNext;
}
{
// Wait and multiplex in the meantime
SharedScope ss(&ss_lock_);
if (!ss_->Wait(cmsNext, process_io))
return false;
}
// Wait and multiplex in the meantime
if (!ss_->Wait(cmsNext, process_io))
return false;
// If the specified timeout expired, return
@ -328,18 +307,16 @@ void MessageQueue::Post(MessageHandler* phandler,
// Add the message to the end of the queue
// Signal for the multiplexer to return
{
CritScope cs(&crit_);
Message msg;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = Time() + kMaxMsgLatency;
}
msgq_.push_back(msg);
CritScope cs(&crit_);
Message msg;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = Time() + kMaxMsgLatency;
}
WakeUpSocketServer();
msgq_.push_back(msg);
ss_->WakeUp();
}
void MessageQueue::PostDelayed(int cmsDelay,
@ -368,20 +345,18 @@ void MessageQueue::DoDelayPost(int cmsDelay,
// Add to the priority queue. Gets sorted soonest first.
// Signal for the multiplexer to return.
{
CritScope cs(&crit_);
Message msg;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
dmsgq_.push(dmsg);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
VERIFY(0 != ++dmsgq_next_num_);
}
WakeUpSocketServer();
CritScope cs(&crit_);
Message msg;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
dmsgq_.push(dmsg);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
VERIFY(0 != ++dmsgq_next_num_);
ss_->WakeUp();
}
int MessageQueue::GetDelay() {

View File

@ -24,11 +24,9 @@
#include "webrtc/base/messagehandler.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/scoped_ref_ptr.h"
#include "webrtc/base/sharedexclusivelock.h"
#include "webrtc/base/sigslot.h"
#include "webrtc/base/socketserver.h"
#include "webrtc/base/timeutils.h"
#include "webrtc/base/thread_annotations.h"
namespace rtc {
@ -183,7 +181,7 @@ class MessageQueue {
// calling Clear on the object from a different thread.
virtual ~MessageQueue();
SocketServer* socketserver();
SocketServer* socketserver() { return ss_; }
void set_socketserver(SocketServer* ss);
// Note: The behavior of MessageQueue has changed. When a MQ is stopped,
@ -262,25 +260,21 @@ class MessageQueue {
// destructor.
void DoDestroy();
void WakeUpSocketServer();
// The SocketServer is not owned by MessageQueue.
SocketServer* ss_;
// If a server isn't supplied in the constructor, use this one.
scoped_ptr<SocketServer> default_ss_;
bool fStop_;
bool fPeekKeep_;
Message msgPeek_;
MessageList msgq_ GUARDED_BY(crit_);
PriorityQueue dmsgq_ GUARDED_BY(crit_);
uint32_t dmsgq_next_num_ GUARDED_BY(crit_);
MessageList msgq_;
PriorityQueue dmsgq_;
uint32_t dmsgq_next_num_;
CriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;
private:
// The SocketServer is not owned by MessageQueue.
SocketServer* ss_ GUARDED_BY(ss_lock_);
// If a server isn't supplied in the constructor, use this one.
scoped_ptr<SocketServer> default_ss_;
SharedExclusiveLock ss_lock_;
RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue);
};

View File

@ -353,7 +353,8 @@ void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) {
}
// Wait for a reply
WakeUpSocketServer();
ss_->WakeUp();
bool waited = false;
crit_.Enter();