From af0a6b34e336ba7c0b0e0371875b2ba6db6ac9a8 Mon Sep 17 00:00:00 2001 From: Jianhui Dai Date: Wed, 9 Mar 2022 11:06:34 +0800 Subject: [PATCH] Ensure FIFO order for delayed tasks in `ProcessThreadImpl` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TaskQueue posts delayed task in milliseconds precision. If delayed tasks have the same wakeup time in queue, we should ensure they are waked up in FIFO order. E.g., call `PostDelayedTask(task-i, 0)` in a loop, we expect `task-i` is waked up as enqueue order. Co-Author: jiahe.zhang@intel.com Bug: webrtc:13761 Change-Id: I3bc87c2d251f8dffee868a012e828fd42e783afc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251960 Reviewed-by: Chen Xing Reviewed-by: Markus Handell Reviewed-by: Stefan Holmer Reviewed-by: Henrik Boström Commit-Queue: Henrik Boström Cr-Commit-Position: refs/heads/main@{#36582} --- modules/utility/source/process_thread_impl.cc | 2 +- modules/utility/source/process_thread_impl.h | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc index 73fc23400b..2274aaee91 100644 --- a/modules/utility/source/process_thread_impl.cc +++ b/modules/utility/source/process_thread_impl.cc @@ -171,7 +171,7 @@ void ProcessThreadImpl::PostDelayedTask(std::unique_ptr task, MutexLock lock(&mutex_); recalculate_wakeup_time = delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms; - delayed_tasks_.emplace(run_at_ms, std::move(task)); + delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task)); } if (recalculate_wakeup_time) { wake_up_.Set(); diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h index e9a26eb96f..0dc7aff591 100644 --- a/modules/utility/source/process_thread_impl.h +++ b/modules/utility/source/process_thread_impl.h @@ -65,14 +65,22 @@ class ProcessThreadImpl : public ProcessThread { ModuleCallback& operator=(ModuleCallback&); }; struct DelayedTask { - DelayedTask(int64_t run_at_ms, std::unique_ptr task) - : run_at_ms(run_at_ms), task(task.release()) {} + DelayedTask(int64_t run_at_ms, + uint64_t sequence_id, + std::unique_ptr task) + : run_at_ms(run_at_ms), + sequence_id_(sequence_id), + task(task.release()) {} friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) { // Earliest DelayedTask should be at the top of the priority queue. - return lhs.run_at_ms > rhs.run_at_ms; + if (lhs.run_at_ms != rhs.run_at_ms) { + return lhs.run_at_ms > rhs.run_at_ms; + } + return lhs.sequence_id_ > rhs.sequence_id_; } int64_t run_at_ms; + uint64_t sequence_id_; // DelayedTask owns the `task`, but some delayed tasks must be removed from // the std::priority_queue, but mustn't be deleted. std::priority_queue does // not give non-const access to the values, so storing unique_ptr would @@ -101,7 +109,10 @@ class ProcessThreadImpl : public ProcessThread { // Set to true when calling Process, to allow reentrant calls to WakeUp. bool holds_mutex_ RTC_GUARDED_BY(this) = false; std::queue queue_; + // `std::priority_queue` does not guarantee stable sort. For delayed tasks + // with the same wakeup time, use `sequence_id_` to ensure FIFO ordering. std::priority_queue delayed_tasks_ RTC_GUARDED_BY(mutex_); + uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0; // The `stop_` flag is modified only by the construction thread, protected by // `thread_checker_`. It is read also by the spawned `thread_`. The latter // thread must take `mutex_` before access, and for thread safety, the