From 341c8e40b7f7202b13a4b93e225f4ec8320c9eea Mon Sep 17 00:00:00 2001 From: nisse Date: Wed, 6 Sep 2017 04:38:22 -0700 Subject: [PATCH] Convert windows TaskQueue implementation to pimpl convention. BUG=webrtc:8160,webrtc:8166 Review-Url: https://codereview.webrtc.org/3009133002 Cr-Commit-Position: refs/heads/master@{#19709} --- webrtc/rtc_base/task_queue.h | 25 ---- webrtc/rtc_base/task_queue_win.cc | 195 ++++++++++++++++++++++-------- 2 files changed, 146 insertions(+), 74 deletions(-) diff --git a/webrtc/rtc_base/task_queue.h b/webrtc/rtc_base/task_queue.h index 5dee6e613d..966b29c40d 100644 --- a/webrtc/rtc_base/task_queue.h +++ b/webrtc/rtc_base/task_queue.h @@ -24,10 +24,6 @@ #include "webrtc/rtc_base/criticalsection.h" #include "webrtc/rtc_base/scoped_ref_ptr.h" -#if defined(WEBRTC_WIN) -#include "webrtc/rtc_base/platform_thread.h" -#endif - namespace rtc { // Base interface for asynchronously executed tasks. @@ -246,27 +242,6 @@ class LOCKABLE TaskQueue { struct PostTaskAndReplyContext; dispatch_queue_t queue_; QueueContext* const context_; -#elif defined(WEBRTC_WIN) - class ThreadState; - void RunPendingTasks(); - static void ThreadMain(void* context); - - class WorkerThread : public PlatformThread { - public: - WorkerThread(ThreadRunFunction func, - void* obj, - const char* thread_name, - ThreadPriority priority) - : PlatformThread(func, obj, thread_name, priority) {} - - bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { - return PlatformThread::QueueAPC(apc_function, data); - } - }; - WorkerThread thread_; - rtc::CriticalSection pending_lock_; - std::queue> pending_ GUARDED_BY(pending_lock_); - HANDLE in_queue_; #else class Impl; const scoped_refptr impl_; diff --git a/webrtc/rtc_base/task_queue_win.cc b/webrtc/rtc_base/task_queue_win.cc index 9b4d1a2308..00a8c79827 100644 --- a/webrtc/rtc_base/task_queue_win.cc +++ b/webrtc/rtc_base/task_queue_win.cc @@ -18,7 +18,11 @@ #include "webrtc/rtc_base/arraysize.h" #include "webrtc/rtc_base/checks.h" +#include "webrtc/rtc_base/event.h" #include "webrtc/rtc_base/logging.h" +#include "webrtc/rtc_base/platform_thread.h" +#include "webrtc/rtc_base/refcount.h" +#include "webrtc/rtc_base/refcountedobject.h" #include "webrtc/rtc_base/safe_conversions.h" #include "webrtc/rtc_base/timeutils.h" @@ -154,44 +158,97 @@ class MultimediaTimer { } // namespace -class TaskQueue::ThreadState { +class TaskQueue::Impl : public RefCountInterface { public: - explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} - ~ThreadState() {} + Impl(const char* queue_name, TaskQueue* queue, Priority priority); + ~Impl() override; - void RunThreadMain(); + static TaskQueue::Impl* Current(); + static TaskQueue* CurrentQueue(); + + // Used for DCHECKing the current queue. + bool IsCurrent() const; + + template ::value>::type* = nullptr> + void PostTask(const Closure& closure) { + PostTask(std::unique_ptr(new ClosureTask(closure))); + } + + void PostTask(std::unique_ptr task); + void PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue); + + void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + + void RunPendingTasks(); private: - bool ProcessQueuedMessages(); - void RunDueTasks(); - void ScheduleNextTimer(); - void CancelTimers(); + static void ThreadMain(void* context); - // Since priority_queue<> by defult orders items in terms of - // largest->smallest, using std::less<>, and we want smallest->largest, - // we would like to use std::greater<> here. Alas it's only available in - // C++14 and later, so we roll our own compare template that that relies on - // operator<(). - template - struct greater { - bool operator()(const T& l, const T& r) { return l > r; } + class WorkerThread : public PlatformThread { + public: + WorkerThread(ThreadRunFunction func, + void* obj, + const char* thread_name, + ThreadPriority priority) + : PlatformThread(func, obj, thread_name, priority) {} + + bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { + return PlatformThread::QueueAPC(apc_function, data); + } }; - MultimediaTimer timer_; - std::priority_queue, - greater> - timer_tasks_; - UINT_PTR timer_id_ = 0; + class ThreadState { + public: + explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} + ~ThreadState() {} + + void RunThreadMain(); + + private: + bool ProcessQueuedMessages(); + void RunDueTasks(); + void ScheduleNextTimer(); + void CancelTimers(); + + // Since priority_queue<> by defult orders items in terms of + // largest->smallest, using std::less<>, and we want smallest->largest, + // we would like to use std::greater<> here. Alas it's only available in + // C++14 and later, so we roll our own compare template that that relies on + // operator<(). + template + struct greater { + bool operator()(const T& l, const T& r) { return l > r; } + }; + + MultimediaTimer timer_; + std::priority_queue, + greater> + timer_tasks_; + UINT_PTR timer_id_ = 0; + HANDLE in_queue_; + }; + + TaskQueue* const queue_; + WorkerThread thread_; + rtc::CriticalSection pending_lock_; + std::queue> pending_ GUARDED_BY(pending_lock_); HANDLE in_queue_; }; -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) - : thread_(&TaskQueue::ThreadMain, +TaskQueue::Impl::Impl(const char* queue_name, + TaskQueue* queue, + Priority priority) + : queue_(queue), + thread_(&TaskQueue::Impl::ThreadMain, this, queue_name, TaskQueuePriorityToThreadPriority(priority)), - in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { + in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { RTC_DCHECK(queue_name); RTC_DCHECK(in_queue_); thread_.Start(); @@ -202,7 +259,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) event.Wait(Event::kForever); } -TaskQueue::~TaskQueue() { +TaskQueue::Impl::~Impl() { RTC_DCHECK(!IsCurrent()); while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); @@ -213,22 +270,28 @@ TaskQueue::~TaskQueue() { } // static -TaskQueue* TaskQueue::Current() { - return static_cast(::TlsGetValue(GetQueuePtrTls())); +TaskQueue::Impl* TaskQueue::Impl::Current() { + return static_cast(::TlsGetValue(GetQueuePtrTls())); } -bool TaskQueue::IsCurrent() const { +// static +TaskQueue* TaskQueue::Impl::CurrentQueue() { + TaskQueue::Impl* current = Current(); + return current ? current->queue_ : nullptr; +} + +bool TaskQueue::Impl::IsCurrent() const { return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); } -void TaskQueue::PostTask(std::unique_ptr task) { +void TaskQueue::Impl::PostTask(std::unique_ptr task) { rtc::CritScope lock(&pending_lock_); pending_.push(std::move(task)); ::SetEvent(in_queue_); } -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { if (!milliseconds) { PostTask(std::move(task)); return; @@ -245,9 +308,9 @@ void TaskQueue::PostDelayedTask(std::unique_ptr task, } } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply, - TaskQueue* reply_queue) { +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue::Impl* reply_queue) { QueuedTask* task_ptr = task.release(); QueuedTask* reply_task_ptr = reply.release(); DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); @@ -263,12 +326,7 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr task, }); } -void TaskQueue::PostTaskAndReply(std::unique_ptr task, - std::unique_ptr reply) { - return PostTaskAndReply(std::move(task), std::move(reply), Current()); -} - -void TaskQueue::RunPendingTasks() { +void TaskQueue::Impl::RunPendingTasks() { while (true) { std::unique_ptr task; { @@ -285,12 +343,12 @@ void TaskQueue::RunPendingTasks() { } // static -void TaskQueue::ThreadMain(void* context) { - ThreadState state(static_cast(context)->in_queue_); +void TaskQueue::Impl::ThreadMain(void* context) { + ThreadState state(static_cast(context)->in_queue_); state.RunThreadMain(); } -void TaskQueue::ThreadState::RunThreadMain() { +void TaskQueue::Impl::ThreadState::RunThreadMain() { HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; while (true) { // Make sure we do an alertable wait as that's required to allow APCs to run @@ -315,12 +373,12 @@ void TaskQueue::ThreadState::RunThreadMain() { if (result == (WAIT_OBJECT_0 + 1)) { ::ResetEvent(in_queue_); - TaskQueue::Current()->RunPendingTasks(); + TaskQueue::Impl::Current()->RunPendingTasks(); } } } -bool TaskQueue::ThreadState::ProcessQueuedMessages() { +bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() { MSG msg = {}; // To protect against overly busy message queues, we limit the time // we process tasks to a few milliseconds. If we don't do that, there's @@ -374,7 +432,7 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() { return msg.message != WM_QUIT; } -void TaskQueue::ThreadState::RunDueTasks() { +void TaskQueue::Impl::ThreadState::RunDueTasks() { RTC_DCHECK(!timer_tasks_.empty()); auto now = GetTick(); do { @@ -386,7 +444,7 @@ void TaskQueue::ThreadState::RunDueTasks() { } while (!timer_tasks_.empty()); } -void TaskQueue::ThreadState::ScheduleNextTimer() { +void TaskQueue::Impl::ThreadState::ScheduleNextTimer() { RTC_DCHECK_EQ(timer_id_, 0); if (timer_tasks_.empty()) return; @@ -398,7 +456,7 @@ void TaskQueue::ThreadState::ScheduleNextTimer() { timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); } -void TaskQueue::ThreadState::CancelTimers() { +void TaskQueue::Impl::ThreadState::CancelTimers() { timer_.Cancel(); if (timer_id_) { ::KillTimer(nullptr, timer_id_); @@ -406,4 +464,43 @@ void TaskQueue::ThreadState::CancelTimers() { } } +// Boilerplate for the PIMPL pattern. +TaskQueue::TaskQueue(const char* queue_name, Priority priority) + : impl_(new RefCountedObject(queue_name, this, priority)) { +} + +TaskQueue::~TaskQueue() {} + +// static +TaskQueue* TaskQueue::Current() { + return TaskQueue::Impl::CurrentQueue(); +} + +// Used for DCHECKing the current queue. +bool TaskQueue::IsCurrent() const { + return impl_->IsCurrent(); +} + +void TaskQueue::PostTask(std::unique_ptr task) { + return TaskQueue::impl_->PostTask(std::move(task)); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply, + TaskQueue* reply_queue) { + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + reply_queue->impl_.get()); +} + +void TaskQueue::PostTaskAndReply(std::unique_ptr task, + std::unique_ptr reply) { + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), + impl_.get()); +} + +void TaskQueue::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); +} + } // namespace rtc