From 30c2a31309afc6b1d523e8a68ea6aa29f8661970 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Tue, 19 Jul 2022 14:12:43 +0200 Subject: [PATCH] Update TaskQueueLibevent implementation to absl::AnyInvocable Bug: webrtc:14245, webrtc:12889 Change-Id: I1aa20e3d5645c270abd1bee0c45c6982e799eaa4 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268767 Reviewed-by: Tomas Gunnarsson Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37563} --- rtc_base/BUILD.gn | 2 + rtc_base/task_queue_libevent.cc | 102 ++++++++++++++++---------------- 2 files changed, 53 insertions(+), 51 deletions(-) diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 322fd7a70f..546a6d6c51 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -645,10 +645,12 @@ if (rtc_enable_libevent) { ":safe_conversions", ":timeutils", "../api/task_queue", + "../api/units:time_delta", "synchronization:mutex", ] absl_deps = [ "//third_party/abseil-cpp/absl/container:inlined_vector", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/strings", ] if (rtc_build_libevent) { diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index ba80a64f9b..f50e5a63df 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -24,9 +24,10 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/functional/any_invocable.h" #include "absl/strings/string_view.h" -#include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/numerics/safe_conversions.h" @@ -106,14 +107,18 @@ class TaskQueueLibevent final : public TaskQueueBase { TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority); void Delete() override; - void PostTask(std::unique_ptr task) override; - void PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) override; + void PostTask(absl::AnyInvocable task) override; + void PostDelayedTask(absl::AnyInvocable task, + TimeDelta delay) override; + void PostDelayedHighPrecisionTask(absl::AnyInvocable task, + TimeDelta delay) override; private: - class SetTimerTask; struct TimerEvent; + void PostDelayedTaskOnTaskQueue(absl::AnyInvocable task, + TimeDelta delay); + ~TaskQueueLibevent() override = default; static void OnWakeup(int socket, short flags, void* context); // NOLINT @@ -126,43 +131,20 @@ class TaskQueueLibevent final : public TaskQueueBase { event wakeup_event_; rtc::PlatformThread thread_; Mutex pending_lock_; - absl::InlinedVector, 4> pending_ + absl::InlinedVector, 4> pending_ RTC_GUARDED_BY(pending_lock_); // Holds a list of events pending timers for cleanup when the loop exits. std::list pending_timers_; }; struct TaskQueueLibevent::TimerEvent { - TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr task) + TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable task) : task_queue(task_queue), task(std::move(task)) {} ~TimerEvent() { event_del(&ev); } event ev; TaskQueueLibevent* task_queue; - std::unique_ptr task; -}; - -class TaskQueueLibevent::SetTimerTask : public QueuedTask { - public: - SetTimerTask(std::unique_ptr task, uint32_t milliseconds) - : task_(std::move(task)), - milliseconds_(milliseconds), - posted_(rtc::Time32()) {} - - private: - bool Run() override { - // Compensate for the time that has passed since construction - // and until we got here. - uint32_t post_time = rtc::Time32() - posted_; - TaskQueueLibevent::Current()->PostDelayedTask( - std::move(task_), - post_time > milliseconds_ ? 0 : milliseconds_ - post_time); - return true; - } - - std::unique_ptr task_; - const uint32_t milliseconds_; - const uint32_t posted_; + absl::AnyInvocable task; }; TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name, @@ -219,7 +201,7 @@ void TaskQueueLibevent::Delete() { delete this; } -void TaskQueueLibevent::PostTask(std::unique_ptr task) { +void TaskQueueLibevent::PostTask(absl::AnyInvocable task) { { MutexLock lock(&pending_lock_); bool had_pending_tasks = !pending_.empty(); @@ -242,21 +224,43 @@ void TaskQueueLibevent::PostTask(std::unique_ptr task) { sizeof(message)); } -void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueueLibevent::PostDelayedTaskOnTaskQueue( + absl::AnyInvocable task, + TimeDelta delay) { + // libevent api is not thread safe by default, thus event_add need to be + // called on the `thread_`. + RTC_DCHECK(IsCurrent()); + + TimerEvent* timer = new TimerEvent(this, std::move(task)); + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer, + timer); + pending_timers_.push_back(timer); + timeval tv = {.tv_sec = rtc::dchecked_cast(delay.us() / 1'000'000), + .tv_usec = rtc::dchecked_cast(delay.us() % 1'000'000)}; + event_add(&timer->ev, &tv); +} + +void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable task, + TimeDelta delay) { if (IsCurrent()) { - TimerEvent* timer = new TimerEvent(this, std::move(task)); - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer, - timer); - pending_timers_.push_back(timer); - timeval tv = {rtc::dchecked_cast(milliseconds / 1000), - rtc::dchecked_cast(milliseconds % 1000) * 1000}; - event_add(&timer->ev, &tv); + PostDelayedTaskOnTaskQueue(std::move(task), delay); } else { - PostTask(std::make_unique(std::move(task), milliseconds)); + int64_t posted_us = rtc::TimeMicros(); + PostTask([posted_us, delay, task = std::move(task), this]() mutable { + // Compensate for the time that has passed since the posting. + TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us); + PostDelayedTaskOnTaskQueue( + std::move(task), std::max(delay - post_time, TimeDelta::Zero())); + }); } } +void TaskQueueLibevent::PostDelayedHighPrecisionTask( + absl::AnyInvocable task, + TimeDelta delay) { + PostDelayedTask(std::move(task), delay); +} + // static void TaskQueueLibevent::OnWakeup(int socket, short flags, // NOLINT @@ -271,19 +275,16 @@ void TaskQueueLibevent::OnWakeup(int socket, event_base_loopbreak(me->event_base_); break; case kRunTasks: { - absl::InlinedVector, 4> tasks; + absl::InlinedVector, 4> tasks; { MutexLock lock(&me->pending_lock_); tasks.swap(me->pending_); } RTC_DCHECK(!tasks.empty()); for (auto& task : tasks) { - if (task->Run()) { - task.reset(); - } else { - // `false` means the task should *not* be deleted. - task.release(); - } + std::move(task)(); + // Prefer to delete the `task` before running the next one. + task = nullptr; } break; } @@ -298,8 +299,7 @@ void TaskQueueLibevent::RunTimer(int fd, short flags, // NOLINT void* context) { TimerEvent* timer = static_cast(context); - if (!timer->task->Run()) - timer->task.release(); + std::move(timer->task)(); timer->task_queue->pending_timers_.remove(timer); delete timer; }