diff --git a/webrtc/rtc_base/task_queue.h b/webrtc/rtc_base/task_queue.h index 64e6a6ca5c..5ad0afb220 100644 --- a/webrtc/rtc_base/task_queue.h +++ b/webrtc/rtc_base/task_queue.h @@ -16,10 +16,6 @@ #include #include -#if defined(WEBRTC_MAC) -#include -#endif - #include "webrtc/rtc_base/constructormagic.h" #include "webrtc/rtc_base/criticalsection.h" #include "webrtc/rtc_base/scoped_ref_ptr.h" @@ -236,16 +232,8 @@ class RTC_LOCKABLE TaskQueue { } private: -#if defined(WEBRTC_MAC) - struct QueueContext; - struct TaskContext; - struct PostTaskAndReplyContext; - dispatch_queue_t queue_; - QueueContext* const context_; -#else class Impl; const scoped_refptr impl_; -#endif RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); }; diff --git a/webrtc/rtc_base/task_queue_gcd.cc b/webrtc/rtc_base/task_queue_gcd.cc index 4094dd5192..70a016b722 100644 --- a/webrtc/rtc_base/task_queue_gcd.cc +++ b/webrtc/rtc_base/task_queue_gcd.cc @@ -16,8 +16,12 @@ #include +#include + #include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/logging.h" +#include "webrtc/rtc_base/refcount.h" +#include "webrtc/rtc_base/refcountedobject.h" #include "webrtc/rtc_base/task_queue_posix.h" namespace rtc { @@ -40,79 +44,103 @@ int TaskQueuePriorityToGCD(Priority priority) { using internal::GetQueuePtrTls; using internal::AutoSetCurrentQueuePtr; -struct TaskQueue::QueueContext { - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} +class TaskQueue::Impl : public RefCountInterface { + public: + Impl(const char* queue_name, TaskQueue* task_queue, Priority priority); + ~Impl() override; - static void SetNotActive(void* context) { - QueueContext* qc = static_cast(context); - qc->is_active = false; - } + static TaskQueue* Current(); - static void DeleteContext(void* context) { - QueueContext* qc = static_cast(context); - delete qc; - } + // Used for DCHECKing the current queue. + bool IsCurrent() const; - TaskQueue* const queue; - bool is_active; -}; + void PostTask(std::unique_ptr task); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue); -struct TaskQueue::TaskContext { - TaskContext(QueueContext* queue_ctx, std::unique_ptr task) - : queue_ctx(queue_ctx), task(std::move(task)) {} - virtual ~TaskContext() {} + void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); - static void RunTask(void* context) { - std::unique_ptr tc(static_cast(context)); - if (tc->queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); - if (!tc->task->Run()) - tc->task.release(); + private: + struct QueueContext { + explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} + + static void SetNotActive(void* context) { + QueueContext* qc = static_cast(context); + qc->is_active = false; } - } - QueueContext* const queue_ctx; - std::unique_ptr task; -}; - -// Special case context for holding two tasks, a |first_task| + the task -// that's owned by the parent struct, TaskContext, that then becomes the -// second (i.e. 'reply') task. -struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext { - explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, - std::unique_ptr first_task, - QueueContext* second_queue_ctx, - std::unique_ptr second_task) - : TaskContext(second_queue_ctx, std::move(second_task)), - first_queue_ctx(first_queue_ctx), - first_task(std::move(first_task)), - reply_queue_(second_queue_ctx->queue->queue_) { - // Retain the reply queue for as long as this object lives. - // If we don't, we may have memory leaks and/or failures. - dispatch_retain(reply_queue_); - } - ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); } - - static void RunTask(void* context) { - auto* rc = static_cast(context); - if (rc->first_queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); - if (!rc->first_task->Run()) - rc->first_task.release(); + static void DeleteContext(void* context) { + QueueContext* qc = static_cast(context); + delete qc; } - // Post the reply task. This hands the work over to the parent struct. - // This task will eventually delete |this|. - dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask); - } - QueueContext* const first_queue_ctx; - std::unique_ptr first_task; - dispatch_queue_t reply_queue_; + TaskQueue* const queue; + bool is_active; + }; + + struct TaskContext { + TaskContext(QueueContext* queue_ctx, std::unique_ptr task) + : queue_ctx(queue_ctx), task(std::move(task)) {} + virtual ~TaskContext() {} + + static void RunTask(void* context) { + std::unique_ptr tc(static_cast(context)); + if (tc->queue_ctx->is_active) { + AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); + if (!tc->task->Run()) + tc->task.release(); + } + } + + QueueContext* const queue_ctx; + std::unique_ptr task; + }; + + // Special case context for holding two tasks, a |first_task| + the task + // that's owned by the parent struct, TaskContext, that then becomes the + // second (i.e. 'reply') task. + struct PostTaskAndReplyContext : public TaskContext { + explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, + std::unique_ptr first_task, + QueueContext* second_queue_ctx, + std::unique_ptr second_task) + : TaskContext(second_queue_ctx, std::move(second_task)), + first_queue_ctx(first_queue_ctx), + first_task(std::move(first_task)), + reply_queue_(second_queue_ctx->queue->impl_->queue_) { + // Retain the reply queue for as long as this object lives. + // If we don't, we may have memory leaks and/or failures. + dispatch_retain(reply_queue_); + } + ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); } + + static void RunTask(void* context) { + auto* rc = static_cast(context); + if (rc->first_queue_ctx->is_active) { + AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); + if (!rc->first_task->Run()) + rc->first_task.release(); + } + // Post the reply task. This hands the work over to the parent struct. + // This task will eventually delete |this|. + dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask); + } + + QueueContext* const first_queue_ctx; + std::unique_ptr first_task; + dispatch_queue_t reply_queue_; + }; + + dispatch_queue_t queue_; + QueueContext* const context_; }; -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) +TaskQueue::Impl::Impl(const char* queue_name, + TaskQueue* task_queue, + Priority priority) : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), - context_(new QueueContext(this)) { + context_(new QueueContext(task_queue)) { RTC_DCHECK(queue_name); RTC_CHECK(queue_); dispatch_set_context(queue_, context_); @@ -125,7 +153,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0)); } -TaskQueue::~TaskQueue() { +TaskQueue::Impl::~Impl() { RTC_DCHECK(!IsCurrent()); // Implementation/behavioral note: // Dispatch queues are reference counted via calls to dispatch_retain and @@ -141,39 +169,74 @@ TaskQueue::~TaskQueue() { } // static -TaskQueue* TaskQueue::Current() { +TaskQueue* TaskQueue::Impl::Current() { return static_cast(pthread_getspecific(GetQueuePtrTls())); } -bool TaskQueue::IsCurrent() const { +bool TaskQueue::Impl::IsCurrent() const { RTC_DCHECK(queue_); - return this == Current(); + const TaskQueue* current = Current(); + return current && this == current->impl_.get(); } -void TaskQueue::PostTask(std::unique_ptr task) { +void TaskQueue::Impl::PostTask(std::unique_ptr task) { auto* context = new TaskContext(context_, std::move(task)); dispatch_async_f(queue_, context, &TaskContext::RunTask); } -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { auto* context = new TaskContext(context_, std::move(task)); dispatch_after_f( dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, context, &TaskContext::RunTask); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue) { auto* context = new PostTaskAndReplyContext( context_, std::move(task), reply_queue->context_, std::move(reply)); dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); } +// Boilerplate for the PIMPL pattern. +TaskQueue::TaskQueue(const char* queue_name, Priority priority) + : impl_(new RefCountedObject(queue_name, this, priority)) { +} + +TaskQueue::~TaskQueue() {} + +// static +TaskQueue* TaskQueue::Current() { + return TaskQueue::Impl::Current(); +} + +// Used for DCHECKing the current queue. +bool TaskQueue::IsCurrent() const { + return impl_->IsCurrent(); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + return TaskQueue::impl_->PostTask(std::move(task)); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + reply_queue->impl_.get()); +} + void TaskQueue::PostTaskAndReply(std::unique_ptr task, std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + impl_.get()); +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); } } // namespace rtc