[Adaptation] Make ResourceAdaptationProcessorInterface resources thread-safe
This is one less dependency on the task queue, and will make things like removing resources and cleanup much easier in the future. Bug: webrtc:11754 Change-Id: I732f1935d1b58ffe09ca2a2bf59beebc1930214d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178869 Commit-Queue: Evan Shrubsole <eshr@google.com> Reviewed-by: Henrik Boström <hbos@webrtc.org> Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31686}
This commit is contained in:
parent
5fbd758b88
commit
517f81e0a4
@ -20,6 +20,7 @@
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/ref_counted_object.h"
|
||||
#include "rtc_base/strings/string_builder.h"
|
||||
#include "rtc_base/synchronization/sequence_checker.h"
|
||||
#include "rtc_base/task_utils/to_queued_task.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -127,38 +128,77 @@ void ResourceAdaptationProcessor::RemoveResourceLimitationsListener(
|
||||
|
||||
void ResourceAdaptationProcessor::AddResource(
|
||||
rtc::scoped_refptr<Resource> resource) {
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
|
||||
RTC_DCHECK(resource);
|
||||
RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
|
||||
<< "Resource \"" << resource->Name() << "\" was already registered.";
|
||||
resources_.push_back(resource);
|
||||
{
|
||||
MutexLock crit(&resources_lock_);
|
||||
RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
|
||||
<< "Resource \"" << resource->Name() << "\" was already registered.";
|
||||
resources_.push_back(resource);
|
||||
}
|
||||
resource->SetResourceListener(resource_listener_delegate_);
|
||||
}
|
||||
|
||||
std::vector<rtc::scoped_refptr<Resource>>
|
||||
ResourceAdaptationProcessor::GetResources() const {
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
|
||||
MutexLock crit(&resources_lock_);
|
||||
return resources_;
|
||||
}
|
||||
|
||||
void ResourceAdaptationProcessor::RemoveResource(
|
||||
rtc::scoped_refptr<Resource> resource) {
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
|
||||
RTC_DCHECK(resource);
|
||||
RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
|
||||
auto it = absl::c_find(resources_, resource);
|
||||
RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
|
||||
<< "\" was not a registered resource.";
|
||||
resource->SetResourceListener(nullptr);
|
||||
{
|
||||
MutexLock crit(&resources_lock_);
|
||||
auto it = absl::c_find(resources_, resource);
|
||||
RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
|
||||
<< "\" was not a registered resource.";
|
||||
resources_.erase(it);
|
||||
}
|
||||
RemoveLimitationsImposedByResource(std::move(resource));
|
||||
}
|
||||
|
||||
void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
|
||||
rtc::scoped_refptr<Resource> resource) {
|
||||
if (!resource_adaptation_queue_->IsCurrent()) {
|
||||
resource_adaptation_queue_->PostTask(ToQueuedTask(
|
||||
[this, resource]() { RemoveLimitationsImposedByResource(resource); }));
|
||||
return;
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
|
||||
auto resource_adaptation_limits =
|
||||
adaptation_limits_by_resources_.find(resource);
|
||||
if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
|
||||
VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
|
||||
resource_adaptation_limits->second;
|
||||
adaptation_limits_by_resources_.erase(resource_adaptation_limits);
|
||||
MaybeUpdateResourceLimitationsOnResourceRemoval(adaptation_limits);
|
||||
if (adaptation_limits_by_resources_.empty()) {
|
||||
// Only the resource being removed was adapted so clear restrictions.
|
||||
stream_adapter_->ClearRestrictions();
|
||||
return;
|
||||
}
|
||||
|
||||
VideoStreamAdapter::RestrictionsWithCounters most_limited =
|
||||
FindMostLimitedResources().second;
|
||||
|
||||
if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
|
||||
// The removed limitations were less limited than the most limited
|
||||
// resource. Don't change the current restrictions.
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply the new most limited resource as the next restrictions.
|
||||
Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
|
||||
most_limited.counters, most_limited.restrictions);
|
||||
RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
|
||||
stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
|
||||
|
||||
RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
|
||||
"next most limited restrictions: "
|
||||
<< most_limited.restrictions.ToString() << " with counters "
|
||||
<< most_limited.counters.ToString();
|
||||
}
|
||||
resources_.erase(it);
|
||||
resource->SetResourceListener(nullptr);
|
||||
}
|
||||
|
||||
void ResourceAdaptationProcessor::AddAdaptationConstraint(
|
||||
@ -185,10 +225,13 @@ void ResourceAdaptationProcessor::OnResourceUsageStateMeasured(
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
|
||||
RTC_DCHECK(resource);
|
||||
// |resource| could have been removed after signalling.
|
||||
if (absl::c_find(resources_, resource) == resources_.end()) {
|
||||
RTC_LOG(INFO) << "Ignoring signal from removed resource \""
|
||||
<< resource->Name() << "\".";
|
||||
return;
|
||||
{
|
||||
MutexLock crit(&resources_lock_);
|
||||
if (absl::c_find(resources_, resource) == resources_.end()) {
|
||||
RTC_LOG(INFO) << "Ignoring signal from removed resource \""
|
||||
<< resource->Name() << "\".";
|
||||
return;
|
||||
}
|
||||
}
|
||||
MitigationResultAndLogMessage result_and_message;
|
||||
switch (usage_state) {
|
||||
@ -372,36 +415,6 @@ void ResourceAdaptationProcessor::UpdateResourceLimitations(
|
||||
}
|
||||
}
|
||||
|
||||
void ResourceAdaptationProcessor::
|
||||
MaybeUpdateResourceLimitationsOnResourceRemoval(
|
||||
VideoStreamAdapter::RestrictionsWithCounters removed_limitations) {
|
||||
if (adaptation_limits_by_resources_.empty()) {
|
||||
// Only the resource being removed was adapted so clear restrictions.
|
||||
stream_adapter_->ClearRestrictions();
|
||||
return;
|
||||
}
|
||||
|
||||
VideoStreamAdapter::RestrictionsWithCounters most_limited =
|
||||
FindMostLimitedResources().second;
|
||||
|
||||
if (removed_limitations.counters.Total() <= most_limited.counters.Total()) {
|
||||
// The removed limitations were less limited than the most limited resource.
|
||||
// Don't change the current restrictions.
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply the new most limited resource as the next restrictions.
|
||||
Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
|
||||
most_limited.counters, most_limited.restrictions);
|
||||
RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
|
||||
stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
|
||||
|
||||
RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
|
||||
"next most limited restrictions: "
|
||||
<< most_limited.restrictions.ToString() << " with counters "
|
||||
<< most_limited.counters.ToString();
|
||||
}
|
||||
|
||||
void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
|
||||
VideoSourceRestrictions restrictions,
|
||||
const VideoAdaptationCounters& adaptation_counters,
|
||||
|
||||
@ -49,7 +49,8 @@ namespace webrtc {
|
||||
//
|
||||
// The ResourceAdaptationProcessor is single-threaded. It may be constructed on
|
||||
// any thread but MUST subsequently be used and destroyed on a single sequence,
|
||||
// i.e. the "resource adaptation task queue".
|
||||
// i.e. the "resource adaptation task queue". Resources can be added and removed
|
||||
// from any thread.
|
||||
class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
|
||||
public VideoSourceRestrictionsListener,
|
||||
public ResourceListener {
|
||||
@ -146,18 +147,18 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
|
||||
VideoStreamAdapter::RestrictionsWithCounters>
|
||||
FindMostLimitedResources() const RTC_RUN_ON(resource_adaptation_queue_);
|
||||
|
||||
void MaybeUpdateResourceLimitationsOnResourceRemoval(
|
||||
VideoStreamAdapter::RestrictionsWithCounters removed_limitations)
|
||||
RTC_RUN_ON(resource_adaptation_queue_);
|
||||
void RemoveLimitationsImposedByResource(
|
||||
rtc::scoped_refptr<Resource> resource);
|
||||
|
||||
TaskQueueBase* resource_adaptation_queue_;
|
||||
rtc::scoped_refptr<ResourceListenerDelegate> resource_listener_delegate_;
|
||||
// Input and output.
|
||||
VideoStreamEncoderObserver* const encoder_stats_observer_
|
||||
RTC_GUARDED_BY(resource_adaptation_queue_);
|
||||
std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
|
||||
RTC_GUARDED_BY(resource_adaptation_queue_);
|
||||
mutable Mutex resources_lock_;
|
||||
std::vector<rtc::scoped_refptr<Resource>> resources_
|
||||
RTC_GUARDED_BY(resources_lock_);
|
||||
std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
|
||||
RTC_GUARDED_BY(resource_adaptation_queue_);
|
||||
std::vector<AdaptationConstraint*> adaptation_constraints_
|
||||
RTC_GUARDED_BY(resource_adaptation_queue_);
|
||||
|
||||
@ -51,16 +51,16 @@ class ResourceAdaptationProcessorInterface {
|
||||
virtual void SetResourceAdaptationQueue(
|
||||
TaskQueueBase* resource_adaptation_queue) = 0;
|
||||
|
||||
// Starts or stops listening to resources, effectively enabling or disabling
|
||||
// processing.
|
||||
// TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
|
||||
// with AddResource() and RemoveResource() instead. When the processor is
|
||||
// multi-stream aware, stream-specific resouces will get added and removed
|
||||
// over time.
|
||||
virtual void AddResourceLimitationsListener(
|
||||
ResourceLimitationsListener* limitations_listener) = 0;
|
||||
virtual void RemoveResourceLimitationsListener(
|
||||
ResourceLimitationsListener* limitations_listener) = 0;
|
||||
// Starts or stops listening to resources, effectively enabling or disabling
|
||||
// processing. May be called from anywhere.
|
||||
// TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
|
||||
// with AddResource() and RemoveResource() instead. When the processor is
|
||||
// multi-stream aware, stream-specific resouces will get added and removed
|
||||
// over time.
|
||||
virtual void AddResource(rtc::scoped_refptr<Resource> resource) = 0;
|
||||
virtual std::vector<rtc::scoped_refptr<Resource>> GetResources() const = 0;
|
||||
virtual void RemoveResource(rtc::scoped_refptr<Resource> resource) = 0;
|
||||
|
||||
@ -52,9 +52,9 @@ void VideoStreamEncoderResource::UnregisterAdaptationTaskQueue() {
|
||||
|
||||
void VideoStreamEncoderResource::SetResourceListener(
|
||||
ResourceListener* listener) {
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue());
|
||||
// If you want to change listener you need to unregister the old listener by
|
||||
// setting it to null first.
|
||||
MutexLock crit(&listener_lock_);
|
||||
RTC_DCHECK(!listener_ || !listener) << "A listener is already set";
|
||||
listener_ = listener;
|
||||
}
|
||||
@ -65,7 +65,7 @@ std::string VideoStreamEncoderResource::Name() const {
|
||||
|
||||
void VideoStreamEncoderResource::OnResourceUsageStateMeasured(
|
||||
ResourceUsageState usage_state) {
|
||||
RTC_DCHECK_RUN_ON(resource_adaptation_queue());
|
||||
MutexLock crit(&listener_lock_);
|
||||
if (listener_) {
|
||||
listener_->OnResourceUsageStateMeasured(this, usage_state);
|
||||
}
|
||||
|
||||
@ -72,7 +72,8 @@ class VideoStreamEncoderResource : public Resource {
|
||||
// Treated as const after initialization.
|
||||
TaskQueueBase* encoder_queue_;
|
||||
TaskQueueBase* resource_adaptation_queue_ RTC_GUARDED_BY(lock_);
|
||||
ResourceListener* listener_ RTC_GUARDED_BY(resource_adaptation_queue());
|
||||
mutable Mutex listener_lock_;
|
||||
ResourceListener* listener_ RTC_GUARDED_BY(listener_lock_);
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
@ -410,14 +410,19 @@ void VideoStreamEncoder::Stop() {
|
||||
RTC_DCHECK_RUN_ON(&thread_checker_);
|
||||
video_source_sink_controller_.SetSource(nullptr);
|
||||
|
||||
if (resource_adaptation_processor_) {
|
||||
for (auto& resource : stream_resource_manager_.MappedResources()) {
|
||||
resource_adaptation_processor_->RemoveResource(resource);
|
||||
}
|
||||
}
|
||||
rtc::Event shutdown_adaptation_processor_event;
|
||||
resource_adaptation_queue_.PostTask([this,
|
||||
&shutdown_adaptation_processor_event] {
|
||||
RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
|
||||
if (resource_adaptation_processor_) {
|
||||
for (auto& resource : stream_resource_manager_.MappedResources()) {
|
||||
resource_adaptation_processor_->RemoveResource(resource);
|
||||
}
|
||||
// Removed on the resource_adaptaiton_processor_ queue because the
|
||||
// adaptation_constraints_ and adaptation_listeners_ fields are guarded by
|
||||
// this queue.
|
||||
for (auto* constraint : adaptation_constraints_) {
|
||||
resource_adaptation_processor_->RemoveAdaptationConstraint(constraint);
|
||||
}
|
||||
@ -479,41 +484,15 @@ void VideoStreamEncoder::AddAdaptationResource(
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
stream_resource_manager_.MapResourceToReason(resource,
|
||||
VideoAdaptationReason::kCpu);
|
||||
resource_adaptation_processor_->AddResource(resource);
|
||||
map_resource_event.Set();
|
||||
});
|
||||
map_resource_event.Wait(rtc::Event::kForever);
|
||||
|
||||
// Add the resource to the processor.
|
||||
rtc::Event add_resource_event;
|
||||
resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
|
||||
RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
|
||||
if (!resource_adaptation_processor_) {
|
||||
// The VideoStreamEncoder was stopped and the processor destroyed before
|
||||
// this task had a chance to execute. No action needed.
|
||||
return;
|
||||
}
|
||||
resource_adaptation_processor_->AddResource(resource);
|
||||
add_resource_event.Set();
|
||||
});
|
||||
add_resource_event.Wait(rtc::Event::kForever);
|
||||
}
|
||||
|
||||
std::vector<rtc::scoped_refptr<Resource>>
|
||||
VideoStreamEncoder::GetAdaptationResources() {
|
||||
std::vector<rtc::scoped_refptr<Resource>> resources;
|
||||
rtc::Event event;
|
||||
resource_adaptation_queue_.PostTask([this, &resources, &event] {
|
||||
RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
|
||||
if (!resource_adaptation_processor_) {
|
||||
// The VideoStreamEncoder was stopped and the processor destroyed before
|
||||
// this task had a chance to execute. No action needed.
|
||||
return;
|
||||
}
|
||||
resources = resource_adaptation_processor_->GetResources();
|
||||
event.Set();
|
||||
});
|
||||
event.Wait(rtc::Event::kForever);
|
||||
return resources;
|
||||
return resource_adaptation_processor_->GetResources();
|
||||
}
|
||||
|
||||
void VideoStreamEncoder::SetSource(
|
||||
@ -2115,22 +2094,10 @@ void VideoStreamEncoder::InjectAdaptationResource(
|
||||
encoder_queue_.PostTask([this, resource, reason, &map_resource_event] {
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
stream_resource_manager_.MapResourceToReason(resource, reason);
|
||||
resource_adaptation_processor_->AddResource(resource);
|
||||
map_resource_event.Set();
|
||||
});
|
||||
map_resource_event.Wait(rtc::Event::kForever);
|
||||
|
||||
rtc::Event add_resource_event;
|
||||
resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
|
||||
RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
|
||||
if (!resource_adaptation_processor_) {
|
||||
// The VideoStreamEncoder was stopped and the processor destroyed before
|
||||
// this task had a chance to execute. No action needed.
|
||||
return;
|
||||
}
|
||||
resource_adaptation_processor_->AddResource(resource);
|
||||
add_resource_event.Set();
|
||||
});
|
||||
add_resource_event.Wait(rtc::Event::kForever);
|
||||
}
|
||||
|
||||
void VideoStreamEncoder::InjectAdaptationConstraint(
|
||||
|
||||
@ -416,9 +416,10 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
|
||||
RTC_GUARDED_BY(&resource_adaptation_queue_);
|
||||
// Responsible for adapting input resolution or frame rate to ensure resources
|
||||
// (e.g. CPU or bandwidth) are not overused.
|
||||
// This class is single-threaded on the resource adaptation queue.
|
||||
std::unique_ptr<ResourceAdaptationProcessor> resource_adaptation_processor_
|
||||
RTC_GUARDED_BY(&resource_adaptation_queue_);
|
||||
// Adding resources can occur on any thread, but all other methods need to be
|
||||
// called on the adaptation thread.
|
||||
std::unique_ptr<ResourceAdaptationProcessorInterface>
|
||||
resource_adaptation_processor_;
|
||||
std::unique_ptr<DegradationPreferenceManager> degradation_preference_manager_;
|
||||
std::vector<AdaptationConstraint*> adaptation_constraints_
|
||||
RTC_GUARDED_BY(&resource_adaptation_queue_);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user