diff --git a/webrtc/base/BUILD.gn b/webrtc/base/BUILD.gn index cd7e5b51e5..cc8b76b78b 100644 --- a/webrtc/base/BUILD.gn +++ b/webrtc/base/BUILD.gn @@ -86,6 +86,7 @@ if (rtc_build_ssl == 0) { # The subset of rtc_base approved for use outside of libjingle. static_library("rtc_base_approved") { + defines = [] deps = [] configs += [ "..:common_config" ] public_configs = [ "..:common_inherited_config" ] @@ -147,6 +148,12 @@ static_library("rtc_base_approved") { "swap_queue.h", "systeminfo.cc", "systeminfo.h", + "task_queue.h", + "task_queue_gcd.cc", + "task_queue_libevent.cc", + "task_queue_posix.cc", + "task_queue_posix.h", + "task_queue_win.cc", "template_util.h", "thread_annotations.h", "thread_checker.h", @@ -172,6 +179,23 @@ static_library("rtc_base_approved") { "logging_mac.mm", ] } + + if (!is_win && !is_mac && !is_ios && !is_nacl) { + deps += [ "//base/third_party/libevent" ] + defines += [ "WEBRTC_BUILD_LIBEVENT" ] + } + + if (is_mac || is_ios || is_win || is_nacl) { + sources -= [ "task_queue_libevent.cc" ] + } + + if (is_linux || is_android || is_win || is_nacl) { + sources -= [ "task_queue_gcd.cc" ] + } + + if (is_nacl) { + sources -= [ "task_queue_posix.cc" ] + } } static_library("rtc_base") { diff --git a/webrtc/base/base.gyp b/webrtc/base/base.gyp index 9cf936a57b..8eb881e8e0 100644 --- a/webrtc/base/base.gyp +++ b/webrtc/base/base.gyp @@ -84,6 +84,12 @@ 'swap_queue.h', 'systeminfo.cc', 'systeminfo.h', + 'task_queue.h', + 'task_queue_libevent.cc', + 'task_queue_gcd.cc', + 'task_queue_posix.cc', + 'task_queue_posix.h', + 'task_queue_win.cc', 'template_util.h', 'thread_annotations.h', 'thread_checker.h', @@ -111,6 +117,24 @@ 'logging.h', 'logging_mac.mm', ], + 'conditions': [ + ['build_libevent==1', { + 'dependencies': [ + '<(DEPTH)/base/third_party/libevent/libevent.gyp:libevent', + ], + }], + ], + }], + ['build_libevent!=1', { + 'sources!': [ 'task_queue_libevent.cc' ], + 'conditions': [ + ['OS=="linux" or OS=="android"', { + 'sources!': [ 'task_queue_posix.cc' ], + }], + ], + }], + ['build_libevent==1 or OS=="linux" or OS=="android" or OS=="win"', { + 'sources!': [ 'task_queue_gcd.cc' ], }], ['OS=="mac" and build_with_chromium==0', { 'all_dependent_settings': { diff --git a/webrtc/base/base_tests.gyp b/webrtc/base/base_tests.gyp index bc33028694..063e8e164f 100644 --- a/webrtc/base/base_tests.gyp +++ b/webrtc/base/base_tests.gyp @@ -108,6 +108,7 @@ 'swap_queue_unittest.cc', # TODO(ronghuawu): Reenable this test. # 'systeminfo_unittest.cc', + 'task_queue_unittest.cc', 'task_unittest.cc', 'testclient_unittest.cc', 'thread_checker_unittest.cc', diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h new file mode 100644 index 0000000000..dad4f431b7 --- /dev/null +++ b/webrtc/base/task_queue.h @@ -0,0 +1,277 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_BASE_TASK_QUEUE_H_ +#define WEBRTC_BASE_TASK_QUEUE_H_ + +#include +#include + +#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT) +#include +#endif + +#include "webrtc/base/constructormagic.h" +#include "webrtc/base/criticalsection.h" + +#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT) +#include "webrtc/base/platform_thread.h" +#endif + +#if defined(WEBRTC_BUILD_LIBEVENT) +struct event_base; +struct event; +#endif + +namespace rtc { + +// Base interface for asynchronously executed tasks. +// The interface basically consists of a single function, Run(), that executes +// on the target queue. For more details see the Run() method and TaskQueue. +class QueuedTask { + public: + QueuedTask() {} + virtual ~QueuedTask() {} + + // Main routine that will run when the task is executed on the desired queue. + // The task should return |true| to indicate that it should be deleted or + // |false| to indicate that the queue should consider ownership of the task + // having been transferred. Returning |false| can be useful if a task has + // re-posted itself to a different queue or is otherwise being re-used. + virtual bool Run() = 0; + + private: + RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask); +}; + +// Simple implementation of QueuedTask for use with rtc::Bind and lambdas. +template +class ClosureTask : public QueuedTask { + public: + explicit ClosureTask(const Closure& closure) : closure_(closure) {} + + private: + bool Run() override { + closure_(); + return true; + } + + Closure closure_; +}; + +// Extends ClosureTask to also allow specifying cleanup code. +// This is useful when using lambdas if guaranteeing cleanup, even if a task +// was dropped (queue is too full), is required. +template +class ClosureTaskWithCleanup : public ClosureTask { + public: + ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup) + : ClosureTask(closure), cleanup_(cleanup) {} + ~ClosureTaskWithCleanup() { cleanup_(); } + + private: + Cleanup cleanup_; +}; + +// Convenience function to construct closures that can be passed directly +// to methods that support std::unique_ptr but not template +// based parameters. +template +static std::unique_ptr NewClosure(const Closure& closure) { + return std::unique_ptr(new ClosureTask(closure)); +} + +template +static std::unique_ptr NewClosure(const Closure& closure, + const Cleanup& cleanup) { + return std::unique_ptr( + new ClosureTaskWithCleanup(closure, cleanup)); +} + +// Implements a task queue that asynchronously executes tasks in a way that +// guarantees that they're executed in FIFO order and that tasks never overlap. +// Tasks may always execute on the same worker thread and they may not. +// To DCHECK that tasks are executing on a known task queue, use IsCurrent(). +// +// Here are some usage examples: +// +// 1) Asynchronously running a lambda: +// +// class MyClass { +// ... +// TaskQueue queue_("MyQueue"); +// }; +// +// void MyClass::StartWork() { +// queue_.PostTask([]() { Work(); }); +// ... +// +// 2) Doing work asynchronously on a worker queue and providing a notification +// callback on the current queue, when the work has been done: +// +// void MyClass::StartWorkAndLetMeKnowWhenDone( +// std::unique_ptr callback) { +// DCHECK(TaskQueue::Current()) << "Need to be running on a queue"; +// queue_.PostTaskAndReply([]() { Work(); }, std::move(callback)); +// } +// ... +// my_class->StartWorkAndLetMeKnowWhenDone( +// NewClosure([]() { LOG(INFO) << "The work is done!";})); +// +// 3) Posting a custom task on a timer. The task posts itself again after +// every running: +// +// class TimerTask : public QueuedTask { +// public: +// TimerTask() {} +// private: +// bool Run() override { +// ++count_; +// TaskQueue::Current()->PostDelayedTask( +// std::unique_ptr(this), 1000); +// // Ownership has been transferred to the next occurance, +// // so return false to prevent from being deleted now. +// return false; +// } +// int count_ = 0; +// }; +// ... +// queue_.PostDelayedTask( +// std::unique_ptr(new TimerTask()), 1000); +// +// For more examples, see task_queue_unittests.cc. +// +// A note on destruction: +// +// When a TaskQueue is deleted, pending tasks will not be executed but they will +// be deleted. The deletion of tasks may happen asynchronously after the +// TaskQueue itself has been deleted or it may happen synchronously while the +// TaskQueue instance is being deleted. This may vary from one OS to the next +// so assumptions about lifetimes of pending tasks should not be made. +class TaskQueue { + public: + explicit TaskQueue(const char* queue_name); + // TODO(tommi): Implement move semantics? + ~TaskQueue(); + + static TaskQueue* Current(); + + // Used for DCHECKing the current queue. + static bool IsCurrent(const char* queue_name); + bool IsCurrent() const; + + // TODO(tommi): For better debuggability, implement FROM_HERE. + + // Ownership of the task is passed to PostTask. + void PostTask(std::unique_ptr task); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply); + + void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + + template + void PostTask(const Closure& closure) { + PostTask(std::unique_ptr(new ClosureTask(closure))); + } + + template + void PostDelayedTask(const Closure& closure, uint32_t milliseconds) { + PostDelayedTask( + std::unique_ptr(new ClosureTask(closure)), + milliseconds); + } + + template + void PostTaskAndReply(const Closure1& task, + const Closure2& reply, + TaskQueue* reply_queue) { + PostTaskAndReply( + std::unique_ptr(new ClosureTask(task)), + std::unique_ptr(new ClosureTask(reply)), + reply_queue); + } + + template + void PostTaskAndReply(std::unique_ptr task, + const Closure& reply) { + PostTaskAndReply(std::move(task), std::unique_ptr( + new ClosureTask(reply))); + } + + template + void PostTaskAndReply(const Closure& task, + std::unique_ptr reply) { + PostTaskAndReply( + std::unique_ptr(new ClosureTask(task)), + std::move(reply)); + } + + template + void PostTaskAndReply(const Closure1& task, const Closure2& reply) { + PostTaskAndReply( + std::unique_ptr(new ClosureTask(task)), + std::unique_ptr(new ClosureTask(reply))); + } + + private: +#if defined(WEBRTC_BUILD_LIBEVENT) + static bool ThreadMain(void* context); + static void OnWakeup(int socket, short flags, void* context); // NOLINT + static void RunTask(int fd, short flags, void* context); // NOLINT + static void RunTimer(int fd, short flags, void* context); // NOLINT + + class PostAndReplyTask; + class SetTimerTask; + + void PrepareReplyTask(PostAndReplyTask* reply_task); + void ReplyTaskDone(PostAndReplyTask* reply_task); + + struct QueueContext; + + int wakeup_pipe_in_ = -1; + int wakeup_pipe_out_ = -1; + event_base* event_base_; + std::unique_ptr wakeup_event_; + PlatformThread thread_; + rtc::CriticalSection pending_lock_; + std::list> pending_ GUARDED_BY(pending_lock_); + std::list pending_replies_ GUARDED_BY(pending_lock_); +#elif defined(WEBRTC_MAC) + struct QueueContext; + struct TaskContext; + struct PostTaskAndReplyContext; + dispatch_queue_t queue_; + QueueContext* const context_; +#elif defined(WEBRTC_WIN) + static bool ThreadMain(void* context); + + class WorkerThread : public PlatformThread { + public: + WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) + : PlatformThread(func, obj, thread_name) {} + + bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { + return PlatformThread::QueueAPC(apc_function, data); + } + }; + WorkerThread thread_; +#else +#error not supported. +#endif + + RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); +}; + +} // namespace rtc + +#endif // WEBRTC_BASE_TASK_QUEUE_H_ diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc new file mode 100644 index 0000000000..2c7d649fc9 --- /dev/null +++ b/webrtc/base/task_queue_gcd.cc @@ -0,0 +1,167 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +// This file contains the implementation of TaskQueue for Mac and iOS. +// The implementation uses Grand Central Dispatch queues (GCD) to +// do the actual task queuing. + +#include "webrtc/base/task_queue.h" + +#include + +#include "webrtc/base/checks.h" +#include "webrtc/base/logging.h" +#include "webrtc/base/task_queue_posix.h" + +namespace rtc { +using internal::GetQueuePtrTls; +using internal::AutoSetCurrentQueuePtr; + +struct TaskQueue::QueueContext { + explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} + + static void SetNotActive(void* context) { + QueueContext* qc = static_cast(context); + qc->is_active = false; + } + + static void DeleteContext(void* context) { + QueueContext* qc = static_cast(context); + delete qc; + } + + TaskQueue* const queue; + bool is_active; +}; + +struct TaskQueue::TaskContext { + TaskContext(QueueContext* queue_ctx, std::unique_ptr task) + : queue_ctx(queue_ctx), task(std::move(task)) {} + virtual ~TaskContext() {} + + static void RunTask(void* context) { + std::unique_ptr tc(static_cast(context)); + if (tc->queue_ctx->is_active) { + AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); + if (!tc->task->Run()) + tc->task.release(); + } + } + + QueueContext* const queue_ctx; + std::unique_ptr task; +}; + +// Special case context for holding two tasks, a |first_task| + the task +// that's owned by the parent struct, TaskContext, that then becomes the +// second (i.e. 'reply') task. +struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext { + explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, + std::unique_ptr first_task, + QueueContext* second_queue_ctx, + std::unique_ptr second_task) + : TaskContext(second_queue_ctx, std::move(second_task)), + first_queue_ctx(first_queue_ctx), + first_task(std::move(first_task)) { + // Retain the reply queue for as long as this object lives. + // If we don't, we may have memory leaks and/or failures. + dispatch_retain(first_queue_ctx->queue->queue_); + } + ~PostTaskAndReplyContext() override { + dispatch_release(first_queue_ctx->queue->queue_); + } + + static void RunTask(void* context) { + auto* rc = static_cast(context); + if (rc->first_queue_ctx->is_active) { + AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); + if (!rc->first_task->Run()) + rc->first_task.release(); + } + // Post the reply task. This hands the work over to the parent struct. + // This task will eventually delete |this|. + dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask); + } + + QueueContext* const first_queue_ctx; + std::unique_ptr first_task; +}; + +TaskQueue::TaskQueue(const char* queue_name) + : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), + context_(new QueueContext(this)) { + RTC_DCHECK(queue_name); + RTC_CHECK(queue_); + dispatch_set_context(queue_, context_); + // Assign a finalizer that will delete the context when the last reference + // to the queue is released. This may run after the TaskQueue object has + // been deleted. + dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); +} + +TaskQueue::~TaskQueue() { + RTC_DCHECK(!IsCurrent()); + // Implementation/behavioral note: + // Dispatch queues are reference counted via calls to dispatch_retain and + // dispatch_release. Pending blocks submitted to a queue also hold a + // reference to the queue until they have finished. Once all references to a + // queue have been released, the queue will be deallocated by the system. + // This is why we check the context before running tasks. + + // Use dispatch_sync to set the context to null to guarantee that there's not + // a race between checking the context and using it from a task. + dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive); + dispatch_release(queue_); +} + +// static +TaskQueue* TaskQueue::Current() { + return static_cast(pthread_getspecific(GetQueuePtrTls())); +} + +// static +bool TaskQueue::IsCurrent(const char* queue_name) { + TaskQueue* current = Current(); + return current && + strcmp(queue_name, dispatch_queue_get_label(current->queue_)) == 0; +} + +bool TaskQueue::IsCurrent() const { + RTC_DCHECK(queue_); + return this == Current(); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + auto* context = new TaskContext(context_, std::move(task)); + dispatch_async_f(queue_, context, &TaskContext::RunTask); +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + auto* context = new TaskContext(context_, std::move(task)); + dispatch_after_f( + dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, + context, &TaskContext::RunTask); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + auto* context = new PostTaskAndReplyContext( + context_, std::move(task), reply_queue->context_, std::move(reply)); + dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply) { + return PostTaskAndReply(std::move(task), std::move(reply), Current()); +} + +} // namespace rtc diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc new file mode 100644 index 0000000000..a59b450828 --- /dev/null +++ b/webrtc/base/task_queue_libevent.cc @@ -0,0 +1,318 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/task_queue.h" + +#include +#include +#include + +#include "base/third_party/libevent/event.h" +#include "webrtc/base/checks.h" +#include "webrtc/base/logging.h" +#include "webrtc/base/task_queue_posix.h" +#include "webrtc/base/timeutils.h" + +namespace rtc { +using internal::GetQueuePtrTls; +using internal::AutoSetCurrentQueuePtr; + +namespace { +static const char kQuit = 1; +static const char kRunTask = 2; + +struct TimerEvent { + explicit TimerEvent(std::unique_ptr task) + : task(std::move(task)) {} + ~TimerEvent() { event_del(&ev); } + event ev; + std::unique_ptr task; +}; + +bool SetNonBlocking(int fd) { + const int flags = fcntl(fd, F_GETFL); + RTC_CHECK(flags != -1); + return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; +} +} // namespace + +struct TaskQueue::QueueContext { + explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} + TaskQueue* queue; + bool is_active; + // Holds a list of events pending timers for cleanup when the loop exits. + std::list pending_timers_; +}; + +class TaskQueue::PostAndReplyTask : public QueuedTask { + public: + PostAndReplyTask(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) + : task_(std::move(task)), + reply_(std::move(reply)), + reply_queue_(reply_queue) { + reply_queue->PrepareReplyTask(this); + } + + ~PostAndReplyTask() override { + CritScope lock(&lock_); + if (reply_queue_) + reply_queue_->ReplyTaskDone(this); + } + + void OnReplyQueueGone() { + CritScope lock(&lock_); + reply_queue_ = nullptr; + } + + private: + bool Run() override { + if (!task_->Run()) + task_.release(); + + CritScope lock(&lock_); + if (reply_queue_) + reply_queue_->PostTask(std::move(reply_)); + return true; + } + + CriticalSection lock_; + std::unique_ptr task_; + std::unique_ptr reply_; + TaskQueue* reply_queue_ GUARDED_BY(lock_); +}; + +class TaskQueue::SetTimerTask : public QueuedTask { + public: + SetTimerTask(std::unique_ptr task, uint32_t milliseconds) + : task_(std::move(task)), + milliseconds_(milliseconds), + posted_(Time32()) {} + + private: + bool Run() override { + // Compensate for the time that has passed since construction + // and until we got here. + uint32_t post_time = Time32() - posted_; + TaskQueue::Current()->PostDelayedTask( + std::move(task_), + post_time > milliseconds_ ? 0 : milliseconds_ - post_time); + return true; + } + + std::unique_ptr task_; + const uint32_t milliseconds_; + const uint32_t posted_; +}; + +TaskQueue::TaskQueue(const char* queue_name) + : event_base_(event_base_new()), + wakeup_event_(new event()), + thread_(&TaskQueue::ThreadMain, this, queue_name) { + RTC_DCHECK(queue_name); + int fds[2]; + RTC_CHECK(pipe(fds) == 0); + SetNonBlocking(fds[0]); + SetNonBlocking(fds[1]); + wakeup_pipe_out_ = fds[0]; + wakeup_pipe_in_ = fds[1]; + event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST, + OnWakeup, this); + event_base_set(event_base_, wakeup_event_.get()); + event_add(wakeup_event_.get(), 0); + thread_.Start(); +} + +TaskQueue::~TaskQueue() { + RTC_DCHECK(!IsCurrent()); + struct timespec ts; + char message = kQuit; + while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { + // The queue is full, so we have no choice but to wait and retry. + RTC_CHECK_EQ(EAGAIN, errno); + ts.tv_sec = 0; + ts.tv_nsec = 1000000; + nanosleep(&ts, nullptr); + } + + thread_.Stop(); + + event_del(wakeup_event_.get()); + close(wakeup_pipe_in_); + close(wakeup_pipe_out_); + wakeup_pipe_in_ = -1; + wakeup_pipe_out_ = -1; + + { + // Synchronize against any pending reply tasks that might be running on + // other queues. + CritScope lock(&pending_lock_); + for (auto* reply : pending_replies_) + reply->OnReplyQueueGone(); + pending_replies_.clear(); + } + + event_base_free(event_base_); +} + +// static +TaskQueue* TaskQueue::Current() { + QueueContext* ctx = + static_cast(pthread_getspecific(GetQueuePtrTls())); + return ctx ? ctx->queue : nullptr; +} + +// static +bool TaskQueue::IsCurrent(const char* queue_name) { + TaskQueue* current = Current(); + return current && current->thread_.name().compare(queue_name) == 0; +} + +bool TaskQueue::IsCurrent() const { + return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + RTC_DCHECK(task.get()); + // libevent isn't thread safe. This means that we can't use methods such + // as event_base_once to post tasks to the worker thread from a different + // thread. However, we can use it when posting from the worker thread itself. + if (IsCurrent()) { + if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, + task.get(), nullptr) == 0) { + task.release(); + } + } else { + QueuedTask* task_id = task.get(); // Only used for comparison. + { + CritScope lock(&pending_lock_); + pending_.push_back(std::move(task)); + } + char message = kRunTask; + if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { + LOG(WARNING) << "Failed to queue task."; + CritScope lock(&pending_lock_); + pending_.remove_if([task_id](std::unique_ptr& t) { + return t.get() == task_id; + }); + } + } +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + if (IsCurrent()) { + TimerEvent* timer = new TimerEvent(std::move(task)); + evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer); + event_base_set(event_base_, &timer->ev); + QueueContext* ctx = + static_cast(pthread_getspecific(GetQueuePtrTls())); + ctx->pending_timers_.push_back(timer); + timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; + event_add(&timer->ev, &tv); + } else { + PostTask(std::unique_ptr( + new SetTimerTask(std::move(task), milliseconds))); + } +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + std::unique_ptr wrapper_task( + new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); + PostTask(std::move(wrapper_task)); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply) { + return PostTaskAndReply(std::move(task), std::move(reply), Current()); +} + +// static +bool TaskQueue::ThreadMain(void* context) { + TaskQueue* me = static_cast(context); + + QueueContext queue_context(me); + pthread_setspecific(GetQueuePtrTls(), &queue_context); + + while (queue_context.is_active) + event_base_loop(me->event_base_, 0); + + pthread_setspecific(GetQueuePtrTls(), nullptr); + + for (TimerEvent* timer : queue_context.pending_timers_) + delete timer; + + return false; +} + +// static +void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT + QueueContext* ctx = + static_cast(pthread_getspecific(GetQueuePtrTls())); + RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); + char buf; + RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); + switch (buf) { + case kQuit: + ctx->is_active = false; + event_base_loopbreak(ctx->queue->event_base_); + break; + case kRunTask: { + std::unique_ptr task; + { + CritScope lock(&ctx->queue->pending_lock_); + RTC_DCHECK(!ctx->queue->pending_.empty()); + task = std::move(ctx->queue->pending_.front()); + ctx->queue->pending_.pop_front(); + RTC_DCHECK(task.get()); + } + if (!task->Run()) + task.release(); + break; + } + default: + RTC_NOTREACHED(); + break; + } +} + +// static +void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT + auto* task = static_cast(context); + if (task->Run()) + delete task; +} + +// static +void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT + TimerEvent* timer = static_cast(context); + if (!timer->task->Run()) + timer->task.release(); + QueueContext* ctx = + static_cast(pthread_getspecific(GetQueuePtrTls())); + ctx->pending_timers_.remove(timer); + delete timer; +} + +void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { + RTC_DCHECK(reply_task); + CritScope lock(&pending_lock_); + pending_replies_.push_back(reply_task); +} + +void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { + CritScope lock(&pending_lock_); + pending_replies_.remove(reply_task); +} + +} // namespace rtc diff --git a/webrtc/base/task_queue_posix.cc b/webrtc/base/task_queue_posix.cc new file mode 100644 index 0000000000..3b00ac8e12 --- /dev/null +++ b/webrtc/base/task_queue_posix.cc @@ -0,0 +1,40 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/task_queue_posix.h" + +#include "webrtc/base/checks.h" +#include "webrtc/base/task_queue.h" + +namespace rtc { +namespace internal { +pthread_key_t g_queue_ptr_tls = 0; + +void InitializeTls() { + RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0); +} + +pthread_key_t GetQueuePtrTls() { + static pthread_once_t init_once = PTHREAD_ONCE_INIT; + RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0); + return g_queue_ptr_tls; +} + +AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q) + : prev_(TaskQueue::Current()) { + pthread_setspecific(GetQueuePtrTls(), q); +} + +AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() { + pthread_setspecific(GetQueuePtrTls(), prev_); +} + +} // namespace internal +} // namespace rtc diff --git a/webrtc/base/task_queue_posix.h b/webrtc/base/task_queue_posix.h new file mode 100644 index 0000000000..b677b78a38 --- /dev/null +++ b/webrtc/base/task_queue_posix.h @@ -0,0 +1,36 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_BASE_TASK_QUEUE_POSIX_H_ +#define WEBRTC_BASE_TASK_QUEUE_POSIX_H_ + +#include + +namespace rtc { + +class TaskQueue; + +namespace internal { + +class AutoSetCurrentQueuePtr { + public: + explicit AutoSetCurrentQueuePtr(TaskQueue* q); + ~AutoSetCurrentQueuePtr(); + + private: + TaskQueue* const prev_; +}; + +pthread_key_t GetQueuePtrTls(); + +} // namespace internal +} // namespace rtc + +#endif // WEBRTC_BASE_TASK_QUEUE_POSIX_H_ diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc new file mode 100644 index 0000000000..db4e6c2f7e --- /dev/null +++ b/webrtc/base/task_queue_unittest.cc @@ -0,0 +1,261 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include + +#include "webrtc/base/bind.h" +#include "webrtc/base/event.h" +#include "webrtc/base/gunit.h" +#include "webrtc/base/task_queue.h" +#include "webrtc/base/timeutils.h" + +namespace rtc { + +namespace { +void CheckCurrent(const char* expected_queue, Event* signal, TaskQueue* queue) { + EXPECT_TRUE(TaskQueue::IsCurrent(expected_queue)); + EXPECT_TRUE(queue->IsCurrent()); + if (signal) + signal->Set(); +} + +} // namespace + +TEST(TaskQueueTest, Construct) { + static const char kQueueName[] = "Construct"; + TaskQueue queue(kQueueName); + EXPECT_FALSE(queue.IsCurrent()); +} + +TEST(TaskQueueTest, PostAndCheckCurrent) { + static const char kQueueName[] = "PostAndCheckCurrent"; + TaskQueue queue(kQueueName); + + // We're not running a task, so there shouldn't be a current queue. + EXPECT_FALSE(queue.IsCurrent()); + EXPECT_FALSE(TaskQueue::Current()); + + Event event(false, false); + queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue)); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostCustomTask) { + static const char kQueueName[] = "PostCustomImplementation"; + TaskQueue queue(kQueueName); + + Event event(false, false); + + class CustomTask : public QueuedTask { + public: + explicit CustomTask(Event* event) : event_(event) {} + + private: + bool Run() override { + event_->Set(); + return false; // Never allows the task to be deleted by the queue. + } + + Event* const event_; + } my_task(&event); + + // Please don't do this in production code! :) + queue.PostTask(std::unique_ptr(&my_task)); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostLambda) { + static const char kQueueName[] = "PostLambda"; + TaskQueue queue(kQueueName); + + Event event(false, false); + queue.PostTask([&event]() { event.Set(); }); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostFromQueue) { + static const char kQueueName[] = "PostFromQueue"; + TaskQueue queue(kQueueName); + + Event event(false, false); + queue.PostTask( + [&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); }); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostDelayed) { + static const char kQueueName[] = "PostDelayed"; + TaskQueue queue(kQueueName); + + Event event(false, false); + uint32_t start = Time(); + queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100); + EXPECT_TRUE(event.Wait(1000)); + uint32_t end = Time(); + EXPECT_GE(end - start, 100u); + EXPECT_NEAR(end - start, 200u, 100u); // Accept 100-300. +} + +TEST(TaskQueueTest, PostMultipleDelayed) { + static const char kQueueName[] = "PostMultipleDelayed"; + TaskQueue queue(kQueueName); + + std::vector> events; + for (int i = 0; i < 10; ++i) { + events.push_back(std::unique_ptr(new Event(false, false))); + queue.PostDelayedTask( + Bind(&CheckCurrent, kQueueName, events.back().get(), &queue), 10); + } + + for (const auto& e : events) + EXPECT_TRUE(e->Wait(100)); +} + +TEST(TaskQueueTest, PostDelayedAfterDestruct) { + static const char kQueueName[] = "PostDelayedAfterDestruct"; + Event event(false, false); + { + TaskQueue queue(kQueueName); + queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100); + } + EXPECT_FALSE(event.Wait(200)); // Task should not run. +} + +TEST(TaskQueueTest, PostAndReply) { + static const char kPostQueue[] = "PostQueue"; + static const char kReplyQueue[] = "ReplyQueue"; + TaskQueue post_queue(kPostQueue); + TaskQueue reply_queue(kReplyQueue); + + Event event(false, false); + post_queue.PostTaskAndReply( + Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue), + Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostAndReuse) { + static const char kPostQueue[] = "PostQueue"; + static const char kReplyQueue[] = "ReplyQueue"; + TaskQueue post_queue(kPostQueue); + TaskQueue reply_queue(kReplyQueue); + + int call_count = 0; + + class ReusedTask : public QueuedTask { + public: + ReusedTask(int* counter, TaskQueue* reply_queue, Event* event) + : counter_(counter), reply_queue_(reply_queue), event_(event) { + EXPECT_EQ(0, *counter_); + } + + private: + bool Run() override { + if (++(*counter_) == 1) { + std::unique_ptr myself(this); + reply_queue_->PostTask(std::move(myself)); + // At this point, the object is owned by reply_queue_ and it's + // theoratically possible that the object has been deleted (e.g. if + // posting wasn't possible). So, don't touch any member variables here. + + // Indicate to the current queue that ownership has been transferred. + return false; + } else { + EXPECT_EQ(2, *counter_); + EXPECT_TRUE(reply_queue_->IsCurrent()); + event_->Set(); + return true; // Indicate that the object should be deleted. + } + } + + int* const counter_; + TaskQueue* const reply_queue_; + Event* const event_; + }; + + Event event(false, false); + std::unique_ptr task( + new ReusedTask(&call_count, &reply_queue, &event)); + + post_queue.PostTask(std::move(task)); + EXPECT_TRUE(event.Wait(1000)); +} + +TEST(TaskQueueTest, PostAndReplyLambda) { + static const char kPostQueue[] = "PostQueue"; + static const char kReplyQueue[] = "ReplyQueue"; + TaskQueue post_queue(kPostQueue); + TaskQueue reply_queue(kReplyQueue); + + Event event(false, false); + bool my_flag = false; + post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; }, + [&event]() { event.Set(); }, &reply_queue); + EXPECT_TRUE(event.Wait(1000)); + EXPECT_TRUE(my_flag); +} + +void TestPostTaskAndReply(TaskQueue* work_queue, + const char* work_queue_name, + Event* event) { + ASSERT_FALSE(work_queue->IsCurrent()); + work_queue->PostTaskAndReply( + Bind(&CheckCurrent, work_queue_name, nullptr, work_queue), + NewClosure([event]() { event->Set(); })); +} + +// Does a PostTaskAndReply from within a task to post and reply to the current +// queue. All in all there will be 3 tasks posted and run. +TEST(TaskQueueTest, PostAndReply2) { + static const char kQueueName[] = "PostAndReply2"; + static const char kWorkQueueName[] = "PostAndReply2_Worker"; + TaskQueue queue(kQueueName); + TaskQueue work_queue(kWorkQueueName); + + Event event(false, false); + queue.PostTask( + Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event)); + EXPECT_TRUE(event.Wait(1000)); +} + +// Tests posting more messages than a queue can queue up. +// In situations like that, tasks will get dropped. +TEST(TaskQueueTest, PostALot) { + // To destruct the event after the queue has gone out of scope. + Event event(false, false); + + int tasks_executed = 0; + int tasks_cleaned_up = 0; + static const int kTaskCount = 0xffff; + + { + static const char kQueueName[] = "PostALot"; + TaskQueue queue(kQueueName); + + // On linux, the limit of pending bytes in the pipe buffer is 0xffff. + // So here we post a total of 0xffff+1 messages, which triggers a failure + // case inside of the libevent queue implementation. + + queue.PostTask([&event]() { event.Wait(Event::kForever); }); + for (int i = 0; i < kTaskCount; ++i) + queue.PostTask(NewClosure([&tasks_executed]() { ++tasks_executed; }, + [&tasks_cleaned_up]() { ++tasks_cleaned_up; })); + event.Set(); // Unblock the first task. + } + + EXPECT_GE(tasks_cleaned_up, tasks_executed); + EXPECT_EQ(kTaskCount, tasks_cleaned_up); + + LOG(INFO) << "tasks executed: " << tasks_executed + << ", tasks cleaned up: " << tasks_cleaned_up; +} + +} // namespace rtc diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc new file mode 100644 index 0000000000..5ae6d9275b --- /dev/null +++ b/webrtc/base/task_queue_win.cc @@ -0,0 +1,184 @@ +/* + * Copyright 2016 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/task_queue.h" + +#include +#include + +#include "webrtc/base/checks.h" +#include "webrtc/base/logging.h" + +namespace rtc { +namespace { +#define WM_RUN_TASK WM_USER + 1 +#define WM_QUEUE_DELAYED_TASK WM_USER + 2 + +DWORD g_queue_ptr_tls = 0; + +BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { + g_queue_ptr_tls = TlsAlloc(); + return TRUE; +} + +DWORD GetQueuePtrTls() { + static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; + InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); + return g_queue_ptr_tls; +} + +struct ThreadStartupData { + Event* started; + void* thread_context; +}; + +void CALLBACK InitializeQueueThread(ULONG_PTR param) { + MSG msg; + PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE); + ThreadStartupData* data = reinterpret_cast(param); + TlsSetValue(GetQueuePtrTls(), data->thread_context); + data->started->Set(); +} +} // namespace + +TaskQueue::TaskQueue(const char* queue_name) + : thread_(&TaskQueue::ThreadMain, this, queue_name) { + RTC_DCHECK(queue_name); + thread_.Start(); + Event event(false, false); + ThreadStartupData startup = {&event, this}; + RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, + reinterpret_cast(&startup))); + event.Wait(Event::kForever); +} + +TaskQueue::~TaskQueue() { + RTC_DCHECK(!IsCurrent()); + while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { + RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError()); + Sleep(1); + } + thread_.Stop(); +} + +// static +TaskQueue* TaskQueue::Current() { + return static_cast(TlsGetValue(GetQueuePtrTls())); +} + +// static +bool TaskQueue::IsCurrent(const char* queue_name) { + TaskQueue* current = Current(); + return current && current->thread_.name().compare(queue_name) == 0; +} + +bool TaskQueue::IsCurrent() const { + return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, + reinterpret_cast(task.get()))) { + task.release(); + } +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + WPARAM wparam; +#if defined(_WIN64) + // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) + // so this compensation isn't that accurate, but since we have unused 32 bits + // on Win64, we might as well use them. + wparam = (static_cast(::GetTickCount()) << 32) | milliseconds; +#else + wparam = milliseconds; +#endif + if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, + reinterpret_cast(task.get()))) { + task.release(); + } +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + QueuedTask* task_ptr = task.release(); + QueuedTask* reply_task_ptr = reply.release(); + DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); + PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { + if (task_ptr->Run()) + delete task_ptr; + // If the thread's message queue is full, we can't queue the task and will + // have to drop it (i.e. delete). + if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, + reinterpret_cast(reply_task_ptr))) { + delete reply_task_ptr; + } + }); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply) { + return PostTaskAndReply(std::move(task), std::move(reply), Current()); +} + +// static +bool TaskQueue::ThreadMain(void* context) { + std::unordered_map> delayed_tasks; + + BOOL ret; + MSG msg; + + while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) { + if (!msg.hwnd) { + switch (msg.message) { + case WM_RUN_TASK: { + QueuedTask* task = reinterpret_cast(msg.lParam); + if (task->Run()) + delete task; + break; + } + case WM_QUEUE_DELAYED_TASK: { + QueuedTask* task = reinterpret_cast(msg.lParam); + uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; +#if defined(_WIN64) + // Subtract the time it took to queue the timer. + const DWORD now = GetTickCount(); + DWORD post_time = now - (msg.wParam >> 32); + milliseconds = + post_time > milliseconds ? 0 : milliseconds - post_time; +#endif + UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr); + delayed_tasks.insert(std::make_pair(timer_id, task)); + break; + } + case WM_TIMER: { + KillTimer(nullptr, msg.wParam); + auto found = delayed_tasks.find(msg.wParam); + RTC_DCHECK(found != delayed_tasks.end()); + if (!found->second->Run()) + found->second.release(); + delayed_tasks.erase(found); + break; + } + default: + RTC_NOTREACHED(); + break; + } + } else { + TranslateMessage(&msg); + DispatchMessage(&msg); + } + } + + return false; +} +} // namespace rtc diff --git a/webrtc/build/common.gypi b/webrtc/build/common.gypi index 15aa19155f..4006d1745a 100644 --- a/webrtc/build/common.gypi +++ b/webrtc/build/common.gypi @@ -19,7 +19,9 @@ # Enable to use the Mozilla internal settings. 'build_with_mozilla%': 0, + 'build_for%': '', }, + 'build_for%': '<(build_for)', 'build_with_chromium%': '<(build_with_chromium)', 'build_with_mozilla%': '<(build_with_mozilla%)', 'include_opus%': 1, @@ -41,10 +43,20 @@ 'apk_tests_path%': '<(DEPTH)/webrtc/build/apk_tests.gyp', 'modules_java_gyp_path%': '<(DEPTH)/webrtc/modules/modules_java.gyp', }], + + # Controls whether we use libevent on posix platforms. + # TODO(tommi): Remove the 'build_for' condition once libevent is more + # widely available in posix configurations. + ['OS=="win" or OS=="mac" or OS=="ios" or build_for!=""', { + 'build_libevent%': 0, + }, { + 'build_libevent%': 1, + }], ], }, 'build_with_chromium%': '<(build_with_chromium)', 'build_with_mozilla%': '<(build_with_mozilla)', + 'build_libevent%': '<(build_libevent)', 'webrtc_root%': '<(webrtc_root)', 'apk_tests_path%': '<(apk_tests_path)', 'modules_java_gyp_path%': '<(modules_java_gyp_path)', @@ -56,6 +68,7 @@ }, 'build_with_chromium%': '<(build_with_chromium)', 'build_with_mozilla%': '<(build_with_mozilla)', + 'build_libevent%': '<(build_libevent)', 'webrtc_root%': '<(webrtc_root)', 'apk_tests_path%': '<(apk_tests_path)', 'test_runner_path': '<(DEPTH)/webrtc/build/android/test_runner.py', @@ -319,6 +332,11 @@ }], ], }], + ['build_libevent==1', { + 'defines': [ + 'WEBRTC_BUILD_LIBEVENT', + ], + }], ['target_arch=="arm64"', { 'defines': [ 'WEBRTC_ARCH_ARM64',