TaskQueue[Win] DOS handling

BUG=webrtc:7341

Review-Url: https://codereview.webrtc.org/2750853002
Cr-Commit-Position: refs/heads/master@{#17242}
This commit is contained in:
tommi 2017-03-15 04:36:29 -07:00 committed by Commit bot
parent 35b7de480b
commit 83722268d6
2 changed files with 56 additions and 14 deletions

View File

@ -13,6 +13,7 @@
#include <list>
#include <memory>
#include <queue>
#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
#include <dispatch/dispatch.h>
@ -274,6 +275,7 @@ class LOCKABLE TaskQueue {
QueueContext* const context_;
#elif defined(WEBRTC_WIN)
class ThreadState;
void RunPendingTasks();
static void ThreadMain(void* context);
class WorkerThread : public PlatformThread {
@ -289,6 +291,9 @@ class LOCKABLE TaskQueue {
}
};
WorkerThread thread_;
rtc::CriticalSection pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
HANDLE in_queue_;
#else
#error not supported.
#endif

View File

@ -16,6 +16,7 @@
#include <algorithm>
#include <queue>
#include "webrtc/base/arraysize.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/safe_conversions.h"
@ -117,7 +118,8 @@ class DelayedTaskInfo {
class MultimediaTimer {
public:
MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
// Note: We create an event that requires manual reset.
MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {}
~MultimediaTimer() {
Cancel();
@ -134,6 +136,7 @@ class MultimediaTimer {
}
void Cancel() {
::ResetEvent(event_);
if (timer_id_) {
::timeKillEvent(timer_id_);
timer_id_ = 0;
@ -153,7 +156,7 @@ class MultimediaTimer {
class TaskQueue::ThreadState {
public:
ThreadState() {}
explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
~ThreadState() {}
void RunThreadMain();
@ -180,14 +183,17 @@ class TaskQueue::ThreadState {
greater<DelayedTaskInfo>>
timer_tasks_;
UINT_PTR timer_id_ = 0;
HANDLE in_queue_;
};
TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: thread_(&TaskQueue::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
TaskQueuePriorityToThreadPriority(priority)),
in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
RTC_DCHECK(queue_name);
RTC_DCHECK(in_queue_);
thread_.Start();
Event event(false, false);
ThreadStartupData startup = {&event, this};
@ -203,6 +209,7 @@ TaskQueue::~TaskQueue() {
Sleep(1);
}
thread_.Stop();
::CloseHandle(in_queue_);
}
// static
@ -221,10 +228,9 @@ bool TaskQueue::IsCurrent() const {
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
reinterpret_cast<LPARAM>(task.get()))) {
task.release();
}
rtc::CritScope lock(&pending_lock_);
pending_.push(std::move(task));
::SetEvent(in_queue_);
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
@ -268,42 +274,70 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
void TaskQueue::RunPendingTasks() {
while (true) {
std::unique_ptr<QueuedTask> task;
{
rtc::CritScope lock(&pending_lock_);
if (pending_.empty())
break;
task = std::move(pending_.front());
pending_.pop();
}
if (!task->Run())
task.release();
}
}
// static
void TaskQueue::ThreadMain(void* context) {
ThreadState state;
ThreadState state(static_cast<TaskQueue*>(context)->in_queue_);
state.RunThreadMain();
}
void TaskQueue::ThreadState::RunThreadMain() {
HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ };
while (true) {
// Make sure we do an alertable wait as that's required to allow APCs to run
// (e.g. required for InitializeQueueThread and stopping the thread in
// PlatformThread).
DWORD result = ::MsgWaitForMultipleObjectsEx(
1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
RTC_CHECK_NE(WAIT_FAILED, result);
if (result == (WAIT_OBJECT_0 + 1)) {
if (result == (WAIT_OBJECT_0 + 2)) {
// There are messages in the message queue that need to be handled.
if (!ProcessQueuedMessages())
break;
} else if (result == WAIT_OBJECT_0) {
}
if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() &&
::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
// The multimedia timer was signaled.
timer_.Cancel();
RTC_DCHECK(!timer_tasks_.empty());
RunDueTasks();
ScheduleNextTimer();
} else {
RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
}
if (result == (WAIT_OBJECT_0 + 1)) {
::ResetEvent(in_queue_);
TaskQueue::Current()->RunPendingTasks();
}
}
}
bool TaskQueue::ThreadState::ProcessQueuedMessages() {
MSG msg = {};
// To protect against overly busy message queues, we limit the time
// we process tasks to a few milliseconds. If we don't do that, there's
// a chance that timer tasks won't ever run.
static const int kMaxTaskProcessingTimeMs = 500;
auto start = GetTick();
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
msg.message != WM_QUIT) {
if (!msg.hwnd) {
switch (msg.message) {
// TODO(tommi): Stop using this way of queueing tasks.
case WM_RUN_TASK: {
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
if (task->Run())
@ -339,6 +373,9 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() {
::TranslateMessage(&msg);
::DispatchMessage(&msg);
}
if (GetTick() > start + kMaxTaskProcessingTimeMs)
break;
}
return msg.message != WM_QUIT;
}