Cleanup: Merges Thread and MessageQueue.
Since rtc::Thread is the only class inheriting from rtc::MessageQueue and most members of MessageQueue are public or protected the split is not adding much value. In preparation for future cleanup, this cl merges the two classes. Bug: webrtc:9883 Change-Id: Ia0efb4349f66f653aa34fa4d244998f187e3ce36 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165340 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Reviewed-by: Steve Anton <steveanton@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30235}
This commit is contained in:
parent
7d43801a07
commit
6ea2c6ae87
@ -845,7 +845,6 @@ rtc_library("rtc_base") {
|
|||||||
"message_digest.h",
|
"message_digest.h",
|
||||||
"message_handler.cc",
|
"message_handler.cc",
|
||||||
"message_handler.h",
|
"message_handler.h",
|
||||||
"message_queue.cc",
|
|
||||||
"message_queue.h",
|
"message_queue.h",
|
||||||
"net_helper.cc",
|
"net_helper.cc",
|
||||||
"net_helper.h",
|
"net_helper.h",
|
||||||
@ -909,6 +908,7 @@ rtc_library("rtc_base") {
|
|||||||
"stream.h",
|
"stream.h",
|
||||||
"thread.cc",
|
"thread.cc",
|
||||||
"thread.h",
|
"thread.h",
|
||||||
|
"thread_message.h",
|
||||||
"unique_id_generator.cc",
|
"unique_id_generator.cc",
|
||||||
"unique_id_generator.h",
|
"unique_id_generator.h",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -23,14 +23,14 @@ AsyncInvoker::AsyncInvoker()
|
|||||||
AsyncInvoker::~AsyncInvoker() {
|
AsyncInvoker::~AsyncInvoker() {
|
||||||
destroying_.store(true, std::memory_order_relaxed);
|
destroying_.store(true, std::memory_order_relaxed);
|
||||||
// Messages for this need to be cleared *before* our destructor is complete.
|
// Messages for this need to be cleared *before* our destructor is complete.
|
||||||
MessageQueueManager::Clear(this);
|
ThreadManager::Clear(this);
|
||||||
// And we need to wait for any invocations that are still in progress on
|
// And we need to wait for any invocations that are still in progress on
|
||||||
// other threads. Using memory_order_acquire for synchronization with
|
// other threads. Using memory_order_acquire for synchronization with
|
||||||
// AsyncClosure destructors.
|
// AsyncClosure destructors.
|
||||||
while (pending_invocations_.load(std::memory_order_acquire) > 0) {
|
while (pending_invocations_.load(std::memory_order_acquire) > 0) {
|
||||||
// If the destructor was called while AsyncInvoke was being called by
|
// If the destructor was called while AsyncInvoke was being called by
|
||||||
// another thread, WITHIN an AsyncInvoked functor, it may do another
|
// another thread, WITHIN an AsyncInvoked functor, it may do another
|
||||||
// Thread::Post even after we called MessageQueueManager::Clear(this). So
|
// Thread::Post even after we called ThreadManager::Clear(this). So
|
||||||
// we need to keep calling Clear to discard these posts.
|
// we need to keep calling Clear to discard these posts.
|
||||||
Thread::Current()->Clear(this);
|
Thread::Current()->Clear(this);
|
||||||
invocation_complete_->Wait(Event::kForever);
|
invocation_complete_->Wait(Event::kForever);
|
||||||
@ -68,7 +68,7 @@ void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AsyncInvoker::Clear() {
|
void AsyncInvoker::Clear() {
|
||||||
MessageQueueManager::Clear(this);
|
ThreadManager::Clear(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncInvoker::DoInvoke(const Location& posted_from,
|
void AsyncInvoker::DoInvoke(const Location& posted_from,
|
||||||
|
|||||||
@ -11,7 +11,7 @@
|
|||||||
#include "rtc_base/fake_clock.h"
|
#include "rtc_base/fake_clock.h"
|
||||||
|
|
||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
#include "rtc_base/message_queue.h"
|
#include "rtc_base/thread.h"
|
||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
@ -35,12 +35,12 @@ void ThreadProcessingFakeClock::SetTime(webrtc::Timestamp time) {
|
|||||||
clock_.SetTime(time);
|
clock_.SetTime(time);
|
||||||
// If message queues are waiting in a socket select() with a timeout provided
|
// If message queues are waiting in a socket select() with a timeout provided
|
||||||
// by the OS, they should wake up and dispatch all messages that are ready.
|
// by the OS, they should wake up and dispatch all messages that are ready.
|
||||||
MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadProcessingFakeClock::AdvanceTime(webrtc::TimeDelta delta) {
|
void ThreadProcessingFakeClock::AdvanceTime(webrtc::TimeDelta delta) {
|
||||||
clock_.AdvanceTime(delta);
|
clock_.AdvanceTime(delta);
|
||||||
MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
}
|
}
|
||||||
|
|
||||||
ScopedBaseFakeClock::ScopedBaseFakeClock() {
|
ScopedBaseFakeClock::ScopedBaseFakeClock() {
|
||||||
|
|||||||
@ -10,12 +10,12 @@
|
|||||||
|
|
||||||
#include "rtc_base/message_handler.h"
|
#include "rtc_base/message_handler.h"
|
||||||
|
|
||||||
#include "rtc_base/message_queue.h"
|
#include "rtc_base/thread.h"
|
||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
MessageHandler::~MessageHandler() {
|
MessageHandler::~MessageHandler() {
|
||||||
MessageQueueManager::Clear(this);
|
ThreadManager::Clear(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
|||||||
@ -1,523 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
|
|
||||||
*
|
|
||||||
* Use of this source code is governed by a BSD-style license
|
|
||||||
* that can be found in the LICENSE file in the root of the source
|
|
||||||
* tree. An additional intellectual property rights grant can be found
|
|
||||||
* in the file PATENTS. All contributing project authors may
|
|
||||||
* be found in the AUTHORS file in the root of the source tree.
|
|
||||||
*/
|
|
||||||
#include "rtc_base/message_queue.h"
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <utility>
|
|
||||||
|
|
||||||
#include "absl/algorithm/container.h"
|
|
||||||
#include "rtc_base/atomic_ops.h"
|
|
||||||
#include "rtc_base/checks.h"
|
|
||||||
#include "rtc_base/logging.h"
|
|
||||||
#include "rtc_base/thread.h"
|
|
||||||
#include "rtc_base/time_utils.h"
|
|
||||||
#include "rtc_base/trace_event.h"
|
|
||||||
|
|
||||||
namespace rtc {
|
|
||||||
namespace {
|
|
||||||
|
|
||||||
const int kMaxMsgLatency = 150; // 150 ms
|
|
||||||
const int kSlowDispatchLoggingThreshold = 50; // 50 ms
|
|
||||||
|
|
||||||
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
|
|
||||||
public:
|
|
||||||
MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
|
|
||||||
RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
|
|
||||||
: cs_(cs), processing_(processing) {
|
|
||||||
cs_->Enter();
|
|
||||||
*processing_ += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
|
|
||||||
*processing_ -= 1;
|
|
||||||
cs_->Leave();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
const CriticalSection* const cs_;
|
|
||||||
size_t* processing_;
|
|
||||||
|
|
||||||
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
|
|
||||||
};
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
//------------------------------------------------------------------
|
|
||||||
// MessageQueueManager
|
|
||||||
|
|
||||||
MessageQueueManager* MessageQueueManager::Instance() {
|
|
||||||
static MessageQueueManager* const instance = new MessageQueueManager;
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
MessageQueueManager::MessageQueueManager() : processing_(0) {}
|
|
||||||
|
|
||||||
MessageQueueManager::~MessageQueueManager() {}
|
|
||||||
|
|
||||||
void MessageQueueManager::Add(MessageQueue* message_queue) {
|
|
||||||
return Instance()->AddInternal(message_queue);
|
|
||||||
}
|
|
||||||
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
// Prevent changes while the list of message queues is processed.
|
|
||||||
RTC_DCHECK_EQ(processing_, 0);
|
|
||||||
message_queues_.push_back(message_queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueueManager::Remove(MessageQueue* message_queue) {
|
|
||||||
return Instance()->RemoveInternal(message_queue);
|
|
||||||
}
|
|
||||||
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
|
|
||||||
{
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
// Prevent changes while the list of message queues is processed.
|
|
||||||
RTC_DCHECK_EQ(processing_, 0);
|
|
||||||
std::vector<MessageQueue*>::iterator iter;
|
|
||||||
iter = absl::c_find(message_queues_, message_queue);
|
|
||||||
if (iter != message_queues_.end()) {
|
|
||||||
message_queues_.erase(iter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueueManager::Clear(MessageHandler* handler) {
|
|
||||||
return Instance()->ClearInternal(handler);
|
|
||||||
}
|
|
||||||
void MessageQueueManager::ClearInternal(MessageHandler* handler) {
|
|
||||||
// Deleted objects may cause re-entrant calls to ClearInternal. This is
|
|
||||||
// allowed as the list of message queues does not change while queues are
|
|
||||||
// cleared.
|
|
||||||
MarkProcessingCritScope cs(&crit_, &processing_);
|
|
||||||
for (MessageQueue* queue : message_queues_) {
|
|
||||||
queue->Clear(handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
|
|
||||||
return Instance()->ProcessAllMessageQueuesInternal();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueueManager::ProcessAllMessageQueuesInternal() {
|
|
||||||
// This works by posting a delayed message at the current time and waiting
|
|
||||||
// for it to be dispatched on all queues, which will ensure that all messages
|
|
||||||
// that came before it were also dispatched.
|
|
||||||
volatile int queues_not_done = 0;
|
|
||||||
|
|
||||||
// This class is used so that whether the posted message is processed, or the
|
|
||||||
// message queue is simply cleared, queues_not_done gets decremented.
|
|
||||||
class ScopedIncrement : public MessageData {
|
|
||||||
public:
|
|
||||||
ScopedIncrement(volatile int* value) : value_(value) {
|
|
||||||
AtomicOps::Increment(value_);
|
|
||||||
}
|
|
||||||
~ScopedIncrement() override { AtomicOps::Decrement(value_); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
volatile int* value_;
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
MarkProcessingCritScope cs(&crit_, &processing_);
|
|
||||||
for (MessageQueue* queue : message_queues_) {
|
|
||||||
if (!queue->IsProcessingMessagesForTesting()) {
|
|
||||||
// If the queue is not processing messages, it can
|
|
||||||
// be ignored. If we tried to post a message to it, it would be dropped
|
|
||||||
// or ignored.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
|
|
||||||
new ScopedIncrement(&queues_not_done));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rtc::Thread* current = rtc::Thread::Current();
|
|
||||||
// Note: One of the message queues may have been on this thread, which is
|
|
||||||
// why we can't synchronously wait for queues_not_done to go to 0; we need
|
|
||||||
// to process messages as well.
|
|
||||||
while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
|
|
||||||
if (current) {
|
|
||||||
current->ProcessMessages(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------
|
|
||||||
// MessageQueue
|
|
||||||
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
|
|
||||||
: fPeekKeep_(false),
|
|
||||||
dmsgq_next_num_(0),
|
|
||||||
fInitialized_(false),
|
|
||||||
fDestroyed_(false),
|
|
||||||
stop_(0),
|
|
||||||
ss_(ss) {
|
|
||||||
RTC_DCHECK(ss);
|
|
||||||
// Currently, MessageQueue holds a socket server, and is the base class for
|
|
||||||
// Thread. It seems like it makes more sense for Thread to hold the socket
|
|
||||||
// server, and provide it to the MessageQueue, since the Thread controls
|
|
||||||
// the I/O model, and MQ is agnostic to those details. Anyway, this causes
|
|
||||||
// messagequeue_unittest to depend on network libraries... yuck.
|
|
||||||
if (init_queue) {
|
|
||||||
DoInit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
|
|
||||||
: MessageQueue(ss.get(), init_queue) {
|
|
||||||
own_ss_ = std::move(ss);
|
|
||||||
}
|
|
||||||
|
|
||||||
MessageQueue::~MessageQueue() {
|
|
||||||
DoDestroy();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::DoInit() {
|
|
||||||
if (fInitialized_) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
fInitialized_ = true;
|
|
||||||
MessageQueueManager::Add(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::DoDestroy() {
|
|
||||||
if (fDestroyed_) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
fDestroyed_ = true;
|
|
||||||
// The signal is done from here to ensure
|
|
||||||
// that it always gets called when the queue
|
|
||||||
// is going away.
|
|
||||||
SignalQueueDestroyed();
|
|
||||||
MessageQueueManager::Remove(this);
|
|
||||||
ClearInternal(nullptr, MQID_ANY, nullptr);
|
|
||||||
|
|
||||||
if (ss_) {
|
|
||||||
ss_->SetMessageQueue(nullptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketServer* MessageQueue::socketserver() {
|
|
||||||
return ss_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::WakeUpSocketServer() {
|
|
||||||
ss_->WakeUp();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::Quit() {
|
|
||||||
AtomicOps::ReleaseStore(&stop_, 1);
|
|
||||||
WakeUpSocketServer();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool MessageQueue::IsQuitting() {
|
|
||||||
return AtomicOps::AcquireLoad(&stop_) != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool MessageQueue::IsProcessingMessagesForTesting() {
|
|
||||||
return !IsQuitting();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::Restart() {
|
|
||||||
AtomicOps::ReleaseStore(&stop_, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
|
|
||||||
if (fPeekKeep_) {
|
|
||||||
*pmsg = msgPeek_;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (!Get(pmsg, cmsWait))
|
|
||||||
return false;
|
|
||||||
msgPeek_ = *pmsg;
|
|
||||||
fPeekKeep_ = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
|
|
||||||
// Return and clear peek if present
|
|
||||||
// Always return the peek if it exists so there is Peek/Get symmetry
|
|
||||||
|
|
||||||
if (fPeekKeep_) {
|
|
||||||
*pmsg = msgPeek_;
|
|
||||||
fPeekKeep_ = false;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
|
|
||||||
|
|
||||||
int64_t cmsTotal = cmsWait;
|
|
||||||
int64_t cmsElapsed = 0;
|
|
||||||
int64_t msStart = TimeMillis();
|
|
||||||
int64_t msCurrent = msStart;
|
|
||||||
while (true) {
|
|
||||||
// Check for sent messages
|
|
||||||
ReceiveSends();
|
|
||||||
|
|
||||||
// Check for posted events
|
|
||||||
int64_t cmsDelayNext = kForever;
|
|
||||||
bool first_pass = true;
|
|
||||||
while (true) {
|
|
||||||
// All queue operations need to be locked, but nothing else in this loop
|
|
||||||
// (specifically handling disposed message) can happen inside the crit.
|
|
||||||
// Otherwise, disposed MessageHandlers will cause deadlocks.
|
|
||||||
{
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
// On the first pass, check for delayed messages that have been
|
|
||||||
// triggered and calculate the next trigger time.
|
|
||||||
if (first_pass) {
|
|
||||||
first_pass = false;
|
|
||||||
while (!dmsgq_.empty()) {
|
|
||||||
if (msCurrent < dmsgq_.top().msTrigger_) {
|
|
||||||
cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
msgq_.push_back(dmsgq_.top().msg_);
|
|
||||||
dmsgq_.pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Pull a message off the message queue, if available.
|
|
||||||
if (msgq_.empty()) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
*pmsg = msgq_.front();
|
|
||||||
msgq_.pop_front();
|
|
||||||
}
|
|
||||||
} // crit_ is released here.
|
|
||||||
|
|
||||||
// Log a warning for time-sensitive messages that we're late to deliver.
|
|
||||||
if (pmsg->ts_sensitive) {
|
|
||||||
int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
|
|
||||||
if (delay > 0) {
|
|
||||||
RTC_LOG_F(LS_WARNING)
|
|
||||||
<< "id: " << pmsg->message_id
|
|
||||||
<< " delay: " << (delay + kMaxMsgLatency) << "ms";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If this was a dispose message, delete it and skip it.
|
|
||||||
if (MQID_DISPOSE == pmsg->message_id) {
|
|
||||||
RTC_DCHECK(nullptr == pmsg->phandler);
|
|
||||||
delete pmsg->pdata;
|
|
||||||
*pmsg = Message();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IsQuitting())
|
|
||||||
break;
|
|
||||||
|
|
||||||
// Which is shorter, the delay wait or the asked wait?
|
|
||||||
|
|
||||||
int64_t cmsNext;
|
|
||||||
if (cmsWait == kForever) {
|
|
||||||
cmsNext = cmsDelayNext;
|
|
||||||
} else {
|
|
||||||
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
|
|
||||||
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
|
|
||||||
cmsNext = cmsDelayNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// Wait and multiplex in the meantime
|
|
||||||
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the specified timeout expired, return
|
|
||||||
|
|
||||||
msCurrent = TimeMillis();
|
|
||||||
cmsElapsed = TimeDiff(msCurrent, msStart);
|
|
||||||
if (cmsWait != kForever) {
|
|
||||||
if (cmsElapsed >= cmsWait)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::ReceiveSends() {}
|
|
||||||
|
|
||||||
void MessageQueue::Post(const Location& posted_from,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata,
|
|
||||||
bool time_sensitive) {
|
|
||||||
if (IsQuitting()) {
|
|
||||||
delete pdata;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep thread safe
|
|
||||||
// Add the message to the end of the queue
|
|
||||||
// Signal for the multiplexer to return
|
|
||||||
|
|
||||||
{
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
Message msg;
|
|
||||||
msg.posted_from = posted_from;
|
|
||||||
msg.phandler = phandler;
|
|
||||||
msg.message_id = id;
|
|
||||||
msg.pdata = pdata;
|
|
||||||
if (time_sensitive) {
|
|
||||||
msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
|
|
||||||
}
|
|
||||||
msgq_.push_back(msg);
|
|
||||||
}
|
|
||||||
WakeUpSocketServer();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::PostDelayed(const Location& posted_from,
|
|
||||||
int cmsDelay,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
|
|
||||||
pdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::PostAt(const Location& posted_from,
|
|
||||||
uint32_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
// This should work even if it is used (unexpectedly).
|
|
||||||
int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
|
|
||||||
return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::PostAt(const Location& posted_from,
|
|
||||||
int64_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
|
|
||||||
pdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::DoDelayPost(const Location& posted_from,
|
|
||||||
int64_t cmsDelay,
|
|
||||||
int64_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
if (IsQuitting()) {
|
|
||||||
delete pdata;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep thread safe
|
|
||||||
// Add to the priority queue. Gets sorted soonest first.
|
|
||||||
// Signal for the multiplexer to return.
|
|
||||||
|
|
||||||
{
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
Message msg;
|
|
||||||
msg.posted_from = posted_from;
|
|
||||||
msg.phandler = phandler;
|
|
||||||
msg.message_id = id;
|
|
||||||
msg.pdata = pdata;
|
|
||||||
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
|
||||||
dmsgq_.push(dmsg);
|
|
||||||
// If this message queue processes 1 message every millisecond for 50 days,
|
|
||||||
// we will wrap this number. Even then, only messages with identical times
|
|
||||||
// will be misordered, and then only briefly. This is probably ok.
|
|
||||||
++dmsgq_next_num_;
|
|
||||||
RTC_DCHECK_NE(0, dmsgq_next_num_);
|
|
||||||
}
|
|
||||||
WakeUpSocketServer();
|
|
||||||
}
|
|
||||||
|
|
||||||
int MessageQueue::GetDelay() {
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
|
|
||||||
if (!msgq_.empty())
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
if (!dmsgq_.empty()) {
|
|
||||||
int delay = TimeUntil(dmsgq_.top().msTrigger_);
|
|
||||||
if (delay < 0)
|
|
||||||
delay = 0;
|
|
||||||
return delay;
|
|
||||||
}
|
|
||||||
|
|
||||||
return kForever;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::Clear(MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageList* removed) {
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
ClearInternal(phandler, id, removed);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::ClearInternal(MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageList* removed) {
|
|
||||||
// Remove messages with phandler
|
|
||||||
|
|
||||||
if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
|
|
||||||
if (removed) {
|
|
||||||
removed->push_back(msgPeek_);
|
|
||||||
} else {
|
|
||||||
delete msgPeek_.pdata;
|
|
||||||
}
|
|
||||||
fPeekKeep_ = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from ordered message queue
|
|
||||||
|
|
||||||
for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
|
|
||||||
if (it->Match(phandler, id)) {
|
|
||||||
if (removed) {
|
|
||||||
removed->push_back(*it);
|
|
||||||
} else {
|
|
||||||
delete it->pdata;
|
|
||||||
}
|
|
||||||
it = msgq_.erase(it);
|
|
||||||
} else {
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from priority queue. Not directly iterable, so use this approach
|
|
||||||
|
|
||||||
PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
|
|
||||||
for (PriorityQueue::container_type::iterator it = new_end;
|
|
||||||
it != dmsgq_.container().end(); ++it) {
|
|
||||||
if (it->msg_.Match(phandler, id)) {
|
|
||||||
if (removed) {
|
|
||||||
removed->push_back(it->msg_);
|
|
||||||
} else {
|
|
||||||
delete it->msg_.pdata;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*new_end++ = *it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dmsgq_.container().erase(new_end, dmsgq_.container().end());
|
|
||||||
dmsgq_.reheap();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageQueue::Dispatch(Message* pmsg) {
|
|
||||||
TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file",
|
|
||||||
pmsg->posted_from.file_name(), "src_func",
|
|
||||||
pmsg->posted_from.function_name());
|
|
||||||
int64_t start_time = TimeMillis();
|
|
||||||
pmsg->phandler->OnMessage(pmsg);
|
|
||||||
int64_t end_time = TimeMillis();
|
|
||||||
int64_t diff = TimeDiff(end_time, start_time);
|
|
||||||
if (diff >= kSlowDispatchLoggingThreshold) {
|
|
||||||
RTC_LOG(LS_INFO) << "Message took " << diff
|
|
||||||
<< "ms to dispatch. Posted from: "
|
|
||||||
<< pmsg->posted_from.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace rtc
|
|
||||||
@ -11,324 +11,8 @@
|
|||||||
#ifndef RTC_BASE_MESSAGE_QUEUE_H_
|
#ifndef RTC_BASE_MESSAGE_QUEUE_H_
|
||||||
#define RTC_BASE_MESSAGE_QUEUE_H_
|
#define RTC_BASE_MESSAGE_QUEUE_H_
|
||||||
|
|
||||||
#include <string.h>
|
// TODO(srte): Remove this file when all dependencies has been updated.
|
||||||
|
|
||||||
#include <algorithm>
|
#include "rtc_base/thread.h"
|
||||||
#include <list>
|
|
||||||
#include <memory>
|
|
||||||
#include <queue>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "api/scoped_refptr.h"
|
|
||||||
#include "rtc_base/constructor_magic.h"
|
|
||||||
#include "rtc_base/critical_section.h"
|
|
||||||
#include "rtc_base/location.h"
|
|
||||||
#include "rtc_base/message_handler.h"
|
|
||||||
#include "rtc_base/socket_server.h"
|
|
||||||
#include "rtc_base/system/rtc_export.h"
|
|
||||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
|
||||||
#include "rtc_base/thread_annotations.h"
|
|
||||||
|
|
||||||
namespace rtc {
|
|
||||||
|
|
||||||
struct Message;
|
|
||||||
class MessageQueue;
|
|
||||||
|
|
||||||
// MessageQueueManager does cleanup of of message queues
|
|
||||||
|
|
||||||
class RTC_EXPORT MessageQueueManager {
|
|
||||||
public:
|
|
||||||
static void Add(MessageQueue* message_queue);
|
|
||||||
static void Remove(MessageQueue* message_queue);
|
|
||||||
static void Clear(MessageHandler* handler);
|
|
||||||
|
|
||||||
// TODO(nisse): Delete alias, as soon as downstream code is updated.
|
|
||||||
static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
|
|
||||||
|
|
||||||
// For testing purposes, for use with a simulated clock.
|
|
||||||
// Ensures that all message queues have processed delayed messages
|
|
||||||
// up until the current point in time.
|
|
||||||
static void ProcessAllMessageQueuesForTesting();
|
|
||||||
|
|
||||||
private:
|
|
||||||
static MessageQueueManager* Instance();
|
|
||||||
|
|
||||||
MessageQueueManager();
|
|
||||||
~MessageQueueManager();
|
|
||||||
|
|
||||||
void AddInternal(MessageQueue* message_queue);
|
|
||||||
void RemoveInternal(MessageQueue* message_queue);
|
|
||||||
void ClearInternal(MessageHandler* handler);
|
|
||||||
void ProcessAllMessageQueuesInternal();
|
|
||||||
|
|
||||||
// This list contains all live MessageQueues.
|
|
||||||
std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
|
|
||||||
|
|
||||||
// Methods that don't modify the list of message queues may be called in a
|
|
||||||
// re-entrant fashion. "processing_" keeps track of the depth of re-entrant
|
|
||||||
// calls.
|
|
||||||
CriticalSection crit_;
|
|
||||||
size_t processing_ RTC_GUARDED_BY(crit_);
|
|
||||||
};
|
|
||||||
|
|
||||||
// Derive from this for specialized data
|
|
||||||
// App manages lifetime, except when messages are purged
|
|
||||||
|
|
||||||
class MessageData {
|
|
||||||
public:
|
|
||||||
MessageData() {}
|
|
||||||
virtual ~MessageData() {}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class T>
|
|
||||||
class TypedMessageData : public MessageData {
|
|
||||||
public:
|
|
||||||
explicit TypedMessageData(const T& data) : data_(data) {}
|
|
||||||
const T& data() const { return data_; }
|
|
||||||
T& data() { return data_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
T data_;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Like TypedMessageData, but for pointers that require a delete.
|
|
||||||
template <class T>
|
|
||||||
class ScopedMessageData : public MessageData {
|
|
||||||
public:
|
|
||||||
explicit ScopedMessageData(std::unique_ptr<T> data)
|
|
||||||
: data_(std::move(data)) {}
|
|
||||||
// Deprecated.
|
|
||||||
// TODO(deadbeef): Remove this once downstream applications stop using it.
|
|
||||||
explicit ScopedMessageData(T* data) : data_(data) {}
|
|
||||||
// Deprecated.
|
|
||||||
// TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
|
|
||||||
// this once downstream applications stop using it, then rename inner_data to
|
|
||||||
// just data.
|
|
||||||
const std::unique_ptr<T>& data() const { return data_; }
|
|
||||||
std::unique_ptr<T>& data() { return data_; }
|
|
||||||
|
|
||||||
const T& inner_data() const { return *data_; }
|
|
||||||
T& inner_data() { return *data_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::unique_ptr<T> data_;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Like ScopedMessageData, but for reference counted pointers.
|
|
||||||
template <class T>
|
|
||||||
class ScopedRefMessageData : public MessageData {
|
|
||||||
public:
|
|
||||||
explicit ScopedRefMessageData(T* data) : data_(data) {}
|
|
||||||
const scoped_refptr<T>& data() const { return data_; }
|
|
||||||
scoped_refptr<T>& data() { return data_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
scoped_refptr<T> data_;
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class T>
|
|
||||||
inline MessageData* WrapMessageData(const T& data) {
|
|
||||||
return new TypedMessageData<T>(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class T>
|
|
||||||
inline const T& UseMessageData(MessageData* data) {
|
|
||||||
return static_cast<TypedMessageData<T>*>(data)->data();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class T>
|
|
||||||
class DisposeData : public MessageData {
|
|
||||||
public:
|
|
||||||
explicit DisposeData(T* data) : data_(data) {}
|
|
||||||
virtual ~DisposeData() { delete data_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
T* data_;
|
|
||||||
};
|
|
||||||
|
|
||||||
const uint32_t MQID_ANY = static_cast<uint32_t>(-1);
|
|
||||||
const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
|
|
||||||
|
|
||||||
// No destructor
|
|
||||||
|
|
||||||
struct Message {
|
|
||||||
Message()
|
|
||||||
: phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
|
|
||||||
inline bool Match(MessageHandler* handler, uint32_t id) const {
|
|
||||||
return (handler == nullptr || handler == phandler) &&
|
|
||||||
(id == MQID_ANY || id == message_id);
|
|
||||||
}
|
|
||||||
Location posted_from;
|
|
||||||
MessageHandler* phandler;
|
|
||||||
uint32_t message_id;
|
|
||||||
MessageData* pdata;
|
|
||||||
int64_t ts_sensitive;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef std::list<Message> MessageList;
|
|
||||||
|
|
||||||
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
|
|
||||||
// with the same trigger time are processed in num_ (FIFO) order.
|
|
||||||
|
|
||||||
class DelayedMessage {
|
|
||||||
public:
|
|
||||||
DelayedMessage(int64_t delay,
|
|
||||||
int64_t trigger,
|
|
||||||
uint32_t num,
|
|
||||||
const Message& msg)
|
|
||||||
: cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
|
|
||||||
|
|
||||||
bool operator<(const DelayedMessage& dmsg) const {
|
|
||||||
return (dmsg.msTrigger_ < msTrigger_) ||
|
|
||||||
((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t cmsDelay_; // for debugging
|
|
||||||
int64_t msTrigger_;
|
|
||||||
uint32_t num_;
|
|
||||||
Message msg_;
|
|
||||||
};
|
|
||||||
|
|
||||||
class RTC_EXPORT MessageQueue {
|
|
||||||
public:
|
|
||||||
static const int kForever = -1;
|
|
||||||
|
|
||||||
// Create a new MessageQueue and optionally assign it to the passed
|
|
||||||
// SocketServer. Subclasses that override Clear should pass false for
|
|
||||||
// init_queue and call DoInit() from their constructor to prevent races
|
|
||||||
// with the MessageQueueManager using the object while the vtable is still
|
|
||||||
// being created.
|
|
||||||
MessageQueue(SocketServer* ss, bool init_queue);
|
|
||||||
MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
|
|
||||||
|
|
||||||
// NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL
|
|
||||||
// DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
|
|
||||||
// between the destructor modifying the vtable, and the MessageQueueManager
|
|
||||||
// calling Clear on the object from a different thread.
|
|
||||||
virtual ~MessageQueue();
|
|
||||||
|
|
||||||
SocketServer* socketserver();
|
|
||||||
|
|
||||||
// Note: The behavior of MessageQueue has changed. When a MQ is stopped,
|
|
||||||
// futher Posts and Sends will fail. However, any pending Sends and *ready*
|
|
||||||
// Posts (as opposed to unexpired delayed Posts) will be delivered before
|
|
||||||
// Get (or Peek) returns false. By guaranteeing delivery of those messages,
|
|
||||||
// we eliminate the race condition when an MessageHandler and MessageQueue
|
|
||||||
// may be destroyed independently of each other.
|
|
||||||
virtual void Quit();
|
|
||||||
virtual bool IsQuitting();
|
|
||||||
virtual void Restart();
|
|
||||||
// Not all message queues actually process messages (such as SignalThread).
|
|
||||||
// In those cases, it's important to know, before posting, that it won't be
|
|
||||||
// Processed. Normally, this would be true until IsQuitting() is true.
|
|
||||||
virtual bool IsProcessingMessagesForTesting();
|
|
||||||
|
|
||||||
// Get() will process I/O until:
|
|
||||||
// 1) A message is available (returns true)
|
|
||||||
// 2) cmsWait seconds have elapsed (returns false)
|
|
||||||
// 3) Stop() is called (returns false)
|
|
||||||
virtual bool Get(Message* pmsg,
|
|
||||||
int cmsWait = kForever,
|
|
||||||
bool process_io = true);
|
|
||||||
virtual bool Peek(Message* pmsg, int cmsWait = 0);
|
|
||||||
virtual void Post(const Location& posted_from,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr,
|
|
||||||
bool time_sensitive = false);
|
|
||||||
virtual void PostDelayed(const Location& posted_from,
|
|
||||||
int cmsDelay,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr);
|
|
||||||
virtual void PostAt(const Location& posted_from,
|
|
||||||
int64_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr);
|
|
||||||
// TODO(honghaiz): Remove this when all the dependencies are removed.
|
|
||||||
virtual void PostAt(const Location& posted_from,
|
|
||||||
uint32_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr);
|
|
||||||
virtual void Clear(MessageHandler* phandler,
|
|
||||||
uint32_t id = MQID_ANY,
|
|
||||||
MessageList* removed = nullptr);
|
|
||||||
virtual void Dispatch(Message* pmsg);
|
|
||||||
virtual void ReceiveSends();
|
|
||||||
|
|
||||||
// Amount of time until the next message can be retrieved
|
|
||||||
virtual int GetDelay();
|
|
||||||
|
|
||||||
bool empty() const { return size() == 0u; }
|
|
||||||
size_t size() const {
|
|
||||||
CritScope cs(&crit_); // msgq_.size() is not thread safe.
|
|
||||||
return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Internally posts a message which causes the doomed object to be deleted
|
|
||||||
template <class T>
|
|
||||||
void Dispose(T* doomed) {
|
|
||||||
if (doomed) {
|
|
||||||
Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// When this signal is sent out, any references to this queue should
|
|
||||||
// no longer be used.
|
|
||||||
sigslot::signal0<> SignalQueueDestroyed;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
class PriorityQueue : public std::priority_queue<DelayedMessage> {
|
|
||||||
public:
|
|
||||||
container_type& container() { return c; }
|
|
||||||
void reheap() { make_heap(c.begin(), c.end(), comp); }
|
|
||||||
};
|
|
||||||
|
|
||||||
void DoDelayPost(const Location& posted_from,
|
|
||||||
int64_t cmsDelay,
|
|
||||||
int64_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata);
|
|
||||||
|
|
||||||
// Perform initialization, subclasses must call this from their constructor
|
|
||||||
// if false was passed as init_queue to the MessageQueue constructor.
|
|
||||||
void DoInit();
|
|
||||||
|
|
||||||
// Does not take any lock. Must be called either while holding crit_, or by
|
|
||||||
// the destructor (by definition, the latter has exclusive access).
|
|
||||||
void ClearInternal(MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
|
||||||
|
|
||||||
// Perform cleanup; subclasses must call this from the destructor,
|
|
||||||
// and are not expected to actually hold the lock.
|
|
||||||
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
|
||||||
|
|
||||||
void WakeUpSocketServer();
|
|
||||||
|
|
||||||
bool fPeekKeep_;
|
|
||||||
Message msgPeek_;
|
|
||||||
MessageList msgq_ RTC_GUARDED_BY(crit_);
|
|
||||||
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
|
|
||||||
uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
|
|
||||||
CriticalSection crit_;
|
|
||||||
bool fInitialized_;
|
|
||||||
bool fDestroyed_;
|
|
||||||
|
|
||||||
private:
|
|
||||||
volatile int stop_;
|
|
||||||
|
|
||||||
// The SocketServer might not be owned by MessageQueue.
|
|
||||||
SocketServer* const ss_;
|
|
||||||
// Used if SocketServer ownership lies with |this|.
|
|
||||||
std::unique_ptr<SocketServer> own_ss_;
|
|
||||||
|
|
||||||
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace rtc
|
|
||||||
|
|
||||||
#endif // RTC_BASE_MESSAGE_QUEUE_H_
|
#endif // RTC_BASE_MESSAGE_QUEUE_H_
|
||||||
|
|||||||
@ -8,7 +8,7 @@
|
|||||||
* be found in the AUTHORS file in the root of the source tree.
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "rtc_base/message_queue.h"
|
#include "rtc_base/thread.h"
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
@ -29,14 +29,14 @@ namespace {
|
|||||||
|
|
||||||
using ::webrtc::ToQueuedTask;
|
using ::webrtc::ToQueuedTask;
|
||||||
|
|
||||||
class MessageQueueTest : public ::testing::Test, public MessageQueue {
|
class MessageQueueTest : public ::testing::Test, public Thread {
|
||||||
public:
|
public:
|
||||||
MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {}
|
MessageQueueTest() : Thread(SocketServer::CreateDefault(), true) {}
|
||||||
bool IsLocked_Worker() {
|
bool IsLocked_Worker() {
|
||||||
if (!crit_.TryEnter()) {
|
if (!CritForTest()->TryEnter()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
crit_.Leave();
|
CritForTest()->Leave();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
bool IsLocked() {
|
bool IsLocked() {
|
||||||
@ -61,8 +61,7 @@ struct DeletedLockChecker {
|
|||||||
bool* deleted;
|
bool* deleted;
|
||||||
};
|
};
|
||||||
|
|
||||||
static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
|
static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
|
||||||
MessageQueue* q) {
|
|
||||||
EXPECT_TRUE(q != nullptr);
|
EXPECT_TRUE(q != nullptr);
|
||||||
int64_t now = TimeMillis();
|
int64_t now = TimeMillis();
|
||||||
q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
|
q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
|
||||||
@ -83,11 +82,11 @@ static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
|
|||||||
|
|
||||||
TEST_F(MessageQueueTest,
|
TEST_F(MessageQueueTest,
|
||||||
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
|
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
|
||||||
MessageQueue q(SocketServer::CreateDefault(), true);
|
Thread q(SocketServer::CreateDefault(), true);
|
||||||
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
|
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
|
||||||
|
|
||||||
NullSocketServer nullss;
|
NullSocketServer nullss;
|
||||||
MessageQueue q_nullss(&nullss, true);
|
Thread q_nullss(&nullss, true);
|
||||||
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
|
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,7 +126,7 @@ TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) {
|
|||||||
// Ensure that ProcessAllMessageQueues does its essential function; process
|
// Ensure that ProcessAllMessageQueues does its essential function; process
|
||||||
// all messages (both delayed and non delayed) up until the current time, on
|
// all messages (both delayed and non delayed) up until the current time, on
|
||||||
// all registered message queues.
|
// all registered message queues.
|
||||||
TEST(MessageQueueManager, ProcessAllMessageQueues) {
|
TEST(ThreadManager, ProcessAllMessageQueues) {
|
||||||
Event entered_process_all_message_queues(true, false);
|
Event entered_process_all_message_queues(true, false);
|
||||||
auto a = Thread::CreateWithSocketServer();
|
auto a = Thread::CreateWithSocketServer();
|
||||||
auto b = Thread::CreateWithSocketServer();
|
auto b = Thread::CreateWithSocketServer();
|
||||||
@ -155,21 +154,21 @@ TEST(MessageQueueManager, ProcessAllMessageQueues) {
|
|||||||
b->PostDelayedTask(ToQueuedTask(incrementer), 0);
|
b->PostDelayedTask(ToQueuedTask(incrementer), 0);
|
||||||
rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler));
|
rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler));
|
||||||
|
|
||||||
MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
|
EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
|
// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
|
||||||
TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
|
TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) {
|
||||||
auto t = Thread::CreateWithSocketServer();
|
auto t = Thread::CreateWithSocketServer();
|
||||||
t->Start();
|
t->Start();
|
||||||
t->Quit();
|
t->Quit();
|
||||||
MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
|
// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
|
||||||
// messages.
|
// messages.
|
||||||
TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
|
TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) {
|
||||||
Event entered_process_all_message_queues(true, false);
|
Event entered_process_all_message_queues(true, false);
|
||||||
auto t = Thread::CreateWithSocketServer();
|
auto t = Thread::CreateWithSocketServer();
|
||||||
t->Start();
|
t->Start();
|
||||||
@ -189,7 +188,7 @@ TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
|
|||||||
// Post messages (both delayed and non delayed) to both threads.
|
// Post messages (both delayed and non delayed) to both threads.
|
||||||
t->PostTask(RTC_FROM_HERE, clearer);
|
t->PostTask(RTC_FROM_HERE, clearer);
|
||||||
rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler);
|
rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler);
|
||||||
MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
}
|
}
|
||||||
|
|
||||||
class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface {
|
class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface {
|
||||||
@ -202,7 +201,7 @@ class EmptyHandler : public MessageHandler {
|
|||||||
void OnMessage(Message* msg) override {}
|
void OnMessage(Message* msg) override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST(MessageQueueManager, ClearReentrant) {
|
TEST(ThreadManager, ClearReentrant) {
|
||||||
std::unique_ptr<Thread> t(Thread::Create());
|
std::unique_ptr<Thread> t(Thread::Create());
|
||||||
EmptyHandler handler;
|
EmptyHandler handler;
|
||||||
RefCountedHandler* inner_handler(
|
RefCountedHandler* inner_handler(
|
||||||
|
|||||||
@ -28,6 +28,8 @@
|
|||||||
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
#include "absl/algorithm/container.h"
|
||||||
|
#include "rtc_base/atomic_ops.h"
|
||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
#include "rtc_base/critical_section.h"
|
#include "rtc_base/critical_section.h"
|
||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
@ -65,6 +67,9 @@ class ScopedAutoReleasePool {
|
|||||||
namespace rtc {
|
namespace rtc {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
const int kMaxMsgLatency = 150; // 150 ms
|
||||||
|
const int kSlowDispatchLoggingThreshold = 50; // 50 ms
|
||||||
|
|
||||||
class MessageHandlerWithTask final : public MessageHandler {
|
class MessageHandlerWithTask final : public MessageHandler {
|
||||||
public:
|
public:
|
||||||
MessageHandlerWithTask() = default;
|
MessageHandlerWithTask() = default;
|
||||||
@ -80,6 +85,27 @@ class MessageHandlerWithTask final : public MessageHandler {
|
|||||||
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask);
|
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
|
||||||
|
public:
|
||||||
|
MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
|
||||||
|
RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
|
||||||
|
: cs_(cs), processing_(processing) {
|
||||||
|
cs_->Enter();
|
||||||
|
*processing_ += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
|
||||||
|
*processing_ -= 1;
|
||||||
|
cs_->Leave();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const CriticalSection* const cs_;
|
||||||
|
size_t* processing_;
|
||||||
|
|
||||||
|
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
ThreadManager* ThreadManager::Instance() {
|
ThreadManager* ThreadManager::Instance() {
|
||||||
@ -92,6 +118,97 @@ ThreadManager::~ThreadManager() {
|
|||||||
RTC_NOTREACHED() << "ThreadManager should never be destructed.";
|
RTC_NOTREACHED() << "ThreadManager should never be destructed.";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// static
|
||||||
|
void ThreadManager::Add(Thread* message_queue) {
|
||||||
|
return Instance()->AddInternal(message_queue);
|
||||||
|
}
|
||||||
|
void ThreadManager::AddInternal(Thread* message_queue) {
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
// Prevent changes while the list of message queues is processed.
|
||||||
|
RTC_DCHECK_EQ(processing_, 0);
|
||||||
|
message_queues_.push_back(message_queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// static
|
||||||
|
void ThreadManager::Remove(Thread* message_queue) {
|
||||||
|
return Instance()->RemoveInternal(message_queue);
|
||||||
|
}
|
||||||
|
void ThreadManager::RemoveInternal(Thread* message_queue) {
|
||||||
|
{
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
// Prevent changes while the list of message queues is processed.
|
||||||
|
RTC_DCHECK_EQ(processing_, 0);
|
||||||
|
std::vector<Thread*>::iterator iter;
|
||||||
|
iter = absl::c_find(message_queues_, message_queue);
|
||||||
|
if (iter != message_queues_.end()) {
|
||||||
|
message_queues_.erase(iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// static
|
||||||
|
void ThreadManager::Clear(MessageHandler* handler) {
|
||||||
|
return Instance()->ClearInternal(handler);
|
||||||
|
}
|
||||||
|
void ThreadManager::ClearInternal(MessageHandler* handler) {
|
||||||
|
// Deleted objects may cause re-entrant calls to ClearInternal. This is
|
||||||
|
// allowed as the list of message queues does not change while queues are
|
||||||
|
// cleared.
|
||||||
|
MarkProcessingCritScope cs(&crit_, &processing_);
|
||||||
|
for (Thread* queue : message_queues_) {
|
||||||
|
queue->Clear(handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// static
|
||||||
|
void ThreadManager::ProcessAllMessageQueuesForTesting() {
|
||||||
|
return Instance()->ProcessAllMessageQueuesInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadManager::ProcessAllMessageQueuesInternal() {
|
||||||
|
// This works by posting a delayed message at the current time and waiting
|
||||||
|
// for it to be dispatched on all queues, which will ensure that all messages
|
||||||
|
// that came before it were also dispatched.
|
||||||
|
volatile int queues_not_done = 0;
|
||||||
|
|
||||||
|
// This class is used so that whether the posted message is processed, or the
|
||||||
|
// message queue is simply cleared, queues_not_done gets decremented.
|
||||||
|
class ScopedIncrement : public MessageData {
|
||||||
|
public:
|
||||||
|
ScopedIncrement(volatile int* value) : value_(value) {
|
||||||
|
AtomicOps::Increment(value_);
|
||||||
|
}
|
||||||
|
~ScopedIncrement() override { AtomicOps::Decrement(value_); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
volatile int* value_;
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
MarkProcessingCritScope cs(&crit_, &processing_);
|
||||||
|
for (Thread* queue : message_queues_) {
|
||||||
|
if (!queue->IsProcessingMessagesForTesting()) {
|
||||||
|
// If the queue is not processing messages, it can
|
||||||
|
// be ignored. If we tried to post a message to it, it would be dropped
|
||||||
|
// or ignored.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
|
||||||
|
new ScopedIncrement(&queues_not_done));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rtc::Thread* current = rtc::Thread::Current();
|
||||||
|
// Note: One of the message queues may have been on this thread, which is
|
||||||
|
// why we can't synchronously wait for queues_not_done to go to 0; we need
|
||||||
|
// to process messages as well.
|
||||||
|
while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
|
||||||
|
if (current) {
|
||||||
|
current->ProcessMessages(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// static
|
// static
|
||||||
Thread* Thread::Current() {
|
Thread* Thread::Current() {
|
||||||
ThreadManager* manager = ThreadManager::Instance();
|
ThreadManager* manager = ThreadManager::Instance();
|
||||||
@ -180,8 +297,14 @@ Thread::Thread(std::unique_ptr<SocketServer> ss)
|
|||||||
: Thread(std::move(ss), /*do_init=*/true) {}
|
: Thread(std::move(ss), /*do_init=*/true) {}
|
||||||
|
|
||||||
Thread::Thread(SocketServer* ss, bool do_init)
|
Thread::Thread(SocketServer* ss, bool do_init)
|
||||||
: MessageQueue(ss, /*do_init=*/false) {
|
: fPeekKeep_(false),
|
||||||
socketserver()->SetMessageQueue(this);
|
dmsgq_next_num_(0),
|
||||||
|
fInitialized_(false),
|
||||||
|
fDestroyed_(false),
|
||||||
|
stop_(0),
|
||||||
|
ss_(ss) {
|
||||||
|
RTC_DCHECK(ss);
|
||||||
|
ss_->SetMessageQueue(this);
|
||||||
SetName("Thread", this); // default name
|
SetName("Thread", this); // default name
|
||||||
if (do_init) {
|
if (do_init) {
|
||||||
DoInit();
|
DoInit();
|
||||||
@ -189,12 +312,8 @@ Thread::Thread(SocketServer* ss, bool do_init)
|
|||||||
}
|
}
|
||||||
|
|
||||||
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
|
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
|
||||||
: MessageQueue(std::move(ss), false) {
|
: Thread(ss.get(), do_init) {
|
||||||
socketserver()->SetMessageQueue(this);
|
own_ss_ = std::move(ss);
|
||||||
SetName("Thread", this); // default name
|
|
||||||
if (do_init) {
|
|
||||||
DoInit();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread::~Thread() {
|
Thread::~Thread() {
|
||||||
@ -202,6 +321,337 @@ Thread::~Thread() {
|
|||||||
DoDestroy();
|
DoDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Thread::DoInit() {
|
||||||
|
if (fInitialized_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fInitialized_ = true;
|
||||||
|
ThreadManager::Add(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::DoDestroy() {
|
||||||
|
if (fDestroyed_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fDestroyed_ = true;
|
||||||
|
// The signal is done from here to ensure
|
||||||
|
// that it always gets called when the queue
|
||||||
|
// is going away.
|
||||||
|
SignalQueueDestroyed();
|
||||||
|
ThreadManager::Remove(this);
|
||||||
|
ClearInternal(nullptr, MQID_ANY, nullptr);
|
||||||
|
|
||||||
|
if (ss_) {
|
||||||
|
ss_->SetMessageQueue(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SocketServer* Thread::socketserver() {
|
||||||
|
return ss_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::WakeUpSocketServer() {
|
||||||
|
ss_->WakeUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::Quit() {
|
||||||
|
AtomicOps::ReleaseStore(&stop_, 1);
|
||||||
|
WakeUpSocketServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Thread::IsQuitting() {
|
||||||
|
return AtomicOps::AcquireLoad(&stop_) != 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::Restart() {
|
||||||
|
AtomicOps::ReleaseStore(&stop_, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Thread::Peek(Message* pmsg, int cmsWait) {
|
||||||
|
if (fPeekKeep_) {
|
||||||
|
*pmsg = msgPeek_;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!Get(pmsg, cmsWait))
|
||||||
|
return false;
|
||||||
|
msgPeek_ = *pmsg;
|
||||||
|
fPeekKeep_ = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
|
||||||
|
// Return and clear peek if present
|
||||||
|
// Always return the peek if it exists so there is Peek/Get symmetry
|
||||||
|
|
||||||
|
if (fPeekKeep_) {
|
||||||
|
*pmsg = msgPeek_;
|
||||||
|
fPeekKeep_ = false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
|
||||||
|
|
||||||
|
int64_t cmsTotal = cmsWait;
|
||||||
|
int64_t cmsElapsed = 0;
|
||||||
|
int64_t msStart = TimeMillis();
|
||||||
|
int64_t msCurrent = msStart;
|
||||||
|
while (true) {
|
||||||
|
// Check for sent messages
|
||||||
|
ReceiveSends();
|
||||||
|
|
||||||
|
// Check for posted events
|
||||||
|
int64_t cmsDelayNext = kForever;
|
||||||
|
bool first_pass = true;
|
||||||
|
while (true) {
|
||||||
|
// All queue operations need to be locked, but nothing else in this loop
|
||||||
|
// (specifically handling disposed message) can happen inside the crit.
|
||||||
|
// Otherwise, disposed MessageHandlers will cause deadlocks.
|
||||||
|
{
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
// On the first pass, check for delayed messages that have been
|
||||||
|
// triggered and calculate the next trigger time.
|
||||||
|
if (first_pass) {
|
||||||
|
first_pass = false;
|
||||||
|
while (!dmsgq_.empty()) {
|
||||||
|
if (msCurrent < dmsgq_.top().msTrigger_) {
|
||||||
|
cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
msgq_.push_back(dmsgq_.top().msg_);
|
||||||
|
dmsgq_.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Pull a message off the message queue, if available.
|
||||||
|
if (msgq_.empty()) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
*pmsg = msgq_.front();
|
||||||
|
msgq_.pop_front();
|
||||||
|
}
|
||||||
|
} // crit_ is released here.
|
||||||
|
|
||||||
|
// Log a warning for time-sensitive messages that we're late to deliver.
|
||||||
|
if (pmsg->ts_sensitive) {
|
||||||
|
int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
|
||||||
|
if (delay > 0) {
|
||||||
|
RTC_LOG_F(LS_WARNING)
|
||||||
|
<< "id: " << pmsg->message_id
|
||||||
|
<< " delay: " << (delay + kMaxMsgLatency) << "ms";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If this was a dispose message, delete it and skip it.
|
||||||
|
if (MQID_DISPOSE == pmsg->message_id) {
|
||||||
|
RTC_DCHECK(nullptr == pmsg->phandler);
|
||||||
|
delete pmsg->pdata;
|
||||||
|
*pmsg = Message();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsQuitting())
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Which is shorter, the delay wait or the asked wait?
|
||||||
|
|
||||||
|
int64_t cmsNext;
|
||||||
|
if (cmsWait == kForever) {
|
||||||
|
cmsNext = cmsDelayNext;
|
||||||
|
} else {
|
||||||
|
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
|
||||||
|
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
|
||||||
|
cmsNext = cmsDelayNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Wait and multiplex in the meantime
|
||||||
|
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the specified timeout expired, return
|
||||||
|
|
||||||
|
msCurrent = TimeMillis();
|
||||||
|
cmsElapsed = TimeDiff(msCurrent, msStart);
|
||||||
|
if (cmsWait != kForever) {
|
||||||
|
if (cmsElapsed >= cmsWait)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::Post(const Location& posted_from,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata,
|
||||||
|
bool time_sensitive) {
|
||||||
|
if (IsQuitting()) {
|
||||||
|
delete pdata;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep thread safe
|
||||||
|
// Add the message to the end of the queue
|
||||||
|
// Signal for the multiplexer to return
|
||||||
|
|
||||||
|
{
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
Message msg;
|
||||||
|
msg.posted_from = posted_from;
|
||||||
|
msg.phandler = phandler;
|
||||||
|
msg.message_id = id;
|
||||||
|
msg.pdata = pdata;
|
||||||
|
if (time_sensitive) {
|
||||||
|
msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
|
||||||
|
}
|
||||||
|
msgq_.push_back(msg);
|
||||||
|
}
|
||||||
|
WakeUpSocketServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::PostDelayed(const Location& posted_from,
|
||||||
|
int cmsDelay,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata) {
|
||||||
|
return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
|
||||||
|
pdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::PostAt(const Location& posted_from,
|
||||||
|
uint32_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata) {
|
||||||
|
// This should work even if it is used (unexpectedly).
|
||||||
|
int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
|
||||||
|
return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::PostAt(const Location& posted_from,
|
||||||
|
int64_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata) {
|
||||||
|
return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
|
||||||
|
pdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::DoDelayPost(const Location& posted_from,
|
||||||
|
int64_t cmsDelay,
|
||||||
|
int64_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata) {
|
||||||
|
if (IsQuitting()) {
|
||||||
|
delete pdata;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep thread safe
|
||||||
|
// Add to the priority queue. Gets sorted soonest first.
|
||||||
|
// Signal for the multiplexer to return.
|
||||||
|
|
||||||
|
{
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
Message msg;
|
||||||
|
msg.posted_from = posted_from;
|
||||||
|
msg.phandler = phandler;
|
||||||
|
msg.message_id = id;
|
||||||
|
msg.pdata = pdata;
|
||||||
|
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
||||||
|
dmsgq_.push(dmsg);
|
||||||
|
// If this message queue processes 1 message every millisecond for 50 days,
|
||||||
|
// we will wrap this number. Even then, only messages with identical times
|
||||||
|
// will be misordered, and then only briefly. This is probably ok.
|
||||||
|
++dmsgq_next_num_;
|
||||||
|
RTC_DCHECK_NE(0, dmsgq_next_num_);
|
||||||
|
}
|
||||||
|
WakeUpSocketServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
int Thread::GetDelay() {
|
||||||
|
CritScope cs(&crit_);
|
||||||
|
|
||||||
|
if (!msgq_.empty())
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (!dmsgq_.empty()) {
|
||||||
|
int delay = TimeUntil(dmsgq_.top().msTrigger_);
|
||||||
|
if (delay < 0)
|
||||||
|
delay = 0;
|
||||||
|
return delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
return kForever;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::ClearInternal(MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageList* removed) {
|
||||||
|
// Remove messages with phandler
|
||||||
|
|
||||||
|
if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
|
||||||
|
if (removed) {
|
||||||
|
removed->push_back(msgPeek_);
|
||||||
|
} else {
|
||||||
|
delete msgPeek_.pdata;
|
||||||
|
}
|
||||||
|
fPeekKeep_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from ordered message queue
|
||||||
|
|
||||||
|
for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
|
||||||
|
if (it->Match(phandler, id)) {
|
||||||
|
if (removed) {
|
||||||
|
removed->push_back(*it);
|
||||||
|
} else {
|
||||||
|
delete it->pdata;
|
||||||
|
}
|
||||||
|
it = msgq_.erase(it);
|
||||||
|
} else {
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from priority queue. Not directly iterable, so use this approach
|
||||||
|
|
||||||
|
PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
|
||||||
|
for (PriorityQueue::container_type::iterator it = new_end;
|
||||||
|
it != dmsgq_.container().end(); ++it) {
|
||||||
|
if (it->msg_.Match(phandler, id)) {
|
||||||
|
if (removed) {
|
||||||
|
removed->push_back(it->msg_);
|
||||||
|
} else {
|
||||||
|
delete it->msg_.pdata;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
*new_end++ = *it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dmsgq_.container().erase(new_end, dmsgq_.container().end());
|
||||||
|
dmsgq_.reheap();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Thread::Dispatch(Message* pmsg) {
|
||||||
|
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
|
||||||
|
pmsg->posted_from.file_name(), "src_func",
|
||||||
|
pmsg->posted_from.function_name());
|
||||||
|
int64_t start_time = TimeMillis();
|
||||||
|
pmsg->phandler->OnMessage(pmsg);
|
||||||
|
int64_t end_time = TimeMillis();
|
||||||
|
int64_t diff = TimeDiff(end_time, start_time);
|
||||||
|
if (diff >= kSlowDispatchLoggingThreshold) {
|
||||||
|
RTC_LOG(LS_INFO) << "Message took " << diff
|
||||||
|
<< "ms to dispatch. Posted from: "
|
||||||
|
<< pmsg->posted_from.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool Thread::IsCurrent() const {
|
bool Thread::IsCurrent() const {
|
||||||
return ThreadManager::Instance()->CurrentThread() == this;
|
return ThreadManager::Instance()->CurrentThread() == this;
|
||||||
}
|
}
|
||||||
@ -379,7 +829,7 @@ bool Thread::IsOwned() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Thread::Stop() {
|
void Thread::Stop() {
|
||||||
MessageQueue::Quit();
|
Thread::Quit();
|
||||||
Join();
|
Join();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -436,7 +886,7 @@ void Thread::Send(const Location& posted_from,
|
|||||||
crit_.Leave();
|
crit_.Leave();
|
||||||
|
|
||||||
// Our Wait loop above may have consumed some WakeUp events for this
|
// Our Wait loop above may have consumed some WakeUp events for this
|
||||||
// MessageQueue, that weren't relevant to this Send. Losing these WakeUps can
|
// Thread, that weren't relevant to this Send. Losing these WakeUps can
|
||||||
// cause problems for some SocketServers.
|
// cause problems for some SocketServers.
|
||||||
//
|
//
|
||||||
// Concrete example:
|
// Concrete example:
|
||||||
@ -510,7 +960,7 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
|
|||||||
RTC_DCHECK(msg);
|
RTC_DCHECK(msg);
|
||||||
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
|
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
|
||||||
std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
|
std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
|
||||||
// MessageQueue expects handler to own Message::pdata when OnMessage is called
|
// Thread expects handler to own Message::pdata when OnMessage is called
|
||||||
// Since MessageData is no longer needed, delete it.
|
// Since MessageData is no longer needed, delete it.
|
||||||
delete data;
|
delete data;
|
||||||
|
|
||||||
@ -542,8 +992,7 @@ void Thread::Delete() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool Thread::IsProcessingMessagesForTesting() {
|
bool Thread::IsProcessingMessagesForTesting() {
|
||||||
return (owned_ || IsCurrent()) &&
|
return (owned_ || IsCurrent()) && !IsQuitting();
|
||||||
MessageQueue::IsProcessingMessagesForTesting();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::Clear(MessageHandler* phandler,
|
void Thread::Clear(MessageHandler* phandler,
|
||||||
@ -642,7 +1091,7 @@ MessageHandler* Thread::GetPostTaskMessageHandler() {
|
|||||||
AutoThread::AutoThread()
|
AutoThread::AutoThread()
|
||||||
: Thread(SocketServer::CreateDefault(), /*do_init=*/false) {
|
: Thread(SocketServer::CreateDefault(), /*do_init=*/false) {
|
||||||
if (!ThreadManager::Instance()->CurrentThread()) {
|
if (!ThreadManager::Instance()->CurrentThread()) {
|
||||||
// DoInit registers with MessageQueueManager. Do that only if we intend to
|
// DoInit registers with ThreadManager. Do that only if we intend to
|
||||||
// be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
|
// be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
|
||||||
// post a message to a queue that no running thread is serving.
|
// post a message to a queue that no running thread is serving.
|
||||||
DoInit();
|
DoInit();
|
||||||
@ -667,7 +1116,7 @@ AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
|
|||||||
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
|
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
|
||||||
rtc::ThreadManager::Instance()->SetCurrentThread(this);
|
rtc::ThreadManager::Instance()->SetCurrentThread(this);
|
||||||
if (old_thread_) {
|
if (old_thread_) {
|
||||||
MessageQueueManager::Remove(old_thread_);
|
ThreadManager::Remove(old_thread_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -679,7 +1128,7 @@ AutoSocketServerThread::~AutoSocketServerThread() {
|
|||||||
// cricket::Connection::Destroy.
|
// cricket::Connection::Destroy.
|
||||||
ProcessMessages(0);
|
ProcessMessages(0);
|
||||||
// Stop and destroy the thread before clearing it as the current thread.
|
// Stop and destroy the thread before clearing it as the current thread.
|
||||||
// Sometimes there are messages left in the MessageQueue that will be
|
// Sometimes there are messages left in the Thread that will be
|
||||||
// destroyed by DoDestroy, and sometimes the destructors of the message and/or
|
// destroyed by DoDestroy, and sometimes the destructors of the message and/or
|
||||||
// its contents rely on this thread still being set as the current thread.
|
// its contents rely on this thread still being set as the current thread.
|
||||||
Stop();
|
Stop();
|
||||||
@ -687,7 +1136,7 @@ AutoSocketServerThread::~AutoSocketServerThread() {
|
|||||||
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
|
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
|
||||||
rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
|
rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
|
||||||
if (old_thread_) {
|
if (old_thread_) {
|
||||||
MessageQueueManager::Add(old_thread_);
|
ThreadManager::Add(old_thread_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -15,8 +15,10 @@
|
|||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <queue>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#if defined(WEBRTC_POSIX)
|
#if defined(WEBRTC_POSIX)
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
@ -25,13 +27,14 @@
|
|||||||
#include "api/task_queue/queued_task.h"
|
#include "api/task_queue/queued_task.h"
|
||||||
#include "api/task_queue/task_queue_base.h"
|
#include "api/task_queue/task_queue_base.h"
|
||||||
#include "rtc_base/constructor_magic.h"
|
#include "rtc_base/constructor_magic.h"
|
||||||
|
#include "rtc_base/critical_section.h"
|
||||||
#include "rtc_base/location.h"
|
#include "rtc_base/location.h"
|
||||||
#include "rtc_base/message_handler.h"
|
#include "rtc_base/message_handler.h"
|
||||||
#include "rtc_base/message_queue.h"
|
|
||||||
#include "rtc_base/platform_thread_types.h"
|
#include "rtc_base/platform_thread_types.h"
|
||||||
#include "rtc_base/socket_server.h"
|
#include "rtc_base/socket_server.h"
|
||||||
#include "rtc_base/system/rtc_export.h"
|
#include "rtc_base/system/rtc_export.h"
|
||||||
#include "rtc_base/thread_annotations.h"
|
#include "rtc_base/thread_annotations.h"
|
||||||
|
#include "rtc_base/thread_message.h"
|
||||||
|
|
||||||
#if defined(WEBRTC_WIN)
|
#if defined(WEBRTC_WIN)
|
||||||
#include "rtc_base/win32.h"
|
#include "rtc_base/win32.h"
|
||||||
@ -73,6 +76,18 @@ class RTC_EXPORT ThreadManager {
|
|||||||
// Singleton, constructor and destructor are private.
|
// Singleton, constructor and destructor are private.
|
||||||
static ThreadManager* Instance();
|
static ThreadManager* Instance();
|
||||||
|
|
||||||
|
static void Add(Thread* message_queue);
|
||||||
|
static void Remove(Thread* message_queue);
|
||||||
|
static void Clear(MessageHandler* handler);
|
||||||
|
|
||||||
|
// TODO(nisse): Delete alias, as soon as downstream code is updated.
|
||||||
|
static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
|
||||||
|
|
||||||
|
// For testing purposes, for use with a simulated clock.
|
||||||
|
// Ensures that all message queues have processed delayed messages
|
||||||
|
// up until the current point in time.
|
||||||
|
static void ProcessAllMessageQueuesForTesting();
|
||||||
|
|
||||||
Thread* CurrentThread();
|
Thread* CurrentThread();
|
||||||
void SetCurrentThread(Thread* thread);
|
void SetCurrentThread(Thread* thread);
|
||||||
|
|
||||||
@ -98,6 +113,20 @@ class RTC_EXPORT ThreadManager {
|
|||||||
ThreadManager();
|
ThreadManager();
|
||||||
~ThreadManager();
|
~ThreadManager();
|
||||||
|
|
||||||
|
void AddInternal(Thread* message_queue);
|
||||||
|
void RemoveInternal(Thread* message_queue);
|
||||||
|
void ClearInternal(MessageHandler* handler);
|
||||||
|
void ProcessAllMessageQueuesInternal();
|
||||||
|
|
||||||
|
// This list contains all live Threads.
|
||||||
|
std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
|
||||||
|
|
||||||
|
// Methods that don't modify the list of message queues may be called in a
|
||||||
|
// re-entrant fashion. "processing_" keeps track of the depth of re-entrant
|
||||||
|
// calls.
|
||||||
|
CriticalSection crit_;
|
||||||
|
size_t processing_ RTC_GUARDED_BY(crit_) = 0;
|
||||||
|
|
||||||
#if defined(WEBRTC_POSIX)
|
#if defined(WEBRTC_POSIX)
|
||||||
pthread_key_t key_;
|
pthread_key_t key_;
|
||||||
#endif
|
#endif
|
||||||
@ -121,11 +150,18 @@ struct _SendMessage {
|
|||||||
|
|
||||||
// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread().
|
// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread().
|
||||||
|
|
||||||
class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
||||||
public webrtc::TaskQueueBase {
|
|
||||||
public:
|
public:
|
||||||
|
static const int kForever = -1;
|
||||||
|
|
||||||
|
// Create a new Thread and optionally assign it to the passed
|
||||||
|
// SocketServer. Subclasses that override Clear should pass false for
|
||||||
|
// init_queue and call DoInit() from their constructor to prevent races
|
||||||
|
// with the ThreadManager using the object while the vtable is still
|
||||||
|
// being created.
|
||||||
explicit Thread(SocketServer* ss);
|
explicit Thread(SocketServer* ss);
|
||||||
explicit Thread(std::unique_ptr<SocketServer> ss);
|
explicit Thread(std::unique_ptr<SocketServer> ss);
|
||||||
|
|
||||||
// Constructors meant for subclasses; they should call DoInit themselves and
|
// Constructors meant for subclasses; they should call DoInit themselves and
|
||||||
// pass false for |do_init|, so that DoInit is called only on the fully
|
// pass false for |do_init|, so that DoInit is called only on the fully
|
||||||
// instantiated class, which avoids a vptr data race.
|
// instantiated class, which avoids a vptr data race.
|
||||||
@ -136,6 +172,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
// guarantee Stop() is explicitly called before the subclass is destroyed).
|
// guarantee Stop() is explicitly called before the subclass is destroyed).
|
||||||
// This is required to avoid a data race between the destructor modifying the
|
// This is required to avoid a data race between the destructor modifying the
|
||||||
// vtable, and the Thread::PreRun calling the virtual method Run().
|
// vtable, and the Thread::PreRun calling the virtual method Run().
|
||||||
|
|
||||||
|
// NOTE: SUBCLASSES OF Thread THAT OVERRIDE Clear MUST CALL
|
||||||
|
// DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
|
||||||
|
// between the destructor modifying the vtable, and the ThreadManager
|
||||||
|
// calling Clear on the object from a different thread.
|
||||||
~Thread() override;
|
~Thread() override;
|
||||||
|
|
||||||
static std::unique_ptr<Thread> CreateWithSocketServer();
|
static std::unique_ptr<Thread> CreateWithSocketServer();
|
||||||
@ -159,6 +200,78 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
const bool previous_state_;
|
const bool previous_state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
SocketServer* socketserver();
|
||||||
|
|
||||||
|
// Note: The behavior of Thread has changed. When a thread is stopped,
|
||||||
|
// futher Posts and Sends will fail. However, any pending Sends and *ready*
|
||||||
|
// Posts (as opposed to unexpired delayed Posts) will be delivered before
|
||||||
|
// Get (or Peek) returns false. By guaranteeing delivery of those messages,
|
||||||
|
// we eliminate the race condition when an MessageHandler and Thread
|
||||||
|
// may be destroyed independently of each other.
|
||||||
|
virtual void Quit();
|
||||||
|
virtual bool IsQuitting();
|
||||||
|
virtual void Restart();
|
||||||
|
// Not all message queues actually process messages (such as SignalThread).
|
||||||
|
// In those cases, it's important to know, before posting, that it won't be
|
||||||
|
// Processed. Normally, this would be true until IsQuitting() is true.
|
||||||
|
virtual bool IsProcessingMessagesForTesting();
|
||||||
|
|
||||||
|
// Get() will process I/O until:
|
||||||
|
// 1) A message is available (returns true)
|
||||||
|
// 2) cmsWait seconds have elapsed (returns false)
|
||||||
|
// 3) Stop() is called (returns false)
|
||||||
|
virtual bool Get(Message* pmsg,
|
||||||
|
int cmsWait = kForever,
|
||||||
|
bool process_io = true);
|
||||||
|
virtual bool Peek(Message* pmsg, int cmsWait = 0);
|
||||||
|
virtual void Post(const Location& posted_from,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id = 0,
|
||||||
|
MessageData* pdata = nullptr,
|
||||||
|
bool time_sensitive = false);
|
||||||
|
virtual void PostDelayed(const Location& posted_from,
|
||||||
|
int cmsDelay,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id = 0,
|
||||||
|
MessageData* pdata = nullptr);
|
||||||
|
virtual void PostAt(const Location& posted_from,
|
||||||
|
int64_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id = 0,
|
||||||
|
MessageData* pdata = nullptr);
|
||||||
|
// TODO(honghaiz): Remove this when all the dependencies are removed.
|
||||||
|
virtual void PostAt(const Location& posted_from,
|
||||||
|
uint32_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id = 0,
|
||||||
|
MessageData* pdata = nullptr);
|
||||||
|
virtual void Clear(MessageHandler* phandler,
|
||||||
|
uint32_t id = MQID_ANY,
|
||||||
|
MessageList* removed = nullptr);
|
||||||
|
virtual void Dispatch(Message* pmsg);
|
||||||
|
virtual void ReceiveSends();
|
||||||
|
|
||||||
|
// Amount of time until the next message can be retrieved
|
||||||
|
virtual int GetDelay();
|
||||||
|
|
||||||
|
bool empty() const { return size() == 0u; }
|
||||||
|
size_t size() const {
|
||||||
|
CritScope cs(&crit_); // msgq_.size() is not thread safe.
|
||||||
|
return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internally posts a message which causes the doomed object to be deleted
|
||||||
|
template <class T>
|
||||||
|
void Dispose(T* doomed) {
|
||||||
|
if (doomed) {
|
||||||
|
Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// When this signal is sent out, any references to this queue should
|
||||||
|
// no longer be used.
|
||||||
|
sigslot::signal0<> SignalQueueDestroyed;
|
||||||
|
|
||||||
bool IsCurrent() const;
|
bool IsCurrent() const;
|
||||||
|
|
||||||
// Sleeps the calling thread for the specified number of milliseconds, during
|
// Sleeps the calling thread for the specified number of milliseconds, during
|
||||||
@ -176,7 +289,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
|
|
||||||
// Tells the thread to stop and waits until it is joined.
|
// Tells the thread to stop and waits until it is joined.
|
||||||
// Never call Stop on the current thread. Instead use the inherited Quit
|
// Never call Stop on the current thread. Instead use the inherited Quit
|
||||||
// function which will exit the base MessageQueue without terminating the
|
// function which will exit the base Thread without terminating the
|
||||||
// underlying OS thread.
|
// underlying OS thread.
|
||||||
virtual void Stop();
|
virtual void Stop();
|
||||||
|
|
||||||
@ -272,13 +385,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
uint32_t milliseconds) override;
|
uint32_t milliseconds) override;
|
||||||
void Delete() override;
|
void Delete() override;
|
||||||
|
|
||||||
// From MessageQueue
|
|
||||||
bool IsProcessingMessagesForTesting() override;
|
|
||||||
void Clear(MessageHandler* phandler,
|
|
||||||
uint32_t id = MQID_ANY,
|
|
||||||
MessageList* removed = nullptr) override;
|
|
||||||
void ReceiveSends() override;
|
|
||||||
|
|
||||||
// ProcessMessages will process I/O and dispatch messages until:
|
// ProcessMessages will process I/O and dispatch messages until:
|
||||||
// 1) cms milliseconds have elapsed (returns true)
|
// 1) cms milliseconds have elapsed (returns true)
|
||||||
// 2) Stop() is called (returns false)
|
// 2) Stop() is called (returns false)
|
||||||
@ -321,6 +427,35 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
class PriorityQueue : public std::priority_queue<DelayedMessage> {
|
||||||
|
public:
|
||||||
|
container_type& container() { return c; }
|
||||||
|
void reheap() { make_heap(c.begin(), c.end(), comp); }
|
||||||
|
};
|
||||||
|
|
||||||
|
void DoDelayPost(const Location& posted_from,
|
||||||
|
int64_t cmsDelay,
|
||||||
|
int64_t tstamp,
|
||||||
|
MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageData* pdata);
|
||||||
|
|
||||||
|
// Perform initialization, subclasses must call this from their constructor
|
||||||
|
// if false was passed as init_queue to the Thread constructor.
|
||||||
|
void DoInit();
|
||||||
|
|
||||||
|
// Does not take any lock. Must be called either while holding crit_, or by
|
||||||
|
// the destructor (by definition, the latter has exclusive access).
|
||||||
|
void ClearInternal(MessageHandler* phandler,
|
||||||
|
uint32_t id,
|
||||||
|
MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
||||||
|
|
||||||
|
// Perform cleanup; subclasses must call this from the destructor,
|
||||||
|
// and are not expected to actually hold the lock.
|
||||||
|
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
||||||
|
|
||||||
|
void WakeUpSocketServer();
|
||||||
|
|
||||||
// Same as WrapCurrent except that it never fails as it does not try to
|
// Same as WrapCurrent except that it never fails as it does not try to
|
||||||
// acquire the synchronization access of the thread. The caller should never
|
// acquire the synchronization access of the thread. The caller should never
|
||||||
// call Stop() or Join() on this thread.
|
// call Stop() or Join() on this thread.
|
||||||
@ -333,6 +468,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
|
|
||||||
friend class ScopedDisallowBlockingCalls;
|
friend class ScopedDisallowBlockingCalls;
|
||||||
|
|
||||||
|
CriticalSection* CritForTest() { return &crit_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class QueuedTaskHandler final : public MessageHandler {
|
class QueuedTaskHandler final : public MessageHandler {
|
||||||
public:
|
public:
|
||||||
@ -377,6 +514,22 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
|
|||||||
// MessageLikeTask payload data.
|
// MessageLikeTask payload data.
|
||||||
static MessageHandler* GetPostTaskMessageHandler();
|
static MessageHandler* GetPostTaskMessageHandler();
|
||||||
|
|
||||||
|
bool fPeekKeep_;
|
||||||
|
Message msgPeek_;
|
||||||
|
MessageList msgq_ RTC_GUARDED_BY(crit_);
|
||||||
|
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
|
||||||
|
uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
|
||||||
|
CriticalSection crit_;
|
||||||
|
bool fInitialized_;
|
||||||
|
bool fDestroyed_;
|
||||||
|
|
||||||
|
volatile int stop_;
|
||||||
|
|
||||||
|
// The SocketServer might not be owned by Thread.
|
||||||
|
SocketServer* const ss_;
|
||||||
|
// Used if SocketServer ownership lies with |this|.
|
||||||
|
std::unique_ptr<SocketServer> own_ss_;
|
||||||
|
|
||||||
std::list<_SendMessage> sendlist_;
|
std::list<_SendMessage> sendlist_;
|
||||||
std::string name_;
|
std::string name_;
|
||||||
|
|
||||||
@ -437,6 +590,10 @@ class AutoSocketServerThread : public Thread {
|
|||||||
RTC_DISALLOW_COPY_AND_ASSIGN(AutoSocketServerThread);
|
RTC_DISALLOW_COPY_AND_ASSIGN(AutoSocketServerThread);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// TODO(srte): Remove these when all dependencies has been updated.
|
||||||
|
using MessageQueue = Thread;
|
||||||
|
using MessageQueueManager = ThreadManager;
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
|
||||||
#endif // RTC_BASE_THREAD_H_
|
#endif // RTC_BASE_THREAD_H_
|
||||||
|
|||||||
141
rtc_base/thread_message.h
Normal file
141
rtc_base/thread_message.h
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#ifndef RTC_BASE_THREAD_MESSAGE_H_
|
||||||
|
#define RTC_BASE_THREAD_MESSAGE_H_
|
||||||
|
|
||||||
|
#include <list>
|
||||||
|
#include <memory>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "api/scoped_refptr.h"
|
||||||
|
#include "rtc_base/location.h"
|
||||||
|
#include "rtc_base/message_handler.h"
|
||||||
|
|
||||||
|
namespace rtc {
|
||||||
|
|
||||||
|
// Derive from this for specialized data
|
||||||
|
// App manages lifetime, except when messages are purged
|
||||||
|
|
||||||
|
class MessageData {
|
||||||
|
public:
|
||||||
|
MessageData() {}
|
||||||
|
virtual ~MessageData() {}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
class TypedMessageData : public MessageData {
|
||||||
|
public:
|
||||||
|
explicit TypedMessageData(const T& data) : data_(data) {}
|
||||||
|
const T& data() const { return data_; }
|
||||||
|
T& data() { return data_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
T data_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Like TypedMessageData, but for pointers that require a delete.
|
||||||
|
template <class T>
|
||||||
|
class ScopedMessageData : public MessageData {
|
||||||
|
public:
|
||||||
|
explicit ScopedMessageData(std::unique_ptr<T> data)
|
||||||
|
: data_(std::move(data)) {}
|
||||||
|
// Deprecated.
|
||||||
|
// TODO(deadbeef): Remove this once downstream applications stop using it.
|
||||||
|
explicit ScopedMessageData(T* data) : data_(data) {}
|
||||||
|
// Deprecated.
|
||||||
|
// TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
|
||||||
|
// this once downstream applications stop using it, then rename inner_data to
|
||||||
|
// just data.
|
||||||
|
const std::unique_ptr<T>& data() const { return data_; }
|
||||||
|
std::unique_ptr<T>& data() { return data_; }
|
||||||
|
|
||||||
|
const T& inner_data() const { return *data_; }
|
||||||
|
T& inner_data() { return *data_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::unique_ptr<T> data_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Like ScopedMessageData, but for reference counted pointers.
|
||||||
|
template <class T>
|
||||||
|
class ScopedRefMessageData : public MessageData {
|
||||||
|
public:
|
||||||
|
explicit ScopedRefMessageData(T* data) : data_(data) {}
|
||||||
|
const scoped_refptr<T>& data() const { return data_; }
|
||||||
|
scoped_refptr<T>& data() { return data_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
scoped_refptr<T> data_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
inline MessageData* WrapMessageData(const T& data) {
|
||||||
|
return new TypedMessageData<T>(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
inline const T& UseMessageData(MessageData* data) {
|
||||||
|
return static_cast<TypedMessageData<T>*>(data)->data();
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
class DisposeData : public MessageData {
|
||||||
|
public:
|
||||||
|
explicit DisposeData(T* data) : data_(data) {}
|
||||||
|
virtual ~DisposeData() { delete data_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
T* data_;
|
||||||
|
};
|
||||||
|
|
||||||
|
const uint32_t MQID_ANY = static_cast<uint32_t>(-1);
|
||||||
|
const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
|
||||||
|
|
||||||
|
// No destructor
|
||||||
|
|
||||||
|
struct Message {
|
||||||
|
Message()
|
||||||
|
: phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
|
||||||
|
inline bool Match(MessageHandler* handler, uint32_t id) const {
|
||||||
|
return (handler == nullptr || handler == phandler) &&
|
||||||
|
(id == MQID_ANY || id == message_id);
|
||||||
|
}
|
||||||
|
Location posted_from;
|
||||||
|
MessageHandler* phandler;
|
||||||
|
uint32_t message_id;
|
||||||
|
MessageData* pdata;
|
||||||
|
int64_t ts_sensitive;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::list<Message> MessageList;
|
||||||
|
|
||||||
|
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
|
||||||
|
// with the same trigger time are processed in num_ (FIFO) order.
|
||||||
|
|
||||||
|
class DelayedMessage {
|
||||||
|
public:
|
||||||
|
DelayedMessage(int64_t delay,
|
||||||
|
int64_t trigger,
|
||||||
|
uint32_t num,
|
||||||
|
const Message& msg)
|
||||||
|
: cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
|
||||||
|
|
||||||
|
bool operator<(const DelayedMessage& dmsg) const {
|
||||||
|
return (dmsg.msTrigger_ < msTrigger_) ||
|
||||||
|
((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t cmsDelay_; // for debugging
|
||||||
|
int64_t msTrigger_;
|
||||||
|
uint32_t num_;
|
||||||
|
Message msg_;
|
||||||
|
};
|
||||||
|
} // namespace rtc
|
||||||
|
#endif // RTC_BASE_THREAD_MESSAGE_H_
|
||||||
@ -264,7 +264,7 @@ int32_t MediaCodecVideoDecoder::ResetDecodeOnCodecThread() {
|
|||||||
<< ". Frames decoded: " << frames_decoded_;
|
<< ". Frames decoded: " << frames_decoded_;
|
||||||
|
|
||||||
inited_ = false;
|
inited_ = false;
|
||||||
rtc::MessageQueueManager::Clear(this);
|
rtc::ThreadManager::Clear(this);
|
||||||
ResetVariables();
|
ResetVariables();
|
||||||
|
|
||||||
Java_MediaCodecVideoDecoder_reset(jni, j_media_codec_video_decoder_,
|
Java_MediaCodecVideoDecoder_reset(jni, j_media_codec_video_decoder_,
|
||||||
@ -300,7 +300,7 @@ int32_t MediaCodecVideoDecoder::ReleaseOnCodecThread() {
|
|||||||
input_buffers_.clear();
|
input_buffers_.clear();
|
||||||
Java_MediaCodecVideoDecoder_release(jni, j_media_codec_video_decoder_);
|
Java_MediaCodecVideoDecoder_release(jni, j_media_codec_video_decoder_);
|
||||||
inited_ = false;
|
inited_ = false;
|
||||||
rtc::MessageQueueManager::Clear(this);
|
rtc::ThreadManager::Clear(this);
|
||||||
if (CheckException(jni)) {
|
if (CheckException(jni)) {
|
||||||
ALOGE << "Decoder release exception";
|
ALOGE << "Decoder release exception";
|
||||||
return WEBRTC_VIDEO_CODEC_ERROR;
|
return WEBRTC_VIDEO_CODEC_ERROR;
|
||||||
|
|||||||
@ -96,7 +96,7 @@
|
|||||||
[self.audioSession notifyDidBeginInterruption];
|
[self.audioSession notifyDidBeginInterruption];
|
||||||
|
|
||||||
// Wait for notification to propagate.
|
// Wait for notification to propagate.
|
||||||
rtc::MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
rtc::ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
XCTAssertTrue(_audio_device->IsInterrupted());
|
XCTAssertTrue(_audio_device->IsInterrupted());
|
||||||
|
|
||||||
// Force it for testing.
|
// Force it for testing.
|
||||||
@ -104,7 +104,7 @@
|
|||||||
|
|
||||||
[self.audioSession notifyDidEndInterruptionWithShouldResumeSession:YES];
|
[self.audioSession notifyDidEndInterruptionWithShouldResumeSession:YES];
|
||||||
// Wait for notification to propagate.
|
// Wait for notification to propagate.
|
||||||
rtc::MessageQueueManager::ProcessAllMessageQueuesForTesting();
|
rtc::ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
XCTAssertTrue(_audio_device->IsInterrupted());
|
XCTAssertTrue(_audio_device->IsInterrupted());
|
||||||
|
|
||||||
_audio_device->Init();
|
_audio_device->Init();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user