/* * Copyright 2019 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "test/time_controller/simulated_time_controller.h" #include #include #include #include #include #include #include #include #include "absl/memory/memory.h" #include "absl/strings/string_view.h" namespace webrtc { namespace sim_time_impl { class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase { public: SimulatedSequenceRunner(SimulatedTimeControllerImpl* handler, absl::string_view queue_name) : handler_(handler), name_(queue_name) {} ~SimulatedSequenceRunner() override { handler_->Unregister(this); } // Provides next run time. Timestamp GetNextRunTime() const; // Iterates through delayed tasks and modules and moves them to the ready set // if they are supposed to execute by |at time|. void UpdateReady(Timestamp at_time); // Runs all ready tasks and modules and updates next run time. void Run(Timestamp at_time); // TaskQueueBase interface void Delete() override; // Note: PostTask is also in ProcessThread interface. void PostTask(std::unique_ptr task) override; void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) override; // ProcessThread interface void Start() override; void Stop() override; void WakeUp(Module* module) override; void RegisterModule(Module* module, const rtc::Location& from) override; void DeRegisterModule(Module* module) override; private: Timestamp GetCurrentTime() const { return handler_->CurrentTime(); } void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_); void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); SimulatedTimeControllerImpl* const handler_; const std::string name_; rtc::CriticalSection lock_; std::deque> ready_tasks_ RTC_GUARDED_BY(lock_); std::multimap> delayed_tasks_ RTC_GUARDED_BY(lock_); bool process_thread_running_ RTC_GUARDED_BY(lock_) = false; std::set stopped_modules_ RTC_GUARDED_BY(lock_); std::set ready_modules_ RTC_GUARDED_BY(lock_); std::multimap delayed_modules_ RTC_GUARDED_BY(lock_); Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity(); }; Timestamp SimulatedSequenceRunner::GetNextRunTime() const { rtc::CritScope lock(&lock_); return next_run_time_; } void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) { rtc::CritScope lock(&lock_); for (auto it = delayed_tasks_.begin(); it != delayed_tasks_.end() && it->first <= at_time;) { ready_tasks_.emplace_back(std::move(it->second)); it = delayed_tasks_.erase(it); } for (auto it = delayed_modules_.begin(); it != delayed_modules_.end() && it->first <= at_time;) { ready_modules_.insert(it->second); it = delayed_modules_.erase(it); } } void SimulatedSequenceRunner::Run(Timestamp at_time) { RunReadyTasks(at_time); rtc::CritScope lock(&lock_); RunReadyModules(at_time); UpdateNextRunTime(); } void SimulatedSequenceRunner::Delete() { { rtc::CritScope lock(&lock_); ready_tasks_.clear(); delayed_tasks_.clear(); } delete this; } void SimulatedSequenceRunner::RunReadyTasks(Timestamp at_time) { std::deque> ready_tasks; { rtc::CritScope lock(&lock_); ready_tasks.swap(ready_tasks_); } if (!ready_tasks.empty()) { CurrentTaskQueueSetter set_current(this); for (auto& ready : ready_tasks) { bool delete_task = ready->Run(); if (delete_task) { ready.reset(); } else { ready.release(); } } } } void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) { if (!ready_modules_.empty()) { CurrentTaskQueueSetter set_current(this); for (auto* module : ready_modules_) { module->Process(); Timestamp next_run_time = at_time + TimeDelta::ms(module->TimeUntilNextProcess()); delayed_modules_.emplace(next_run_time, module); } } ready_modules_.clear(); } void SimulatedSequenceRunner::UpdateNextRunTime() { if (!ready_tasks_.empty() || !ready_modules_.empty()) { next_run_time_ = Timestamp::MinusInfinity(); } else { next_run_time_ = Timestamp::PlusInfinity(); if (!delayed_tasks_.empty()) next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first); if (!delayed_modules_.empty()) next_run_time_ = std::min(next_run_time_, delayed_modules_.begin()->first); } } void SimulatedSequenceRunner::PostTask(std::unique_ptr task) { rtc::CritScope lock(&lock_); ready_tasks_.emplace_back(std::move(task)); next_run_time_ = Timestamp::MinusInfinity(); } void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) { rtc::CritScope lock(&lock_); Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds); delayed_tasks_.emplace(target_time, std::move(task)); next_run_time_ = std::min(next_run_time_, target_time); } void SimulatedSequenceRunner::Start() { std::set starting; { rtc::CritScope lock(&lock_); if (process_thread_running_) return; process_thread_running_ = true; starting.swap(stopped_modules_); } for (auto& module : starting) module->ProcessThreadAttached(this); Timestamp at_time = GetCurrentTime(); rtc::CritScope lock(&lock_); for (auto& module : starting) delayed_modules_.insert( {at_time + TimeDelta::ms(module->TimeUntilNextProcess()), module}); UpdateNextRunTime(); } void SimulatedSequenceRunner::Stop() { std::set stopping; { rtc::CritScope lock(&lock_); process_thread_running_ = false; for (auto* ready : ready_modules_) stopped_modules_.insert(ready); ready_modules_.clear(); for (auto& delayed : delayed_modules_) stopped_modules_.insert(delayed.second); delayed_modules_.clear(); stopping = stopped_modules_; } for (auto& module : stopping) module->ProcessThreadAttached(nullptr); } void SimulatedSequenceRunner::WakeUp(Module* module) { rtc::CritScope lock(&lock_); // If we already are planning to run this module as soon as possible, we don't // need to do anything. if (ready_modules_.find(module) != ready_modules_.end()) return; for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) { if (it->second == module) { delayed_modules_.erase(it); break; } } Timestamp next_time = GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); delayed_modules_.insert({next_time, module}); next_run_time_ = std::min(next_run_time_, next_time); } void SimulatedSequenceRunner::RegisterModule(Module* module, const rtc::Location& from) { module->ProcessThreadAttached(this); rtc::CritScope lock(&lock_); if (!process_thread_running_) { stopped_modules_.insert(module); } else { Timestamp next_time = GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess()); delayed_modules_.insert({next_time, module}); next_run_time_ = std::min(next_run_time_, next_time); } } void SimulatedSequenceRunner::DeRegisterModule(Module* module) { bool modules_running; { rtc::CritScope lock(&lock_); if (!process_thread_running_) { stopped_modules_.erase(module); } else { ready_modules_.erase(module); for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) { if (it->second == module) { delayed_modules_.erase(it); break; } } } modules_running = process_thread_running_; } if (modules_running) module->ProcessThreadAttached(nullptr); } SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time) : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {} SimulatedTimeControllerImpl::~SimulatedTimeControllerImpl() = default; std::unique_ptr SimulatedTimeControllerImpl::CreateTaskQueue( absl::string_view name, TaskQueueFactory::Priority priority) const { // TODO(srte): Remove the const cast when the interface is made mutable. auto mutable_this = const_cast(this); auto task_queue = std::unique_ptr( new SimulatedSequenceRunner(mutable_this, name)); rtc::CritScope lock(&mutable_this->lock_); mutable_this->runners_.insert(task_queue.get()); return task_queue; } std::unique_ptr SimulatedTimeControllerImpl::CreateProcessThread( const char* thread_name) { rtc::CritScope lock(&lock_); auto process_thread = absl::make_unique(this, thread_name); runners_.insert(process_thread.get()); return process_thread; } std::vector SimulatedTimeControllerImpl::GetNextReadyRunner(Timestamp current_time) { rtc::CritScope lock(&lock_); std::vector ready; for (auto* runner : runners_) { if (yielded_.find(runner) == yielded_.end() && runner->GetNextRunTime() <= current_time) { ready.push_back(runner); } } return ready; } void SimulatedTimeControllerImpl::YieldExecution() { if (rtc::CurrentThreadId() == thread_id_) { RTC_DCHECK_RUN_ON(&thread_checker_); // When we yield, we don't want to risk executing further tasks on the // currently executing task queue. If there's a ready task that also yields, // it's added to this set as well and only tasks on the remaining task // queues are executed. auto inserted = yielded_.insert(TaskQueueBase::Current()); RTC_DCHECK(inserted.second); RunReadyRunners(); yielded_.erase(inserted.first); } } void SimulatedTimeControllerImpl::RunReadyRunners() { RTC_DCHECK_RUN_ON(&thread_checker_); Timestamp current_time = CurrentTime(); // We repeat until we have no ready left to handle tasks posted by ready // runners. while (true) { auto ready = GetNextReadyRunner(current_time); if (ready.empty()) break; for (auto* runner : ready) { runner->UpdateReady(current_time); runner->Run(current_time); } } } Timestamp SimulatedTimeControllerImpl::CurrentTime() const { rtc::CritScope lock(&time_lock_); return current_time_; } Timestamp SimulatedTimeControllerImpl::NextRunTime() const { Timestamp current_time = CurrentTime(); Timestamp next_time = Timestamp::PlusInfinity(); rtc::CritScope lock(&lock_); for (auto* runner : runners_) { Timestamp next_run_time = runner->GetNextRunTime(); if (next_run_time <= current_time) return current_time; next_time = std::min(next_time, next_run_time); } return next_time; } void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) { rtc::CritScope time_lock(&time_lock_); RTC_DCHECK(target_time >= current_time_); current_time_ = target_time; } void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) { rtc::CritScope lock(&lock_); RTC_CHECK(runners_.erase(runner)); } } // namespace sim_time_impl GlobalSimulatedTimeController::GlobalSimulatedTimeController( Timestamp start_time) : sim_clock_(start_time.us()), impl_(start_time) { global_clock_.SetTimeMicros(start_time.us()); } GlobalSimulatedTimeController::~GlobalSimulatedTimeController() = default; Clock* GlobalSimulatedTimeController::GetClock() { return &sim_clock_; } TaskQueueFactory* GlobalSimulatedTimeController::GetTaskQueueFactory() { return &impl_; } std::unique_ptr GlobalSimulatedTimeController::CreateProcessThread(const char* thread_name) { return impl_.CreateProcessThread(thread_name); } void GlobalSimulatedTimeController::Sleep(TimeDelta duration) { rtc::ScopedYieldPolicy yield_policy(&impl_); Timestamp current_time = impl_.CurrentTime(); Timestamp target_time = current_time + duration; RTC_DCHECK_EQ(current_time.us(), rtc::TimeMicros()); while (current_time < target_time) { impl_.RunReadyRunners(); Timestamp next_time = std::min(impl_.NextRunTime(), target_time); impl_.AdvanceTime(next_time); auto delta = next_time - current_time; current_time = next_time; sim_clock_.AdvanceTimeMicroseconds(delta.us()); global_clock_.AdvanceTimeMicros(delta.us()); } } void GlobalSimulatedTimeController::InvokeWithControlledYield( std::function closure) { rtc::ScopedYieldPolicy yield_policy(&impl_); closure(); } // namespace sim_time_impl } // namespace webrtc