Make ProcessThread be a TaskQueue implementation

That would allow to switch components from relying on ProcessThreads to
relying on TaskQueue one by one, without introducing new threads.

Bug: webrtc:6289
Change-Id: I18fe5d679d4d4d0ddf4a11900c9814eb570284d6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/167533
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30631}
This commit is contained in:
Danil Chapovalov 2020-02-27 13:37:43 +01:00 committed by Commit Bot
parent a7382f7879
commit 14273de88b
9 changed files with 139 additions and 32 deletions

View File

@ -60,6 +60,7 @@ if (rtc_include_tests) {
":utility", ":utility",
"..:module_api", "..:module_api",
"../../api/task_queue", "../../api/task_queue",
"../../api/task_queue:task_queue_test",
"../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_approved",
"../../test:test_support", "../../test:test_support",
] ]

View File

@ -21,23 +21,20 @@ namespace webrtc {
class MockProcessThread : public ProcessThread { class MockProcessThread : public ProcessThread {
public: public:
// TODO(nisse): Valid overrides commented out, because the gmock MOCK_METHOD(void, Start, (), (override));
// methods don't use any override declarations, and we want to avoid MOCK_METHOD(void, Stop, (), (override));
// warnings from -Winconsistent-missing-override. See MOCK_METHOD(void, Delete, (), (override));
// http://crbug.com/428099. MOCK_METHOD(void, WakeUp, (Module*), (override));
MOCK_METHOD0(Start, void()); MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask>), (override));
MOCK_METHOD0(Stop, void()); MOCK_METHOD(void,
MOCK_METHOD1(WakeUp, void(Module* module)); PostDelayedTask,
MOCK_METHOD1(PostTask, void(QueuedTask* task)); (std::unique_ptr<QueuedTask>, uint32_t),
MOCK_METHOD2(RegisterModule, void(Module* module, const rtc::Location&)); (override));
MOCK_METHOD1(DeRegisterModule, void(Module* module)); MOCK_METHOD(void,
RegisterModule,
// MOCK_METHOD1 gets confused with mocking this method, so we work around it (Module*, const rtc::Location&),
// by overriding the method from the interface and forwarding the call to a (override));
// mocked, simpler method. MOCK_METHOD(void, DeRegisterModule, (Module*), (override));
void PostTask(std::unique_ptr<QueuedTask> task) /*override*/ {
PostTask(task.get());
}
}; };
} // namespace webrtc } // namespace webrtc

View File

