From 83722268d67f5bd28e77fd86678600d5ee461c81 Mon Sep 17 00:00:00 2001 From: tommi Date: Wed, 15 Mar 2017 04:36:29 -0700 Subject: [PATCH] TaskQueue[Win] DOS handling BUG=webrtc:7341 Review-Url: https://codereview.webrtc.org/2750853002 Cr-Commit-Position: refs/heads/master@{#17242} --- webrtc/base/task_queue.h | 5 +++ webrtc/base/task_queue_win.cc | 65 +++++++++++++++++++++++++++-------- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h index c5229b1fe2..15b31aa717 100644 --- a/webrtc/base/task_queue.h +++ b/webrtc/base/task_queue.h @@ -13,6 +13,7 @@ #include #include +#include #if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT) #include @@ -274,6 +275,7 @@ class LOCKABLE TaskQueue { QueueContext* const context_; #elif defined(WEBRTC_WIN) class ThreadState; + void RunPendingTasks(); static void ThreadMain(void* context); class WorkerThread : public PlatformThread { @@ -289,6 +291,9 @@ class LOCKABLE TaskQueue { } }; WorkerThread thread_; + rtc::CriticalSection pending_lock_; + std::queue> pending_ GUARDED_BY(pending_lock_); + HANDLE in_queue_; #else #error not supported. #endif diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc index f7332d76ed..a149dd88a6 100644 --- a/webrtc/base/task_queue_win.cc +++ b/webrtc/base/task_queue_win.cc @@ -16,6 +16,7 @@ #include #include +#include "webrtc/base/arraysize.h" #include "webrtc/base/checks.h" #include "webrtc/base/logging.h" #include "webrtc/base/safe_conversions.h" @@ -117,7 +118,8 @@ class DelayedTaskInfo { class MultimediaTimer { public: - MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {} + // Note: We create an event that requires manual reset. + MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {} ~MultimediaTimer() { Cancel(); @@ -134,6 +136,7 @@ class MultimediaTimer { } void Cancel() { + ::ResetEvent(event_); if (timer_id_) { ::timeKillEvent(timer_id_); timer_id_ = 0; @@ -153,7 +156,7 @@ class MultimediaTimer { class TaskQueue::ThreadState { public: - ThreadState() {} + explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} ~ThreadState() {} void RunThreadMain(); @@ -180,14 +183,17 @@ class TaskQueue::ThreadState { greater> timer_tasks_; UINT_PTR timer_id_ = 0; + HANDLE in_queue_; }; TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) : thread_(&TaskQueue::ThreadMain, this, queue_name, - TaskQueuePriorityToThreadPriority(priority)) { + TaskQueuePriorityToThreadPriority(priority)), + in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { RTC_DCHECK(queue_name); + RTC_DCHECK(in_queue_); thread_.Start(); Event event(false, false); ThreadStartupData startup = {&event, this}; @@ -203,6 +209,7 @@ TaskQueue::~TaskQueue() { Sleep(1); } thread_.Stop(); + ::CloseHandle(in_queue_); } // static @@ -221,10 +228,9 @@ bool TaskQueue::IsCurrent() const { } void TaskQueue::PostTask(std::unique_ptr task) { - if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, - reinterpret_cast(task.get()))) { - task.release(); - } + rtc::CritScope lock(&pending_lock_); + pending_.push(std::move(task)); + ::SetEvent(in_queue_); } void TaskQueue::PostDelayedTask(std::unique_ptr task, @@ -268,42 +274,70 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr task, return PostTaskAndReply(std::move(task), std::move(reply), Current()); } +void TaskQueue::RunPendingTasks() { + while (true) { + std::unique_ptr task; + { + rtc::CritScope lock(&pending_lock_); + if (pending_.empty()) + break; + task = std::move(pending_.front()); + pending_.pop(); + } + + if (!task->Run()) + task.release(); + } +} + // static void TaskQueue::ThreadMain(void* context) { - ThreadState state; + ThreadState state(static_cast(context)->in_queue_); state.RunThreadMain(); } void TaskQueue::ThreadState::RunThreadMain() { + HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; while (true) { // Make sure we do an alertable wait as that's required to allow APCs to run // (e.g. required for InitializeQueueThread and stopping the thread in // PlatformThread). DWORD result = ::MsgWaitForMultipleObjectsEx( - 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); + arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); RTC_CHECK_NE(WAIT_FAILED, result); - if (result == (WAIT_OBJECT_0 + 1)) { + if (result == (WAIT_OBJECT_0 + 2)) { // There are messages in the message queue that need to be handled. if (!ProcessQueuedMessages()) break; - } else if (result == WAIT_OBJECT_0) { + } + + if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() && + ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) { // The multimedia timer was signaled. timer_.Cancel(); - RTC_DCHECK(!timer_tasks_.empty()); RunDueTasks(); ScheduleNextTimer(); - } else { - RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); + } + + if (result == (WAIT_OBJECT_0 + 1)) { + ::ResetEvent(in_queue_); + TaskQueue::Current()->RunPendingTasks(); } } } bool TaskQueue::ThreadState::ProcessQueuedMessages() { MSG msg = {}; + // To protect against overly busy message queues, we limit the time + // we process tasks to a few milliseconds. If we don't do that, there's + // a chance that timer tasks won't ever run. + static const int kMaxTaskProcessingTimeMs = 500; + auto start = GetTick(); while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && msg.message != WM_QUIT) { if (!msg.hwnd) { switch (msg.message) { + // TODO(tommi): Stop using this way of queueing tasks. case WM_RUN_TASK: { QueuedTask* task = reinterpret_cast(msg.lParam); if (task->Run()) @@ -339,6 +373,9 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() { ::TranslateMessage(&msg); ::DispatchMessage(&msg); } + + if (GetTick() > start + kMaxTaskProcessingTimeMs) + break; } return msg.message != WM_QUIT; }