Makes simulated time controller deterministic.
Replacing sets of pointers (that will depend on allocation addresses) with vectors and lists. This allows deterministic execution. Also doing some cleanup of the task queue configuration, ensuring that the task queue states is not set outside of actual task queues. Bug: webrtc:10365 Change-Id: I1fad621c7b1ba0bbb33db8c3bd69cb3a1e212b9c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/130460 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Niels Moller <nisse@webrtc.org> Cr-Commit-Position: refs/heads/master@{#27364}
This commit is contained in:
parent
11c012a4ce
commit
7b6add35ab
@ -13,7 +13,6 @@
|
||||
#include <deque>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
@ -22,6 +21,17 @@
|
||||
#include "absl/strings/string_view.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace {
|
||||
// Helper function to remove from a std container by value.
|
||||
template <class C>
|
||||
bool RemoveByValue(C& vec, typename C::value_type val) {
|
||||
auto it = std::find(vec.begin(), vec.end(), val);
|
||||
if (it == vec.end())
|
||||
return false;
|
||||
vec.erase(it);
|
||||
return true;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace sim_time_impl {
|
||||
class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
|
||||
@ -53,12 +63,15 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
|
||||
void WakeUp(Module* module) override;
|
||||
void RegisterModule(Module* module, const rtc::Location& from) override;
|
||||
void DeRegisterModule(Module* module) override;
|
||||
// Promoted to public for use in SimulatedTimeControllerImpl::YieldExecution.
|
||||
using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter;
|
||||
|
||||
private:
|
||||
Timestamp GetCurrentTime() const { return handler_->CurrentTime(); }
|
||||
void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_);
|
||||
void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
|
||||
void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
|
||||
Timestamp GetNextTime(Module* module, Timestamp at_time);
|
||||
|
||||
SimulatedTimeControllerImpl* const handler_;
|
||||
const std::string name_;
|
||||
@ -66,13 +79,14 @@ class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
|
||||
rtc::CriticalSection lock_;
|
||||
|
||||
std::deque<std::unique_ptr<QueuedTask>> ready_tasks_ RTC_GUARDED_BY(lock_);
|
||||
std::multimap<Timestamp, std::unique_ptr<QueuedTask>> delayed_tasks_
|
||||
std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
|
||||
RTC_GUARDED_BY(lock_);
|
||||
|
||||
bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
|
||||
std::set<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
|
||||
std::set<Module*> ready_modules_ RTC_GUARDED_BY(lock_);
|
||||
std::multimap<Timestamp, Module*> delayed_modules_ RTC_GUARDED_BY(lock_);
|
||||
std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
|
||||
std::vector<Module*> ready_modules_ RTC_GUARDED_BY(lock_);
|
||||
std::map<Timestamp, std::list<Module*>> delayed_modules_
|
||||
RTC_GUARDED_BY(lock_);
|
||||
|
||||
Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
|
||||
};
|
||||
@ -85,14 +99,18 @@ Timestamp SimulatedSequenceRunner::GetNextRunTime() const {
|
||||
void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) {
|
||||
rtc::CritScope lock(&lock_);
|
||||
for (auto it = delayed_tasks_.begin();
|
||||
it != delayed_tasks_.end() && it->first <= at_time;) {
|
||||
ready_tasks_.emplace_back(std::move(it->second));
|
||||
it = delayed_tasks_.erase(it);
|
||||
it != delayed_tasks_.end() && it->first <= at_time;
|
||||
it = delayed_tasks_.erase(it)) {
|
||||
for (auto& task : it->second) {
|
||||
ready_tasks_.emplace_back(std::move(task));
|
||||
}
|
||||
}
|
||||
for (auto it = delayed_modules_.begin();
|
||||
it != delayed_modules_.end() && it->first <= at_time;) {
|
||||
ready_modules_.insert(it->second);
|
||||
it = delayed_modules_.erase(it);
|
||||
it != delayed_modules_.end() && it->first <= at_time;
|
||||
it = delayed_modules_.erase(it)) {
|
||||
for (auto module : it->second) {
|
||||
ready_modules_.push_back(module);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -136,9 +154,7 @@ void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) {
|
||||
CurrentTaskQueueSetter set_current(this);
|
||||
for (auto* module : ready_modules_) {
|
||||
module->Process();
|
||||
Timestamp next_run_time =
|
||||
at_time + TimeDelta::ms(module->TimeUntilNextProcess());
|
||||
delayed_modules_.emplace(next_run_time, module);
|
||||
delayed_modules_[GetNextTime(module, at_time)].push_back(module);
|
||||
}
|
||||
}
|
||||
ready_modules_.clear();
|
||||
@ -167,12 +183,12 @@ void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
||||
uint32_t milliseconds) {
|
||||
rtc::CritScope lock(&lock_);
|
||||
Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds);
|
||||
delayed_tasks_.emplace(target_time, std::move(task));
|
||||
delayed_tasks_[target_time].push_back(std::move(task));
|
||||
next_run_time_ = std::min(next_run_time_, target_time);
|
||||
}
|
||||
|
||||
void SimulatedSequenceRunner::Start() {
|
||||
std::set<Module*> starting;
|
||||
std::vector<Module*> starting;
|
||||
{
|
||||
rtc::CritScope lock(&lock_);
|
||||
if (process_thread_running_)
|
||||
@ -186,23 +202,24 @@ void SimulatedSequenceRunner::Start() {
|
||||
Timestamp at_time = GetCurrentTime();
|
||||
rtc::CritScope lock(&lock_);
|
||||
for (auto& module : starting)
|
||||
delayed_modules_.insert(
|
||||
{at_time + TimeDelta::ms(module->TimeUntilNextProcess()), module});
|
||||
delayed_modules_[GetNextTime(module, at_time)].push_back(module);
|
||||
UpdateNextRunTime();
|
||||
}
|
||||
|
||||
void SimulatedSequenceRunner::Stop() {
|
||||
std::set<Module*> stopping;
|
||||
std::vector<Module*> stopping;
|
||||
{
|
||||
rtc::CritScope lock(&lock_);
|
||||
process_thread_running_ = false;
|
||||
|
||||
for (auto* ready : ready_modules_)
|
||||
stopped_modules_.insert(ready);
|
||||
stopped_modules_.push_back(ready);
|
||||
ready_modules_.clear();
|
||||
|
||||
for (auto& delayed : delayed_modules_)
|
||||
stopped_modules_.insert(delayed.second);
|
||||
for (auto& delayed : delayed_modules_) {
|
||||
for (auto mod : delayed.second)
|
||||
stopped_modules_.push_back(mod);
|
||||
}
|
||||
delayed_modules_.clear();
|
||||
|
||||
stopping = stopped_modules_;
|
||||
@ -215,18 +232,16 @@ void SimulatedSequenceRunner::WakeUp(Module* module) {
|
||||
rtc::CritScope lock(&lock_);
|
||||
// If we already are planning to run this module as soon as possible, we don't
|
||||
// need to do anything.
|
||||
if (ready_modules_.find(module) != ready_modules_.end())
|
||||
return;
|
||||
for (auto mod : ready_modules_)
|
||||
if (mod == module)
|
||||
return;
|
||||
|
||||
for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
|
||||
if (it->second == module) {
|
||||
delayed_modules_.erase(it);
|
||||
if (RemoveByValue(it->second, module))
|
||||
break;
|
||||
}
|
||||
}
|
||||
Timestamp next_time =
|
||||
GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess());
|
||||
delayed_modules_.insert({next_time, module});
|
||||
Timestamp next_time = GetNextTime(module, GetCurrentTime());
|
||||
delayed_modules_[next_time].push_back(module);
|
||||
next_run_time_ = std::min(next_run_time_, next_time);
|
||||
}
|
||||
|
||||
@ -235,11 +250,10 @@ void SimulatedSequenceRunner::RegisterModule(Module* module,
|
||||
module->ProcessThreadAttached(this);
|
||||
rtc::CritScope lock(&lock_);
|
||||
if (!process_thread_running_) {
|
||||
stopped_modules_.insert(module);
|
||||
stopped_modules_.push_back(module);
|
||||
} else {
|
||||
Timestamp next_time =
|
||||
GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess());
|
||||
delayed_modules_.insert({next_time, module});
|
||||
Timestamp next_time = GetNextTime(module, GetCurrentTime());
|
||||
delayed_modules_[next_time].push_back(module);
|
||||
next_run_time_ = std::min(next_run_time_, next_time);
|
||||
}
|
||||
}
|
||||
@ -249,14 +263,13 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) {
|
||||
{
|
||||
rtc::CritScope lock(&lock_);
|
||||
if (!process_thread_running_) {
|
||||
stopped_modules_.erase(module);
|
||||
RemoveByValue(stopped_modules_, module);
|
||||
} else {
|
||||
ready_modules_.erase(module);
|
||||
for (auto it = delayed_modules_.begin(); it != delayed_modules_.end();
|
||||
++it) {
|
||||
if (it->second == module) {
|
||||
delayed_modules_.erase(it);
|
||||
break;
|
||||
bool removed = RemoveByValue(ready_modules_, module);
|
||||
if (!removed) {
|
||||
for (auto& pair : delayed_modules_) {
|
||||
if (RemoveByValue(pair.second, module))
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -266,6 +279,12 @@ void SimulatedSequenceRunner::DeRegisterModule(Module* module) {
|
||||
module->ProcessThreadAttached(nullptr);
|
||||
}
|
||||
|
||||
Timestamp SimulatedSequenceRunner::GetNextTime(Module* module,
|
||||
Timestamp at_time) {
|
||||
CurrentTaskQueueSetter set_current(this);
|
||||
return at_time + TimeDelta::ms(module->TimeUntilNextProcess());
|
||||
}
|
||||
|
||||
SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time)
|
||||
: thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {}
|
||||
|
||||
@ -280,7 +299,7 @@ SimulatedTimeControllerImpl::CreateTaskQueue(
|
||||
auto task_queue = std::unique_ptr<SimulatedSequenceRunner, TaskQueueDeleter>(
|
||||
new SimulatedSequenceRunner(mutable_this, name));
|
||||
rtc::CritScope lock(&mutable_this->lock_);
|
||||
mutable_this->runners_.insert(task_queue.get());
|
||||
mutable_this->runners_.push_back(task_queue.get());
|
||||
return task_queue;
|
||||
}
|
||||
|
||||
@ -289,7 +308,7 @@ std::unique_ptr<ProcessThread> SimulatedTimeControllerImpl::CreateProcessThread(
|
||||
rtc::CritScope lock(&lock_);
|
||||
auto process_thread =
|
||||
absl::make_unique<SimulatedSequenceRunner>(this, thread_name);
|
||||
runners_.insert(process_thread.get());
|
||||
runners_.push_back(process_thread.get());
|
||||
return process_thread;
|
||||
}
|
||||
|
||||
@ -308,12 +327,18 @@ SimulatedTimeControllerImpl::GetNextReadyRunner(Timestamp current_time) {
|
||||
|
||||
void SimulatedTimeControllerImpl::YieldExecution() {
|
||||
if (rtc::CurrentThreadId() == thread_id_) {
|
||||
TaskQueueBase* yielding_from = TaskQueueBase::Current();
|
||||
// Since we might continue execution on a process thread, we should reset
|
||||
// the thread local task queue reference. This ensures that thread checkers
|
||||
// won't think we are executing on the yielding task queue. It also ensure
|
||||
// that TaskQueueBase::Current() won't return the yielding task queue.
|
||||
SimulatedSequenceRunner::CurrentTaskQueueSetter reset_queue(nullptr);
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
// When we yield, we don't want to risk executing further tasks on the
|
||||
// currently executing task queue. If there's a ready task that also yields,
|
||||
// it's added to this set as well and only tasks on the remaining task
|
||||
// queues are executed.
|
||||
auto inserted = yielded_.insert(TaskQueueBase::Current());
|
||||
auto inserted = yielded_.insert(yielding_from);
|
||||
RTC_DCHECK(inserted.second);
|
||||
RunReadyRunners();
|
||||
yielded_.erase(inserted.first);
|
||||
@ -322,6 +347,7 @@ void SimulatedTimeControllerImpl::YieldExecution() {
|
||||
|
||||
void SimulatedTimeControllerImpl::RunReadyRunners() {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_);
|
||||
Timestamp current_time = CurrentTime();
|
||||
// We repeat until we have no ready left to handle tasks posted by ready
|
||||
// runners.
|
||||
@ -362,7 +388,8 @@ void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) {
|
||||
|
||||
void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) {
|
||||
rtc::CritScope lock(&lock_);
|
||||
RTC_CHECK(runners_.erase(runner));
|
||||
bool removed = RemoveByValue(runners_, runner);
|
||||
RTC_CHECK(removed);
|
||||
}
|
||||
|
||||
} // namespace sim_time_impl
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
#ifndef TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_
|
||||
#define TEST_TIME_CONTROLLER_SIMULATED_TIME_CONTROLLER_H_
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
@ -68,7 +69,7 @@ class SimulatedTimeControllerImpl : public TaskQueueFactory,
|
||||
rtc::CriticalSection time_lock_;
|
||||
Timestamp current_time_ RTC_GUARDED_BY(time_lock_);
|
||||
rtc::CriticalSection lock_;
|
||||
std::unordered_set<SimulatedSequenceRunner*> runners_ RTC_GUARDED_BY(lock_);
|
||||
std::vector<SimulatedSequenceRunner*> runners_ RTC_GUARDED_BY(lock_);
|
||||
// Task queues on which YieldExecution has been called.
|
||||
std::unordered_set<TaskQueueBase*> yielded_ RTC_GUARDED_BY(thread_checker_);
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user