Migrate RepeatingTask to take raw pointer to TaskQueueBase instead of TaskQueue

In particular replace call rtc::TaskQueue::Current with TaskQueueBase::Current

Bug: webrtc:10191
Change-Id: I19d42a716d27f0aba087dc70ac65b4ee6249408f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125085
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27005}
This commit is contained in:
Danil Chapovalov 2019-03-06 18:41:39 +01:00 committed by Commit Bot
parent 11e55ee90a
commit 4423c36448
12 changed files with 46 additions and 32 deletions

View File

@ -21,6 +21,7 @@ rtc_source_set("task_queue") {
deps = [
"../../rtc_base:checks",
"../../rtc_base:macromagic",
"//third_party/abseil-cpp/absl/base:config",
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/strings",

View File

@ -1,4 +1,9 @@
specific_include_rules = {
"task_queue_base\.h": [
# Make TaskQueueBase RTC_LOCKABALE to allow annotate variables are only
# accessed on specific task queue.
"+rtc_base/thread_annotations.h",
],
"task_queue_test\.h": [
"+test/gtest.h",
],

View File

@ -13,6 +13,7 @@
#include <memory>
#include "api/task_queue/queued_task.h"
#include "rtc_base/thread_annotations.h"
// TODO(bugs.webrtc.org/10191): Remove when
// rtc::TaskQueue* rtc::TaskQueue::Current() is unused.
@ -26,7 +27,7 @@ namespace webrtc {
// in FIFO order and that tasks never overlap. Tasks may always execute on the
// same worker thread and they may not. To DCHECK that tasks are executing on a
// known task queue, use IsCurrent().
class TaskQueueBase {
class RTC_LOCKABLE TaskQueueBase {
public:
// Starts destruction of the task queue.
// On return ensures no task are running and no new tasks are able to start

View File

@ -472,7 +472,7 @@ void RtpTransportControllerSend::UpdateInitialConstraints(
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
&task_queue_, kPacerQueueUpdateInterval, [this]() {
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time =
TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
@ -484,7 +484,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
controller_task_.Stop();
if (process_interval_.IsFinite()) {
controller_task_ = RepeatingTaskHandle::DelayedStart(
&task_queue_, process_interval_, [this]() {
task_queue_.Get(), process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
return process_interval_;

View File

@ -333,7 +333,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
config_.task_queue, TimeDelta::ms(delay_ms), [this] {
config_.task_queue->Get(), TimeDelta::ms(delay_ms), [this] {
RTC_DCHECK(config_.schedule_periodic_compound_packets);
RTC_DCHECK(ready_to_send_);
SendPeriodicCompoundPacket();

View File

@ -50,7 +50,7 @@ class FakePeriodicVideoSource final
frame_source_.SetRotation(config.rotation);
TimeDelta frame_interval = TimeDelta::ms(config.frame_interval_ms);
RepeatingTaskHandle::Start(task_queue_.get(), [this, frame_interval] {
RepeatingTaskHandle::Start(task_queue_->Get(), [this, frame_interval] {
if (broadcaster_.wants().rotation_applied) {
broadcaster_.OnFrame(frame_source_.GetFrameRotationApplied());
} else {

View File

@ -14,11 +14,12 @@ rtc_source_set("repeating_task") {
"repeating_task.h",
]
deps = [
":to_queued_task",
"..:logging",
"..:rtc_task_queue",
"..:sequenced_task_checker",
"..:thread_checker",
"..:timeutils",
"../../api/task_queue",
"../../api/units:time_delta",
"../../api/units:timestamp",
"//third_party/abseil-cpp/absl/memory",
@ -44,6 +45,7 @@ if (rtc_include_tests) {
deps = [
":repeating_task",
"..:rtc_base_approved",
"..:rtc_task_queue",
"../../test:test_support",
"//third_party/abseil-cpp/absl/memory",
]

View File

@ -9,12 +9,14 @@
*/
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
namespace webrtc_repeating_task_impl {
RepeatingTaskBase::RepeatingTaskBase(rtc::TaskQueue* task_queue,
RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
TimeDelta first_delay)
: task_queue_(task_queue),
next_run_time_(Timestamp::us(rtc::TimeMicros()) + first_delay) {}
@ -57,10 +59,10 @@ void RepeatingTaskBase::PostStop() {
RTC_DLOG(LS_INFO) << "Using PostStop() from the task queue running the "
"repeated task. Consider calling Stop() instead.";
}
task_queue_->PostTask([this] {
task_queue_->PostTask(ToQueuedTask([this] {
RTC_DCHECK_RUN_ON(task_queue_);
Stop();
});
}));
}
} // namespace webrtc_repeating_task_impl

View File

@ -15,10 +15,11 @@
#include <utility>
#include "absl/memory/memory.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/sequenced_task_checker.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
@ -26,9 +27,9 @@ namespace webrtc {
class RepeatingTaskHandle;
namespace webrtc_repeating_task_impl {
class RepeatingTaskBase : public rtc::QueuedTask {
class RepeatingTaskBase : public QueuedTask {
public:
RepeatingTaskBase(rtc::TaskQueue* task_queue, TimeDelta first_delay);
RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay);
~RepeatingTaskBase() override;
virtual TimeDelta RunClosure() = 0;
@ -39,7 +40,7 @@ class RepeatingTaskBase : public rtc::QueuedTask {
void Stop() RTC_RUN_ON(task_queue_);
void PostStop();
rtc::TaskQueue* const task_queue_;
TaskQueueBase* const task_queue_;
// This is always finite, except for the special case where it's PlusInfinity
// to signal that the task should stop.
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
@ -49,7 +50,7 @@ class RepeatingTaskBase : public rtc::QueuedTask {
template <class Closure>
class RepeatingTaskImpl final : public RepeatingTaskBase {
public:
RepeatingTaskImpl(rtc::TaskQueue* task_queue,
RepeatingTaskImpl(TaskQueueBase* task_queue,
TimeDelta first_delay,
Closure&& closure)
: RepeatingTaskBase(task_queue, first_delay),
@ -91,7 +92,7 @@ class RepeatingTaskHandle {
// perfectly fine to destroy the handle while the task is running, since the
// repeated task is owned by the TaskQueue.
template <class Closure>
static RepeatingTaskHandle Start(rtc::TaskQueue* task_queue,
static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
Closure&& closure) {
auto repeating_task = absl::make_unique<
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
@ -102,13 +103,13 @@ class RepeatingTaskHandle {
}
template <class Closure>
static RepeatingTaskHandle Start(Closure&& closure) {
return Start(rtc::TaskQueue::Current(), std::forward<Closure>(closure));
return Start(TaskQueueBase::Current(), std::forward<Closure>(closure));
}
// DelayedStart is equivalent to Start except that the first invocation of the
// closure will be delayed by the given amount.
template <class Closure>
static RepeatingTaskHandle DelayedStart(rtc::TaskQueue* task_queue,
static RepeatingTaskHandle DelayedStart(TaskQueueBase* task_queue,
TimeDelta first_delay,
Closure&& closure) {
auto repeating_task = absl::make_unique<
@ -121,7 +122,7 @@ class RepeatingTaskHandle {
template <class Closure>
static RepeatingTaskHandle DelayedStart(TimeDelta first_delay,
Closure&& closure) {
return DelayedStart(rtc::TaskQueue::Current(), first_delay,
return DelayedStart(TaskQueueBase::Current(), first_delay,
std::forward<Closure>(closure));
}

View File

@ -15,6 +15,7 @@
#include "absl/memory/memory.h"
#include "rtc_base/event.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "test/gmock.h"
#include "test/gtest.h"
@ -69,7 +70,7 @@ TEST(RepeatingTaskTest, TaskIsStoppedOnStop) {
rtc::TaskQueue task_queue("TestQueue");
std::atomic_int counter(0);
auto handle = RepeatingTaskHandle::Start(&task_queue, [&] {
auto handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] {
if (++counter >= kShortIntervalCount)
return kLongInterval;
return kShortInterval;
@ -96,7 +97,7 @@ TEST(RepeatingTaskTest, CompensatesForLongRunTime) {
std::atomic_int counter(0);
rtc::TaskQueue task_queue("TestQueue");
RepeatingTaskHandle::Start(&task_queue, [&] {
RepeatingTaskHandle::Start(task_queue.Get(), [&] {
if (++counter == kSleepAtCount)
Sleep(kSleepDuration);
return kRepeatInterval;
@ -110,7 +111,7 @@ TEST(RepeatingTaskTest, CompensatesForLongRunTime) {
TEST(RepeatingTaskTest, CompensatesForShortRunTime) {
std::atomic_int counter(0);
rtc::TaskQueue task_queue("TestQueue");
RepeatingTaskHandle::Start(&task_queue, [&] {
RepeatingTaskHandle::Start(task_queue.Get(), [&] {
++counter;
// Sleeping for the 10 ms should be compensated.
Sleep(TimeDelta::ms(10));
@ -130,7 +131,7 @@ TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) {
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto handle = RepeatingTaskHandle::DelayedStart(
&task_queue, TimeDelta::ms(100), MoveOnlyClosure(&mock));
task_queue.Get(), TimeDelta::ms(100), MoveOnlyClosure(&mock));
handle.PostStop();
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@ -141,7 +142,8 @@ TEST(RepeatingTaskTest, CancelTaskAfterItRuns) {
EXPECT_CALL(mock, Call).WillOnce(Return(TimeDelta::ms(100)));
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto handle = RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&mock));
auto handle =
RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&mock));
handle.PostStop();
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@ -151,7 +153,7 @@ TEST(RepeatingTaskTest, TaskCanStopItself) {
rtc::TaskQueue task_queue("TestQueue");
RepeatingTaskHandle handle;
task_queue.PostTask([&] {
handle = RepeatingTaskHandle::Start(&task_queue, [&] {
handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] {
++counter;
handle.Stop();
return TimeDelta::ms(2);
@ -171,7 +173,7 @@ TEST(RepeatingTaskTest, ZeroReturnValueRepostsTheTask) {
return kTimeout;
}));
rtc::TaskQueue task_queue("queue");
RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&closure));
RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&closure));
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@ -186,7 +188,7 @@ TEST(RepeatingTaskTest, StartPeriodicTask) {
return kTimeout;
}));
rtc::TaskQueue task_queue("queue");
RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction());
RepeatingTaskHandle::Start(task_queue.Get(), closure.AsStdFunction());
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@ -196,7 +198,7 @@ TEST(RepeatingTaskTest, Example) {
void DoPeriodicTask() {}
TimeDelta TimeUntilNextRun() { return TimeDelta::ms(100); }
void StartPeriodicTask(RepeatingTaskHandle* handle,
rtc::TaskQueue* task_queue) {
TaskQueueBase* task_queue) {
*handle = RepeatingTaskHandle::Start(task_queue, [this] {
DoPeriodicTask();
return TimeUntilNextRun();
@ -207,10 +209,10 @@ TEST(RepeatingTaskTest, Example) {
auto object = absl::make_unique<ObjectOnTaskQueue>();
// Create and start the periodic task.
RepeatingTaskHandle handle;
object->StartPeriodicTask(&handle, &task_queue);
object->StartPeriodicTask(&handle, task_queue.Get());
// Restart the task
handle.PostStop();
object->StartPeriodicTask(&handle, &task_queue);
object->StartPeriodicTask(&handle, task_queue.Get());
handle.PostStop();
struct Destructor {
void operator()() { object.reset(); }

View File

@ -141,8 +141,8 @@ bool FrameGeneratorCapturer::Init() {
return false;
RepeatingTaskHandle::DelayedStart(
&task_queue_, TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(),
[this] {
task_queue_.Get(),
TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(), [this] {
InsertFrame();
return TimeDelta::seconds(1) / GetCurrentConfiguredFramerate();
});

View File

@ -41,7 +41,7 @@ NetworkEmulationManager::NetworkEmulationManager()
next_node_id_(1),
next_ip4_address_(kMinIPv4Address),
task_queue_("network_emulation_manager") {
process_task_handle_ = RepeatingTaskHandle::Start(&task_queue_, [this] {
process_task_handle_ = RepeatingTaskHandle::Start(task_queue_.Get(), [this] {
ProcessNetworkPackets();
return TimeDelta::ms(kPacketProcessingIntervalMs);
});