@ -14,6 +14,7 @@
#include <memory> #include <memory>
#include "api/task_queue/queued_task.h" #include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
namespace rtc { namespace rtc {
class Location; class Location;
@ -26,9 +27,9 @@ class Module;
// interface. There exists one override besides ProcessThreadImpl, // interface. There exists one override besides ProcessThreadImpl,
// MockProcessThread, but when looking at how it is used, it seems // MockProcessThread, but when looking at how it is used, it seems
// a nullptr might suffice (or simply an actual ProcessThread instance). // a nullptr might suffice (or simply an actual ProcessThread instance).
class ProcessThread { class ProcessThread : public TaskQueueBase {
public: public:
virtual ~ProcessThread(); ~ProcessThread() override;
static std::unique_ptr<ProcessThread> Create(const char* thread_name); static std::unique_ptr<ProcessThread> Create(const char* thread_name);
@ -45,14 +46,6 @@ class ProcessThread {
// Can be called on any thread. // Can be called on any thread.
virtual void WakeUp(Module* module) = 0; virtual void WakeUp(Module* module) = 0;
// Queues a task object to run on the worker thread. Ownership of the
// task object is transferred to the ProcessThread and the object will
// either be deleted after running on the worker thread, or on the
// construction thread of the ProcessThread instance, if the task did not
// get a chance to run (e.g. posting the task while shutting down or when
// the thread never runs).
virtual void PostTask(std::unique_ptr<QueuedTask> task) = 0;
// Adds a module that will start to receive callbacks on the worker thread. // Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread. // Can be called from any thread.
virtual void RegisterModule(Module* module, const rtc::Location& from) = 0; virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;

View File

@ -14,6 +14,7 @@
#include "modules/include/module.h" #include "modules/include/module.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
@ -50,12 +51,24 @@ ProcessThreadImpl::~ProcessThreadImpl() {
RTC_DCHECK(!thread_.get()); RTC_DCHECK(!thread_.get());
RTC_DCHECK(!stop_); RTC_DCHECK(!stop_);
while (!delayed_tasks_.empty()) {
delete delayed_tasks_.top().task;
delayed_tasks_.pop();
}
while (!queue_.empty()) { while (!queue_.empty()) {
delete queue_.front(); delete queue_.front();
queue_.pop(); queue_.pop();
} }
} }
void ProcessThreadImpl::Delete() {
RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
<< " is destroyed as a TaskQueue.";
Stop();
delete this;
}
void ProcessThreadImpl::Start() { void ProcessThreadImpl::Start() {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get()); RTC_DCHECK(!thread_.get());
@ -113,6 +126,21 @@ void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
wake_up_.Set(); wake_up_.Set();
} }
void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
bool recalculate_wakeup_time;
{
rtc::CritScope lock(&lock_);
recalculate_wakeup_time =
delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
delayed_tasks_.emplace(run_at_ms, std::move(task));
}
if (recalculate_wakeup_time) {
wake_up_.Set();
}
}
void ProcessThreadImpl::RegisterModule(Module* module, void ProcessThreadImpl::RegisterModule(Module* module,
const rtc::Location& from) { const rtc::Location& from) {
RTC_DCHECK(thread_checker_.IsCurrent()); RTC_DCHECK(thread_checker_.IsCurrent());
@ -166,6 +194,7 @@ void ProcessThreadImpl::DeRegisterModule(Module* module) {
// static // static
void ProcessThreadImpl::Run(void* obj) { void ProcessThreadImpl::Run(void* obj) {
ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj); ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
CurrentTaskQueueSetter set_current(impl);
while (impl->Process()) { while (impl->Process()) {
} }
} }
@ -206,12 +235,23 @@ bool ProcessThreadImpl::Process() {
next_checkpoint = m.next_callback; next_checkpoint = m.next_callback;
} }
while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
queue_.push(delayed_tasks_.top().task);
delayed_tasks_.pop();
}
if (!delayed_tasks_.empty()) {
next_checkpoint =
std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
}
while (!queue_.empty()) { while (!queue_.empty()) {
QueuedTask* task = queue_.front(); QueuedTask* task = queue_.front();
queue_.pop(); queue_.pop();
lock_.Leave(); lock_.Leave();
task->Run(); if (task->Run()) {
delete task; delete task;
}
lock_.Enter(); lock_.Enter();
} }
} }

View File

