diff --git a/test/time_controller/simulated_time_controller.cc b/test/time_controller/simulated_time_controller.cc index 144d587db9..10592edaf4 100644 --- a/test/time_controller/simulated_time_controller.cc +++ b/test/time_controller/simulated_time_controller.cc @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -22,6 +21,17 @@ #include "absl/strings/string_view.h" namespace webrtc { +namespace { +// Helper function to remove from a std container by value. +template +bool RemoveByValue(C& vec, typename C::value_type val) { + auto it = std::find(vec.begin(), vec.end(), val); + if (it == vec.end()) + return false; + vec.erase(it); + return true; +} +} // namespace namespace sim_time_impl { class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { @@ -53,12 +63,15 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { void WakeUp(Module* module) override; void RegisterModule(Module* module, const rtc::Location& from) override; void DeRegisterModule(Module* module) override; + // Promoted to public for use in SimulatedTimeControllerImpl::YieldExecution. + using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter; private: Timestamp GetCurrentTime() const { return handler_->CurrentTime(); } void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_); void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + Timestamp GetNextTime(Module* module, Timestamp at_time); SimulatedTimeControllerImpl* const handler_; const std::string name_; @@ -66,13 +79,14 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { rtc::CriticalSection lock_; std::deque> ready_tasks_ RTC_GUARDED_BY(lock_); - std::multimap> delayed_tasks_ + std::map>> delayed_tasks_ RTC_GUARDED_BY(lock_); bool process_thread_running_ RTC_GUARDED_BY(lock_) = false; - std::set stopped_modules_ RTC_GUARDED_BY(lock_); - std::set ready_modules_ RTC_GUARDED_BY(lock_); - std::multimap delayed_modules_ RTC_GUARDED_BY(lock_); + std::vector stopped_modules_ RTC_GUARDED_BY(lock_); + std::vector ready_modules_ RTC_GUARDED_BY(lock_); + std::map> delayed_modules_ + RTC_GUARDED_BY(lock_); Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity(); }; @@ -85,14 +99,18 @@ Timestamp SimulatedSequenceRunner::GetNextRunTime() const { void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) { rtc::CritScope lock(&lock_); for (auto it = delayed_tasks_.begin(); - it != delayed_tasks_.end() && it->first <= at_time;) { - ready_tasks_.emplace_back(std::move(it->second)); - it = delayed_tasks_.erase(it); + it != delayed_tasks_.end() && it->first <= at_time; + it = delayed_tasks_.erase(it)) { + for (auto& task : it->second) { + ready_tasks_.emplace_back(std::move(task)); + } } for (auto it = delayed_modules_.begin(); - it != delayed_modules_.end() && it->first <= at_time;) { - ready_modules_.insert(it->second); - it = delayed_modules_.erase(it); + it != delayed_modules_.end() && it->first <= at_time; + it = delayed_modules_.erase(it)) { + for (auto module : it->second) { + ready_modules_.push_back(module); + } } } @@ -136,9 +154,7 @@ void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) { CurrentTaskQueueSetter set_current(this); for (auto* module : ready_modules_) { module->Process(); - Timestamp next_run_time = - at_time + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.emplace(next_run_time, module); + delayed_modules_[GetNextTime(module, at_time)].push_back(module); } } ready_modules_.clear(); @@ -167,12 +183,12 @@ void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { rtc::CritScope lock(&lock_); Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds); - delayed_tasks_.emplace(target_time, std::move(task)); + delayed_tasks_[target_time].push_back(std::move(task)); next_run_time_ = std::min(next_run_time_, target_time); } void SimulatedSequenceRunner::Start() { - std::set starting; + std::vector starting; { rtc::CritScope lock(&lock_); if (process_thread_running_) @@ -186,23 +202,24 @@ void SimulatedSequenceRunner::Start() { Timestamp at_time = GetCurrentTime(); rtc::CritScope lock(&lock_); for (auto& module : starting) - delayed_modules_.insert( - {at_time + TimeDelta::ms(module->TimeUntilNextProcess()), module}); + delayed_modules_[GetNextTime(module, at_time)].push_back(module); UpdateNextRunTime(); } void SimulatedSequenceRunner::Stop() { - std::set stopping; + std::vector stopping; { rtc::CritScope lock(&lock_); process_thread_running_ = false; for (auto* ready : ready_modules_) - stopped_modules_.insert(ready); + stopped_modules_.push_back(ready); ready_modules_.clear(); - for (auto& delayed : delayed_modules_) - stopped_modules_.insert(delayed.second); + for (auto& delayed : delayed_modules_) { + for (auto mod : delayed.second) + stopped_modules_.push_back(mod); + } delayed_modules_.clear(); stopping = stopped_modules_; @@ -215,18 +232,16 @@ void SimulatedSequenceRunner::WakeUp(Module* module) { rtc::CritScope lock(&lock_); // If we already are planning to run this module as soon as possible, we don't // need to do anything. - if (ready_modules_.find(module) != ready_modules_.end()) - return; + for (auto mod : ready_modules_) + if (mod == module) + return; for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) { - if (it->second == module) { - delayed_modules_.erase(it); + if (RemoveByValue(it->second, module)) break; - } } - Timestamp next_time = - GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.insert({next_time, module}); + Timestamp next_time = GetNextTime(module, GetCurrentTime()); + delayed_modules_[next_time].push_back(module); next_run_time_ = std::min(next_run_time_, next_time); } @@ -235,11 +250,10 @@ void SimulatedSequenceRunner::RegisterModule(Module* module, module->ProcessThreadAttached(this); rtc::CritScope lock(&lock_); if (!process_thread_running_) { - stopped_modules_.insert(module); + stopped_modules_.push_back(module); } else { - Timestamp next_time = - GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); - delayed_modules_.insert({next_time, module}); + Timestamp next_time = GetNextTime(module, GetCurrentTime()); + delayed_modules_[next_time].push_back(module); next_run_time_ = std::min(next_run_time_, next_time); } } @@ -249,14 +263,13 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) { { rtc::CritScope lock(&lock_); if (!process_thread_running_) { - stopped_modules_.erase(module); + RemoveByValue(stopped_modules_, module); } else { - ready_modules_.erase(module); - for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); - ++it) { - if (it->second == module) { - delayed_modules_.erase(it); - break; + bool removed = RemoveByValue(ready_modules_, module); + if (!removed) { + for (auto& pair : delayed_modules_) { + if (RemoveByValue(pair.second, module)) + break; } } } @@ -266,6 +279,12 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) { module->ProcessThreadAttached(nullptr); } +Timestamp SimulatedSequenceRunner::GetNextTime(Module* module, + Timestamp at_time) { + CurrentTaskQueueSetter set_current(this); + return at_time + TimeDelta::ms(module->TimeUntilNextProcess()); +} + SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time) : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {} @@ -280,7 +299,7 @@ SimulatedTimeControllerImpl::CreateTaskQueue( auto task_queue = std::unique_ptr( new SimulatedSequenceRunner(mutable_this, name)); rtc::CritScope lock(&mutable_this->lock_); - mutable_this->runners_.insert(task_queue.get()); + mutable_this->runners_.push_back(task_queue.get()); return task_queue; } @@ -289,7 +308,7 @@ std::unique_ptr SimulatedTimeControllerImpl::CreateProcessThread( rtc::CritScope lock(&lock_); auto process_thread = absl::make_unique(this, thread_name); - runners_.insert(process_thread.get()); + runners_.push_back(process_thread.get()); return process_thread; } @@ -308,12 +327,18 @@ SimulatedTimeControllerImpl::GetNextReadyRunner(Timestamp current_time) { void SimulatedTimeControllerImpl::YieldExecution() { if (rtc::CurrentThreadId() == thread_id_) { + TaskQueueBase* yielding_from = TaskQueueBase::Current(); + // Since we might continue execution on a process thread, we should reset + // the thread local task queue reference. This ensures that thread checkers + // won't think we are executing on the yielding task queue. It also ensure + // that TaskQueueBase::Current() won't return the yielding task queue. + SimulatedSequenceRunner::CurrentTaskQueueSetter reset_queue(nullptr); RTC_DCHECK_RUN_ON(&thread_checker_); // When we yield, we don't want to risk executing further tasks on the // currently executing task queue. If there's a ready task that also yields, // it's added to this set as well and only tasks on the remaining task // queues are executed. - auto inserted = yielded_.insert(TaskQueueBase::Current()); + auto inserted = yielded_.insert(yielding_from); RTC_DCHECK(inserted.second); RunReadyRunners(); yielded_.erase(inserted.first); @@ -322,6 +347,7 @@ void SimulatedTimeControllerImpl::YieldExecution() { void SimulatedTimeControllerImpl::RunReadyRunners() { RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_); Timestamp current_time = CurrentTime(); // We repeat until we have no ready left to handle tasks posted by ready // runners. @@ -362,7 +388,8 @@ void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) { void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) { rtc::CritScope lock(&lock_); - RTC_CHECK(runners_.erase(runner)); + bool removed = RemoveByValue(runners_, runner); + RTC_CHECK(removed); } } // namespace sim_time_impl diff --git a/test/time_controller/simulated_time_controller.h b/test/time_controller/simulated_time_controller.h index 38a99848be..6837643412 100644 --- a/test/time_controller/simulated_time_controller.h +++ b/test/time_controller/simulated_time_controller.h @@ -10,6 +10,7 @@ #ifndef TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_ #define TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_ +#include #include #include #include @@ -68,7 +69,7 @@ class SimulatedTimeControllerImpl : public TaskQueueFactory, rtc::CriticalSection time_lock_; Timestamp current_time_ RTC_GUARDED_BY(time_lock_); rtc::CriticalSection lock_; - std::unordered_set runners_ RTC_GUARDED_BY(lock_); + std::vector runners_ RTC_GUARDED_BY(lock_); // Task queues on which YieldExecution has been called. std::unordered_set yielded_ RTC_GUARDED_BY(thread_checker_); };