From 62b01db428b30dd16d3b1ae709965e5afa8e5cb4 Mon Sep 17 00:00:00 2001 From: Tommi Date: Tue, 25 Jan 2022 23:41:22 +0100 Subject: [PATCH] VideoStreamEncoder - wait less. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some of the state that's managed in VideoStreamEncoder, is updated and accessed on the encoder queue, but a method that's used for testing only (GetAdaptationResources()), represents a race between PostTask operations that update state and checking for said state on the worker thread. This CL removes Wait() operations related to adaptation resources from the common path and puts one in the test path instead. Bug: webrtc:13612 Change-Id: Ie3e018e815e24951bc0634ed70de17eaf336a508 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249220 Reviewed-by: Henrik Boström Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35797} --- .../resource_adaptation_processor.cc | 25 ++------- .../resource_adaptation_processor.h | 3 -- .../resource_adaptation_processor_interface.h | 2 - .../resource_adaptation_processor_unittest.cc | 1 - video/video_stream_encoder.cc | 38 ++++++++------ video/video_stream_encoder.h | 4 +- video/video_stream_encoder_unittest.cc | 52 +++++++++++++++++++ 7 files changed, 80 insertions(+), 45 deletions(-) diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc index d95cd75a9d..66e6f0c36e 100644 --- a/call/adaptation/resource_adaptation_processor.cc +++ b/call/adaptation/resource_adaptation_processor.cc @@ -27,14 +27,8 @@ namespace webrtc { ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate( ResourceAdaptationProcessor* processor) - : task_queue_(nullptr), processor_(processor) {} - -void ResourceAdaptationProcessor::ResourceListenerDelegate::SetTaskQueue( - TaskQueueBase* task_queue) { - RTC_DCHECK(!task_queue_); - RTC_DCHECK(task_queue); - task_queue_ = task_queue; - RTC_DCHECK_RUN_ON(task_queue_); + : task_queue_(TaskQueueBase::Current()), processor_(processor) { + RTC_DCHECK(task_queue_); } void ResourceAdaptationProcessor::ResourceListenerDelegate:: @@ -70,14 +64,15 @@ ResourceAdaptationProcessor::MitigationResultAndLogMessage:: ResourceAdaptationProcessor::ResourceAdaptationProcessor( VideoStreamAdapter* stream_adapter) - : task_queue_(nullptr), + : task_queue_(TaskQueueBase::Current()), resource_listener_delegate_( rtc::make_ref_counted(this)), resources_(), stream_adapter_(stream_adapter), last_reported_source_restrictions_(), previous_mitigation_results_() { - RTC_DCHECK(stream_adapter_); + RTC_DCHECK(task_queue_); + stream_adapter_->AddRestrictionsListener(this); } ResourceAdaptationProcessor::~ResourceAdaptationProcessor() { @@ -89,16 +84,6 @@ ResourceAdaptationProcessor::~ResourceAdaptationProcessor() { resource_listener_delegate_->OnProcessorDestroyed(); } -void ResourceAdaptationProcessor::SetTaskQueue(TaskQueueBase* task_queue) { - RTC_DCHECK(!task_queue_); - RTC_DCHECK(task_queue); - task_queue_ = task_queue; - resource_listener_delegate_->SetTaskQueue(task_queue); - RTC_DCHECK_RUN_ON(task_queue_); - // Now that we have the queue we can attach as adaptation listener. - stream_adapter_->AddRestrictionsListener(this); -} - void ResourceAdaptationProcessor::AddResourceLimitationsListener( ResourceLimitationsListener* limitations_listener) { RTC_DCHECK_RUN_ON(task_queue_); diff --git a/call/adaptation/resource_adaptation_processor.h b/call/adaptation/resource_adaptation_processor.h index 3e273081f8..ca2fec08c3 100644 --- a/call/adaptation/resource_adaptation_processor.h +++ b/call/adaptation/resource_adaptation_processor.h @@ -58,8 +58,6 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, VideoStreamAdapter* video_stream_adapter); ~ResourceAdaptationProcessor() override; - void SetTaskQueue(TaskQueueBase* task_queue) override; - // ResourceAdaptationProcessorInterface implementation. void AddResourceLimitationsListener( ResourceLimitationsListener* limitations_listener) override; @@ -90,7 +88,6 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, public: explicit ResourceListenerDelegate(ResourceAdaptationProcessor* processor); - void SetTaskQueue(TaskQueueBase* task_queue); void OnProcessorDestroyed(); // ResourceListener implementation. diff --git a/call/adaptation/resource_adaptation_processor_interface.h b/call/adaptation/resource_adaptation_processor_interface.h index 8b1f94b73a..4729488150 100644 --- a/call/adaptation/resource_adaptation_processor_interface.h +++ b/call/adaptation/resource_adaptation_processor_interface.h @@ -47,8 +47,6 @@ class ResourceAdaptationProcessorInterface { public: virtual ~ResourceAdaptationProcessorInterface(); - virtual void SetTaskQueue(TaskQueueBase* task_queue) = 0; - virtual void AddResourceLimitationsListener( ResourceLimitationsListener* limitations_listener) = 0; virtual void RemoveResourceLimitationsListener( diff --git a/call/adaptation/resource_adaptation_processor_unittest.cc b/call/adaptation/resource_adaptation_processor_unittest.cc index 7020b22daf..705223ab71 100644 --- a/call/adaptation/resource_adaptation_processor_unittest.cc +++ b/call/adaptation/resource_adaptation_processor_unittest.cc @@ -93,7 +93,6 @@ class ResourceAdaptationProcessorTest : public ::testing::Test { &frame_rate_provider_)), processor_(std::make_unique( video_stream_adapter_.get())) { - processor_->SetTaskQueue(TaskQueueBase::Current()); video_stream_adapter_->AddRestrictionsListener(&restrictions_listener_); processor_->AddResource(resource_); processor_->AddResource(other_resource_); diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc index 28cd8a7bba..9a313e1992 100644 --- a/video/video_stream_encoder.cc +++ b/video/video_stream_encoder.cc @@ -647,9 +647,6 @@ VideoStreamEncoder::VideoStreamEncoder( video_stream_adapter_( std::make_unique(&input_state_provider_, encoder_stats_observer)), - resource_adaptation_processor_( - std::make_unique( - video_stream_adapter_.get())), degradation_preference_manager_( std::make_unique( video_stream_adapter_.get())), @@ -677,10 +674,13 @@ VideoStreamEncoder::VideoStreamEncoder( frame_cadence_adapter_->Initialize(&cadence_callback_); stream_resource_manager_.Initialize(&encoder_queue_); - rtc::Event initialize_processor_event; - encoder_queue_.PostTask([this, &initialize_processor_event] { + encoder_queue_.PostTask([this] { RTC_DCHECK_RUN_ON(&encoder_queue_); - resource_adaptation_processor_->SetTaskQueue(encoder_queue_.Get()); + + resource_adaptation_processor_ = + std::make_unique( + video_stream_adapter_.get()); + stream_resource_manager_.SetAdaptationProcessor( resource_adaptation_processor_.get(), video_stream_adapter_.get()); resource_adaptation_processor_->AddResourceLimitationsListener( @@ -694,9 +694,7 @@ VideoStreamEncoder::VideoStreamEncoder( for (auto* constraint : adaptation_constraints_) { video_stream_adapter_->AddAdaptationConstraint(constraint); } - initialize_processor_event.Set(); }); - initialize_processor_event.Wait(rtc::Event::kForever); } VideoStreamEncoder::~VideoStreamEncoder() { @@ -760,22 +758,31 @@ void VideoStreamEncoder::AddAdaptationResource( // of this MapResourceToReason() call. TRACE_EVENT_ASYNC_BEGIN0( "webrtc", "VideoStreamEncoder::AddAdaptationResource(latency)", this); - rtc::Event map_resource_event; - encoder_queue_.PostTask([this, resource, &map_resource_event] { + encoder_queue_.PostTask([this, resource = std::move(resource)] { TRACE_EVENT_ASYNC_END0( "webrtc", "VideoStreamEncoder::AddAdaptationResource(latency)", this); RTC_DCHECK_RUN_ON(&encoder_queue_); additional_resources_.push_back(resource); stream_resource_manager_.AddResource(resource, VideoAdaptationReason::kCpu); - map_resource_event.Set(); }); - map_resource_event.Wait(rtc::Event::kForever); } std::vector> VideoStreamEncoder::GetAdaptationResources() { RTC_DCHECK_RUN_ON(worker_queue_); - return resource_adaptation_processor_->GetResources(); + // In practice, this method is only called by tests to verify operations that + // run on the encoder queue. So rather than force PostTask() operations to + // be accompanied by an event and a `Wait()`, we'll use PostTask + Wait() + // here. + rtc::Event event; + std::vector> resources; + encoder_queue_.PostTask([&] { + RTC_DCHECK_RUN_ON(&encoder_queue_); + resources = resource_adaptation_processor_->GetResources(); + event.Set(); + }); + event.Wait(rtc::Event::kForever); + return resources; } void VideoStreamEncoder::SetSource( @@ -2349,14 +2356,11 @@ void VideoStreamEncoder::CheckForAnimatedContent( void VideoStreamEncoder::InjectAdaptationResource( rtc::scoped_refptr resource, VideoAdaptationReason reason) { - rtc::Event map_resource_event; - encoder_queue_.PostTask([this, resource, reason, &map_resource_event] { + encoder_queue_.PostTask([this, resource = std::move(resource), reason] { RTC_DCHECK_RUN_ON(&encoder_queue_); additional_resources_.push_back(resource); stream_resource_manager_.AddResource(resource, reason); - map_resource_event.Set(); }); - map_resource_event.Wait(rtc::Event::kForever); } void VideoStreamEncoder::InjectAdaptationConstraint( diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h index b0a4661c7b..30c04b2e33 100644 --- a/video/video_stream_encoder.h +++ b/video/video_stream_encoder.h @@ -394,13 +394,13 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, // Provides video stream input states: current resolution and frame rate. VideoStreamInputStateProvider input_state_provider_; - std::unique_ptr video_stream_adapter_ + const std::unique_ptr video_stream_adapter_ RTC_GUARDED_BY(&encoder_queue_); // Responsible for adapting input resolution or frame rate to ensure resources // (e.g. CPU or bandwidth) are not overused. Adding resources can occur on any // thread. std::unique_ptr - resource_adaptation_processor_; + resource_adaptation_processor_ RTC_GUARDED_BY(&encoder_queue_); std::unique_ptr degradation_preference_manager_ RTC_GUARDED_BY(&encoder_queue_); std::vector adaptation_constraints_ diff --git a/video/video_stream_encoder_unittest.cc b/video/video_stream_encoder_unittest.cc index 1b0597e339..aea1988df9 100644 --- a/video/video_stream_encoder_unittest.cc +++ b/video/video_stream_encoder_unittest.cc @@ -8863,6 +8863,58 @@ TEST_F(ReconfigureEncoderTest, ReconfiguredIfScalabilityModeChanges) { RunTest({config1, config2}, /*expected_num_init_encode=*/2); } +// Simple test that just creates and then immediately destroys an encoder. +// The purpose of the test is to make sure that nothing bad happens if the +// initialization step on the encoder queue, doesn't run. +TEST(VideoStreamEncoderSimpleTest, CreateDestroy) { + class SuperLazyTaskQueue : public webrtc::TaskQueueBase { + public: + SuperLazyTaskQueue() = default; + ~SuperLazyTaskQueue() override = default; + + private: + void Delete() override { delete this; } + void PostTask(std::unique_ptr task) override { + // meh. + } + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override { + ASSERT_TRUE(false); + } + }; + + // Lots of boiler plate. + GlobalSimulatedTimeController time_controller(Timestamp::Millis(0)); + auto stats_proxy = std::make_unique( + time_controller.GetClock(), VideoSendStream::Config(nullptr), + webrtc::VideoEncoderConfig::ContentType::kRealtimeVideo); + SimpleVideoStreamEncoderFactory::MockFakeEncoder mock_fake_encoder( + time_controller.GetClock()); + test::VideoEncoderProxyFactory encoder_factory(&mock_fake_encoder); + std::unique_ptr bitrate_allocator_factory = + CreateBuiltinVideoBitrateAllocatorFactory(); + VideoStreamEncoderSettings encoder_settings{ + VideoEncoder::Capabilities(/*loss_notification=*/false)}; + encoder_settings.encoder_factory = &encoder_factory; + encoder_settings.bitrate_allocator_factory = bitrate_allocator_factory.get(); + + auto adapter = std::make_unique(); + EXPECT_CALL((*adapter.get()), Initialize).WillOnce(Return()); + + std::unique_ptr + encoder_queue(new SuperLazyTaskQueue()); + + // Construct a VideoStreamEncoder instance and let it go out of scope without + // doing anything else (including calling Stop()). This should be fine since + // the posted init task will simply be deleted. + auto encoder = std::make_unique( + time_controller.GetClock(), 1, stats_proxy.get(), encoder_settings, + std::make_unique(stats_proxy.get()), + std::move(adapter), std::move(encoder_queue), + VideoStreamEncoder::BitrateAllocationCallbackType:: + kVideoBitrateAllocation); +} + TEST(VideoStreamEncoderFrameCadenceTest, ActivatesFrameCadenceOnContentType) { auto adapter = std::make_unique(); auto* adapter_ptr = adapter.get();