diff --git a/webrtc/rtc_base/BUILD.gn b/webrtc/rtc_base/BUILD.gn index a9e82e510b..d26d5e7d85 100644 --- a/webrtc/rtc_base/BUILD.gn +++ b/webrtc/rtc_base/BUILD.gn @@ -295,42 +295,71 @@ config("enable_libevent_config") { defines = [ "WEBRTC_BUILD_LIBEVENT" ] } -rtc_static_library("rtc_task_queue") { +rtc_source_set("rtc_task_queue") { public_deps = [ ":rtc_base_approved", + ":rtc_task_queue_api", ] + if (rtc_link_task_queue_impl) { + deps = [ + ":rtc_task_queue_impl", + ] + } +} + +# WebRTC targets must not directly depend on rtc_task_queue_api or +# rtc_task_queue_impl. Instead, depend on rtc_task_queue. +# The build flag |rtc_link_task_queue_impl| decides if WebRTC targets will link +# to the default implemenation in rtc_task_queue_impl or if an externally +# provided implementation should be used. An external implementation should +# depend on rtc_task_queue_api. +rtc_source_set("rtc_task_queue_api") { if (build_with_chromium) { sources = [ - "../../webrtc_overrides/webrtc/rtc_base/task_queue.cc", "../../webrtc_overrides/webrtc/rtc_base/task_queue.h", ] } else { sources = [ "task_queue.h", - "task_queue_posix.h", ] - if (rtc_build_libevent) { - deps = [ - "//base/third_party/libevent", - ] - } + } + deps = [ + ":rtc_base_approved", + ] +} +rtc_source_set("rtc_task_queue_impl") { + deps = [ + ":rtc_base_approved", + ":rtc_task_queue_api", + ] + if (build_with_chromium) { + sources = [ + "../../webrtc_overrides/webrtc/rtc_base/task_queue.cc", + ] + } else { + if (rtc_build_libevent) { + deps += [ "//base/third_party/libevent" ] + } if (rtc_enable_libevent) { - sources += [ + sources = [ "task_queue_libevent.cc", "task_queue_posix.cc", + "task_queue_posix.h", ] all_dependent_configs = [ ":enable_libevent_config" ] } else { if (is_mac || is_ios) { - sources += [ + sources = [ "task_queue_gcd.cc", "task_queue_posix.cc", ] } if (is_win) { - sources += [ "task_queue_win.cc" ] + sources = [ + "task_queue_win.cc", + ] } } } diff --git a/webrtc/rtc_base/task_queue.h b/webrtc/rtc_base/task_queue.h index 218fce9a3b..e7eac2f185 100644 --- a/webrtc/rtc_base/task_queue.h +++ b/webrtc/rtc_base/task_queue.h @@ -15,23 +15,16 @@ #include #include -#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT) +#if defined(WEBRTC_MAC) #include #endif #include "webrtc/rtc_base/constructormagic.h" #include "webrtc/rtc_base/criticalsection.h" - -#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT) -#include "webrtc/rtc_base/platform_thread.h" -#endif - -#if defined(WEBRTC_BUILD_LIBEVENT) -#include "webrtc/rtc_base/refcountedobject.h" #include "webrtc/rtc_base/scoped_ref_ptr.h" -struct event_base; -struct event; +#if defined(WEBRTC_WIN) +#include "webrtc/rtc_base/platform_thread.h" #endif namespace rtc { @@ -242,32 +235,7 @@ class LOCKABLE TaskQueue { } private: -#if defined(WEBRTC_BUILD_LIBEVENT) - static void ThreadMain(void* context); - static void OnWakeup(int socket, short flags, void* context); // NOLINT - static void RunTask(int fd, short flags, void* context); // NOLINT - static void RunTimer(int fd, short flags, void* context); // NOLINT - - class ReplyTaskOwner; - class PostAndReplyTask; - class SetTimerTask; - - typedef RefCountedObject ReplyTaskOwnerRef; - - void PrepareReplyTask(scoped_refptr reply_task); - - struct QueueContext; - - int wakeup_pipe_in_ = -1; - int wakeup_pipe_out_ = -1; - event_base* event_base_; - std::unique_ptr wakeup_event_; - PlatformThread thread_; - rtc::CriticalSection pending_lock_; - std::list> pending_ GUARDED_BY(pending_lock_); - std::list> pending_replies_ - GUARDED_BY(pending_lock_); -#elif defined(WEBRTC_MAC) +#if defined(WEBRTC_MAC) struct QueueContext; struct TaskContext; struct PostTaskAndReplyContext; @@ -295,7 +263,8 @@ class LOCKABLE TaskQueue { std::queue> pending_ GUARDED_BY(pending_lock_); HANDLE in_queue_; #else -#error not supported. + class Impl; + const scoped_refptr impl_; #endif RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc index db267dc80b..99b88df363 100644 --- a/webrtc/rtc_base/task_queue_libevent.cc +++ b/webrtc/rtc_base/task_queue_libevent.cc @@ -18,7 +18,11 @@ #include "base/third_party/libevent/event.h" #include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/logging.h" +#include "webrtc/rtc_base/platform_thread.h" +#include "webrtc/rtc_base/refcount.h" +#include "webrtc/rtc_base/refcountedobject.h" #include "webrtc/rtc_base/safe_conversions.h" +#include "webrtc/rtc_base/task_queue.h" #include "webrtc/rtc_base/task_queue_posix.h" #include "webrtc/rtc_base/timeutils.h" @@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { } } // namespace -struct TaskQueue::QueueContext { - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} - TaskQueue* queue; +class TaskQueue::Impl : public RefCountInterface { + public: + explicit Impl(const char* queue_name, + TaskQueue* queue, + Priority priority = Priority::NORMAL); + ~Impl() override; + + static TaskQueue::Impl* Current(); + static TaskQueue* CurrentQueue(); + + // Used for DCHECKing the current queue. + static bool IsCurrent(const char* queue_name); + bool IsCurrent() const; + + void PostTask(std::unique_ptr task); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue); + + void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + + private: + static void ThreadMain(void* context); + static void OnWakeup(int socket, short flags, void* context); // NOLINT + static void RunTask(int fd, short flags, void* context); // NOLINT + static void RunTimer(int fd, short flags, void* context); // NOLINT + + class ReplyTaskOwner; + class PostAndReplyTask; + class SetTimerTask; + + typedef RefCountedObject ReplyTaskOwnerRef; + + void PrepareReplyTask(scoped_refptr reply_task); + + struct QueueContext; + TaskQueue* const queue_; + int wakeup_pipe_in_ = -1; + int wakeup_pipe_out_ = -1; + event_base* event_base_; + std::unique_ptr wakeup_event_; + PlatformThread thread_; + rtc::CriticalSection pending_lock_; + std::list> pending_ GUARDED_BY(pending_lock_); + std::list> pending_replies_ + GUARDED_BY(pending_lock_); +}; + +struct TaskQueue::Impl::QueueContext { + explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {} + TaskQueue::Impl* queue; bool is_active; // Holds a list of events pending timers for cleanup when the loop exits. std::list pending_timers_; @@ -135,7 +187,7 @@ struct TaskQueue::QueueContext { // * if set_should_run_task() was called, the reply task will be run // * Release the reference to ReplyTaskOwner // * ReplyTaskOwner and associated |reply_| are deleted. -class TaskQueue::ReplyTaskOwner { +class TaskQueue::Impl::ReplyTaskOwner { public: ReplyTaskOwner(std::unique_ptr reply) : reply_(std::move(reply)) {} @@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner { bool run_task_ = false; }; -class TaskQueue::PostAndReplyTask : public QueuedTask { +class TaskQueue::Impl::PostAndReplyTask : public QueuedTask { public: PostAndReplyTask(std::unique_ptr task, std::unique_ptr reply, - TaskQueue* reply_queue, + TaskQueue::Impl* reply_queue, int reply_pipe) : task_(std::move(task)), reply_pipe_(reply_pipe), @@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { scoped_refptr> reply_task_owner_; }; -class TaskQueue::SetTimerTask : public QueuedTask { +class TaskQueue::Impl::SetTimerTask : public QueuedTask { public: SetTimerTask(std::unique_ptr task, uint32_t milliseconds) : task_(std::move(task)), @@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask { // Compensate for the time that has passed since construction // and until we got here. uint32_t post_time = Time32() - posted_; - TaskQueue::Current()->PostDelayedTask( + TaskQueue::Impl::Current()->PostDelayedTask( std::move(task_), post_time > milliseconds_ ? 0 : milliseconds_ - post_time); return true; @@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { const uint32_t posted_; }; -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) - : event_base_(event_base_new()), +TaskQueue::Impl::Impl(const char* queue_name, + TaskQueue* queue, + Priority priority /*= NORMAL*/) + : queue_(queue), + event_base_(event_base_new()), wakeup_event_(new event()), - thread_(&TaskQueue::ThreadMain, + thread_(&TaskQueue::Impl::ThreadMain, this, queue_name, TaskQueuePriorityToThreadPriority(priority)) { @@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) thread_.Start(); } -TaskQueue::~TaskQueue() { +TaskQueue::Impl::~Impl() { RTC_DCHECK(!IsCurrent()); struct timespec ts; char message = kQuit; @@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() { } // static -TaskQueue* TaskQueue::Current() { +TaskQueue::Impl* TaskQueue::Impl::Current() { QueueContext* ctx = static_cast(pthread_getspecific(GetQueuePtrTls())); return ctx ? ctx->queue : nullptr; } // static -bool TaskQueue::IsCurrent(const char* queue_name) { - TaskQueue* current = Current(); +TaskQueue* TaskQueue::Impl::CurrentQueue() { + TaskQueue::Impl* current = Current(); + if (current) { + return current->queue_; + } + return nullptr; +} + +// static +bool TaskQueue::Impl::IsCurrent(const char* queue_name) { + TaskQueue::Impl* current = Current(); return current && current->thread_.name().compare(queue_name) == 0; } -bool TaskQueue::IsCurrent() const { +bool TaskQueue::Impl::IsCurrent() const { return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); } -void TaskQueue::PostTask(std::unique_ptr task) { +void TaskQueue::Impl::PostTask(std::unique_ptr task) { RTC_DCHECK(task.get()); // libevent isn't thread safe. This means that we can't use methods such // as event_base_once to post tasks to the worker thread from a different // thread. However, we can use it when posting from the worker thread itself. if (IsCurrent()) { - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, + if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask, task.get(), nullptr) == 0) { task.release(); } @@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr task) { } } -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { if (IsCurrent()) { TimerEvent* timer = new TimerEvent(std::move(task)); - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer, + timer); QueueContext* ctx = static_cast(pthread_getspecific(GetQueuePtrTls())); ctx->pending_timers_.push_back(timer); @@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr task, } } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue) { std::unique_ptr wrapper_task( new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, reply_queue->wakeup_pipe_in_)); PostTask(std::move(wrapper_task)); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); -} - // static -void TaskQueue::ThreadMain(void* context) { - TaskQueue* me = static_cast(context); +void TaskQueue::Impl::ThreadMain(void* context) { + TaskQueue::Impl* me = static_cast(context); QueueContext queue_context(me); pthread_setspecific(GetQueuePtrTls(), &queue_context); @@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) { } // static -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT +void TaskQueue::Impl::OnWakeup(int socket, + short flags, + void* context) { // NOLINT QueueContext* ctx = static_cast(pthread_getspecific(GetQueuePtrTls())); RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); @@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT } // static -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT +void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT auto* task = static_cast(context); if (task->Run()) delete task; } // static -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT +void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT TimerEvent* timer = static_cast(context); if (!timer->task->Run()) timer->task.release(); @@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT delete timer; } -void TaskQueue::PrepareReplyTask(scoped_refptr reply_task) { +void TaskQueue::Impl::PrepareReplyTask( + scoped_refptr reply_task) { RTC_DCHECK(reply_task); CritScope lock(&pending_lock_); pending_replies_.push_back(std::move(reply_task)); } +TaskQueue::TaskQueue(const char* queue_name, Priority priority) + : impl_(new RefCountedObject(queue_name, this, priority)) { +} + +TaskQueue::~TaskQueue() {} + +// static +TaskQueue* TaskQueue::Current() { + return TaskQueue::Impl::CurrentQueue(); +} + +// Used for DCHECKing the current queue. +// static +bool TaskQueue::IsCurrent(const char* queue_name) { + return TaskQueue::Impl::IsCurrent(queue_name); +} + +bool TaskQueue::IsCurrent() const { + return impl_->IsCurrent(); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + return TaskQueue::impl_->PostTask(std::move(task)); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + reply_queue->impl_.get()); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply) { + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + impl_.get()); +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); +} + } // namespace rtc diff --git a/webrtc/webrtc.gni b/webrtc/webrtc.gni index 071c9cc23f..72e031ee77 100644 --- a/webrtc/webrtc.gni +++ b/webrtc/webrtc.gni @@ -105,7 +105,14 @@ declare_args() { # See http://clang.llvm.org/docs/SanitizerCoverage.html . rtc_sanitize_coverage = "" + # Links a default implementation of task queues to targets + # that depend on the target rtc_task_queue. Set to false to + # use an external implementation. + rtc_link_task_queue_impl = true + # Enable libevent task queues on platforms that support it. + # rtc_link_task_queue_impl must be set to true for this to + # have an effect. if (is_win || is_mac || is_ios || is_nacl) { rtc_enable_libevent = false rtc_build_libevent = false @@ -314,6 +321,7 @@ template("rtc_executable") { "//build/config:exe_and_shlib_deps", ] deps += invoker.deps + public_configs = [ rtc_common_inherited_config ] if (defined(invoker.public_configs)) { public_configs += invoker.public_configs