diff --git a/api/metronome/BUILD.gn b/api/metronome/BUILD.gn index 3d3d876df0..269b847384 100644 --- a/api/metronome/BUILD.gn +++ b/api/metronome/BUILD.gn @@ -10,7 +10,10 @@ import("../../webrtc.gni") rtc_source_set("metronome") { visibility = [ "*" ] - sources = [ "metronome.h" ] + sources = [ + "metronome.cc", + "metronome.h", + ] deps = [ "../../rtc_base/system:rtc_export", "../task_queue", diff --git a/api/metronome/metronome.cc b/api/metronome/metronome.cc new file mode 100644 index 0000000000..8d74f928a0 --- /dev/null +++ b/api/metronome/metronome.cc @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "api/metronome/metronome.h" + +namespace webrtc { + +// TODO(crbug.com/1381982): Remove outdated methods. +void Metronome::AddListener(TickListener* listener) {} +void Metronome::RemoveListener(TickListener* listener) {} + +} // namespace webrtc diff --git a/api/metronome/metronome.h b/api/metronome/metronome.h index fc5f350db2..477afc2c92 100644 --- a/api/metronome/metronome.h +++ b/api/metronome/metronome.h @@ -17,44 +17,40 @@ namespace webrtc { -// The Metronome posts OnTick() on task queues provided by its listeners' task -// queue periodically. The metronome can be used as an alternative to using -// PostDelayedTask on a thread or task queue for coalescing work and reducing -// the number of idle-wakeups. -// -// Listeners can be added and removed from any sequence, but it is illegal to -// remove a listener from an OnTick invocation. +// The Metronome posts OnTick() calls requested with RequestCallOnNextTick. +// The API is designed to be fully used from a single task queue. Scheduled +// callbacks are executed on the same sequence as they were requested on. There +// are no features implemented for cancellation. When that's needed, use e.g. +// ScopedTaskSafety from the client. // // The metronome concept is still under experimentation, and may not be availble // in all platforms or applications. See https://crbug.com/1253787 for more // details. // -// Metronome implementations must be thread-safe. +// Metronome implementations must be thread-compatible. class RTC_EXPORT Metronome { public: + // TODO(crbug.com/1381982): remove stale classes and methods once downstream + // dependencies adapts. class RTC_EXPORT TickListener { public: virtual ~TickListener() = default; - - // OnTick is run on the task queue provided by OnTickTaskQueue each time the - // metronome ticks. virtual void OnTick() = 0; - - // The task queue that OnTick will run on. Must not be null. virtual TaskQueueBase* OnTickTaskQueue() = 0; }; virtual ~Metronome() = default; - // Adds a tick listener to the metronome. Once this method has returned - // OnTick will be invoked on each metronome tick. A listener may - // only be added to the metronome once. - virtual void AddListener(TickListener* listener) = 0; + // TODO(crbug.com/1381982): remove stale classes and methods once downstream + // dependencies adapts. + virtual void AddListener(TickListener* listener); + virtual void RemoveListener(TickListener* listener); - // Removes the tick listener from the metronome. Once this method has returned - // OnTick will never be called again. This method must not be called from - // within OnTick. - virtual void RemoveListener(TickListener* listener) = 0; + // Requests a call to `callback` on the next tick. Scheduled callbacks are + // executed on the same sequence as they were requested on. There are no + // features for cancellation. When that's needed, use e.g. ScopedTaskSafety + // from the client. + virtual void RequestCallOnNextTick(absl::AnyInvocable callback) {} // Returns the current tick period of the metronome. virtual TimeDelta TickPeriod() const = 0; diff --git a/api/metronome/test/BUILD.gn b/api/metronome/test/BUILD.gn index 0ea13b3de5..f415d98a0b 100644 --- a/api/metronome/test/BUILD.gn +++ b/api/metronome/test/BUILD.gn @@ -23,6 +23,7 @@ rtc_library("fake_metronome") { "../../../rtc_base:rtc_task_queue", "../../../rtc_base/synchronization:mutex", "../../../rtc_base/task_utils:repeating_task", + "../../../test:test_support", "../../task_queue", "../../units:time_delta", ] diff --git a/api/metronome/test/fake_metronome.cc b/api/metronome/test/fake_metronome.cc index cb471b9ba9..025f7ce5a6 100644 --- a/api/metronome/test/fake_metronome.cc +++ b/api/metronome/test/fake_metronome.cc @@ -10,8 +10,12 @@ #include "api/metronome/test/fake_metronome.h" +#include +#include + #include "api/priority.h" #include "api/sequence_checker.h" +#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_factory.h" #include "api/units/time_delta.h" #include "rtc_base/event.h" @@ -22,12 +26,9 @@ namespace webrtc::test { ForcedTickMetronome::ForcedTickMetronome(TimeDelta tick_period) : tick_period_(tick_period) {} -void ForcedTickMetronome::AddListener(TickListener* listener) { - listeners_.insert(listener); -} - -void ForcedTickMetronome::RemoveListener(TickListener* listener) { - listeners_.erase(listener); +void ForcedTickMetronome::RequestCallOnNextTick( + absl::AnyInvocable callback) { + callbacks_.push_back(std::move(callback)); } TimeDelta ForcedTickMetronome::TickPeriod() const { @@ -35,55 +36,35 @@ TimeDelta ForcedTickMetronome::TickPeriod() const { } size_t ForcedTickMetronome::NumListeners() { - return listeners_.size(); + return callbacks_.size(); } void ForcedTickMetronome::Tick() { - for (auto* listener : listeners_) { - listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); }); + std::vector> callbacks; + callbacks_.swap(callbacks); + for (auto& callback : callbacks) + std::move(callback)(); +} + +FakeMetronome::FakeMetronome(TimeDelta tick_period) + : tick_period_(tick_period) {} + +void FakeMetronome::RequestCallOnNextTick( + absl::AnyInvocable callback) { + TaskQueueBase* current = TaskQueueBase::Current(); + callbacks_.push_back(std::move(callback)); + if (callbacks_.size() == 1) { + current->PostDelayedTask( + [this] { + std::vector> callbacks; + callbacks_.swap(callbacks); + for (auto& callback : callbacks) + std::move(callback)(); + }, + tick_period_); } } -FakeMetronome::FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period) - : tick_period_(tick_period), - queue_(factory->CreateTaskQueue("MetronomeQueue", - TaskQueueFactory::Priority::HIGH)) {} - -FakeMetronome::~FakeMetronome() { - RTC_DCHECK(listeners_.empty()); -} - -void FakeMetronome::AddListener(TickListener* listener) { - MutexLock lock(&mutex_); - listeners_.insert(listener); - if (!started_) { - tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] { - MutexLock lock(&mutex_); - // Stop if empty. - if (listeners_.empty()) - return TimeDelta::PlusInfinity(); - for (auto* listener : listeners_) { - listener->OnTickTaskQueue()->PostTask( - [listener] { listener->OnTick(); }); - } - return tick_period_; - }); - started_ = true; - } -} - -void FakeMetronome::RemoveListener(TickListener* listener) { - MutexLock lock(&mutex_); - listeners_.erase(listener); -} - -void FakeMetronome::Stop() { - MutexLock lock(&mutex_); - RTC_DCHECK(listeners_.empty()); - if (started_) - queue_.PostTask([this] { tick_task_.Stop(); }); -} - TimeDelta FakeMetronome::TickPeriod() const { return tick_period_; } diff --git a/api/metronome/test/fake_metronome.h b/api/metronome/test/fake_metronome.h index 28a79e06ff..73c938e9cd 100644 --- a/api/metronome/test/fake_metronome.h +++ b/api/metronome/test/fake_metronome.h @@ -13,6 +13,7 @@ #include #include +#include #include "api/metronome/metronome.h" #include "api/task_queue/task_queue_base.h" @@ -36,13 +37,12 @@ class ForcedTickMetronome : public Metronome { size_t NumListeners(); // Metronome implementation. - void AddListener(TickListener* listener) override; - void RemoveListener(TickListener* listener) override; + void RequestCallOnNextTick(absl::AnyInvocable callback) override; TimeDelta TickPeriod() const override; private: const TimeDelta tick_period_; - std::set listeners_; + std::vector> callbacks_; }; // FakeMetronome is a metronome that ticks based on a repeating task at the @@ -53,23 +53,15 @@ class ForcedTickMetronome : public Metronome { // on the proper task queue. class FakeMetronome : public Metronome { public: - FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period); - ~FakeMetronome() override; + explicit FakeMetronome(TimeDelta tick_period); // Metronome implementation. - void AddListener(TickListener* listener) override; - void RemoveListener(TickListener* listener) override; + void RequestCallOnNextTick(absl::AnyInvocable callback) override; TimeDelta TickPeriod() const override; - void Stop(); - private: const TimeDelta tick_period_; - RepeatingTaskHandle tick_task_; - bool started_ RTC_GUARDED_BY(mutex_) = false; - std::set listeners_ RTC_GUARDED_BY(mutex_); - Mutex mutex_; - rtc::TaskQueue queue_; + std::vector> callbacks_; }; } // namespace webrtc::test diff --git a/call/call.cc b/call/call.cc index ae796cf6a0..9f8c6e0907 100644 --- a/call/call.cc +++ b/call/call.cc @@ -1037,11 +1037,13 @@ webrtc::VideoReceiveStreamInterface* Call::CreateVideoReceiveStream( // and `video_receiver_controller_` out of VideoReceiveStream2 construction // and set it up asynchronously on the network thread (the registration and // `video_receiver_controller_` need to live on the network thread). + // TODO(crbug.com/1381982): Re-enable decode synchronizer once the Chromium + // API has adapted to the new Metronome interface. VideoReceiveStream2* receive_stream = new VideoReceiveStream2( task_queue_factory_, this, num_cpu_cores_, transport_send_->packet_router(), std::move(configuration), call_stats_.get(), clock_, std::make_unique(clock_, trials()), - &nack_periodic_processor_, decode_sync_.get(), event_log_); + &nack_periodic_processor_, /*decode_sync=*/nullptr, event_log_); // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network // thread. receive_stream->RegisterWithTransport(&video_receiver_controller_); diff --git a/pc/peer_connection_factory.cc b/pc/peer_connection_factory.cc index cec909f03e..5495535167 100644 --- a/pc/peer_connection_factory.cc +++ b/pc/peer_connection_factory.cc @@ -110,6 +110,8 @@ PeerConnectionFactory::PeerConnectionFactory( PeerConnectionFactory::~PeerConnectionFactory() { RTC_DCHECK_RUN_ON(signaling_thread()); + // Ensures the metronome is destroyed on the worker thread. + worker_thread()->BlockingCall([metronome = std::move(metronome_)] {}); } void PeerConnectionFactory::SetOptions(const Options& options) { diff --git a/pc/peer_connection_factory.h b/pc/peer_connection_factory.h index 9bed5ba105..2851954a2f 100644 --- a/pc/peer_connection_factory.h +++ b/pc/peer_connection_factory.h @@ -152,7 +152,7 @@ class PeerConnectionFactory : public PeerConnectionFactoryInterface { std::unique_ptr neteq_factory_; const std::unique_ptr transport_controller_send_factory_; - std::unique_ptr metronome_; + std::unique_ptr metronome_ RTC_GUARDED_BY(worker_thread()); }; } // namespace webrtc diff --git a/pc/test/integration_test_helpers.cc b/pc/test/integration_test_helpers.cc index 3201328e84..0ad6cbcfcf 100644 --- a/pc/test/integration_test_helpers.cc +++ b/pc/test/integration_test_helpers.cc @@ -56,41 +56,25 @@ int FindFirstMediaStatsIndexByKind( return -1; } -TaskQueueMetronome::TaskQueueMetronome(TaskQueueFactory* factory, - TimeDelta tick_period) - : tick_period_(tick_period), - queue_(factory->CreateTaskQueue("MetronomeQueue", - TaskQueueFactory::Priority::HIGH)) { - tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] { - MutexLock lock(&mutex_); - for (auto* listener : listeners_) { - listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); }); - } - return tick_period_; - }); -} +TaskQueueMetronome::TaskQueueMetronome(TimeDelta tick_period) + : tick_period_(tick_period) {} -TaskQueueMetronome::~TaskQueueMetronome() { - RTC_DCHECK(listeners_.empty()); - rtc::Event stop_event; - queue_.PostTask([this, &stop_event] { - tick_task_.Stop(); - stop_event.Set(); - }); - stop_event.Wait(TimeDelta::Seconds(1)); -} - -void TaskQueueMetronome::AddListener(TickListener* listener) { - MutexLock lock(&mutex_); - auto [it, inserted] = listeners_.insert(listener); - RTC_DCHECK(inserted); -} - -void TaskQueueMetronome::RemoveListener(TickListener* listener) { - MutexLock lock(&mutex_); - auto it = listeners_.find(listener); - RTC_DCHECK(it != listeners_.end()); - listeners_.erase(it); +void TaskQueueMetronome::RequestCallOnNextTick( + absl::AnyInvocable callback) { + callbacks_.push_back(std::move(callback)); + // Only schedule a tick callback for the first `callback` addition. + // Schedule on the current task queue to comply with RequestCallOnNextTick + // requirements. + if (callbacks_.size() == 1) { + TaskQueueBase::Current()->PostDelayedTask( + [this] { + std::vector> callbacks; + callbacks_.swap(callbacks); + for (auto& callback : callbacks) + std::move(callback)(); + }, + tick_period_); + } } TimeDelta TaskQueueMetronome::TickPeriod() const { diff --git a/pc/test/integration_test_helpers.h b/pc/test/integration_test_helpers.h index 1a1172c8e3..21ddacb7e3 100644 --- a/pc/test/integration_test_helpers.h +++ b/pc/test/integration_test_helpers.h @@ -180,20 +180,15 @@ int FindFirstMediaStatsIndexByKind( class TaskQueueMetronome : public webrtc::Metronome { public: - TaskQueueMetronome(TaskQueueFactory* factory, TimeDelta tick_period); - ~TaskQueueMetronome() override; + explicit TaskQueueMetronome(TimeDelta tick_period); // webrtc::Metronome implementation. - void AddListener(TickListener* listener) override; - void RemoveListener(TickListener* listener) override; + void RequestCallOnNextTick(absl::AnyInvocable callback) override; TimeDelta TickPeriod() const override; private: - Mutex mutex_; const TimeDelta tick_period_; - std::set listeners_ RTC_GUARDED_BY(mutex_); - RepeatingTaskHandle tick_task_; - rtc::TaskQueue queue_; + std::vector> callbacks_; }; class SignalingMessageReceiver { @@ -775,8 +770,8 @@ class PeerConnectionIntegrationWrapper : public webrtc::PeerConnectionObserver, pc_factory_dependencies.task_queue_factory = webrtc::CreateDefaultTaskQueueFactory(); pc_factory_dependencies.trials = std::make_unique(); - pc_factory_dependencies.metronome = std::make_unique( - pc_factory_dependencies.task_queue_factory.get(), TimeDelta::Millis(8)); + pc_factory_dependencies.metronome = + std::make_unique(TimeDelta::Millis(8)); cricket::MediaEngineDependencies media_deps; media_deps.task_queue_factory = pc_factory_dependencies.task_queue_factory.get(); diff --git a/video/decode_synchronizer.cc b/video/decode_synchronizer.cc index a86066800f..7d4da3d47a 100644 --- a/video/decode_synchronizer.cc +++ b/video/decode_synchronizer.cc @@ -106,6 +106,7 @@ DecodeSynchronizer::DecodeSynchronizer(Clock* clock, } DecodeSynchronizer::~DecodeSynchronizer() { + RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(schedulers_.empty()); } @@ -117,7 +118,7 @@ DecodeSynchronizer::CreateSynchronizedFrameScheduler() { // If this is the first `scheduler` added, start listening to the metronome. if (inserted && schedulers_.size() == 1) { RTC_DLOG(LS_VERBOSE) << "Listening to metronome"; - metronome_->AddListener(this); + ScheduleNextTick(); } return std::move(scheduler); @@ -160,12 +161,16 @@ void DecodeSynchronizer::RemoveFrameScheduler( schedulers_.erase(it); // If there are no more schedulers active, stop listening for metronome ticks. if (schedulers_.empty()) { - RTC_DLOG(LS_VERBOSE) << "Not listening to metronome"; - metronome_->RemoveListener(this); expected_next_tick_ = Timestamp::PlusInfinity(); } } +void DecodeSynchronizer::ScheduleNextTick() { + RTC_DCHECK_RUN_ON(worker_queue_); + metronome_->RequestCallOnNextTick( + SafeTask(safety_.flag(), [this] { OnTick(); })); +} + void DecodeSynchronizer::OnTick() { RTC_DCHECK_RUN_ON(worker_queue_); expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod(); @@ -177,10 +182,9 @@ void DecodeSynchronizer::OnTick() { std::move(scheduled_frame).RunFrameReleaseCallback(); } } -} -TaskQueueBase* DecodeSynchronizer::OnTickTaskQueue() { - return worker_queue_; + if (!schedulers_.empty()) + ScheduleNextTick(); } } // namespace webrtc diff --git a/video/decode_synchronizer.h b/video/decode_synchronizer.h index 26e6fdf31d..c6f8efdb29 100644 --- a/video/decode_synchronizer.h +++ b/video/decode_synchronizer.h @@ -53,12 +53,12 @@ namespace webrtc { // // DecodeSynchronizer is single threaded - all method calls must run on the // `worker_queue_`. -class DecodeSynchronizer : private Metronome::TickListener { +class DecodeSynchronizer { public: DecodeSynchronizer(Clock* clock, Metronome* metronome, TaskQueueBase* worker_queue); - ~DecodeSynchronizer() override; + ~DecodeSynchronizer(); DecodeSynchronizer(const DecodeSynchronizer&) = delete; DecodeSynchronizer& operator=(const DecodeSynchronizer&) = delete; @@ -119,9 +119,8 @@ class DecodeSynchronizer : private Metronome::TickListener { void OnFrameScheduled(SynchronizedFrameDecodeScheduler* scheduler); void RemoveFrameScheduler(SynchronizedFrameDecodeScheduler* scheduler); - // Metronome::TickListener implementation. - void OnTick() override; - TaskQueueBase* OnTickTaskQueue() override; + void ScheduleNextTick(); + void OnTick(); Clock* const clock_; TaskQueueBase* const worker_queue_; @@ -130,6 +129,7 @@ class DecodeSynchronizer : private Metronome::TickListener { Timestamp expected_next_tick_ = Timestamp::PlusInfinity(); std::set schedulers_ RTC_GUARDED_BY(worker_queue_); + ScopedTaskSafetyDetached safety_; }; } // namespace webrtc diff --git a/video/decode_synchronizer_unittest.cc b/video/decode_synchronizer_unittest.cc index 81d63029a9..7a0d833812 100644 --- a/video/decode_synchronizer_unittest.cc +++ b/video/decode_synchronizer_unittest.cc @@ -15,6 +15,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "api/metronome/test/fake_metronome.h" #include "api/units/time_delta.h" #include "test/gmock.h" @@ -25,9 +26,20 @@ using ::testing::_; using ::testing::Eq; +using ::testing::Invoke; +using ::testing::Return; namespace webrtc { +class MockMetronome : public Metronome { + public: + MOCK_METHOD(void, + RequestCallOnNextTick, + (absl::AnyInvocable callback), + (override)); + MOCK_METHOD(TimeDelta, TickPeriod, (), (const override)); +}; + class DecodeSynchronizerTest : public ::testing::Test { public: static constexpr TimeDelta kTickPeriod = TimeDelta::Millis(33); @@ -215,18 +227,26 @@ TEST_F(DecodeSynchronizerTest, FramesNotReleasedAfterStop) { time_controller_.AdvanceTime(TimeDelta::Zero()); } -TEST_F(DecodeSynchronizerTest, MetronomeNotListenedWhenNoStreamsAreActive) { - EXPECT_EQ(0u, metronome_.NumListeners()); - +TEST(DecodeSynchronizerStandaloneTest, + MetronomeNotListenedWhenNoStreamsAreActive) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(4711)); + Clock* clock(time_controller.GetClock()); + MockMetronome metronome; + ON_CALL(metronome, TickPeriod).WillByDefault(Return(TimeDelta::Seconds(1))); + DecodeSynchronizer decode_synchronizer_(clock, &metronome, + time_controller.GetMainThread()); + absl::AnyInvocable callback; + EXPECT_CALL(metronome, RequestCallOnNextTick) + .WillOnce(Invoke([&callback](absl::AnyInvocable cb) { + callback = std::move(cb); + })); auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); - EXPECT_EQ(1u, metronome_.NumListeners()); auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler(); - EXPECT_EQ(1u, metronome_.NumListeners()); - scheduler->Stop(); - EXPECT_EQ(1u, metronome_.NumListeners()); scheduler2->Stop(); - EXPECT_EQ(0u, metronome_.NumListeners()); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + ASSERT_TRUE(callback); + (std::move)(callback)(); } } // namespace webrtc diff --git a/video/video_receive_stream2_unittest.cc b/video/video_receive_stream2_unittest.cc index fe4dde3787..458944aefa 100644 --- a/video/video_receive_stream2_unittest.cc +++ b/video/video_receive_stream2_unittest.cc @@ -196,8 +196,7 @@ class VideoReceiveStream2Test : public ::testing::TestWithParam { config_(&mock_transport_, &mock_h264_decoder_factory_), call_stats_(clock_, time_controller_.GetMainThread()), fake_renderer_(&time_controller_), - fake_metronome_(time_controller_.GetTaskQueueFactory(), - TimeDelta::Millis(16)), + fake_metronome_(TimeDelta::Millis(16)), decode_sync_(clock_, &fake_metronome_, time_controller_.GetMainThread()), @@ -228,7 +227,6 @@ class VideoReceiveStream2Test : public ::testing::TestWithParam { video_receive_stream_->Stop(); video_receive_stream_->UnregisterFromTransport(); } - fake_metronome_.Stop(); time_controller_.AdvanceTime(TimeDelta::Zero()); } diff --git a/video/video_stream_buffer_controller_unittest.cc b/video/video_stream_buffer_controller_unittest.cc index 3e6c352fb1..e7235a2ff1 100644 --- a/video/video_stream_buffer_controller_unittest.cc +++ b/video/video_stream_buffer_controller_unittest.cc @@ -132,8 +132,7 @@ class VideoStreamBufferControllerFixture field_trials_(std::get<1>(GetParam())), time_controller_(kClockStart), clock_(time_controller_.GetClock()), - fake_metronome_(time_controller_.GetTaskQueueFactory(), - TimeDelta::Millis(16)), + fake_metronome_(TimeDelta::Millis(16)), decode_sync_(clock_, &fake_metronome_, time_controller_.GetMainThread()), @@ -163,7 +162,6 @@ class VideoStreamBufferControllerFixture if (buffer_) { buffer_->Stop(); } - fake_metronome_.Stop(); time_controller_.AdvanceTime(TimeDelta::Zero()); }