@ -38,6 +38,8 @@ class ProcessThreadImpl : public ProcessThread {
void WakeUp(Module* module) override; void WakeUp(Module* module) override;
void PostTask(std::unique_ptr<QueuedTask> task) override; void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
void RegisterModule(Module* module, const rtc::Location& from) override; void RegisterModule(Module* module, const rtc::Location& from) override;
void DeRegisterModule(Module* module) override; void DeRegisterModule(Module* module) override;
@ -64,9 +66,26 @@ class ProcessThreadImpl : public ProcessThread {
private: private:
ModuleCallback& operator=(ModuleCallback&); ModuleCallback& operator=(ModuleCallback&);
}; };
struct DelayedTask {
DelayedTask(int64_t run_at_ms, std::unique_ptr<QueuedTask> task)
: run_at_ms(run_at_ms), task(task.release()) {}
friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
// Earliest DelayedTask should be at the top of the priority queue.
return lhs.run_at_ms > rhs.run_at_ms;
}
int64_t run_at_ms;
// DelayedTask owns the |task|, but some delayed tasks must be removed from
// the std::priority_queue, but mustn't be deleted. std::priority_queue does
// not give non-const access to the values, so storing unique_ptr would
// delete the task as soon as it is remove from the priority queue.
// Thus lifetime of the |task| is managed manually.
QueuedTask* task;
};
typedef std::list<ModuleCallback> ModuleList; typedef std::list<ModuleCallback> ModuleList;
void Delete() override;
// Warning: For some reason, if |lock_| comes immediately before |modules_| // Warning: For some reason, if |lock_| comes immediately before |modules_|
// with the current class layout, we will start to have mysterious crashes // with the current class layout, we will start to have mysterious crashes
// on Mac 10.9 debug. I (Tommi) suspect we're hitting some obscure alignemnt // on Mac 10.9 debug. I (Tommi) suspect we're hitting some obscure alignemnt
@ -82,6 +101,7 @@ class ProcessThreadImpl : public ProcessThread {
ModuleList modules_; ModuleList modules_;
std::queue<QueuedTask*> queue_; std::queue<QueuedTask*> queue_;
std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(lock_);
bool stop_; bool stop_;
const char* thread_name_; const char* thread_name_;
}; };

View File

@ -14,6 +14,7 @@
#include <utility> #include <utility>
#include "api/task_queue/queued_task.h" #include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_test.h"
#include "modules/include/module.h" #include "modules/include/module.h"
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
@ -310,4 +311,21 @@ TEST(ProcessThreadImpl, PostTask) {
thread.Stop(); thread.Stop();
} }
class ProcessThreadFactory : public TaskQueueFactory {
public:
~ProcessThreadFactory() override = default;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view name,
Priority priority) const override {
ProcessThreadImpl* process_thread = new ProcessThreadImpl("thread");
process_thread->Start();
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(process_thread);
}
};
INSTANTIATE_TEST_SUITE_P(
ProcessThread,
TaskQueueTest,
testing::Values(std::make_unique<ProcessThreadFactory>));
} // namespace webrtc } // namespace webrtc

View File

