From 9674d7cb89228509203b846f27a74af84762f682 Mon Sep 17 00:00:00 2001 From: jbauch Date: Fri, 19 Feb 2016 07:16:16 -0800 Subject: [PATCH] Revert of Prevent data race in MessageQueue. (patchset #3 id:40001 of https://codereview.webrtc.org/1675923002/ ) Reason for revert: Broke chromium.webrtc.fyi bots: https://build.chromium.org/p/chromium.webrtc.fyi/builders/Mac%20Builder/builds/9891 https://build.chromium.org/p/chromium.webrtc.fyi/builders/Mac%20GN/builds/11416 Fails with ----- Undefined symbols for architecture x86_64: "rtc::SharedExclusiveLock::LockShared()", referenced from: rtc::MessageQueue::DoDestroy() in librtc_base.a(messagequeue.o) rtc::MessageQueue::socketserver() in librtc_base.a(messagequeue.o) rtc::MessageQueue::WakeUpSocketServer() in librtc_base.a(messagequeue.o) rtc::MessageQueue::Quit() in librtc_base.a(messagequeue.o) rtc::MessageQueue::Get(rtc::Message*, int, bool) in librtc_base.a(messagequeue.o) rtc::MessageQueue::Post(rtc::MessageHandler*, unsigned int, rtc::MessageData*, bool) in librtc_base.a(messagequeue.o) rtc::MessageQueue::DoDelayPost(int, unsigned int, rtc::MessageHandler*, unsigned int, rtc::MessageData*) in librtc_base.a(messagequeue.o) ... "rtc::SharedExclusiveLock::UnlockShared()", referenced from: rtc::MessageQueue::DoDestroy() in librtc_base.a(messagequeue.o) rtc::MessageQueue::socketserver() in librtc_base.a(messagequeue.o) rtc::MessageQueue::WakeUpSocketServer() in librtc_base.a(messagequeue.o) rtc::MessageQueue::Quit() in librtc_base.a(messagequeue.o) rtc::MessageQueue::Get(rtc::Message*, int, bool) in librtc_base.a(messagequeue.o) rtc::MessageQueue::Post(rtc::MessageHandler*, unsigned int, rtc::MessageData*, bool) in librtc_base.a(messagequeue.o) rtc::MessageQueue::DoDelayPost(int, unsigned int, rtc::MessageHandler*, unsigned int, rtc::MessageData*) in librtc_base.a(messagequeue.o) ... "rtc::SharedExclusiveLock::SharedExclusiveLock()", referenced from: rtc::MessageQueue::MessageQueue(rtc::SocketServer*, bool) in librtc_base.a(messagequeue.o) ld: symbol(s) not found for architecture x86_64 ----- Looks like these are compiling without "webrtc/base/sharedexclusivelock.cc". Original issue's description: > Prevent data race in MessageQueue. > > The CL prevents a data race in MessageQueue where the variable "ss_" is > modified without a lock while sometimes read inside a lock. > > Also thread annotations have been added to the MessageQueue class. > > BUG=webrtc:5496 > > Committed: https://crrev.com/df88460372e7ce78c871a87774d7e6d82aac6ee3 > Cr-Commit-Position: refs/heads/master@{#11683} TBR=ivoc@webrtc.org,pthatcher@webrtc.org,deadbeef@webrtc.org # Skipping CQ checks because original CL landed less than 1 days ago. NOPRESUBMIT=true NOTREECHECKS=true NOTRY=true BUG=webrtc:5496 Review URL: https://codereview.webrtc.org/1714463003 Cr-Commit-Position: refs/heads/master@{#11686} --- webrtc/base/messagequeue.cc | 79 +++++++++++++------------------------ webrtc/base/messagequeue.h | 22 ++++------- webrtc/base/thread.cc | 3 +- 3 files changed, 37 insertions(+), 67 deletions(-) diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc index 61aa61192b..bbdb941ffa 100644 --- a/webrtc/base/messagequeue.cc +++ b/webrtc/base/messagequeue.cc @@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { // MessageQueue MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) - : fStop_(false), fPeekKeep_(false), - dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { + : ss_(ss), fStop_(false), fPeekKeep_(false), + dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { if (!ss_) { // Currently, MessageQueue holds a socket server, and is the base class for // Thread. It seems like it makes more sense for Thread to hold the socket @@ -159,37 +159,19 @@ void MessageQueue::DoDestroy() { SignalQueueDestroyed(); MessageQueueManager::Remove(this); Clear(NULL); - - SharedScope ss(&ss_lock_); if (ss_) { ss_->SetMessageQueue(NULL); } } -SocketServer* MessageQueue::socketserver() { - SharedScope ss(&ss_lock_); - return ss_; -} - void MessageQueue::set_socketserver(SocketServer* ss) { - // Need to lock exclusively here to prevent simultaneous modifications from - // other threads. Can't be a shared lock to prevent races with other reading - // threads. - // Other places that only read "ss_" can use a shared lock as simultaneous - // read access is allowed. - ExclusiveScope es(&ss_lock_); ss_ = ss ? ss : default_ss_.get(); ss_->SetMessageQueue(this); } -void MessageQueue::WakeUpSocketServer() { - SharedScope ss(&ss_lock_); - ss_->WakeUp(); -} - void MessageQueue::Quit() { fStop_ = true; - WakeUpSocketServer(); + ss_->WakeUp(); } bool MessageQueue::IsQuitting() { @@ -295,12 +277,9 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { cmsNext = cmsDelayNext; } - { - // Wait and multiplex in the meantime - SharedScope ss(&ss_lock_); - if (!ss_->Wait(cmsNext, process_io)) - return false; - } + // Wait and multiplex in the meantime + if (!ss_->Wait(cmsNext, process_io)) + return false; // If the specified timeout expired, return @@ -328,18 +307,16 @@ void MessageQueue::Post(MessageHandler* phandler, // Add the message to the end of the queue // Signal for the multiplexer to return - { - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = Time() + kMaxMsgLatency; - } - msgq_.push_back(msg); + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = Time() + kMaxMsgLatency; } - WakeUpSocketServer(); + msgq_.push_back(msg); + ss_->WakeUp(); } void MessageQueue::PostDelayed(int cmsDelay, @@ -368,20 +345,18 @@ void MessageQueue::DoDelayPost(int cmsDelay, // Add to the priority queue. Gets sorted soonest first. // Signal for the multiplexer to return. - { - CritScope cs(&crit_); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); - // If this message queue processes 1 message every millisecond for 50 days, - // we will wrap this number. Even then, only messages with identical times - // will be misordered, and then only briefly. This is probably ok. - VERIFY(0 != ++dmsgq_next_num_); - } - WakeUpSocketServer(); + CritScope cs(&crit_); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); + dmsgq_.push(dmsg); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + VERIFY(0 != ++dmsgq_next_num_); + ss_->WakeUp(); } int MessageQueue::GetDelay() { diff --git a/webrtc/base/messagequeue.h b/webrtc/base/messagequeue.h index efc479cf26..a7991a8923 100644 --- a/webrtc/base/messagequeue.h +++ b/webrtc/base/messagequeue.h @@ -24,11 +24,9 @@ #include "webrtc/base/messagehandler.h" #include "webrtc/base/scoped_ptr.h" #include "webrtc/base/scoped_ref_ptr.h" -#include "webrtc/base/sharedexclusivelock.h" #include "webrtc/base/sigslot.h" #include "webrtc/base/socketserver.h" #include "webrtc/base/timeutils.h" -#include "webrtc/base/thread_annotations.h" namespace rtc { @@ -183,7 +181,7 @@ class MessageQueue { // calling Clear on the object from a different thread. virtual ~MessageQueue(); - SocketServer* socketserver(); + SocketServer* socketserver() { return ss_; } void set_socketserver(SocketServer* ss); // Note: The behavior of MessageQueue has changed. When a MQ is stopped, @@ -262,25 +260,21 @@ class MessageQueue { // destructor. void DoDestroy(); - void WakeUpSocketServer(); - + // The SocketServer is not owned by MessageQueue. + SocketServer* ss_; + // If a server isn't supplied in the constructor, use this one. + scoped_ptr default_ss_; bool fStop_; bool fPeekKeep_; Message msgPeek_; - MessageList msgq_ GUARDED_BY(crit_); - PriorityQueue dmsgq_ GUARDED_BY(crit_); - uint32_t dmsgq_next_num_ GUARDED_BY(crit_); + MessageList msgq_; + PriorityQueue dmsgq_; + uint32_t dmsgq_next_num_; CriticalSection crit_; bool fInitialized_; bool fDestroyed_; private: - // The SocketServer is not owned by MessageQueue. - SocketServer* ss_ GUARDED_BY(ss_lock_); - // If a server isn't supplied in the constructor, use this one. - scoped_ptr default_ss_; - SharedExclusiveLock ss_lock_; - RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue); }; diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc index dc8ccdfd4d..cda4ba475c 100644 --- a/webrtc/base/thread.cc +++ b/webrtc/base/thread.cc @@ -353,7 +353,8 @@ void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) { } // Wait for a reply - WakeUpSocketServer(); + + ss_->WakeUp(); bool waited = false; crit_.Enter();