From 7b6add35ab47f61daeb1bfeeac762543fff9d8c4 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 29 Mar 2019 10:34:26 +0100 Subject: [PATCH] Makes simulated time controller deterministic. Replacing sets of pointers (that will depend on allocation addresses) with vectors and lists. This allows deterministic execution. Also doing some cleanup of the task queue configuration, ensuring that the task queue states is not set outside of actual task queues. Bug: webrtc:10365 Change-Id: I1fad621c7b1ba0bbb33db8c3bd69cb3a1e212b9c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/130460 Commit-Queue: Sebastian Jansson Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#27364} --- .../simulated_time_controller.cc | 117 +++++++++++------- .../simulated_time_controller.h | 3 +- 2 files changed, 74 insertions(+), 46 deletions(-) diff --git a/test/time_controller/simulated_time_controller.cc b/test/time_controller/simulated_time_controller.cc index 144d587db9..10592edaf4 100644 --- a/test/time_controller/simulated_time_controller.cc +++ b/test/time_controller/simulated_time_controller.cc @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -22,6 +21,17 @@ #include "absl/strings/string_view.h" namespace webrtc { +namespace { +// Helper function to remove from a std container by value. +template +bool RemoveByValue(C& vec, typename C::value_type val) { + auto it = std::find(vec.begin(), vec.end(), val); + if (it == vec.end()) + return false; + vec.erase(it); + return true; +} +} // namespace namespace sim_time_impl { class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { @@ -53,12 +63,15 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { void WakeUp(Module* module) override; void RegisterModule(Module* module, const rtc::Location& from) override; void DeRegisterModule(Module* module) override; + // Promoted to public for use in SimulatedTimeControllerImpl::YieldExecution. + using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter; private: Timestamp GetCurrentTime() const { return handler_->CurrentTime(); } void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_); void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + Timestamp GetNextTime(Module* module, Timestamp at_time); SimulatedTimeControllerImpl* const handler_; const std::string name_; @@ -66,13 +79,14 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { rtc::CriticalSection lock_; std::deque> ready_tasks_ RTC_GUARDED_BY(lock_); - std::multimap> delayed_tasks_ + std::map>> delayed_tasks_ RTC_GUARDED_BY(lock_); bool process_thread_running_ RTC_GUARDED_BY(lock_) = false; - std::set stopped_modules_ RTC_GUARDED_BY(lock_); - std::set ready_modules_ RTC_GUARDED_BY(lock_); - std::multimap delayed_modules_ RTC_GUARDED_BY(lock_); + std::vector stopped_modules_ RTC_GUARDED_BY(lock_); + std::vector ready_modules_ RTC_GUARDED_BY(lock_); + std::map> delayed_modules_ + RTC_GUARDED_BY(lock_); Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity(); }; @@ -85,14 +99,18 @@ Timestamp SimulatedSequenceRunner::GetNextRunTime() const { void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) { rtc::CritScope lock(&lock_); for (auto it = delayed_tasks_.begin(); - it != delayed_tasks_.end() && it->first <= at_time;) { - ready_tasks_.emplace_back(std::move(it->second)); - it = delayed_tasks_.erase(it); + it != delayed_tasks_.end() && it->first <= at_time; + it = delayed_tasks_.erase(it)) { + for (auto& task : it->second) { + ready_tasks_.emplace_back(std::move(task)); + } } for (auto it = delayed_modules_.begin(); - it != delayed_modules_.end() && it->first <= at_time;) { - ready_modules_.insert(it->second); - it = delayed_modules_.erase(it); + it != delayed_modules_.end() && it->first <= at_time; + it = delayed_modules_.erase(it)) { + for (auto module : it->second) { + ready_modules_.push_back(module); + } } } @@ -136,9 +154,7 @@ void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) { CurrentTaskQueueSetter set_current(this); for (auto* module : ready_modules_) { module->Process(); - Timestamp next_run_time = - at_time + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.emplace(next_run_time, module); + delayed_modules_[GetNextTime(module, at_time)].push_back(module); } } ready_modules_.clear(); @@ -167,12 +183,12 @@ void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { rtc::CritScope lock(&lock_); Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds); - delayed_tasks_.emplace(target_time, std::move(task)); + delayed_tasks_[target_time].push_back(std::move(task)); next_run_time_ = std::min(next_run_time_, target_time); } void SimulatedSequenceRunner::Start() { - std::set starting; + std::vector starting; { rtc::CritScope lock(&lock_); if (process_thread_running_) @@ -186,23 +202,24 @@ void SimulatedSequenceRunner::Start() { Timestamp at_time = GetCurrentTime(); rtc::CritScope lock(&lock_); for (auto& module : starting) - delayed_modules_.insert( - {at_time + TimeDelta::ms(module->TimeUntilNextProcess()), module}); + delayed_modules_[GetNextTime(module, at_time)].push_back(module); UpdateNextRunTime(); } void SimulatedSequenceRunner::Stop() { - std::set stopping; + std::vector stopping; { rtc::CritScope lock(&lock_); process_thread_running_ = false; for (auto* ready : ready_modules_) - stopped_modules_.insert(ready); + stopped_modules_.push_back(ready); ready_modules_.clear(); - for (auto& delayed : delayed_modules_) - stopped_modules_.insert(delayed.second); + for (auto& delayed : delayed_modules_) { + for (auto mod : delayed.second) + stopped_modules_.push_back(mod); + } delayed_modules_.clear(); stopping = stopped_modules_; @@ -215,18 +232,16 @@ void SimulatedSequenceRunner::WakeUp(Module* module) { rtc::CritScope lock(&lock_); // If we already are planning to run this module as soon as possible, we don't // need to do anything. - if (ready_modules_.find(module) != ready_modules_.end()) - return; + for (auto mod : ready_modules_) + if (mod == module) + return; for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) { - if (it->second == module) { - delayed_modules_.erase(it); + if (RemoveByValue(it->second, module)) break; - } } - Timestamp next_time = - GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.insert({next_time, module}); + Timestamp next_time = GetNextTime(module, GetCurrentTime()); + delayed_modules_[next_time].push_back(module); next_run_time_ = std::min(next_run_time_, next_time); } @@ -235,11 +250,10 @@ void SimulatedSequenceRunner::RegisterModule(Module* module, module->ProcessThreadAttached(this); rtc::CritScope lock(&lock_); if (!process_thread_running_) { - stopped_modules_.insert(module); + stopped_modules_.push_back(module); } else { - Timestamp next_time = - GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.insert({next_time, module}); + Timestamp next_time = GetNextTime(module, GetCurrentTime()); + delayed_modules_[next_time].push_back(module); next_run_time_ = std::min(next_run_time_, next_time); } } @@ -249,14 +263,13 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) { { rtc::CritScope lock(&lock_); if (!process_thread_running_) { - stopped_modules_.erase(module); + RemoveByValue(stopped_modules_, module); } else { - ready_modules_.erase(module); - for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); - ++it) { - if (it->second == module) { - delayed_modules_.erase(it); - break; + bool removed = RemoveByValue(ready_modules_, module); + if (!removed) { + for (auto& pair : delayed_modules_) { + if (RemoveByValue(pair.second, module)) + break; } } } @@ -266,6 +279,12 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) { module->ProcessThreadAttached(nullptr); } +Timestamp SimulatedSequenceRunner::GetNextTime(Module* module, + Timestamp at_time) { + CurrentTaskQueueSetter set_current(this); + return at_time + TimeDelta::ms(module->TimeUntilNextProcess()); +} + SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time) : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {} @@ -280,7 +299,7 @@ SimulatedTimeControllerImpl::CreateTaskQueue( auto task_queue = std::unique_ptr( new SimulatedSequenceRunner(mutable_this, name)); rtc::CritScope lock(&mutable_this->lock_); - mutable_this->runners_.insert(task_queue.get()); + mutable_this->runners_.push_back(task_queue.get()); return task_queue; } @@ -289,7 +308,7 @@ std::unique_ptr SimulatedTimeControllerImpl::CreateProcessThread( rtc::CritScope lock(&lock_); auto process_thread = absl::make_unique(this, thread_name); - runners_.insert(process_thread.get()); + runners_.push_back(process_thread.get()); return process_thread; } @@ -308,12 +327,18 @@ SimulatedTimeControllerImpl::GetNextReadyRunner(Timestamp current_time) { void SimulatedTimeControllerImpl::YieldExecution() { if (rtc::CurrentThreadId() == thread_id_) { + TaskQueueBase* yielding_from = TaskQueueBase::Current(); + // Since we might continue execution on a process thread, we should reset + // the thread local task queue reference. This ensures that thread checkers + // won't think we are executing on the yielding task queue. It also ensure + // that TaskQueueBase::Current() won't return the yielding task queue. + SimulatedSequenceRunner::CurrentTaskQueueSetter reset_queue(nullptr); RTC_DCHECK_RUN_ON(&thread_checker_); // When we yield, we don't want to risk executing further tasks on the // currently executing task queue. If there's a ready task that also yields, // it's added to this set as well and only tasks on the remaining task // queues are executed. - auto inserted = yielded_.insert(TaskQueueBase::Current()); + auto inserted = yielded_.insert(yielding_from); RTC_DCHECK(inserted.second); RunReadyRunners(); yielded_.erase(inserted.first); @@ -322,6 +347,7 @@ void SimulatedTimeControllerImpl::YieldExecution() { void SimulatedTimeControllerImpl::RunReadyRunners() { RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_); Timestamp current_time = CurrentTime(); // We repeat until we have no ready left to handle tasks posted by ready // runners. @@ -362,7 +388,8 @@ void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) { void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) { rtc::CritScope lock(&lock_); - RTC_CHECK(runners_.erase(runner)); + bool removed = RemoveByValue(runners_, runner); + RTC_CHECK(removed); } } // namespace sim_time_impl diff --git a/test/time_controller/simulated_time_controller.h b/test/time_controller/simulated_time_controller.h index 38a99848be..6837643412 100644 --- a/test/time_controller/simulated_time_controller.h +++ b/test/time_controller/simulated_time_controller.h @@ -10,6 +10,7 @@ #ifndef TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_ #define TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_ +#include #include #include #include @@ -68,7 +69,7 @@ class SimulatedTimeControllerImpl : public TaskQueueFactory, rtc::CriticalSection time_lock_; Timestamp current_time_ RTC_GUARDED_BY(time_lock_); rtc::CriticalSection lock_; - std::unordered_set runners_ RTC_GUARDED_BY(lock_); + std::vector runners_ RTC_GUARDED_BY(lock_); // Task queues on which YieldExecution has been called. std::unordered_set yielded_ RTC_GUARDED_BY(thread_checker_); };