diff --git a/api/DEPS b/api/DEPS index ce88d34ef4..ff493bf77f 100644 --- a/api/DEPS +++ b/api/DEPS @@ -255,6 +255,13 @@ specific_include_rules = { "+rtc_base/ref_counted_object.h", ], + "fake_metronome\.h": [ + "+rtc_base/synchronization/mutex.h", + "+rtc_base/task_queue.h", + "+rtc_base/task_utils/repeating_task.h", + "+rtc_base/thread_annotations.h", + ], + "mock.*\.h": [ "+test/gmock.h", ], diff --git a/api/metronome/test/BUILD.gn b/api/metronome/test/BUILD.gn new file mode 100644 index 0000000000..d25d5a848a --- /dev/null +++ b/api/metronome/test/BUILD.gn @@ -0,0 +1,30 @@ +# Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../../webrtc.gni") + +rtc_library("fake_metronome") { + testonly = true + sources = [ + "fake_metronome.cc", + "fake_metronome.h", + ] + deps = [ + "..:metronome", + "../..:priority", + "../..:sequence_checker", + "../../../rtc_base:macromagic", + "../../../rtc_base:rtc_event", + "../../../rtc_base:rtc_task_queue", + "../../../rtc_base/synchronization:mutex", + "../../../rtc_base/task_utils:repeating_task", + "../../../rtc_base/task_utils:to_queued_task", + "../../task_queue", + "../../units:time_delta", + ] +} diff --git a/api/metronome/test/fake_metronome.cc b/api/metronome/test/fake_metronome.cc new file mode 100644 index 0000000000..83b5ea7604 --- /dev/null +++ b/api/metronome/test/fake_metronome.cc @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "api/metronome/test/fake_metronome.h" + +#include "api/priority.h" +#include "api/sequence_checker.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/units/time_delta.h" +#include "rtc_base/event.h" +#include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/task_utils/to_queued_task.h" + +namespace webrtc::test { + +ForcedTickMetronome::ForcedTickMetronome(TimeDelta tick_period) + : tick_period_(tick_period) {} + +void ForcedTickMetronome::AddListener(TickListener* listener) { + listeners_.insert(listener); +} + +void ForcedTickMetronome::RemoveListener(TickListener* listener) { + listeners_.erase(listener); +} + +TimeDelta ForcedTickMetronome::TickPeriod() const { + return tick_period_; +} + +size_t ForcedTickMetronome::NumListeners() { + return listeners_.size(); +} + +void ForcedTickMetronome::Tick() { + for (auto* listener : listeners_) { + listener->OnTickTaskQueue()->PostTask( + ToQueuedTask([listener] { listener->OnTick(); })); + } +} + +FakeMetronome::FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period) + : tick_period_(tick_period), + queue_(factory->CreateTaskQueue("MetronomeQueue", + TaskQueueFactory::Priority::HIGH)) {} + +FakeMetronome::~FakeMetronome() { + RTC_DCHECK(listeners_.empty()); +} + +void FakeMetronome::AddListener(TickListener* listener) { + MutexLock lock(&mutex_); + listeners_.insert(listener); + if (!started_) { + tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] { + MutexLock lock(&mutex_); + // Stop if empty. + if (listeners_.empty()) + return TimeDelta::PlusInfinity(); + for (auto* listener : listeners_) { + listener->OnTickTaskQueue()->PostTask( + ToQueuedTask([listener] { listener->OnTick(); })); + } + return tick_period_; + }); + started_ = true; + } +} + +void FakeMetronome::RemoveListener(TickListener* listener) { + MutexLock lock(&mutex_); + listeners_.erase(listener); +} + +void FakeMetronome::Stop() { + MutexLock lock(&mutex_); + RTC_DCHECK(listeners_.empty()); + if (started_) + queue_.PostTask([this] { tick_task_.Stop(); }); +} + +TimeDelta FakeMetronome::TickPeriod() const { + return tick_period_; +} + +} // namespace webrtc::test diff --git a/api/metronome/test/fake_metronome.h b/api/metronome/test/fake_metronome.h new file mode 100644 index 0000000000..28a79e06ff --- /dev/null +++ b/api/metronome/test/fake_metronome.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef API_METRONOME_TEST_FAKE_METRONOME_H_ +#define API_METRONOME_TEST_FAKE_METRONOME_H_ + +#include +#include + +#include "api/metronome/metronome.h" +#include "api/task_queue/task_queue_base.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/units/time_delta.h" +#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc::test { + +// ForcedTickMetronome is a Metronome that ticks when `Tick()` is invoked. +// The constructor argument `tick_period` returned in `TickPeriod()`. +class ForcedTickMetronome : public Metronome { + public: + explicit ForcedTickMetronome(TimeDelta tick_period); + + // Forces all TickListeners to run `OnTick`. + void Tick(); + size_t NumListeners(); + + // Metronome implementation. + void AddListener(TickListener* listener) override; + void RemoveListener(TickListener* listener) override; + TimeDelta TickPeriod() const override; + + private: + const TimeDelta tick_period_; + std::set listeners_; +}; + +// FakeMetronome is a metronome that ticks based on a repeating task at the +// `tick_period` provided in the constructor. It is designed for use with +// simulated task queues for unit tests. +// +// `Stop()` must be called before destruction, as it cancels the metronome tick +// on the proper task queue. +class FakeMetronome : public Metronome { + public: + FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period); + ~FakeMetronome() override; + + // Metronome implementation. + void AddListener(TickListener* listener) override; + void RemoveListener(TickListener* listener) override; + TimeDelta TickPeriod() const override; + + void Stop(); + + private: + const TimeDelta tick_period_; + RepeatingTaskHandle tick_task_; + bool started_ RTC_GUARDED_BY(mutex_) = false; + std::set listeners_ RTC_GUARDED_BY(mutex_); + Mutex mutex_; + rtc::TaskQueue queue_; +}; + +} // namespace webrtc::test + +#endif // API_METRONOME_TEST_FAKE_METRONOME_H_ diff --git a/video/BUILD.gn b/video/BUILD.gn index b89e5e4f50..1a247f4b50 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -301,10 +301,12 @@ rtc_library("frame_buffer_proxy") { "frame_buffer_proxy.h", ] deps = [ - ":frame_decode_scheduler", + ":decode_synchronizer", ":frame_decode_timing", + ":task_queue_frame_decode_scheduler", ":video_receive_stream_timeout_tracker", "../api:sequence_checker", + "../api/metronome", "../api/task_queue", "../api/video:encoded_frame", "../modules/video_coding", @@ -312,6 +314,7 @@ rtc_library("frame_buffer_proxy") { "../modules/video_coding:frame_helpers", "../modules/video_coding:timing", "../modules/video_coding:video_codec_interface", + "../rtc_base:checks", "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:rtc_task_queue", @@ -324,16 +327,27 @@ rtc_library("frame_buffer_proxy") { ] } -rtc_library("frame_decode_scheduler") { +rtc_source_set("frame_decode_scheduler") { + sources = [ "frame_decode_scheduler.h" ] + deps = [ + ":frame_decode_timing", + "../api/units:timestamp", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] +} + +rtc_library("task_queue_frame_decode_scheduler") { sources = [ - "frame_decode_scheduler.cc", - "frame_decode_scheduler.h", + "task_queue_frame_decode_scheduler.cc", + "task_queue_frame_decode_scheduler.h", ] deps = [ + ":frame_decode_scheduler", ":frame_decode_timing", "../api:sequence_checker", "../api/task_queue", "../api/units:timestamp", + "../rtc_base:checks", "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:to_queued_task", "../system_wrappers", @@ -369,6 +383,26 @@ rtc_library("video_receive_stream_timeout_tracker") { ] } +rtc_library("decode_synchronizer") { + sources = [ + "decode_synchronizer.cc", + "decode_synchronizer.h", + ] + deps = [ + ":frame_decode_scheduler", + ":frame_decode_timing", + "../api:sequence_checker", + "../api/metronome", + "../api/task_queue", + "../api/units:time_delta", + "../api/units:timestamp", + "../rtc_base:checks", + "../rtc_base:logging", + "../rtc_base:macromagic", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] +} + rtc_library("video_stream_encoder_impl") { visibility = [ "*" ] @@ -702,6 +736,7 @@ if (rtc_include_tests) { "buffered_frame_decryptor_unittest.cc", "call_stats2_unittest.cc", "cpu_scaling_tests.cc", + "decode_synchronizer_unittest.cc", "encoder_bitrate_adjuster_unittest.cc", "encoder_overshoot_detector_unittest.cc", "encoder_rtcp_feedback_unittest.cc", @@ -726,7 +761,6 @@ if (rtc_include_tests) { "end_to_end_tests/transport_feedback_tests.cc", "frame_buffer_proxy_unittest.cc", "frame_cadence_adapter_unittest.cc", - "frame_decode_scheduler_unittest.cc", "frame_decode_timing_unittest.cc", "frame_encode_metadata_writer_unittest.cc", "picture_id_tests.cc", @@ -741,6 +775,7 @@ if (rtc_include_tests) { "send_statistics_proxy_unittest.cc", "stats_counter_unittest.cc", "stream_synchronization_unittest.cc", + "task_queue_frame_decode_scheduler_unittest.cc", "video_receive_stream2_unittest.cc", "video_receive_stream_timeout_tracker_unittest.cc", "video_send_stream_impl_unittest.cc", @@ -750,10 +785,12 @@ if (rtc_include_tests) { "video_stream_encoder_unittest.cc", ] deps = [ + ":decode_synchronizer", ":frame_buffer_proxy", ":frame_cadence_adapter", ":frame_decode_scheduler", ":frame_decode_timing", + ":task_queue_frame_decode_scheduler", ":video", ":video_mocks", ":video_receive_stream_timeout_tracker", @@ -777,6 +814,7 @@ if (rtc_include_tests) { "../api:transport_api", "../api/adaptation:resource_adaptation_api", "../api/crypto:options", + "../api/metronome/test:fake_metronome", "../api/rtc_event_log", "../api/task_queue", "../api/task_queue:default_task_queue_factory", @@ -870,6 +908,7 @@ if (rtc_include_tests) { ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/functional:bind_front", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", "//third_party/abseil-cpp/absl/types:variant", diff --git a/video/decode_synchronizer.cc b/video/decode_synchronizer.cc new file mode 100644 index 0000000000..9f22c49586 --- /dev/null +++ b/video/decode_synchronizer.cc @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/decode_synchronizer.h" + +#include +#include +#include +#include + +#include "api/sequence_checker.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "video/frame_decode_scheduler.h" +#include "video/frame_decode_timing.h" + +namespace webrtc { + +DecodeSynchronizer::ScheduledFrame::ScheduledFrame( + uint32_t rtp_timestamp, + FrameDecodeTiming::FrameSchedule schedule, + FrameDecodeScheduler::FrameReleaseCallback callback) + : rtp_timestamp_(rtp_timestamp), + schedule_(std::move(schedule)), + callback_(std::move(callback)) {} + +void DecodeSynchronizer::ScheduledFrame::RunFrameReleaseCallback() && { + // Inspiration from Chromium base::OnceCallback. Move `*this` to a local + // before execution to ensure internal state is cleared after callback + // execution. + auto sf = std::move(*this); + sf.callback_(sf.rtp_timestamp_, sf.schedule_.render_time); +} + +Timestamp DecodeSynchronizer::ScheduledFrame::LatestDecodeTime() const { + return schedule_.latest_decode_time; +} + +DecodeSynchronizer::SynchronizedFrameDecodeScheduler:: + SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync) + : sync_(sync) { + RTC_DCHECK(sync_); +} + +DecodeSynchronizer::SynchronizedFrameDecodeScheduler:: + ~SynchronizedFrameDecodeScheduler() { + RTC_DCHECK(!next_frame_); + RTC_DCHECK(stopped_); +} + +absl::optional +DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduledRtpTimestamp() { + return next_frame_.has_value() + ? absl::make_optional(next_frame_->rtp_timestamp()) + : absl::nullopt; +} + +DecodeSynchronizer::ScheduledFrame +DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ReleaseNextFrame() { + RTC_DCHECK(next_frame_); + auto res = std::move(*next_frame_); + next_frame_.reset(); + return res; +} + +Timestamp +DecodeSynchronizer::SynchronizedFrameDecodeScheduler::LatestDecodeTime() { + RTC_DCHECK(next_frame_); + return next_frame_->LatestDecodeTime(); +} + +void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduleFrame( + uint32_t rtp, + FrameDecodeTiming::FrameSchedule schedule, + FrameReleaseCallback cb) { + RTC_DCHECK(!next_frame_) << "Can not schedule two frames at once."; + next_frame_ = ScheduledFrame(rtp, std::move(schedule), std::move(cb)); + sync_->OnFrameScheduled(this); +} + +void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::CancelOutstanding() { + next_frame_.reset(); +} + +void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::Stop() { + CancelOutstanding(); + stopped_ = true; + sync_->RemoveFrameScheduler(this); +} + +DecodeSynchronizer::DecodeSynchronizer(Clock* clock, + Metronome* metronome, + TaskQueueBase* worker_queue) + : clock_(clock), worker_queue_(worker_queue), metronome_(metronome) { + RTC_DCHECK(metronome_); + RTC_DCHECK(worker_queue_); +} + +DecodeSynchronizer::~DecodeSynchronizer() { + RTC_DCHECK(schedulers_.empty()); +} + +std::unique_ptr +DecodeSynchronizer::CreateSynchronizedFrameScheduler() { + RTC_DCHECK_RUN_ON(worker_queue_); + auto scheduler = std::make_unique(this); + auto [it, inserted] = schedulers_.emplace(scheduler.get()); + // If this is the first `scheduler` added, start listening to the metronome. + if (inserted && schedulers_.size() == 1) { + RTC_DLOG(LS_VERBOSE) << "Listening to metronome"; + metronome_->AddListener(this); + } + + return std::move(scheduler); +} + +void DecodeSynchronizer::OnFrameScheduled( + SynchronizedFrameDecodeScheduler* scheduler) { + RTC_DCHECK_RUN_ON(worker_queue_); + RTC_DCHECK(scheduler->ScheduledRtpTimestamp()); + + Timestamp now = clock_->CurrentTime(); + Timestamp next_tick = expected_next_tick_; + // If no tick has registered yet assume it will occur in the tick period. + if (next_tick.IsInfinite()) { + next_tick = now + metronome_->TickPeriod(); + } + + // Release the frame right away if the decode time is too soon. Otherwise + // the stream may fall behind too much. + bool decode_before_next_tick = + scheduler->LatestDecodeTime() < + (next_tick - FrameDecodeTiming::kMaxAllowedFrameDelay); + // Decode immediately if the decode time is in the past. + bool decode_time_in_past = scheduler->LatestDecodeTime() < now; + + if (decode_before_next_tick || decode_time_in_past) { + ScheduledFrame scheduled_frame = scheduler->ReleaseNextFrame(); + std::move(scheduled_frame).RunFrameReleaseCallback(); + } +} + +void DecodeSynchronizer::RemoveFrameScheduler( + SynchronizedFrameDecodeScheduler* scheduler) { + RTC_DCHECK_RUN_ON(worker_queue_); + RTC_DCHECK(scheduler); + auto it = schedulers_.find(scheduler); + if (it == schedulers_.end()) { + return; + } + schedulers_.erase(it); + // If there are no more schedulers active, stop listening for metronome ticks. + if (schedulers_.empty()) { + RTC_DLOG(LS_VERBOSE) << "Not listening to metronome"; + metronome_->RemoveListener(this); + expected_next_tick_ = Timestamp::PlusInfinity(); + } +} + +void DecodeSynchronizer::OnTick() { + RTC_DCHECK_RUN_ON(worker_queue_); + expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod(); + + for (auto* scheduler : schedulers_) { + if (scheduler->ScheduledRtpTimestamp() && + scheduler->LatestDecodeTime() < expected_next_tick_) { + auto scheduled_frame = scheduler->ReleaseNextFrame(); + std::move(scheduled_frame).RunFrameReleaseCallback(); + } + } +} + +TaskQueueBase* DecodeSynchronizer::OnTickTaskQueue() { + return worker_queue_; +} + +} // namespace webrtc diff --git a/video/decode_synchronizer.h b/video/decode_synchronizer.h new file mode 100644 index 0000000000..bcbde4f414 --- /dev/null +++ b/video/decode_synchronizer.h @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_DECODE_SYNCHRONIZER_H_ +#define VIDEO_DECODE_SYNCHRONIZER_H_ + +#include + +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/metronome/metronome.h" +#include "api/sequence_checker.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" +#include "rtc_base/checks.h" +#include "rtc_base/thread_annotations.h" +#include "video/frame_decode_scheduler.h" +#include "video/frame_decode_timing.h" + +namespace webrtc { + +// DecodeSynchronizer synchronizes the frame scheduling by coalescing decoding +// on the metronome. +// +// A video receive stream can use the DecodeSynchronizer by receiving a +// FrameDecodeScheduler instance with `CreateSynchronizedFrameScheduler()`. +// This instance implements FrameDecodeScheduler and can be used as a normal +// scheduler. This instance is owned by the receive stream, and is borrowed by +// the DecodeSynchronizer. The DecodeSynchronizer will stop borrowing the +// instance when `FrameDecodeScheduler::Stop()` is called, after which the +// scheduler may be destroyed by the receive stream. +// +// When a frame is scheduled for decode by a receive stream using the +// DecodeSynchronizer, it will instead be executed on the metronome during the +// tick interval where `max_decode_time` occurs. For example, if a frame is +// scheduled for decode in 50ms and the tick interval is 20ms, then the frame +// will be released for decoding in 2 ticks. See below for illustation, +// +// In the case where the decode time is in the past, or must occur before the +// next metronome tick then the frame will be released right away, allowing a +// delayed stream to catch up quickly. +// +// DecodeSynchronizer is single threaded - all method calls must run on the +// `worker_queue_`. +class DecodeSynchronizer : private Metronome::TickListener { + public: + DecodeSynchronizer(Clock* clock, + Metronome* metronome, + TaskQueueBase* worker_queue); + ~DecodeSynchronizer() override; + DecodeSynchronizer(const DecodeSynchronizer&) = delete; + DecodeSynchronizer& operator=(const DecodeSynchronizer&) = delete; + + std::unique_ptr CreateSynchronizedFrameScheduler(); + + private: + class ScheduledFrame { + public: + ScheduledFrame(uint32_t rtp_timestamp, + FrameDecodeTiming::FrameSchedule schedule, + FrameDecodeScheduler::FrameReleaseCallback callback); + + // Disallow copy since `callback` should only be moved. + ScheduledFrame(const ScheduledFrame&) = delete; + ScheduledFrame& operator=(const ScheduledFrame&) = delete; + ScheduledFrame(ScheduledFrame&&) = default; + ScheduledFrame& operator=(ScheduledFrame&&) = default; + + // Executes `callback_`. + void RunFrameReleaseCallback() &&; + + uint32_t rtp_timestamp() const { return rtp_timestamp_; } + Timestamp LatestDecodeTime() const; + + private: + uint32_t rtp_timestamp_; + FrameDecodeTiming::FrameSchedule schedule_; + FrameDecodeScheduler::FrameReleaseCallback callback_; + }; + + class SynchronizedFrameDecodeScheduler : public FrameDecodeScheduler { + public: + explicit SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync); + ~SynchronizedFrameDecodeScheduler() override; + + // Releases the outstanding frame for decoding. This invalidates + // `next_frame_`. There must be a frame scheduled. + ScheduledFrame ReleaseNextFrame(); + + // Returns `next_frame_.schedule.max_decode_time`. There must be a frame + // scheduled when this is called. + Timestamp LatestDecodeTime(); + + // FrameDecodeScheduler implementation. + absl::optional ScheduledRtpTimestamp() override; + void ScheduleFrame(uint32_t rtp, + FrameDecodeTiming::FrameSchedule schedule, + FrameReleaseCallback cb) override; + void CancelOutstanding() override; + void Stop() override; + + private: + DecodeSynchronizer* sync_; + absl::optional next_frame_; + bool stopped_ = false; + }; + + void OnFrameScheduled(SynchronizedFrameDecodeScheduler* scheduler); + void RemoveFrameScheduler(SynchronizedFrameDecodeScheduler* scheduler); + + // Metronome::TickListener implementation. + void OnTick() override; + TaskQueueBase* OnTickTaskQueue() override; + + Clock* const clock_; + TaskQueueBase* const worker_queue_; + Metronome* const metronome_; + + Timestamp expected_next_tick_ = Timestamp::PlusInfinity(); + std::set schedulers_ + RTC_GUARDED_BY(worker_queue_); +}; + +} // namespace webrtc + +#endif // VIDEO_DECODE_SYNCHRONIZER_H_ diff --git a/video/decode_synchronizer_unittest.cc b/video/decode_synchronizer_unittest.cc new file mode 100644 index 0000000000..db9540f0ce --- /dev/null +++ b/video/decode_synchronizer_unittest.cc @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/decode_synchronizer.h" + +#include + +#include +#include + +#include "api/metronome/test/fake_metronome.h" +#include "api/units/time_delta.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" +#include "video/frame_decode_scheduler.h" +#include "video/frame_decode_timing.h" + +using ::testing::_; +using ::testing::Eq; + +namespace webrtc { + +class DecodeSynchronizerTest : public ::testing::Test { + public: + static constexpr TimeDelta kTickPeriod = TimeDelta::Millis(33); + + DecodeSynchronizerTest() + : time_controller_(Timestamp::Millis(1337)), + clock_(time_controller_.GetClock()), + metronome_(kTickPeriod), + decode_synchronizer_(clock_, &metronome_, run_loop_.task_queue()) {} + + protected: + GlobalSimulatedTimeController time_controller_; + Clock* clock_; + test::RunLoop run_loop_; + test::ForcedTickMetronome metronome_; + DecodeSynchronizer decode_synchronizer_; +}; + +TEST_F(DecodeSynchronizerTest, AllFramesReadyBeforeNextTickDecoded) { + ::testing::MockFunction mock_callback1; + auto scheduler1 = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + testing::MockFunction mock_callback2; + auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + { + uint32_t frame_rtp = 90000; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = + clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(3), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)}; + scheduler1->ScheduleFrame(frame_rtp, frame_sched, + mock_callback1.AsStdFunction()); + EXPECT_CALL(mock_callback1, + Call(Eq(frame_rtp), Eq(frame_sched.render_time))); + } + { + uint32_t frame_rtp = 123456; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = + clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(2), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(70)}; + scheduler2->ScheduleFrame(frame_rtp, frame_sched, + mock_callback2.AsStdFunction()); + EXPECT_CALL(mock_callback2, + Call(Eq(frame_rtp), Eq(frame_sched.render_time))); + } + metronome_.Tick(); + run_loop_.Flush(); + + // Cleanup + scheduler1->Stop(); + scheduler2->Stop(); +} + +TEST_F(DecodeSynchronizerTest, FramesNotDecodedIfDecodeTimeIsInNextInterval) { + ::testing::MockFunction mock_callback; + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + uint32_t frame_rtp = 90000; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = + clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(10), + .render_time = + clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(30)}; + scheduler->ScheduleFrame(frame_rtp, frame_sched, + mock_callback.AsStdFunction()); + + metronome_.Tick(); + run_loop_.Flush(); + // No decodes should have happened in this tick. + ::testing::Mock::VerifyAndClearExpectations(&mock_callback); + + // Decode should happen on next tick. + EXPECT_CALL(mock_callback, Call(Eq(frame_rtp), Eq(frame_sched.render_time))); + time_controller_.AdvanceTime(kTickPeriod); + metronome_.Tick(); + run_loop_.Flush(); + + // Cleanup + scheduler->Stop(); +} + +TEST_F(DecodeSynchronizerTest, FrameDecodedOnce) { + ::testing::MockFunction mock_callback; + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + uint32_t frame_rtp = 90000; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)}; + scheduler->ScheduleFrame(frame_rtp, frame_sched, + mock_callback.AsStdFunction()); + EXPECT_CALL(mock_callback, Call(_, _)).Times(1); + metronome_.Tick(); + run_loop_.Flush(); + ::testing::Mock::VerifyAndClearExpectations(&mock_callback); + + // Trigger tick again. No frame should be decoded now. + time_controller_.AdvanceTime(kTickPeriod); + metronome_.Tick(); + run_loop_.Flush(); + + // Cleanup + scheduler->Stop(); +} + +TEST_F(DecodeSynchronizerTest, FrameWithDecodeTimeInPastDecodedImmediately) { + ::testing::MockFunction mock_callback; + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + uint32_t frame_rtp = 90000; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = clock_->CurrentTime() - TimeDelta::Millis(5), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)}; + EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1); + scheduler->ScheduleFrame(frame_rtp, frame_sched, + mock_callback.AsStdFunction()); + // Verify the callback was invoked already. + ::testing::Mock::VerifyAndClearExpectations(&mock_callback); + + metronome_.Tick(); + run_loop_.Flush(); + + // Cleanup + scheduler->Stop(); +} + +TEST_F(DecodeSynchronizerTest, + FrameWithDecodeTimeFarBeforeNextTickDecodedImmediately) { + ::testing::MockFunction mock_callback; + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + // Frame which would be behind by more than kMaxAllowedFrameDelay after + // the next tick. + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = clock_->CurrentTime() + kTickPeriod - + FrameDecodeTiming::kMaxAllowedFrameDelay - + TimeDelta::Millis(1), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)}; + EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1); + scheduler->ScheduleFrame(90000, frame_sched, mock_callback.AsStdFunction()); + // Verify the callback was invoked already. + ::testing::Mock::VerifyAndClearExpectations(&mock_callback); + + time_controller_.AdvanceTime(kTickPeriod); + metronome_.Tick(); + run_loop_.Flush(); + + // A frame that would be behind by exactly kMaxAllowedFrameDelay after next + // tick should decode at the next tick. + FrameDecodeTiming::FrameSchedule queued_frame{ + .latest_decode_time = clock_->CurrentTime() + kTickPeriod - + FrameDecodeTiming::kMaxAllowedFrameDelay, + .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)}; + scheduler->ScheduleFrame(180000, queued_frame, mock_callback.AsStdFunction()); + // Verify the callback was invoked already. + ::testing::Mock::VerifyAndClearExpectations(&mock_callback); + + EXPECT_CALL(mock_callback, Call(Eq(180000u), _)).Times(1); + time_controller_.AdvanceTime(kTickPeriod); + metronome_.Tick(); + run_loop_.Flush(); + + // Cleanup + scheduler->Stop(); +} + +TEST_F(DecodeSynchronizerTest, FramesNotReleasedAfterStop) { + ::testing::MockFunction mock_callback; + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + + uint32_t frame_rtp = 90000; + FrameDecodeTiming::FrameSchedule frame_sched{ + .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30), + .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)}; + scheduler->ScheduleFrame(frame_rtp, frame_sched, + mock_callback.AsStdFunction()); + // Cleanup + scheduler->Stop(); + + // No callback should occur on this tick since Stop() was called before. + metronome_.Tick(); + run_loop_.Flush(); +} + +TEST_F(DecodeSynchronizerTest, MetronomeNotListenedWhenNoStreamsAreActive) { + EXPECT_EQ(0u, metronome_.NumListeners()); + + auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + EXPECT_EQ(1u, metronome_.NumListeners()); + auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler(); + EXPECT_EQ(1u, metronome_.NumListeners()); + + scheduler->Stop(); + EXPECT_EQ(1u, metronome_.NumListeners()); + scheduler2->Stop(); + EXPECT_EQ(0u, metronome_.NumListeners()); +} + +} // namespace webrtc diff --git a/video/frame_buffer_proxy.cc b/video/frame_buffer_proxy.cc index 3182bf967a..d294eebc53 100644 --- a/video/frame_buffer_proxy.cc +++ b/video/frame_buffer_proxy.cc @@ -14,20 +14,24 @@ #include #include +#include "absl/base/attributes.h" #include "absl/functional/bind_front.h" #include "api/sequence_checker.h" #include "modules/video_coding/frame_buffer2.h" #include "modules/video_coding/frame_buffer3.h" #include "modules/video_coding/frame_helpers.h" +#include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/thread_annotations.h" #include "system_wrappers/include/field_trial.h" -#include "video/frame_decode_scheduler.h" #include "video/frame_decode_timing.h" +#include "video/task_queue_frame_decode_scheduler.h" #include "video/video_receive_stream_timeout_tracker.h" namespace webrtc { +namespace { + class FrameBuffer2Proxy : public FrameBufferProxy { public: FrameBuffer2Proxy(Clock* clock, @@ -140,14 +144,16 @@ static constexpr size_t kZeroPlayoutDelayDefaultMaxDecodeQueueSize = 8; // accounting are moved into this pro class FrameBuffer3Proxy : public FrameBufferProxy { public: - FrameBuffer3Proxy(Clock* clock, - TaskQueueBase* worker_queue, - VCMTiming* timing, - VCMReceiveStatisticsCallback* stats_proxy, - rtc::TaskQueue* decode_queue, - FrameSchedulingReceiver* receiver, - TimeDelta max_wait_for_keyframe, - TimeDelta max_wait_for_frame) + FrameBuffer3Proxy( + Clock* clock, + TaskQueueBase* worker_queue, + VCMTiming* timing, + VCMReceiveStatisticsCallback* stats_proxy, + rtc::TaskQueue* decode_queue, + FrameSchedulingReceiver* receiver, + TimeDelta max_wait_for_keyframe, + TimeDelta max_wait_for_frame, + std::unique_ptr frame_decode_scheduler) : max_wait_for_keyframe_(max_wait_for_keyframe), max_wait_for_frame_(max_wait_for_frame), clock_(clock), @@ -156,15 +162,11 @@ class FrameBuffer3Proxy : public FrameBufferProxy { stats_proxy_(stats_proxy), receiver_(receiver), timing_(timing), + frame_decode_scheduler_(std::move(frame_decode_scheduler)), jitter_estimator_(clock_), inter_frame_delay_(clock_->TimeInMilliseconds()), buffer_(std::make_unique(kMaxFramesBuffered, kMaxFramesHistory)), - frame_decode_scheduler_( - clock_, - worker_queue, - absl::bind_front(&FrameBuffer3Proxy::OnFrameReadyForExtraction, - this)), decode_timing_(clock_, timing_), timeout_tracker_(clock_, worker_queue_, @@ -181,6 +183,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy { RTC_DCHECK(timing_); RTC_DCHECK(worker_queue_); RTC_DCHECK(clock_); + RTC_DCHECK(frame_decode_scheduler_); RTC_LOG(LS_WARNING) << "Using FrameBuffer3"; ParseFieldTrial({&zero_playout_delay_max_decode_queue_size_}, @@ -190,8 +193,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy { // FrameBufferProxy implementation. void StopOnWorker() override { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + frame_decode_scheduler_->Stop(); timeout_tracker_.Stop(); - frame_decode_scheduler_.CancelOutstanding(); decoder_ready_for_new_frame_ = false; decode_queue_->PostTask([this] { RTC_DCHECK_RUN_ON(decode_queue_); @@ -209,7 +212,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy { stats_proxy_->OnDroppedFrames(buffer_->CurrentSize()); buffer_ = std::make_unique(kMaxFramesBuffered, kMaxFramesHistory); - frame_decode_scheduler_.CancelOutstanding(); + frame_decode_scheduler_->CancelOutstanding(); } absl::optional InsertFrame( @@ -342,8 +345,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy { } private: - void OnFrameReadyForExtraction(uint32_t rtp_timestamp, - Timestamp render_time) { + void FrameReadyForDecode(uint32_t rtp_timestamp, Timestamp render_time) { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_DCHECK(buffer_->NextDecodableTemporalUnitRtpTimestamp() == rtp_timestamp) @@ -430,7 +432,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy { auto last_rtp = *buffer_->LastDecodableTemporalUnitRtpTimestamp(); // If already scheduled then abort. - if (frame_decode_scheduler_.scheduled_rtp() == + if (frame_decode_scheduler_->ScheduledRtpTimestamp() == buffer_->NextDecodableTemporalUnitRtpTimestamp()) return; @@ -441,9 +443,11 @@ class FrameBuffer3Proxy : public FrameBufferProxy { IsTooManyFramesQueued()); if (schedule) { // Don't schedule if already waiting for the same frame. - if (frame_decode_scheduler_.scheduled_rtp() != next_rtp) { - frame_decode_scheduler_.CancelOutstanding(); - frame_decode_scheduler_.ScheduleFrame(next_rtp, *schedule); + if (frame_decode_scheduler_->ScheduledRtpTimestamp() != next_rtp) { + frame_decode_scheduler_->CancelOutstanding(); + frame_decode_scheduler_->ScheduleFrame( + next_rtp, *schedule, + absl::bind_front(&FrameBuffer3Proxy::FrameReadyForDecode, this)); } return; } @@ -463,6 +467,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy { VCMReceiveStatisticsCallback* const stats_proxy_; FrameSchedulingReceiver* const receiver_; VCMTiming* const timing_; + const std::unique_ptr frame_decode_scheduler_ + RTC_GUARDED_BY(&worker_sequence_checker_); VCMJitterEstimator jitter_estimator_ RTC_GUARDED_BY(&worker_sequence_checker_); @@ -471,8 +477,6 @@ class FrameBuffer3Proxy : public FrameBufferProxy { bool keyframe_required_ RTC_GUARDED_BY(&worker_sequence_checker_) = false; std::unique_ptr buffer_ RTC_GUARDED_BY(&worker_sequence_checker_); - FrameDecodeScheduler frame_decode_scheduler_ - RTC_GUARDED_BY(&worker_sequence_checker_); FrameDecodeTiming decode_timing_ RTC_GUARDED_BY(&worker_sequence_checker_); VideoReceiveStreamTimeoutTracker timeout_tracker_ RTC_GUARDED_BY(&worker_sequence_checker_); @@ -500,6 +504,28 @@ class FrameBuffer3Proxy : public FrameBufferProxy { ScopedTaskSafety worker_safety_; }; +enum class FrameBufferArm { + kFrameBuffer2, + kFrameBuffer3, + kSyncDecode, +}; + +constexpr const char* kFrameBufferFieldTrial = "WebRTC-FrameBuffer3"; + +FrameBufferArm ParseFrameBufferFieldTrial() { + webrtc::FieldTrialEnum arm( + "arm", FrameBufferArm::kFrameBuffer2, + { + {"FrameBuffer2", FrameBufferArm::kFrameBuffer2}, + {"FrameBuffer3", FrameBufferArm::kFrameBuffer3}, + {"SyncDecoding", FrameBufferArm::kSyncDecode}, + }); + ParseFieldTrial({&arm}, field_trial::FindFullName(kFrameBufferFieldTrial)); + return arm.Get(); +} + +} // namespace + std::unique_ptr FrameBufferProxy::CreateFromFieldTrial( Clock* clock, TaskQueueBase* worker_queue, @@ -508,14 +534,39 @@ std::unique_ptr FrameBufferProxy::CreateFromFieldTrial( rtc::TaskQueue* decode_queue, FrameSchedulingReceiver* receiver, TimeDelta max_wait_for_keyframe, - TimeDelta max_wait_for_frame) { - if (field_trial::IsEnabled("WebRTC-FrameBuffer3")) - return std::make_unique( - clock, worker_queue, timing, stats_proxy, decode_queue, receiver, - max_wait_for_keyframe, max_wait_for_frame); - return std::make_unique( - clock, timing, stats_proxy, decode_queue, receiver, max_wait_for_keyframe, - max_wait_for_frame); + TimeDelta max_wait_for_frame, + DecodeSynchronizer* decode_sync) { + switch (ParseFrameBufferFieldTrial()) { + case FrameBufferArm::kFrameBuffer3: { + auto scheduler = + std::make_unique(clock, worker_queue); + return std::make_unique( + clock, worker_queue, timing, stats_proxy, decode_queue, receiver, + max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler)); + } + case FrameBufferArm::kSyncDecode: { + std::unique_ptr scheduler; + if (decode_sync) { + scheduler = decode_sync->CreateSynchronizedFrameScheduler(); + } else { + RTC_LOG(LS_ERROR) << "In FrameBuffer with sync decode trial, but " + "no DecodeSynchronizer was present!"; + // Crash in debug, but in production use the task queue scheduler. + RTC_DCHECK_NOTREACHED(); + scheduler = std::make_unique( + clock, worker_queue); + } + return std::make_unique( + clock, worker_queue, timing, stats_proxy, decode_queue, receiver, + max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler)); + } + case FrameBufferArm::kFrameBuffer2: + ABSL_FALLTHROUGH_INTENDED; + default: + return std::make_unique( + clock, timing, stats_proxy, decode_queue, receiver, + max_wait_for_keyframe, max_wait_for_frame); + } } } // namespace webrtc diff --git a/video/frame_buffer_proxy.h b/video/frame_buffer_proxy.h index cab0bd20a3..b419aedb76 100644 --- a/video/frame_buffer_proxy.h +++ b/video/frame_buffer_proxy.h @@ -13,12 +13,15 @@ #include +#include "api/metronome/metronome.h" #include "api/task_queue/task_queue_base.h" #include "api/video/encoded_frame.h" #include "modules/video_coding/include/video_coding_defines.h" #include "modules/video_coding/timing.h" #include "rtc_base/task_queue.h" #include "system_wrappers/include/clock.h" +#include "video/decode_synchronizer.h" + namespace webrtc { class FrameSchedulingReceiver { @@ -43,7 +46,8 @@ class FrameBufferProxy { rtc::TaskQueue* decode_queue, FrameSchedulingReceiver* receiver, TimeDelta max_wait_for_keyframe, - TimeDelta max_wait_for_frame); + TimeDelta max_wait_for_frame, + DecodeSynchronizer* decode_sync); virtual ~FrameBufferProxy() = default; // Run on the worker thread. diff --git a/video/frame_buffer_proxy_unittest.cc b/video/frame_buffer_proxy_unittest.cc index 408fac9bfb..fc266d28bc 100644 --- a/video/frame_buffer_proxy_unittest.cc +++ b/video/frame_buffer_proxy_unittest.cc @@ -20,6 +20,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include "api/metronome/test/fake_metronome.h" #include "api/units/frequency.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" @@ -32,6 +33,7 @@ #include "test/gtest.h" #include "test/run_loop.h" #include "test/time_controller/simulated_time_controller.h" +#include "video/decode_synchronizer.h" using ::testing::_; using ::testing::AllOf; @@ -209,6 +211,9 @@ class FrameBufferProxyTest : public ::testing::TestWithParam, decode_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue( "decode_queue", TaskQueueFactory::Priority::NORMAL)), + fake_metronome_(time_controller_.GetTaskQueueFactory(), + TimeDelta::Millis(16)), + decode_sync_(clock_, &fake_metronome_, run_loop_.task_queue()), timing_(clock_), proxy_(FrameBufferProxy::CreateFromFieldTrial(clock_, run_loop_.task_queue(), @@ -217,7 +222,8 @@ class FrameBufferProxyTest : public ::testing::TestWithParam, &decode_queue_, this, kMaxWaitForKeyframe, - kMaxWaitForFrame)) { + kMaxWaitForFrame, + &decode_sync_)) { // Avoid starting with negative render times. timing_.set_min_playout_delay(10); @@ -230,6 +236,8 @@ class FrameBufferProxyTest : public ::testing::TestWithParam, if (proxy_) { proxy_->StopOnWorker(); } + fake_metronome_.Stop(); + time_controller_.AdvanceTime(TimeDelta::Zero()); } void OnEncodedFrame(std::unique_ptr frame) override { @@ -291,7 +299,10 @@ class FrameBufferProxyTest : public ::testing::TestWithParam, Clock* const clock_; test::RunLoop run_loop_; rtc::TaskQueue decode_queue_; + test::FakeMetronome fake_metronome_; + DecodeSynchronizer decode_sync_; VCMTiming timing_; + ::testing::NiceMock stats_callback_; std::unique_ptr proxy_; @@ -544,7 +555,7 @@ TEST_P(FrameBufferProxyTest, NewFrameInsertedWhileWaitingToReleaseFrame) { proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); EXPECT_THAT(WaitForFrameOrTimeout(TimeDelta::Zero()), Frame(WithId(0))); - time_controller_.AdvanceTime(kFps30Delay); + time_controller_.AdvanceTime(kFps30Delay / 2); proxy_->InsertFrame( Builder().Id(1).Time(kFps30Rtp).Refs({0}).AsLast().Build()); StartNextDecode(); @@ -598,6 +609,8 @@ TEST_P(FrameBufferProxyTest, SameFrameNotScheduledTwice) { StartNextDecode(); EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(33))); + StartNextDecode(); + EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut()); EXPECT_EQ(dropped_frames(), 1); } @@ -617,6 +630,10 @@ TEST_P(FrameBufferProxyTest, TestStatsCallback) { time_controller_.AdvanceTime(TimeDelta::Zero()); } +// Note: This test takes a long time to run if the fake metronome is active. +// Since the test needs to wait for the timestamp to rollover, it has a fake +// delay of around 6.5 hours. Even though time is simulated, this will be +// around 1,500,000 metronome tick invocations. TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) { // Test inserting 31 frames and pause the stream for a long time before // frame 32. @@ -668,16 +685,19 @@ TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) { .AsLast() .Build()); // FrameBuffer2 drops the frame, while FrameBuffer3 will continue the stream. - if (field_trial::IsEnabled("WebRTC-FrameBuffer3")) { + if (field_trial::FindFullName("WebRTC-FrameBuffer3") + .find("arm:FrameBuffer2") == std::string::npos) { EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(2))); } else { EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut()); } } -INSTANTIATE_TEST_SUITE_P(FrameBufferProxy, - FrameBufferProxyTest, - ::testing::Values("WebRTC-FrameBuffer3/Disabled/", - "WebRTC-FrameBuffer3/Enabled/")); +INSTANTIATE_TEST_SUITE_P( + FrameBufferProxy, + FrameBufferProxyTest, + ::testing::Values("WebRTC-FrameBuffer3/arm:FrameBuffer2/", + "WebRTC-FrameBuffer3/arm:FrameBuffer3/", + "WebRTC-FrameBuffer3/arm:SyncDecoding/")); } // namespace webrtc diff --git a/video/frame_decode_scheduler.h b/video/frame_decode_scheduler.h index 6e1c1bdc53..5387e54293 100644 --- a/video/frame_decode_scheduler.h +++ b/video/frame_decode_scheduler.h @@ -16,10 +16,7 @@ #include #include "absl/types/optional.h" -#include "api/task_queue/task_queue_base.h" #include "api/units/timestamp.h" -#include "rtc_base/task_utils/pending_task_safety_flag.h" -#include "system_wrappers/include/clock.h" #include "video/frame_decode_timing.h" namespace webrtc { @@ -30,25 +27,23 @@ class FrameDecodeScheduler { using FrameReleaseCallback = std::function; - FrameDecodeScheduler(Clock* clock, - TaskQueueBase* const bookkeeping_queue, - FrameReleaseCallback callback); - ~FrameDecodeScheduler(); - FrameDecodeScheduler(const FrameDecodeScheduler&) = delete; - FrameDecodeScheduler& operator=(const FrameDecodeScheduler&) = delete; + virtual ~FrameDecodeScheduler() = default; - absl::optional scheduled_rtp() const { return scheduled_rtp_; } + // Returns the rtp timestamp of the next frame scheduled for release, or + // `nullopt` if no frame is currently scheduled. + virtual absl::optional ScheduledRtpTimestamp() = 0; - void ScheduleFrame(uint32_t rtp, FrameDecodeTiming::FrameSchedule schedule); - void CancelOutstanding(); + // Shedules a frame for release based on `schedule`. When released, `callback` + // will be invoked with the `rtp` timestamp of the frame and the `render_time` + virtual void ScheduleFrame(uint32_t rtp, + FrameDecodeTiming::FrameSchedule schedule, + FrameReleaseCallback callback) = 0; - private: - Clock* const clock_; - TaskQueueBase* const bookkeeping_queue_; - const FrameReleaseCallback callback_; + // Cancels all scheduled frames. + virtual void CancelOutstanding() = 0; - absl::optional scheduled_rtp_; - ScopedTaskSafetyDetached task_safety_; + // Stop() Must be called before destruction. + virtual void Stop() = 0; }; } // namespace webrtc diff --git a/video/frame_decode_timing.cc b/video/frame_decode_timing.cc index 7150bbc82a..02567baa90 100644 --- a/video/frame_decode_timing.cc +++ b/video/frame_decode_timing.cc @@ -15,12 +15,6 @@ namespace webrtc { -namespace { - -constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5); - -} - FrameDecodeTiming::FrameDecodeTiming(Clock* clock, webrtc::VCMTiming const* timing) : clock_(clock), timing_(timing) { @@ -51,7 +45,7 @@ FrameDecodeTiming::OnFrameBufferUpdated(uint32_t next_temporal_unit_rtp, RTC_DLOG(LS_VERBOSE) << "Selected frame with rtp " << next_temporal_unit_rtp << " render time " << render_time.ms() << " with a max wait of " << max_wait.ms() << "ms"; - return FrameSchedule{.max_decode_time = now + max_wait, + return FrameSchedule{.latest_decode_time = now + max_wait, .render_time = render_time}; } diff --git a/video/frame_decode_timing.h b/video/frame_decode_timing.h index 8c7353e8d3..ff67ace3b2 100644 --- a/video/frame_decode_timing.h +++ b/video/frame_decode_timing.h @@ -29,8 +29,12 @@ class FrameDecodeTiming { FrameDecodeTiming(const FrameDecodeTiming&) = delete; FrameDecodeTiming& operator=(const FrameDecodeTiming&) = delete; + // Any frame that has decode delay more than this in the past can be + // fast-forwarded. + static constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5); + struct FrameSchedule { - Timestamp max_decode_time; + Timestamp latest_decode_time; Timestamp render_time; }; diff --git a/video/frame_decode_timing_unittest.cc b/video/frame_decode_timing_unittest.cc index ec77ed408d..1932e85246 100644 --- a/video/frame_decode_timing_unittest.cc +++ b/video/frame_decode_timing_unittest.cc @@ -81,10 +81,11 @@ TEST_F(FrameDecodeTimingTest, ReturnsWaitTimesWhenValid) { EXPECT_THAT( frame_decode_scheduler_.OnFrameBufferUpdated(90000, 180000, false), - Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time, - Eq(clock_.CurrentTime() + decode_delay)), - Field(&FrameDecodeTiming::FrameSchedule::render_time, - Eq(render_time))))); + Optional( + AllOf(Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time, + Eq(clock_.CurrentTime() + decode_delay)), + Field(&FrameDecodeTiming::FrameSchedule::render_time, + Eq(render_time))))); } TEST_F(FrameDecodeTimingTest, FastForwardsFrameTooFarInThePast) { @@ -102,12 +103,12 @@ TEST_F(FrameDecodeTimingTest, NoFastForwardIfOnlyFrameToDecode) { const Timestamp render_time = clock_.CurrentTime(); timing_.SetTimes(90000, render_time, decode_delay); - EXPECT_THAT( - frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false), - Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time, - Eq(clock_.CurrentTime() + decode_delay)), - Field(&FrameDecodeTiming::FrameSchedule::render_time, - Eq(render_time))))); + EXPECT_THAT(frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false), + Optional(AllOf( + Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time, + Eq(clock_.CurrentTime() + decode_delay)), + Field(&FrameDecodeTiming::FrameSchedule::render_time, + Eq(render_time))))); } } // namespace webrtc diff --git a/video/frame_decode_scheduler.cc b/video/task_queue_frame_decode_scheduler.cc similarity index 59% rename from video/frame_decode_scheduler.cc rename to video/task_queue_frame_decode_scheduler.cc index 5696e10e65..72de3c3ec9 100644 --- a/video/frame_decode_scheduler.cc +++ b/video/task_queue_frame_decode_scheduler.cc @@ -8,57 +8,69 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "video/frame_decode_scheduler.h" +#include "video/task_queue_frame_decode_scheduler.h" #include #include #include "api/sequence_checker.h" +#include "rtc_base/checks.h" #include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { -FrameDecodeScheduler::FrameDecodeScheduler( +TaskQueueFrameDecodeScheduler::TaskQueueFrameDecodeScheduler( Clock* clock, - TaskQueueBase* const bookkeeping_queue, - FrameReleaseCallback callback) - : clock_(clock), - bookkeeping_queue_(bookkeeping_queue), - callback_(std::move(callback)) { + TaskQueueBase* const bookkeeping_queue) + : clock_(clock), bookkeeping_queue_(bookkeeping_queue) { RTC_DCHECK(clock_); RTC_DCHECK(bookkeeping_queue_); } -FrameDecodeScheduler::~FrameDecodeScheduler() { +TaskQueueFrameDecodeScheduler::~TaskQueueFrameDecodeScheduler() { + RTC_DCHECK(stopped_); RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_ << ". Call CancelOutstanding before destruction."; } -void FrameDecodeScheduler::ScheduleFrame( +void TaskQueueFrameDecodeScheduler::ScheduleFrame( uint32_t rtp, - FrameDecodeTiming::FrameSchedule schedule) { + FrameDecodeTiming::FrameSchedule schedule, + FrameReleaseCallback cb) { + RTC_DCHECK(!stopped_) << "Can not schedule frames after stopped."; RTC_DCHECK(!scheduled_rtp_.has_value()) << "Can not schedule two frames for release at the same time."; + RTC_DCHECK(cb); scheduled_rtp_ = rtp; - TimeDelta wait = std::max(TimeDelta::Zero(), - schedule.max_decode_time - clock_->CurrentTime()); + TimeDelta wait = std::max( + TimeDelta::Zero(), schedule.latest_decode_time - clock_->CurrentTime()); bookkeeping_queue_->PostDelayedTask( ToQueuedTask(task_safety_.flag(), - [this, rtp, schedule] { + [this, rtp, schedule, cb = std::move(cb)] { RTC_DCHECK_RUN_ON(bookkeeping_queue_); // If the next frame rtp has changed since this task was // this scheduled release should be skipped. if (scheduled_rtp_ != rtp) return; scheduled_rtp_ = absl::nullopt; - callback_(rtp, schedule.render_time); + cb(rtp, schedule.render_time); }), wait.ms()); } -void FrameDecodeScheduler::CancelOutstanding() { +void TaskQueueFrameDecodeScheduler::CancelOutstanding() { scheduled_rtp_ = absl::nullopt; } +absl::optional +TaskQueueFrameDecodeScheduler::ScheduledRtpTimestamp() { + return scheduled_rtp_; +} + +void TaskQueueFrameDecodeScheduler::Stop() { + CancelOutstanding(); + stopped_ = true; +} + } // namespace webrtc diff --git a/video/task_queue_frame_decode_scheduler.h b/video/task_queue_frame_decode_scheduler.h new file mode 100644 index 0000000000..69c6dae63d --- /dev/null +++ b/video/task_queue_frame_decode_scheduler.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_ +#define VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_ + +#include "video/frame_decode_scheduler.h" + +namespace webrtc { + +// An implementation of FrameDecodeScheduler that is based on TaskQueues. This +// is the default implementation for general use. +class TaskQueueFrameDecodeScheduler : public FrameDecodeScheduler { + public: + TaskQueueFrameDecodeScheduler(Clock* clock, + TaskQueueBase* const bookkeeping_queue); + ~TaskQueueFrameDecodeScheduler() override; + TaskQueueFrameDecodeScheduler(const TaskQueueFrameDecodeScheduler&) = delete; + TaskQueueFrameDecodeScheduler& operator=( + const TaskQueueFrameDecodeScheduler&) = delete; + + // FrameDecodeScheduler implementation. + absl::optional ScheduledRtpTimestamp() override; + void ScheduleFrame(uint32_t rtp, + FrameDecodeTiming::FrameSchedule schedule, + FrameReleaseCallback cb) override; + void CancelOutstanding() override; + void Stop() override; + + private: + Clock* const clock_; + TaskQueueBase* const bookkeeping_queue_; + + absl::optional scheduled_rtp_; + ScopedTaskSafetyDetached task_safety_; + bool stopped_ = false; +}; + +} // namespace webrtc + +#endif // VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_ diff --git a/video/frame_decode_scheduler_unittest.cc b/video/task_queue_frame_decode_scheduler_unittest.cc similarity index 59% rename from video/frame_decode_scheduler_unittest.cc rename to video/task_queue_frame_decode_scheduler_unittest.cc index e30863c5ba..2807e654bf 100644 --- a/video/frame_decode_scheduler_unittest.cc +++ b/video/task_queue_frame_decode_scheduler_unittest.cc @@ -8,13 +8,14 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "video/frame_decode_scheduler.h" +#include "video/task_queue_frame_decode_scheduler.h" #include #include #include +#include "absl/functional/bind_front.h" #include "absl/types/optional.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" @@ -28,29 +29,31 @@ namespace webrtc { using ::testing::Eq; using ::testing::Optional; -class FrameDecodeSchedulerTest : public ::testing::Test { +class TaskQueueFrameDecodeSchedulerTest : public ::testing::Test { public: - FrameDecodeSchedulerTest() + TaskQueueFrameDecodeSchedulerTest() : time_controller_(Timestamp::Millis(2000)), task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue( "scheduler", TaskQueueFactory::Priority::NORMAL)), - scheduler_(std::make_unique( + scheduler_(std::make_unique( time_controller_.GetClock(), - task_queue_.Get(), - [this](uint32_t rtp, Timestamp render_time) { - OnFrame(rtp, render_time); - })) {} + task_queue_.Get())) {} - ~FrameDecodeSchedulerTest() override { + ~TaskQueueFrameDecodeSchedulerTest() override { if (scheduler_) { OnQueue([&] { - scheduler_->CancelOutstanding(); + scheduler_->Stop(); scheduler_ = nullptr; }); } } + void FrameReadyForDecode(uint32_t timestamp, Timestamp render_time) { + last_rtp_ = timestamp; + last_render_time_ = render_time; + } + protected: template void OnQueue(Task&& t) { @@ -63,23 +66,21 @@ class FrameDecodeSchedulerTest : public ::testing::Test { std::unique_ptr scheduler_; absl::optional last_rtp_; absl::optional last_render_time_; - - private: - void OnFrame(uint32_t timestamp, Timestamp render_time) { - last_rtp_ = timestamp; - last_render_time_ = render_time; - } }; -TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) { +TEST_F(TaskQueueFrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) { constexpr TimeDelta decode_delay = TimeDelta::Millis(5); const Timestamp now = time_controller_.GetClock()->CurrentTime(); const uint32_t rtp = 90000; const Timestamp render_time = now + TimeDelta::Millis(15); + FrameDecodeTiming::FrameSchedule schedule = { + .latest_decode_time = now + decode_delay, .render_time = render_time}; OnQueue([&] { - scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay, - .render_time = render_time}); - EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp)); + scheduler_->ScheduleFrame( + rtp, schedule, + absl::bind_front( + &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this)); + EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp)); }); EXPECT_THAT(last_rtp_, Eq(absl::nullopt)); @@ -88,35 +89,43 @@ TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) { EXPECT_THAT(last_render_time_, Optional(render_time)); } -TEST_F(FrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) { +TEST_F(TaskQueueFrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) { constexpr TimeDelta decode_delay = TimeDelta::Millis(-5); const Timestamp now = time_controller_.GetClock()->CurrentTime(); const uint32_t rtp = 90000; const Timestamp render_time = now + TimeDelta::Millis(15); + FrameDecodeTiming::FrameSchedule schedule = { + .latest_decode_time = now + decode_delay, .render_time = render_time}; OnQueue([&] { - scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay, - .render_time = render_time}); - EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp)); + scheduler_->ScheduleFrame( + rtp, schedule, + absl::bind_front( + &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this)); + EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp)); }); EXPECT_THAT(last_rtp_, Optional(rtp)); EXPECT_THAT(last_render_time_, Optional(render_time)); } -TEST_F(FrameDecodeSchedulerTest, CancelOutstanding) { +TEST_F(TaskQueueFrameDecodeSchedulerTest, CancelOutstanding) { constexpr TimeDelta decode_delay = TimeDelta::Millis(50); const Timestamp now = time_controller_.GetClock()->CurrentTime(); const uint32_t rtp = 90000; + FrameDecodeTiming::FrameSchedule schedule = { + .latest_decode_time = now + decode_delay, + .render_time = now + TimeDelta::Millis(75)}; OnQueue([&] { - scheduler_->ScheduleFrame(rtp, - {.max_decode_time = now + decode_delay, - .render_time = now + TimeDelta::Millis(75)}); - EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp)); + scheduler_->ScheduleFrame( + rtp, schedule, + absl::bind_front( + &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this)); + EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp)); }); time_controller_.AdvanceTime(decode_delay / 2); OnQueue([&] { - EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp)); + EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp)); scheduler_->CancelOutstanding(); - EXPECT_THAT(scheduler_->scheduled_rtp(), Eq(absl::nullopt)); + EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Eq(absl::nullopt)); }); time_controller_.AdvanceTime(decode_delay / 2); EXPECT_THAT(last_rtp_, Eq(absl::nullopt)); diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 918413fee1..ef9692e61e 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -272,7 +272,7 @@ VideoReceiveStream2::VideoReceiveStream2( frame_buffer_ = FrameBufferProxy::CreateFromFieldTrial( clock_, call_->worker_thread(), timing_.get(), &stats_proxy_, &decode_queue_, this, TimeDelta::Millis(max_wait_for_keyframe_ms_), - TimeDelta::Millis(max_wait_for_frame_ms_)); + TimeDelta::Millis(max_wait_for_frame_ms_), nullptr); if (config_.rtp.rtx_ssrc) { rtx_receive_stream_ = std::make_unique(