diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index b20ec2d6a5..00a582cc06 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -67,7 +67,6 @@ class ScopedAutoReleasePool { namespace rtc { namespace { -const int kMaxMsgLatency = 150; // 150 ms const int kSlowDispatchLoggingThreshold = 50; // 50 ms class MessageHandlerWithTask final : public MessageHandler { @@ -305,7 +304,7 @@ Thread::Thread(std::unique_ptr ss) Thread::Thread(SocketServer* ss, bool do_init) : fPeekKeep_(false), - dmsgq_next_num_(0), + delayed_next_num_(0), fInitialized_(false), fDestroyed_(false), stop_(0), @@ -406,7 +405,7 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { int64_t msCurrent = msStart; while (true) { // Check for sent messages - ReceiveSends(); + ReceiveSendsFromThread(nullptr); // Check for posted events int64_t cmsDelayNext = kForever; @@ -421,33 +420,25 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { // triggered and calculate the next trigger time. if (first_pass) { first_pass = false; - while (!dmsgq_.empty()) { - if (msCurrent < dmsgq_.top().msTrigger_) { - cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); + while (!delayed_messages_.empty()) { + if (msCurrent < delayed_messages_.top().run_time_ms_) { + cmsDelayNext = + TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); break; } - msgq_.push_back(dmsgq_.top().msg_); - dmsgq_.pop(); + messages_.push_back(delayed_messages_.top().msg_); + delayed_messages_.pop(); } } // Pull a message off the message queue, if available. - if (msgq_.empty()) { + if (messages_.empty()) { break; } else { - *pmsg = msgq_.front(); - msgq_.pop_front(); + *pmsg = messages_.front(); + messages_.pop_front(); } } // crit_ is released here. - // Log a warning for time-sensitive messages that we're late to deliver. - if (pmsg->ts_sensitive) { - int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); - if (delay > 0) { - RTC_LOG_F(LS_WARNING) - << "id: " << pmsg->message_id - << " delay: " << (delay + kMaxMsgLatency) << "ms"; - } - } // If this was a dispose message, delete it and skip it. if (MQID_DISPOSE == pmsg->message_id) { RTC_DCHECK(nullptr == pmsg->phandler); @@ -495,6 +486,7 @@ void Thread::Post(const Location& posted_from, uint32_t id, MessageData* pdata, bool time_sensitive) { + RTC_DCHECK(!time_sensitive); if (IsQuitting()) { delete pdata; return; @@ -511,45 +503,32 @@ void Thread::Post(const Location& posted_from, msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; - } - msgq_.push_back(msg); + messages_.push_back(msg); } WakeUpSocketServer(); } void Thread::PostDelayed(const Location& posted_from, - int cmsDelay, + int delay_ms, MessageHandler* phandler, uint32_t id, MessageData* pdata) { - return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id, + return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id, pdata); } void Thread::PostAt(const Location& posted_from, - uint32_t tstamp, + int64_t run_at_ms, MessageHandler* phandler, uint32_t id, MessageData* pdata) { - // This should work even if it is used (unexpectedly). - int64_t delay = static_cast(TimeMillis()) - tstamp; - return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata); -} - -void Thread::PostAt(const Location& posted_from, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, + return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id, pdata); } void Thread::DoDelayPost(const Location& posted_from, - int64_t cmsDelay, - int64_t tstamp, + int64_t delay_ms, + int64_t run_at_ms, MessageHandler* phandler, uint32_t id, MessageData* pdata) { @@ -569,13 +548,13 @@ void Thread::DoDelayPost(const Location& posted_from, msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); + DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg); + delayed_messages_.push(delayed); // If this message queue processes 1 message every millisecond for 50 days, // we will wrap this number. Even then, only messages with identical times // will be misordered, and then only briefly. This is probably ok. - ++dmsgq_next_num_; - RTC_DCHECK_NE(0, dmsgq_next_num_); + ++delayed_next_num_; + RTC_DCHECK_NE(0, delayed_next_num_); } WakeUpSocketServer(); } @@ -583,11 +562,11 @@ void Thread::DoDelayPost(const Location& posted_from, int Thread::GetDelay() { CritScope cs(&crit_); - if (!msgq_.empty()) + if (!messages_.empty()) return 0; - if (!dmsgq_.empty()) { - int delay = TimeUntil(dmsgq_.top().msTrigger_); + if (!delayed_messages_.empty()) { + int delay = TimeUntil(delayed_messages_.top().run_time_ms_); if (delay < 0) delay = 0; return delay; @@ -612,14 +591,14 @@ void Thread::ClearInternal(MessageHandler* phandler, // Remove from ordered message queue - for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { + for (auto it = messages_.begin(); it != messages_.end();) { if (it->Match(phandler, id)) { if (removed) { removed->push_back(*it); } else { delete it->pdata; } - it = msgq_.erase(it); + it = messages_.erase(it); } else { ++it; } @@ -627,9 +606,8 @@ void Thread::ClearInternal(MessageHandler* phandler, // Remove from priority queue. Not directly iterable, so use this approach - PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); - for (PriorityQueue::container_type::iterator it = new_end; - it != dmsgq_.container().end(); ++it) { + auto new_end = delayed_messages_.container().begin(); + for (auto it = new_end; it != delayed_messages_.container().end(); ++it) { if (it->msg_.Match(phandler, id)) { if (removed) { removed->push_back(it->msg_); @@ -640,8 +618,9 @@ void Thread::ClearInternal(MessageHandler* phandler, *new_end++ = *it; } } - dmsgq_.container().erase(new_end, dmsgq_.container().end()); - dmsgq_.reheap(); + delayed_messages_.container().erase(new_end, + delayed_messages_.container().end()); + delayed_messages_.reheap(); } void Thread::Dispatch(Message* pmsg) { @@ -909,10 +888,6 @@ void Thread::Send(const Location& posted_from, } } -void Thread::ReceiveSends() { - ReceiveSendsFromThread(nullptr); -} - void Thread::ReceiveSendsFromThread(const Thread* source) { // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit @@ -935,8 +910,7 @@ void Thread::ReceiveSendsFromThread(const Thread* source) { } bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { - for (std::list<_SendMessage>::iterator it = sendlist_.begin(); - it != sendlist_.end(); ++it) { + for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) { if (it->thread == source || source == nullptr) { *msg = *it; sendlist_.erase(it); @@ -1011,9 +985,7 @@ void Thread::Clear(MessageHandler* phandler, // Remove messages on sendlist_ with phandler // Object target cleared: remove from send list, wakeup/set ready // if sender not null. - - std::list<_SendMessage>::iterator iter = sendlist_.begin(); - while (iter != sendlist_.end()) { + for (auto iter = sendlist_.begin(); iter != sendlist_.end();) { _SendMessage smsg = *iter; if (smsg.msg.Match(phandler, id)) { if (removed) { diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 8b853a85d0..77aff611f9 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -228,24 +228,19 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { int cmsWait = kForever, bool process_io = true); virtual bool Peek(Message* pmsg, int cmsWait = 0); + // |time_sensitive| is deprecated and should always be false. virtual void Post(const Location& posted_from, MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr, bool time_sensitive = false); virtual void PostDelayed(const Location& posted_from, - int cmsDelay, + int delay_ms, MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr); virtual void PostAt(const Location& posted_from, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - // TODO(honghaiz): Remove this when all the dependencies are removed. - virtual void PostAt(const Location& posted_from, - uint32_t tstamp, + int64_t run_at_ms, MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr); @@ -253,15 +248,14 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { uint32_t id = MQID_ANY, MessageList* removed = nullptr); virtual void Dispatch(Message* pmsg); - virtual void ReceiveSends(); // Amount of time until the next message can be retrieved virtual int GetDelay(); bool empty() const { return size() == 0u; } size_t size() const { - CritScope cs(&crit_); // msgq_.size() is not thread safe. - return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); + CritScope cs(&crit_); + return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u); } // Internally posts a message which causes the doomed object to be deleted @@ -431,6 +425,33 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { #endif protected: + // DelayedMessage goes into a priority queue, sorted by trigger time. Messages + // with the same trigger time are processed in num_ (FIFO) order. + class DelayedMessage { + public: + DelayedMessage(int64_t delay, + int64_t run_time_ms, + uint32_t num, + const Message& msg) + : delay_ms_(delay), + run_time_ms_(run_time_ms), + message_number_(num), + msg_(msg) {} + + bool operator<(const DelayedMessage& dmsg) const { + return (dmsg.run_time_ms_ < run_time_ms_) || + ((dmsg.run_time_ms_ == run_time_ms_) && + (dmsg.message_number_ < message_number_)); + } + + int64_t delay_ms_; // for debugging + int64_t run_time_ms_; + // Monotonicaly incrementing number used for ordering of messages + // targeted to execute at the same time. + uint32_t message_number_; + Message msg_; + }; + class PriorityQueue : public std::priority_queue { public: container_type& container() { return c; } @@ -520,9 +541,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { bool fPeekKeep_; Message msgPeek_; - MessageList msgq_ RTC_GUARDED_BY(crit_); - PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_); - uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_); + MessageList messages_ RTC_GUARDED_BY(crit_); + PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); + uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); CriticalSection crit_; bool fInitialized_; bool fDestroyed_; diff --git a/rtc_base/thread_message.h b/rtc_base/thread_message.h index 1f6af1a940..80824e29e5 100644 --- a/rtc_base/thread_message.h +++ b/rtc_base/thread_message.h @@ -101,8 +101,7 @@ const uint32_t MQID_DISPOSE = static_cast(-2); // No destructor struct Message { - Message() - : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {} + Message() : phandler(nullptr), message_id(0), pdata(nullptr) {} inline bool Match(MessageHandler* handler, uint32_t id) const { return (handler == nullptr || handler == phandler) && (id == MQID_ANY || id == message_id); @@ -111,31 +110,8 @@ struct Message { MessageHandler* phandler; uint32_t message_id; MessageData* pdata; - int64_t ts_sensitive; }; typedef std::list MessageList; - -// DelayedMessage goes into a priority queue, sorted by trigger time. Messages -// with the same trigger time are processed in num_ (FIFO) order. - -class DelayedMessage { - public: - DelayedMessage(int64_t delay, - int64_t trigger, - uint32_t num, - const Message& msg) - : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {} - - bool operator<(const DelayedMessage& dmsg) const { - return (dmsg.msTrigger_ < msTrigger_) || - ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); - } - - int64_t cmsDelay_; // for debugging - int64_t msTrigger_; - uint32_t num_; - Message msg_; -}; } // namespace rtc #endif // RTC_BASE_THREAD_MESSAGE_H_