Remove libevent task queue.
Previous CLs that disabled the rtc_enable_libevent build flag did not reveal issues. Now continue to remove the source code for the task queue. Bug: webrtc:42224654 Change-Id: I0866b4b56f0a8d8b56a5b604c31a426d77ab8d04 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/370801 Reviewed-by: Stefan Holmer <stefan@webrtc.org> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Commit-Queue: Markus Handell <handellm@webrtc.org> Cr-Commit-Position: refs/heads/main@{#43550}
This commit is contained in:
parent
aa4fced813
commit
74ace1a6e3
4
BUILD.gn
4
BUILD.gn
@ -354,10 +354,6 @@ config("common_config") {
|
||||
defines += [ "WEBRTC_ABSL_MUTEX" ]
|
||||
}
|
||||
|
||||
if (rtc_enable_libevent) {
|
||||
defines += [ "WEBRTC_ENABLE_LIBEVENT" ]
|
||||
}
|
||||
|
||||
if (rtc_disable_logging) {
|
||||
defines += [ "RTC_DISABLE_LOGGING" ]
|
||||
}
|
||||
|
||||
@ -90,21 +90,7 @@ rtc_library("default_task_queue_factory") {
|
||||
"../../rtc_base/memory:always_valid_pointer",
|
||||
]
|
||||
|
||||
if (rtc_enable_libevent) {
|
||||
if (is_android) {
|
||||
sources +=
|
||||
[ "default_task_queue_factory_stdlib_or_libevent_experiment.cc" ]
|
||||
deps += [
|
||||
"../../api/transport:field_trial_based_config",
|
||||
"../../rtc_base:logging",
|
||||
"../../rtc_base:rtc_task_queue_libevent",
|
||||
"../../rtc_base:rtc_task_queue_stdlib",
|
||||
]
|
||||
} else {
|
||||
sources += [ "default_task_queue_factory_libevent.cc" ]
|
||||
deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
|
||||
}
|
||||
} else if (is_mac || is_ios) {
|
||||
if (is_mac || is_ios) {
|
||||
sources += [ "default_task_queue_factory_gcd.cc" ]
|
||||
deps += [ "../../rtc_base:rtc_task_queue_gcd" ]
|
||||
} else if (is_win && current_os != "winuwp" && !build_with_chromium) {
|
||||
|
||||
@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "api/field_trials_view.h"
|
||||
#include "api/task_queue/task_queue_factory.h"
|
||||
#include "api/transport/field_trial_based_config.h"
|
||||
#include "rtc_base/memory/always_valid_pointer.h"
|
||||
#include "rtc_base/task_queue_stdlib.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory(
|
||||
const FieldTrialsView* field_trials_view) {
|
||||
return CreateTaskQueueStdlibFactory();
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
@ -637,34 +637,6 @@ rtc_source_set("rtc_operations_chain") {
|
||||
]
|
||||
}
|
||||
|
||||
if (rtc_enable_libevent) {
|
||||
rtc_library("rtc_task_queue_libevent") {
|
||||
visibility = [ "../api/task_queue:default_task_queue_factory" ]
|
||||
sources = [
|
||||
"task_queue_libevent.cc",
|
||||
"task_queue_libevent.h",
|
||||
]
|
||||
deps = [
|
||||
":checks",
|
||||
":logging",
|
||||
":macromagic",
|
||||
":platform_thread",
|
||||
":platform_thread_types",
|
||||
":safe_conversions",
|
||||
":timeutils",
|
||||
"../api/task_queue",
|
||||
"../api/units:time_delta",
|
||||
"synchronization:mutex",
|
||||
"//third_party/abseil-cpp/absl/container:inlined_vector",
|
||||
"//third_party/abseil-cpp/absl/functional:any_invocable",
|
||||
"//third_party/abseil-cpp/absl/strings:string_view",
|
||||
]
|
||||
if (rtc_build_libevent) {
|
||||
deps += [ "//third_party/libevent" ]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (is_mac || is_ios) {
|
||||
rtc_library("rtc_task_queue_gcd") {
|
||||
visibility = [ "../api/task_queue:default_task_queue_factory" ]
|
||||
|
||||
@ -1,336 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "rtc_base/task_queue_libevent.h"
|
||||
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
#include <stdint.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/container/inlined_vector.h"
|
||||
#include "absl/functional/any_invocable.h"
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "api/task_queue/task_queue_base.h"
|
||||
#include "api/units/time_delta.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/numerics/safe_conversions.h"
|
||||
#include "rtc_base/platform_thread.h"
|
||||
#include "rtc_base/platform_thread_types.h"
|
||||
#include "rtc_base/synchronization/mutex.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
#include "third_party/libevent/event.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace {
|
||||
constexpr char kQuit = 1;
|
||||
constexpr char kRunTasks = 2;
|
||||
|
||||
using Priority = TaskQueueFactory::Priority;
|
||||
|
||||
// This ignores the SIGPIPE signal on the calling thread.
|
||||
// This signal can be fired when trying to write() to a pipe that's being
|
||||
// closed or while closing a pipe that's being written to.
|
||||
// We can run into that situation so we ignore this signal and continue as
|
||||
// normal.
|
||||
// As a side note for this implementation, it would be great if we could safely
|
||||
// restore the sigmask, but unfortunately the operation of restoring it, can
|
||||
// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
|
||||
// The SIGPIPE signal by default causes the process to be terminated, so we
|
||||
// don't want to risk that.
|
||||
// An alternative to this approach is to ignore the signal for the whole
|
||||
// process:
|
||||
// signal(SIGPIPE, SIG_IGN);
|
||||
void IgnoreSigPipeSignalOnCurrentThread() {
|
||||
sigset_t sigpipe_mask;
|
||||
sigemptyset(&sigpipe_mask);
|
||||
sigaddset(&sigpipe_mask, SIGPIPE);
|
||||
pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
|
||||
}
|
||||
|
||||
bool SetNonBlocking(int fd) {
|
||||
const int flags = fcntl(fd, F_GETFL);
|
||||
RTC_CHECK(flags != -1);
|
||||
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
|
||||
}
|
||||
|
||||
// TODO(tommi): This is a hack to support two versions of libevent that we're
|
||||
// compatible with. The method we really want to call is event_assign(),
|
||||
// since event_set() has been marked as deprecated (and doesn't accept
|
||||
// passing event_base__ as a parameter). However, the version of libevent
|
||||
// that we have in Chromium, doesn't have event_assign(), so we need to call
|
||||
// event_set() there.
|
||||
void EventAssign(struct event* ev,
|
||||
struct event_base* base,
|
||||
int fd,
|
||||
short events,
|
||||
void (*callback)(int, short, void*),
|
||||
void* arg) {
|
||||
#if defined(_EVENT2_EVENT_H_)
|
||||
RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
|
||||
#else
|
||||
event_set(ev, fd, events, callback, arg);
|
||||
RTC_CHECK_EQ(0, event_base_set(base, ev));
|
||||
#endif
|
||||
}
|
||||
|
||||
rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
|
||||
switch (priority) {
|
||||
case Priority::HIGH:
|
||||
return rtc::ThreadPriority::kRealtime;
|
||||
case Priority::LOW:
|
||||
return rtc::ThreadPriority::kLow;
|
||||
case Priority::NORMAL:
|
||||
return rtc::ThreadPriority::kNormal;
|
||||
}
|
||||
}
|
||||
|
||||
class TaskQueueLibevent final : public TaskQueueBase {
|
||||
public:
|
||||
TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
|
||||
|
||||
void Delete() override;
|
||||
|
||||
protected:
|
||||
void PostTaskImpl(absl::AnyInvocable<void() &&> task,
|
||||
const PostTaskTraits& traits,
|
||||
const Location& location) override;
|
||||
void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay,
|
||||
const PostDelayedTaskTraits& traits,
|
||||
const Location& location) override;
|
||||
|
||||
private:
|
||||
struct TimerEvent;
|
||||
|
||||
void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay);
|
||||
|
||||
~TaskQueueLibevent() override = default;
|
||||
|
||||
static void OnWakeup(int socket, short flags, void* context); // NOLINT
|
||||
static void RunTimer(int fd, short flags, void* context); // NOLINT
|
||||
|
||||
bool is_active_ = true;
|
||||
int wakeup_pipe_in_ = -1;
|
||||
int wakeup_pipe_out_ = -1;
|
||||
event_base* event_base_;
|
||||
event wakeup_event_;
|
||||
rtc::PlatformThread thread_;
|
||||
Mutex pending_lock_;
|
||||
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
|
||||
RTC_GUARDED_BY(pending_lock_);
|
||||
// Holds a list of events pending timers for cleanup when the loop exits.
|
||||
std::list<TimerEvent*> pending_timers_;
|
||||
};
|
||||
|
||||
struct TaskQueueLibevent::TimerEvent {
|
||||
TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
|
||||
: task_queue(task_queue), task(std::move(task)) {}
|
||||
~TimerEvent() { event_del(&ev); }
|
||||
|
||||
event ev;
|
||||
TaskQueueLibevent* task_queue;
|
||||
absl::AnyInvocable<void() &&> task;
|
||||
};
|
||||
|
||||
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
|
||||
rtc::ThreadPriority priority)
|
||||
: event_base_(event_base_new()) {
|
||||
int fds[2];
|
||||
RTC_CHECK(pipe(fds) == 0);
|
||||
SetNonBlocking(fds[0]);
|
||||
SetNonBlocking(fds[1]);
|
||||
wakeup_pipe_out_ = fds[0];
|
||||
wakeup_pipe_in_ = fds[1];
|
||||
|
||||
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
|
||||
EV_READ | EV_PERSIST, OnWakeup, this);
|
||||
event_add(&wakeup_event_, 0);
|
||||
thread_ = rtc::PlatformThread::SpawnJoinable(
|
||||
[this] {
|
||||
{
|
||||
CurrentTaskQueueSetter set_current(this);
|
||||
while (is_active_)
|
||||
event_base_loop(event_base_, 0);
|
||||
|
||||
// Ensure remaining deleted tasks are destroyed with Current() set up
|
||||
// to this task queue.
|
||||
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending;
|
||||
MutexLock lock(&pending_lock_);
|
||||
pending_.swap(pending);
|
||||
}
|
||||
for (TimerEvent* timer : pending_timers_)
|
||||
delete timer;
|
||||
|
||||
#if RTC_DCHECK_IS_ON
|
||||
MutexLock lock(&pending_lock_);
|
||||
RTC_DCHECK(pending_.empty());
|
||||
#endif
|
||||
},
|
||||
queue_name, rtc::ThreadAttributes().SetPriority(priority));
|
||||
}
|
||||
|
||||
void TaskQueueLibevent::Delete() {
|
||||
RTC_DCHECK(!IsCurrent());
|
||||
struct timespec ts;
|
||||
char message = kQuit;
|
||||
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
|
||||
// The queue is full, so we have no choice but to wait and retry.
|
||||
RTC_CHECK_EQ(EAGAIN, errno);
|
||||
ts.tv_sec = 0;
|
||||
ts.tv_nsec = 1000000;
|
||||
nanosleep(&ts, nullptr);
|
||||
}
|
||||
|
||||
thread_.Finalize();
|
||||
|
||||
event_del(&wakeup_event_);
|
||||
|
||||
IgnoreSigPipeSignalOnCurrentThread();
|
||||
|
||||
close(wakeup_pipe_in_);
|
||||
close(wakeup_pipe_out_);
|
||||
wakeup_pipe_in_ = -1;
|
||||
wakeup_pipe_out_ = -1;
|
||||
|
||||
event_base_free(event_base_);
|
||||
delete this;
|
||||
}
|
||||
|
||||
void TaskQueueLibevent::PostTaskImpl(absl::AnyInvocable<void() &&> task,
|
||||
const PostTaskTraits& traits,
|
||||
const Location& location) {
|
||||
{
|
||||
MutexLock lock(&pending_lock_);
|
||||
bool had_pending_tasks = !pending_.empty();
|
||||
pending_.push_back(std::move(task));
|
||||
|
||||
// Only write to the pipe if there were no pending tasks before this one
|
||||
// since the thread could be sleeping. If there were already pending tasks
|
||||
// then we know there's either a pending write in the pipe or the thread has
|
||||
// not yet processed the pending tasks. In either case, the thread will
|
||||
// eventually wake up and process all pending tasks including this one.
|
||||
if (had_pending_tasks) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Note: This behvior outlined above ensures we never fill up the pipe write
|
||||
// buffer since there will only ever be 1 byte pending.
|
||||
char message = kRunTasks;
|
||||
RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
|
||||
sizeof(message));
|
||||
}
|
||||
|
||||
void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
|
||||
absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay) {
|
||||
// libevent api is not thread safe by default, thus event_add need to be
|
||||
// called on the `thread_`.
|
||||
RTC_DCHECK(IsCurrent());
|
||||
|
||||
TimerEvent* timer = new TimerEvent(this, std::move(task));
|
||||
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
|
||||
timer);
|
||||
pending_timers_.push_back(timer);
|
||||
timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
|
||||
.tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
|
||||
event_add(&timer->ev, &tv);
|
||||
}
|
||||
|
||||
void TaskQueueLibevent::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
|
||||
TimeDelta delay,
|
||||
const PostDelayedTaskTraits& traits,
|
||||
const Location& location) {
|
||||
if (IsCurrent()) {
|
||||
PostDelayedTaskOnTaskQueue(std::move(task), delay);
|
||||
} else {
|
||||
int64_t posted_us = rtc::TimeMicros();
|
||||
PostTask([posted_us, delay, task = std::move(task), this]() mutable {
|
||||
// Compensate for the time that has passed since the posting.
|
||||
TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
|
||||
PostDelayedTaskOnTaskQueue(
|
||||
std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// static
|
||||
void TaskQueueLibevent::OnWakeup(int socket,
|
||||
short flags, // NOLINT
|
||||
void* context) {
|
||||
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
|
||||
RTC_DCHECK(me->wakeup_pipe_out_ == socket);
|
||||
char buf;
|
||||
RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
|
||||
switch (buf) {
|
||||
case kQuit:
|
||||
me->is_active_ = false;
|
||||
event_base_loopbreak(me->event_base_);
|
||||
break;
|
||||
case kRunTasks: {
|
||||
absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
|
||||
{
|
||||
MutexLock lock(&me->pending_lock_);
|
||||
tasks.swap(me->pending_);
|
||||
}
|
||||
RTC_DCHECK(!tasks.empty());
|
||||
for (auto& task : tasks) {
|
||||
std::move(task)();
|
||||
// Prefer to delete the `task` before running the next one.
|
||||
task = nullptr;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
RTC_DCHECK_NOTREACHED();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// static
|
||||
void TaskQueueLibevent::RunTimer(int fd,
|
||||
short flags, // NOLINT
|
||||
void* context) {
|
||||
TimerEvent* timer = static_cast<TimerEvent*>(context);
|
||||
std::move(timer->task)();
|
||||
timer->task_queue->pending_timers_.remove(timer);
|
||||
delete timer;
|
||||
}
|
||||
|
||||
class TaskQueueLibeventFactory final : public TaskQueueFactory {
|
||||
public:
|
||||
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
|
||||
absl::string_view name,
|
||||
Priority priority) const override {
|
||||
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
|
||||
new TaskQueueLibevent(name,
|
||||
TaskQueuePriorityToThreadPriority(priority)));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
|
||||
return std::make_unique<TaskQueueLibeventFactory>();
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
@ -1,24 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef RTC_BASE_TASK_QUEUE_LIBEVENT_H_
|
||||
#define RTC_BASE_TASK_QUEUE_LIBEVENT_H_
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "api/task_queue/task_queue_factory.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory();
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
#endif // RTC_BASE_TASK_QUEUE_LIBEVENT_H_
|
||||
@ -1096,18 +1096,7 @@ TEST(FrameCadenceAdapterRealTimeTest, TimestampsDoNotDrift) {
|
||||
finalized.Wait(rtc::Event::kForever);
|
||||
}
|
||||
|
||||
// TODO(bugs.webrtc.org/15462) Disable ScheduledRepeatAllowsForSlowEncode for
|
||||
// TaskQueueLibevent.
|
||||
#if defined(WEBRTC_ENABLE_LIBEVENT)
|
||||
#define MAYBE_ScheduledRepeatAllowsForSlowEncode \
|
||||
DISABLED_ScheduledRepeatAllowsForSlowEncode
|
||||
#else
|
||||
#define MAYBE_ScheduledRepeatAllowsForSlowEncode \
|
||||
ScheduledRepeatAllowsForSlowEncode
|
||||
#endif
|
||||
|
||||
TEST(FrameCadenceAdapterRealTimeTest,
|
||||
MAYBE_ScheduledRepeatAllowsForSlowEncode) {
|
||||
TEST(FrameCadenceAdapterRealTimeTest, ScheduledRepeatAllowsForSlowEncode) {
|
||||
// This regression test must be performed in realtime because of limitations
|
||||
// in GlobalSimulatedTimeController.
|
||||
//
|
||||
|
||||
@ -288,11 +288,6 @@ declare_args() {
|
||||
rtc_build_opus = !build_with_mozilla
|
||||
rtc_build_ssl = !build_with_mozilla
|
||||
|
||||
# Disable libevent task queue unconditionally.
|
||||
# TODO: bugs.webrtc.org/42224654 clean the flags up.
|
||||
rtc_enable_libevent = false
|
||||
rtc_build_libevent = false
|
||||
|
||||
# Excluded in Chromium since its prerequisites don't require Pulse Audio.
|
||||
rtc_include_pulse_audio = !build_with_chromium
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user