diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h index 92a1c9477a..7d0d41437d 100644 --- a/webrtc/base/task_queue.h +++ b/webrtc/base/task_queue.h @@ -161,8 +161,16 @@ static std::unique_ptr NewClosure(const Closure& closure, // so assumptions about lifetimes of pending tasks should not be made. class LOCKABLE TaskQueue { public: - explicit TaskQueue(const char* queue_name); - // TODO(tommi): Implement move semantics? + // TaskQueue priority levels. On some platforms these will map to thread + // priorities, on others such as Mac and iOS, GCD queue priorities. + enum class Priority { + NORMAL = 0, + HIGH, + LOW, + }; + + explicit TaskQueue(const char* queue_name, + Priority priority = Priority::NORMAL); ~TaskQueue(); static TaskQueue* Current(); @@ -275,8 +283,11 @@ class LOCKABLE TaskQueue { class WorkerThread : public PlatformThread { public: - WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) - : PlatformThread(func, obj, thread_name) {} + WorkerThread(ThreadRunFunction func, + void* obj, + const char* thread_name, + ThreadPriority priority) + : PlatformThread(func, obj, thread_name, priority) {} bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { return PlatformThread::QueueAPC(apc_function, data); diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc index d5a4a6c1c6..296da16fdb 100644 --- a/webrtc/base/task_queue_gcd.cc +++ b/webrtc/base/task_queue_gcd.cc @@ -21,6 +21,22 @@ #include "webrtc/base/task_queue_posix.h" namespace rtc { +namespace { + +using Priority = TaskQueue::Priority; + +int TaskQueuePriorityToGCD(Priority priority) { + switch (priority) { + case Priority::NORMAL: + return DISPATCH_QUEUE_PRIORITY_DEFAULT; + case Priority::HIGH: + return DISPATCH_QUEUE_PRIORITY_HIGH; + case Priority::LOW: + return DISPATCH_QUEUE_PRIORITY_LOW; + } +} +} + using internal::GetQueuePtrTls; using internal::AutoSetCurrentQueuePtr; @@ -94,7 +110,7 @@ struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext { dispatch_queue_t reply_queue_; }; -TaskQueue::TaskQueue(const char* queue_name) +TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), context_(new QueueContext(this)) { RTC_DCHECK(queue_name); @@ -104,6 +120,9 @@ TaskQueue::TaskQueue(const char* queue_name) // to the queue is released. This may run after the TaskQueue object has // been deleted. dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); + + dispatch_set_target_queue( + queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0)); } TaskQueue::~TaskQueue() { diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc index c6ce5b3baf..1376ea3f70 100644 --- a/webrtc/base/task_queue_libevent.cc +++ b/webrtc/base/task_queue_libevent.cc @@ -30,6 +30,8 @@ static const char kQuit = 1; static const char kRunTask = 2; static const char kRunReplyTask = 3; +using Priority = TaskQueue::Priority; + // This ignores the SIGPIPE signal on the calling thread. // This signal can be fired when trying to write() to a pipe that's being // closed or while closing a pipe that's being written to. @@ -84,6 +86,21 @@ void EventAssign(struct event* ev, RTC_CHECK_EQ(0, event_base_set(base, ev)); #endif } + +ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { + switch (priority) { + case Priority::HIGH: + return kRealtimePriority; + case Priority::LOW: + return kLowPriority; + case Priority::NORMAL: + return kNormalPriority; + default: + RTC_NOTREACHED(); + break; + } + return kNormalPriority; +} } // namespace struct TaskQueue::QueueContext { @@ -201,10 +218,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { const uint32_t posted_; }; -TaskQueue::TaskQueue(const char* queue_name) +TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) : event_base_(event_base_new()), wakeup_event_(new event()), - thread_(&TaskQueue::ThreadMain, this, queue_name) { + thread_(&TaskQueue::ThreadMain, + this, + queue_name, + TaskQueuePriorityToThreadPriority(priority)) { RTC_DCHECK(queue_name); int fds[2]; RTC_CHECK(pipe(fds) == 0); diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc index c8ef7211e8..5850b2947e 100644 --- a/webrtc/base/task_queue_win.cc +++ b/webrtc/base/task_queue_win.cc @@ -24,6 +24,8 @@ namespace { #define WM_RUN_TASK WM_USER + 1 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 +using Priority = TaskQueue::Priority; + DWORD g_queue_ptr_tls = 0; BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { @@ -49,6 +51,21 @@ void CALLBACK InitializeQueueThread(ULONG_PTR param) { ::TlsSetValue(GetQueuePtrTls(), data->thread_context); data->started->Set(); } + +ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { + switch (priority) { + case Priority::HIGH: + return kRealtimePriority; + case Priority::LOW: + return kLowPriority; + case Priority::NORMAL: + return kNormalPriority; + default: + RTC_NOTREACHED(); + break; + } + return kNormalPriority; +} } // namespace class TaskQueue::MultimediaTimer { @@ -145,8 +162,11 @@ class TaskQueue::MultimediaTimer { RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); }; -TaskQueue::TaskQueue(const char* queue_name) - : thread_(&TaskQueue::ThreadMain, this, queue_name) { +TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) + : thread_(&TaskQueue::ThreadMain, + this, + queue_name, + TaskQueuePriorityToThreadPriority(priority)) { RTC_DCHECK(queue_name); thread_.Start(); Event event(false, false);