@ -61,6 +61,13 @@ class ExternalTimeController::ProcessThreadWrapper : public ProcessThread {
parent_->ScheduleNext(); parent_->ScheduleNext();
} }
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override {
parent_->UpdateTime();
thread_->PostDelayedTask(std::move(task), milliseconds);
parent_->ScheduleNext();
}
void RegisterModule(Module* module, const rtc::Location& from) override { void RegisterModule(Module* module, const rtc::Location& from) override {
parent_->UpdateTime(); parent_->UpdateTime();
module_wrappers_.emplace(module, new ModuleWrapper(module, this)); module_wrappers_.emplace(module, new ModuleWrapper(module, this));
@ -100,6 +107,11 @@ class ExternalTimeController::ProcessThreadWrapper : public ProcessThread {
ProcessThreadWrapper* thread_; ProcessThreadWrapper* thread_;
}; };
void Delete() override {
// ProcessThread shouldn't be deleted as a TaskQueue.
RTC_NOTREACHED();
}
ModuleWrapper* GetWrapper(Module* module) { ModuleWrapper* GetWrapper(Module* module) {
auto it = module_wrappers_.find(module); auto it = module_wrappers_.find(module);
RTC_DCHECK(it != module_wrappers_.end()); RTC_DCHECK(it != module_wrappers_.end());

View File

@ -38,7 +38,7 @@ SimulatedProcessThread::~SimulatedProcessThread() {
} }
void SimulatedProcessThread::RunReady(Timestamp at_time) { void SimulatedProcessThread::RunReady(Timestamp at_time) {
TokenTaskQueue::CurrentTaskQueueSetter set_current(this); CurrentTaskQueueSetter set_current(this);
rtc::CritScope lock(&lock_); rtc::CritScope lock(&lock_);
std::vector<Module*> ready_modules; std::vector<Module*> ready_modules;
for (auto it = delayed_modules_.begin(); for (auto it = delayed_modules_.begin();
@ -53,6 +53,13 @@ void SimulatedProcessThread::RunReady(Timestamp at_time) {
delayed_modules_[GetNextTime(module, at_time)].push_back(module); delayed_modules_[GetNextTime(module, at_time)].push_back(module);
} }
for (auto it = delayed_tasks_.begin();
it != delayed_tasks_.end() && it->first <= at_time;
it = delayed_tasks_.erase(it)) {
for (auto& task : it->second) {
queue_.push_back(std::move(task));
}
}
while (!queue_.empty()) { while (!queue_.empty()) {
std::unique_ptr<QueuedTask> task = std::move(queue_.front()); std::unique_ptr<QueuedTask> task = std::move(queue_.front());
queue_.pop_front(); queue_.pop_front();
@ -67,6 +74,9 @@ void SimulatedProcessThread::RunReady(Timestamp at_time) {
} else { } else {
next_run_time_ = Timestamp::PlusInfinity(); next_run_time_ = Timestamp::PlusInfinity();
} }
if (!delayed_tasks_.empty()) {
next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
}
} }
void SimulatedProcessThread::Start() { void SimulatedProcessThread::Start() {
std::vector<Module*> starting; std::vector<Module*> starting;
@ -160,6 +170,15 @@ void SimulatedProcessThread::PostTask(std::unique_ptr<QueuedTask> task) {
next_run_time_ = Timestamp::MinusInfinity(); next_run_time_ = Timestamp::MinusInfinity();
} }
void SimulatedProcessThread::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
rtc::CritScope lock(&lock_);
Timestamp target_time =
handler_->CurrentTime() + TimeDelta::Millis(milliseconds);
delayed_tasks_[target_time].push_back(std::move(task));
next_run_time_ = std::min(next_run_time_, target_time);
}
Timestamp SimulatedProcessThread::GetNextTime(Module* module, Timestamp SimulatedProcessThread::GetNextTime(Module* module,
Timestamp at_time) { Timestamp at_time) {
CurrentTaskQueueSetter set_current(this); CurrentTaskQueueSetter set_current(this);

View File

@ -20,8 +20,7 @@
namespace webrtc { namespace webrtc {
class SimulatedProcessThread : public TokenTaskQueue, class SimulatedProcessThread : public ProcessThread,
public ProcessThread,
public sim_time_impl::SimulatedSequenceRunner { public sim_time_impl::SimulatedSequenceRunner {
public: public:
SimulatedProcessThread(sim_time_impl::SimulatedTimeControllerImpl* handler, SimulatedProcessThread(sim_time_impl::SimulatedTimeControllerImpl* handler,
@ -43,8 +42,14 @@ class SimulatedProcessThread : public TokenTaskQueue,
void RegisterModule(Module* module, const rtc::Location& from) override; void RegisterModule(Module* module, const rtc::Location& from) override;
void DeRegisterModule(Module* module) override; void DeRegisterModule(Module* module) override;
void PostTask(std::unique_ptr<QueuedTask> task) override; void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
private: private:
void Delete() override {
// ProcessThread shouldn't be deleted as a TaskQueue.
RTC_NOTREACHED();
}
Timestamp GetNextTime(Module* module, Timestamp at_time); Timestamp GetNextTime(Module* module, Timestamp at_time);
sim_time_impl::SimulatedTimeControllerImpl* const handler_; sim_time_impl::SimulatedTimeControllerImpl* const handler_;
@ -54,6 +59,8 @@ class SimulatedProcessThread : public TokenTaskQueue,
Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity(); Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
std::deque<std::unique_ptr<QueuedTask>> queue_; std::deque<std::unique_ptr<QueuedTask>> queue_;
std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
RTC_GUARDED_BY(lock_);
bool process_thread_running_ RTC_GUARDED_BY(lock_) = false; bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_); std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);