Introduce Sync-Decoding based on Metronome

Adds new class DecodeSynchronizer that will coalesce the decoding
of received streams on the metronome. This feature is experimental and
is backed by a field trial WebRTC-FrameBuffer3.

This experiment now has 3 arms to it,

"WebRTC-FrameBuffer3/arm:FrameBuffer2/": Default, uses old frame buffer.
"WebRTC-FrameBuffer3/arm:FrameBuffer3/": Uses new frame buffer.
"WebRTC-FrameBuffer3/arm:SyncDecoding/": Uses new frame buffer with
frame scheduled on the metronome.

The SyncDecoding arm will not work until it is wired up in the follow-up
CL.

This change also makes the following modifications,
* Adds FakeMetronome utilities for tests using a metronome.
* Makes FrameDecodeScheduler an interface. The default implementation is
TaskQueueFrameDecodeScheduler.
* FrameDecodeScheduler now has a Stop() method, which must be called
before destruction.


TBR=philipel@webrtc.org

Change-Id: I58a306bb883604b0be3eb2a04b3d07dbdf185c71
Bug: webrtc:13658
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/250665
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Stefan Holmer <holmer@google.com>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35988}
This commit is contained in:
Evan Shrubsole 2022-02-11 15:30:26 +01:00 committed by WebRTC LUCI CQ
parent 93348d89bc
commit 6cd6d8ecfd
19 changed files with 1067 additions and 128 deletions

View File

@ -255,6 +255,13 @@ specific_include_rules = {
"+rtc_base/ref_counted_object.h",
],
"fake_metronome\.h": [
"+rtc_base/synchronization/mutex.h",
"+rtc_base/task_queue.h",
"+rtc_base/task_utils/repeating_task.h",
"+rtc_base/thread_annotations.h",
],
"mock.*\.h": [
"+test/gmock.h",
],

View File

@ -0,0 +1,30 @@
# Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import("../../../webrtc.gni")
rtc_library("fake_metronome") {
testonly = true
sources = [
"fake_metronome.cc",
"fake_metronome.h",
]
deps = [
"..:metronome",
"../..:priority",
"../..:sequence_checker",
"../../../rtc_base:macromagic",
"../../../rtc_base:rtc_event",
"../../../rtc_base:rtc_task_queue",
"../../../rtc_base/synchronization:mutex",
"../../../rtc_base/task_utils:repeating_task",
"../../../rtc_base/task_utils:to_queued_task",
"../../task_queue",
"../../units:time_delta",
]
}

View File

@ -0,0 +1,93 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "api/metronome/test/fake_metronome.h"
#include "api/priority.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc::test {
ForcedTickMetronome::ForcedTickMetronome(TimeDelta tick_period)
: tick_period_(tick_period) {}
void ForcedTickMetronome::AddListener(TickListener* listener) {
listeners_.insert(listener);
}
void ForcedTickMetronome::RemoveListener(TickListener* listener) {
listeners_.erase(listener);
}
TimeDelta ForcedTickMetronome::TickPeriod() const {
return tick_period_;
}
size_t ForcedTickMetronome::NumListeners() {
return listeners_.size();
}
void ForcedTickMetronome::Tick() {
for (auto* listener : listeners_) {
listener->OnTickTaskQueue()->PostTask(
ToQueuedTask([listener] { listener->OnTick(); }));
}
}
FakeMetronome::FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period)
: tick_period_(tick_period),
queue_(factory->CreateTaskQueue("MetronomeQueue",
TaskQueueFactory::Priority::HIGH)) {}
FakeMetronome::~FakeMetronome() {
RTC_DCHECK(listeners_.empty());
}
void FakeMetronome::AddListener(TickListener* listener) {
MutexLock lock(&mutex_);
listeners_.insert(listener);
if (!started_) {
tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
MutexLock lock(&mutex_);
// Stop if empty.
if (listeners_.empty())
return TimeDelta::PlusInfinity();
for (auto* listener : listeners_) {
listener->OnTickTaskQueue()->PostTask(
ToQueuedTask([listener] { listener->OnTick(); }));
}
return tick_period_;
});
started_ = true;
}
}
void FakeMetronome::RemoveListener(TickListener* listener) {
MutexLock lock(&mutex_);
listeners_.erase(listener);
}
void FakeMetronome::Stop() {
MutexLock lock(&mutex_);
RTC_DCHECK(listeners_.empty());
if (started_)
queue_.PostTask([this] { tick_task_.Stop(); });
}
TimeDelta FakeMetronome::TickPeriod() const {
return tick_period_;
}
} // namespace webrtc::test

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef API_METRONOME_TEST_FAKE_METRONOME_H_
#define API_METRONOME_TEST_FAKE_METRONOME_H_
#include <memory>
#include <set>
#include "api/metronome/metronome.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc::test {
// ForcedTickMetronome is a Metronome that ticks when `Tick()` is invoked.
// The constructor argument `tick_period` returned in `TickPeriod()`.
class ForcedTickMetronome : public Metronome {
public:
explicit ForcedTickMetronome(TimeDelta tick_period);
// Forces all TickListeners to run `OnTick`.
void Tick();
size_t NumListeners();
// Metronome implementation.
void AddListener(TickListener* listener) override;
void RemoveListener(TickListener* listener) override;
TimeDelta TickPeriod() const override;
private:
const TimeDelta tick_period_;
std::set<TickListener*> listeners_;
};
// FakeMetronome is a metronome that ticks based on a repeating task at the
// `tick_period` provided in the constructor. It is designed for use with
// simulated task queues for unit tests.
//
// `Stop()` must be called before destruction, as it cancels the metronome tick
// on the proper task queue.
class FakeMetronome : public Metronome {
public:
FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period);
~FakeMetronome() override;
// Metronome implementation.
void AddListener(TickListener* listener) override;
void RemoveListener(TickListener* listener) override;
TimeDelta TickPeriod() const override;
void Stop();
private:
const TimeDelta tick_period_;
RepeatingTaskHandle tick_task_;
bool started_ RTC_GUARDED_BY(mutex_) = false;
std::set<TickListener*> listeners_ RTC_GUARDED_BY(mutex_);
Mutex mutex_;
rtc::TaskQueue queue_;
};
} // namespace webrtc::test
#endif // API_METRONOME_TEST_FAKE_METRONOME_H_

View File

