From 4785402475a793d705dc84bd21d2ae544afcaa4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niels=20M=C3=B6ller?= Date: Mon, 1 Mar 2021 11:31:33 +0100 Subject: [PATCH] Replace RecursiveCriticalSection with Mutex in ProcessThreadImpl Bug: webrtc:11567 Change-Id: I03961ddc55f29a01c3e466217222fd56ba51d895 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/208764 Reviewed-by: Tommi Commit-Queue: Niels Moller Cr-Commit-Position: refs/heads/master@{#33354} --- modules/utility/source/process_thread_impl.cc | 61 ++++++++++++++----- modules/utility/source/process_thread_impl.h | 16 +++-- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc index 3ce4d86af0..dc2a0066e9 100644 --- a/modules/utility/source/process_thread_impl.cc +++ b/modules/utility/source/process_thread_impl.cc @@ -93,7 +93,7 @@ void ProcessThreadImpl::Stop() { { // Need to take lock, for synchronization with `thread_`. - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); stop_ = true; } @@ -117,20 +117,46 @@ void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS { void ProcessThreadImpl::WakeUp(Module* module) { // Allowed to be called on any thread. - { - rtc::CritScope lock(&lock_); - for (ModuleCallback& m : modules_) { - if (m.module == module) - m.next_callback = kCallProcessImmediately; + auto holds_mutex = [this] { + if (!IsCurrent()) { + return false; } + RTC_DCHECK_RUN_ON(this); + return holds_mutex_; + }; + if (holds_mutex()) { + // Avoid locking if called on the ProcessThread, via a module's Process), + WakeUpNoLocks(module); + } else { + MutexLock lock(&mutex_); + WakeUpInternal(module); } wake_up_.Set(); } +// Must be called only indirectly from Process, which already holds the lock. +void ProcessThreadImpl::WakeUpNoLocks(Module* module) + RTC_NO_THREAD_SAFETY_ANALYSIS { + RTC_DCHECK_RUN_ON(this); + WakeUpInternal(module); +} + +void ProcessThreadImpl::WakeUpInternal(Module* module) { + for (ModuleCallback& m : modules_) { + if (m.module == module) + m.next_callback = kCallProcessImmediately; + } +} + void ProcessThreadImpl::PostTask(std::unique_ptr task) { - // Allowed to be called on any thread. + // Allowed to be called on any thread, except from a module's Process method. + if (IsCurrent()) { + RTC_DCHECK_RUN_ON(this); + RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from " + "Module::Process is not supported"; + } { - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); queue_.push(task.release()); } wake_up_.Set(); @@ -141,7 +167,7 @@ void ProcessThreadImpl::PostDelayedTask(std::unique_ptr task, int64_t run_at_ms = rtc::TimeMillis() + milliseconds; bool recalculate_wakeup_time; { - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); recalculate_wakeup_time = delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms; delayed_tasks_.emplace(run_at_ms, std::move(task)); @@ -159,7 +185,7 @@ void ProcessThreadImpl::RegisterModule(Module* module, #if RTC_DCHECK_IS_ON { // Catch programmer error. - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); for (const ModuleCallback& mc : modules_) { RTC_DCHECK(mc.module != module) << "Already registered here: " << mc.location.ToString() @@ -177,7 +203,7 @@ void ProcessThreadImpl::RegisterModule(Module* module, module->ProcessThreadAttached(this); { - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); modules_.push_back(ModuleCallback(module, from)); } @@ -192,7 +218,7 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) { RTC_DCHECK(module); { - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); modules_.remove_if( [&module](const ModuleCallback& m) { return m.module == module; }); } @@ -213,9 +239,9 @@ bool ProcessThreadImpl::Process() { TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_); int64_t now = rtc::TimeMillis(); int64_t next_checkpoint = now + (1000 * 60); - + RTC_DCHECK_RUN_ON(this); { - rtc::CritScope lock(&lock_); + MutexLock lock(&mutex_); if (stop_) return false; for (ModuleCallback& m : modules_) { @@ -226,6 +252,8 @@ bool ProcessThreadImpl::Process() { if (m.next_callback == 0) m.next_callback = GetNextCallbackTime(m.module, now); + // Set to true for the duration of the calls to modules' Process(). + holds_mutex_ = true; if (m.next_callback <= now || m.next_callback == kCallProcessImmediately) { { @@ -240,6 +268,7 @@ bool ProcessThreadImpl::Process() { int64_t new_now = rtc::TimeMillis(); m.next_callback = GetNextCallbackTime(m.module, new_now); } + holds_mutex_ = false; if (m.next_callback < next_checkpoint) next_checkpoint = m.next_callback; @@ -258,11 +287,11 @@ bool ProcessThreadImpl::Process() { while (!queue_.empty()) { QueuedTask* task = queue_.front(); queue_.pop(); - lock_.Leave(); + mutex_.Unlock(); if (task->Run()) { delete task; } - lock_.Enter(); + mutex_.Lock(); } } diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h index ef763b8faf..b83994cef8 100644 --- a/modules/utility/source/process_thread_impl.h +++ b/modules/utility/source/process_thread_impl.h @@ -87,27 +87,31 @@ class ProcessThreadImpl : public ProcessThread { void Delete() override; // The part of Stop processing that doesn't need any locking. void StopNoLocks(); + void WakeUpNoLocks(Module* module); + void WakeUpInternal(Module* module) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Members protected by this mutex are accessed on the constructor thread and // on the spawned process thread, and locking is needed only while the process // thread is running. - rtc::RecursiveCriticalSection lock_; + Mutex mutex_; SequenceChecker thread_checker_; rtc::Event wake_up_; // TODO(pbos): Remove unique_ptr and stop recreating the thread. std::unique_ptr thread_; - ModuleList modules_ RTC_GUARDED_BY(lock_); + ModuleList modules_ RTC_GUARDED_BY(mutex_); + // Set to true when calling Process, to allow reentrant calls to WakeUp. + bool holds_mutex_ RTC_GUARDED_BY(this) = false; std::queue queue_; - std::priority_queue delayed_tasks_ RTC_GUARDED_BY(lock_); + std::priority_queue delayed_tasks_ RTC_GUARDED_BY(mutex_); // The `stop_` flag is modified only by the construction thread, protected by // `thread_checker_`. It is read also by the spawned `thread_`. The latter - // thread must take `lock_` before access, and for thread safety, the - // constructor thread needs to take `lock_` when it modifies `stop_` and + // thread must take `mutex_` before access, and for thread safety, the + // constructor thread needs to take `mutex_` when it modifies `stop_` and // `thread_` is running. Annotations like RTC_GUARDED_BY doesn't support this // usage pattern. - bool stop_ RTC_GUARDED_BY(lock_); + bool stop_ RTC_GUARDED_BY(mutex_); const char* thread_name_; };