Replace RecursiveCriticalSection with Mutex in ProcessThreadImpl

Bug: webrtc:11567
Change-Id: I03961ddc55f29a01c3e466217222fd56ba51d895
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/208764
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33354}
This commit is contained in:
Niels Möller 2021-03-01 11:31:33 +01:00 committed by Commit Bot
parent 1e2da374b3
commit 4785402475
2 changed files with 55 additions and 22 deletions

View File

@ -93,7 +93,7 @@ void ProcessThreadImpl::Stop() {
{
// Need to take lock, for synchronization with `thread_`.
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
stop_ = true;
}
@ -117,20 +117,46 @@ void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
void ProcessThreadImpl::WakeUp(Module* module) {
// Allowed to be called on any thread.
{
rtc::CritScope lock(&lock_);
for (ModuleCallback& m : modules_) {
if (m.module == module)
m.next_callback = kCallProcessImmediately;
auto holds_mutex = [this] {
if (!IsCurrent()) {
return false;
}
RTC_DCHECK_RUN_ON(this);
return holds_mutex_;
};
if (holds_mutex()) {
// Avoid locking if called on the ProcessThread, via a module's Process),
WakeUpNoLocks(module);
} else {
MutexLock lock(&mutex_);
WakeUpInternal(module);
}
wake_up_.Set();
}
// Must be called only indirectly from Process, which already holds the lock.
void ProcessThreadImpl::WakeUpNoLocks(Module* module)
RTC_NO_THREAD_SAFETY_ANALYSIS {
RTC_DCHECK_RUN_ON(this);
WakeUpInternal(module);
}
void ProcessThreadImpl::WakeUpInternal(Module* module) {
for (ModuleCallback& m : modules_) {
if (m.module == module)
m.next_callback = kCallProcessImmediately;
}
}
void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
// Allowed to be called on any thread.
// Allowed to be called on any thread, except from a module's Process method.
if (IsCurrent()) {
RTC_DCHECK_RUN_ON(this);
RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from "
"Module::Process is not supported";
}
{
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
queue_.push(task.release());
}
wake_up_.Set();
@ -141,7 +167,7 @@ void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
bool recalculate_wakeup_time;
{
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
recalculate_wakeup_time =
delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
delayed_tasks_.emplace(run_at_ms, std::move(task));
@ -159,7 +185,7 @@ void ProcessThreadImpl::RegisterModule(Module* module,
#if RTC_DCHECK_IS_ON
{
// Catch programmer error.
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
for (const ModuleCallback& mc : modules_) {
RTC_DCHECK(mc.module != module)
<< "Already registered here: " << mc.location.ToString()
@ -177,7 +203,7 @@ void ProcessThreadImpl::RegisterModule(Module* module,
module->ProcessThreadAttached(this);
{
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
modules_.push_back(ModuleCallback(module, from));
}
@ -192,7 +218,7 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) {
RTC_DCHECK(module);
{
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
modules_.remove_if(
[&module](const ModuleCallback& m) { return m.module == module; });
}
@ -213,9 +239,9 @@ bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis();
int64_t next_checkpoint = now + (1000 * 60);
RTC_DCHECK_RUN_ON(this);
{
rtc::CritScope lock(&lock_);
MutexLock lock(&mutex_);
if (stop_)
return false;
for (ModuleCallback& m : modules_) {
@ -226,6 +252,8 @@ bool ProcessThreadImpl::Process() {
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);
// Set to true for the duration of the calls to modules' Process().
holds_mutex_ = true;
if (m.next_callback <= now ||
m.next_callback == kCallProcessImmediately) {
{
@ -240,6 +268,7 @@ bool ProcessThreadImpl::Process() {
int64_t new_now = rtc::TimeMillis();
m.next_callback = GetNextCallbackTime(m.module, new_now);
}
holds_mutex_ = false;
if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
@ -258,11 +287,11 @@ bool ProcessThreadImpl::Process() {
while (!queue_.empty()) {
QueuedTask* task = queue_.front();
queue_.pop();
lock_.Leave();
mutex_.Unlock();
if (task->Run()) {
delete task;
}
lock_.Enter();
mutex_.Lock();
}
}

View File

@ -87,27 +87,31 @@ class ProcessThreadImpl : public ProcessThread {
void Delete() override;
// The part of Stop processing that doesn't need any locking.
void StopNoLocks();
void WakeUpNoLocks(Module* module);
void WakeUpInternal(Module* module) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Members protected by this mutex are accessed on the constructor thread and
// on the spawned process thread, and locking is needed only while the process
// thread is running.
rtc::RecursiveCriticalSection lock_;
Mutex mutex_;
SequenceChecker thread_checker_;
rtc::Event wake_up_;
// TODO(pbos): Remove unique_ptr and stop recreating the thread.
std::unique_ptr<rtc::PlatformThread> thread_;
ModuleList modules_ RTC_GUARDED_BY(lock_);
ModuleList modules_ RTC_GUARDED_BY(mutex_);
// Set to true when calling Process, to allow reentrant calls to WakeUp.
bool holds_mutex_ RTC_GUARDED_BY(this) = false;
std::queue<QueuedTask*> queue_;
std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(lock_);
std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(mutex_);
// The `stop_` flag is modified only by the construction thread, protected by
// `thread_checker_`. It is read also by the spawned `thread_`. The latter
// thread must take `lock_` before access, and for thread safety, the
// constructor thread needs to take `lock_` when it modifies `stop_` and
// thread must take `mutex_` before access, and for thread safety, the
// constructor thread needs to take `mutex_` when it modifies `stop_` and
// `thread_` is running. Annotations like RTC_GUARDED_BY doesn't support this
// usage pattern.
bool stop_ RTC_GUARDED_BY(lock_);
bool stop_ RTC_GUARDED_BY(mutex_);
const char* thread_name_;
};