diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc index bbdb941ffa..61aa61192b 100644 --- a/webrtc/base/messagequeue.cc +++ b/webrtc/base/messagequeue.cc @@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { // MessageQueue MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) - : ss_(ss), fStop_(false), fPeekKeep_(false), - dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { + : fStop_(false), fPeekKeep_(false), + dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { if (!ss_) { // Currently, MessageQueue holds a socket server, and is the base class for // Thread. It seems like it makes more sense for Thread to hold the socket @@ -159,19 +159,37 @@ void MessageQueue::DoDestroy() { SignalQueueDestroyed(); MessageQueueManager::Remove(this); Clear(NULL); + + SharedScope ss(&ss_lock_); if (ss_) { ss_->SetMessageQueue(NULL); } } +SocketServer* MessageQueue::socketserver() { + SharedScope ss(&ss_lock_); + return ss_; +} + void MessageQueue::set_socketserver(SocketServer* ss) { + // Need to lock exclusively here to prevent simultaneous modifications from + // other threads. Can't be a shared lock to prevent races with other reading + // threads. + // Other places that only read "ss_" can use a shared lock as simultaneous + // read access is allowed. + ExclusiveScope es(&ss_lock_); ss_ = ss ? ss : default_ss_.get(); ss_->SetMessageQueue(this); } +void MessageQueue::WakeUpSocketServer() { + SharedScope ss(&ss_lock_); + ss_->WakeUp(); +} + void MessageQueue::Quit() { fStop_ = true; - ss_->WakeUp(); + WakeUpSocketServer(); } bool MessageQueue::IsQuitting() { @@ -277,9 +295,12 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { cmsNext = cmsDelayNext; } - // Wait and multiplex in the meantime - if (!ss_->Wait(cmsNext, process_io)) - return false; + { + // Wait and multiplex in the meantime + SharedScope ss(&ss_lock_); + if (!ss_->Wait(cmsNext, process_io)) + return false; + } // If the specified timeout expired, return @@ -307,16 +328,18 @@ void MessageQueue::Post(MessageHandler* phandler, // Add the message to the end of the queue // Signal for the multiplexer to return - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = Time() + kMaxMsgLatency; + { + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = Time() + kMaxMsgLatency; + } + msgq_.push_back(msg); } - msgq_.push_back(msg); - ss_->WakeUp(); + WakeUpSocketServer(); } void MessageQueue::PostDelayed(int cmsDelay, @@ -345,18 +368,20 @@ void MessageQueue::DoDelayPost(int cmsDelay, // Add to the priority queue. Gets sorted soonest first. // Signal for the multiplexer to return. - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); - // If this message queue processes 1 message every millisecond for 50 days, - // we will wrap this number. Even then, only messages with identical times - // will be misordered, and then only briefly. This is probably ok. - VERIFY(0 != ++dmsgq_next_num_); - ss_->WakeUp(); + { + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); + dmsgq_.push(dmsg); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + VERIFY(0 != ++dmsgq_next_num_); + } + WakeUpSocketServer(); } int MessageQueue::GetDelay() { diff --git a/webrtc/base/messagequeue.h b/webrtc/base/messagequeue.h index a7991a8923..efc479cf26 100644 --- a/webrtc/base/messagequeue.h +++ b/webrtc/base/messagequeue.h @@ -24,9 +24,11 @@ #include "webrtc/base/messagehandler.h" #include "webrtc/base/scoped_ptr.h" #include "webrtc/base/scoped_ref_ptr.h" +#include "webrtc/base/sharedexclusivelock.h" #include "webrtc/base/sigslot.h" #include "webrtc/base/socketserver.h" #include "webrtc/base/timeutils.h" +#include "webrtc/base/thread_annotations.h" namespace rtc { @@ -181,7 +183,7 @@ class MessageQueue { // calling Clear on the object from a different thread. virtual ~MessageQueue(); - SocketServer* socketserver() { return ss_; } + SocketServer* socketserver(); void set_socketserver(SocketServer* ss); // Note: The behavior of MessageQueue has changed. When a MQ is stopped, @@ -260,21 +262,25 @@ class MessageQueue { // destructor. void DoDestroy(); - // The SocketServer is not owned by MessageQueue. - SocketServer* ss_; - // If a server isn't supplied in the constructor, use this one. - scoped_ptr default_ss_; + void WakeUpSocketServer(); + bool fStop_; bool fPeekKeep_; Message msgPeek_; - MessageList msgq_; - PriorityQueue dmsgq_; - uint32_t dmsgq_next_num_; + MessageList msgq_ GUARDED_BY(crit_); + PriorityQueue dmsgq_ GUARDED_BY(crit_); + uint32_t dmsgq_next_num_ GUARDED_BY(crit_); CriticalSection crit_; bool fInitialized_; bool fDestroyed_; private: + // The SocketServer is not owned by MessageQueue. + SocketServer* ss_ GUARDED_BY(ss_lock_); + // If a server isn't supplied in the constructor, use this one. + scoped_ptr default_ss_; + SharedExclusiveLock ss_lock_; + RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue); }; diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc index cda4ba475c..dc8ccdfd4d 100644 --- a/webrtc/base/thread.cc +++ b/webrtc/base/thread.cc @@ -353,8 +353,7 @@ void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) { } // Wait for a reply - - ss_->WakeUp(); + WakeUpSocketServer(); bool waited = false; crit_.Enter();