diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc index 73fc23400b..2274aaee91 100644 --- a/modules/utility/source/process_thread_impl.cc +++ b/modules/utility/source/process_thread_impl.cc @@ -171,7 +171,7 @@ void ProcessThreadImpl::PostDelayedTask(std::unique_ptr task, MutexLock lock(&mutex_); recalculate_wakeup_time = delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms; - delayed_tasks_.emplace(run_at_ms, std::move(task)); + delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task)); } if (recalculate_wakeup_time) { wake_up_.Set(); diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h index e9a26eb96f..0dc7aff591 100644 --- a/modules/utility/source/process_thread_impl.h +++ b/modules/utility/source/process_thread_impl.h @@ -65,14 +65,22 @@ class ProcessThreadImpl : public ProcessThread { ModuleCallback& operator=(ModuleCallback&); }; struct DelayedTask { - DelayedTask(int64_t run_at_ms, std::unique_ptr task) - : run_at_ms(run_at_ms), task(task.release()) {} + DelayedTask(int64_t run_at_ms, + uint64_t sequence_id, + std::unique_ptr task) + : run_at_ms(run_at_ms), + sequence_id_(sequence_id), + task(task.release()) {} friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) { // Earliest DelayedTask should be at the top of the priority queue. - return lhs.run_at_ms > rhs.run_at_ms; + if (lhs.run_at_ms != rhs.run_at_ms) { + return lhs.run_at_ms > rhs.run_at_ms; + } + return lhs.sequence_id_ > rhs.sequence_id_; } int64_t run_at_ms; + uint64_t sequence_id_; // DelayedTask owns the `task`, but some delayed tasks must be removed from // the std::priority_queue, but mustn't be deleted. std::priority_queue does // not give non-const access to the values, so storing unique_ptr would @@ -101,7 +109,10 @@ class ProcessThreadImpl : public ProcessThread { // Set to true when calling Process, to allow reentrant calls to WakeUp. bool holds_mutex_ RTC_GUARDED_BY(this) = false; std::queue queue_; + // `std::priority_queue` does not guarantee stable sort. For delayed tasks + // with the same wakeup time, use `sequence_id_` to ensure FIFO ordering. std::priority_queue delayed_tasks_ RTC_GUARDED_BY(mutex_); + uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0; // The `stop_` flag is modified only by the construction thread, protected by // `thread_checker_`. It is read also by the spawned `thread_`. The latter // thread must take `mutex_` before access, and for thread safety, the