From 00e71ef49e9138c3253a3f22ff0c717033bbb986 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Tue, 11 Jun 2019 18:01:56 +0200 Subject: [PATCH] Fix TaskQueueLibevent::PostTask when used on the same TaskQueue Stop using event_base_once because it doesn't guarantee to free QueuedTask when task not run and thus may break TaskQueue guarantee all posted tasks are eventually deleted Bug: webrtc:10731, webrtc:10278 Change-Id: Id073a6092cf603cac5768da7a0770371053b20cc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/141420 Reviewed-by: Karl Wiberg Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#28241} --- rtc_base/task_queue_libevent.cc | 44 +++++++++------------------------ 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 38704c89ae..9a01f464c1 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -121,7 +121,6 @@ class TaskQueueLibevent final : public TaskQueueBase { static void ThreadMain(void* context); static void OnWakeup(int socket, short flags, void* context); // NOLINT - static void RunTask(int fd, short flags, void* context); // NOLINT static void RunTimer(int fd, short flags, void* context); // NOLINT bool is_active_ = true; @@ -214,30 +213,18 @@ void TaskQueueLibevent::Delete() { } void TaskQueueLibevent::PostTask(std::unique_ptr task) { - RTC_DCHECK(task.get()); - // libevent isn't thread safe. This means that we can't use methods such - // as event_base_once to post tasks to the worker thread from a different - // thread. However, we can use it when posting from the worker thread itself. - if (IsCurrent()) { - if (event_base_once(event_base_, -1, EV_TIMEOUT, - &TaskQueueLibevent::RunTask, task.get(), - nullptr) == 0) { - task.release(); - } - } else { - QueuedTask* task_id = task.get(); // Only used for comparison. - { - rtc::CritScope lock(&pending_lock_); - pending_.push_back(std::move(task)); - } - char message = kRunTask; - if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { - RTC_LOG(WARNING) << "Failed to queue task."; - rtc::CritScope lock(&pending_lock_); - pending_.remove_if([task_id](std::unique_ptr& t) { - return t.get() == task_id; - }); - } + QueuedTask* task_id = task.get(); // Only used for comparison. + { + rtc::CritScope lock(&pending_lock_); + pending_.push_back(std::move(task)); + } + char message = kRunTask; + if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { + RTC_LOG(WARNING) << "Failed to queue task."; + rtc::CritScope lock(&pending_lock_); + pending_.remove_if([task_id](std::unique_ptr& t) { + return t.get() == task_id; + }); } } @@ -302,13 +289,6 @@ void TaskQueueLibevent::OnWakeup(int socket, } } -// static -void TaskQueueLibevent::RunTask(int fd, short flags, void* context) { // NOLINT - auto* task = static_cast(context); - if (task->Run()) - delete task; -} - // static void TaskQueueLibevent::RunTimer(int fd, short flags, // NOLINT