This cl refactor TaskQueues to use a PIMPL implementation on linux/Android.

In later steps the Win/Mac implementation will also be refactored.

The rtc_task_queue target is split up in three separate targets:

rtc_task_queue_api:
Contains the header file task_queue.h but no implementation.
Only external TaskQueue implementations should directly depend on this target.

rtc_task_queue_impl:
Contains the default implementation of task_queue.h.
Only external application targets should directly depend on this target.

rtc_task_queue:
WebRTC targets should depend on this target. It unconditionally depend on rtc_task_queue_api and depending on the new build flag,|rtc_link_task_queue_impl|,  depend on rtc_task_queue_impl.

BUG=webrtc:8160

Review-Url: https://codereview.webrtc.org/3003643002
Cr-Commit-Position: refs/heads/master@{#19516}
This commit is contained in:
perkj 2017-08-25 05:00:11 -07:00 committed by Commit Bot
parent ee95f87488
commit 650fdae91c
4 changed files with 195 additions and 83 deletions

View File

@ -295,42 +295,71 @@ config("enable_libevent_config") {
defines = [ "WEBRTC_BUILD_LIBEVENT" ]
}
rtc_static_library("rtc_task_queue") {
rtc_source_set("rtc_task_queue") {
public_deps = [
":rtc_base_approved",
":rtc_task_queue_api",
]
if (rtc_link_task_queue_impl) {
deps = [
":rtc_task_queue_impl",
]
}
}
# WebRTC targets must not directly depend on rtc_task_queue_api or
# rtc_task_queue_impl. Instead, depend on rtc_task_queue.
# The build flag |rtc_link_task_queue_impl| decides if WebRTC targets will link
# to the default implemenation in rtc_task_queue_impl or if an externally
# provided implementation should be used. An external implementation should
# depend on rtc_task_queue_api.
rtc_source_set("rtc_task_queue_api") {
if (build_with_chromium) {
sources = [
"../../webrtc_overrides/webrtc/rtc_base/task_queue.cc",
"../../webrtc_overrides/webrtc/rtc_base/task_queue.h",
]
} else {
sources = [
"task_queue.h",
"task_queue_posix.h",
]
if (rtc_build_libevent) {
deps = [
"//base/third_party/libevent",
]
}
}
deps = [
":rtc_base_approved",
]
}
rtc_source_set("rtc_task_queue_impl") {
deps = [
":rtc_base_approved",
":rtc_task_queue_api",
]
if (build_with_chromium) {
sources = [
"../../webrtc_overrides/webrtc/rtc_base/task_queue.cc",
]
} else {
if (rtc_build_libevent) {
deps += [ "//base/third_party/libevent" ]
}
if (rtc_enable_libevent) {
sources += [
sources = [
"task_queue_libevent.cc",
"task_queue_posix.cc",
"task_queue_posix.h",
]
all_dependent_configs = [ ":enable_libevent_config" ]
} else {
if (is_mac || is_ios) {
sources += [
sources = [
"task_queue_gcd.cc",
"task_queue_posix.cc",
]
}
if (is_win) {
sources += [ "task_queue_win.cc" ]
sources = [
"task_queue_win.cc",
]
}
}
}

View File

@ -15,23 +15,16 @@
#include <memory>
#include <queue>
#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
#if defined(WEBRTC_MAC)
#include <dispatch/dispatch.h>
#endif
#include "webrtc/rtc_base/constructormagic.h"
#include "webrtc/rtc_base/criticalsection.h"
#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
#include "webrtc/rtc_base/platform_thread.h"
#endif
#if defined(WEBRTC_BUILD_LIBEVENT)
#include "webrtc/rtc_base/refcountedobject.h"
#include "webrtc/rtc_base/scoped_ref_ptr.h"
struct event_base;
struct event;
#if defined(WEBRTC_WIN)
#include "webrtc/rtc_base/platform_thread.h"
#endif
namespace rtc {
@ -242,32 +235,7 @@ class LOCKABLE TaskQueue {
}
private:
#if defined(WEBRTC_BUILD_LIBEVENT)
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
class ReplyTaskOwner;
class PostAndReplyTask;
class SetTimerTask;
typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
struct QueueContext;
int wakeup_pipe_in_ = -1;
int wakeup_pipe_out_ = -1;
event_base* event_base_;
std::unique_ptr<event> wakeup_event_;
PlatformThread thread_;
rtc::CriticalSection pending_lock_;
std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
GUARDED_BY(pending_lock_);
#elif defined(WEBRTC_MAC)
#if defined(WEBRTC_MAC)
struct QueueContext;
struct TaskContext;
struct PostTaskAndReplyContext;
@ -295,7 +263,8 @@ class LOCKABLE TaskQueue {
std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
HANDLE in_queue_;
#else
#error not supported.
class Impl;
const scoped_refptr<Impl> impl_;
#endif
RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);

View File

@ -18,7 +18,11 @@
#include "base/third_party/libevent/event.h"
#include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/logging.h"
#include "webrtc/rtc_base/platform_thread.h"
#include "webrtc/rtc_base/refcount.h"
#include "webrtc/rtc_base/refcountedobject.h"
#include "webrtc/rtc_base/safe_conversions.h"
#include "webrtc/rtc_base/task_queue.h"
#include "webrtc/rtc_base/task_queue_posix.h"
#include "webrtc/rtc_base/timeutils.h"
@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
}
} // namespace
struct TaskQueue::QueueContext {
explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
TaskQueue* queue;
class TaskQueue::Impl : public RefCountInterface {
public:
explicit Impl(const char* queue_name,
TaskQueue* queue,
Priority priority = Priority::NORMAL);
~Impl() override;
static TaskQueue::Impl* Current();
static TaskQueue* CurrentQueue();
// Used for DCHECKing the current queue.
static bool IsCurrent(const char* queue_name);
bool IsCurrent() const;
void PostTask(std::unique_ptr<QueuedTask> task);
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue::Impl* reply_queue);
void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
private:
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
class ReplyTaskOwner;
class PostAndReplyTask;
class SetTimerTask;
typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
struct QueueContext;
TaskQueue* const queue_;
int wakeup_pipe_in_ = -1;
int wakeup_pipe_out_ = -1;
event_base* event_base_;
std::unique_ptr<event> wakeup_event_;
PlatformThread thread_;
rtc::CriticalSection pending_lock_;
std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
GUARDED_BY(pending_lock_);
};
struct TaskQueue::Impl::QueueContext {
explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
TaskQueue::Impl* queue;
bool is_active;
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
@ -135,7 +187,7 @@ struct TaskQueue::QueueContext {
// * if set_should_run_task() was called, the reply task will be run
// * Release the reference to ReplyTaskOwner
// * ReplyTaskOwner and associated |reply_| are deleted.
class TaskQueue::ReplyTaskOwner {
class TaskQueue::Impl::ReplyTaskOwner {
public:
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
: reply_(std::move(reply)) {}
@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner {
bool run_task_ = false;
};
class TaskQueue::PostAndReplyTask : public QueuedTask {
class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue,
TaskQueue::Impl* reply_queue,
int reply_pipe)
: task_(std::move(task)),
reply_pipe_(reply_pipe),
@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
class TaskQueue::SetTimerTask : public QueuedTask {
class TaskQueue::Impl::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask {
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = Time32() - posted_;
TaskQueue::Current()->PostDelayedTask(
TaskQueue::Impl::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask {
const uint32_t posted_;
};
TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: event_base_(event_base_new()),
TaskQueue::Impl::Impl(const char* queue_name,
TaskQueue* queue,
Priority priority /*= NORMAL*/)
: queue_(queue),
event_base_(event_base_new()),
wakeup_event_(new event()),
thread_(&TaskQueue::ThreadMain,
thread_(&TaskQueue::Impl::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
thread_.Start();
}
TaskQueue::~TaskQueue() {
TaskQueue::Impl::~Impl() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() {
}
// static
TaskQueue* TaskQueue::Current() {
TaskQueue::Impl* TaskQueue::Impl::Current() {
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
return ctx ? ctx->queue : nullptr;
}
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
TaskQueue* current = Current();
TaskQueue* TaskQueue::Impl::CurrentQueue() {
TaskQueue::Impl* current = Current();
if (current) {
return current->queue_;
}
return nullptr;
}
// static
bool TaskQueue::Impl::IsCurrent(const char* queue_name) {
TaskQueue::Impl* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
bool TaskQueue::IsCurrent() const {
bool TaskQueue::Impl::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
task.get(), nullptr) == 0) {
task.release();
}
@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
}
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(std::move(task));
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
timer);
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.push_back(timer);
@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
}
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue::Impl* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
reply_queue->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
// static
void TaskQueue::ThreadMain(void* context) {
TaskQueue* me = static_cast<TaskQueue*>(context);
void TaskQueue::Impl::ThreadMain(void* context) {
TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
QueueContext queue_context(me);
pthread_setspecific(GetQueuePtrTls(), &queue_context);
@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) {
}
// static
void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
void TaskQueue::Impl::OnWakeup(int socket,
short flags,
void* context) { // NOLINT
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
}
// static
void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
delete timer;
}
void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
void TaskQueue::Impl::PrepareReplyTask(
scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(std::move(reply_task));
}
TaskQueue::TaskQueue(const char* queue_name, Priority priority)
: impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
}
TaskQueue::~TaskQueue() {}
// static
TaskQueue* TaskQueue::Current() {
return TaskQueue::Impl::CurrentQueue();
}
// Used for DCHECKing the current queue.
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
return TaskQueue::Impl::IsCurrent(queue_name);
}
bool TaskQueue::IsCurrent() const {
return impl_->IsCurrent();
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
return TaskQueue::impl_->PostTask(std::move(task));
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
reply_queue->impl_.get());
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
impl_.get());
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
}
} // namespace rtc

View File

@ -105,7 +105,14 @@ declare_args() {
# See http://clang.llvm.org/docs/SanitizerCoverage.html .
rtc_sanitize_coverage = ""
# Links a default implementation of task queues to targets
# that depend on the target rtc_task_queue. Set to false to
# use an external implementation.
rtc_link_task_queue_impl = true
# Enable libevent task queues on platforms that support it.
# rtc_link_task_queue_impl must be set to true for this to
# have an effect.
if (is_win || is_mac || is_ios || is_nacl) {
rtc_enable_libevent = false
rtc_build_libevent = false
@ -314,6 +321,7 @@ template("rtc_executable") {
"//build/config:exe_and_shlib_deps",
]
deps += invoker.deps
public_configs = [ rtc_common_inherited_config ]
if (defined(invoker.public_configs)) {
public_configs += invoker.public_configs