@ -301,10 +301,12 @@ rtc_library("frame_buffer_proxy") {
"frame_buffer_proxy.h",
]
deps = [
":frame_decode_scheduler",
":decode_synchronizer",
":frame_decode_timing",
":task_queue_frame_decode_scheduler",
":video_receive_stream_timeout_tracker",
"../api:sequence_checker",
"../api/metronome",
"../api/task_queue",
"../api/video:encoded_frame",
"../modules/video_coding",
@ -312,6 +314,7 @@ rtc_library("frame_buffer_proxy") {
"../modules/video_coding:frame_helpers",
"../modules/video_coding:timing",
"../modules/video_coding:video_codec_interface",
"../rtc_base:checks",
"../rtc_base:logging",
"../rtc_base:macromagic",
"../rtc_base:rtc_task_queue",
@ -324,16 +327,27 @@ rtc_library("frame_buffer_proxy") {
]
}
rtc_library("frame_decode_scheduler") {
rtc_source_set("frame_decode_scheduler") {
sources = [ "frame_decode_scheduler.h" ]
deps = [
":frame_decode_timing",
"../api/units:timestamp",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("task_queue_frame_decode_scheduler") {
sources = [
"frame_decode_scheduler.cc",
"frame_decode_scheduler.h",
"task_queue_frame_decode_scheduler.cc",
"task_queue_frame_decode_scheduler.h",
]
deps = [
":frame_decode_scheduler",
":frame_decode_timing",
"../api:sequence_checker",
"../api/task_queue",
"../api/units:timestamp",
"../rtc_base:checks",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../system_wrappers",
@ -369,6 +383,26 @@ rtc_library("video_receive_stream_timeout_tracker") {
]
}
rtc_library("decode_synchronizer") {
sources = [
"decode_synchronizer.cc",
"decode_synchronizer.h",
]
deps = [
":frame_decode_scheduler",
":frame_decode_timing",
"../api:sequence_checker",
"../api/metronome",
"../api/task_queue",
"../api/units:time_delta",
"../api/units:timestamp",
"../rtc_base:checks",
"../rtc_base:logging",
"../rtc_base:macromagic",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("video_stream_encoder_impl") {
visibility = [ "*" ]
@ -702,6 +736,7 @@ if (rtc_include_tests) {
"buffered_frame_decryptor_unittest.cc",
"call_stats2_unittest.cc",
"cpu_scaling_tests.cc",
"decode_synchronizer_unittest.cc",
"encoder_bitrate_adjuster_unittest.cc",
"encoder_overshoot_detector_unittest.cc",
"encoder_rtcp_feedback_unittest.cc",
@ -726,7 +761,6 @@ if (rtc_include_tests) {
"end_to_end_tests/transport_feedback_tests.cc",
"frame_buffer_proxy_unittest.cc",
"frame_cadence_adapter_unittest.cc",
"frame_decode_scheduler_unittest.cc",
"frame_decode_timing_unittest.cc",
"frame_encode_metadata_writer_unittest.cc",
"picture_id_tests.cc",
@ -741,6 +775,7 @@ if (rtc_include_tests) {
"send_statistics_proxy_unittest.cc",
"stats_counter_unittest.cc",
"stream_synchronization_unittest.cc",
"task_queue_frame_decode_scheduler_unittest.cc",
"video_receive_stream2_unittest.cc",
"video_receive_stream_timeout_tracker_unittest.cc",
"video_send_stream_impl_unittest.cc",
@ -750,10 +785,12 @@ if (rtc_include_tests) {
"video_stream_encoder_unittest.cc",
]
deps = [
":decode_synchronizer",
":frame_buffer_proxy",
":frame_cadence_adapter",
":frame_decode_scheduler",
":frame_decode_timing",
":task_queue_frame_decode_scheduler",
":video",
":video_mocks",
":video_receive_stream_timeout_tracker",
@ -777,6 +814,7 @@ if (rtc_include_tests) {
"../api:transport_api",
"../api/adaptation:resource_adaptation_api",
"../api/crypto:options",
"../api/metronome/test:fake_metronome",
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
@ -870,6 +908,7 @@ if (rtc_include_tests) {
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/functional:bind_front",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
"//third_party/abseil-cpp/absl/types:variant",

View File

@ -0,0 +1,186 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/decode_synchronizer.h"
#include <iterator>
#include <memory>
#include <utility>
#include <vector>
#include "api/sequence_checker.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "video/frame_decode_scheduler.h"
#include "video/frame_decode_timing.h"
namespace webrtc {
DecodeSynchronizer::ScheduledFrame::ScheduledFrame(
uint32_t rtp_timestamp,
FrameDecodeTiming::FrameSchedule schedule,
FrameDecodeScheduler::FrameReleaseCallback callback)
: rtp_timestamp_(rtp_timestamp),
schedule_(std::move(schedule)),
callback_(std::move(callback)) {}
void DecodeSynchronizer::ScheduledFrame::RunFrameReleaseCallback() && {
// Inspiration from Chromium base::OnceCallback. Move `*this` to a local
// before execution to ensure internal state is cleared after callback
// execution.
auto sf = std::move(*this);
sf.callback_(sf.rtp_timestamp_, sf.schedule_.render_time);
}
Timestamp DecodeSynchronizer::ScheduledFrame::LatestDecodeTime() const {
return schedule_.latest_decode_time;
}
DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync)
: sync_(sync) {
RTC_DCHECK(sync_);
}
DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
~SynchronizedFrameDecodeScheduler() {
RTC_DCHECK(!next_frame_);
RTC_DCHECK(stopped_);
}
absl::optional<uint32_t>
DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduledRtpTimestamp() {
return next_frame_.has_value()
? absl::make_optional(next_frame_->rtp_timestamp())
: absl::nullopt;
}
DecodeSynchronizer::ScheduledFrame
DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ReleaseNextFrame() {
RTC_DCHECK(next_frame_);
auto res = std::move(*next_frame_);
next_frame_.reset();
return res;
}
Timestamp
DecodeSynchronizer::SynchronizedFrameDecodeScheduler::LatestDecodeTime() {
RTC_DCHECK(next_frame_);
return next_frame_->LatestDecodeTime();
}
void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduleFrame(
uint32_t rtp,
FrameDecodeTiming::FrameSchedule schedule,
FrameReleaseCallback cb) {
RTC_DCHECK(!next_frame_) << "Can not schedule two frames at once.";
next_frame_ = ScheduledFrame(rtp, std::move(schedule), std::move(cb));
sync_->OnFrameScheduled(this);
}
void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::CancelOutstanding() {
next_frame_.reset();
}
void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::Stop() {
CancelOutstanding();
stopped_ = true;
sync_->RemoveFrameScheduler(this);
}
DecodeSynchronizer::DecodeSynchronizer(Clock* clock,
Metronome* metronome,
TaskQueueBase* worker_queue)
: clock_(clock), worker_queue_(worker_queue), metronome_(metronome) {
RTC_DCHECK(metronome_);
RTC_DCHECK(worker_queue_);
}
DecodeSynchronizer::~DecodeSynchronizer() {
RTC_DCHECK(schedulers_.empty());
}
std::unique_ptr<FrameDecodeScheduler>
DecodeSynchronizer::CreateSynchronizedFrameScheduler() {
RTC_DCHECK_RUN_ON(worker_queue_);
auto scheduler = std::make_unique<SynchronizedFrameDecodeScheduler>(this);
auto [it, inserted] = schedulers_.emplace(scheduler.get());
// If this is the first `scheduler` added, start listening to the metronome.
if (inserted && schedulers_.size() == 1) {
RTC_DLOG(LS_VERBOSE) << "Listening to metronome";
metronome_->AddListener(this);
}
return std::move(scheduler);
}
void DecodeSynchronizer::OnFrameScheduled(
SynchronizedFrameDecodeScheduler* scheduler) {
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(scheduler->ScheduledRtpTimestamp());
Timestamp now = clock_->CurrentTime();
Timestamp next_tick = expected_next_tick_;
// If no tick has registered yet assume it will occur in the tick period.
if (next_tick.IsInfinite()) {
next_tick = now + metronome_->TickPeriod();
}
// Release the frame right away if the decode time is too soon. Otherwise
// the stream may fall behind too much.
bool decode_before_next_tick =
scheduler->LatestDecodeTime() <
(next_tick - FrameDecodeTiming::kMaxAllowedFrameDelay);
// Decode immediately if the decode time is in the past.
bool decode_time_in_past = scheduler->LatestDecodeTime() < now;
if (decode_before_next_tick || decode_time_in_past) {
ScheduledFrame scheduled_frame = scheduler->ReleaseNextFrame();
std::move(scheduled_frame).RunFrameReleaseCallback();
}
}
void DecodeSynchronizer::RemoveFrameScheduler(
SynchronizedFrameDecodeScheduler* scheduler) {
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(scheduler);
auto it = schedulers_.find(scheduler);
if (it == schedulers_.end()) {
return;
}
schedulers_.erase(it);
// If there are no more schedulers active, stop listening for metronome ticks.
if (schedulers_.empty()) {
RTC_DLOG(LS_VERBOSE) << "Not listening to metronome";
metronome_->RemoveListener(this);
expected_next_tick_ = Timestamp::PlusInfinity();
}
}
void DecodeSynchronizer::OnTick() {
RTC_DCHECK_RUN_ON(worker_queue_);
expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod();
for (auto* scheduler : schedulers_) {
if (scheduler->ScheduledRtpTimestamp() &&
scheduler->LatestDecodeTime() < expected_next_tick_) {
auto scheduled_frame = scheduler->ReleaseNextFrame();
std::move(scheduled_frame).RunFrameReleaseCallback();
}
}
}
TaskQueueBase* DecodeSynchronizer::OnTickTaskQueue() {
return worker_queue_;
}
} // namespace webrtc

137
video/decode_synchronizer.h Normal file
View File

@ -0,0 +1,137 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef VIDEO_DECODE_SYNCHRONIZER_H_
#define VIDEO_DECODE_SYNCHRONIZER_H_
#include <stdint.h>
#include <functional>
#include <memory>
#include <set>
#include <utility>
#include "absl/types/optional.h"
#include "api/metronome/metronome.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/timestamp.h"
#include "rtc_base/checks.h"
#include "rtc_base/thread_annotations.h"
#include "video/frame_decode_scheduler.h"
#include "video/frame_decode_timing.h"
namespace webrtc {
// DecodeSynchronizer synchronizes the frame scheduling by coalescing decoding
// on the metronome.
//
// A video receive stream can use the DecodeSynchronizer by receiving a
// FrameDecodeScheduler instance with `CreateSynchronizedFrameScheduler()`.
// This instance implements FrameDecodeScheduler and can be used as a normal
// scheduler. This instance is owned by the receive stream, and is borrowed by
// the DecodeSynchronizer. The DecodeSynchronizer will stop borrowing the
// instance when `FrameDecodeScheduler::Stop()` is called, after which the
// scheduler may be destroyed by the receive stream.
//
// When a frame is scheduled for decode by a receive stream using the
// DecodeSynchronizer, it will instead be executed on the metronome during the
// tick interval where `max_decode_time` occurs. For example, if a frame is
// scheduled for decode in 50ms and the tick interval is 20ms, then the frame
// will be released for decoding in 2 ticks. See below for illustation,
//
// In the case where the decode time is in the past, or must occur before the
// next metronome tick then the frame will be released right away, allowing a
// delayed stream to catch up quickly.
//
// DecodeSynchronizer is single threaded - all method calls must run on the
// `worker_queue_`.
class DecodeSynchronizer : private Metronome::TickListener {
public:
DecodeSynchronizer(Clock* clock,
Metronome* metronome,
TaskQueueBase* worker_queue);
~DecodeSynchronizer() override;
DecodeSynchronizer(const DecodeSynchronizer&) = delete;
DecodeSynchronizer& operator=(const DecodeSynchronizer&) = delete;
std::unique_ptr<FrameDecodeScheduler> CreateSynchronizedFrameScheduler();
private:
class ScheduledFrame {
public:
ScheduledFrame(uint32_t rtp_timestamp,
FrameDecodeTiming::FrameSchedule schedule,
FrameDecodeScheduler::FrameReleaseCallback callback);
// Disallow copy since `callback` should only be moved.
ScheduledFrame(const ScheduledFrame&) = delete;
ScheduledFrame& operator=(const ScheduledFrame&) = delete;
ScheduledFrame(ScheduledFrame&&) = default;
ScheduledFrame& operator=(ScheduledFrame&&) = default;
// Executes `callback_`.
void RunFrameReleaseCallback() &&;
uint32_t rtp_timestamp() const { return rtp_timestamp_; }
Timestamp LatestDecodeTime() const;
private:
uint32_t rtp_timestamp_;
FrameDecodeTiming::FrameSchedule schedule_;
FrameDecodeScheduler::FrameReleaseCallback callback_;
};
class SynchronizedFrameDecodeScheduler : public FrameDecodeScheduler {
public:
explicit SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync);
~SynchronizedFrameDecodeScheduler() override;
// Releases the outstanding frame for decoding. This invalidates
// `next_frame_`. There must be a frame scheduled.
ScheduledFrame ReleaseNextFrame();
// Returns `next_frame_.schedule.max_decode_time`. There must be a frame
// scheduled when this is called.
Timestamp LatestDecodeTime();
// FrameDecodeScheduler implementation.
absl::optional<uint32_t> ScheduledRtpTimestamp() override;
void ScheduleFrame(uint32_t rtp,
FrameDecodeTiming::FrameSchedule schedule,
FrameReleaseCallback cb) override;
void CancelOutstanding() override;
void Stop() override;
private:
DecodeSynchronizer* sync_;
absl::optional<ScheduledFrame> next_frame_;
bool stopped_ = false;
};
void OnFrameScheduled(SynchronizedFrameDecodeScheduler* scheduler);
void RemoveFrameScheduler(SynchronizedFrameDecodeScheduler* scheduler);
// Metronome::TickListener implementation.
void OnTick() override;
TaskQueueBase* OnTickTaskQueue() override;
Clock* const clock_;
TaskQueueBase* const worker_queue_;
Metronome* const metronome_;
Timestamp expected_next_tick_ = Timestamp::PlusInfinity();
std::set<SynchronizedFrameDecodeScheduler*> schedulers_
RTC_GUARDED_BY(worker_queue_);
};
} // namespace webrtc
#endif // VIDEO_DECODE_SYNCHRONIZER_H_

View File

@ -0,0 +1,232 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/decode_synchronizer.h"
#include <stddef.h>
#include <memory>
#include <utility>
#include "api/metronome/test/fake_metronome.h"
#include "api/units/time_delta.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
#include "video/frame_decode_scheduler.h"
#include "video/frame_decode_timing.h"
using ::testing::_;
using ::testing::Eq;
namespace webrtc {
class DecodeSynchronizerTest : public ::testing::Test {
public:
static constexpr TimeDelta kTickPeriod = TimeDelta::Millis(33);
DecodeSynchronizerTest()
: time_controller_(Timestamp::Millis(1337)),
clock_(time_controller_.GetClock()),
metronome_(kTickPeriod),
decode_synchronizer_(clock_, &metronome_, run_loop_.task_queue()) {}
protected:
GlobalSimulatedTimeController time_controller_;
Clock* clock_;
test::RunLoop run_loop_;
test::ForcedTickMetronome metronome_;
DecodeSynchronizer decode_synchronizer_;
};
TEST_F(DecodeSynchronizerTest, AllFramesReadyBeforeNextTickDecoded) {
::testing::MockFunction<void(uint32_t, Timestamp)> mock_callback1;
auto scheduler1 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
testing::MockFunction<void(unsigned int, Timestamp)> mock_callback2;
auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
{
uint32_t frame_rtp = 90000;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time =
clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(3),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
scheduler1->ScheduleFrame(frame_rtp, frame_sched,
mock_callback1.AsStdFunction());
EXPECT_CALL(mock_callback1,
Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
}
{
uint32_t frame_rtp = 123456;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time =
clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(2),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(70)};
scheduler2->ScheduleFrame(frame_rtp, frame_sched,
mock_callback2.AsStdFunction());
EXPECT_CALL(mock_callback2,
Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
}
metronome_.Tick();
run_loop_.Flush();
// Cleanup
scheduler1->Stop();
scheduler2->Stop();
}
TEST_F(DecodeSynchronizerTest, FramesNotDecodedIfDecodeTimeIsInNextInterval) {
::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
uint32_t frame_rtp = 90000;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time =
clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(10),
.render_time =
clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(30)};
scheduler->ScheduleFrame(frame_rtp, frame_sched,
mock_callback.AsStdFunction());
metronome_.Tick();
run_loop_.Flush();
// No decodes should have happened in this tick.
::testing::Mock::VerifyAndClearExpectations(&mock_callback);
// Decode should happen on next tick.
EXPECT_CALL(mock_callback, Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
time_controller_.AdvanceTime(kTickPeriod);
metronome_.Tick();
run_loop_.Flush();
// Cleanup
scheduler->Stop();
}
TEST_F(DecodeSynchronizerTest, FrameDecodedOnce) {
::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
uint32_t frame_rtp = 90000;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
scheduler->ScheduleFrame(frame_rtp, frame_sched,
mock_callback.AsStdFunction());
EXPECT_CALL(mock_callback, Call(_, _)).Times(1);
metronome_.Tick();
run_loop_.Flush();
::testing::Mock::VerifyAndClearExpectations(&mock_callback);
// Trigger tick again. No frame should be decoded now.
time_controller_.AdvanceTime(kTickPeriod);
metronome_.Tick();
run_loop_.Flush();
// Cleanup
scheduler->Stop();
}
TEST_F(DecodeSynchronizerTest, FrameWithDecodeTimeInPastDecodedImmediately) {
::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
uint32_t frame_rtp = 90000;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time = clock_->CurrentTime() - TimeDelta::Millis(5),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
scheduler->ScheduleFrame(frame_rtp, frame_sched,
mock_callback.AsStdFunction());
// Verify the callback was invoked already.
::testing::Mock::VerifyAndClearExpectations(&mock_callback);
metronome_.Tick();
run_loop_.Flush();
// Cleanup
scheduler->Stop();
}
TEST_F(DecodeSynchronizerTest,
FrameWithDecodeTimeFarBeforeNextTickDecodedImmediately) {
::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
// Frame which would be behind by more than kMaxAllowedFrameDelay after
// the next tick.
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time = clock_->CurrentTime() + kTickPeriod -
FrameDecodeTiming::kMaxAllowedFrameDelay -
TimeDelta::Millis(1),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
scheduler->ScheduleFrame(90000, frame_sched, mock_callback.AsStdFunction());
// Verify the callback was invoked already.
::testing::Mock::VerifyAndClearExpectations(&mock_callback);
time_controller_.AdvanceTime(kTickPeriod);
metronome_.Tick();
run_loop_.Flush();
// A frame that would be behind by exactly kMaxAllowedFrameDelay after next
// tick should decode at the next tick.
FrameDecodeTiming::FrameSchedule queued_frame{
.latest_decode_time = clock_->CurrentTime() + kTickPeriod -
FrameDecodeTiming::kMaxAllowedFrameDelay,
.render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
scheduler->ScheduleFrame(180000, queued_frame, mock_callback.AsStdFunction());
// Verify the callback was invoked already.
::testing::Mock::VerifyAndClearExpectations(&mock_callback);
EXPECT_CALL(mock_callback, Call(Eq(180000u), _)).Times(1);
time_controller_.AdvanceTime(kTickPeriod);
metronome_.Tick();
run_loop_.Flush();
// Cleanup
scheduler->Stop();
}
TEST_F(DecodeSynchronizerTest, FramesNotReleasedAfterStop) {
::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
uint32_t frame_rtp = 90000;
FrameDecodeTiming::FrameSchedule frame_sched{
.latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
.render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
scheduler->ScheduleFrame(frame_rtp, frame_sched,
mock_callback.AsStdFunction());
// Cleanup
scheduler->Stop();
// No callback should occur on this tick since Stop() was called before.
metronome_.Tick();
run_loop_.Flush();
}
TEST_F(DecodeSynchronizerTest, MetronomeNotListenedWhenNoStreamsAreActive) {
EXPECT_EQ(0u, metronome_.NumListeners());
auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
EXPECT_EQ(1u, metronome_.NumListeners());
auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
EXPECT_EQ(1u, metronome_.NumListeners());
scheduler->Stop();
EXPECT_EQ(1u, metronome_.NumListeners());
scheduler2->Stop();
EXPECT_EQ(0u, metronome_.NumListeners());
}
} // namespace webrtc

View File

@ -14,20 +14,24 @@
#include <memory>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/functional/bind_front.h"
#include "api/sequence_checker.h"
#include "modules/video_coding/frame_buffer2.h"
#include "modules/video_coding/frame_buffer3.h"
#include "modules/video_coding/frame_helpers.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread_annotations.h"
#include "system_wrappers/include/field_trial.h"
#include "video/frame_decode_scheduler.h"
#include "video/frame_decode_timing.h"
#include "video/task_queue_frame_decode_scheduler.h"
#include "video/video_receive_stream_timeout_tracker.h"
namespace webrtc {
namespace {
class FrameBuffer2Proxy : public FrameBufferProxy {
public:
FrameBuffer2Proxy(Clock* clock,
@ -140,14 +144,16 @@ static constexpr size_t kZeroPlayoutDelayDefaultMaxDecodeQueueSize = 8;
// accounting are moved into this pro
class FrameBuffer3Proxy : public FrameBufferProxy {
public:
FrameBuffer3Proxy(Clock* clock,
TaskQueueBase* worker_queue,
VCMTiming* timing,
VCMReceiveStatisticsCallback* stats_proxy,
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
TimeDelta max_wait_for_frame)
FrameBuffer3Proxy(
Clock* clock,
TaskQueueBase* worker_queue,
VCMTiming* timing,
VCMReceiveStatisticsCallback* stats_proxy,
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
TimeDelta max_wait_for_frame,
std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler)
: max_wait_for_keyframe_(max_wait_for_keyframe),
max_wait_for_frame_(max_wait_for_frame),
clock_(clock),
@ -156,15 +162,11 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
stats_proxy_(stats_proxy),
receiver_(receiver),
timing_(timing),
frame_decode_scheduler_(std::move(frame_decode_scheduler)),
jitter_estimator_(clock_),
inter_frame_delay_(clock_->TimeInMilliseconds()),
buffer_(std::make_unique<FrameBuffer>(kMaxFramesBuffered,
kMaxFramesHistory)),
frame_decode_scheduler_(
clock_,
worker_queue,
absl::bind_front(&FrameBuffer3Proxy::OnFrameReadyForExtraction,
this)),
decode_timing_(clock_, timing_),
timeout_tracker_(clock_,
worker_queue_,
@ -181,6 +183,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
RTC_DCHECK(timing_);
RTC_DCHECK(worker_queue_);
RTC_DCHECK(clock_);
RTC_DCHECK(frame_decode_scheduler_);
RTC_LOG(LS_WARNING) << "Using FrameBuffer3";
ParseFieldTrial({&zero_playout_delay_max_decode_queue_size_},
@ -190,8 +193,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
// FrameBufferProxy implementation.
void StopOnWorker() override {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
frame_decode_scheduler_->Stop();
timeout_tracker_.Stop();
frame_decode_scheduler_.CancelOutstanding();
decoder_ready_for_new_frame_ = false;
decode_queue_->PostTask([this] {
RTC_DCHECK_RUN_ON(decode_queue_);
@ -209,7 +212,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
stats_proxy_->OnDroppedFrames(buffer_->CurrentSize());
buffer_ =
std::make_unique<FrameBuffer>(kMaxFramesBuffered, kMaxFramesHistory);
frame_decode_scheduler_.CancelOutstanding();
frame_decode_scheduler_->CancelOutstanding();
}
absl::optional<int64_t> InsertFrame(
@ -342,8 +345,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
}
private:
void OnFrameReadyForExtraction(uint32_t rtp_timestamp,
Timestamp render_time) {
void FrameReadyForDecode(uint32_t rtp_timestamp, Timestamp render_time) {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
RTC_DCHECK(buffer_->NextDecodableTemporalUnitRtpTimestamp() ==
rtp_timestamp)
@ -430,7 +432,7 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
auto last_rtp = *buffer_->LastDecodableTemporalUnitRtpTimestamp();
// If already scheduled then abort.
if (frame_decode_scheduler_.scheduled_rtp() ==
if (frame_decode_scheduler_->ScheduledRtpTimestamp() ==
buffer_->NextDecodableTemporalUnitRtpTimestamp())
return;
@ -441,9 +443,11 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
IsTooManyFramesQueued());
if (schedule) {
// Don't schedule if already waiting for the same frame.
if (frame_decode_scheduler_.scheduled_rtp() != next_rtp) {
frame_decode_scheduler_.CancelOutstanding();
frame_decode_scheduler_.ScheduleFrame(next_rtp, *schedule);
if (frame_decode_scheduler_->ScheduledRtpTimestamp() != next_rtp) {
frame_decode_scheduler_->CancelOutstanding();
frame_decode_scheduler_->ScheduleFrame(
next_rtp, *schedule,
absl::bind_front(&FrameBuffer3Proxy::FrameReadyForDecode, this));
}
return;
}
@ -463,6 +467,8 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
VCMReceiveStatisticsCallback* const stats_proxy_;
FrameSchedulingReceiver* const receiver_;
VCMTiming* const timing_;
const std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler_
RTC_GUARDED_BY(&worker_sequence_checker_);
VCMJitterEstimator jitter_estimator_
RTC_GUARDED_BY(&worker_sequence_checker_);
@ -471,8 +477,6 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
bool keyframe_required_ RTC_GUARDED_BY(&worker_sequence_checker_) = false;
std::unique_ptr<FrameBuffer> buffer_
RTC_GUARDED_BY(&worker_sequence_checker_);
FrameDecodeScheduler frame_decode_scheduler_
RTC_GUARDED_BY(&worker_sequence_checker_);
FrameDecodeTiming decode_timing_ RTC_GUARDED_BY(&worker_sequence_checker_);
VideoReceiveStreamTimeoutTracker timeout_tracker_
RTC_GUARDED_BY(&worker_sequence_checker_);
@ -500,6 +504,28 @@ class FrameBuffer3Proxy : public FrameBufferProxy {
ScopedTaskSafety worker_safety_;
};
enum class FrameBufferArm {
kFrameBuffer2,
kFrameBuffer3,
kSyncDecode,
};
constexpr const char* kFrameBufferFieldTrial = "WebRTC-FrameBuffer3";
FrameBufferArm ParseFrameBufferFieldTrial() {
webrtc::FieldTrialEnum<FrameBufferArm> arm(
"arm", FrameBufferArm::kFrameBuffer2,
{
{"FrameBuffer2", FrameBufferArm::kFrameBuffer2},
{"FrameBuffer3", FrameBufferArm::kFrameBuffer3},
{"SyncDecoding", FrameBufferArm::kSyncDecode},
});
ParseFieldTrial({&arm}, field_trial::FindFullName(kFrameBufferFieldTrial));
return arm.Get();
}
} // namespace
std::unique_ptr<FrameBufferProxy> FrameBufferProxy::CreateFromFieldTrial(
Clock* clock,
TaskQueueBase* worker_queue,
@ -508,14 +534,39 @@ std::unique_ptr<FrameBufferProxy> FrameBufferProxy::CreateFromFieldTrial(
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
TimeDelta max_wait_for_frame) {
if (field_trial::IsEnabled("WebRTC-FrameBuffer3"))
return std::make_unique<FrameBuffer3Proxy>(
clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
max_wait_for_keyframe, max_wait_for_frame);
return std::make_unique<FrameBuffer2Proxy>(
clock, timing, stats_proxy, decode_queue, receiver, max_wait_for_keyframe,
max_wait_for_frame);
TimeDelta max_wait_for_frame,
DecodeSynchronizer* decode_sync) {
switch (ParseFrameBufferFieldTrial()) {
case FrameBufferArm::kFrameBuffer3: {
auto scheduler =
std::make_unique<TaskQueueFrameDecodeScheduler>(clock, worker_queue);
return std::make_unique<FrameBuffer3Proxy>(
clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
}
case FrameBufferArm::kSyncDecode: {
std::unique_ptr<FrameDecodeScheduler> scheduler;
if (decode_sync) {
scheduler = decode_sync->CreateSynchronizedFrameScheduler();
} else {
RTC_LOG(LS_ERROR) << "In FrameBuffer with sync decode trial, but "
"no DecodeSynchronizer was present!";
// Crash in debug, but in production use the task queue scheduler.
RTC_DCHECK_NOTREACHED();
scheduler = std::make_unique<TaskQueueFrameDecodeScheduler>(
clock, worker_queue);
}
return std::make_unique<FrameBuffer3Proxy>(
clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
}
case FrameBufferArm::kFrameBuffer2:
ABSL_FALLTHROUGH_INTENDED;
default:
return std::make_unique<FrameBuffer2Proxy>(
clock, timing, stats_proxy, decode_queue, receiver,
max_wait_for_keyframe, max_wait_for_frame);
}
}
} // namespace webrtc

View File

@ -13,12 +13,15 @@
#include <memory>
#include "api/metronome/metronome.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video/encoded_frame.h"
#include "modules/video_coding/include/video_coding_defines.h"
#include "modules/video_coding/timing.h"
#include "rtc_base/task_queue.h"
#include "system_wrappers/include/clock.h"
#include "video/decode_synchronizer.h"
namespace webrtc {
class FrameSchedulingReceiver {
@ -43,7 +46,8 @@ class FrameBufferProxy {
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
TimeDelta max_wait_for_frame);
TimeDelta max_wait_for_frame,
DecodeSynchronizer* decode_sync);
virtual ~FrameBufferProxy() = default;
// Run on the worker thread.

View File

@ -20,6 +20,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "api/metronome/test/fake_metronome.h"
#include "api/units/frequency.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@ -32,6 +33,7 @@
#include "test/gtest.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
#include "video/decode_synchronizer.h"
using ::testing::_;
using ::testing::AllOf;
@ -209,6 +211,9 @@ class FrameBufferProxyTest : public ::testing::TestWithParam<std::string>,
decode_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"decode_queue",
TaskQueueFactory::Priority::NORMAL)),
fake_metronome_(time_controller_.GetTaskQueueFactory(),
TimeDelta::Millis(16)),
decode_sync_(clock_, &fake_metronome_, run_loop_.task_queue()),
timing_(clock_),
proxy_(FrameBufferProxy::CreateFromFieldTrial(clock_,
run_loop_.task_queue(),
@ -217,7 +222,8 @@ class FrameBufferProxyTest : public ::testing::TestWithParam<std::string>,
&decode_queue_,
this,
kMaxWaitForKeyframe,
kMaxWaitForFrame)) {
kMaxWaitForFrame,
&decode_sync_)) {
// Avoid starting with negative render times.
timing_.set_min_playout_delay(10);
@ -230,6 +236,8 @@ class FrameBufferProxyTest : public ::testing::TestWithParam<std::string>,
if (proxy_) {
proxy_->StopOnWorker();
}
fake_metronome_.Stop();
time_controller_.AdvanceTime(TimeDelta::Zero());
}
void OnEncodedFrame(std::unique_ptr<EncodedFrame> frame) override {
@ -291,7 +299,10 @@ class FrameBufferProxyTest : public ::testing::TestWithParam<std::string>,
Clock* const clock_;
test::RunLoop run_loop_;
rtc::TaskQueue decode_queue_;
test::FakeMetronome fake_metronome_;
DecodeSynchronizer decode_sync_;
VCMTiming timing_;
::testing::NiceMock<VCMReceiveStatisticsCallbackMock> stats_callback_;
std::unique_ptr<FrameBufferProxy> proxy_;
@ -544,7 +555,7 @@ TEST_P(FrameBufferProxyTest, NewFrameInsertedWhileWaitingToReleaseFrame) {
proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build());
EXPECT_THAT(WaitForFrameOrTimeout(TimeDelta::Zero()), Frame(WithId(0)));
time_controller_.AdvanceTime(kFps30Delay);
time_controller_.AdvanceTime(kFps30Delay / 2);
proxy_->InsertFrame(
Builder().Id(1).Time(kFps30Rtp).Refs({0}).AsLast().Build());
StartNextDecode();
@ -598,6 +609,8 @@ TEST_P(FrameBufferProxyTest, SameFrameNotScheduledTwice) {
StartNextDecode();
EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(33)));
StartNextDecode();
EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
EXPECT_EQ(dropped_frames(), 1);
}
@ -617,6 +630,10 @@ TEST_P(FrameBufferProxyTest, TestStatsCallback) {
time_controller_.AdvanceTime(TimeDelta::Zero());
}
// Note: This test takes a long time to run if the fake metronome is active.
// Since the test needs to wait for the timestamp to rollover, it has a fake
// delay of around 6.5 hours. Even though time is simulated, this will be
// around 1,500,000 metronome tick invocations.
TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) {
// Test inserting 31 frames and pause the stream for a long time before
// frame 32.
@ -668,16 +685,19 @@ TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) {
.AsLast()
.Build());
// FrameBuffer2 drops the frame, while FrameBuffer3 will continue the stream.
if (field_trial::IsEnabled("WebRTC-FrameBuffer3")) {
if (field_trial::FindFullName("WebRTC-FrameBuffer3")
.find("arm:FrameBuffer2") == std::string::npos) {
EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(2)));
} else {
EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
}
}
INSTANTIATE_TEST_SUITE_P(FrameBufferProxy,
FrameBufferProxyTest,
::testing::Values("WebRTC-FrameBuffer3/Disabled/",
"WebRTC-FrameBuffer3/Enabled/"));
INSTANTIATE_TEST_SUITE_P(
FrameBufferProxy,
FrameBufferProxyTest,
::testing::Values("WebRTC-FrameBuffer3/arm:FrameBuffer2/",
"WebRTC-FrameBuffer3/arm:FrameBuffer3/",
"WebRTC-FrameBuffer3/arm:SyncDecoding/"));
} // namespace webrtc

View File

@ -16,10 +16,7 @@
#include <functional>
#include "absl/types/optional.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/timestamp.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "system_wrappers/include/clock.h"
#include "video/frame_decode_timing.h"
namespace webrtc {
@ -30,25 +27,23 @@ class FrameDecodeScheduler {
using FrameReleaseCallback =
std::function<void(uint32_t rtp_timestamp, Timestamp render_time)>;
FrameDecodeScheduler(Clock* clock,
TaskQueueBase* const bookkeeping_queue,
FrameReleaseCallback callback);
~FrameDecodeScheduler();
FrameDecodeScheduler(const FrameDecodeScheduler&) = delete;
FrameDecodeScheduler& operator=(const FrameDecodeScheduler&) = delete;
virtual ~FrameDecodeScheduler() = default;
absl::optional<uint32_t> scheduled_rtp() const { return scheduled_rtp_; }
// Returns the rtp timestamp of the next frame scheduled for release, or
// `nullopt` if no frame is currently scheduled.
virtual absl::optional<uint32_t> ScheduledRtpTimestamp() = 0;
void ScheduleFrame(uint32_t rtp, FrameDecodeTiming::FrameSchedule schedule);
void CancelOutstanding();
// Shedules a frame for release based on `schedule`. When released, `callback`
// will be invoked with the `rtp` timestamp of the frame and the `render_time`
virtual void ScheduleFrame(uint32_t rtp,
FrameDecodeTiming::FrameSchedule schedule,
FrameReleaseCallback callback) = 0;
private:
Clock* const clock_;
TaskQueueBase* const bookkeeping_queue_;
const FrameReleaseCallback callback_;
// Cancels all scheduled frames.
virtual void CancelOutstanding() = 0;
absl::optional<uint32_t> scheduled_rtp_;
ScopedTaskSafetyDetached task_safety_;
// Stop() Must be called before destruction.
virtual void Stop() = 0;
};
} // namespace webrtc

View File

@ -15,12 +15,6 @@
namespace webrtc {
namespace {
constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
}
FrameDecodeTiming::FrameDecodeTiming(Clock* clock,
webrtc::VCMTiming const* timing)
: clock_(clock), timing_(timing) {
@ -51,7 +45,7 @@ FrameDecodeTiming::OnFrameBufferUpdated(uint32_t next_temporal_unit_rtp,
RTC_DLOG(LS_VERBOSE) << "Selected frame with rtp " << next_temporal_unit_rtp
<< " render time " << render_time.ms()
<< " with a max wait of " << max_wait.ms() << "ms";
return FrameSchedule{.max_decode_time = now + max_wait,
return FrameSchedule{.latest_decode_time = now + max_wait,
.render_time = render_time};
}

View File

@ -29,8 +29,12 @@ class FrameDecodeTiming {
FrameDecodeTiming(const FrameDecodeTiming&) = delete;
FrameDecodeTiming& operator=(const FrameDecodeTiming&) = delete;
// Any frame that has decode delay more than this in the past can be
// fast-forwarded.
static constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
struct FrameSchedule {
Timestamp max_decode_time;
Timestamp latest_decode_time;
Timestamp render_time;
};

View File

@ -81,10 +81,11 @@ TEST_F(FrameDecodeTimingTest, ReturnsWaitTimesWhenValid) {
EXPECT_THAT(
frame_decode_scheduler_.OnFrameBufferUpdated(90000, 180000, false),
Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
Eq(clock_.CurrentTime() + decode_delay)),
Field(&FrameDecodeTiming::FrameSchedule::render_time,
Eq(render_time)))));
Optional(
AllOf(Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
Eq(clock_.CurrentTime() + decode_delay)),
Field(&FrameDecodeTiming::FrameSchedule::render_time,
Eq(render_time)))));
}
TEST_F(FrameDecodeTimingTest, FastForwardsFrameTooFarInThePast) {
@ -102,12 +103,12 @@ TEST_F(FrameDecodeTimingTest, NoFastForwardIfOnlyFrameToDecode) {
const Timestamp render_time = clock_.CurrentTime();
timing_.SetTimes(90000, render_time, decode_delay);
EXPECT_THAT(
frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
Eq(clock_.CurrentTime() + decode_delay)),
Field(&FrameDecodeTiming::FrameSchedule::render_time,
Eq(render_time)))));
EXPECT_THAT(frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
Optional(AllOf(
Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
Eq(clock_.CurrentTime() + decode_delay)),
Field(&FrameDecodeTiming::FrameSchedule::render_time,
Eq(render_time)))));
}
} // namespace webrtc

View File

@ -8,57 +8,69 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/frame_decode_scheduler.h"
#include "video/task_queue_frame_decode_scheduler.h"
#include <algorithm>
#include <utility>
#include "api/sequence_checker.h"
#include "rtc_base/checks.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
FrameDecodeScheduler::FrameDecodeScheduler(
TaskQueueFrameDecodeScheduler::TaskQueueFrameDecodeScheduler(
Clock* clock,
TaskQueueBase* const bookkeeping_queue,
FrameReleaseCallback callback)
: clock_(clock),
bookkeeping_queue_(bookkeeping_queue),
callback_(std::move(callback)) {
TaskQueueBase* const bookkeeping_queue)
: clock_(clock), bookkeeping_queue_(bookkeeping_queue) {
RTC_DCHECK(clock_);
RTC_DCHECK(bookkeeping_queue_);
}
FrameDecodeScheduler::~FrameDecodeScheduler() {
TaskQueueFrameDecodeScheduler::~TaskQueueFrameDecodeScheduler() {
RTC_DCHECK(stopped_);
RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_
<< ". Call CancelOutstanding before destruction.";
}
void FrameDecodeScheduler::ScheduleFrame(
void TaskQueueFrameDecodeScheduler::ScheduleFrame(
uint32_t rtp,
FrameDecodeTiming::FrameSchedule schedule) {
FrameDecodeTiming::FrameSchedule schedule,
FrameReleaseCallback cb) {
RTC_DCHECK(!stopped_) << "Can not schedule frames after stopped.";
RTC_DCHECK(!scheduled_rtp_.has_value())
<< "Can not schedule two frames for release at the same time.";
RTC_DCHECK(cb);
scheduled_rtp_ = rtp;
TimeDelta wait = std::max(TimeDelta::Zero(),
schedule.max_decode_time - clock_->CurrentTime());
TimeDelta wait = std::max(
TimeDelta::Zero(), schedule.latest_decode_time - clock_->CurrentTime());
bookkeeping_queue_->PostDelayedTask(
ToQueuedTask(task_safety_.flag(),
[this, rtp, schedule] {
[this, rtp, schedule, cb = std::move(cb)] {
RTC_DCHECK_RUN_ON(bookkeeping_queue_);
// If the next frame rtp has changed since this task was
// this scheduled release should be skipped.
if (scheduled_rtp_ != rtp)
return;
scheduled_rtp_ = absl::nullopt;
callback_(rtp, schedule.render_time);
cb(rtp, schedule.render_time);
}),
wait.ms());
}
void FrameDecodeScheduler::CancelOutstanding() {
void TaskQueueFrameDecodeScheduler::CancelOutstanding() {
scheduled_rtp_ = absl::nullopt;
}
absl::optional<uint32_t>
TaskQueueFrameDecodeScheduler::ScheduledRtpTimestamp() {
return scheduled_rtp_;
}
void TaskQueueFrameDecodeScheduler::Stop() {
CancelOutstanding();
stopped_ = true;
}
} // namespace webrtc

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
#define VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
#include "video/frame_decode_scheduler.h"
namespace webrtc {
// An implementation of FrameDecodeScheduler that is based on TaskQueues. This
// is the default implementation for general use.
class TaskQueueFrameDecodeScheduler : public FrameDecodeScheduler {
public:
TaskQueueFrameDecodeScheduler(Clock* clock,
TaskQueueBase* const bookkeeping_queue);
~TaskQueueFrameDecodeScheduler() override;
TaskQueueFrameDecodeScheduler(const TaskQueueFrameDecodeScheduler&) = delete;
TaskQueueFrameDecodeScheduler& operator=(
const TaskQueueFrameDecodeScheduler&) = delete;
// FrameDecodeScheduler implementation.
absl::optional<uint32_t> ScheduledRtpTimestamp() override;
void ScheduleFrame(uint32_t rtp,
FrameDecodeTiming::FrameSchedule schedule,
FrameReleaseCallback cb) override;
void CancelOutstanding() override;
void Stop() override;
private:
Clock* const clock_;
TaskQueueBase* const bookkeeping_queue_;
absl::optional<uint32_t> scheduled_rtp_;
ScopedTaskSafetyDetached task_safety_;
bool stopped_ = false;
};
} // namespace webrtc
#endif // VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_

View File

@ -8,13 +8,14 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/frame_decode_scheduler.h"
#include "video/task_queue_frame_decode_scheduler.h"
#include <stddef.h>
#include <memory>
#include <utility>
#include "absl/functional/bind_front.h"
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@ -28,29 +29,31 @@ namespace webrtc {
using ::testing::Eq;
using ::testing::Optional;
class FrameDecodeSchedulerTest : public ::testing::Test {
class TaskQueueFrameDecodeSchedulerTest : public ::testing::Test {
public:
FrameDecodeSchedulerTest()
TaskQueueFrameDecodeSchedulerTest()
: time_controller_(Timestamp::Millis(2000)),
task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"scheduler",
TaskQueueFactory::Priority::NORMAL)),
scheduler_(std::make_unique<FrameDecodeScheduler>(
scheduler_(std::make_unique<TaskQueueFrameDecodeScheduler>(
time_controller_.GetClock(),
task_queue_.Get(),
[this](uint32_t rtp, Timestamp render_time) {
OnFrame(rtp, render_time);
})) {}
task_queue_.Get())) {}
~FrameDecodeSchedulerTest() override {
~TaskQueueFrameDecodeSchedulerTest() override {
if (scheduler_) {
OnQueue([&] {
scheduler_->CancelOutstanding();
scheduler_->Stop();
scheduler_ = nullptr;
});
}
}
void FrameReadyForDecode(uint32_t timestamp, Timestamp render_time) {
last_rtp_ = timestamp;
last_render_time_ = render_time;
}
protected:
template <class Task>
void OnQueue(Task&& t) {
@ -63,23 +66,21 @@ class FrameDecodeSchedulerTest : public ::testing::Test {
std::unique_ptr<FrameDecodeScheduler> scheduler_;
absl::optional<uint32_t> last_rtp_;
absl::optional<Timestamp> last_render_time_;
private:
void OnFrame(uint32_t timestamp, Timestamp render_time) {
last_rtp_ = timestamp;
last_render_time_ = render_time;
}
};
TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
TEST_F(TaskQueueFrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(5);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
const Timestamp render_time = now + TimeDelta::Millis(15);
FrameDecodeTiming::FrameSchedule schedule = {
.latest_decode_time = now + decode_delay, .render_time = render_time};
OnQueue([&] {
scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
.render_time = render_time});
EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
scheduler_->ScheduleFrame(
rtp, schedule,
absl::bind_front(
&TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
EXPECT_THAT(last_rtp_, Eq(absl::nullopt));
@ -88,35 +89,43 @@ TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
EXPECT_THAT(last_render_time_, Optional(render_time));
}
TEST_F(FrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
TEST_F(TaskQueueFrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(-5);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
const Timestamp render_time = now + TimeDelta::Millis(15);
FrameDecodeTiming::FrameSchedule schedule = {
.latest_decode_time = now + decode_delay, .render_time = render_time};
OnQueue([&] {
scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
.render_time = render_time});
EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
scheduler_->ScheduleFrame(
rtp, schedule,
absl::bind_front(
&TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
EXPECT_THAT(last_rtp_, Optional(rtp));
EXPECT_THAT(last_render_time_, Optional(render_time));
}
TEST_F(FrameDecodeSchedulerTest, CancelOutstanding) {
TEST_F(TaskQueueFrameDecodeSchedulerTest, CancelOutstanding) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(50);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
FrameDecodeTiming::FrameSchedule schedule = {
.latest_decode_time = now + decode_delay,
.render_time = now + TimeDelta::Millis(75)};
OnQueue([&] {
scheduler_->ScheduleFrame(rtp,
{.max_decode_time = now + decode_delay,
.render_time = now + TimeDelta::Millis(75)});
EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
scheduler_->ScheduleFrame(
rtp, schedule,
absl::bind_front(
&TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
time_controller_.AdvanceTime(decode_delay / 2);
OnQueue([&] {
EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
scheduler_->CancelOutstanding();
EXPECT_THAT(scheduler_->scheduled_rtp(), Eq(absl::nullopt));
EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Eq(absl::nullopt));
});
time_controller_.AdvanceTime(decode_delay / 2);
EXPECT_THAT(last_rtp_, Eq(absl::nullopt));

View File

@ -272,7 +272,7 @@ VideoReceiveStream2::VideoReceiveStream2(
frame_buffer_ = FrameBufferProxy::CreateFromFieldTrial(
clock_, call_->worker_thread(), timing_.get(), &stats_proxy_,
&decode_queue_, this, TimeDelta::Millis(max_wait_for_keyframe_ms_),
TimeDelta::Millis(max_wait_for_frame_ms_));
TimeDelta::Millis(max_wait_for_frame_ms_), nullptr);
if (config_.rtp.rtx_ssrc) {
rtx_receive_stream_ = std::make_unique<RtxReceiveStream>(