diff --git a/video/adaptation/BUILD.gn b/video/adaptation/BUILD.gn index 3269d89ac9..51e6a2d84e 100644 --- a/video/adaptation/BUILD.gn +++ b/video/adaptation/BUILD.gn @@ -59,11 +59,14 @@ if (rtc_include_tests) { defines = [] sources = [ "overuse_frame_detector_unittest.cc", + "quality_scaler_resource_unittest.cc", "video_stream_encoder_resource_manager_unittest.cc", ] deps = [ ":video_adaptation", "../../api:scoped_refptr", + "../../api/task_queue:default_task_queue_factory", + "../../api/task_queue:task_queue", "../../api/video:encoded_image", "../../api/video:video_adaptation", "../../api/video:video_frame_i420", @@ -76,6 +79,7 @@ if (rtc_include_tests) { "../../rtc_base:rtc_base_tests_utils", "../../rtc_base:rtc_event", "../../rtc_base:rtc_numerics", + "../../rtc_base:rtc_task_queue", "../../rtc_base:task_queue_for_test", "../../test:field_trial", "//test:rtc_expect_death", diff --git a/video/adaptation/quality_scaler_resource.cc b/video/adaptation/quality_scaler_resource.cc index 9fcc58e6be..631e5b08fd 100644 --- a/video/adaptation/quality_scaler_resource.cc +++ b/video/adaptation/quality_scaler_resource.cc @@ -21,7 +21,15 @@ QualityScalerResource::QualityScalerResource() encoder_queue_(nullptr), adaptation_processor_(nullptr), quality_scaler_(nullptr), - pending_qp_usage_callback_(nullptr) {} + num_handled_callbacks_(0), + pending_callbacks_(), + processing_in_progress_(false), + clear_qp_samples_(false) {} + +QualityScalerResource::~QualityScalerResource() { + RTC_DCHECK(!quality_scaler_); + RTC_DCHECK(pending_callbacks_.empty()); +} void QualityScalerResource::Initialize(rtc::TaskQueue* encoder_queue) { RTC_DCHECK(!encoder_queue_); @@ -29,8 +37,6 @@ void QualityScalerResource::Initialize(rtc::TaskQueue* encoder_queue) { encoder_queue_ = encoder_queue; } -QualityScalerResource::~QualityScalerResource() {} - void QualityScalerResource::SetAdaptationProcessor( ResourceAdaptationProcessorInterface* adaptation_processor) { RTC_DCHECK_RUN_ON(encoder_queue_); @@ -52,6 +58,9 @@ void QualityScalerResource::StartCheckForOveruse( void QualityScalerResource::StopCheckForOveruse() { RTC_DCHECK_RUN_ON(encoder_queue_); + // Ensure we have no pending callbacks. This makes it safe to destroy the + // QualityScaler and even task queues with tasks in-flight. + AbortPendingCallbacks(); quality_scaler_.reset(); } @@ -101,30 +110,28 @@ void QualityScalerResource::OnFrameDropped( void QualityScalerResource::OnReportQpUsageHigh( rtc::scoped_refptr callback) { RTC_DCHECK_RUN_ON(encoder_queue_); - RTC_DCHECK(!pending_qp_usage_callback_); + size_t callback_id = QueuePendingCallback(callback); // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, // PostTask the resource usage measurements. - pending_qp_usage_callback_ = std::move(callback); - // If this triggers adaptation, OnAdaptationApplied() is called by the - // processor where we determine if QP should be cleared and we invoke and null - // the |pending_qp_usage_callback_|. + RTC_DCHECK(!processing_in_progress_); + processing_in_progress_ = true; + clear_qp_samples_ = false; + // If this OnResourceUsageStateMeasured() triggers an adaptation, + // OnAdaptationApplied() will occur between this line and the next. This + // allows modifying |clear_qp_samples_| based on the adaptation. OnResourceUsageStateMeasured(ResourceUsageState::kOveruse); - // If |pending_qp_usage_callback_| has not been nulled yet then we did not - // just trigger an adaptation and should not clear the QP samples. - if (pending_qp_usage_callback_) { - pending_qp_usage_callback_->OnQpUsageHandled(false); - pending_qp_usage_callback_ = nullptr; - } + HandlePendingCallback(callback_id, clear_qp_samples_); + processing_in_progress_ = false; } void QualityScalerResource::OnReportQpUsageLow( rtc::scoped_refptr callback) { RTC_DCHECK_RUN_ON(encoder_queue_); - RTC_DCHECK(!pending_qp_usage_callback_); + size_t callback_id = QueuePendingCallback(callback); // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, // PostTask the resource usage measurements. OnResourceUsageStateMeasured(ResourceUsageState::kUnderuse); - callback->OnQpUsageHandled(true); + HandlePendingCallback(callback_id, true); } void QualityScalerResource::OnAdaptationApplied( @@ -132,13 +139,11 @@ void QualityScalerResource::OnAdaptationApplied( const VideoSourceRestrictions& restrictions_before, const VideoSourceRestrictions& restrictions_after, rtc::scoped_refptr reason_resource) { - // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, - // ensure that this is running on it instead. RTC_DCHECK_RUN_ON(encoder_queue_); // We only clear QP samples on adaptations triggered by the QualityScaler. - if (!pending_qp_usage_callback_) + if (!processing_in_progress_) return; - bool clear_qp_samples = true; + clear_qp_samples_ = true; // If we're in "balanced" and the frame rate before and after adaptation did // not differ that much, don't clear the QP samples and instead check for QP // again in a short amount of time. This may trigger adapting down again soon. @@ -160,12 +165,45 @@ void QualityScalerResource::OnAdaptationApplied( int fps_diff = input_state.frames_per_second() - restrictions_after.max_frame_rate().value(); if (fps_diff < min_diff.value()) { - clear_qp_samples = false; + clear_qp_samples_ = false; } } } - pending_qp_usage_callback_->OnQpUsageHandled(clear_qp_samples); - pending_qp_usage_callback_ = nullptr; +} + +size_t QualityScalerResource::QueuePendingCallback( + rtc::scoped_refptr callback) { + RTC_DCHECK_RUN_ON(encoder_queue_); + pending_callbacks_.push(callback); + // The ID of a callback is its sequence number (1, 2, 3...). + return num_handled_callbacks_ + pending_callbacks_.size(); +} + +void QualityScalerResource::HandlePendingCallback(size_t callback_id, + bool clear_qp_samples) { + // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, + // this method would be invoked on the adaptation queue and a PostTask would + // be used to resolve the callback. + RTC_DCHECK_RUN_ON(encoder_queue_); + if (num_handled_callbacks_ >= callback_id) { + // The callback with this ID has already been handled. + // This happens if AbortPendingCallbacks() is called while the task is + // in flight. + return; + } + RTC_DCHECK(!pending_callbacks_.empty()); + pending_callbacks_.front()->OnQpUsageHandled(clear_qp_samples); + ++num_handled_callbacks_; + pending_callbacks_.pop(); +} + +void QualityScalerResource::AbortPendingCallbacks() { + RTC_DCHECK_RUN_ON(encoder_queue_); + while (!pending_callbacks_.empty()) { + pending_callbacks_.front()->OnQpUsageHandled(false); + ++num_handled_callbacks_; + pending_callbacks_.pop(); + } } } // namespace webrtc diff --git a/video/adaptation/quality_scaler_resource.h b/video/adaptation/quality_scaler_resource.h index 6cec79c4b0..7c55e9bacd 100644 --- a/video/adaptation/quality_scaler_resource.h +++ b/video/adaptation/quality_scaler_resource.h @@ -12,6 +12,7 @@ #define VIDEO_ADAPTATION_QUALITY_SCALER_RESOURCE_H_ #include +#include #include #include "api/video/video_adaptation_reason.h" @@ -19,16 +20,13 @@ #include "call/adaptation/resource.h" #include "call/adaptation/resource_adaptation_processor_interface.h" #include "modules/video_coding/utility/quality_scaler.h" +#include "rtc_base/critical_section.h" #include "rtc_base/ref_counted_object.h" #include "rtc_base/task_queue.h" namespace webrtc { // Handles interaction with the QualityScaler. -// TODO(hbos): Add unittests specific to this class, it is currently only tested -// indirectly by usage in the ResourceAdaptationProcessor (which is only tested -// because of its usage in VideoStreamEncoder); all tests are currently in -// video_stream_encoder_unittest.cc. class QualityScalerResource : public rtc::RefCountedObject, public QualityScalerQpUsageHandlerInterface { public: @@ -70,14 +68,31 @@ class QualityScalerResource : public rtc::RefCountedObject, rtc::scoped_refptr reason_resource) override; private: + size_t QueuePendingCallback( + rtc::scoped_refptr + callback); + void HandlePendingCallback(size_t callback_id, bool clear_qp_samples); + void AbortPendingCallbacks(); + rtc::TaskQueue* encoder_queue_; // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, // guard the processor by it instead. ResourceAdaptationProcessorInterface* adaptation_processor_ RTC_GUARDED_BY(encoder_queue_); std::unique_ptr quality_scaler_ RTC_GUARDED_BY(encoder_queue_); - rtc::scoped_refptr - pending_qp_usage_callback_ RTC_GUARDED_BY(encoder_queue_); + // Every OnReportQpUsageHigh/Low() operation has a callback that MUST be + // invoked on the |encoder_queue_|. + // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, + // handling a measurement entails a task queue "ping" round-trip between the + // encoder queue and the adaptation queue. Multiple callbacks in-flight would + // then be possible. + size_t num_handled_callbacks_ RTC_GUARDED_BY(encoder_queue_); + std::queue> + pending_callbacks_ RTC_GUARDED_BY(encoder_queue_); + // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue, + // guard processing_in_progress_/clear_cp_samples_ by it instead. + bool processing_in_progress_ RTC_GUARDED_BY(encoder_queue_); + bool clear_qp_samples_ RTC_GUARDED_BY(encoder_queue_); }; } // namespace webrtc diff --git a/video/adaptation/quality_scaler_resource_unittest.cc b/video/adaptation/quality_scaler_resource_unittest.cc new file mode 100644 index 0000000000..d49addfe8c --- /dev/null +++ b/video/adaptation/quality_scaler_resource_unittest.cc @@ -0,0 +1,158 @@ +/* + * Copyright 2020 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/adaptation/quality_scaler_resource.h" + +#include + +#include "absl/types/optional.h" +#include "api/task_queue/default_task_queue_factory.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/video_codecs/video_encoder.h" +#include "rtc_base/event.h" +#include "rtc_base/task_queue.h" +#include "test/gtest.h" + +namespace webrtc { + +namespace { + +const int kDefaultTimeout = 5000; + +class FakeQualityScalerQpUsageHandlerCallback + : public QualityScalerQpUsageHandlerCallbackInterface { + public: + explicit FakeQualityScalerQpUsageHandlerCallback( + rtc::TaskQueue* encoder_queue) + : QualityScalerQpUsageHandlerCallbackInterface(), + encoder_queue_(encoder_queue), + is_handled_(false), + qp_usage_handled_event_(true /* manual_reset */, false), + clear_qp_samples_result_(absl::nullopt) {} + ~FakeQualityScalerQpUsageHandlerCallback() override { + RTC_DCHECK(is_handled_) + << "The callback was destroyed without being invoked."; + } + + void OnQpUsageHandled(bool clear_qp_samples) override { + ASSERT_TRUE(encoder_queue_->IsCurrent()); + RTC_DCHECK(!is_handled_); + clear_qp_samples_result_ = clear_qp_samples; + is_handled_ = true; + qp_usage_handled_event_.Set(); + } + + bool is_handled() const { return is_handled_; } + rtc::Event* qp_usage_handled_event() { return &qp_usage_handled_event_; } + absl::optional clear_qp_samples_result() const { + return clear_qp_samples_result_; + } + + private: + rtc::TaskQueue* const encoder_queue_; + bool is_handled_; + rtc::Event qp_usage_handled_event_; + absl::optional clear_qp_samples_result_; +}; + +} // namespace + +class QualityScalerResourceTest : public ::testing::Test { + public: + QualityScalerResourceTest() + : task_queue_factory_(CreateDefaultTaskQueueFactory()), + encoder_queue_(task_queue_factory_->CreateTaskQueue( + "EncoderQueue", + TaskQueueFactory::Priority::NORMAL)), + quality_scaler_resource_(new QualityScalerResource()) { + quality_scaler_resource_->Initialize(&encoder_queue_); + rtc::Event event; + encoder_queue_.PostTask([this, &event] { + quality_scaler_resource_->StartCheckForOveruse( + VideoEncoder::QpThresholds()); + event.Set(); + }); + event.Wait(kDefaultTimeout); + } + + ~QualityScalerResourceTest() { + rtc::Event event; + encoder_queue_.PostTask([this, &event] { + quality_scaler_resource_->StopCheckForOveruse(); + event.Set(); + }); + event.Wait(kDefaultTimeout); + } + + protected: + const std::unique_ptr task_queue_factory_; + rtc::TaskQueue encoder_queue_; + rtc::scoped_refptr quality_scaler_resource_; +}; + +TEST_F(QualityScalerResourceTest, ReportQpHigh) { + rtc::scoped_refptr callback = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + encoder_queue_.PostTask([this, callback] { + quality_scaler_resource_->OnReportQpUsageHigh(callback); + }); + callback->qp_usage_handled_event()->Wait(kDefaultTimeout); +} + +TEST_F(QualityScalerResourceTest, ReportQpLow) { + rtc::scoped_refptr callback = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + encoder_queue_.PostTask([this, callback] { + quality_scaler_resource_->OnReportQpUsageLow(callback); + }); + callback->qp_usage_handled_event()->Wait(kDefaultTimeout); +} + +// TODO(https://crbug.com/webrtc/11542): Callbacks are currently resolved +// immediately, but when we have an adaptation queue this test will ensure we +// can have multiple callbacks pending at the same time. +TEST_F(QualityScalerResourceTest, MultipleCallbacksInFlight) { + rtc::scoped_refptr callback1 = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + rtc::scoped_refptr callback2 = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + rtc::scoped_refptr callback3 = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + encoder_queue_.PostTask([this, callback1, callback2, callback3] { + quality_scaler_resource_->OnReportQpUsageHigh(callback1); + quality_scaler_resource_->OnReportQpUsageLow(callback2); + quality_scaler_resource_->OnReportQpUsageHigh(callback3); + }); + callback1->qp_usage_handled_event()->Wait(kDefaultTimeout); + callback2->qp_usage_handled_event()->Wait(kDefaultTimeout); + callback3->qp_usage_handled_event()->Wait(kDefaultTimeout); +} + +// TODO(https://crbug.com/webrtc/11542): Callbacks are currently resolved +// immediately, but when we have an adaptation queue this test will ensure we +// can abort pending callbacks. +TEST_F(QualityScalerResourceTest, AbortPendingCallbacksAndStartAgain) { + rtc::scoped_refptr callback1 = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + rtc::scoped_refptr callback2 = + new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_); + encoder_queue_.PostTask([this, callback1, callback2] { + quality_scaler_resource_->OnReportQpUsageHigh(callback1); + quality_scaler_resource_->StopCheckForOveruse(); + EXPECT_TRUE(callback1->qp_usage_handled_event()->Wait(0)); + quality_scaler_resource_->StartCheckForOveruse( + VideoEncoder::QpThresholds()); + quality_scaler_resource_->OnReportQpUsageHigh(callback2); + }); + callback1->qp_usage_handled_event()->Wait(kDefaultTimeout); + callback2->qp_usage_handled_event()->Wait(kDefaultTimeout); +} + +} // namespace webrtc