From b296d0591cd71baacb4ecbd1a4698a6eb2b17bca Mon Sep 17 00:00:00 2001 From: tommi Date: Fri, 29 Apr 2016 06:03:32 -0700 Subject: [PATCH] Revert of New task queueing primitive for async tasks: TaskQueue. (patchset #5 id:80001 of https://codereview.webrtc.org/1919733002/ ) Reason for revert: Reverting this temporarily while I figure out the issues with the Chrome on android GN debug build. Original issue's description: > New task queueing primitive for async tasks: TaskQueue. > TaskQueue is a new way to asynchronously execute tasks sequentially > in a thread safe manner with minimal locking. The implementation > uses OS supported APIs to do this that are compatible with async IO > notifications from things like sockets and files. > > This class is a part of rtc_base_approved, so can be used by both > the webrtc and libjingle parts of the WebRTC library. Moving forward, > we can replace rtc::Thread and webrtc::ProcessThread with this implementation. > > NOTE: It should not be assumed that all tasks that execute on a TaskQueue, > run on the same thread. E.g. on Mac and iOS, we use GCD dispatch queues > which means that tasks might execute on different threads depending on > what's the most efficient thing to do. TBR=perkj@webrtc.org # Skipping CQ checks because original CL landed less than 1 days ago. NOPRESUBMIT=true NOTREECHECKS=true NOTRY=true Review-Url: https://codereview.webrtc.org/1935483002 Cr-Commit-Position: refs/heads/master@{#12562} --- webrtc/base/BUILD.gn | 17 -- webrtc/base/base.gyp | 19 -- webrtc/base/base_tests.gyp | 1 - webrtc/base/task_queue.h | 281 ------------------------- webrtc/base/task_queue_gcd.cc | 167 --------------- webrtc/base/task_queue_libevent.cc | 318 ----------------------------- webrtc/base/task_queue_posix.cc | 40 ---- webrtc/base/task_queue_posix.h | 36 ---- webrtc/base/task_queue_unittest.cc | 261 ----------------------- webrtc/base/task_queue_win.cc | 184 ----------------- 10 files changed, 1324 deletions(-) delete mode 100644 webrtc/base/task_queue.h delete mode 100644 webrtc/base/task_queue_gcd.cc delete mode 100644 webrtc/base/task_queue_libevent.cc delete mode 100644 webrtc/base/task_queue_posix.cc delete mode 100644 webrtc/base/task_queue_posix.h delete mode 100644 webrtc/base/task_queue_unittest.cc delete mode 100644 webrtc/base/task_queue_win.cc diff --git a/webrtc/base/BUILD.gn b/webrtc/base/BUILD.gn index 4eaf617ac1..11e886ccc0 100644 --- a/webrtc/base/BUILD.gn +++ b/webrtc/base/BUILD.gn @@ -154,12 +154,6 @@ static_library("rtc_base_approved") { "swap_queue.h", "systeminfo.cc", "systeminfo.h", - "task_queue.h", - "task_queue_gcd.cc", - "task_queue_libevent.cc", - "task_queue_posix.cc", - "task_queue_posix.h", - "task_queue_win.cc", "template_util.h", "thread_annotations.h", "thread_checker.h", @@ -184,17 +178,6 @@ static_library("rtc_base_approved") { "logging.h", "logging_mac.mm", ] - if (!is_win && !is_mac && !is_ios) { - deps += [ "//base/third_party/libevent" ] - } - } - - if (is_mac || is_ios || is_win) { - sources -= [ "task_queue_libevent.cc" ] - } - - if (is_linux || is_android || is_win) { - sources -= [ "task_queue_gcd.cc" ] } } diff --git a/webrtc/base/base.gyp b/webrtc/base/base.gyp index 3ea7b15fa1..99ca457276 100644 --- a/webrtc/base/base.gyp +++ b/webrtc/base/base.gyp @@ -85,12 +85,6 @@ 'swap_queue.h', 'systeminfo.cc', 'systeminfo.h', - 'task_queue.h', - 'task_queue_libevent.cc', - 'task_queue_gcd.cc', - 'task_queue_posix.cc', - 'task_queue_posix.h', - 'task_queue_win.cc', 'template_util.h', 'thread_annotations.h', 'thread_checker.h', @@ -118,19 +112,6 @@ 'logging.h', 'logging_mac.mm', ], - 'conditions': [ - ['OS!="win" and OS!="mac" and OS!="ios"', { - 'dependencies': [ - '<(DEPTH)/base/third_party/libevent/libevent.gyp:libevent', - ], - }], - ], - }], - ['OS=="mac" or OS=="ios" or OS=="win"', { - 'sources!': [ 'task_queue_libevent.cc' ], - }], - ['OS=="linux" or OS=="android" or OS=="win"', { - 'sources!': [ 'task_queue_gcd.cc' ], }], ['OS=="mac" and build_with_chromium==0', { 'all_dependent_settings': { diff --git a/webrtc/base/base_tests.gyp b/webrtc/base/base_tests.gyp index 019b576230..8248cef9da 100644 --- a/webrtc/base/base_tests.gyp +++ b/webrtc/base/base_tests.gyp @@ -100,7 +100,6 @@ 'swap_queue_unittest.cc', # TODO(ronghuawu): Reenable this test. # 'systeminfo_unittest.cc', - 'task_queue_unittest.cc', 'task_unittest.cc', 'testclient_unittest.cc', 'thread_checker_unittest.cc', diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h deleted file mode 100644 index 520e5afd10..0000000000 --- a/webrtc/base/task_queue.h +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef WEBRTC_BASE_TASK_QUEUE_H_ -#define WEBRTC_BASE_TASK_QUEUE_H_ - -#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) -#define LIBEVENT_TASK_QUEUE -#endif - -#include -#include - -#if defined(WEBRTC_MAC) -#include -#endif - -#include "webrtc/base/constructormagic.h" -#include "webrtc/base/criticalsection.h" - -#if !defined(WEBRTC_MAC) -#include "webrtc/base/platform_thread.h" -#endif - -#if defined(LIBEVENT_TASK_QUEUE) -struct event_base; -struct event; -#endif - -namespace rtc { - -// Base interface for asynchronously executed tasks. -// The interface basically consists of a single function, Run(), that executes -// on the target queue. For more details see the Run() method and TaskQueue. -class QueuedTask { - public: - QueuedTask() {} - virtual ~QueuedTask() {} - - // Main routine that will run when the task is executed on the desired queue. - // The task should return |true| to indicate that it should be deleted or - // |false| to indicate that the queue should consider ownership of the task - // having been transferred. Returning |false| can be useful if a task has - // re-posted itself to a different queue or is otherwise being re-used. - virtual bool Run() = 0; - - private: - RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask); -}; - -// Simple implementation of QueuedTask for use with rtc::Bind and lambdas. -template -class ClosureTask : public QueuedTask { - public: - explicit ClosureTask(const Closure& closure) : closure_(closure) {} - - private: - bool Run() override { - closure_(); - return true; - } - - Closure closure_; -}; - -// Extends ClosureTask to also allow specifying cleanup code. -// This is useful when using lambdas if guaranteeing cleanup, even if a task -// was dropped (queue is too full), is required. -template -class ClosureTaskWithCleanup : public ClosureTask { - public: - ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup) - : ClosureTask(closure), cleanup_(cleanup) {} - ~ClosureTaskWithCleanup() { cleanup_(); } - - private: - Cleanup cleanup_; -}; - -// Convenience function to construct closures that can be passed directly -// to methods that support std::unique_ptr but not template -// based parameters. -template -static std::unique_ptr NewClosure(const Closure& closure) { - return std::unique_ptr(new ClosureTask(closure)); -} - -template -static std::unique_ptr NewClosure(const Closure& closure, - const Cleanup& cleanup) { - return std::unique_ptr( - new ClosureTaskWithCleanup(closure, cleanup)); -} - -// Implements a task queue that asynchronously executes tasks in a way that -// guarantees that they're executed in FIFO order and that tasks never overlap. -// Tasks may always execute on the same worker thread and they may not. -// To DCHECK that tasks are executing on a known task queue, use IsCurrent(). -// -// Here are some usage examples: -// -// 1) Asynchronously running a lambda: -// -// class MyClass { -// ... -// TaskQueue queue_("MyQueue"); -// }; -// -// void MyClass::StartWork() { -// queue_.PostTask([]() { Work(); }); -// ... -// -// 2) Doing work asynchronously on a worker queue and providing a notification -// callback on the current queue, when the work has been done: -// -// void MyClass::StartWorkAndLetMeKnowWhenDone( -// std::unique_ptr callback) { -// DCHECK(TaskQueue::Current()) << "Need to be running on a queue"; -// queue_.PostTaskAndReply([]() { Work(); }, std::move(callback)); -// } -// ... -// my_class->StartWorkAndLetMeKnowWhenDone( -// NewClosure([]() { LOG(INFO) << "The work is done!";})); -// -// 3) Posting a custom task on a timer. The task posts itself again after -// every running: -// -// class TimerTask : public QueuedTask { -// public: -// TimerTask() {} -// private: -// bool Run() override { -// ++count_; -// TaskQueue::Current()->PostDelayedTask( -// std::unique_ptr(this), 1000); -// // Ownership has been transferred to the next occurance, -// // so return false to prevent from being deleted now. -// return false; -// } -// int count_ = 0; -// }; -// ... -// queue_.PostDelayedTask( -// std::unique_ptr(new TimerTask()), 1000); -// -// For more examples, see task_queue_unittests.cc. -// -// A note on destruction: -// -// When a TaskQueue is deleted, pending tasks will not be executed but they will -// be deleted. The deletion of tasks may happen asynchronously after the -// TaskQueue itself has been deleted or it may happen synchronously while the -// TaskQueue instance is being deleted. This may vary from one OS to the next -// so assumptions about lifetimes of pending tasks should not be made. -class TaskQueue { - public: - explicit TaskQueue(const char* queue_name); - // TODO(tommi): Implement move semantics? - ~TaskQueue(); - - static TaskQueue* Current(); - - // Used for DCHECKing the current queue. - static bool IsCurrent(const char* queue_name); - bool IsCurrent() const; - - // TODO(tommi): For better debuggability, implement FROM_HERE. - - // Ownership of the task is passed to PostTask. - void PostTask(std::unique_ptr task); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue); - void PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply); - - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); - - template - void PostTask(const Closure& closure) { - PostTask(std::unique_ptr(new ClosureTask(closure))); - } - - template - void PostDelayedTask(const Closure& closure, uint32_t milliseconds) { - PostDelayedTask( - std::unique_ptr(new ClosureTask(closure)), - milliseconds); - } - - template - void PostTaskAndReply(const Closure1& task, - const Closure2& reply, - TaskQueue* reply_queue) { - PostTaskAndReply( - std::unique_ptr(new ClosureTask(task)), - std::unique_ptr(new ClosureTask(reply)), - reply_queue); - } - - template - void PostTaskAndReply(std::unique_ptr task, - const Closure& reply) { - PostTaskAndReply(std::move(task), std::unique_ptr( - new ClosureTask(reply))); - } - - template - void PostTaskAndReply(const Closure& task, - std::unique_ptr reply) { - PostTaskAndReply( - std::unique_ptr(new ClosureTask(task)), - std::move(reply)); - } - - template - void PostTaskAndReply(const Closure1& task, const Closure2& reply) { - PostTaskAndReply( - std::unique_ptr(new ClosureTask(task)), - std::unique_ptr(new ClosureTask(reply))); - } - - private: -#if defined(LIBEVENT_TASK_QUEUE) - static bool ThreadMain(void* context); - static void OnWakeup(int socket, short flags, void* context); // NOLINT - static void RunTask(int fd, short flags, void* context); // NOLINT - static void RunTimer(int fd, short flags, void* context); // NOLINT - - class PostAndReplyTask; - class SetTimerTask; - - void PrepareReplyTask(PostAndReplyTask* reply_task); - void ReplyTaskDone(PostAndReplyTask* reply_task); - - struct QueueContext; - - int wakeup_pipe_in_ = -1; - int wakeup_pipe_out_ = -1; - event_base* event_base_; - std::unique_ptr wakeup_event_; - PlatformThread thread_; - rtc::CriticalSection pending_lock_; - std::list> pending_ GUARDED_BY(pending_lock_); - std::list pending_replies_ GUARDED_BY(pending_lock_); -#elif defined(WEBRTC_MAC) - struct QueueContext; - struct TaskContext; - struct PostTaskAndReplyContext; - dispatch_queue_t queue_; - QueueContext* const context_; -#elif defined(WEBRTC_WIN) - static bool ThreadMain(void* context); - - class WorkerThread : public PlatformThread { - public: - WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) - : PlatformThread(func, obj, thread_name) {} - - bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { - return PlatformThread::QueueAPC(apc_function, data); - } - }; - WorkerThread thread_; -#else -#error not supported. -#endif - - RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); -}; - -} // namespace rtc - -#endif // WEBRTC_BASE_TASK_QUEUE_H_ diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc deleted file mode 100644 index 2c7d649fc9..0000000000 --- a/webrtc/base/task_queue_gcd.cc +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -// This file contains the implementation of TaskQueue for Mac and iOS. -// The implementation uses Grand Central Dispatch queues (GCD) to -// do the actual task queuing. - -#include "webrtc/base/task_queue.h" - -#include - -#include "webrtc/base/checks.h" -#include "webrtc/base/logging.h" -#include "webrtc/base/task_queue_posix.h" - -namespace rtc { -using internal::GetQueuePtrTls; -using internal::AutoSetCurrentQueuePtr; - -struct TaskQueue::QueueContext { - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} - - static void SetNotActive(void* context) { - QueueContext* qc = static_cast(context); - qc->is_active = false; - } - - static void DeleteContext(void* context) { - QueueContext* qc = static_cast(context); - delete qc; - } - - TaskQueue* const queue; - bool is_active; -}; - -struct TaskQueue::TaskContext { - TaskContext(QueueContext* queue_ctx, std::unique_ptr task) - : queue_ctx(queue_ctx), task(std::move(task)) {} - virtual ~TaskContext() {} - - static void RunTask(void* context) { - std::unique_ptr tc(static_cast(context)); - if (tc->queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); - if (!tc->task->Run()) - tc->task.release(); - } - } - - QueueContext* const queue_ctx; - std::unique_ptr task; -}; - -// Special case context for holding two tasks, a |first_task| + the task -// that's owned by the parent struct, TaskContext, that then becomes the -// second (i.e. 'reply') task. -struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext { - explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, - std::unique_ptr first_task, - QueueContext* second_queue_ctx, - std::unique_ptr second_task) - : TaskContext(second_queue_ctx, std::move(second_task)), - first_queue_ctx(first_queue_ctx), - first_task(std::move(first_task)) { - // Retain the reply queue for as long as this object lives. - // If we don't, we may have memory leaks and/or failures. - dispatch_retain(first_queue_ctx->queue->queue_); - } - ~PostTaskAndReplyContext() override { - dispatch_release(first_queue_ctx->queue->queue_); - } - - static void RunTask(void* context) { - auto* rc = static_cast(context); - if (rc->first_queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); - if (!rc->first_task->Run()) - rc->first_task.release(); - } - // Post the reply task. This hands the work over to the parent struct. - // This task will eventually delete |this|. - dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask); - } - - QueueContext* const first_queue_ctx; - std::unique_ptr first_task; -}; - -TaskQueue::TaskQueue(const char* queue_name) - : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), - context_(new QueueContext(this)) { - RTC_DCHECK(queue_name); - RTC_CHECK(queue_); - dispatch_set_context(queue_, context_); - // Assign a finalizer that will delete the context when the last reference - // to the queue is released. This may run after the TaskQueue object has - // been deleted. - dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); -} - -TaskQueue::~TaskQueue() { - RTC_DCHECK(!IsCurrent()); - // Implementation/behavioral note: - // Dispatch queues are reference counted via calls to dispatch_retain and - // dispatch_release. Pending blocks submitted to a queue also hold a - // reference to the queue until they have finished. Once all references to a - // queue have been released, the queue will be deallocated by the system. - // This is why we check the context before running tasks. - - // Use dispatch_sync to set the context to null to guarantee that there's not - // a race between checking the context and using it from a task. - dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive); - dispatch_release(queue_); -} - -// static -TaskQueue* TaskQueue::Current() { - return static_cast(pthread_getspecific(GetQueuePtrTls())); -} - -// static -bool TaskQueue::IsCurrent(const char* queue_name) { - TaskQueue* current = Current(); - return current && - strcmp(queue_name, dispatch_queue_get_label(current->queue_)) == 0; -} - -bool TaskQueue::IsCurrent() const { - RTC_DCHECK(queue_); - return this == Current(); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - auto* context = new TaskContext(context_, std::move(task)); - dispatch_async_f(queue_, context, &TaskContext::RunTask); -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - auto* context = new TaskContext(context_, std::move(task)); - dispatch_after_f( - dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, - context, &TaskContext::RunTask); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - auto* context = new PostTaskAndReplyContext( - context_, std::move(task), reply_queue->context_, std::move(reply)); - dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); -} - -} // namespace rtc diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc deleted file mode 100644 index a59b450828..0000000000 --- a/webrtc/base/task_queue_libevent.cc +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "webrtc/base/task_queue.h" - -#include -#include -#include - -#include "base/third_party/libevent/event.h" -#include "webrtc/base/checks.h" -#include "webrtc/base/logging.h" -#include "webrtc/base/task_queue_posix.h" -#include "webrtc/base/timeutils.h" - -namespace rtc { -using internal::GetQueuePtrTls; -using internal::AutoSetCurrentQueuePtr; - -namespace { -static const char kQuit = 1; -static const char kRunTask = 2; - -struct TimerEvent { - explicit TimerEvent(std::unique_ptr task) - : task(std::move(task)) {} - ~TimerEvent() { event_del(&ev); } - event ev; - std::unique_ptr task; -}; - -bool SetNonBlocking(int fd) { - const int flags = fcntl(fd, F_GETFL); - RTC_CHECK(flags != -1); - return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; -} -} // namespace - -struct TaskQueue::QueueContext { - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} - TaskQueue* queue; - bool is_active; - // Holds a list of events pending timers for cleanup when the loop exits. - std::list pending_timers_; -}; - -class TaskQueue::PostAndReplyTask : public QueuedTask { - public: - PostAndReplyTask(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) - : task_(std::move(task)), - reply_(std::move(reply)), - reply_queue_(reply_queue) { - reply_queue->PrepareReplyTask(this); - } - - ~PostAndReplyTask() override { - CritScope lock(&lock_); - if (reply_queue_) - reply_queue_->ReplyTaskDone(this); - } - - void OnReplyQueueGone() { - CritScope lock(&lock_); - reply_queue_ = nullptr; - } - - private: - bool Run() override { - if (!task_->Run()) - task_.release(); - - CritScope lock(&lock_); - if (reply_queue_) - reply_queue_->PostTask(std::move(reply_)); - return true; - } - - CriticalSection lock_; - std::unique_ptr task_; - std::unique_ptr reply_; - TaskQueue* reply_queue_ GUARDED_BY(lock_); -}; - -class TaskQueue::SetTimerTask : public QueuedTask { - public: - SetTimerTask(std::unique_ptr task, uint32_t milliseconds) - : task_(std::move(task)), - milliseconds_(milliseconds), - posted_(Time32()) {} - - private: - bool Run() override { - // Compensate for the time that has passed since construction - // and until we got here. - uint32_t post_time = Time32() - posted_; - TaskQueue::Current()->PostDelayedTask( - std::move(task_), - post_time > milliseconds_ ? 0 : milliseconds_ - post_time); - return true; - } - - std::unique_ptr task_; - const uint32_t milliseconds_; - const uint32_t posted_; -}; - -TaskQueue::TaskQueue(const char* queue_name) - : event_base_(event_base_new()), - wakeup_event_(new event()), - thread_(&TaskQueue::ThreadMain, this, queue_name) { - RTC_DCHECK(queue_name); - int fds[2]; - RTC_CHECK(pipe(fds) == 0); - SetNonBlocking(fds[0]); - SetNonBlocking(fds[1]); - wakeup_pipe_out_ = fds[0]; - wakeup_pipe_in_ = fds[1]; - event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST, - OnWakeup, this); - event_base_set(event_base_, wakeup_event_.get()); - event_add(wakeup_event_.get(), 0); - thread_.Start(); -} - -TaskQueue::~TaskQueue() { - RTC_DCHECK(!IsCurrent()); - struct timespec ts; - char message = kQuit; - while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { - // The queue is full, so we have no choice but to wait and retry. - RTC_CHECK_EQ(EAGAIN, errno); - ts.tv_sec = 0; - ts.tv_nsec = 1000000; - nanosleep(&ts, nullptr); - } - - thread_.Stop(); - - event_del(wakeup_event_.get()); - close(wakeup_pipe_in_); - close(wakeup_pipe_out_); - wakeup_pipe_in_ = -1; - wakeup_pipe_out_ = -1; - - { - // Synchronize against any pending reply tasks that might be running on - // other queues. - CritScope lock(&pending_lock_); - for (auto* reply : pending_replies_) - reply->OnReplyQueueGone(); - pending_replies_.clear(); - } - - event_base_free(event_base_); -} - -// static -TaskQueue* TaskQueue::Current() { - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - return ctx ? ctx->queue : nullptr; -} - -// static -bool TaskQueue::IsCurrent(const char* queue_name) { - TaskQueue* current = Current(); - return current && current->thread_.name().compare(queue_name) == 0; -} - -bool TaskQueue::IsCurrent() const { - return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - RTC_DCHECK(task.get()); - // libevent isn't thread safe. This means that we can't use methods such - // as event_base_once to post tasks to the worker thread from a different - // thread. However, we can use it when posting from the worker thread itself. - if (IsCurrent()) { - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, - task.get(), nullptr) == 0) { - task.release(); - } - } else { - QueuedTask* task_id = task.get(); // Only used for comparison. - { - CritScope lock(&pending_lock_); - pending_.push_back(std::move(task)); - } - char message = kRunTask; - if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { - LOG(WARNING) << "Failed to queue task."; - CritScope lock(&pending_lock_); - pending_.remove_if([task_id](std::unique_ptr& t) { - return t.get() == task_id; - }); - } - } -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - if (IsCurrent()) { - TimerEvent* timer = new TimerEvent(std::move(task)); - evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer); - event_base_set(event_base_, &timer->ev); - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - ctx->pending_timers_.push_back(timer); - timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; - event_add(&timer->ev, &tv); - } else { - PostTask(std::unique_ptr( - new SetTimerTask(std::move(task), milliseconds))); - } -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - std::unique_ptr wrapper_task( - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); - PostTask(std::move(wrapper_task)); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); -} - -// static -bool TaskQueue::ThreadMain(void* context) { - TaskQueue* me = static_cast(context); - - QueueContext queue_context(me); - pthread_setspecific(GetQueuePtrTls(), &queue_context); - - while (queue_context.is_active) - event_base_loop(me->event_base_, 0); - - pthread_setspecific(GetQueuePtrTls(), nullptr); - - for (TimerEvent* timer : queue_context.pending_timers_) - delete timer; - - return false; -} - -// static -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); - char buf; - RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); - switch (buf) { - case kQuit: - ctx->is_active = false; - event_base_loopbreak(ctx->queue->event_base_); - break; - case kRunTask: { - std::unique_ptr task; - { - CritScope lock(&ctx->queue->pending_lock_); - RTC_DCHECK(!ctx->queue->pending_.empty()); - task = std::move(ctx->queue->pending_.front()); - ctx->queue->pending_.pop_front(); - RTC_DCHECK(task.get()); - } - if (!task->Run()) - task.release(); - break; - } - default: - RTC_NOTREACHED(); - break; - } -} - -// static -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT - auto* task = static_cast(context); - if (task->Run()) - delete task; -} - -// static -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT - TimerEvent* timer = static_cast(context); - if (!timer->task->Run()) - timer->task.release(); - QueueContext* ctx = - static_cast(pthread_getspecific(GetQueuePtrTls())); - ctx->pending_timers_.remove(timer); - delete timer; -} - -void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { - RTC_DCHECK(reply_task); - CritScope lock(&pending_lock_); - pending_replies_.push_back(reply_task); -} - -void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { - CritScope lock(&pending_lock_); - pending_replies_.remove(reply_task); -} - -} // namespace rtc diff --git a/webrtc/base/task_queue_posix.cc b/webrtc/base/task_queue_posix.cc deleted file mode 100644 index 3b00ac8e12..0000000000 --- a/webrtc/base/task_queue_posix.cc +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "webrtc/base/task_queue_posix.h" - -#include "webrtc/base/checks.h" -#include "webrtc/base/task_queue.h" - -namespace rtc { -namespace internal { -pthread_key_t g_queue_ptr_tls = 0; - -void InitializeTls() { - RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0); -} - -pthread_key_t GetQueuePtrTls() { - static pthread_once_t init_once = PTHREAD_ONCE_INIT; - RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0); - return g_queue_ptr_tls; -} - -AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q) - : prev_(TaskQueue::Current()) { - pthread_setspecific(GetQueuePtrTls(), q); -} - -AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() { - pthread_setspecific(GetQueuePtrTls(), prev_); -} - -} // namespace internal -} // namespace rtc diff --git a/webrtc/base/task_queue_posix.h b/webrtc/base/task_queue_posix.h deleted file mode 100644 index b677b78a38..0000000000 --- a/webrtc/base/task_queue_posix.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef WEBRTC_BASE_TASK_QUEUE_POSIX_H_ -#define WEBRTC_BASE_TASK_QUEUE_POSIX_H_ - -#include - -namespace rtc { - -class TaskQueue; - -namespace internal { - -class AutoSetCurrentQueuePtr { - public: - explicit AutoSetCurrentQueuePtr(TaskQueue* q); - ~AutoSetCurrentQueuePtr(); - - private: - TaskQueue* const prev_; -}; - -pthread_key_t GetQueuePtrTls(); - -} // namespace internal -} // namespace rtc - -#endif // WEBRTC_BASE_TASK_QUEUE_POSIX_H_ diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc deleted file mode 100644 index db4e6c2f7e..0000000000 --- a/webrtc/base/task_queue_unittest.cc +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include -#include - -#include "webrtc/base/bind.h" -#include "webrtc/base/event.h" -#include "webrtc/base/gunit.h" -#include "webrtc/base/task_queue.h" -#include "webrtc/base/timeutils.h" - -namespace rtc { - -namespace { -void CheckCurrent(const char* expected_queue, Event* signal, TaskQueue* queue) { - EXPECT_TRUE(TaskQueue::IsCurrent(expected_queue)); - EXPECT_TRUE(queue->IsCurrent()); - if (signal) - signal->Set(); -} - -} // namespace - -TEST(TaskQueueTest, Construct) { - static const char kQueueName[] = "Construct"; - TaskQueue queue(kQueueName); - EXPECT_FALSE(queue.IsCurrent()); -} - -TEST(TaskQueueTest, PostAndCheckCurrent) { - static const char kQueueName[] = "PostAndCheckCurrent"; - TaskQueue queue(kQueueName); - - // We're not running a task, so there shouldn't be a current queue. - EXPECT_FALSE(queue.IsCurrent()); - EXPECT_FALSE(TaskQueue::Current()); - - Event event(false, false); - queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue)); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostCustomTask) { - static const char kQueueName[] = "PostCustomImplementation"; - TaskQueue queue(kQueueName); - - Event event(false, false); - - class CustomTask : public QueuedTask { - public: - explicit CustomTask(Event* event) : event_(event) {} - - private: - bool Run() override { - event_->Set(); - return false; // Never allows the task to be deleted by the queue. - } - - Event* const event_; - } my_task(&event); - - // Please don't do this in production code! :) - queue.PostTask(std::unique_ptr(&my_task)); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostLambda) { - static const char kQueueName[] = "PostLambda"; - TaskQueue queue(kQueueName); - - Event event(false, false); - queue.PostTask([&event]() { event.Set(); }); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostFromQueue) { - static const char kQueueName[] = "PostFromQueue"; - TaskQueue queue(kQueueName); - - Event event(false, false); - queue.PostTask( - [&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); }); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostDelayed) { - static const char kQueueName[] = "PostDelayed"; - TaskQueue queue(kQueueName); - - Event event(false, false); - uint32_t start = Time(); - queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100); - EXPECT_TRUE(event.Wait(1000)); - uint32_t end = Time(); - EXPECT_GE(end - start, 100u); - EXPECT_NEAR(end - start, 200u, 100u); // Accept 100-300. -} - -TEST(TaskQueueTest, PostMultipleDelayed) { - static const char kQueueName[] = "PostMultipleDelayed"; - TaskQueue queue(kQueueName); - - std::vector> events; - for (int i = 0; i < 10; ++i) { - events.push_back(std::unique_ptr(new Event(false, false))); - queue.PostDelayedTask( - Bind(&CheckCurrent, kQueueName, events.back().get(), &queue), 10); - } - - for (const auto& e : events) - EXPECT_TRUE(e->Wait(100)); -} - -TEST(TaskQueueTest, PostDelayedAfterDestruct) { - static const char kQueueName[] = "PostDelayedAfterDestruct"; - Event event(false, false); - { - TaskQueue queue(kQueueName); - queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100); - } - EXPECT_FALSE(event.Wait(200)); // Task should not run. -} - -TEST(TaskQueueTest, PostAndReply) { - static const char kPostQueue[] = "PostQueue"; - static const char kReplyQueue[] = "ReplyQueue"; - TaskQueue post_queue(kPostQueue); - TaskQueue reply_queue(kReplyQueue); - - Event event(false, false); - post_queue.PostTaskAndReply( - Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue), - Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostAndReuse) { - static const char kPostQueue[] = "PostQueue"; - static const char kReplyQueue[] = "ReplyQueue"; - TaskQueue post_queue(kPostQueue); - TaskQueue reply_queue(kReplyQueue); - - int call_count = 0; - - class ReusedTask : public QueuedTask { - public: - ReusedTask(int* counter, TaskQueue* reply_queue, Event* event) - : counter_(counter), reply_queue_(reply_queue), event_(event) { - EXPECT_EQ(0, *counter_); - } - - private: - bool Run() override { - if (++(*counter_) == 1) { - std::unique_ptr myself(this); - reply_queue_->PostTask(std::move(myself)); - // At this point, the object is owned by reply_queue_ and it's - // theoratically possible that the object has been deleted (e.g. if - // posting wasn't possible). So, don't touch any member variables here. - - // Indicate to the current queue that ownership has been transferred. - return false; - } else { - EXPECT_EQ(2, *counter_); - EXPECT_TRUE(reply_queue_->IsCurrent()); - event_->Set(); - return true; // Indicate that the object should be deleted. - } - } - - int* const counter_; - TaskQueue* const reply_queue_; - Event* const event_; - }; - - Event event(false, false); - std::unique_ptr task( - new ReusedTask(&call_count, &reply_queue, &event)); - - post_queue.PostTask(std::move(task)); - EXPECT_TRUE(event.Wait(1000)); -} - -TEST(TaskQueueTest, PostAndReplyLambda) { - static const char kPostQueue[] = "PostQueue"; - static const char kReplyQueue[] = "ReplyQueue"; - TaskQueue post_queue(kPostQueue); - TaskQueue reply_queue(kReplyQueue); - - Event event(false, false); - bool my_flag = false; - post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; }, - [&event]() { event.Set(); }, &reply_queue); - EXPECT_TRUE(event.Wait(1000)); - EXPECT_TRUE(my_flag); -} - -void TestPostTaskAndReply(TaskQueue* work_queue, - const char* work_queue_name, - Event* event) { - ASSERT_FALSE(work_queue->IsCurrent()); - work_queue->PostTaskAndReply( - Bind(&CheckCurrent, work_queue_name, nullptr, work_queue), - NewClosure([event]() { event->Set(); })); -} - -// Does a PostTaskAndReply from within a task to post and reply to the current -// queue. All in all there will be 3 tasks posted and run. -TEST(TaskQueueTest, PostAndReply2) { - static const char kQueueName[] = "PostAndReply2"; - static const char kWorkQueueName[] = "PostAndReply2_Worker"; - TaskQueue queue(kQueueName); - TaskQueue work_queue(kWorkQueueName); - - Event event(false, false); - queue.PostTask( - Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event)); - EXPECT_TRUE(event.Wait(1000)); -} - -// Tests posting more messages than a queue can queue up. -// In situations like that, tasks will get dropped. -TEST(TaskQueueTest, PostALot) { - // To destruct the event after the queue has gone out of scope. - Event event(false, false); - - int tasks_executed = 0; - int tasks_cleaned_up = 0; - static const int kTaskCount = 0xffff; - - { - static const char kQueueName[] = "PostALot"; - TaskQueue queue(kQueueName); - - // On linux, the limit of pending bytes in the pipe buffer is 0xffff. - // So here we post a total of 0xffff+1 messages, which triggers a failure - // case inside of the libevent queue implementation. - - queue.PostTask([&event]() { event.Wait(Event::kForever); }); - for (int i = 0; i < kTaskCount; ++i) - queue.PostTask(NewClosure([&tasks_executed]() { ++tasks_executed; }, - [&tasks_cleaned_up]() { ++tasks_cleaned_up; })); - event.Set(); // Unblock the first task. - } - - EXPECT_GE(tasks_cleaned_up, tasks_executed); - EXPECT_EQ(kTaskCount, tasks_cleaned_up); - - LOG(INFO) << "tasks executed: " << tasks_executed - << ", tasks cleaned up: " << tasks_cleaned_up; -} - -} // namespace rtc diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc deleted file mode 100644 index 5ae6d9275b..0000000000 --- a/webrtc/base/task_queue_win.cc +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "webrtc/base/task_queue.h" - -#include -#include - -#include "webrtc/base/checks.h" -#include "webrtc/base/logging.h" - -namespace rtc { -namespace { -#define WM_RUN_TASK WM_USER + 1 -#define WM_QUEUE_DELAYED_TASK WM_USER + 2 - -DWORD g_queue_ptr_tls = 0; - -BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { - g_queue_ptr_tls = TlsAlloc(); - return TRUE; -} - -DWORD GetQueuePtrTls() { - static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; - InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); - return g_queue_ptr_tls; -} - -struct ThreadStartupData { - Event* started; - void* thread_context; -}; - -void CALLBACK InitializeQueueThread(ULONG_PTR param) { - MSG msg; - PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE); - ThreadStartupData* data = reinterpret_cast(param); - TlsSetValue(GetQueuePtrTls(), data->thread_context); - data->started->Set(); -} -} // namespace - -TaskQueue::TaskQueue(const char* queue_name) - : thread_(&TaskQueue::ThreadMain, this, queue_name) { - RTC_DCHECK(queue_name); - thread_.Start(); - Event event(false, false); - ThreadStartupData startup = {&event, this}; - RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, - reinterpret_cast(&startup))); - event.Wait(Event::kForever); -} - -TaskQueue::~TaskQueue() { - RTC_DCHECK(!IsCurrent()); - while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { - RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError()); - Sleep(1); - } - thread_.Stop(); -} - -// static -TaskQueue* TaskQueue::Current() { - return static_cast(TlsGetValue(GetQueuePtrTls())); -} - -// static -bool TaskQueue::IsCurrent(const char* queue_name) { - TaskQueue* current = Current(); - return current && current->thread_.name().compare(queue_name) == 0; -} - -bool TaskQueue::IsCurrent() const { - return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); -} - -void TaskQueue::PostTask(std::unique_ptr task) { - if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, - reinterpret_cast(task.get()))) { - task.release(); - } -} - -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - WPARAM wparam; -#if defined(_WIN64) - // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) - // so this compensation isn't that accurate, but since we have unused 32 bits - // on Win64, we might as well use them. - wparam = (static_cast(::GetTickCount()) << 32) | milliseconds; -#else - wparam = milliseconds; -#endif - if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, - reinterpret_cast(task.get()))) { - task.release(); - } -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { - QueuedTask* task_ptr = task.release(); - QueuedTask* reply_task_ptr = reply.release(); - DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); - PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { - if (task_ptr->Run()) - delete task_ptr; - // If the thread's message queue is full, we can't queue the task and will - // have to drop it (i.e. delete). - if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, - reinterpret_cast(reply_task_ptr))) { - delete reply_task_ptr; - } - }); -} - -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); -} - -// static -bool TaskQueue::ThreadMain(void* context) { - std::unordered_map> delayed_tasks; - - BOOL ret; - MSG msg; - - while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) { - if (!msg.hwnd) { - switch (msg.message) { - case WM_RUN_TASK: { - QueuedTask* task = reinterpret_cast(msg.lParam); - if (task->Run()) - delete task; - break; - } - case WM_QUEUE_DELAYED_TASK: { - QueuedTask* task = reinterpret_cast(msg.lParam); - uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; -#if defined(_WIN64) - // Subtract the time it took to queue the timer. - const DWORD now = GetTickCount(); - DWORD post_time = now - (msg.wParam >> 32); - milliseconds = - post_time > milliseconds ? 0 : milliseconds - post_time; -#endif - UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr); - delayed_tasks.insert(std::make_pair(timer_id, task)); - break; - } - case WM_TIMER: { - KillTimer(nullptr, msg.wParam); - auto found = delayed_tasks.find(msg.wParam); - RTC_DCHECK(found != delayed_tasks.end()); - if (!found->second->Run()) - found->second.release(); - delayed_tasks.erase(found); - break; - } - default: - RTC_NOTREACHED(); - break; - } - } else { - TranslateMessage(&msg); - DispatchMessage(&msg); - } - } - - return false; -} -} // namespace rtc