Reland: Prevent data race in MessageQueue.
The CL prevents a data race in MessageQueue where the variable "ss_" is modified without a lock while sometimes read inside a lock. Also thread annotations have been added to the MessageQueue class. This was already reviewed and landed in https://codereview.webrtc.org/1675923002/ but failed in Chromium GN builds due to sharedexclusivelock.cc not being compiled in these builds. This changed in https://codereview.webrtc.org/1712773003/ so the reland should work fine now. BUG=webrtc:5496 Review URL: https://codereview.webrtc.org/1729893002 Cr-Commit-Position: refs/heads/master@{#11758}
This commit is contained in:
parent
0c74ae1e4d
commit
9ccedc38f6
@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) {
|
||||
// MessageQueue
|
||||
|
||||
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
|
||||
: ss_(ss), fStop_(false), fPeekKeep_(false),
|
||||
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
|
||||
: fStop_(false), fPeekKeep_(false),
|
||||
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
|
||||
if (!ss_) {
|
||||
// Currently, MessageQueue holds a socket server, and is the base class for
|
||||
// Thread. It seems like it makes more sense for Thread to hold the socket
|
||||
@ -159,19 +159,37 @@ void MessageQueue::DoDestroy() {
|
||||
SignalQueueDestroyed();
|
||||
MessageQueueManager::Remove(this);
|
||||
Clear(NULL);
|
||||
|
||||
SharedScope ss(&ss_lock_);
|
||||
if (ss_) {
|
||||
ss_->SetMessageQueue(NULL);
|
||||
}
|
||||
}
|
||||
|
||||
SocketServer* MessageQueue::socketserver() {
|
||||
SharedScope ss(&ss_lock_);
|
||||
return ss_;
|
||||
}
|
||||
|
||||
void MessageQueue::set_socketserver(SocketServer* ss) {
|
||||
// Need to lock exclusively here to prevent simultaneous modifications from
|
||||
// other threads. Can't be a shared lock to prevent races with other reading
|
||||
// threads.
|
||||
// Other places that only read "ss_" can use a shared lock as simultaneous
|
||||
// read access is allowed.
|
||||
ExclusiveScope es(&ss_lock_);
|
||||
ss_ = ss ? ss : default_ss_.get();
|
||||
ss_->SetMessageQueue(this);
|
||||
}
|
||||
|
||||
void MessageQueue::WakeUpSocketServer() {
|
||||
SharedScope ss(&ss_lock_);
|
||||
ss_->WakeUp();
|
||||
}
|
||||
|
||||
void MessageQueue::Quit() {
|
||||
fStop_ = true;
|
||||
ss_->WakeUp();
|
||||
WakeUpSocketServer();
|
||||
}
|
||||
|
||||
bool MessageQueue::IsQuitting() {
|
||||
@ -277,9 +295,12 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
|
||||
cmsNext = cmsDelayNext;
|
||||
}
|
||||
|
||||
// Wait and multiplex in the meantime
|
||||
if (!ss_->Wait(cmsNext, process_io))
|
||||
return false;
|
||||
{
|
||||
// Wait and multiplex in the meantime
|
||||
SharedScope ss(&ss_lock_);
|
||||
if (!ss_->Wait(cmsNext, process_io))
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the specified timeout expired, return
|
||||
|
||||
@ -307,16 +328,18 @@ void MessageQueue::Post(MessageHandler* phandler,
|
||||
// Add the message to the end of the queue
|
||||
// Signal for the multiplexer to return
|
||||
|
||||
CritScope cs(&crit_);
|
||||
Message msg;
|
||||
msg.phandler = phandler;
|
||||
msg.message_id = id;
|
||||
msg.pdata = pdata;
|
||||
if (time_sensitive) {
|
||||
msg.ts_sensitive = Time() + kMaxMsgLatency;
|
||||
{
|
||||
CritScope cs(&crit_);
|
||||
Message msg;
|
||||
msg.phandler = phandler;
|
||||
msg.message_id = id;
|
||||
msg.pdata = pdata;
|
||||
if (time_sensitive) {
|
||||
msg.ts_sensitive = Time() + kMaxMsgLatency;
|
||||
}
|
||||
msgq_.push_back(msg);
|
||||
}
|
||||
msgq_.push_back(msg);
|
||||
ss_->WakeUp();
|
||||
WakeUpSocketServer();
|
||||
}
|
||||
|
||||
void MessageQueue::PostDelayed(int cmsDelay,
|
||||
@ -345,18 +368,20 @@ void MessageQueue::DoDelayPost(int cmsDelay,
|
||||
// Add to the priority queue. Gets sorted soonest first.
|
||||
// Signal for the multiplexer to return.
|
||||
|
||||
CritScope cs(&crit_);
|
||||
Message msg;
|
||||
msg.phandler = phandler;
|
||||
msg.message_id = id;
|
||||
msg.pdata = pdata;
|
||||
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
||||
dmsgq_.push(dmsg);
|
||||
// If this message queue processes 1 message every millisecond for 50 days,
|
||||
// we will wrap this number. Even then, only messages with identical times
|
||||
// will be misordered, and then only briefly. This is probably ok.
|
||||
VERIFY(0 != ++dmsgq_next_num_);
|
||||
ss_->WakeUp();
|
||||
{
|
||||
CritScope cs(&crit_);
|
||||
Message msg;
|
||||
msg.phandler = phandler;
|
||||
msg.message_id = id;
|
||||
msg.pdata = pdata;
|
||||
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
||||
dmsgq_.push(dmsg);
|
||||
// If this message queue processes 1 message every millisecond for 50 days,
|
||||
// we will wrap this number. Even then, only messages with identical times
|
||||
// will be misordered, and then only briefly. This is probably ok.
|
||||
VERIFY(0 != ++dmsgq_next_num_);
|
||||
}
|
||||
WakeUpSocketServer();
|
||||
}
|
||||
|
||||
int MessageQueue::GetDelay() {
|
||||
|
||||
@ -24,9 +24,11 @@
|
||||
#include "webrtc/base/messagehandler.h"
|
||||
#include "webrtc/base/scoped_ptr.h"
|
||||
#include "webrtc/base/scoped_ref_ptr.h"
|
||||
#include "webrtc/base/sharedexclusivelock.h"
|
||||
#include "webrtc/base/sigslot.h"
|
||||
#include "webrtc/base/socketserver.h"
|
||||
#include "webrtc/base/timeutils.h"
|
||||
#include "webrtc/base/thread_annotations.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
@ -181,7 +183,7 @@ class MessageQueue {
|
||||
// calling Clear on the object from a different thread.
|
||||
virtual ~MessageQueue();
|
||||
|
||||
SocketServer* socketserver() { return ss_; }
|
||||
SocketServer* socketserver();
|
||||
void set_socketserver(SocketServer* ss);
|
||||
|
||||
// Note: The behavior of MessageQueue has changed. When a MQ is stopped,
|
||||
@ -260,21 +262,25 @@ class MessageQueue {
|
||||
// destructor.
|
||||
void DoDestroy();
|
||||
|
||||
// The SocketServer is not owned by MessageQueue.
|
||||
SocketServer* ss_;
|
||||
// If a server isn't supplied in the constructor, use this one.
|
||||
scoped_ptr<SocketServer> default_ss_;
|
||||
void WakeUpSocketServer();
|
||||
|
||||
bool fStop_;
|
||||
bool fPeekKeep_;
|
||||
Message msgPeek_;
|
||||
MessageList msgq_;
|
||||
PriorityQueue dmsgq_;
|
||||
uint32_t dmsgq_next_num_;
|
||||
MessageList msgq_ GUARDED_BY(crit_);
|
||||
PriorityQueue dmsgq_ GUARDED_BY(crit_);
|
||||
uint32_t dmsgq_next_num_ GUARDED_BY(crit_);
|
||||
CriticalSection crit_;
|
||||
bool fInitialized_;
|
||||
bool fDestroyed_;
|
||||
|
||||
private:
|
||||
// The SocketServer is not owned by MessageQueue.
|
||||
SocketServer* ss_ GUARDED_BY(ss_lock_);
|
||||
// If a server isn't supplied in the constructor, use this one.
|
||||
scoped_ptr<SocketServer> default_ss_;
|
||||
SharedExclusiveLock ss_lock_;
|
||||
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue);
|
||||
};
|
||||
|
||||
|
||||
@ -353,8 +353,7 @@ void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) {
|
||||
}
|
||||
|
||||
// Wait for a reply
|
||||
|
||||
ss_->WakeUp();
|
||||
WakeUpSocketServer();
|
||||
|
||||
bool waited = false;
|
||||
crit_.Enter();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user