webrtc_m130/webrtc/base/task_queue_libevent.cc
tommi 0f8b403eb5 Introduce a new constructor to PlatformThread.
The new constructor introduces two new changes:

* Support specifying thread priority at construction time.
  - Moving forward, the SetPriority() method will be removed.
* New thread function type.
  - The new type has 'void' as a return type and a polling loop
    inside PlatformThread, is not used.

The old function type is still supported until all places have been moved over.

In this CL, the first steps towards deprecating the old mechanism are taken
by moving parts of the code that were simple to move, over to the new callback
type.

BUG=webrtc:7187

Review-Url: https://codereview.webrtc.org/2708723003
Cr-Commit-Position: refs/heads/master@{#16779}
2017-02-22 19:22:05 +00:00

335 lines
9.8 KiB
C++

/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/task_queue.h"
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "base/third_party/libevent/event.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/task_queue_posix.h"
#include "webrtc/base/timeutils.h"
namespace rtc {
using internal::GetQueuePtrTls;
using internal::AutoSetCurrentQueuePtr;
namespace {
static const char kQuit = 1;
static const char kRunTask = 2;
struct TimerEvent {
explicit TimerEvent(std::unique_ptr<QueuedTask> task)
: task(std::move(task)) {}
~TimerEvent() { event_del(&ev); }
event ev;
std::unique_ptr<QueuedTask> task;
};
bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
RTC_CHECK(flags != -1);
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
// TODO(tommi): This is a hack to support two versions of libevent that we're
// compatible with. The method we really want to call is event_assign(),
// since event_set() has been marked as deprecated (and doesn't accept
// passing event_base__ as a parameter). However, the version of libevent
// that we have in Chromium, doesn't have event_assign(), so we need to call
// event_set() there.
void EventAssign(struct event* ev,
struct event_base* base,
int fd,
short events,
void (*callback)(int, short, void*),
void* arg) {
#if defined(_EVENT2_EVENT_H_)
RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
#else
event_set(ev, fd, events, callback, arg);
RTC_CHECK_EQ(0, event_base_set(base, ev));
#endif
}
} // namespace
struct TaskQueue::QueueContext {
explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
TaskQueue* queue;
bool is_active;
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
};
class TaskQueue::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue)
: task_(std::move(task)),
reply_(std::move(reply)),
reply_queue_(reply_queue) {
reply_queue->PrepareReplyTask(this);
}
~PostAndReplyTask() override {
CritScope lock(&lock_);
if (reply_queue_)
reply_queue_->ReplyTaskDone(this);
}
void OnReplyQueueGone() {
CritScope lock(&lock_);
reply_queue_ = nullptr;
}
private:
bool Run() override {
if (!task_->Run())
task_.release();
CritScope lock(&lock_);
if (reply_queue_)
reply_queue_->PostTask(std::move(reply_));
return true;
}
CriticalSection lock_;
std::unique_ptr<QueuedTask> task_;
std::unique_ptr<QueuedTask> reply_;
TaskQueue* reply_queue_ GUARDED_BY(lock_);
};
class TaskQueue::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
milliseconds_(milliseconds),
posted_(Time32()) {}
private:
bool Run() override {
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = Time32() - posted_;
TaskQueue::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
}
std::unique_ptr<QueuedTask> task_;
const uint32_t milliseconds_;
const uint32_t posted_;
};
TaskQueue::TaskQueue(const char* queue_name)
: event_base_(event_base_new()),
wakeup_event_(new event()),
thread_(&TaskQueue::ThreadMain, this, queue_name) {
RTC_DCHECK(queue_name);
int fds[2];
RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]);
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this);
event_add(wakeup_event_.get(), 0);
thread_.Start();
}
TaskQueue::~TaskQueue() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
// The queue is full, so we have no choice but to wait and retry.
RTC_CHECK_EQ(EAGAIN, errno);
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
nanosleep(&ts, nullptr);
}
thread_.Stop();
event_del(wakeup_event_.get());
close(wakeup_pipe_in_);
close(wakeup_pipe_out_);
wakeup_pipe_in_ = -1;
wakeup_pipe_out_ = -1;
{
// Synchronize against any pending reply tasks that might be running on
// other queues.
CritScope lock(&pending_lock_);
for (auto* reply : pending_replies_)
reply->OnReplyQueueGone();
pending_replies_.clear();
}
event_base_free(event_base_);
}
// static
TaskQueue* TaskQueue::Current() {
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
return ctx ? ctx->queue : nullptr;
}
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
TaskQueue* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
bool TaskQueue::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
task.get(), nullptr) == 0) {
task.release();
}
} else {
QueuedTask* task_id = task.get(); // Only used for comparison.
{
CritScope lock(&pending_lock_);
pending_.push_back(std::move(task));
}
char message = kRunTask;
if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
LOG(WARNING) << "Failed to queue task.";
CritScope lock(&pending_lock_);
pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
return t.get() == task_id;
});
}
}
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(std::move(task));
EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.push_back(timer);
timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
event_add(&timer->ev, &tv);
} else {
PostTask(std::unique_ptr<QueuedTask>(
new SetTimerTask(std::move(task), milliseconds)));
}
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
PostTask(std::move(wrapper_task));
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
// static
void TaskQueue::ThreadMain(void* context) {
TaskQueue* me = static_cast<TaskQueue*>(context);
QueueContext queue_context(me);
pthread_setspecific(GetQueuePtrTls(), &queue_context);
while (queue_context.is_active)
event_base_loop(me->event_base_, 0);
pthread_setspecific(GetQueuePtrTls(), nullptr);
for (TimerEvent* timer : queue_context.pending_timers_)
delete timer;
}
// static
void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
char buf;
RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
switch (buf) {
case kQuit:
ctx->is_active = false;
event_base_loopbreak(ctx->queue->event_base_);
break;
case kRunTask: {
std::unique_ptr<QueuedTask> task;
{
CritScope lock(&ctx->queue->pending_lock_);
RTC_DCHECK(!ctx->queue->pending_.empty());
task = std::move(ctx->queue->pending_.front());
ctx->queue->pending_.pop_front();
RTC_DCHECK(task.get());
}
if (!task->Run())
task.release();
break;
}
default:
RTC_NOTREACHED();
break;
}
}
// static
void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.remove(timer);
delete timer;
}
void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(reply_task);
}
void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
CritScope lock(&pending_lock_);
pending_replies_.remove(reply_task);
}
} // namespace rtc