diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 9c4cfa9269..d09dde83ac 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -496,6 +496,7 @@ if (rtc_enable_libevent) { ":safe_conversions", ":timeutils", "../api/task_queue", + "//third_party/abseil-cpp/absl/container:inlined_vector", "//third_party/abseil-cpp/absl/strings", ] if (rtc_build_libevent) { diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc index 7638869bbc..349a5f21fc 100644 --- a/rtc_base/task_queue_libevent.cc +++ b/rtc_base/task_queue_libevent.cc @@ -23,6 +23,7 @@ #include #include +#include "absl/container/inlined_vector.h" #include "absl/strings/string_view.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" @@ -39,7 +40,7 @@ namespace webrtc { namespace { constexpr char kQuit = 1; -constexpr char kRunTask = 2; +constexpr char kRunTasks = 2; using Priority = TaskQueueFactory::Priority; @@ -130,7 +131,8 @@ class TaskQueueLibevent final : public TaskQueueBase { event wakeup_event_; rtc::PlatformThread thread_; rtc::CriticalSection pending_lock_; - std::list> pending_ RTC_GUARDED_BY(pending_lock_); + absl::InlinedVector, 4> pending_ + RTC_GUARDED_BY(pending_lock_); // Holds a list of events pending timers for cleanup when the loop exits. std::list pending_timers_; }; @@ -213,19 +215,26 @@ void TaskQueueLibevent::Delete() { } void TaskQueueLibevent::PostTask(std::unique_ptr task) { - QueuedTask* task_id = task.get(); // Only used for comparison. { rtc::CritScope lock(&pending_lock_); + bool had_pending_tasks = !pending_.empty(); pending_.push_back(std::move(task)); + + // Only write to the pipe if there were no pending tasks before this one + // since the thread could be sleeping. If there were already pending tasks + // then we know there's either a pending write in the pipe or the thread has + // not yet processed the pending tasks. In either case, the thread will + // eventually wake up and process all pending tasks including this one. + if (had_pending_tasks) { + return; + } } - char message = kRunTask; - if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { - RTC_LOG(WARNING) << "Failed to queue task."; - rtc::CritScope lock(&pending_lock_); - pending_.remove_if([task_id](std::unique_ptr& t) { - return t.get() == task_id; - }); - } + + // Note: This behvior outlined above ensures we never fill up the pipe write + // buffer since there will only ever be 1 byte pending. + char message = kRunTasks; + RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)), + sizeof(message)); } void TaskQueueLibevent::PostDelayedTask(std::unique_ptr task, @@ -270,17 +279,21 @@ void TaskQueueLibevent::OnWakeup(int socket, me->is_active_ = false; event_base_loopbreak(me->event_base_); break; - case kRunTask: { - std::unique_ptr task; + case kRunTasks: { + absl::InlinedVector, 4> tasks; { rtc::CritScope lock(&me->pending_lock_); - RTC_DCHECK(!me->pending_.empty()); - task = std::move(me->pending_.front()); - me->pending_.pop_front(); - RTC_DCHECK(task.get()); + tasks.swap(me->pending_); + } + RTC_DCHECK(!tasks.empty()); + for (auto& task : tasks) { + if (task->Run()) { + task.reset(); + } else { + // |false| means the task should *not* be deleted. + task.release(); + } } - if (!task->Run()) - task.release(); break; } default: