diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc index bea8334efd..4ec643f46a 100644 --- a/call/adaptation/resource_adaptation_processor.cc +++ b/call/adaptation/resource_adaptation_processor.cc @@ -20,6 +20,7 @@ #include "rtc_base/logging.h" #include "rtc_base/ref_counted_object.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { @@ -127,38 +128,77 @@ void ResourceAdaptationProcessor::RemoveResourceLimitationsListener( void ResourceAdaptationProcessor::AddResource( rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(resource); - RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end()) - << "Resource \"" << resource->Name() << "\" was already registered."; - resources_.push_back(resource); + { + MutexLock crit(&resources_lock_); + RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end()) + << "Resource \"" << resource->Name() << "\" was already registered."; + resources_.push_back(resource); + } resource->SetResourceListener(resource_listener_delegate_); } std::vector> ResourceAdaptationProcessor::GetResources() const { - RTC_DCHECK_RUN_ON(resource_adaptation_queue_); + MutexLock crit(&resources_lock_); return resources_; } void ResourceAdaptationProcessor::RemoveResource( rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(resource); RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\"."; - auto it = absl::c_find(resources_, resource); - RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name() - << "\" was not a registered resource."; + resource->SetResourceListener(nullptr); + { + MutexLock crit(&resources_lock_); + auto it = absl::c_find(resources_, resource); + RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name() + << "\" was not a registered resource."; + resources_.erase(it); + } + RemoveLimitationsImposedByResource(std::move(resource)); +} + +void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource( + rtc::scoped_refptr resource) { + if (!resource_adaptation_queue_->IsCurrent()) { + resource_adaptation_queue_->PostTask(ToQueuedTask( + [this, resource]() { RemoveLimitationsImposedByResource(resource); })); + return; + } + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); auto resource_adaptation_limits = adaptation_limits_by_resources_.find(resource); if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) { VideoStreamAdapter::RestrictionsWithCounters adaptation_limits = resource_adaptation_limits->second; adaptation_limits_by_resources_.erase(resource_adaptation_limits); - MaybeUpdateResourceLimitationsOnResourceRemoval(adaptation_limits); + if (adaptation_limits_by_resources_.empty()) { + // Only the resource being removed was adapted so clear restrictions. + stream_adapter_->ClearRestrictions(); + return; + } + + VideoStreamAdapter::RestrictionsWithCounters most_limited = + FindMostLimitedResources().second; + + if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) { + // The removed limitations were less limited than the most limited + // resource. Don't change the current restrictions. + return; + } + + // Apply the new most limited resource as the next restrictions. + Adaptation adapt_to = stream_adapter_->GetAdaptationTo( + most_limited.counters, most_limited.restrictions); + RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid); + stream_adapter_->ApplyAdaptation(adapt_to, nullptr); + + RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to " + "next most limited restrictions: " + << most_limited.restrictions.ToString() << " with counters " + << most_limited.counters.ToString(); } - resources_.erase(it); - resource->SetResourceListener(nullptr); } void ResourceAdaptationProcessor::AddAdaptationConstraint( @@ -185,10 +225,13 @@ void ResourceAdaptationProcessor::OnResourceUsageStateMeasured( RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(resource); // |resource| could have been removed after signalling. - if (absl::c_find(resources_, resource) == resources_.end()) { - RTC_LOG(INFO) << "Ignoring signal from removed resource \"" - << resource->Name() << "\"."; - return; + { + MutexLock crit(&resources_lock_); + if (absl::c_find(resources_, resource) == resources_.end()) { + RTC_LOG(INFO) << "Ignoring signal from removed resource \"" + << resource->Name() << "\"."; + return; + } } MitigationResultAndLogMessage result_and_message; switch (usage_state) { @@ -372,36 +415,6 @@ void ResourceAdaptationProcessor::UpdateResourceLimitations( } } -void ResourceAdaptationProcessor:: - MaybeUpdateResourceLimitationsOnResourceRemoval( - VideoStreamAdapter::RestrictionsWithCounters removed_limitations) { - if (adaptation_limits_by_resources_.empty()) { - // Only the resource being removed was adapted so clear restrictions. - stream_adapter_->ClearRestrictions(); - return; - } - - VideoStreamAdapter::RestrictionsWithCounters most_limited = - FindMostLimitedResources().second; - - if (removed_limitations.counters.Total() <= most_limited.counters.Total()) { - // The removed limitations were less limited than the most limited resource. - // Don't change the current restrictions. - return; - } - - // Apply the new most limited resource as the next restrictions. - Adaptation adapt_to = stream_adapter_->GetAdaptationTo( - most_limited.counters, most_limited.restrictions); - RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid); - stream_adapter_->ApplyAdaptation(adapt_to, nullptr); - - RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to " - "next most limited restrictions: " - << most_limited.restrictions.ToString() << " with counters " - << most_limited.counters.ToString(); -} - void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated( VideoSourceRestrictions restrictions, const VideoAdaptationCounters& adaptation_counters, diff --git a/call/adaptation/resource_adaptation_processor.h b/call/adaptation/resource_adaptation_processor.h index 9f20bdbc60..c10b3f6426 100644 --- a/call/adaptation/resource_adaptation_processor.h +++ b/call/adaptation/resource_adaptation_processor.h @@ -49,7 +49,8 @@ namespace webrtc { // // The ResourceAdaptationProcessor is single-threaded. It may be constructed on // any thread but MUST subsequently be used and destroyed on a single sequence, -// i.e. the "resource adaptation task queue". +// i.e. the "resource adaptation task queue". Resources can be added and removed +// from any thread. class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, public VideoSourceRestrictionsListener, public ResourceListener { @@ -146,18 +147,18 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, VideoStreamAdapter::RestrictionsWithCounters> FindMostLimitedResources() const RTC_RUN_ON(resource_adaptation_queue_); - void MaybeUpdateResourceLimitationsOnResourceRemoval( - VideoStreamAdapter::RestrictionsWithCounters removed_limitations) - RTC_RUN_ON(resource_adaptation_queue_); + void RemoveLimitationsImposedByResource( + rtc::scoped_refptr resource); TaskQueueBase* resource_adaptation_queue_; rtc::scoped_refptr resource_listener_delegate_; // Input and output. VideoStreamEncoderObserver* const encoder_stats_observer_ RTC_GUARDED_BY(resource_adaptation_queue_); - std::vector resource_limitations_listeners_ - RTC_GUARDED_BY(resource_adaptation_queue_); + mutable Mutex resources_lock_; std::vector> resources_ + RTC_GUARDED_BY(resources_lock_); + std::vector resource_limitations_listeners_ RTC_GUARDED_BY(resource_adaptation_queue_); std::vector adaptation_constraints_ RTC_GUARDED_BY(resource_adaptation_queue_); diff --git a/call/adaptation/resource_adaptation_processor_interface.h b/call/adaptation/resource_adaptation_processor_interface.h index fe500c9d89..59d2323715 100644 --- a/call/adaptation/resource_adaptation_processor_interface.h +++ b/call/adaptation/resource_adaptation_processor_interface.h @@ -51,16 +51,16 @@ class ResourceAdaptationProcessorInterface { virtual void SetResourceAdaptationQueue( TaskQueueBase* resource_adaptation_queue) = 0; - // Starts or stops listening to resources, effectively enabling or disabling - // processing. - // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister - // with AddResource() and RemoveResource() instead. When the processor is - // multi-stream aware, stream-specific resouces will get added and removed - // over time. virtual void AddResourceLimitationsListener( ResourceLimitationsListener* limitations_listener) = 0; virtual void RemoveResourceLimitationsListener( ResourceLimitationsListener* limitations_listener) = 0; + // Starts or stops listening to resources, effectively enabling or disabling + // processing. May be called from anywhere. + // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister + // with AddResource() and RemoveResource() instead. When the processor is + // multi-stream aware, stream-specific resouces will get added and removed + // over time. virtual void AddResource(rtc::scoped_refptr resource) = 0; virtual std::vector> GetResources() const = 0; virtual void RemoveResource(rtc::scoped_refptr resource) = 0; diff --git a/video/adaptation/video_stream_encoder_resource.cc b/video/adaptation/video_stream_encoder_resource.cc index fae33553f7..df76df48ac 100644 --- a/video/adaptation/video_stream_encoder_resource.cc +++ b/video/adaptation/video_stream_encoder_resource.cc @@ -52,9 +52,9 @@ void VideoStreamEncoderResource::UnregisterAdaptationTaskQueue() { void VideoStreamEncoderResource::SetResourceListener( ResourceListener* listener) { - RTC_DCHECK_RUN_ON(resource_adaptation_queue()); // If you want to change listener you need to unregister the old listener by // setting it to null first. + MutexLock crit(&listener_lock_); RTC_DCHECK(!listener_ || !listener) << "A listener is already set"; listener_ = listener; } @@ -65,7 +65,7 @@ std::string VideoStreamEncoderResource::Name() const { void VideoStreamEncoderResource::OnResourceUsageStateMeasured( ResourceUsageState usage_state) { - RTC_DCHECK_RUN_ON(resource_adaptation_queue()); + MutexLock crit(&listener_lock_); if (listener_) { listener_->OnResourceUsageStateMeasured(this, usage_state); } diff --git a/video/adaptation/video_stream_encoder_resource.h b/video/adaptation/video_stream_encoder_resource.h index 6ff314fcae..08994c135d 100644 --- a/video/adaptation/video_stream_encoder_resource.h +++ b/video/adaptation/video_stream_encoder_resource.h @@ -72,7 +72,8 @@ class VideoStreamEncoderResource : public Resource { // Treated as const after initialization. TaskQueueBase* encoder_queue_; TaskQueueBase* resource_adaptation_queue_ RTC_GUARDED_BY(lock_); - ResourceListener* listener_ RTC_GUARDED_BY(resource_adaptation_queue()); + mutable Mutex listener_lock_; + ResourceListener* listener_ RTC_GUARDED_BY(listener_lock_); }; } // namespace webrtc diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc index 255405ee92..9776e06593 100644 --- a/video/video_stream_encoder.cc +++ b/video/video_stream_encoder.cc @@ -410,14 +410,19 @@ void VideoStreamEncoder::Stop() { RTC_DCHECK_RUN_ON(&thread_checker_); video_source_sink_controller_.SetSource(nullptr); + if (resource_adaptation_processor_) { + for (auto& resource : stream_resource_manager_.MappedResources()) { + resource_adaptation_processor_->RemoveResource(resource); + } + } rtc::Event shutdown_adaptation_processor_event; resource_adaptation_queue_.PostTask([this, &shutdown_adaptation_processor_event] { RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); if (resource_adaptation_processor_) { - for (auto& resource : stream_resource_manager_.MappedResources()) { - resource_adaptation_processor_->RemoveResource(resource); - } + // Removed on the resource_adaptaiton_processor_ queue because the + // adaptation_constraints_ and adaptation_listeners_ fields are guarded by + // this queue. for (auto* constraint : adaptation_constraints_) { resource_adaptation_processor_->RemoveAdaptationConstraint(constraint); } @@ -479,41 +484,15 @@ void VideoStreamEncoder::AddAdaptationResource( RTC_DCHECK_RUN_ON(&encoder_queue_); stream_resource_manager_.MapResourceToReason(resource, VideoAdaptationReason::kCpu); + resource_adaptation_processor_->AddResource(resource); map_resource_event.Set(); }); map_resource_event.Wait(rtc::Event::kForever); - - // Add the resource to the processor. - rtc::Event add_resource_event; - resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] { - RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); - if (!resource_adaptation_processor_) { - // The VideoStreamEncoder was stopped and the processor destroyed before - // this task had a chance to execute. No action needed. - return; - } - resource_adaptation_processor_->AddResource(resource); - add_resource_event.Set(); - }); - add_resource_event.Wait(rtc::Event::kForever); } std::vector> VideoStreamEncoder::GetAdaptationResources() { - std::vector> resources; - rtc::Event event; - resource_adaptation_queue_.PostTask([this, &resources, &event] { - RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); - if (!resource_adaptation_processor_) { - // The VideoStreamEncoder was stopped and the processor destroyed before - // this task had a chance to execute. No action needed. - return; - } - resources = resource_adaptation_processor_->GetResources(); - event.Set(); - }); - event.Wait(rtc::Event::kForever); - return resources; + return resource_adaptation_processor_->GetResources(); } void VideoStreamEncoder::SetSource( @@ -2115,22 +2094,10 @@ void VideoStreamEncoder::InjectAdaptationResource( encoder_queue_.PostTask([this, resource, reason, &map_resource_event] { RTC_DCHECK_RUN_ON(&encoder_queue_); stream_resource_manager_.MapResourceToReason(resource, reason); + resource_adaptation_processor_->AddResource(resource); map_resource_event.Set(); }); map_resource_event.Wait(rtc::Event::kForever); - - rtc::Event add_resource_event; - resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] { - RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); - if (!resource_adaptation_processor_) { - // The VideoStreamEncoder was stopped and the processor destroyed before - // this task had a chance to execute. No action needed. - return; - } - resource_adaptation_processor_->AddResource(resource); - add_resource_event.Set(); - }); - add_resource_event.Wait(rtc::Event::kForever); } void VideoStreamEncoder::InjectAdaptationConstraint( diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h index 95d4dcb69e..bd1593d3c6 100644 --- a/video/video_stream_encoder.h +++ b/video/video_stream_encoder.h @@ -416,9 +416,10 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface, RTC_GUARDED_BY(&resource_adaptation_queue_); // Responsible for adapting input resolution or frame rate to ensure resources // (e.g. CPU or bandwidth) are not overused. - // This class is single-threaded on the resource adaptation queue. - std::unique_ptr resource_adaptation_processor_ - RTC_GUARDED_BY(&resource_adaptation_queue_); + // Adding resources can occur on any thread, but all other methods need to be + // called on the adaptation thread. + std::unique_ptr + resource_adaptation_processor_; std::unique_ptr degradation_preference_manager_; std::vector adaptation_constraints_ RTC_GUARDED_BY(&resource_adaptation_queue_);