Delete rtc::Thread functions that use rtc::MessageHandler
Bug: webrtc:9702 Change-Id: I6fc8aa8a793caf19d62a149db1861c352c609255 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275774 Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#38150}
This commit is contained in:
parent
52dd1a566a
commit
d44e3410b6
@ -76,25 +76,6 @@ namespace {
|
|||||||
|
|
||||||
using ::webrtc::TimeDelta;
|
using ::webrtc::TimeDelta;
|
||||||
|
|
||||||
struct AnyInvocableMessage final : public MessageData {
|
|
||||||
explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
|
|
||||||
: task(std::move(task)) {}
|
|
||||||
absl::AnyInvocable<void() &&> task;
|
|
||||||
};
|
|
||||||
|
|
||||||
class AnyInvocableMessageHandler final : public MessageHandler {
|
|
||||||
public:
|
|
||||||
void OnMessage(Message* msg) override {
|
|
||||||
std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
|
|
||||||
delete msg->pdata;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
MessageHandler* GetAnyInvocableMessageHandler() {
|
|
||||||
static MessageHandler* const handler = new AnyInvocableMessageHandler;
|
|
||||||
return handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
|
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
|
||||||
public:
|
public:
|
||||||
MarkProcessingCritScope(const RecursiveCriticalSection* cs,
|
MarkProcessingCritScope(const RecursiveCriticalSection* cs,
|
||||||
@ -407,7 +388,9 @@ void Thread::DoDestroy() {
|
|||||||
ss_->SetMessageQueue(nullptr);
|
ss_->SetMessageQueue(nullptr);
|
||||||
}
|
}
|
||||||
ThreadManager::Remove(this);
|
ThreadManager::Remove(this);
|
||||||
ClearInternal(nullptr, MQID_ANY, nullptr);
|
// Clear.
|
||||||
|
messages_ = {};
|
||||||
|
delayed_messages_ = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
SocketServer* Thread::socketserver() {
|
SocketServer* Thread::socketserver() {
|
||||||
@ -431,7 +414,7 @@ void Thread::Restart() {
|
|||||||
stop_.store(0, std::memory_order_release);
|
stop_.store(0, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
|
absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
|
||||||
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
|
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
|
||||||
|
|
||||||
int64_t cmsTotal = cmsWait;
|
int64_t cmsTotal = cmsWait;
|
||||||
@ -448,19 +431,19 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
|
|||||||
// Check for delayed messages that have been triggered and calculate the
|
// Check for delayed messages that have been triggered and calculate the
|
||||||
// next trigger time.
|
// next trigger time.
|
||||||
while (!delayed_messages_.empty()) {
|
while (!delayed_messages_.empty()) {
|
||||||
if (msCurrent < delayed_messages_.top().run_time_ms_) {
|
if (msCurrent < delayed_messages_.top().run_time_ms) {
|
||||||
cmsDelayNext =
|
cmsDelayNext =
|
||||||
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
|
TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
messages_.push_back(delayed_messages_.top().msg_);
|
messages_.push(std::move(delayed_messages_.top().functor));
|
||||||
delayed_messages_.pop();
|
delayed_messages_.pop();
|
||||||
}
|
}
|
||||||
// Pull a message off the message queue, if available.
|
// Pull a message off the message queue, if available.
|
||||||
if (!messages_.empty()) {
|
if (!messages_.empty()) {
|
||||||
*pmsg = messages_.front();
|
absl::AnyInvocable<void()&&> task = std::move(messages_.front());
|
||||||
messages_.pop_front();
|
messages_.pop();
|
||||||
return true;
|
return task;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,8 +465,8 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
|
|||||||
// Wait and multiplex in the meantime
|
// Wait and multiplex in the meantime
|
||||||
if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
|
if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
|
||||||
: webrtc::TimeDelta::Millis(cmsNext),
|
: webrtc::TimeDelta::Millis(cmsNext),
|
||||||
process_io))
|
/*process_io=*/true))
|
||||||
return false;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the specified timeout expired, return
|
// If the specified timeout expired, return
|
||||||
@ -492,20 +475,14 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
|
|||||||
cmsElapsed = TimeDiff(msCurrent, msStart);
|
cmsElapsed = TimeDiff(msCurrent, msStart);
|
||||||
if (cmsWait != kForever) {
|
if (cmsWait != kForever) {
|
||||||
if (cmsElapsed >= cmsWait)
|
if (cmsElapsed >= cmsWait)
|
||||||
return false;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::Post(const Location& posted_from,
|
void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata,
|
|
||||||
bool time_sensitive) {
|
|
||||||
RTC_DCHECK(!time_sensitive);
|
|
||||||
if (IsQuitting()) {
|
if (IsQuitting()) {
|
||||||
delete pdata;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,42 +492,14 @@ void Thread::Post(const Location& posted_from,
|
|||||||
|
|
||||||
{
|
{
|
||||||
CritScope cs(&crit_);
|
CritScope cs(&crit_);
|
||||||
Message msg;
|
messages_.push(std::move(task));
|
||||||
msg.posted_from = posted_from;
|
|
||||||
msg.phandler = phandler;
|
|
||||||
msg.message_id = id;
|
|
||||||
msg.pdata = pdata;
|
|
||||||
messages_.push_back(msg);
|
|
||||||
}
|
}
|
||||||
WakeUpSocketServer();
|
WakeUpSocketServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::PostDelayed(const Location& posted_from,
|
void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
|
||||||
int delay_ms,
|
webrtc::TimeDelta delay) {
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
|
|
||||||
pdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
void Thread::PostAt(const Location& posted_from,
|
|
||||||
int64_t run_at_ms,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
|
|
||||||
pdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
void Thread::DoDelayPost(const Location& posted_from,
|
|
||||||
int64_t delay_ms,
|
|
||||||
int64_t run_at_ms,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata) {
|
|
||||||
if (IsQuitting()) {
|
if (IsQuitting()) {
|
||||||
delete pdata;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -558,15 +507,14 @@ void Thread::DoDelayPost(const Location& posted_from,
|
|||||||
// Add to the priority queue. Gets sorted soonest first.
|
// Add to the priority queue. Gets sorted soonest first.
|
||||||
// Signal for the multiplexer to return.
|
// Signal for the multiplexer to return.
|
||||||
|
|
||||||
|
int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
|
||||||
|
int64_t run_time_ms = TimeAfter(delay_ms);
|
||||||
{
|
{
|
||||||
CritScope cs(&crit_);
|
CritScope cs(&crit_);
|
||||||
Message msg;
|
delayed_messages_.push({.delay_ms = delay_ms,
|
||||||
msg.posted_from = posted_from;
|
.run_time_ms = run_time_ms,
|
||||||
msg.phandler = phandler;
|
.message_number = delayed_next_num_,
|
||||||
msg.message_id = id;
|
.functor = std::move(task)});
|
||||||
msg.pdata = pdata;
|
|
||||||
DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
|
|
||||||
delayed_messages_.push(delayed);
|
|
||||||
// If this message queue processes 1 message every millisecond for 50 days,
|
// If this message queue processes 1 message every millisecond for 50 days,
|
||||||
// we will wrap this number. Even then, only messages with identical times
|
// we will wrap this number. Even then, only messages with identical times
|
||||||
// will be misordered, and then only briefly. This is probably ok.
|
// will be misordered, and then only briefly. This is probably ok.
|
||||||
@ -583,7 +531,7 @@ int Thread::GetDelay() {
|
|||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if (!delayed_messages_.empty()) {
|
if (!delayed_messages_.empty()) {
|
||||||
int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
|
int delay = TimeUntil(delayed_messages_.top().run_time_ms);
|
||||||
if (delay < 0)
|
if (delay < 0)
|
||||||
delay = 0;
|
delay = 0;
|
||||||
return delay;
|
return delay;
|
||||||
@ -592,56 +540,16 @@ int Thread::GetDelay() {
|
|||||||
return kForever;
|
return kForever;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::ClearInternal(MessageHandler* phandler,
|
void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
|
||||||
uint32_t id,
|
TRACE_EVENT0("webrtc", "Thread::Dispatch");
|
||||||
MessageList* removed) {
|
|
||||||
// Remove from ordered message queue
|
|
||||||
|
|
||||||
for (auto it = messages_.begin(); it != messages_.end();) {
|
|
||||||
if (it->Match(phandler, id)) {
|
|
||||||
if (removed) {
|
|
||||||
removed->push_back(*it);
|
|
||||||
} else {
|
|
||||||
delete it->pdata;
|
|
||||||
}
|
|
||||||
it = messages_.erase(it);
|
|
||||||
} else {
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from priority queue. Not directly iterable, so use this approach
|
|
||||||
|
|
||||||
auto new_end = delayed_messages_.container().begin();
|
|
||||||
for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
|
|
||||||
if (it->msg_.Match(phandler, id)) {
|
|
||||||
if (removed) {
|
|
||||||
removed->push_back(it->msg_);
|
|
||||||
} else {
|
|
||||||
delete it->msg_.pdata;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*new_end++ = *it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delayed_messages_.container().erase(new_end,
|
|
||||||
delayed_messages_.container().end());
|
|
||||||
delayed_messages_.reheap();
|
|
||||||
}
|
|
||||||
|
|
||||||
void Thread::Dispatch(Message* pmsg) {
|
|
||||||
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
|
|
||||||
pmsg->posted_from.file_name(), "src_func",
|
|
||||||
pmsg->posted_from.function_name());
|
|
||||||
RTC_DCHECK_RUN_ON(this);
|
RTC_DCHECK_RUN_ON(this);
|
||||||
int64_t start_time = TimeMillis();
|
int64_t start_time = TimeMillis();
|
||||||
pmsg->phandler->OnMessage(pmsg);
|
std::move(task)();
|
||||||
int64_t end_time = TimeMillis();
|
int64_t end_time = TimeMillis();
|
||||||
int64_t diff = TimeDiff(end_time, start_time);
|
int64_t diff = TimeDiff(end_time, start_time);
|
||||||
if (diff >= dispatch_warning_ms_) {
|
if (diff >= dispatch_warning_ms_) {
|
||||||
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
|
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
|
||||||
<< "ms to dispatch. Posted from: "
|
<< "ms to dispatch.";
|
||||||
<< pmsg->posted_from.ToString();
|
|
||||||
// To avoid log spew, move the warning limit to only give warning
|
// To avoid log spew, move the warning limit to only give warning
|
||||||
// for delays that are larger than the one observed.
|
// for delays that are larger than the one observed.
|
||||||
dispatch_warning_ms_ = diff + 1;
|
dispatch_warning_ms_ = diff + 1;
|
||||||
@ -986,39 +894,16 @@ void Thread::Delete() {
|
|||||||
delete this;
|
delete this;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
|
|
||||||
// Though Post takes MessageData by raw pointer (last parameter), it still
|
|
||||||
// takes it with ownership.
|
|
||||||
Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
|
|
||||||
/*id=*/0, new AnyInvocableMessage(std::move(task)));
|
|
||||||
}
|
|
||||||
|
|
||||||
void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
||||||
webrtc::TimeDelta delay) {
|
webrtc::TimeDelta delay) {
|
||||||
// This implementation does not support low precision yet.
|
// This implementation does not support low precision yet.
|
||||||
PostDelayedHighPrecisionTask(std::move(task), delay);
|
PostDelayedHighPrecisionTask(std::move(task), delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
|
|
||||||
webrtc::TimeDelta delay) {
|
|
||||||
int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
|
|
||||||
// Though PostDelayed takes MessageData by raw pointer (last parameter),
|
|
||||||
// it still takes it with ownership.
|
|
||||||
PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
|
|
||||||
/*id=*/0, new AnyInvocableMessage(std::move(task)));
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Thread::IsProcessingMessagesForTesting() {
|
bool Thread::IsProcessingMessagesForTesting() {
|
||||||
return (owned_ || IsCurrent()) && !IsQuitting();
|
return (owned_ || IsCurrent()) && !IsQuitting();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Thread::Clear(MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageList* removed) {
|
|
||||||
CritScope cs(&crit_);
|
|
||||||
ClearInternal(phandler, id, removed);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Thread::ProcessMessages(int cmsLoop) {
|
bool Thread::ProcessMessages(int cmsLoop) {
|
||||||
// Using ProcessMessages with a custom clock for testing and a time greater
|
// Using ProcessMessages with a custom clock for testing and a time greater
|
||||||
// than 0 doesn't work, since it's not guaranteed to advance the custom
|
// than 0 doesn't work, since it's not guaranteed to advance the custom
|
||||||
@ -1032,10 +917,10 @@ bool Thread::ProcessMessages(int cmsLoop) {
|
|||||||
#if defined(WEBRTC_MAC)
|
#if defined(WEBRTC_MAC)
|
||||||
ScopedAutoReleasePool pool;
|
ScopedAutoReleasePool pool;
|
||||||
#endif
|
#endif
|
||||||
Message msg;
|
absl::AnyInvocable<void()&&> task = Get(cmsNext);
|
||||||
if (!Get(&msg, cmsNext))
|
if (!task)
|
||||||
return !IsQuitting();
|
return !IsQuitting();
|
||||||
Dispatch(&msg);
|
Dispatch(std::move(task));
|
||||||
|
|
||||||
if (cmsLoop != kForever) {
|
if (cmsLoop != kForever) {
|
||||||
cmsNext = static_cast<int>(TimeUntil(msEnd));
|
cmsNext = static_cast<int>(TimeUntil(msEnd));
|
||||||
|
|||||||
@ -36,12 +36,10 @@
|
|||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
#include "rtc_base/deprecated/recursive_critical_section.h"
|
#include "rtc_base/deprecated/recursive_critical_section.h"
|
||||||
#include "rtc_base/location.h"
|
#include "rtc_base/location.h"
|
||||||
#include "rtc_base/message_handler.h"
|
|
||||||
#include "rtc_base/platform_thread_types.h"
|
#include "rtc_base/platform_thread_types.h"
|
||||||
#include "rtc_base/socket_server.h"
|
#include "rtc_base/socket_server.h"
|
||||||
#include "rtc_base/system/rtc_export.h"
|
#include "rtc_base/system/rtc_export.h"
|
||||||
#include "rtc_base/thread_annotations.h"
|
#include "rtc_base/thread_annotations.h"
|
||||||
#include "rtc_base/thread_message.h"
|
|
||||||
|
|
||||||
#if defined(WEBRTC_WIN)
|
#if defined(WEBRTC_WIN)
|
||||||
#include "rtc_base/win32.h"
|
#include "rtc_base/win32.h"
|
||||||
@ -267,26 +265,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
|||||||
// Processed. Normally, this would be true until IsQuitting() is true.
|
// Processed. Normally, this would be true until IsQuitting() is true.
|
||||||
virtual bool IsProcessingMessagesForTesting();
|
virtual bool IsProcessingMessagesForTesting();
|
||||||
|
|
||||||
// `time_sensitive` is deprecated and should always be false.
|
|
||||||
virtual void Post(const Location& posted_from,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr,
|
|
||||||
bool time_sensitive = false);
|
|
||||||
virtual void PostDelayed(const Location& posted_from,
|
|
||||||
int delay_ms,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr);
|
|
||||||
virtual void PostAt(const Location& posted_from,
|
|
||||||
int64_t run_at_ms,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id = 0,
|
|
||||||
MessageData* pdata = nullptr);
|
|
||||||
virtual void Clear(MessageHandler* phandler,
|
|
||||||
uint32_t id = MQID_ANY,
|
|
||||||
MessageList* removed = nullptr);
|
|
||||||
|
|
||||||
// Amount of time until the next message can be retrieved
|
// Amount of time until the next message can be retrieved
|
||||||
virtual int GetDelay();
|
virtual int GetDelay();
|
||||||
|
|
||||||
@ -427,54 +405,28 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
|||||||
|
|
||||||
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
|
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
|
||||||
// with the same trigger time are processed in num_ (FIFO) order.
|
// with the same trigger time are processed in num_ (FIFO) order.
|
||||||
class DelayedMessage {
|
struct DelayedMessage {
|
||||||
public:
|
|
||||||
DelayedMessage(int64_t delay,
|
|
||||||
int64_t run_time_ms,
|
|
||||||
uint32_t num,
|
|
||||||
const Message& msg)
|
|
||||||
: delay_ms_(delay),
|
|
||||||
run_time_ms_(run_time_ms),
|
|
||||||
message_number_(num),
|
|
||||||
msg_(msg) {}
|
|
||||||
|
|
||||||
bool operator<(const DelayedMessage& dmsg) const {
|
bool operator<(const DelayedMessage& dmsg) const {
|
||||||
return (dmsg.run_time_ms_ < run_time_ms_) ||
|
return (dmsg.run_time_ms < run_time_ms) ||
|
||||||
((dmsg.run_time_ms_ == run_time_ms_) &&
|
((dmsg.run_time_ms == run_time_ms) &&
|
||||||
(dmsg.message_number_ < message_number_));
|
(dmsg.message_number < message_number));
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t delay_ms_; // for debugging
|
int64_t delay_ms; // for debugging
|
||||||
int64_t run_time_ms_;
|
int64_t run_time_ms;
|
||||||
// Monotonicaly incrementing number used for ordering of messages
|
// Monotonicaly incrementing number used for ordering of messages
|
||||||
// targeted to execute at the same time.
|
// targeted to execute at the same time.
|
||||||
uint32_t message_number_;
|
uint32_t message_number;
|
||||||
Message msg_;
|
// std::priority_queue doesn't allow to extract elements, but functor
|
||||||
|
// is move-only and thus need to be changed when pulled out of the
|
||||||
|
// priority queue. That is ok because `functor` doesn't affect operator<
|
||||||
|
mutable absl::AnyInvocable<void() &&> functor;
|
||||||
};
|
};
|
||||||
|
|
||||||
class PriorityQueue : public std::priority_queue<DelayedMessage> {
|
|
||||||
public:
|
|
||||||
container_type& container() { return c; }
|
|
||||||
void reheap() { make_heap(c.begin(), c.end(), comp); }
|
|
||||||
};
|
|
||||||
|
|
||||||
void DoDelayPost(const Location& posted_from,
|
|
||||||
int64_t cmsDelay,
|
|
||||||
int64_t tstamp,
|
|
||||||
MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageData* pdata);
|
|
||||||
|
|
||||||
// Perform initialization, subclasses must call this from their constructor
|
// Perform initialization, subclasses must call this from their constructor
|
||||||
// if false was passed as init_queue to the Thread constructor.
|
// if false was passed as init_queue to the Thread constructor.
|
||||||
void DoInit();
|
void DoInit();
|
||||||
|
|
||||||
// Does not take any lock. Must be called either while holding crit_, or by
|
|
||||||
// the destructor (by definition, the latter has exclusive access).
|
|
||||||
void ClearInternal(MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
|
||||||
|
|
||||||
// Perform cleanup; subclasses must call this from the destructor,
|
// Perform cleanup; subclasses must call this from the destructor,
|
||||||
// and are not expected to actually hold the lock.
|
// and are not expected to actually hold the lock.
|
||||||
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
|
||||||
@ -497,13 +449,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
|||||||
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
|
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
|
||||||
|
|
||||||
// Get() will process I/O until:
|
// Get() will process I/O until:
|
||||||
// 1) A message is available (returns true)
|
// 1) A task is available (returns it)
|
||||||
// 2) cmsWait seconds have elapsed (returns false)
|
// 2) cmsWait seconds have elapsed (returns empty task)
|
||||||
// 3) Stop() is called (returns false)
|
// 3) Stop() is called (returns empty task)
|
||||||
virtual bool Get(Message* pmsg,
|
absl::AnyInvocable<void() &&> Get(int cmsWait);
|
||||||
int cmsWait = kForever,
|
void Dispatch(absl::AnyInvocable<void() &&> task);
|
||||||
bool process_io = true);
|
|
||||||
virtual void Dispatch(Message* pmsg);
|
|
||||||
|
|
||||||
// Sets the per-thread allow-blocking-calls flag and returns the previous
|
// Sets the per-thread allow-blocking-calls flag and returns the previous
|
||||||
// value. Must be called on this thread.
|
// value. Must be called on this thread.
|
||||||
@ -532,8 +482,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
|||||||
// Called by the ThreadManager when being unset as the current thread.
|
// Called by the ThreadManager when being unset as the current thread.
|
||||||
void ClearCurrentTaskQueue();
|
void ClearCurrentTaskQueue();
|
||||||
|
|
||||||
MessageList messages_ RTC_GUARDED_BY(crit_);
|
std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(crit_);
|
||||||
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
|
std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(crit_);
|
||||||
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
|
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
|
||||||
#if RTC_DCHECK_IS_ON
|
#if RTC_DCHECK_IS_ON
|
||||||
uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0;
|
uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0;
|
||||||
|
|||||||
@ -553,32 +553,6 @@ TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) {
|
|||||||
ThreadManager::ProcessAllMessageQueuesForTesting();
|
ThreadManager::ProcessAllMessageQueuesForTesting();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
|
|
||||||
// messages.
|
|
||||||
TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) {
|
|
||||||
rtc::AutoThread main_thread;
|
|
||||||
Event entered_process_all_message_queues(true, false);
|
|
||||||
auto t = Thread::CreateWithSocketServer();
|
|
||||||
t->Start();
|
|
||||||
|
|
||||||
auto clearer = [&entered_process_all_message_queues] {
|
|
||||||
// Wait for event as a means to ensure Clear doesn't occur outside of
|
|
||||||
// ProcessAllMessageQueues. The event is set by a message posted to the
|
|
||||||
// main thread, which is guaranteed to be handled inside
|
|
||||||
// ProcessAllMessageQueues.
|
|
||||||
entered_process_all_message_queues.Wait(Event::kForever);
|
|
||||||
rtc::Thread::Current()->Clear(nullptr);
|
|
||||||
};
|
|
||||||
auto event_signaler = [&entered_process_all_message_queues] {
|
|
||||||
entered_process_all_message_queues.Set();
|
|
||||||
};
|
|
||||||
|
|
||||||
// Post messages (both delayed and non delayed) to both threads.
|
|
||||||
t->PostTask(clearer);
|
|
||||||
main_thread.PostTask(event_signaler);
|
|
||||||
ThreadManager::ProcessAllMessageQueuesForTesting();
|
|
||||||
}
|
|
||||||
|
|
||||||
void WaitAndSetEvent(Event* wait_event, Event* set_event) {
|
void WaitAndSetEvent(Event* wait_event, Event* set_event) {
|
||||||
wait_event->Wait(Event::kForever);
|
wait_event->Wait(Event::kForever);
|
||||||
set_event->Set();
|
set_event->Set();
|
||||||
|
|||||||
@ -77,35 +77,27 @@ void SimulatedThread::BlockingCall(rtc::FunctionView<void()> functor) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SimulatedThread::Post(const rtc::Location& posted_from,
|
void SimulatedThread::PostTask(absl::AnyInvocable<void() &&> task) {
|
||||||
rtc::MessageHandler* phandler,
|
rtc::Thread::PostTask(std::move(task));
|
||||||
uint32_t id,
|
|
||||||
rtc::MessageData* pdata,
|
|
||||||
bool time_sensitive) {
|
|
||||||
rtc::Thread::Post(posted_from, phandler, id, pdata, time_sensitive);
|
|
||||||
MutexLock lock(&lock_);
|
MutexLock lock(&lock_);
|
||||||
next_run_time_ = Timestamp::MinusInfinity();
|
next_run_time_ = Timestamp::MinusInfinity();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SimulatedThread::PostDelayed(const rtc::Location& posted_from,
|
void SimulatedThread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
||||||
int delay_ms,
|
TimeDelta delay) {
|
||||||
rtc::MessageHandler* phandler,
|
rtc::Thread::PostDelayedTask(std::move(task), delay);
|
||||||
uint32_t id,
|
|
||||||
rtc::MessageData* pdata) {
|
|
||||||
rtc::Thread::PostDelayed(posted_from, delay_ms, phandler, id, pdata);
|
|
||||||
MutexLock lock(&lock_);
|
MutexLock lock(&lock_);
|
||||||
next_run_time_ =
|
next_run_time_ =
|
||||||
std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis() + delay_ms));
|
std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SimulatedThread::PostAt(const rtc::Location& posted_from,
|
void SimulatedThread::PostDelayedHighPrecisionTask(
|
||||||
int64_t target_time_ms,
|
absl::AnyInvocable<void() &&> task,
|
||||||
rtc::MessageHandler* phandler,
|
TimeDelta delay) {
|
||||||
uint32_t id,
|
rtc::Thread::PostDelayedHighPrecisionTask(std::move(task), delay);
|
||||||
rtc::MessageData* pdata) {
|
|
||||||
rtc::Thread::PostAt(posted_from, target_time_ms, phandler, id, pdata);
|
|
||||||
MutexLock lock(&lock_);
|
MutexLock lock(&lock_);
|
||||||
next_run_time_ = std::min(next_run_time_, Timestamp::Millis(target_time_ms));
|
next_run_time_ =
|
||||||
|
std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SimulatedThread::Stop() {
|
void SimulatedThread::Stop() {
|
||||||
|
|||||||
@ -37,21 +37,11 @@ class SimulatedThread : public rtc::Thread,
|
|||||||
|
|
||||||
// Thread interface
|
// Thread interface
|
||||||
void BlockingCall(rtc::FunctionView<void()> functor) override;
|
void BlockingCall(rtc::FunctionView<void()> functor) override;
|
||||||
void Post(const rtc::Location& posted_from,
|
void PostTask(absl::AnyInvocable<void() &&> task) override;
|
||||||
rtc::MessageHandler* phandler,
|
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
|
||||||
uint32_t id,
|
TimeDelta delay) override;
|
||||||
rtc::MessageData* pdata,
|
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
|
||||||
bool time_sensitive) override;
|
TimeDelta delay) override;
|
||||||
void PostDelayed(const rtc::Location& posted_from,
|
|
||||||
int delay_ms,
|
|
||||||
rtc::MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
rtc::MessageData* pdata) override;
|
|
||||||
void PostAt(const rtc::Location& posted_from,
|
|
||||||
int64_t target_time_ms,
|
|
||||||
rtc::MessageHandler* phandler,
|
|
||||||
uint32_t id,
|
|
||||||
rtc::MessageData* pdata) override;
|
|
||||||
|
|
||||||
void Stop() override;
|
void Stop() override;
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user