From 9a478b527de789c933ec8b2521033614be4e9d47 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Thu, 18 Nov 2021 16:07:01 +0100 Subject: [PATCH] VideoStreamEncoder: expect frame entry on the encoder queue. This change switches the sequence used by the FrameCadenceAdapter to be the encoder_queue, enabling VideoStreamEncoder::OnFrame to be invoked directly on the encoder_queue and eliminates the contained PostTasks. Bug: chromium:1255737 Change-Id: Ib86fc96ad2be9a38585fef2535855e3f9cc7e57c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/238171 Commit-Queue: Markus Handell Reviewed-by: Ilya Nikolaevskiy Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/main@{#35380} --- video/BUILD.gn | 1 + video/frame_cadence_adapter.cc | 72 ++++++++----- video/frame_cadence_adapter.h | 28 +++-- video/frame_cadence_adapter_unittest.cc | 43 +++++--- video/video_send_stream.cc | 38 +++++-- video/video_stream_encoder.cc | 120 ++++++++++------------ video/video_stream_encoder.h | 25 +++-- video/video_stream_encoder_unittest.cc | 130 ++++++++++-------------- 8 files changed, 248 insertions(+), 209 deletions(-) diff --git a/video/BUILD.gn b/video/BUILD.gn index 482169902b..84b2d98f19 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -275,6 +275,7 @@ rtc_library("frame_cadence_adapter") { "../rtc_base/synchronization:mutex", "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:to_queued_task", + "../system_wrappers", "../system_wrappers:field_trial", "../system_wrappers:metrics", ] diff --git a/video/frame_cadence_adapter.cc b/video/frame_cadence_adapter.cc index 030814bddc..c82ab5a445 100644 --- a/video/frame_cadence_adapter.cc +++ b/video/frame_cadence_adapter.cc @@ -10,6 +10,7 @@ #include "video/frame_cadence_adapter.h" +#include #include #include @@ -20,6 +21,7 @@ #include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/to_queued_task.h" +#include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" @@ -28,7 +30,7 @@ namespace { class FrameCadenceAdapterImpl : public FrameCadenceAdapterInterface { public: - explicit FrameCadenceAdapterImpl(TaskQueueBase* worker_queue); + FrameCadenceAdapterImpl(Clock* clock, TaskQueueBase* queue); // FrameCadenceAdapterInterface overrides. void Initialize(Callback* callback) override; @@ -42,12 +44,15 @@ class FrameCadenceAdapterImpl : public FrameCadenceAdapterInterface { private: // Called from OnFrame in zero-hertz mode. - void OnFrameOnMainQueue(const VideoFrame& frame) RTC_RUN_ON(worker_queue_); + void OnFrameOnMainQueue(Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& frame) RTC_RUN_ON(queue_); // Called to report on constraint UMAs. - void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&worker_queue_); + void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&queue_); - TaskQueueBase* const worker_queue_; + Clock* const clock_; + TaskQueueBase* const queue_; // True if we support frame entry for screenshare with a minimum frequency of // 0 Hz. @@ -58,34 +63,36 @@ class FrameCadenceAdapterImpl : public FrameCadenceAdapterInterface { // The source's constraints. absl::optional source_constraints_ - RTC_GUARDED_BY(worker_queue_); + RTC_GUARDED_BY(queue_); // Whether zero-hertz and UMA reporting is enabled. - bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(worker_queue_) = - false; + bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(queue_) = false; // Race checker for incoming frames. This is the network thread in chromium, // but may vary from test contexts. rtc::RaceChecker incoming_frame_race_checker_; - bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(worker_queue_) = - false; + bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(queue_) = false; - ScopedTaskSafety safety_; + // Number of frames that are currently scheduled for processing on the + // |queue_|. + std::atomic frames_scheduled_for_processing_{0}; + + ScopedTaskSafetyDetached safety_; }; -FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(TaskQueueBase* worker_queue) - : worker_queue_(worker_queue), +FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(Clock* clock, + TaskQueueBase* queue) + : clock_(clock), + queue_(queue), zero_hertz_screenshare_enabled_( - field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) { - RTC_DCHECK_RUN_ON(worker_queue_); -} + field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) {} void FrameCadenceAdapterImpl::Initialize(Callback* callback) { callback_ = callback; } void FrameCadenceAdapterImpl::SetZeroHertzModeEnabled(bool enabled) { - RTC_DCHECK_RUN_ON(worker_queue_); + RTC_DCHECK_RUN_ON(queue_); if (enabled && !zero_hertz_and_uma_reporting_enabled_) has_reported_screenshare_frame_rate_umas_ = false; zero_hertz_and_uma_reporting_enabled_ = enabled; @@ -95,9 +102,17 @@ void FrameCadenceAdapterImpl::OnFrame(const VideoFrame& frame) { // This method is called on the network thread under Chromium, or other // various contexts in test. RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_); - worker_queue_->PostTask(ToQueuedTask(safety_, [this, frame] { - RTC_DCHECK_RUN_ON(worker_queue_); - OnFrameOnMainQueue(std::move(frame)); + + // Local time in webrtc time base. + Timestamp post_time = clock_->CurrentTime(); + frames_scheduled_for_processing_.fetch_add(1, std::memory_order_relaxed); + queue_->PostTask(ToQueuedTask(safety_.flag(), [this, post_time, frame] { + RTC_DCHECK_RUN_ON(queue_); + const int frames_scheduled_for_processing = + frames_scheduled_for_processing_.fetch_sub(1, + std::memory_order_relaxed); + OnFrameOnMainQueue(post_time, frames_scheduled_for_processing, + std::move(frame)); MaybeReportFrameRateConstraintUmas(); })); } @@ -107,18 +122,21 @@ void FrameCadenceAdapterImpl::OnConstraintsChanged( RTC_LOG(LS_INFO) << __func__ << " min_fps " << constraints.min_fps.value_or(-1) << " max_fps " << constraints.max_fps.value_or(-1); - worker_queue_->PostTask(ToQueuedTask(safety_, [this, constraints] { - RTC_DCHECK_RUN_ON(worker_queue_); + queue_->PostTask(ToQueuedTask(safety_.flag(), [this, constraints] { + RTC_DCHECK_RUN_ON(queue_); source_constraints_ = constraints; })); } -// RTC_RUN_ON(worker_queue_) -void FrameCadenceAdapterImpl::OnFrameOnMainQueue(const VideoFrame& frame) { - callback_->OnFrame(frame); +// RTC_RUN_ON(queue_) +void FrameCadenceAdapterImpl::OnFrameOnMainQueue( + Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& frame) { + callback_->OnFrame(post_time, frames_scheduled_for_processing, frame); } -// RTC_RUN_ON(worker_queue_) +// RTC_RUN_ON(queue_) void FrameCadenceAdapterImpl::MaybeReportFrameRateConstraintUmas() { if (has_reported_screenshare_frame_rate_umas_) return; @@ -175,8 +193,8 @@ void FrameCadenceAdapterImpl::MaybeReportFrameRateConstraintUmas() { } // namespace std::unique_ptr -FrameCadenceAdapterInterface::Create(TaskQueueBase* worker_queue) { - return std::make_unique(worker_queue); +FrameCadenceAdapterInterface::Create(Clock* clock, TaskQueueBase* queue) { + return std::make_unique(clock, queue); } } // namespace webrtc diff --git a/video/frame_cadence_adapter.h b/video/frame_cadence_adapter.h index ca702cf875..beb73963ea 100644 --- a/video/frame_cadence_adapter.h +++ b/video/frame_cadence_adapter.h @@ -18,13 +18,14 @@ #include "api/video/video_sink_interface.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/thread_annotations.h" +#include "system_wrappers/include/clock.h" namespace webrtc { // A sink adapter implementing mutations to the received frame cadence. -// With the exception of construction & destruction which has to happen on the -// same sequence, this class is thread-safe because three different execution -// contexts call into it. +// With the exception of the constructor and the methods overridden in +// VideoSinkInterface, the rest of the interface to this class (including dtor) +// needs to happen on the queue passed in Create. class FrameCadenceAdapterInterface : public rtc::VideoSinkInterface { public: @@ -33,8 +34,20 @@ class FrameCadenceAdapterInterface public: virtual ~Callback() = default; - // Called when a frame arrives. - virtual void OnFrame(const VideoFrame& frame) = 0; + // Called when a frame arrives on the |queue| specified in Create. + // + // The |post_time| parameter indicates the current time sampled when + // FrameCadenceAdapterInterface::OnFrame was called. + // + // |frames_scheduled_for_processing| indicates how many frames that have + // been scheduled for processing. During sequential conditions where + // FrameCadenceAdapterInterface::OnFrame is invoked and subsequently ending + // up in this callback, this value will read 1. Otherwise if the + // |queue| gets stalled for some reason, the value will increase + // beyond 1. + virtual void OnFrame(Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& frame) = 0; // Called when the source has discarded a frame. virtual void OnDiscardedFrame() = 0; @@ -42,8 +55,11 @@ class FrameCadenceAdapterInterface // Factory function creating a production instance. Deletion of the returned // instance needs to happen on the same sequence that Create() was called on. + // Frames arriving in FrameCadenceAdapterInterface::OnFrame are posted to + // Callback::OnFrame on the |queue|. static std::unique_ptr Create( - TaskQueueBase* worker_queue); + Clock* clock, + TaskQueueBase* queue); // Call before using the rest of the API. virtual void Initialize(Callback* callback) = 0; diff --git a/video/frame_cadence_adapter_unittest.cc b/video/frame_cadence_adapter_unittest.cc index 56fa220b30..a6b6a87f0e 100644 --- a/video/frame_cadence_adapter_unittest.cc +++ b/video/frame_cadence_adapter_unittest.cc @@ -26,6 +26,7 @@ namespace webrtc { namespace { +using ::testing::_; using ::testing::ElementsAre; using ::testing::Mock; using ::testing::Pair; @@ -39,13 +40,13 @@ VideoFrame CreateFrame() { .build(); } -std::unique_ptr CreateAdapter() { - return FrameCadenceAdapterInterface::Create(TaskQueueBase::Current()); +std::unique_ptr CreateAdapter(Clock* clock) { + return FrameCadenceAdapterInterface::Create(clock, TaskQueueBase::Current()); } class MockCallback : public FrameCadenceAdapterInterface::Callback { public: - MOCK_METHOD(void, OnFrame, (const VideoFrame&), (override)); + MOCK_METHOD(void, OnFrame, (Timestamp, int, const VideoFrame&), (override)); MOCK_METHOD(void, OnDiscardedFrame, (), (override)); }; @@ -61,7 +62,7 @@ TEST(FrameCadenceAdapterTest, auto disabler = std::make_unique(); for (int i = 0; i != 2; i++) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller.GetClock()); adapter->Initialize(&callback); VideoFrame frame = CreateFrame(); EXPECT_CALL(callback, OnFrame).Times(1); @@ -76,6 +77,22 @@ TEST(FrameCadenceAdapterTest, } } +TEST(FrameCadenceAdapterTest, CountsOutstandingFramesToProcess) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1)); + MockCallback callback; + auto adapter = CreateAdapter(time_controller.GetClock()); + adapter->Initialize(&callback); + EXPECT_CALL(callback, OnFrame(_, 2, _)).Times(1); + EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1); + auto frame = CreateFrame(); + adapter->OnFrame(frame); + adapter->OnFrame(frame); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1); + adapter->OnFrame(frame); + time_controller.AdvanceTime(TimeDelta::Zero()); +} + class FrameCadenceAdapterMetricsTest : public ::testing::Test { public: FrameCadenceAdapterMetricsTest() : time_controller_(Timestamp::Millis(1)) { @@ -83,13 +100,13 @@ class FrameCadenceAdapterMetricsTest : public ::testing::Test { } void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); } - private: + protected: GlobalSimulatedTimeController time_controller_; }; TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithNoFrameTransfer) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(nullptr); adapter->Initialize(&callback); adapter->OnConstraintsChanged( VideoTrackSourceConstraints{absl::nullopt, absl::nullopt}); @@ -129,7 +146,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithNoFrameTransfer) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithoutEnabledContentType) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->OnFrame(CreateFrame()); adapter->OnConstraintsChanged( @@ -170,7 +187,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithoutEnabledContentType) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoConstraintsIfUnsetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnFrame(CreateFrame()); @@ -182,7 +199,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoConstraintsIfUnsetOnFrame) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsEmptyConstraintsIfSetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnConstraintsChanged( @@ -221,7 +238,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsEmptyConstraintsIfSetOnFrame) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsMaxConstraintIfSetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnConstraintsChanged( @@ -257,7 +274,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsMaxConstraintIfSetOnFrame) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinConstraintIfSetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnConstraintsChanged( @@ -293,7 +310,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinConstraintIfSetOnFrame) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinGtMaxConstraintIfSetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnConstraintsChanged(VideoTrackSourceConstraints{5.0, 4.0}); @@ -328,7 +345,7 @@ TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinGtMaxConstraintIfSetOnFrame) { TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinLtMaxConstraintIfSetOnFrame) { MockCallback callback; - auto adapter = CreateAdapter(); + auto adapter = CreateAdapter(time_controller_.GetClock()); adapter->Initialize(&callback); adapter->SetZeroHertzModeEnabled(true); adapter->OnConstraintsChanged(VideoTrackSourceConstraints{4.0, 5.0}); diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc index 05708d0f18..e78211be36 100644 --- a/video/video_send_stream.cc +++ b/video/video_send_stream.cc @@ -12,6 +12,7 @@ #include #include "api/array_view.h" +#include "api/task_queue/task_queue_base.h" #include "api/video/video_stream_encoder_settings.h" #include "modules/rtp_rtcp/include/rtp_header_extension_map.h" #include "modules/rtp_rtcp/source/rtp_header_extension_size.h" @@ -106,6 +107,25 @@ RtpSenderObservers CreateObservers(RtcpRttStats* call_stats, return observers; } +std::unique_ptr CreateVideoStreamEncoder( + Clock* clock, + int num_cpu_cores, + TaskQueueFactory* task_queue_factory, + SendStatisticsProxy* stats_proxy, + const VideoStreamEncoderSettings& encoder_settings, + VideoStreamEncoder::BitrateAllocationCallbackType + bitrate_allocation_callback_type) { + std::unique_ptr encoder_queue = + task_queue_factory->CreateTaskQueue("EncoderQueue", + TaskQueueFactory::Priority::NORMAL); + TaskQueueBase* encoder_queue_ptr = encoder_queue.get(); + return std::make_unique( + clock, num_cpu_cores, stats_proxy, encoder_settings, + std::make_unique(stats_proxy), + FrameCadenceAdapterInterface::Create(clock, encoder_queue_ptr), + std::move(encoder_queue), bitrate_allocation_callback_type); +} + } // namespace namespace internal { @@ -130,17 +150,13 @@ VideoSendStream::VideoSendStream( stats_proxy_(clock, config, encoder_config.content_type), config_(std::move(config)), content_type_(encoder_config.content_type), - video_stream_encoder_(std::make_unique( - clock, - num_cpu_cores, - &stats_proxy_, - config_.encoder_settings, - std::make_unique(&stats_proxy_), - FrameCadenceAdapterInterface::Create( - /*worker_queue=*/TaskQueueBase::Current()), - task_queue_factory, - /*worker_queue=*/TaskQueueBase::Current(), - GetBitrateAllocationCallbackType(config_))), + video_stream_encoder_( + CreateVideoStreamEncoder(clock, + num_cpu_cores, + task_queue_factory, + &stats_proxy_, + config_.encoder_settings, + GetBitrateAllocationCallbackType(config_))), encoder_feedback_( clock, config_.rtp.ssrcs, diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc index c6870a0ade..1c0de4bc06 100644 --- a/video/video_stream_encoder.cc +++ b/video/video_stream_encoder.cc @@ -593,8 +593,8 @@ VideoStreamEncoder::VideoStreamEncoder( const VideoStreamEncoderSettings& settings, std::unique_ptr overuse_detector, std::unique_ptr frame_cadence_adapter, - TaskQueueFactory* task_queue_factory, - TaskQueueBase* worker_queue, + std::unique_ptr + encoder_queue, BitrateAllocationCallbackType allocation_cb_type) : worker_queue_(TaskQueueBase::Current()), number_of_cores_(number_of_cores), @@ -618,7 +618,6 @@ VideoStreamEncoder::VideoStreamEncoder( was_encode_called_since_last_initialization_(false), encoder_failed_(false), clock_(clock), - posted_frames_waiting_for_encode_(0), last_captured_timestamp_(0), delta_ntp_internal_ms_(clock_->CurrentNtpInMilliseconds() - clock_->TimeInMilliseconds()), @@ -665,9 +664,7 @@ VideoStreamEncoder::VideoStreamEncoder( !field_trial::IsEnabled("WebRTC-DefaultBitrateLimitsKillSwitch")), qp_parsing_allowed_( !field_trial::IsEnabled("WebRTC-QpParsingKillSwitch")), - encoder_queue_(task_queue_factory->CreateTaskQueue( - "EncoderQueue", - TaskQueueFactory::Priority::NORMAL)) { + encoder_queue_(std::move(encoder_queue)) { TRACE_EVENT0("webrtc", "VideoStreamEncoder::VideoStreamEncoder"); RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(encoder_stats_observer); @@ -732,6 +729,7 @@ void VideoStreamEncoder::Stop() { rate_allocator_ = nullptr; ReleaseEncoder(); encoder_ = nullptr; + frame_cadence_adapter_ = nullptr; shutdown_event.Set(); }); shutdown_event.Wait(rtc::Event::kForever); @@ -824,14 +822,14 @@ void VideoStreamEncoder::SetStartBitrate(int start_bitrate_bps) { void VideoStreamEncoder::ConfigureEncoder(VideoEncoderConfig config, size_t max_data_payload_length) { RTC_DCHECK_RUN_ON(worker_queue_); - frame_cadence_adapter_->SetZeroHertzModeEnabled( - config.content_type == VideoEncoderConfig::ContentType::kScreen); encoder_queue_.PostTask( [this, config = std::move(config), max_data_payload_length]() mutable { RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK(sink_); RTC_LOG(LS_INFO) << "ConfigureEncoder requested."; + frame_cadence_adapter_->SetZeroHertzModeEnabled( + config.content_type == VideoEncoderConfig::ContentType::kScreen); pending_encoder_creation_ = (!encoder_ || encoder_config_.video_format != config.video_format || max_data_payload_length_ != max_data_payload_length); @@ -1261,19 +1259,18 @@ void VideoStreamEncoder::OnEncoderSettingsChanged() { degradation_preference_manager_->SetIsScreenshare(is_screenshare); } -void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) { - RTC_DCHECK_RUN_ON(worker_queue_); +void VideoStreamEncoder::OnFrame(Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& video_frame) { + RTC_DCHECK_RUN_ON(&encoder_queue_); VideoFrame incoming_frame = video_frame; - // Local time in webrtc time base. - Timestamp now = clock_->CurrentTime(); - // In some cases, e.g., when the frame from decoder is fed to encoder, // the timestamp may be set to the future. As the encoding pipeline assumes // capture time to be less than present time, we should reset the capture // timestamps here. Otherwise there may be issues with RTP send stream. - if (incoming_frame.timestamp_us() > now.us()) - incoming_frame.set_timestamp_us(now.us()); + if (incoming_frame.timestamp_us() > post_time.us()) + incoming_frame.set_timestamp_us(post_time.us()); // Capture time may come from clock with an offset and drift from clock_. int64_t capture_ntp_time_ms; @@ -1282,7 +1279,7 @@ void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) { } else if (video_frame.render_time_ms() != 0) { capture_ntp_time_ms = video_frame.render_time_ms() + delta_ntp_internal_ms_; } else { - capture_ntp_time_ms = now.ms() + delta_ntp_internal_ms_; + capture_ntp_time_ms = post_time.ms() + delta_ntp_internal_ms_; } incoming_frame.set_ntp_time_ms(capture_ntp_time_ms); @@ -1306,62 +1303,51 @@ void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) { } bool log_stats = false; - if (now.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) { - last_frame_log_ms_ = now.ms(); + if (post_time.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) { + last_frame_log_ms_ = post_time.ms(); log_stats = true; } last_captured_timestamp_ = incoming_frame.ntp_time_ms(); - int64_t post_time_us = clock_->CurrentTime().us(); - ++posted_frames_waiting_for_encode_; - - encoder_queue_.PostTask( - [this, incoming_frame, post_time_us, log_stats]() { - RTC_DCHECK_RUN_ON(&encoder_queue_); - encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(), - incoming_frame.height()); - ++captured_frame_count_; - const int posted_frames_waiting_for_encode = - posted_frames_waiting_for_encode_.fetch_sub(1); - RTC_DCHECK_GT(posted_frames_waiting_for_encode, 0); - CheckForAnimatedContent(incoming_frame, post_time_us); - bool cwnd_frame_drop = - cwnd_frame_drop_interval_ && - (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0); - if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) { - MaybeEncodeVideoFrame(incoming_frame, post_time_us); - } else { - if (cwnd_frame_drop) { - // Frame drop by congestion window pushback. Do not encode this - // frame. - ++dropped_frame_cwnd_pushback_count_; - encoder_stats_observer_->OnFrameDropped( - VideoStreamEncoderObserver::DropReason::kCongestionWindow); - } else { - // There is a newer frame in flight. Do not encode this frame. - RTC_LOG(LS_VERBOSE) - << "Incoming frame dropped due to that the encoder is blocked."; - ++dropped_frame_encoder_block_count_; - encoder_stats_observer_->OnFrameDropped( - VideoStreamEncoderObserver::DropReason::kEncoderQueue); - } - accumulated_update_rect_.Union(incoming_frame.update_rect()); - accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect(); - } - if (log_stats) { - RTC_LOG(LS_INFO) << "Number of frames: captured " - << captured_frame_count_ - << ", dropped (due to congestion window pushback) " - << dropped_frame_cwnd_pushback_count_ - << ", dropped (due to encoder blocked) " - << dropped_frame_encoder_block_count_ - << ", interval_ms " << kFrameLogIntervalMs; - captured_frame_count_ = 0; - dropped_frame_cwnd_pushback_count_ = 0; - dropped_frame_encoder_block_count_ = 0; - } - }); + encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(), + incoming_frame.height()); + ++captured_frame_count_; + CheckForAnimatedContent(incoming_frame, post_time.us()); + bool cwnd_frame_drop = + cwnd_frame_drop_interval_ && + (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0); + if (frames_scheduled_for_processing == 1 && !cwnd_frame_drop) { + MaybeEncodeVideoFrame(incoming_frame, post_time.us()); + } else { + if (cwnd_frame_drop) { + // Frame drop by congestion window pushback. Do not encode this + // frame. + ++dropped_frame_cwnd_pushback_count_; + encoder_stats_observer_->OnFrameDropped( + VideoStreamEncoderObserver::DropReason::kCongestionWindow); + } else { + // There is a newer frame in flight. Do not encode this frame. + RTC_LOG(LS_VERBOSE) + << "Incoming frame dropped due to that the encoder is blocked."; + ++dropped_frame_encoder_block_count_; + encoder_stats_observer_->OnFrameDropped( + VideoStreamEncoderObserver::DropReason::kEncoderQueue); + } + accumulated_update_rect_.Union(incoming_frame.update_rect()); + accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect(); + } + if (log_stats) { + RTC_LOG(LS_INFO) << "Number of frames: captured " << captured_frame_count_ + << ", dropped (due to congestion window pushback) " + << dropped_frame_cwnd_pushback_count_ + << ", dropped (due to encoder blocked) " + << dropped_frame_encoder_block_count_ << ", interval_ms " + << kFrameLogIntervalMs; + captured_frame_count_ = 0; + dropped_frame_cwnd_pushback_count_ = 0; + dropped_frame_encoder_block_count_ = 0; + } } void VideoStreamEncoder::OnDiscardedFrame() { diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h index e0eda70c40..4231f1ba06 100644 --- a/video/video_stream_encoder.h +++ b/video/video_stream_encoder.h @@ -77,8 +77,8 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, const VideoStreamEncoderSettings& settings, std::unique_ptr overuse_detector, std::unique_ptr frame_cadence_adapter, - TaskQueueFactory* task_queue_factory, - TaskQueueBase* worker_queue, + std::unique_ptr + encoder_queue, BitrateAllocationCallbackType allocation_cb_type); ~VideoStreamEncoder() override; @@ -146,8 +146,11 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, explicit CadenceCallback(VideoStreamEncoder& video_stream_encoder) : video_stream_encoder_(video_stream_encoder) {} // FrameCadenceAdapterInterface::Callback overrides. - void OnFrame(const VideoFrame& frame) override { - video_stream_encoder_.OnFrame(frame); + void OnFrame(Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& frame) override { + video_stream_encoder_.OnFrame(post_time, frames_scheduled_for_processing, + frame); } void OnDiscardedFrame() override { video_stream_encoder_.OnDiscardedFrame(); @@ -192,7 +195,9 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, void ReconfigureEncoder() RTC_RUN_ON(&encoder_queue_); void OnEncoderSettingsChanged() RTC_RUN_ON(&encoder_queue_); - void OnFrame(const VideoFrame& video_frame); + void OnFrame(Timestamp post_time, + int frames_scheduled_for_processing, + const VideoFrame& video_frame); void OnDiscardedFrame(); void MaybeEncodeVideoFrame(const VideoFrame& frame, @@ -260,7 +265,8 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, CadenceCallback cadence_callback_; // Frame cadence encoder adapter. Frames enter this adapter first, and it then // forwards them to our OnFrame method. - const std::unique_ptr frame_cadence_adapter_; + std::unique_ptr frame_cadence_adapter_ + RTC_GUARDED_BY(&encoder_queue_) RTC_PT_GUARDED_BY(&encoder_queue_); VideoEncoderConfig encoder_config_ RTC_GUARDED_BY(&encoder_queue_); std::unique_ptr encoder_ RTC_GUARDED_BY(&encoder_queue_) @@ -296,13 +302,12 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, bool encoder_failed_ RTC_GUARDED_BY(&encoder_queue_); Clock* const clock_; - std::atomic posted_frames_waiting_for_encode_; // Used to make sure incoming time stamp is increasing for every frame. - int64_t last_captured_timestamp_ RTC_GUARDED_BY(worker_queue_); + int64_t last_captured_timestamp_ RTC_GUARDED_BY(&encoder_queue_); // Delta used for translating between NTP and internal timestamps. - const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(worker_queue_); + const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(&encoder_queue_); - int64_t last_frame_log_ms_ RTC_GUARDED_BY(worker_queue_); + int64_t last_frame_log_ms_ RTC_GUARDED_BY(&encoder_queue_); int captured_frame_count_ RTC_GUARDED_BY(&encoder_queue_); int dropped_frame_cwnd_pushback_count_ RTC_GUARDED_BY(&encoder_queue_); int dropped_frame_encoder_block_count_ RTC_GUARDED_BY(&encoder_queue_); diff --git a/video/video_stream_encoder_unittest.cc b/video/video_stream_encoder_unittest.cc index ba29d0b692..32d4f94e8c 100644 --- a/video/video_stream_encoder_unittest.cc +++ b/video/video_stream_encoder_unittest.cc @@ -17,6 +17,7 @@ #include #include "absl/memory/memory.h" +#include "api/rtp_parameters.h" #include "api/task_queue/default_task_queue_factory.h" #include "api/task_queue/task_queue_factory.h" #include "api/test/mock_fec_controller_override.h" @@ -346,24 +347,25 @@ auto FpsEqResolutionGt(const rtc::VideoSinkWants& other_wants) { class VideoStreamEncoderUnderTest : public VideoStreamEncoder { public: - VideoStreamEncoderUnderTest(TimeController* time_controller, - TaskQueueFactory* task_queue_factory, - SendStatisticsProxy* stats_proxy, - const VideoStreamEncoderSettings& settings, - VideoStreamEncoder::BitrateAllocationCallbackType - allocation_callback_type) - : VideoStreamEncoder( - time_controller->GetClock(), - 1 /* number_of_cores */, - stats_proxy, - settings, - std::unique_ptr( - overuse_detector_proxy_ = - new CpuOveruseDetectorProxy(stats_proxy)), - FrameCadenceAdapterInterface::Create(TaskQueueBase::Current()), - task_queue_factory, - TaskQueueBase::Current(), - allocation_callback_type), + VideoStreamEncoderUnderTest( + TimeController* time_controller, + std::unique_ptr cadence_adapter, + std::unique_ptr + encoder_queue, + SendStatisticsProxy* stats_proxy, + const VideoStreamEncoderSettings& settings, + VideoStreamEncoder::BitrateAllocationCallbackType + allocation_callback_type) + : VideoStreamEncoder(time_controller->GetClock(), + 1 /* number_of_cores */, + stats_proxy, + settings, + std::unique_ptr( + overuse_detector_proxy_ = + new CpuOveruseDetectorProxy(stats_proxy)), + std::move(cadence_adapter), + std::move(encoder_queue), + allocation_callback_type), time_controller_(time_controller), fake_cpu_resource_(FakeResource::Create("FakeResource[CPU]")), fake_quality_resource_(FakeResource::Create("FakeResource[QP]")), @@ -659,14 +661,17 @@ class SimpleVideoStreamEncoderFactory { /*number_of_cores=*/1, /*stats_proxy=*/stats_proxy_.get(), encoder_settings_, std::make_unique(/*stats_proxy=*/nullptr), - std::move(zero_hertz_adapter), task_queue_factory_.get(), - TaskQueueBase::Current(), + std::move(zero_hertz_adapter), + time_controller_.GetTaskQueueFactory()->CreateTaskQueue( + "EncoderQueue", TaskQueueFactory::Priority::NORMAL), VideoStreamEncoder::BitrateAllocationCallbackType:: kVideoBitrateAllocation); result->SetSink(&sink_, /*rotation_applied=*/false); return result; } + void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); } + private: class NullEncoderSink : public VideoStreamEncoderInterface::EncoderSink { public: @@ -763,9 +768,17 @@ class VideoStreamEncoderTest : public ::testing::Test { kVideoBitrateAllocationWhenScreenSharing) { if (video_stream_encoder_) video_stream_encoder_->Stop(); - video_stream_encoder_.reset(new VideoStreamEncoderUnderTest( - &time_controller_, GetTaskQueueFactory(), stats_proxy_.get(), - video_send_config_.encoder_settings, allocation_callback_type)); + + auto encoder_queue = GetTaskQueueFactory()->CreateTaskQueue( + "EncoderQueue", TaskQueueFactory::Priority::NORMAL); + TaskQueueBase* encoder_queue_ptr = encoder_queue.get(); + std::unique_ptr cadence_adapter = + FrameCadenceAdapterInterface::Create(time_controller_.GetClock(), + encoder_queue_ptr); + video_stream_encoder_ = std::make_unique( + &time_controller_, std::move(cadence_adapter), std::move(encoder_queue), + stats_proxy_.get(), video_send_config_.encoder_settings, + allocation_callback_type); video_stream_encoder_->SetSink(&sink_, /*rotation_applied=*/false); video_stream_encoder_->SetSource( &video_source_, webrtc::DegradationPreference::MAINTAIN_FRAMERATE); @@ -930,11 +943,6 @@ class VideoStreamEncoderTest : public ::testing::Test { RTC_DCHECK(time_controller_); } - void BlockNextEncode() { - MutexLock lock(&local_mutex_); - block_next_encode_ = true; - } - VideoEncoder::EncoderInfo GetEncoderInfo() const override { MutexLock lock(&local_mutex_); EncoderInfo info = FakeEncoder::GetEncoderInfo(); @@ -1108,7 +1116,6 @@ class VideoStreamEncoderTest : public ::testing::Test { private: int32_t Encode(const VideoFrame& input_image, const std::vector* frame_types) override { - bool block_encode; { MutexLock lock(&local_mutex_); if (expect_null_frame_) { @@ -1126,16 +1133,11 @@ class VideoStreamEncoderTest : public ::testing::Test { ntp_time_ms_ = input_image.ntp_time_ms(); last_input_width_ = input_image.width(); last_input_height_ = input_image.height(); - block_encode = block_next_encode_; - block_next_encode_ = false; last_update_rect_ = input_image.update_rect(); last_frame_types_ = *frame_types; last_input_pixel_format_ = input_image.video_frame_buffer()->type(); } int32_t result = FakeEncoder::Encode(input_image, frame_types); - if (block_encode) - EXPECT_TRUE(continue_encode_event_.Wait(kDefaultTimeoutMs)); - return result; } @@ -1212,7 +1214,6 @@ class VideoStreamEncoderTest : public ::testing::Test { kInitializationFailed, kInitialized } initialized_ RTC_GUARDED_BY(local_mutex_) = EncoderState::kUninitialized; - bool block_next_encode_ RTC_GUARDED_BY(local_mutex_) = false; rtc::Event continue_encode_event_; uint32_t timestamp_ RTC_GUARDED_BY(local_mutex_) = 0; int64_t ntp_time_ms_ RTC_GUARDED_BY(local_mutex_) = 0; @@ -1592,20 +1593,10 @@ TEST_F(VideoStreamEncoderTest, DropsFrameAfterStop) { EXPECT_TRUE(frame_destroyed_event.Wait(kDefaultTimeoutMs)); } -class VideoStreamEncoderBlockedTest : public VideoStreamEncoderTest { - public: - VideoStreamEncoderBlockedTest() {} - - TaskQueueFactory* GetTaskQueueFactory() override { - return task_queue_factory_.get(); - } - - private: - std::unique_ptr task_queue_factory_ = - CreateDefaultTaskQueueFactory(); -}; - -TEST_F(VideoStreamEncoderBlockedTest, DropsPendingFramesOnSlowEncode) { +TEST_F(VideoStreamEncoderTest, DropsPendingFramesOnSlowEncode) { + test::FrameForwarder source; + video_stream_encoder_->SetSource(&source, + DegradationPreference::MAINTAIN_FRAMERATE); video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources( kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0); @@ -1615,18 +1606,10 @@ TEST_F(VideoStreamEncoderBlockedTest, DropsPendingFramesOnSlowEncode) { ++dropped_count; }); - fake_encoder_.BlockNextEncode(); - video_source_.IncomingCapturedFrame(CreateFrame(1, nullptr)); - WaitForEncodedFrame(1); - // Here, the encoder thread will be blocked in the TestEncoder waiting for a - // call to ContinueEncode. - video_source_.IncomingCapturedFrame(CreateFrame(2, nullptr)); - video_source_.IncomingCapturedFrame(CreateFrame(3, nullptr)); - fake_encoder_.ContinueEncode(); - WaitForEncodedFrame(3); - + source.IncomingCapturedFrame(CreateFrame(1, nullptr)); + source.IncomingCapturedFrame(CreateFrame(2, nullptr)); + WaitForEncodedFrame(2); video_stream_encoder_->Stop(); - EXPECT_EQ(1, dropped_count); } @@ -7125,14 +7108,15 @@ TEST_F(VideoStreamEncoderTest, ConfiguresCorrectFrameRate) { video_stream_encoder_->Stop(); } -TEST_F(VideoStreamEncoderBlockedTest, AccumulatesUpdateRectOnDroppedFrames) { +TEST_F(VideoStreamEncoderTest, AccumulatesUpdateRectOnDroppedFrames) { VideoFrame::UpdateRect rect; + test::FrameForwarder source; + video_stream_encoder_->SetSource(&source, + DegradationPreference::MAINTAIN_FRAMERATE); video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources( kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0); - fake_encoder_.BlockNextEncode(); - video_source_.IncomingCapturedFrame( - CreateFrameWithUpdatedPixel(1, nullptr, 0)); + source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(1, nullptr, 0)); WaitForEncodedFrame(1); // On the very first frame full update should be forced. rect = fake_encoder_.GetLastUpdateRect(); @@ -7140,15 +7124,10 @@ TEST_F(VideoStreamEncoderBlockedTest, AccumulatesUpdateRectOnDroppedFrames) { EXPECT_EQ(rect.offset_y, 0); EXPECT_EQ(rect.height, codec_height_); EXPECT_EQ(rect.width, codec_width_); - // Here, the encoder thread will be blocked in the TestEncoder waiting for a - // call to ContinueEncode. - video_source_.IncomingCapturedFrame( - CreateFrameWithUpdatedPixel(2, nullptr, 1)); - ExpectDroppedFrame(); - video_source_.IncomingCapturedFrame( - CreateFrameWithUpdatedPixel(3, nullptr, 10)); - ExpectDroppedFrame(); - fake_encoder_.ContinueEncode(); + // Frame with NTP timestamp 2 will be dropped due to outstanding frames + // scheduled for processing during encoder queue processing of frame 2. + source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(2, nullptr, 1)); + source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(3, nullptr, 10)); WaitForEncodedFrame(3); // Updates to pixels 1 and 10 should be accumulated to one 10x1 rect. rect = fake_encoder_.GetLastUpdateRect(); @@ -7157,8 +7136,7 @@ TEST_F(VideoStreamEncoderBlockedTest, AccumulatesUpdateRectOnDroppedFrames) { EXPECT_EQ(rect.width, 10); EXPECT_EQ(rect.height, 1); - video_source_.IncomingCapturedFrame( - CreateFrameWithUpdatedPixel(4, nullptr, 0)); + source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(4, nullptr, 0)); WaitForEncodedFrame(4); // Previous frame was encoded, so no accumulation should happen. rect = fake_encoder_.GetLastUpdateRect(); @@ -8737,12 +8715,14 @@ TEST(VideoStreamEncoderFrameCadenceTest, ActivatesFrameCadenceOnContentType) { VideoEncoderConfig config; config.content_type = VideoEncoderConfig::ContentType::kScreen; video_stream_encoder->ConfigureEncoder(std::move(config), 0); + factory.DepleteTaskQueues(); Mock::VerifyAndClearExpectations(adapter_ptr); EXPECT_CALL(*adapter_ptr, SetZeroHertzModeEnabled(false)); VideoEncoderConfig config2; config2.content_type = VideoEncoderConfig::ContentType::kRealtimeVideo; video_stream_encoder->ConfigureEncoder(std::move(config2), 0); + factory.DepleteTaskQueues(); } TEST(VideoStreamEncoderFrameCadenceTest,