Update TaskQueueWin implementation to absl::AnyInvocable

Bug: webrtc:14245
Change-Id: I4203f4dbbdc9c2ee4a6440942215341182f180db
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/269000
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37571}
This commit is contained in:
Danil Chapovalov 2022-07-20 13:38:58 +02:00 committed by WebRTC LUCI CQ
parent 07d80675e2
commit 3c06cfc96a
2 changed files with 46 additions and 52 deletions

View File

@ -696,9 +696,12 @@ if (is_win) {
":safe_conversions", ":safe_conversions",
":timeutils", ":timeutils",
"../api/task_queue", "../api/task_queue",
"../api/units:time_delta",
"../api/units:timestamp",
"synchronization:mutex", "synchronization:mutex",
] ]
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional", "//third_party/abseil-cpp/absl/types:optional",
] ]

View File

@ -24,14 +24,17 @@
#include <string.h> #include <string.h>
#include <algorithm> #include <algorithm>
#include <functional>
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <utility> #include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/arraysize.h" #include "rtc_base/arraysize.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
@ -43,7 +46,6 @@
namespace webrtc { namespace webrtc {
namespace { namespace {
#define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
void CALLBACK InitializeQueueThread(ULONG_PTR param) { void CALLBACK InitializeQueueThread(ULONG_PTR param) {
@ -65,10 +67,10 @@ rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
} }
} }
int64_t GetTick() { Timestamp CurrentTime() {
static const UINT kPeriod = 1; static const UINT kPeriod = 1;
bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR); bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
int64_t ret = rtc::TimeMillis(); Timestamp ret = Timestamp::Micros(rtc::TimeMicros());
if (high_res) if (high_res)
timeEndPeriod(kPeriod); timeEndPeriod(kPeriod);
return ret; return ret;
@ -78,8 +80,8 @@ class DelayedTaskInfo {
public: public:
// Default ctor needed to support priority_queue::pop(). // Default ctor needed to support priority_queue::pop().
DelayedTaskInfo() {} DelayedTaskInfo() {}
DelayedTaskInfo(uint32_t milliseconds, std::unique_ptr<QueuedTask> task) DelayedTaskInfo(TimeDelta delay, absl::AnyInvocable<void() &&> task)
: due_time_(GetTick() + milliseconds), task_(std::move(task)) {} : due_time_(CurrentTime() + delay), task_(std::move(task)) {}
DelayedTaskInfo(DelayedTaskInfo&&) = default; DelayedTaskInfo(DelayedTaskInfo&&) = default;
// Implement for priority_queue. // Implement for priority_queue.
@ -92,14 +94,14 @@ class DelayedTaskInfo {
// See below for why this method is const. // See below for why this method is const.
void Run() const { void Run() const {
RTC_DCHECK(due_time_); RTC_DCHECK(task_);
task_->Run() ? task_.reset() : static_cast<void>(task_.release()); std::move(task_)();
} }
int64_t due_time() const { return due_time_; } Timestamp due_time() const { return due_time_; }
private: private:
int64_t due_time_ = 0; // Absolute timestamp in milliseconds. Timestamp due_time_ = Timestamp::Zero();
// `task` needs to be mutable because std::priority_queue::top() returns // `task` needs to be mutable because std::priority_queue::top() returns
// a const reference and a key in an ordered queue must not be changed. // a const reference and a key in an ordered queue must not be changed.
@ -108,7 +110,7 @@ class DelayedTaskInfo {
// (`task`), mutable. // (`task`), mutable.
// Because of this, the `task` variable is made private and can only be // Because of this, the `task` variable is made private and can only be
// mutated by calling the `Run()` method. // mutated by calling the `Run()` method.
mutable std::unique_ptr<QueuedTask> task_; mutable absl::AnyInvocable<void() &&> task_;
}; };
class MultimediaTimer { class MultimediaTimer {
@ -158,10 +160,11 @@ class TaskQueueWin : public TaskQueueBase {
~TaskQueueWin() override = default; ~TaskQueueWin() override = default;
void Delete() override; void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override; void PostTask(absl::AnyInvocable<void() &&> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task, void PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override; TimeDelta delay) override;
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
void RunPendingTasks(); void RunPendingTasks();
private: private:
@ -171,25 +174,18 @@ class TaskQueueWin : public TaskQueueBase {
void ScheduleNextTimer(); void ScheduleNextTimer();
void CancelTimers(); void CancelTimers();
MultimediaTimer timer_;
// Since priority_queue<> by defult orders items in terms of // Since priority_queue<> by defult orders items in terms of
// largest->smallest, using std::less<>, and we want smallest->largest, // largest->smallest, using std::less<>, and we want smallest->largest,
// we would like to use std::greater<> here. Alas it's only available in // we would like to use std::greater<> here.
// C++14 and later, so we roll our own compare template that that relies on
// operator<().
template <typename T>
struct greater {
bool operator()(const T& l, const T& r) { return l > r; }
};
MultimediaTimer timer_;
std::priority_queue<DelayedTaskInfo, std::priority_queue<DelayedTaskInfo,
std::vector<DelayedTaskInfo>, std::vector<DelayedTaskInfo>,
greater<DelayedTaskInfo>> std::greater<DelayedTaskInfo>>
timer_tasks_; timer_tasks_;
UINT_PTR timer_id_ = 0; UINT_PTR timer_id_ = 0;
rtc::PlatformThread thread_; rtc::PlatformThread thread_;
Mutex pending_lock_; Mutex pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_ std::queue<absl::AnyInvocable<void() &&>> pending_
RTC_GUARDED_BY(pending_lock_); RTC_GUARDED_BY(pending_lock_);
HANDLE in_queue_; HANDLE in_queue_;
}; };
@ -221,24 +217,20 @@ void TaskQueueWin::Delete() {
delete this; delete this;
} }
void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) { void TaskQueueWin::PostTask(absl::AnyInvocable<void() &&> task) {
MutexLock lock(&pending_lock_); MutexLock lock(&pending_lock_);
pending_.push(std::move(task)); pending_.push(std::move(task));
::SetEvent(in_queue_); ::SetEvent(in_queue_);
} }
void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task, void TaskQueueWin::PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) { TimeDelta delay) {
if (!milliseconds) { if (delay <= TimeDelta::Zero()) {
PostTask(std::move(task)); PostTask(std::move(task));
return; return;
} }
// TODO(tommi): Avoid this allocation. It is currently here since auto* task_info = new DelayedTaskInfo(delay, std::move(task));
// the timestamp stored in the task info object, is a 64bit timestamp
// and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
// task pointer and timestamp as LPARAM and WPARAM.
auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
RTC_CHECK(thread_.GetHandle() != absl::nullopt); RTC_CHECK(thread_.GetHandle() != absl::nullopt);
if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()), if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
WM_QUEUE_DELAYED_TASK, 0, WM_QUEUE_DELAYED_TASK, 0,
@ -247,9 +239,15 @@ void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
} }
} }
void TaskQueueWin::PostDelayedHighPrecisionTask(
absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
PostDelayedTask(std::move(task), delay);
}
void TaskQueueWin::RunPendingTasks() { void TaskQueueWin::RunPendingTasks() {
while (true) { while (true) {
std::unique_ptr<QueuedTask> task; absl::AnyInvocable<void() &&> task;
{ {
MutexLock lock(&pending_lock_); MutexLock lock(&pending_lock_);
if (pending_.empty()) if (pending_.empty())
@ -258,8 +256,7 @@ void TaskQueueWin::RunPendingTasks() {
pending_.pop(); pending_.pop();
} }
if (!task->Run()) std::move(task)();
task.release();
} }
} }
@ -300,26 +297,19 @@ bool TaskQueueWin::ProcessQueuedMessages() {
// To protect against overly busy message queues, we limit the time // To protect against overly busy message queues, we limit the time
// we process tasks to a few milliseconds. If we don't do that, there's // we process tasks to a few milliseconds. If we don't do that, there's
// a chance that timer tasks won't ever run. // a chance that timer tasks won't ever run.
static const int kMaxTaskProcessingTimeMs = 500; static constexpr TimeDelta kMaxTaskProcessingTime = TimeDelta::Millis(500);
auto start = GetTick(); Timestamp start = CurrentTime();
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
msg.message != WM_QUIT) { msg.message != WM_QUIT) {
if (!msg.hwnd) { if (!msg.hwnd) {
switch (msg.message) { switch (msg.message) {
// TODO(tommi): Stop using this way of queueing tasks.
case WM_RUN_TASK: {
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
if (task->Run())
delete task;
break;
}
case WM_QUEUE_DELAYED_TASK: { case WM_QUEUE_DELAYED_TASK: {
std::unique_ptr<DelayedTaskInfo> info( std::unique_ptr<DelayedTaskInfo> info(
reinterpret_cast<DelayedTaskInfo*>(msg.lParam)); reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
bool need_to_schedule_timers = bool need_to_schedule_timers =
timer_tasks_.empty() || timer_tasks_.empty() ||
timer_tasks_.top().due_time() > info->due_time(); timer_tasks_.top().due_time() > info->due_time();
timer_tasks_.emplace(std::move(*info.get())); timer_tasks_.push(std::move(*info));
if (need_to_schedule_timers) { if (need_to_schedule_timers) {
CancelTimers(); CancelTimers();
ScheduleNextTimer(); ScheduleNextTimer();
@ -343,7 +333,7 @@ bool TaskQueueWin::ProcessQueuedMessages() {
::DispatchMessage(&msg); ::DispatchMessage(&msg);
} }
if (GetTick() > start + kMaxTaskProcessingTimeMs) if (CurrentTime() > start + kMaxTaskProcessingTime)
break; break;
} }
return msg.message != WM_QUIT; return msg.message != WM_QUIT;
@ -351,7 +341,7 @@ bool TaskQueueWin::ProcessQueuedMessages() {
void TaskQueueWin::RunDueTasks() { void TaskQueueWin::RunDueTasks() {
RTC_DCHECK(!timer_tasks_.empty()); RTC_DCHECK(!timer_tasks_.empty());
auto now = GetTick(); Timestamp now = CurrentTime();
do { do {
const auto& top = timer_tasks_.top(); const auto& top = timer_tasks_.top();
if (top.due_time() > now) if (top.due_time() > now)
@ -367,8 +357,9 @@ void TaskQueueWin::ScheduleNextTimer() {
return; return;
const auto& next_task = timer_tasks_.top(); const auto& next_task = timer_tasks_.top();
int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick()); TimeDelta delay =
uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms); std::max(TimeDelta::Zero(), next_task.due_time() - CurrentTime());
uint32_t milliseconds = delay.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>();
if (!timer_.StartOneShotTimer(milliseconds)) if (!timer_.StartOneShotTimer(milliseconds))
timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
} }