diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index c0648b77ea..0129512ae8 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -21,6 +21,7 @@ rtc_source_set("task_queue") { deps = [ "../../rtc_base:checks", + "../../rtc_base:macromagic", "//third_party/abseil-cpp/absl/base:config", "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/strings", diff --git a/api/task_queue/DEPS b/api/task_queue/DEPS index 9cb6b1fe79..fab6056212 100644 --- a/api/task_queue/DEPS +++ b/api/task_queue/DEPS @@ -1,4 +1,9 @@ specific_include_rules = { + "task_queue_base\.h": [ + # Make TaskQueueBase RTC_LOCKABALE to allow annotate variables are only + # accessed on specific task queue. + "+rtc_base/thread_annotations.h", + ], "task_queue_test\.h": [ "+test/gtest.h", ], diff --git a/api/task_queue/task_queue_base.h b/api/task_queue/task_queue_base.h index b1b5cc7f1b..fade0057ce 100644 --- a/api/task_queue/task_queue_base.h +++ b/api/task_queue/task_queue_base.h @@ -13,6 +13,7 @@ #include #include "api/task_queue/queued_task.h" +#include "rtc_base/thread_annotations.h" // TODO(bugs.webrtc.org/10191): Remove when // rtc::TaskQueue* rtc::TaskQueue::Current() is unused. @@ -26,7 +27,7 @@ namespace webrtc { // in FIFO order and that tasks never overlap. Tasks may always execute on the // same worker thread and they may not. To DCHECK that tasks are executing on a // known task queue, use IsCurrent(). -class TaskQueueBase { +class RTC_LOCKABLE TaskQueueBase { public: // Starts destruction of the task queue. // On return ensures no task are running and no new tasks are able to start diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 5bf5737248..a414c3b425 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -472,7 +472,7 @@ void RtpTransportControllerSend::UpdateInitialConstraints( void RtpTransportControllerSend::StartProcessPeriodicTasks() { if (!pacer_queue_update_task_.Running()) { pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( - &task_queue_, kPacerQueueUpdateInterval, [this]() { + task_queue_.Get(), kPacerQueueUpdateInterval, [this]() { RTC_DCHECK_RUN_ON(&task_queue_); TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs()); @@ -484,7 +484,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() { controller_task_.Stop(); if (process_interval_.IsFinite()) { controller_task_ = RepeatingTaskHandle::DelayedStart( - &task_queue_, process_interval_, [this]() { + task_queue_.Get(), process_interval_, [this]() { RTC_DCHECK_RUN_ON(&task_queue_); UpdateControllerWithTimeInterval(); return process_interval_; diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc index 5d2cd6ea22..c6f8931075 100644 --- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc +++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc @@ -333,7 +333,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() { void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) { periodic_task_handle_ = RepeatingTaskHandle::DelayedStart( - config_.task_queue, TimeDelta::ms(delay_ms), [this] { + config_.task_queue->Get(), TimeDelta::ms(delay_ms), [this] { RTC_DCHECK(config_.schedule_periodic_compound_packets); RTC_DCHECK(ready_to_send_); SendPeriodicCompoundPacket(); diff --git a/pc/test/fake_periodic_video_source.h b/pc/test/fake_periodic_video_source.h index 923f10ae4d..00b2e21050 100644 --- a/pc/test/fake_periodic_video_source.h +++ b/pc/test/fake_periodic_video_source.h @@ -50,7 +50,7 @@ class FakePeriodicVideoSource final frame_source_.SetRotation(config.rotation); TimeDelta frame_interval = TimeDelta::ms(config.frame_interval_ms); - RepeatingTaskHandle::Start(task_queue_.get(), [this, frame_interval] { + RepeatingTaskHandle::Start(task_queue_->Get(), [this, frame_interval] { if (broadcaster_.wants().rotation_applied) { broadcaster_.OnFrame(frame_source_.GetFrameRotationApplied()); } else { diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 0ec6739d8c..16204d159c 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -14,11 +14,12 @@ rtc_source_set("repeating_task") { "repeating_task.h", ] deps = [ + ":to_queued_task", "..:logging", - "..:rtc_task_queue", "..:sequenced_task_checker", "..:thread_checker", "..:timeutils", + "../../api/task_queue", "../../api/units:time_delta", "../../api/units:timestamp", "//third_party/abseil-cpp/absl/memory", @@ -44,6 +45,7 @@ if (rtc_include_tests) { deps = [ ":repeating_task", "..:rtc_base_approved", + "..:rtc_task_queue", "../../test:test_support", "//third_party/abseil-cpp/absl/memory", ] diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc index 5f366cb000..8d64334cff 100644 --- a/rtc_base/task_utils/repeating_task.cc +++ b/rtc_base/task_utils/repeating_task.cc @@ -9,12 +9,14 @@ */ #include "rtc_base/task_utils/repeating_task.h" + #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" namespace webrtc { namespace webrtc_repeating_task_impl { -RepeatingTaskBase::RepeatingTaskBase(rtc::TaskQueue* task_queue, +RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay) : task_queue_(task_queue), next_run_time_(Timestamp::us(rtc::TimeMicros()) + first_delay) {} @@ -57,10 +59,10 @@ void RepeatingTaskBase::PostStop() { RTC_DLOG(LS_INFO) << "Using PostStop() from the task queue running the " "repeated task. Consider calling Stop() instead."; } - task_queue_->PostTask([this] { + task_queue_->PostTask(ToQueuedTask([this] { RTC_DCHECK_RUN_ON(task_queue_); Stop(); - }); + })); } } // namespace webrtc_repeating_task_impl diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h index c4e760a55d..ee6035cad0 100644 --- a/rtc_base/task_utils/repeating_task.h +++ b/rtc_base/task_utils/repeating_task.h @@ -15,10 +15,11 @@ #include #include "absl/memory/memory.h" +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "rtc_base/sequenced_task_checker.h" -#include "rtc_base/task_queue.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -26,9 +27,9 @@ namespace webrtc { class RepeatingTaskHandle; namespace webrtc_repeating_task_impl { -class RepeatingTaskBase : public rtc::QueuedTask { +class RepeatingTaskBase : public QueuedTask { public: - RepeatingTaskBase(rtc::TaskQueue* task_queue, TimeDelta first_delay); + RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay); ~RepeatingTaskBase() override; virtual TimeDelta RunClosure() = 0; @@ -39,7 +40,7 @@ class RepeatingTaskBase : public rtc::QueuedTask { void Stop() RTC_RUN_ON(task_queue_); void PostStop(); - rtc::TaskQueue* const task_queue_; + TaskQueueBase* const task_queue_; // This is always finite, except for the special case where it's PlusInfinity // to signal that the task should stop. Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); @@ -49,7 +50,7 @@ class RepeatingTaskBase : public rtc::QueuedTask { template class RepeatingTaskImpl final : public RepeatingTaskBase { public: - RepeatingTaskImpl(rtc::TaskQueue* task_queue, + RepeatingTaskImpl(TaskQueueBase* task_queue, TimeDelta first_delay, Closure&& closure) : RepeatingTaskBase(task_queue, first_delay), @@ -91,7 +92,7 @@ class RepeatingTaskHandle { // perfectly fine to destroy the handle while the task is running, since the // repeated task is owned by the TaskQueue. template - static RepeatingTaskHandle Start(rtc::TaskQueue* task_queue, + static RepeatingTaskHandle Start(TaskQueueBase* task_queue, Closure&& closure) { auto repeating_task = absl::make_unique< webrtc_repeating_task_impl::RepeatingTaskImpl>( @@ -102,13 +103,13 @@ class RepeatingTaskHandle { } template static RepeatingTaskHandle Start(Closure&& closure) { - return Start(rtc::TaskQueue::Current(), std::forward(closure)); + return Start(TaskQueueBase::Current(), std::forward(closure)); } // DelayedStart is equivalent to Start except that the first invocation of the // closure will be delayed by the given amount. template - static RepeatingTaskHandle DelayedStart(rtc::TaskQueue* task_queue, + static RepeatingTaskHandle DelayedStart(TaskQueueBase* task_queue, TimeDelta first_delay, Closure&& closure) { auto repeating_task = absl::make_unique< @@ -121,7 +122,7 @@ class RepeatingTaskHandle { template static RepeatingTaskHandle DelayedStart(TimeDelta first_delay, Closure&& closure) { - return DelayedStart(rtc::TaskQueue::Current(), first_delay, + return DelayedStart(TaskQueueBase::Current(), first_delay, std::forward(closure)); } diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc index 52683e3d60..244bb8e403 100644 --- a/rtc_base/task_utils/repeating_task_unittest.cc +++ b/rtc_base/task_utils/repeating_task_unittest.cc @@ -15,6 +15,7 @@ #include "absl/memory/memory.h" #include "rtc_base/event.h" +#include "rtc_base/task_queue.h" #include "rtc_base/task_utils/repeating_task.h" #include "test/gmock.h" #include "test/gtest.h" @@ -69,7 +70,7 @@ TEST(RepeatingTaskTest, TaskIsStoppedOnStop) { rtc::TaskQueue task_queue("TestQueue"); std::atomic_int counter(0); - auto handle = RepeatingTaskHandle::Start(&task_queue, [&] { + auto handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] { if (++counter >= kShortIntervalCount) return kLongInterval; return kShortInterval; @@ -96,7 +97,7 @@ TEST(RepeatingTaskTest, CompensatesForLongRunTime) { std::atomic_int counter(0); rtc::TaskQueue task_queue("TestQueue"); - RepeatingTaskHandle::Start(&task_queue, [&] { + RepeatingTaskHandle::Start(task_queue.Get(), [&] { if (++counter == kSleepAtCount) Sleep(kSleepDuration); return kRepeatInterval; @@ -110,7 +111,7 @@ TEST(RepeatingTaskTest, CompensatesForLongRunTime) { TEST(RepeatingTaskTest, CompensatesForShortRunTime) { std::atomic_int counter(0); rtc::TaskQueue task_queue("TestQueue"); - RepeatingTaskHandle::Start(&task_queue, [&] { + RepeatingTaskHandle::Start(task_queue.Get(), [&] { ++counter; // Sleeping for the 10 ms should be compensated. Sleep(TimeDelta::ms(10)); @@ -130,7 +131,7 @@ TEST(RepeatingTaskTest, CancelDelayedTaskBeforeItRuns) { EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); rtc::TaskQueue task_queue("queue"); auto handle = RepeatingTaskHandle::DelayedStart( - &task_queue, TimeDelta::ms(100), MoveOnlyClosure(&mock)); + task_queue.Get(), TimeDelta::ms(100), MoveOnlyClosure(&mock)); handle.PostStop(); EXPECT_TRUE(done.Wait(kTimeout.ms())); } @@ -141,7 +142,8 @@ TEST(RepeatingTaskTest, CancelTaskAfterItRuns) { EXPECT_CALL(mock, Call).WillOnce(Return(TimeDelta::ms(100))); EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); })); rtc::TaskQueue task_queue("queue"); - auto handle = RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&mock)); + auto handle = + RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&mock)); handle.PostStop(); EXPECT_TRUE(done.Wait(kTimeout.ms())); } @@ -151,7 +153,7 @@ TEST(RepeatingTaskTest, TaskCanStopItself) { rtc::TaskQueue task_queue("TestQueue"); RepeatingTaskHandle handle; task_queue.PostTask([&] { - handle = RepeatingTaskHandle::Start(&task_queue, [&] { + handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] { ++counter; handle.Stop(); return TimeDelta::ms(2); @@ -171,7 +173,7 @@ TEST(RepeatingTaskTest, ZeroReturnValueRepostsTheTask) { return kTimeout; })); rtc::TaskQueue task_queue("queue"); - RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&closure)); + RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&closure)); EXPECT_TRUE(done.Wait(kTimeout.ms())); } @@ -186,7 +188,7 @@ TEST(RepeatingTaskTest, StartPeriodicTask) { return kTimeout; })); rtc::TaskQueue task_queue("queue"); - RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction()); + RepeatingTaskHandle::Start(task_queue.Get(), closure.AsStdFunction()); EXPECT_TRUE(done.Wait(kTimeout.ms())); } @@ -196,7 +198,7 @@ TEST(RepeatingTaskTest, Example) { void DoPeriodicTask() {} TimeDelta TimeUntilNextRun() { return TimeDelta::ms(100); } void StartPeriodicTask(RepeatingTaskHandle* handle, - rtc::TaskQueue* task_queue) { + TaskQueueBase* task_queue) { *handle = RepeatingTaskHandle::Start(task_queue, [this] { DoPeriodicTask(); return TimeUntilNextRun(); @@ -207,10 +209,10 @@ TEST(RepeatingTaskTest, Example) { auto object = absl::make_unique(); // Create and start the periodic task. RepeatingTaskHandle handle; - object->StartPeriodicTask(&handle, &task_queue); + object->StartPeriodicTask(&handle, task_queue.Get()); // Restart the task handle.PostStop(); - object->StartPeriodicTask(&handle, &task_queue); + object->StartPeriodicTask(&handle, task_queue.Get()); handle.PostStop(); struct Destructor { void operator()() { object.reset(); } diff --git a/test/frame_generator_capturer.cc b/test/frame_generator_capturer.cc index 885ff2c9d1..5419d939d7 100644 --- a/test/frame_generator_capturer.cc +++ b/test/frame_generator_capturer.cc @@ -141,8 +141,8 @@ bool FrameGeneratorCapturer::Init() { return false; RepeatingTaskHandle::DelayedStart( - &task_queue_, TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(), - [this] { + task_queue_.Get(), + TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(), [this] { InsertFrame(); return TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(); }); diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc index 1c9f249875..7558cb4921 100644 --- a/test/scenario/network/network_emulation_manager.cc +++ b/test/scenario/network/network_emulation_manager.cc @@ -41,7 +41,7 @@ NetworkEmulationManager::NetworkEmulationManager() next_node_id_(1), next_ip4_address_(kMinIPv4Address), task_queue_("network_emulation_manager") { - process_task_handle_ = RepeatingTaskHandle::Start(&task_queue_, [this] { + process_task_handle_ = RepeatingTaskHandle::Start(task_queue_.Get(), [this] { ProcessNetworkPackets(); return TimeDelta::ms(kPacketProcessingIntervalMs); });