Minor tweaks to the stdlib TQ implementation.

Remove the `started_` member variable and some other minor updates to
follow conventions elsewhere in the code.

Bug: none
Change-Id: I4cbb914b39cb2e2787719b906ca937931dc3dad7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/258360
Reviewed-by: Markus Handell <handellm@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36677}
This commit is contained in:
Tommi 2022-04-22 15:48:37 +02:00 committed by WebRTC LUCI CQ
parent 942cac2e9e
commit 76d9c18c3a

View File

@ -68,31 +68,32 @@ class TaskQueueStdlib final : public TaskQueueBase {
};
struct NextTask {
bool final_task_{false};
bool final_task_ = false;
std::unique_ptr<QueuedTask> run_task_;
int64_t sleep_time_ms_{};
int64_t sleep_time_ms_ = 0;
};
static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me,
absl::string_view queue_name,
rtc::ThreadPriority priority);
NextTask GetNextTask();
void ProcessTasks();
void NotifyWake();
// Indicates if the thread has started.
rtc::Event started_;
// Signaled whenever a new task is pending.
rtc::Event flag_notify_;
Mutex pending_lock_;
// Indicates if the worker thread needs to shutdown now.
bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_){false};
bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_) = false;
// Holds the next order to use for the next task to be
// put into one of the pending queues.
OrderId thread_posting_order_ RTC_GUARDED_BY(pending_lock_){};
OrderId thread_posting_order_ RTC_GUARDED_BY(pending_lock_) = 0;
// The list of all pending tasks that need to be processed in the
// FIFO queue ordering on the worker thread.
@ -117,16 +118,24 @@ class TaskQueueStdlib final : public TaskQueueBase {
TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
rtc::ThreadPriority priority)
: started_(/*manual_reset=*/false, /*initially_signaled=*/false),
flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(rtc::PlatformThread::SpawnJoinable(
[this] {
CurrentTaskQueueSetter set_current(this);
ProcessTasks();
},
queue_name,
rtc::ThreadAttributes().SetPriority(priority))) {
started_.Wait(rtc::Event::kForever);
: flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
thread_(InitializeThread(this, queue_name, priority)) {}
// static
rtc::PlatformThread TaskQueueStdlib::InitializeThread(
TaskQueueStdlib* me,
absl::string_view queue_name,
rtc::ThreadPriority priority) {
rtc::Event started;
auto thread = rtc::PlatformThread::SpawnJoinable(
[&started, me] {
CurrentTaskQueueSetter set_current(me);
started.Set();
me->ProcessTasks();
},
queue_name, rtc::ThreadAttributes().SetPriority(priority));
started.Wait(rtc::Event::kForever);
return thread;
}
void TaskQueueStdlib::Delete() {
@ -156,7 +165,7 @@ void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) {
void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
auto fire_at = rtc::TimeMillis() + milliseconds;
const auto fire_at = rtc::TimeMillis() + milliseconds;
DelayedEntryTimeout delay;
delay.next_fire_at_ms_ = fire_at;
@ -171,9 +180,9 @@ void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
}
TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
NextTask result{};
NextTask result;
auto tick = rtc::TimeMillis();
const auto tick = rtc::TimeMillis();
MutexLock lock(&pending_lock_);
@ -216,8 +225,6 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
}
void TaskQueueStdlib::ProcessTasks() {
started_.Set();
while (true) {
auto task = GetNextTask();
@ -230,14 +237,12 @@ void TaskQueueStdlib::ProcessTasks() {
if (release_ptr->Run())
delete release_ptr;
// attempt to sleep again
// Attempt to run more tasks before going to sleep.
continue;
}
if (0 == task.sleep_time_ms_)
flag_notify_.Wait(rtc::Event::kForever);
else
flag_notify_.Wait(task.sleep_time_ms_);
flag_notify_.Wait(0 == task.sleep_time_ms_ ? rtc::Event::kForever
: task.sleep_time_ms_);
}
}