diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc index 61aa61192b..bbdb941ffa 100644 --- a/webrtc/base/messagequeue.cc +++ b/webrtc/base/messagequeue.cc @@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { // MessageQueue MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) - : fStop_(false), fPeekKeep_(false), - dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { + : ss_(ss), fStop_(false), fPeekKeep_(false), + dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { if (!ss_) { // Currently, MessageQueue holds a socket server, and is the base class for // Thread. It seems like it makes more sense for Thread to hold the socket @@ -159,37 +159,19 @@ void MessageQueue::DoDestroy() { SignalQueueDestroyed(); MessageQueueManager::Remove(this); Clear(NULL); - - SharedScope ss(&ss_lock_); if (ss_) { ss_->SetMessageQueue(NULL); } } -SocketServer* MessageQueue::socketserver() { - SharedScope ss(&ss_lock_); - return ss_; -} - void MessageQueue::set_socketserver(SocketServer* ss) { - // Need to lock exclusively here to prevent simultaneous modifications from - // other threads. Can't be a shared lock to prevent races with other reading - // threads. - // Other places that only read "ss_" can use a shared lock as simultaneous - // read access is allowed. - ExclusiveScope es(&ss_lock_); ss_ = ss ? ss : default_ss_.get(); ss_->SetMessageQueue(this); } -void MessageQueue::WakeUpSocketServer() { - SharedScope ss(&ss_lock_); - ss_->WakeUp(); -} - void MessageQueue::Quit() { fStop_ = true; - WakeUpSocketServer(); + ss_->WakeUp(); } bool MessageQueue::IsQuitting() { @@ -295,12 +277,9 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { cmsNext = cmsDelayNext; } - { - // Wait and multiplex in the meantime - SharedScope ss(&ss_lock_); - if (!ss_->Wait(cmsNext, process_io)) - return false; - } + // Wait and multiplex in the meantime + if (!ss_->Wait(cmsNext, process_io)) + return false; // If the specified timeout expired, return @@ -328,18 +307,16 @@ void MessageQueue::Post(MessageHandler* phandler, // Add the message to the end of the queue // Signal for the multiplexer to return - { - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = Time() + kMaxMsgLatency; - } - msgq_.push_back(msg); + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = Time() + kMaxMsgLatency; } - WakeUpSocketServer(); + msgq_.push_back(msg); + ss_->WakeUp(); } void MessageQueue::PostDelayed(int cmsDelay, @@ -368,20 +345,18 @@ void MessageQueue::DoDelayPost(int cmsDelay, // Add to the priority queue. Gets sorted soonest first. // Signal for the multiplexer to return. - { - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); - // If this message queue processes 1 message every millisecond for 50 days, - // we will wrap this number. Even then, only messages with identical times - // will be misordered, and then only briefly. This is probably ok. - VERIFY(0 != ++dmsgq_next_num_); - } - WakeUpSocketServer(); + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); + dmsgq_.push(dmsg); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + VERIFY(0 != ++dmsgq_next_num_); + ss_->WakeUp(); } int MessageQueue::GetDelay() { diff --git a/webrtc/base/messagequeue.h b/webrtc/base/messagequeue.h index efc479cf26..a7991a8923 100644 --- a/webrtc/base/messagequeue.h +++ b/webrtc/base/messagequeue.h @@ -24,11 +24,9 @@ #include "webrtc/base/messagehandler.h" #include "webrtc/base/scoped_ptr.h" #include "webrtc/base/scoped_ref_ptr.h" -#include "webrtc/base/sharedexclusivelock.h" #include "webrtc/base/sigslot.h" #include "webrtc/base/socketserver.h" #include "webrtc/base/timeutils.h" -#include "webrtc/base/thread_annotations.h" namespace rtc { @@ -183,7 +181,7 @@ class MessageQueue { // calling Clear on the object from a different thread. virtual ~MessageQueue(); - SocketServer* socketserver(); + SocketServer* socketserver() { return ss_; } void set_socketserver(SocketServer* ss); // Note: The behavior of MessageQueue has changed. When a MQ is stopped, @@ -262,25 +260,21 @@ class MessageQueue { // destructor. void DoDestroy(); - void WakeUpSocketServer(); - + // The SocketServer is not owned by MessageQueue. + SocketServer* ss_; + // If a server isn't supplied in the constructor, use this one. + scoped_ptr default_ss_; bool fStop_; bool fPeekKeep_; Message msgPeek_; - MessageList msgq_ GUARDED_BY(crit_); - PriorityQueue dmsgq_ GUARDED_BY(crit_); - uint32_t dmsgq_next_num_ GUARDED_BY(crit_); + MessageList msgq_; + PriorityQueue dmsgq_; + uint32_t dmsgq_next_num_; CriticalSection crit_; bool fInitialized_; bool fDestroyed_; private: - // The SocketServer is not owned by MessageQueue. - SocketServer* ss_ GUARDED_BY(ss_lock_); - // If a server isn't supplied in the constructor, use this one. - scoped_ptr default_ss_; - SharedExclusiveLock ss_lock_; - RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue); }; diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc index dc8ccdfd4d..cda4ba475c 100644 --- a/webrtc/base/thread.cc +++ b/webrtc/base/thread.cc @@ -353,7 +353,8 @@ void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) { } // Wait for a reply - WakeUpSocketServer(); + + ss_->WakeUp(); bool waited = false; crit_.Enter();