From 9a83dd729bf5dc604d44da2e937ae4079781eaf2 Mon Sep 17 00:00:00 2001 From: Steve Anton Date: Thu, 9 Jan 2020 11:03:25 -0800 Subject: [PATCH] Batch process pending tasks in the libevent TaskQueue This change improves performance under high load by processing all pending tasks each time the thread is woken up by libevent. Additionally, the pipe used to wake up the TaskQueue thread now not be written to if there's already a pending write on the pipe. This fixes a bug where under high load the pipe write buffer can fill and cause tasks to get dropped. Bug: webrtc:11259, webrtc:8876 Change-Id: Ic82978c71bf9e9a25f281ca4775d46168d161d4e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165420 Commit-Queue: Steve Anton Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#30202} --- rtc_base/BUILD.gn | 1 + rtc_base/task_queue_libevent.cc | 51 +++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 9c4cfa9269..d09dde83ac 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -496,6 +496,7 @@ if (rtc_enable_libevent) { ":safe_conversions", ":timeutils", "../api/task_queue", + "//third_party/abseil-cpp/absl/container:inlined_vector", "//third_party/abseil-cpp/absl/strings", ] if (rtc_build_libevent) { diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 7638869bbc..349a5f21fc 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -23,6 +23,7 @@ #include #include +#include "absl/container/inlined_vector.h" #include "absl/strings/string_view.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" @@ -39,7 +40,7 @@ namespace webrtc { namespace { constexpr char kQuit = 1; -constexpr char kRunTask = 2; +constexpr char kRunTasks = 2; using Priority = TaskQueueFactory::Priority; @@ -130,7 +131,8 @@ class TaskQueueLibevent final : public TaskQueueBase { event wakeup_event_; rtc::PlatformThread thread_; rtc::CriticalSection pending_lock_; - std::list> pending_ RTC_GUARDED_BY(pending_lock_); + absl::InlinedVector, 4> pending_ + RTC_GUARDED_BY(pending_lock_); // Holds a list of events pending timers for cleanup when the loop exits. std::list pending_timers_; }; @@ -213,19 +215,26 @@ void TaskQueueLibevent::Delete() { } void TaskQueueLibevent::PostTask(std::unique_ptr task) { - QueuedTask* task_id = task.get(); // Only used for comparison. { rtc::CritScope lock(&pending_lock_); + bool had_pending_tasks = !pending_.empty(); pending_.push_back(std::move(task)); + + // Only write to the pipe if there were no pending tasks before this one + // since the thread could be sleeping. If there were already pending tasks + // then we know there's either a pending write in the pipe or the thread has + // not yet processed the pending tasks. In either case, the thread will + // eventually wake up and process all pending tasks including this one. + if (had_pending_tasks) { + return; + } } - char message = kRunTask; - if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { - RTC_LOG(WARNING) << "Failed to queue task."; - rtc::CritScope lock(&pending_lock_); - pending_.remove_if([task_id](std::unique_ptr& t) { - return t.get() == task_id; - }); - } + + // Note: This behvior outlined above ensures we never fill up the pipe write + // buffer since there will only ever be 1 byte pending. + char message = kRunTasks; + RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)), + sizeof(message)); } void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, @@ -270,17 +279,21 @@ void TaskQueueLibevent::OnWakeup(int socket, me->is_active_ = false; event_base_loopbreak(me->event_base_); break; - case kRunTask: { - std::unique_ptr task; + case kRunTasks: { + absl::InlinedVector, 4> tasks; { rtc::CritScope lock(&me->pending_lock_); - RTC_DCHECK(!me->pending_.empty()); - task = std::move(me->pending_.front()); - me->pending_.pop_front(); - RTC_DCHECK(task.get()); + tasks.swap(me->pending_); + } + RTC_DCHECK(!tasks.empty()); + for (auto& task : tasks) { + if (task->Run()) { + task.reset(); + } else { + // |false| means the task should *not* be deleted. + task.release(); + } } - if (!task->Run()) - task.release(); break; } default: