Delete unused TaskRunner abstraction.

This is the fifth and final step in the process started in cl https://codereview.webrtc.org/2696703009/

Depends on the landing of a copy of this code in Chrome (step 4), cl
https://codereview.chromium.org/2694903005/

BUG=webrtc:6424

Review-Url: https://codereview.webrtc.org/2622923002
Cr-Commit-Position: refs/heads/master@{#17107}
This commit is contained in:
nisse 2017-03-08 01:45:05 -08:00 committed by Commit bot
parent 34b7a91862
commit 2d15fdd91b
8 changed files with 0 additions and 1493 deletions

View File

@ -348,19 +348,6 @@ config("rtc_base_warnings_config") {
}
}
# TODO(nisse): This target is temporarily split out, to aid moving the
# code over to Chrome. It should be deleted from webrtc soon.
rtc_static_library("rtc_task_runner") {
sources = [
"task.cc",
"task.h",
"taskparent.cc",
"taskparent.h",
"taskrunner.cc",
"taskrunner.h",
]
}
rtc_static_library("rtc_base") {
cflags = []
cflags_cc = []
@ -861,7 +848,6 @@ if (rtc_include_tests) {
"sigslot_unittest.cc",
"sigslottester_unittest.cc",
"stream_unittest.cc",
"task_unittest.cc",
"testclient_unittest.cc",
"thread_unittest.cc",
]
@ -883,7 +869,6 @@ if (rtc_include_tests) {
}
deps = [
":rtc_base_tests_main",
":rtc_task_runner",
]
public_deps = [
":rtc_base",

View File

@ -1,283 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/task.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/taskrunner.h"
namespace rtc {
int32_t Task::unique_id_seed_ = 0;
Task::Task(TaskParent *parent)
: TaskParent(this, parent),
state_(STATE_INIT),
blocked_(false),
done_(false),
aborted_(false),
busy_(false),
error_(false),
start_time_(0),
timeout_time_(0),
timeout_seconds_(0),
timeout_suspended_(false) {
unique_id_ = unique_id_seed_++;
// sanity check that we didn't roll-over our id seed
RTC_DCHECK(unique_id_ < unique_id_seed_);
}
Task::~Task() {
// Is this task being deleted in the correct manner?
#if RTC_DCHECK_IS_ON
RTC_DCHECK(!done_ || GetRunner()->is_ok_to_delete(this));
#endif
RTC_DCHECK(state_ == STATE_INIT || done_);
RTC_DCHECK(state_ == STATE_INIT || blocked_);
// If the task is being deleted without being done, it
// means that it hasn't been removed from its parent.
// This happens if a task is deleted outside of TaskRunner.
if (!done_) {
Stop();
}
}
int64_t Task::CurrentTime() {
return GetRunner()->CurrentTime();
}
int64_t Task::ElapsedTime() {
return CurrentTime() - start_time_;
}
void Task::Start() {
if (state_ != STATE_INIT)
return;
// Set the start time before starting the task. Otherwise if the task
// finishes quickly and deletes the Task object, setting start_time_
// will crash.
start_time_ = CurrentTime();
GetRunner()->StartTask(this);
}
void Task::Step() {
if (done_) {
#if RTC_DCHECK_IS_ON
// we do not know how !blocked_ happens when done_ - should be impossible.
// But it causes problems, so in retail build, we force blocked_, and
// under debug we assert.
RTC_DCHECK(blocked_);
#else
blocked_ = true;
#endif
return;
}
// Async Error() was called
if (error_) {
done_ = true;
state_ = STATE_ERROR;
blocked_ = true;
// obsolete - an errored task is not considered done now
// SignalDone();
Stop();
#if RTC_DCHECK_IS_ON
// verify that stop removed this from its parent
RTC_DCHECK(!parent()->IsChildTask(this));
#endif
return;
}
busy_ = true;
int new_state = Process(state_);
busy_ = false;
if (aborted_) {
Abort(true); // no need to wake because we're awake
return;
}
if (new_state == STATE_BLOCKED) {
blocked_ = true;
// Let the timeout continue
} else {
state_ = new_state;
blocked_ = false;
ResetTimeout();
}
if (new_state == STATE_DONE) {
done_ = true;
} else if (new_state == STATE_ERROR) {
done_ = true;
error_ = true;
}
if (done_) {
// obsolete - call this yourself
// SignalDone();
Stop();
#if RTC_DCHECK_IS_ON
// verify that stop removed this from its parent
RTC_DCHECK(!parent()->IsChildTask(this));
#endif
blocked_ = true;
}
}
void Task::Abort(bool nowake) {
// Why only check for done_ (instead of "aborted_ || done_")?
//
// If aborted_ && !done_, it means the logic for aborting still
// needs to be executed (because busy_ must have been true when
// Abort() was previously called).
if (done_)
return;
aborted_ = true;
if (!busy_) {
done_ = true;
blocked_ = true;
error_ = true;
// "done_" is set before calling "Stop()" to ensure that this code
// doesn't execute more than once (recursively) for the same task.
Stop();
#if RTC_DCHECK_IS_ON
// verify that stop removed this from its parent
RTC_DCHECK(!parent()->IsChildTask(this));
#endif
if (!nowake) {
// WakeTasks to self-delete.
// Don't call Wake() because it is a no-op after "done_" is set.
// Even if Wake() did run, it clears "blocked_" which isn't desireable.
GetRunner()->WakeTasks();
}
}
}
void Task::Wake() {
if (done_)
return;
if (blocked_) {
blocked_ = false;
GetRunner()->WakeTasks();
}
}
void Task::Error() {
if (error_ || done_)
return;
error_ = true;
Wake();
}
std::string Task::GetStateName(int state) const {
switch (state) {
case STATE_BLOCKED: return "BLOCKED";
case STATE_INIT: return "INIT";
case STATE_START: return "START";
case STATE_DONE: return "DONE";
case STATE_ERROR: return "ERROR";
case STATE_RESPONSE: return "RESPONSE";
}
return "??";
}
int Task::Process(int state) {
int newstate = STATE_ERROR;
if (TimedOut()) {
ClearTimeout();
newstate = OnTimeout();
SignalTimeout();
} else {
switch (state) {
case STATE_INIT:
newstate = STATE_START;
break;
case STATE_START:
newstate = ProcessStart();
break;
case STATE_RESPONSE:
newstate = ProcessResponse();
break;
case STATE_DONE:
case STATE_ERROR:
newstate = STATE_BLOCKED;
break;
}
}
return newstate;
}
void Task::Stop() {
// No need to wake because we're either awake or in abort
TaskParent::OnStopped(this);
}
int Task::ProcessResponse() {
return STATE_DONE;
}
void Task::set_timeout_seconds(const int timeout_seconds) {
timeout_seconds_ = timeout_seconds;
ResetTimeout();
}
bool Task::TimedOut() {
return timeout_seconds_ &&
timeout_time_ &&
CurrentTime() >= timeout_time_;
}
void Task::ResetTimeout() {
int64_t previous_timeout_time = timeout_time_;
bool timeout_allowed = (state_ != STATE_INIT)
&& (state_ != STATE_DONE)
&& (state_ != STATE_ERROR);
if (timeout_seconds_ && timeout_allowed && !timeout_suspended_)
timeout_time_ = CurrentTime() +
(timeout_seconds_ * kSecToMsec * kMsecTo100ns);
else
timeout_time_ = 0;
GetRunner()->UpdateTaskTimeout(this, previous_timeout_time);
}
void Task::ClearTimeout() {
int64_t previous_timeout_time = timeout_time_;
timeout_time_ = 0;
GetRunner()->UpdateTaskTimeout(this, previous_timeout_time);
}
void Task::SuspendTimeout() {
if (!timeout_suspended_) {
timeout_suspended_ = true;
ResetTimeout();
}
}
void Task::ResumeTimeout() {
if (timeout_suspended_) {
timeout_suspended_ = false;
ResetTimeout();
}
}
int Task::OnTimeout() {
// by default, we are finished after timing out
return STATE_DONE;
}
} // namespace rtc

View File

@ -1,175 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_BASE_TASK_H__
#define WEBRTC_BASE_TASK_H__
#include <stdint.h>
#include <string>
#include "webrtc/base/sigslot.h"
#include "webrtc/base/taskparent.h"
/////////////////////////////////////////////////////////////////////
//
// TASK
//
/////////////////////////////////////////////////////////////////////
//
// Task is a state machine infrastructure. States are pushed forward by
// pushing forwards a TaskRunner that holds on to all Tasks. The purpose
// of Task is threefold:
//
// (1) It manages ongoing work on the UI thread. Multitasking without
// threads, keeping it easy, keeping it real. :-) It does this by
// organizing a set of states for each task. When you return from your
// Process*() function, you return an integer for the next state. You do
// not go onto the next state yourself. Every time you enter a state,
// you check to see if you can do anything yet. If not, you return
// STATE_BLOCKED. If you _could_ do anything, do not return
// STATE_BLOCKED - even if you end up in the same state, return
// STATE_mysamestate. When you are done, return STATE_DONE and then the
// task will self-delete sometime afterwards.
//
// (2) It helps you avoid all those reentrancy problems when you chain
// too many triggers on one thread. Basically if you want to tell a task
// to process something for you, you feed your task some information and
// then you Wake() it. Don't tell it to process it right away. If it
// might be working on something as you send it information, you may want
// to have a queue in the task.
//
// (3) Finally it helps manage parent tasks and children. If a parent
// task gets aborted, all the children tasks are too. The nice thing
// about this, for example, is if you have one parent task that
// represents, say, and Xmpp connection, then you can spawn a whole bunch
// of infinite lifetime child tasks and now worry about cleaning them up.
// When the parent task goes to STATE_DONE, the task engine will make
// sure all those children are aborted and get deleted.
//
// Notice that Task has a few built-in states, e.g.,
//
// STATE_INIT - the task isn't running yet
// STATE_START - the task is in its first state
// STATE_RESPONSE - the task is in its second state
// STATE_DONE - the task is done
//
// STATE_ERROR - indicates an error - we should audit the error code in
// light of any usage of it to see if it should be improved. When I
// first put down the task stuff I didn't have a good sense of what was
// needed for Abort and Error, and now the subclasses of Task will ground
// the design in a stronger way.
//
// STATE_NEXT - the first undefined state number. (like WM_USER) - you
// can start defining more task states there.
//
// When you define more task states, just override Process(int state) and
// add your own switch statement. If you want to delegate to
// Task::Process, you can effectively delegate to its switch statement.
// No fancy method pointers or such - this is all just pretty low tech,
// easy to debug, and fast.
//
// Also notice that Task has some primitive built-in timeout functionality.
//
// A timeout is defined as "the task stays in STATE_BLOCKED longer than
// timeout_seconds_."
//
// Descendant classes can override this behavior by calling the
// various protected methods to change the timeout behavior. For
// instance, a descendand might call SuspendTimeout() when it knows
// that it isn't waiting for anything that might timeout, but isn't
// yet in the STATE_DONE state.
//
namespace rtc {
// Executes a sequence of steps
class Task : public TaskParent {
public:
Task(TaskParent *parent);
~Task() override;
int32_t unique_id() { return unique_id_; }
void Start();
void Step();
int GetState() const { return state_; }
bool HasError() const { return (GetState() == STATE_ERROR); }
bool Blocked() const { return blocked_; }
bool IsDone() const { return done_; }
int64_t ElapsedTime();
// Called from outside to stop task without any more callbacks
void Abort(bool nowake = false);
bool TimedOut();
int64_t timeout_time() const { return timeout_time_; }
int timeout_seconds() const { return timeout_seconds_; }
void set_timeout_seconds(int timeout_seconds);
sigslot::signal0<> SignalTimeout;
// Called inside the task to signal that the task may be unblocked
void Wake();
protected:
enum {
STATE_BLOCKED = -1,
STATE_INIT = 0,
STATE_START = 1,
STATE_DONE = 2,
STATE_ERROR = 3,
STATE_RESPONSE = 4,
STATE_NEXT = 5, // Subclasses which need more states start here and higher
};
// Called inside to advise that the task should wake and signal an error
void Error();
int64_t CurrentTime();
virtual std::string GetStateName(int state) const;
virtual int Process(int state);
virtual void Stop();
virtual int ProcessStart() = 0;
virtual int ProcessResponse();
void ResetTimeout();
void ClearTimeout();
void SuspendTimeout();
void ResumeTimeout();
protected:
virtual int OnTimeout();
private:
void Done();
int state_;
bool blocked_;
bool done_;
bool aborted_;
bool busy_;
bool error_;
int64_t start_time_;
int64_t timeout_time_;
int timeout_seconds_;
bool timeout_suspended_;
int32_t unique_id_;
static int32_t unique_id_seed_;
};
} // namespace rtc
#endif // WEBRTC_BASE_TASK_H__

View File

@ -1,542 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#if defined(WEBRTC_POSIX)
#include <sys/time.h>
#endif // WEBRTC_POSIX
// TODO: Remove this once the cause of sporadic failures in these
// tests is tracked down.
#include <iostream>
#if defined(WEBRTC_WIN)
#include "webrtc/base/win32.h"
#endif // WEBRTC_WIN
#include "webrtc/base/arraysize.h"
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/gunit.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/task.h"
#include "webrtc/base/taskrunner.h"
#include "webrtc/base/thread.h"
#include "webrtc/base/timeutils.h"
namespace rtc {
static int64_t GetCurrentTime() {
return TimeMillis() * 10000;
}
// feel free to change these numbers. Note that '0' won't work, though
#define STUCK_TASK_COUNT 5
#define HAPPY_TASK_COUNT 20
// this is a generic timeout task which, when it signals timeout, will
// include the unique ID of the task in the signal (we don't use this
// in production code because we haven't yet had occasion to generate
// an array of the same types of task)
class IdTimeoutTask : public Task, public sigslot::has_slots<> {
public:
explicit IdTimeoutTask(TaskParent *parent) : Task(parent) {
SignalTimeout.connect(this, &IdTimeoutTask::OnLocalTimeout);
}
sigslot::signal1<const int> SignalTimeoutId;
sigslot::signal1<const int> SignalDoneId;
virtual int ProcessStart() {
return STATE_RESPONSE;
}
void OnLocalTimeout() {
SignalTimeoutId(unique_id());
}
protected:
virtual void Stop() {
SignalDoneId(unique_id());
Task::Stop();
}
};
class StuckTask : public IdTimeoutTask {
public:
explicit StuckTask(TaskParent *parent) : IdTimeoutTask(parent) {}
virtual int ProcessStart() {
return STATE_BLOCKED;
}
};
class HappyTask : public IdTimeoutTask {
public:
explicit HappyTask(TaskParent *parent) : IdTimeoutTask(parent) {
time_to_perform_ = rand() % (STUCK_TASK_COUNT / 2);
}
virtual int ProcessStart() {
if (ElapsedTime() > (time_to_perform_ * 1000 * 10000))
return STATE_RESPONSE;
else
return STATE_BLOCKED;
}
private:
int time_to_perform_;
};
// simple implementation of a task runner which uses Windows'
// GetSystemTimeAsFileTime() to get the current clock ticks
class MyTaskRunner : public TaskRunner {
public:
virtual void WakeTasks() { RunTasks(); }
virtual int64_t CurrentTime() { return GetCurrentTime(); }
bool timeout_change() const {
return timeout_change_;
}
void clear_timeout_change() {
timeout_change_ = false;
}
protected:
virtual void OnTimeoutChange() {
timeout_change_ = true;
}
bool timeout_change_;
};
//
// this unit test is primarily concerned (for now) with the timeout
// functionality in tasks. It works as follows:
//
// * Create a bunch of tasks, some "stuck" (ie., guaranteed to timeout)
// and some "happy" (will immediately finish).
// * Set the timeout on the "stuck" tasks to some number of seconds between
// 1 and the number of stuck tasks
// * Start all the stuck & happy tasks in random order
// * Wait "number of stuck tasks" seconds and make sure everything timed out
class TaskTest : public sigslot::has_slots<> {
public:
TaskTest() {}
// no need to delete any tasks; the task runner owns them
~TaskTest() {}
void Start() {
// create and configure tasks
for (int i = 0; i < STUCK_TASK_COUNT; ++i) {
stuck_[i].task_ = new StuckTask(&task_runner_);
stuck_[i].task_->SignalTimeoutId.connect(this,
&TaskTest::OnTimeoutStuck);
stuck_[i].timed_out_ = false;
stuck_[i].xlat_ = stuck_[i].task_->unique_id();
stuck_[i].task_->set_timeout_seconds(i + 1);
LOG(LS_INFO) << "Task " << stuck_[i].xlat_ << " created with timeout "
<< stuck_[i].task_->timeout_seconds();
}
for (int i = 0; i < HAPPY_TASK_COUNT; ++i) {
happy_[i].task_ = new HappyTask(&task_runner_);
happy_[i].task_->SignalTimeoutId.connect(this,
&TaskTest::OnTimeoutHappy);
happy_[i].task_->SignalDoneId.connect(this,
&TaskTest::OnDoneHappy);
happy_[i].timed_out_ = false;
happy_[i].xlat_ = happy_[i].task_->unique_id();
}
// start all the tasks in random order
int stuck_index = 0;
int happy_index = 0;
for (int i = 0; i < STUCK_TASK_COUNT + HAPPY_TASK_COUNT; ++i) {
if ((stuck_index < STUCK_TASK_COUNT) &&
(happy_index < HAPPY_TASK_COUNT)) {
if (rand() % 2 == 1) {
stuck_[stuck_index++].task_->Start();
} else {
happy_[happy_index++].task_->Start();
}
} else if (stuck_index < STUCK_TASK_COUNT) {
stuck_[stuck_index++].task_->Start();
} else {
happy_[happy_index++].task_->Start();
}
}
for (int i = 0; i < STUCK_TASK_COUNT; ++i) {
std::cout << "Stuck task #" << i << " timeout is " <<
stuck_[i].task_->timeout_seconds() << " at " <<
stuck_[i].task_->timeout_time() << std::endl;
}
// just a little self-check to make sure we started all the tasks
ASSERT_EQ(STUCK_TASK_COUNT, stuck_index);
ASSERT_EQ(HAPPY_TASK_COUNT, happy_index);
// run the unblocked tasks
LOG(LS_INFO) << "Running tasks";
task_runner_.RunTasks();
std::cout << "Start time is " << GetCurrentTime() << std::endl;
// give all the stuck tasks time to timeout
for (int i = 0; !task_runner_.AllChildrenDone() && i < STUCK_TASK_COUNT;
++i) {
Thread::Current()->ProcessMessages(1000);
for (int j = 0; j < HAPPY_TASK_COUNT; ++j) {
if (happy_[j].task_) {
happy_[j].task_->Wake();
}
}
LOG(LS_INFO) << "Polling tasks";
task_runner_.PollTasks();
}
// We see occasional test failures here due to the stuck tasks not having
// timed-out yet, which seems like it should be impossible. To help track
// this down we have added logging of the timing information, which we send
// directly to stdout so that we get it in opt builds too.
std::cout << "End time is " << GetCurrentTime() << std::endl;
}
void OnTimeoutStuck(const int id) {
LOG(LS_INFO) << "Timed out task " << id;
int i;
for (i = 0; i < STUCK_TASK_COUNT; ++i) {
if (stuck_[i].xlat_ == id) {
stuck_[i].timed_out_ = true;
stuck_[i].task_ = nullptr;
break;
}
}
// getting a bad ID here is a failure, but let's continue
// running to see what else might go wrong
EXPECT_LT(i, STUCK_TASK_COUNT);
}
void OnTimeoutHappy(const int id) {
int i;
for (i = 0; i < HAPPY_TASK_COUNT; ++i) {
if (happy_[i].xlat_ == id) {
happy_[i].timed_out_ = true;
happy_[i].task_ = nullptr;
break;
}
}
// getting a bad ID here is a failure, but let's continue
// running to see what else might go wrong
EXPECT_LT(i, HAPPY_TASK_COUNT);
}
void OnDoneHappy(const int id) {
int i;
for (i = 0; i < HAPPY_TASK_COUNT; ++i) {
if (happy_[i].xlat_ == id) {
happy_[i].task_ = nullptr;
break;
}
}
// getting a bad ID here is a failure, but let's continue
// running to see what else might go wrong
EXPECT_LT(i, HAPPY_TASK_COUNT);
}
void check_passed() {
EXPECT_TRUE(task_runner_.AllChildrenDone());
// make sure none of our happy tasks timed out
for (int i = 0; i < HAPPY_TASK_COUNT; ++i) {
EXPECT_FALSE(happy_[i].timed_out_);
}
// make sure all of our stuck tasks timed out
for (int i = 0; i < STUCK_TASK_COUNT; ++i) {
EXPECT_TRUE(stuck_[i].timed_out_);
if (!stuck_[i].timed_out_) {
std::cout << "Stuck task #" << i << " timeout is at "
<< stuck_[i].task_->timeout_time() << std::endl;
}
}
std::cout.flush();
}
private:
struct TaskInfo {
IdTimeoutTask *task_;
bool timed_out_;
int xlat_;
};
MyTaskRunner task_runner_;
TaskInfo stuck_[STUCK_TASK_COUNT];
TaskInfo happy_[HAPPY_TASK_COUNT];
};
TEST(start_task_test, Timeout) {
TaskTest task_test;
task_test.Start();
task_test.check_passed();
}
// Test for aborting the task while it is running
class AbortTask : public Task {
public:
explicit AbortTask(TaskParent *parent) : Task(parent) {
set_timeout_seconds(1);
}
virtual int ProcessStart() {
Abort();
return STATE_NEXT;
}
private:
RTC_DISALLOW_COPY_AND_ASSIGN(AbortTask);
};
class TaskAbortTest : public sigslot::has_slots<> {
public:
TaskAbortTest() {}
// no need to delete any tasks; the task runner owns them
~TaskAbortTest() {}
void Start() {
Task *abort_task = new AbortTask(&task_runner_);
abort_task->SignalTimeout.connect(this, &TaskAbortTest::OnTimeout);
abort_task->Start();
// run the task
task_runner_.RunTasks();
}
private:
void OnTimeout() {
FAIL() << "Task timed out instead of aborting.";
}
MyTaskRunner task_runner_;
RTC_DISALLOW_COPY_AND_ASSIGN(TaskAbortTest);
};
TEST(start_task_test, Abort) {
TaskAbortTest abort_test;
abort_test.Start();
}
// Test for aborting a task to verify that it does the Wake operation
// which gets it deleted.
class SetBoolOnDeleteTask : public Task {
public:
SetBoolOnDeleteTask(TaskParent *parent, bool *set_when_deleted)
: Task(parent),
set_when_deleted_(set_when_deleted) {
EXPECT_TRUE(nullptr != set_when_deleted);
EXPECT_FALSE(*set_when_deleted);
}
virtual ~SetBoolOnDeleteTask() {
*set_when_deleted_ = true;
}
virtual int ProcessStart() {
return STATE_BLOCKED;
}
private:
bool* set_when_deleted_;
RTC_DISALLOW_COPY_AND_ASSIGN(SetBoolOnDeleteTask);
};
class AbortShouldWakeTest : public sigslot::has_slots<> {
public:
AbortShouldWakeTest() {}
// no need to delete any tasks; the task runner owns them
~AbortShouldWakeTest() {}
void Start() {
bool task_deleted = false;
Task *task_to_abort = new SetBoolOnDeleteTask(&task_runner_, &task_deleted);
task_to_abort->Start();
// Task::Abort() should call TaskRunner::WakeTasks(). WakeTasks calls
// TaskRunner::RunTasks() immediately which should delete the task.
task_to_abort->Abort();
EXPECT_TRUE(task_deleted);
if (!task_deleted) {
// avoid a crash (due to referencing a local variable)
// if the test fails.
task_runner_.RunTasks();
}
}
private:
void OnTimeout() {
FAIL() << "Task timed out instead of aborting.";
}
MyTaskRunner task_runner_;
RTC_DISALLOW_COPY_AND_ASSIGN(AbortShouldWakeTest);
};
TEST(start_task_test, AbortShouldWake) {
AbortShouldWakeTest abort_should_wake_test;
abort_should_wake_test.Start();
}
// Validate that TaskRunner's OnTimeoutChange gets called appropriately
// * When a task calls UpdateTaskTimeout
// * When the next timeout task time, times out
class TimeoutChangeTest : public sigslot::has_slots<> {
public:
TimeoutChangeTest()
: task_count_(arraysize(stuck_tasks_)) {}
// no need to delete any tasks; the task runner owns them
~TimeoutChangeTest() {}
void Start() {
for (int i = 0; i < task_count_; ++i) {
stuck_tasks_[i] = new StuckTask(&task_runner_);
stuck_tasks_[i]->set_timeout_seconds(i + 2);
stuck_tasks_[i]->SignalTimeoutId.connect(this,
&TimeoutChangeTest::OnTimeoutId);
}
for (int i = task_count_ - 1; i >= 0; --i) {
stuck_tasks_[i]->Start();
}
task_runner_.clear_timeout_change();
// At this point, our timeouts are set as follows
// task[0] is 2 seconds, task[1] at 3 seconds, etc.
stuck_tasks_[0]->set_timeout_seconds(2);
// Now, task[0] is 2 seconds, task[1] at 3 seconds...
// so timeout change shouldn't be called.
EXPECT_FALSE(task_runner_.timeout_change());
task_runner_.clear_timeout_change();
stuck_tasks_[0]->set_timeout_seconds(1);
// task[0] is 1 seconds, task[1] at 3 seconds...
// The smallest timeout got smaller so timeout change be called.
EXPECT_TRUE(task_runner_.timeout_change());
task_runner_.clear_timeout_change();
stuck_tasks_[1]->set_timeout_seconds(2);
// task[0] is 1 seconds, task[1] at 2 seconds...
// The smallest timeout is still 1 second so no timeout change.
EXPECT_FALSE(task_runner_.timeout_change());
task_runner_.clear_timeout_change();
while (task_count_ > 0) {
int previous_count = task_count_;
task_runner_.PollTasks();
if (previous_count != task_count_) {
// We only get here when a task times out. When that
// happens, the timeout change should get called because
// the smallest timeout is now in the past.
EXPECT_TRUE(task_runner_.timeout_change());
task_runner_.clear_timeout_change();
}
Thread::Current()->socketserver()->Wait(500, false);
}
}
private:
void OnTimeoutId(const int id) {
for (size_t i = 0; i < arraysize(stuck_tasks_); ++i) {
if (stuck_tasks_[i] && stuck_tasks_[i]->unique_id() == id) {
task_count_--;
stuck_tasks_[i] = nullptr;
break;
}
}
}
MyTaskRunner task_runner_;
StuckTask* (stuck_tasks_[3]);
int task_count_;
RTC_DISALLOW_COPY_AND_ASSIGN(TimeoutChangeTest);
};
TEST(start_task_test, TimeoutChange) {
TimeoutChangeTest timeout_change_test;
timeout_change_test.Start();
}
class DeleteTestTaskRunner : public TaskRunner {
public:
DeleteTestTaskRunner() {
}
virtual void WakeTasks() { }
virtual int64_t CurrentTime() { return GetCurrentTime(); }
private:
RTC_DISALLOW_COPY_AND_ASSIGN(DeleteTestTaskRunner);
};
TEST(unstarted_task_test, DeleteTask) {
// This test ensures that we don't
// crash if a task is deleted without running it.
DeleteTestTaskRunner task_runner;
HappyTask* happy_task = new HappyTask(&task_runner);
happy_task->Start();
// try deleting the task directly
HappyTask* child_happy_task = new HappyTask(happy_task);
delete child_happy_task;
// run the unblocked tasks
task_runner.RunTasks();
}
TEST(unstarted_task_test, DoNotDeleteTask1) {
// This test ensures that we don't
// crash if a task runner is deleted without
// running a certain task.
DeleteTestTaskRunner task_runner;
HappyTask* happy_task = new HappyTask(&task_runner);
happy_task->Start();
HappyTask* child_happy_task = new HappyTask(happy_task);
child_happy_task->Start();
// Never run the tasks
}
TEST(unstarted_task_test, DoNotDeleteTask2) {
// This test ensures that we don't
// crash if a taskrunner is delete with a
// task that has never been started.
DeleteTestTaskRunner task_runner;
HappyTask* happy_task = new HappyTask(&task_runner);
happy_task->Start();
// Do not start the task.
// Note: this leaks memory, so don't do this.
// Instead, always run your tasks or delete them.
new HappyTask(happy_task);
// run the unblocked tasks
task_runner.RunTasks();
}
} // namespace rtc

View File

@ -1,97 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <algorithm>
#include "webrtc/base/taskparent.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/task.h"
#include "webrtc/base/taskrunner.h"
namespace rtc {
TaskParent::TaskParent(Task* derived_instance, TaskParent *parent)
: parent_(parent) {
RTC_DCHECK(derived_instance != nullptr);
RTC_DCHECK(parent != nullptr);
runner_ = parent->GetRunner();
parent_->AddChild(derived_instance);
Initialize();
}
TaskParent::TaskParent(TaskRunner* derived_instance)
: parent_(nullptr), runner_(derived_instance) {
RTC_DCHECK(derived_instance != nullptr);
Initialize();
}
TaskParent::~TaskParent() = default;
// Does common initialization of member variables
void TaskParent::Initialize() {
children_.reset(new ChildSet());
child_error_ = false;
}
void TaskParent::AddChild(Task *child) {
children_->insert(child);
}
#if RTC_DCHECK_IS_ON
bool TaskParent::IsChildTask(Task *task) {
RTC_DCHECK(task != nullptr);
return task->parent_ == this && children_->find(task) != children_->end();
}
#endif
bool TaskParent::AllChildrenDone() {
for (ChildSet::iterator it = children_->begin();
it != children_->end();
++it) {
if (!(*it)->IsDone())
return false;
}
return true;
}
bool TaskParent::AnyChildError() {
return child_error_;
}
void TaskParent::AbortAllChildren() {
if (children_->size() > 0) {
#if RTC_DCHECK_IS_ON
runner_->IncrementAbortCount();
#endif
ChildSet copy = *children_;
for (ChildSet::iterator it = copy.begin(); it != copy.end(); ++it) {
(*it)->Abort(true); // Note we do not wake
}
#if RTC_DCHECK_IS_ON
runner_->DecrementAbortCount();
#endif
}
}
void TaskParent::OnStopped(Task *task) {
AbortAllChildren();
parent_->OnChildStopped(task);
}
void TaskParent::OnChildStopped(Task *child) {
if (child->HasError())
child_error_ = true;
children_->erase(child);
}
} // namespace rtc

View File

@ -1,63 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_BASE_TASKPARENT_H__
#define WEBRTC_BASE_TASKPARENT_H__
#include <memory>
#include <set>
#include "webrtc/base/checks.h"
#include "webrtc/base/constructormagic.h"
namespace rtc {
class Task;
class TaskRunner;
class TaskParent {
public:
TaskParent(Task *derived_instance, TaskParent *parent);
explicit TaskParent(TaskRunner *derived_instance);
virtual ~TaskParent();
TaskParent *GetParent() { return parent_; }
TaskRunner *GetRunner() { return runner_; }
bool AllChildrenDone();
bool AnyChildError();
#if RTC_DCHECK_IS_ON
bool IsChildTask(Task *task);
#endif
protected:
void OnStopped(Task *task);
void AbortAllChildren();
TaskParent *parent() {
return parent_;
}
private:
void Initialize();
void OnChildStopped(Task *child);
void AddChild(Task *child);
TaskParent *parent_;
TaskRunner *runner_;
bool child_error_;
typedef std::set<Task *> ChildSet;
std::unique_ptr<ChildSet> children_;
RTC_DISALLOW_COPY_AND_ASSIGN(TaskParent);
};
} // namespace rtc
#endif // WEBRTC_BASE_TASKPARENT_H__

View File

@ -1,216 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <algorithm>
#include "webrtc/base/taskrunner.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/task.h"
#include "webrtc/base/logging.h"
namespace rtc {
TaskRunner::TaskRunner()
: TaskParent(this) {}
TaskRunner::~TaskRunner() {
// this kills and deletes children silently!
AbortAllChildren();
InternalRunTasks(true);
}
void TaskRunner::StartTask(Task * task) {
tasks_.push_back(task);
// the task we just started could be about to timeout --
// make sure our "next timeout task" is correct
UpdateTaskTimeout(task, 0);
WakeTasks();
}
void TaskRunner::RunTasks() {
InternalRunTasks(false);
}
void TaskRunner::InternalRunTasks(bool in_destructor) {
// This shouldn't run while an abort is happening.
// If that occurs, then tasks may be deleted in this method,
// but pointers to them will still be in the
// "ChildSet copy" in TaskParent::AbortAllChildren.
// Subsequent use of those task may cause data corruption or crashes.
#if RTC_DCHECK_IS_ON
RTC_DCHECK(!abort_count_);
#endif
// Running continues until all tasks are Blocked (ok for a small # of tasks)
if (tasks_running_) {
return; // don't reenter
}
tasks_running_ = true;
int64_t previous_timeout_time = next_task_timeout();
int did_run = true;
while (did_run) {
did_run = false;
// use indexing instead of iterators because tasks_ may grow
for (size_t i = 0; i < tasks_.size(); ++i) {
while (!tasks_[i]->Blocked()) {
tasks_[i]->Step();
did_run = true;
}
}
}
// Tasks are deleted when running has paused
bool need_timeout_recalc = false;
for (size_t i = 0; i < tasks_.size(); ++i) {
if (tasks_[i]->IsDone()) {
Task* task = tasks_[i];
if (next_timeout_task_ &&
task->unique_id() == next_timeout_task_->unique_id()) {
next_timeout_task_ = nullptr;
need_timeout_recalc = true;
}
#if RTC_DCHECK_IS_ON
deleting_task_ = task;
#endif
delete task;
#if RTC_DCHECK_IS_ON
deleting_task_ = nullptr;
#endif
tasks_[i] = nullptr;
}
}
// Finally, remove nulls.
std::vector<Task *>::iterator it;
it = std::remove(tasks_.begin(), tasks_.end(), nullptr);
tasks_.erase(it, tasks_.end());
if (need_timeout_recalc)
RecalcNextTimeout(nullptr);
// Make sure that adjustments are done to account
// for any timeout changes (but don't call this
// while being destroyed since it calls a pure virtual function).
if (!in_destructor)
CheckForTimeoutChange(previous_timeout_time);
tasks_running_ = false;
}
void TaskRunner::PollTasks() {
// see if our "next potentially timed-out task" has indeed timed out.
// If it has, wake it up, then queue up the next task in line
// Repeat while we have new timed-out tasks.
// TODO: We need to guard against WakeTasks not updating
// next_timeout_task_. Maybe also add documentation in the header file once
// we understand this code better.
Task* old_timeout_task = nullptr;
while (next_timeout_task_ &&
old_timeout_task != next_timeout_task_ &&
next_timeout_task_->TimedOut()) {
old_timeout_task = next_timeout_task_;
next_timeout_task_->Wake();
WakeTasks();
}
}
int64_t TaskRunner::next_task_timeout() const {
if (next_timeout_task_) {
return next_timeout_task_->timeout_time();
}
return 0;
}
// this function gets called frequently -- when each task changes
// state to something other than DONE, ERROR or BLOCKED, it calls
// ResetTimeout(), which will call this function to make sure that
// the next timeout-able task hasn't changed. The logic in this function
// prevents RecalcNextTimeout() from getting called in most cases,
// effectively making the task scheduler O-1 instead of O-N
void TaskRunner::UpdateTaskTimeout(Task* task,
int64_t previous_task_timeout_time) {
RTC_DCHECK(task != nullptr);
int64_t previous_timeout_time = next_task_timeout();
bool task_is_timeout_task =
next_timeout_task_ != nullptr &&
task->unique_id() == next_timeout_task_->unique_id();
if (task_is_timeout_task) {
previous_timeout_time = previous_task_timeout_time;
}
// if the relevant task has a timeout, then
// check to see if it's closer than the current
// "about to timeout" task
if (task->timeout_time()) {
if (next_timeout_task_ == nullptr ||
(task->timeout_time() <= next_timeout_task_->timeout_time())) {
next_timeout_task_ = task;
}
} else if (task_is_timeout_task) {
// otherwise, if the task doesn't have a timeout,
// and it used to be our "about to timeout" task,
// walk through all the tasks looking for the real
// "about to timeout" task
RecalcNextTimeout(task);
}
// Note when task_running_, then the running routine
// (TaskRunner::InternalRunTasks) is responsible for calling
// CheckForTimeoutChange.
if (!tasks_running_) {
CheckForTimeoutChange(previous_timeout_time);
}
}
void TaskRunner::RecalcNextTimeout(Task *exclude_task) {
// walk through all the tasks looking for the one
// which satisfies the following:
// it's not finished already
// we're not excluding it
// it has the closest timeout time
int64_t next_timeout_time = 0;
next_timeout_task_ = nullptr;
for (size_t i = 0; i < tasks_.size(); ++i) {
Task *task = tasks_[i];
// if the task isn't complete, and it actually has a timeout time
if (!task->IsDone() && (task->timeout_time() > 0))
// if it doesn't match our "exclude" task
if (exclude_task == nullptr ||
exclude_task->unique_id() != task->unique_id())
// if its timeout time is sooner than our current timeout time
if (next_timeout_time == 0 ||
task->timeout_time() <= next_timeout_time) {
// set this task as our next-to-timeout
next_timeout_time = task->timeout_time();
next_timeout_task_ = task;
}
}
}
void TaskRunner::CheckForTimeoutChange(int64_t previous_timeout_time) {
int64_t next_timeout = next_task_timeout();
bool timeout_change = (previous_timeout_time == 0 && next_timeout != 0) ||
next_timeout < previous_timeout_time ||
(previous_timeout_time <= CurrentTime() &&
previous_timeout_time != next_timeout);
if (timeout_change) {
OnTimeoutChange();
}
}
} // namespace rtc

View File

@ -1,102 +0,0 @@
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_BASE_TASKRUNNER_H__
#define WEBRTC_BASE_TASKRUNNER_H__
#include <stdint.h>
#include <vector>
#include "webrtc/base/checks.h"
#include "webrtc/base/sigslot.h"
#include "webrtc/base/taskparent.h"
namespace rtc {
class Task;
const int64_t kSecToMsec = 1000;
const int64_t kMsecTo100ns = 10000;
const int64_t kSecTo100ns = kSecToMsec * kMsecTo100ns;
class TaskRunner : public TaskParent, public sigslot::has_slots<> {
public:
TaskRunner();
~TaskRunner() override;
virtual void WakeTasks() = 0;
// Returns the current time in 100ns units. It is used for
// determining timeouts. The origin is not important, only
// the units and that rollover while the computer is running.
//
// On Windows, GetSystemTimeAsFileTime is the typical implementation.
virtual int64_t CurrentTime() = 0;
void StartTask(Task *task);
void RunTasks();
void PollTasks();
void UpdateTaskTimeout(Task* task, int64_t previous_task_timeout_time);
#if RTC_DCHECK_IS_ON
bool is_ok_to_delete(Task* task) {
return task == deleting_task_;
}
void IncrementAbortCount() {
++abort_count_;
}
void DecrementAbortCount() {
--abort_count_;
}
#endif
// Returns the next absolute time when a task times out
// OR "0" if there is no next timeout.
int64_t next_task_timeout() const;
protected:
// The primary usage of this method is to know if
// a callback timer needs to be set-up or adjusted.
// This method will be called
// * when the next_task_timeout() becomes a smaller value OR
// * when next_task_timeout() has changed values and the previous
// value is in the past.
//
// If the next_task_timeout moves to the future, this method will *not*
// get called (because it subclass should check next_task_timeout()
// when its timer goes off up to see if it needs to set-up a new timer).
//
// Note that this maybe called conservatively. In that it may be
// called when no time change has happened.
virtual void OnTimeoutChange() {
// by default, do nothing.
}
private:
void InternalRunTasks(bool in_destructor);
void CheckForTimeoutChange(int64_t previous_timeout_time);
std::vector<Task *> tasks_;
Task *next_timeout_task_ = nullptr;
bool tasks_running_ = false;
#if RTC_DCHECK_IS_ON
int abort_count_ = 0;
Task* deleting_task_ = nullptr;
#endif
void RecalcNextTimeout(Task *exclude_task);
};
} // namespace rtc
#endif // TASK_BASE_TASKRUNNER_H__