From 9d29026216e1e736022d10a901408ebe1d8c2fc0 Mon Sep 17 00:00:00 2001 From: Evan Shrubsole Date: Wed, 15 Dec 2021 14:33:40 +0100 Subject: [PATCH] Add FrameBufferProxy in prep for FrameBuffer3 This is a delegate that is used by video_receive_stream2 to handle frame buffer tasks like threading, and stats. This will be used in a follow up to use FrameBuffer3 as a strategy selected by field trial. Unit-tests will be used in follow-up CLs containing Frame Buffer 3, and are expected to work with both Frame buffer proxy versions. Change-Id: I524279343d60a348d044d9085d618f12d7bf3a23 Bug: webrtc:13343 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/241605 Reviewed-by: Ilya Nikolaevskiy Reviewed-by: Philip Eliasson Commit-Queue: Evan Shrubsole Cr-Commit-Position: refs/heads/main@{#35803} --- modules/video_coding/BUILD.gn | 1 + video/BUILD.gn | 20 + video/frame_buffer_proxy.cc | 132 ++++++ video/frame_buffer_proxy.h | 64 +++ video/frame_buffer_proxy_unittest.cc | 638 +++++++++++++++++++++++++++ video/video_receive_stream2.cc | 77 ++-- video/video_receive_stream2.h | 10 +- 7 files changed, 908 insertions(+), 34 deletions(-) create mode 100644 video/frame_buffer_proxy.cc create mode 100644 video/frame_buffer_proxy.h create mode 100644 video/frame_buffer_proxy_unittest.cc diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn index acec4570a3..08d613b55e 100644 --- a/modules/video_coding/BUILD.gn +++ b/modules/video_coding/BUILD.gn @@ -253,6 +253,7 @@ rtc_library("video_coding") { "../../api:rtp_packet_info", "../../api:scoped_refptr", "../../api:sequence_checker", + "../../api/task_queue", "../../api/units:data_rate", "../../api/units:time_delta", "../../api/units:timestamp", diff --git a/video/BUILD.gn b/video/BUILD.gn index 92f5dd4b35..92531fe11f 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -53,6 +53,7 @@ rtc_library("video") { ] deps = [ + ":frame_buffer_proxy", ":frame_cadence_adapter", ":frame_dumping_decoder", ":video_stream_encoder_impl", @@ -285,6 +286,22 @@ rtc_library("frame_cadence_adapter") { ] } +rtc_library("frame_buffer_proxy") { + sources = [ + "frame_buffer_proxy.cc", + "frame_buffer_proxy.h", + ] + deps = [ + "../api/task_queue", + "../api/video:encoded_frame", + "../modules/video_coding", + "../modules/video_coding:video_codec_interface", + "../rtc_base:rtc_task_queue", + "../system_wrappers", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] +} + rtc_library("video_stream_encoder_impl") { visibility = [ "*" ] @@ -640,6 +657,7 @@ if (rtc_include_tests) { "end_to_end_tests/ssrc_tests.cc", "end_to_end_tests/stats_tests.cc", "end_to_end_tests/transport_feedback_tests.cc", + "frame_buffer_proxy_unittest.cc", "frame_cadence_adapter_unittest.cc", "frame_encode_metadata_writer_unittest.cc", "picture_id_tests.cc", @@ -662,6 +680,7 @@ if (rtc_include_tests) { "video_stream_encoder_unittest.cc", ] deps = [ + ":frame_buffer_proxy", ":frame_cadence_adapter", ":video", ":video_mocks", @@ -690,6 +709,7 @@ if (rtc_include_tests) { "../api/task_queue:default_task_queue_factory", "../api/test/video:function_video_factory", "../api/units:data_rate", + "../api/units:time_delta", "../api/units:timestamp", "../api/video:builtin_video_bitrate_allocator_factory", "../api/video:encoded_image", diff --git a/video/frame_buffer_proxy.cc b/video/frame_buffer_proxy.cc new file mode 100644 index 0000000000..48658f0bff --- /dev/null +++ b/video/frame_buffer_proxy.cc @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/frame_buffer_proxy.h" + +#include +#include + +#include "modules/video_coding/frame_buffer2.h" + +namespace webrtc { + +class FrameBuffer2Proxy : public FrameBufferProxy { + public: + FrameBuffer2Proxy(Clock* clock, + VCMTiming* timing, + VCMReceiveStatisticsCallback* stats_proxy, + rtc::TaskQueue* decode_queue, + FrameSchedulingReceiver* receiver, + TimeDelta max_wait_for_keyframe, + TimeDelta max_wait_for_frame) + : max_wait_for_keyframe_(max_wait_for_keyframe), + max_wait_for_frame_(max_wait_for_frame), + frame_buffer_(clock, timing, stats_proxy), + decode_queue_(decode_queue), + stats_proxy_(stats_proxy), + receiver_(receiver) { + RTC_DCHECK(decode_queue_); + RTC_DCHECK(stats_proxy_); + RTC_DCHECK(receiver_); + } + + void StopOnWorker() override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + decode_queue_->PostTask([this] { + frame_buffer_.Stop(); + decode_safety_->SetNotAlive(); + }); + } + + void SetProtectionMode(VCMVideoProtection protection_mode) override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + frame_buffer_.SetProtectionMode(kProtectionNackFEC); + } + + void Clear() override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + frame_buffer_.Clear(); + } + + absl::optional InsertFrame( + std::unique_ptr frame) override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + int64_t last_continuous_pid = frame_buffer_.InsertFrame(std::move(frame)); + if (last_continuous_pid != -1) + return last_continuous_pid; + return absl::nullopt; + } + + void UpdateRtt(int64_t max_rtt_ms) override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + frame_buffer_.UpdateRtt(max_rtt_ms); + } + + void StartNextDecode(bool keyframe_required) override { + if (!decode_queue_->IsCurrent()) { + decode_queue_->PostTask(ToQueuedTask( + decode_safety_, + [this, keyframe_required] { StartNextDecode(keyframe_required); })); + return; + } + RTC_DCHECK_RUN_ON(decode_queue_); + + frame_buffer_.NextFrame( + MaxWait(keyframe_required).ms(), keyframe_required, decode_queue_, + /* encoded frame handler */ + [this, keyframe_required](std::unique_ptr frame) { + RTC_DCHECK_RUN_ON(decode_queue_); + if (!decode_safety_->alive()) + return; + if (frame) { + receiver_->OnEncodedFrame(std::move(frame)); + } else { + receiver_->OnDecodableFrameTimeout(MaxWait(keyframe_required)); + } + }); + } + + int Size() override { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + return frame_buffer_.Size(); + } + + private: + TimeDelta MaxWait(bool keyframe_required) const { + return keyframe_required ? max_wait_for_keyframe_ : max_wait_for_frame_; + } + + RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_sequence_checker_; + const TimeDelta max_wait_for_keyframe_; + const TimeDelta max_wait_for_frame_; + video_coding::FrameBuffer frame_buffer_; + rtc::TaskQueue* const decode_queue_; + VCMReceiveStatisticsCallback* const stats_proxy_; + FrameSchedulingReceiver* const receiver_; + rtc::scoped_refptr decode_safety_ = + PendingTaskSafetyFlag::CreateDetached(); +}; + +// TODO(bugs.webrtc.org/13343): Create FrameBuffer3Proxy when complete. +std::unique_ptr FrameBufferProxy::CreateFromFieldTrial( + Clock* clock, + TaskQueueBase* worker_queue, + VCMTiming* timing, + VCMReceiveStatisticsCallback* stats_proxy, + rtc::TaskQueue* decode_queue, + FrameSchedulingReceiver* receiver, + TimeDelta max_wait_for_keyframe, + TimeDelta max_wait_for_frame) { + return std::make_unique( + clock, timing, stats_proxy, decode_queue, receiver, max_wait_for_keyframe, + max_wait_for_frame); +} + +} // namespace webrtc diff --git a/video/frame_buffer_proxy.h b/video/frame_buffer_proxy.h new file mode 100644 index 0000000000..cab0bd20a3 --- /dev/null +++ b/video/frame_buffer_proxy.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_FRAME_BUFFER_PROXY_H_ +#define VIDEO_FRAME_BUFFER_PROXY_H_ + +#include + +#include "api/task_queue/task_queue_base.h" +#include "api/video/encoded_frame.h" +#include "modules/video_coding/include/video_coding_defines.h" +#include "modules/video_coding/timing.h" +#include "rtc_base/task_queue.h" +#include "system_wrappers/include/clock.h" +namespace webrtc { + +class FrameSchedulingReceiver { + public: + virtual ~FrameSchedulingReceiver() = default; + + virtual void OnEncodedFrame(std::unique_ptr frame) = 0; + virtual void OnDecodableFrameTimeout(TimeDelta wait_time) = 0; +}; + +// Temporary class to enable replacement of frame_buffer2 with frame_buffer3. +// Once frame_buffer3 has shown to work with a field trial, frame_buffer2 will +// be removed and this class should be directly integrated into +// video_receive_stream2. bugs.webrtc.org/13343 tracks this integration. +class FrameBufferProxy { + public: + static std::unique_ptr CreateFromFieldTrial( + Clock* clock, + TaskQueueBase* worker_queue, + VCMTiming* timing, + VCMReceiveStatisticsCallback* stats_proxy, + rtc::TaskQueue* decode_queue, + FrameSchedulingReceiver* receiver, + TimeDelta max_wait_for_keyframe, + TimeDelta max_wait_for_frame); + virtual ~FrameBufferProxy() = default; + + // Run on the worker thread. + virtual void StopOnWorker() = 0; + virtual void SetProtectionMode(VCMVideoProtection protection_mode) = 0; + virtual void Clear() = 0; + virtual absl::optional InsertFrame( + std::unique_ptr frame) = 0; + virtual void UpdateRtt(int64_t max_rtt_ms) = 0; + virtual int Size() = 0; + + // Run on either the worker thread or the decode thread. + virtual void StartNextDecode(bool keyframe_required) = 0; +}; + +} // namespace webrtc + +#endif // VIDEO_FRAME_BUFFER_PROXY_H_ diff --git a/video/frame_buffer_proxy_unittest.cc b/video/frame_buffer_proxy_unittest.cc new file mode 100644 index 0000000000..c8aa1149d7 --- /dev/null +++ b/video/frame_buffer_proxy_unittest.cc @@ -0,0 +1,638 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/frame_buffer_proxy.h" + +#include + +#include +#include +#include +#include + +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "api/video/video_content_type.h" +#include "rtc_base/event.h" +#include "test/field_trial.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/run_loop.h" +#include "test/time_controller/simulated_time_controller.h" + +using ::testing::AllOf; +using ::testing::Contains; +using ::testing::Each; +using ::testing::Eq; +using ::testing::IsEmpty; +using ::testing::Matches; +using ::testing::Ne; +using ::testing::Not; +using ::testing::Optional; +using ::testing::Pointee; +using ::testing::SizeIs; + +namespace webrtc { + +// For test printing. +void PrintTo(const EncodedFrame& frame, std::ostream* os) { + *os << "EncodedFrame with id=" << frame.Id() << " rtp=" << frame.Timestamp() + << " size=" << frame.size() << " refs=["; + for (size_t ref = 0; ref < frame.num_references; ++ref) { + *os << frame.references[ref] << ","; + } + *os << "]"; +} + +namespace { + +constexpr size_t kFrameSize = 10; +constexpr uint32_t kFps30Rtp = 90000 / 30; +constexpr TimeDelta kFps30Delay = 1 / Frequency::Hertz(30); +const VideoPlayoutDelay kZeroPlayoutDelay = {0, 0}; +constexpr Timestamp kClockStart = Timestamp::Millis(1000); + +class FakeEncodedFrame : public EncodedFrame { + public: + // Always 10ms delay and on time. + int64_t ReceivedTime() const override { + if (Timestamp() == 0) + return kClockStart.ms(); + return TimeDelta::Seconds(Timestamp() / 90000.0).ms() + kClockStart.ms(); + } + int64_t RenderTime() const override { return _renderTimeMs; } +}; + +MATCHER_P(FrameWithId, id, "") { + return Matches(Eq(id))(arg.Id()); +} + +MATCHER_P(FrameWithSize, id, "") { + return Matches(Eq(id))(arg.size()); +} + +class Builder { + public: + Builder& Time(uint32_t rtp_timestamp) { + rtp_timestamp_ = rtp_timestamp; + return *this; + } + Builder& Id(int64_t frame_id) { + frame_id_ = frame_id; + return *this; + } + Builder& AsLast() { + last_spatial_layer_ = true; + return *this; + } + Builder& Refs(const std::vector& references) { + references_ = references; + return *this; + } + Builder& PlayoutDelay(VideoPlayoutDelay playout_delay) { + playout_delay_ = playout_delay; + return *this; + } + Builder& SpatialLayer(int spatial_layer) { + spatial_layer_ = spatial_layer; + return *this; + } + + std::unique_ptr Build() { + RTC_CHECK_LE(references_.size(), EncodedFrame::kMaxFrameReferences); + RTC_CHECK(rtp_timestamp_); + RTC_CHECK(frame_id_); + + auto frame = std::make_unique(); + frame->SetTimestamp(*rtp_timestamp_); + frame->SetId(*frame_id_); + frame->is_last_spatial_layer = last_spatial_layer_; + frame->SetEncodedData(EncodedImageBuffer::Create(kFrameSize)); + + if (playout_delay_) + frame->SetPlayoutDelay(*playout_delay_); + + for (int64_t ref : references_) { + frame->references[frame->num_references] = ref; + frame->num_references++; + } + if (spatial_layer_) { + frame->SetSpatialIndex(spatial_layer_); + } + + return frame; + } + + private: + absl::optional rtp_timestamp_; + absl::optional frame_id_; + absl::optional playout_delay_; + absl::optional spatial_layer_; + bool last_spatial_layer_ = false; + std::vector references_; +}; + +class VCMReceiveStatisticsCallbackMock : public VCMReceiveStatisticsCallback { + public: + MOCK_METHOD(void, + OnCompleteFrame, + (bool is_keyframe, + size_t size_bytes, + VideoContentType content_type), + (override)); + MOCK_METHOD(void, OnDroppedFrames, (uint32_t num_dropped), (override)); + MOCK_METHOD(void, + OnFrameBufferTimingsUpdated, + (int max_decode_ms, + int current_delay_ms, + int target_delay_ms, + int jitter_buffer_ms, + int min_playout_delay_ms, + int render_delay_ms), + (override)); + MOCK_METHOD(void, + OnTimingFrameInfoUpdated, + (const TimingFrameInfo& info), + (override)); +}; +} // namespace + +constexpr auto kMaxWaitForKeyframe = TimeDelta::Millis(500); +constexpr auto kMaxWaitForFrame = TimeDelta::Millis(1500); +class FrameBufferProxyTest : public ::testing::TestWithParam, + public FrameSchedulingReceiver { + public: + FrameBufferProxyTest() + : field_trials_(GetParam()), + time_controller_(kClockStart), + clock_(time_controller_.GetClock()), + decode_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue( + "decode_queue", + TaskQueueFactory::Priority::NORMAL)), + timing_(clock_), + proxy_(FrameBufferProxy::CreateFromFieldTrial(clock_, + run_loop_.task_queue(), + &timing_, + &stats_callback_, + &decode_queue_, + this, + kMaxWaitForKeyframe, + kMaxWaitForFrame)) { + // Avoid starting with negative render times. + timing_.set_min_playout_delay(10); + + ON_CALL(stats_callback_, OnDroppedFrames) + .WillByDefault( + [this](auto num_dropped) { dropped_frames_ += num_dropped; }); + } + + ~FrameBufferProxyTest() override { + if (proxy_) { + proxy_->StopOnWorker(); + } + } + + void OnEncodedFrame(std::unique_ptr frame) override { + last_frame_ = std::move(frame); + run_loop_.Quit(); + } + + void OnDecodableFrameTimeout(TimeDelta wait_time) override { + timeouts_++; + run_loop_.Quit(); + } + + bool WaitForFrameOrTimeout(TimeDelta wait) { + if (NewFrameOrTimeout()) { + return true; + } + run_loop_.PostTask([&] { time_controller_.AdvanceTime(wait); }); + run_loop_.PostTask([&] { + // If run loop posted to a task queue, flush that. + time_controller_.AdvanceTime(TimeDelta::Zero()); + + run_loop_.PostTask([&] { + time_controller_.AdvanceTime(TimeDelta::Zero()); + run_loop_.Quit(); + }); + }); + run_loop_.Run(); + return NewFrameOrTimeout(); + } + + void StartNextDecode() { + ResetLastResult(); + proxy_->StartNextDecode(false); + time_controller_.AdvanceTime(TimeDelta::Zero()); + } + + void StartNextDecodeForceKeyframe() { + ResetLastResult(); + proxy_->StartNextDecode(true); + time_controller_.AdvanceTime(TimeDelta::Zero()); + } + + void ResetLastResult() { + last_frame_.reset(); + last_timeouts_ = timeouts_; + } + + int timeouts() const { return timeouts_; } + EncodedFrame* last_frame() const { return last_frame_.get(); } + int dropped_frames() const { return dropped_frames_; } + + protected: + test::ScopedFieldTrials field_trials_; + GlobalSimulatedTimeController time_controller_; + Clock* const clock_; + test::RunLoop run_loop_; + rtc::TaskQueue decode_queue_; + VCMTiming timing_; + ::testing::NiceMock stats_callback_; + std::unique_ptr proxy_; + + private: + bool NewFrameOrTimeout() const { + return last_frame_ || timeouts_ != last_timeouts_; + } + + int timeouts_ = 0; + int last_timeouts_ = 0; + std::unique_ptr last_frame_; + uint32_t dropped_frames_ = 0; +}; + +TEST_P(FrameBufferProxyTest, InitialTimeoutAfterKeyframeTimeoutPeriod) { + StartNextDecodeForceKeyframe(); + // No frame insterted. Timeout expected. + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForKeyframe)); + EXPECT_EQ(timeouts(), 1); + + // No new timeout set since receiver has not started new decode. + ResetLastResult(); + EXPECT_FALSE(WaitForFrameOrTimeout(kMaxWaitForKeyframe)); + EXPECT_EQ(timeouts(), 1); + + // Now that receiver has asked for new frame, a new timeout can occur. + StartNextDecodeForceKeyframe(); + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForKeyframe)); + EXPECT_EQ(timeouts(), 2); +} + +TEST_P(FrameBufferProxyTest, KeyFramesAreScheduled) { + StartNextDecodeForceKeyframe(); + time_controller_.AdvanceTime(TimeDelta::Millis(50)); + + auto frame = Builder().Id(0).Time(0).AsLast().Build(); + proxy_->InsertFrame(std::move(frame)); + + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + + ASSERT_THAT(last_frame(), Pointee(FrameWithId(0))); + EXPECT_EQ(timeouts(), 0); +} + +TEST_P(FrameBufferProxyTest, DeltaFrameTimeoutAfterKeyframeExtracted) { + StartNextDecodeForceKeyframe(); + + time_controller_.AdvanceTime(TimeDelta::Millis(50)); + auto frame = Builder().Id(0).Time(0).AsLast().Build(); + proxy_->InsertFrame(std::move(frame)); + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForKeyframe)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecode(); + time_controller_.AdvanceTime(TimeDelta::Millis(50)); + + // Timeouts should now happen at the normal frequency. + const int expected_timeouts = 5; + for (int i = 0; i < expected_timeouts; ++i) { + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForFrame)); + StartNextDecode(); + } + + EXPECT_EQ(timeouts(), expected_timeouts); +} + +TEST_P(FrameBufferProxyTest, DependantFramesAreScheduled) { + StartNextDecodeForceKeyframe(); + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecode(); + + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).AsLast().Refs({0}).Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + + EXPECT_THAT(last_frame(), Pointee(FrameWithId(1))); + EXPECT_EQ(timeouts(), 0); +} + +TEST_P(FrameBufferProxyTest, SpatialLayersAreScheduled) { + StartNextDecodeForceKeyframe(); + proxy_->InsertFrame(Builder().Id(0).SpatialLayer(0).Time(0).Build()); + proxy_->InsertFrame(Builder().Id(1).SpatialLayer(1).Time(0).Build()); + proxy_->InsertFrame(Builder().Id(2).SpatialLayer(2).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), + Pointee(AllOf(FrameWithId(0), FrameWithSize(3 * kFrameSize)))); + + proxy_->InsertFrame(Builder().Id(3).Time(kFps30Rtp).SpatialLayer(0).Build()); + proxy_->InsertFrame(Builder().Id(4).Time(kFps30Rtp).SpatialLayer(1).Build()); + proxy_->InsertFrame( + Builder().Id(5).Time(kFps30Rtp).SpatialLayer(2).AsLast().Build()); + + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay * 10)); + EXPECT_THAT(last_frame(), + Pointee(AllOf(FrameWithId(3), FrameWithSize(3 * kFrameSize)))); + EXPECT_EQ(timeouts(), 0); +} + +TEST_P(FrameBufferProxyTest, OutstandingFrameTasksAreCancelledAfterDeletion) { + StartNextDecodeForceKeyframe(); + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + // Get keyframe. Delta frame should now be scheduled. + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecode(); + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).AsLast().Refs({0}).Build()); + proxy_->StopOnWorker(); + // Wait for 2x max wait time. Since we stopped, this should cause no timeouts + // or frame-ready callbacks. + EXPECT_FALSE(WaitForFrameOrTimeout(kMaxWaitForFrame * 2)); + EXPECT_EQ(timeouts(), 0); +} + +TEST_P(FrameBufferProxyTest, FramesWaitForDecoderToComplete) { + StartNextDecodeForceKeyframe(); + + // Start with a keyframe. + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + ResetLastResult(); + // Insert a delta frame. + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).AsLast().Refs({0}).Build()); + + // Advancing time should not result in a frame since the scheduler has not + // been signalled that we are ready. + EXPECT_FALSE(WaitForFrameOrTimeout(kFps30Delay)); + // Signal ready. + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(1))); +} + +TEST_P(FrameBufferProxyTest, LateFrameDropped) { + StartNextDecodeForceKeyframe(); + // F1 + // / + // F0 --> F2 + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + // Start with a keyframe. + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecode(); + + // Simulate late F1 which arrives after F2. + time_controller_.AdvanceTime(kFps30Delay * 2); + proxy_->InsertFrame( + Builder().Id(2).Time(2 * kFps30Rtp).AsLast().Refs({0}).Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(2))); + + StartNextDecode(); + + proxy_->InsertFrame( + Builder().Id(1).Time(1 * kFps30Rtp).AsLast().Refs({0}).Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForFrame)); + // Confirm frame 1 is never scheduled by timing out. + EXPECT_EQ(timeouts(), 1); +} + +TEST_P(FrameBufferProxyTest, FramesFastForwardOnSystemHalt) { + StartNextDecodeForceKeyframe(); + // F1 + // / + // F0 --> F2 + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + + // Start with a keyframe. + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).AsLast().Refs({0}).Build()); + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(2).Time(2 * kFps30Rtp).AsLast().Refs({0}).Build()); + + // Halting time should result in F1 being skipped. + time_controller_.AdvanceTime(kFps30Delay * 2); + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(2))); + EXPECT_EQ(dropped_frames(), 1); +} + +TEST_P(FrameBufferProxyTest, ForceKeyFrame) { + StartNextDecodeForceKeyframe(); + // Initial keyframe. + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecodeForceKeyframe(); + + // F2 is the next keyframe, and should be extracted since a keyframe was + // forced. + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).AsLast().Refs({0}).Build()); + proxy_->InsertFrame(Builder().Id(2).Time(kFps30Rtp * 2).AsLast().Build()); + + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForFrame)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(2))); +} + +TEST_P(FrameBufferProxyTest, SlowDecoderDropsTemporalLayers) { + StartNextDecodeForceKeyframe(); + // 2 temporal layers, at 15fps per layer to make 30fps total. + // Decoder is slower than 30fps, so last_frame() will be skipped. + // F1 --> F3 --> F5 + // / / / + // F0 --> F2 --> F4 + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + // Keyframe received. + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + // Don't start next decode until slow delay. + + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(1).Time(1 * kFps30Rtp).Refs({0}).AsLast().Build()); + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(2).Time(2 * kFps30Rtp).Refs({0}).AsLast().Build()); + + // Simulate decode taking 3x FPS rate. + time_controller_.AdvanceTime(kFps30Delay * 1.5); + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay * 2)); + // F2 is the best frame since decoding was so slow that F1 is too old. + EXPECT_THAT(last_frame(), Pointee(FrameWithId(2))); + EXPECT_EQ(dropped_frames(), 1); + time_controller_.AdvanceTime(kFps30Delay / 2); + + proxy_->InsertFrame( + Builder().Id(3).Time(3 * kFps30Rtp).Refs({1, 2}).AsLast().Build()); + time_controller_.AdvanceTime(kFps30Delay / 2); + proxy_->InsertFrame( + Builder().Id(4).Time(4 * kFps30Rtp).Refs({2}).AsLast().Build()); + time_controller_.AdvanceTime(kFps30Delay / 2); + + // F4 is the best frame since decoding was so slow that F1 is too old. + time_controller_.AdvanceTime(kFps30Delay); + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(4))); + + proxy_->InsertFrame( + Builder().Id(5).Time(5 * kFps30Rtp).Refs({3, 4}).AsLast().Build()); + time_controller_.AdvanceTime(kFps30Delay / 2); + + // F5 is not decodable since F4 was decoded, so a timeout is expected. + time_controller_.AdvanceTime(TimeDelta::Millis(10)); + StartNextDecode(); + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForFrame)); + EXPECT_EQ(timeouts(), 1); + // TODO(bugs.webrtc.org/13343): This should be 2 dropped frames since frames 1 + // and 3 were dropped. However, frame_buffer2 does not mark frame 3 as dropped + // which is a bug. Uncomment below when that is fixed for frame_buffer2 is + // deleted. + // EXPECT_EQ(dropped_frames(), 2); +} + +TEST_P(FrameBufferProxyTest, OldTimestampNotDecodable) { + StartNextDecodeForceKeyframe(); + + proxy_->InsertFrame(Builder().Id(0).Time(kFps30Rtp).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + // Timestamp is before previous frame's. + proxy_->InsertFrame(Builder().Id(1).Time(0).AsLast().Build()); + StartNextDecode(); + // F1 should be dropped since its timestamp went backwards. + EXPECT_TRUE(WaitForFrameOrTimeout(kMaxWaitForFrame)); + EXPECT_EQ(timeouts(), 1); +} + +TEST_P(FrameBufferProxyTest, NewFrameInsertedWhileWaitingToReleaseFrame) { + StartNextDecodeForceKeyframe(); + // Initial keyframe. + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame( + Builder().Id(1).Time(kFps30Rtp).Refs({0}).AsLast().Build()); + StartNextDecode(); + EXPECT_FALSE(WaitForFrameOrTimeout(TimeDelta::Zero())); + + // Scheduler is waiting to deliver Frame 1 now. Insert Frame 2. Frame 1 should + // be delivered still. + proxy_->InsertFrame( + Builder().Id(2).Time(kFps30Rtp * 2).Refs({0}).AsLast().Build()); + + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(1))); +} + +TEST_P(FrameBufferProxyTest, SameFrameNotScheduledTwice) { + // A frame could be scheduled twice if last_frame() arrive out-of-order but + // the older frame is old enough to be fast forwarded. + // + // 1. F2 arrives and is scheduled. + // 2. F3 arrives, but scheduling will not change since F2 is next. + // 3. F1 arrives late and scheduling is checked since it is before F2. F1 + // fast-forwarded since it is older. + // + // F2 is the best frame, but should only be scheduled once, followed by F3. + StartNextDecodeForceKeyframe(); + + // First keyframe. + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Millis(15))); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + StartNextDecode(); + + // Warmup VCMTiming for 30fps. + for (int i = 1; i <= 30; ++i) { + proxy_->InsertFrame(Builder().Id(i).Time(i * kFps30Rtp).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(i))); + StartNextDecode(); + } + + // F2 arrives and is scheduled. + proxy_->InsertFrame(Builder().Id(32).Time(32 * kFps30Rtp).AsLast().Build()); + + // F3 arrives before F2 is extracted. + time_controller_.AdvanceTime(kFps30Delay); + proxy_->InsertFrame(Builder().Id(33).Time(33 * kFps30Rtp).AsLast().Build()); + + // F1 arrives and is fast-forwarded since it is too late. + // F2 is already scheduled and should not be rescheduled. + time_controller_.AdvanceTime(kFps30Delay / 2); + proxy_->InsertFrame(Builder().Id(31).Time(31 * kFps30Rtp).AsLast().Build()); + + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(32))); + StartNextDecode(); + + EXPECT_TRUE(WaitForFrameOrTimeout(kFps30Delay)); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(33))); + EXPECT_EQ(dropped_frames(), 1); +} + +TEST_P(FrameBufferProxyTest, TestStatsCallback) { + EXPECT_CALL(stats_callback_, + OnCompleteFrame(true, kFrameSize, VideoContentType::UNSPECIFIED)); + EXPECT_CALL(stats_callback_, OnFrameBufferTimingsUpdated); + + // Fake timing having received decoded frame. + timing_.StopDecodeTimer(clock_->TimeInMicroseconds() + 1, + clock_->TimeInMilliseconds()); + StartNextDecodeForceKeyframe(); + proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build()); + EXPECT_TRUE(WaitForFrameOrTimeout(TimeDelta::Zero())); + EXPECT_THAT(last_frame(), Pointee(FrameWithId(0))); + + // Flush stats posted on the decode queue. + time_controller_.AdvanceTime(TimeDelta::Zero()); +} + +INSTANTIATE_TEST_SUITE_P(FrameBufferProxy, + FrameBufferProxyTest, + ::testing::Values("WebRTC-FrameBuffer3/Disabled/")); + +} // namespace webrtc diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index b1e593ba0c..395a15f949 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -23,6 +23,9 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "api/crypto/frame_decryptor_interface.h" +#include "api/scoped_refptr.h" +#include "api/sequence_checker.h" +#include "api/units/time_delta.h" #include "api/video/encoded_image.h" #include "api/video_codecs/h264_profile_level_id.h" #include "api/video_codecs/sdp_video_format.h" @@ -32,6 +35,7 @@ #include "call/rtp_stream_receiver_controller_interface.h" #include "call/rtx_receive_stream.h" #include "common_video/include/incoming_video_stream.h" +#include "modules/video_coding/frame_buffer2.h" #include "modules/video_coding/include/video_codec_interface.h" #include "modules/video_coding/include/video_coding_defines.h" #include "modules/video_coding/include/video_error_codes.h" @@ -43,6 +47,9 @@ #include "rtc_base/strings/string_builder.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/thread_registry.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" @@ -251,8 +258,10 @@ VideoReceiveStream2::VideoReceiveStream2( timing_->set_render_delay(config_.render_delay_ms); - frame_buffer_.reset( - new video_coding::FrameBuffer(clock_, timing_.get(), &stats_proxy_)); + frame_buffer_ = FrameBufferProxy::CreateFromFieldTrial( + clock_, call->worker_thread(), timing_.get(), &stats_proxy_, + &decode_queue_, this, TimeDelta::Millis(max_wait_for_keyframe_ms_), + TimeDelta::Millis(max_wait_for_frame_ms_)); if (config_.rtp.rtx_ssrc) { rtx_receive_stream_ = std::make_unique( @@ -419,10 +428,8 @@ void VideoReceiveStream2::Stop() { stats_proxy_.OnUniqueFramesCounted( rtp_video_stream_receiver_.GetUniqueFramesSeen()); - decode_queue_.PostTask([this] { frame_buffer_->Stop(); }); - + frame_buffer_->StopOnWorker(); call_stats_->DeregisterStatsObserver(this); - if (decoder_running_) { rtc::Event done; decode_queue_.PostTask([this, &done] { @@ -657,12 +664,12 @@ void VideoReceiveStream2::OnCompleteFrame(std::unique_ptr frame) { UpdatePlayoutDelays(); } - int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame)); - if (last_continuous_pid != -1) { + auto last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame)); + if (last_continuous_pid.has_value()) { { // TODO(bugs.webrtc.org/11993): Call on the network thread. RTC_DCHECK_RUN_ON(&packet_sequence_checker_); - rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid); + rtp_video_stream_receiver_.FrameContinuous(*last_continuous_pid); } } } @@ -715,29 +722,39 @@ int64_t VideoReceiveStream2::GetMaxWaitMs() const { : max_wait_for_frame_ms_; } +void VideoReceiveStream2::OnEncodedFrame(std::unique_ptr frame) { + if (!decode_queue_.IsCurrent()) { + decode_queue_.PostTask([this, frame = std::move(frame)]() mutable { + OnEncodedFrame(std::move(frame)); + }); + return; + } + RTC_DCHECK_RUN_ON(&decode_queue_); + HandleEncodedFrame(std::move(frame)); + StartNextDecode(); +} + +void VideoReceiveStream2::OnDecodableFrameTimeout(TimeDelta wait_time) { + if (!call_->worker_thread()->IsCurrent()) { + call_->worker_thread()->PostTask(ToQueuedTask( + task_safety_, + [this, wait_time] { OnDecodableFrameTimeout(wait_time); })); + return; + } + + // TODO(bugs.webrtc.org/11993): PostTask to the network thread. + RTC_DCHECK_RUN_ON(&packet_sequence_checker_); + int64_t now_ms = clock_->TimeInMilliseconds(); + HandleFrameBufferTimeout(now_ms, wait_time.ms()); + + decode_queue_.PostTask([this] { + RTC_DCHECK_RUN_ON(&decode_queue_); + StartNextDecode(); + }); +} + void VideoReceiveStream2::StartNextDecode() { - // Running on the decode thread. - TRACE_EVENT0("webrtc", "VideoReceiveStream2::StartNextDecode"); - frame_buffer_->NextFrame( - GetMaxWaitMs(), keyframe_required_, &decode_queue_, - /* encoded frame handler */ - [this](std::unique_ptr frame) { - RTC_DCHECK_RUN_ON(&decode_queue_); - if (decoder_stopped_) - return; - if (frame) { - HandleEncodedFrame(std::move(frame)); - } else { - int64_t now_ms = clock_->TimeInMilliseconds(); - // TODO(bugs.webrtc.org/11993): PostTask to the network thread. - call_->worker_thread()->PostTask(ToQueuedTask( - task_safety_, [this, now_ms, wait_ms = GetMaxWaitMs()]() { - RTC_DCHECK_RUN_ON(&packet_sequence_checker_); - HandleFrameBufferTimeout(now_ms, wait_ms); - })); - } - StartNextDecode(); - }); + frame_buffer_->StartNextDecode(keyframe_required_); } void VideoReceiveStream2::HandleEncodedFrame( diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index cf637f8c0e..970e9bdc90 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -25,7 +25,6 @@ #include "call/video_receive_stream.h" #include "modules/rtp_rtcp/include/flexfec_receiver.h" #include "modules/rtp_rtcp/source/source_tracker.h" -#include "modules/video_coding/frame_buffer2.h" #include "modules/video_coding/nack_requester.h" #include "modules/video_coding/video_receiver2.h" #include "rtc_base/system/no_unique_address.h" @@ -33,6 +32,7 @@ #include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_annotations.h" #include "system_wrappers/include/clock.h" +#include "video/frame_buffer_proxy.h" #include "video/receive_statistics_proxy2.h" #include "video/rtp_streams_synchronizer2.h" #include "video/rtp_video_stream_receiver2.h" @@ -83,7 +83,8 @@ class VideoReceiveStream2 public NackSender, public RtpVideoStreamReceiver2::OnCompleteFrameCallback, public Syncable, - public CallStatsObserver { + public CallStatsObserver, + public FrameSchedulingReceiver { public: // The default number of milliseconds to pass before re-requesting a key frame // to be sent. @@ -184,6 +185,8 @@ class VideoReceiveStream2 void GenerateKeyFrame() override; private: + void OnEncodedFrame(std::unique_ptr frame) override; + void OnDecodableFrameTimeout(TimeDelta wait_time) override; void CreateAndRegisterExternalDecoder(const Decoder& decoder); int64_t GetMaxWaitMs() const RTC_RUN_ON(decode_queue_); void StartNextDecode() RTC_RUN_ON(decode_queue_); @@ -247,8 +250,7 @@ class VideoReceiveStream2 // moved to the new VideoStreamDecoder. std::vector> video_decoders_; - // Members for the new jitter buffer experiment. - std::unique_ptr frame_buffer_; + std::unique_ptr frame_buffer_; std::unique_ptr media_receiver_ RTC_GUARDED_BY(packet_sequence_checker_);