VideoStreamEncoder - wait less.

Some of the state that's managed in VideoStreamEncoder, is updated
and accessed on the encoder queue, but a method that's used for testing
only (GetAdaptationResources()), represents a race between PostTask
operations that update state and checking for said state on the worker
thread.

This CL removes Wait() operations related to adaptation resources from
the common path and puts one in the test path instead.

Bug: webrtc:13612
Change-Id: Ie3e018e815e24951bc0634ed70de17eaf336a508
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249220
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35797}
This commit is contained in:
Tommi 2022-01-25 23:41:22 +01:00 committed by WebRTC LUCI CQ
parent ec803b3ef0
commit 62b01db428
7 changed files with 80 additions and 45 deletions

View File

@ -27,14 +27,8 @@ namespace webrtc {
ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate(
ResourceAdaptationProcessor* processor)
: task_queue_(nullptr), processor_(processor) {}
void ResourceAdaptationProcessor::ResourceListenerDelegate::SetTaskQueue(
TaskQueueBase* task_queue) {
RTC_DCHECK(!task_queue_);
RTC_DCHECK(task_queue);
task_queue_ = task_queue;
RTC_DCHECK_RUN_ON(task_queue_);
: task_queue_(TaskQueueBase::Current()), processor_(processor) {
RTC_DCHECK(task_queue_);
}
void ResourceAdaptationProcessor::ResourceListenerDelegate::
@ -70,14 +64,15 @@ ResourceAdaptationProcessor::MitigationResultAndLogMessage::
ResourceAdaptationProcessor::ResourceAdaptationProcessor(
VideoStreamAdapter* stream_adapter)
: task_queue_(nullptr),
: task_queue_(TaskQueueBase::Current()),
resource_listener_delegate_(
rtc::make_ref_counted<ResourceListenerDelegate>(this)),
resources_(),
stream_adapter_(stream_adapter),
last_reported_source_restrictions_(),
previous_mitigation_results_() {
RTC_DCHECK(stream_adapter_);
RTC_DCHECK(task_queue_);
stream_adapter_->AddRestrictionsListener(this);
}
ResourceAdaptationProcessor::~ResourceAdaptationProcessor() {
@ -89,16 +84,6 @@ ResourceAdaptationProcessor::~ResourceAdaptationProcessor() {
resource_listener_delegate_->OnProcessorDestroyed();
}
void ResourceAdaptationProcessor::SetTaskQueue(TaskQueueBase* task_queue) {
RTC_DCHECK(!task_queue_);
RTC_DCHECK(task_queue);
task_queue_ = task_queue;
resource_listener_delegate_->SetTaskQueue(task_queue);
RTC_DCHECK_RUN_ON(task_queue_);
// Now that we have the queue we can attach as adaptation listener.
stream_adapter_->AddRestrictionsListener(this);
}
void ResourceAdaptationProcessor::AddResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) {
RTC_DCHECK_RUN_ON(task_queue_);

View File

@ -58,8 +58,6 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
VideoStreamAdapter* video_stream_adapter);
~ResourceAdaptationProcessor() override;
void SetTaskQueue(TaskQueueBase* task_queue) override;
// ResourceAdaptationProcessorInterface implementation.
void AddResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) override;
@ -90,7 +88,6 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
public:
explicit ResourceListenerDelegate(ResourceAdaptationProcessor* processor);
void SetTaskQueue(TaskQueueBase* task_queue);
void OnProcessorDestroyed();
// ResourceListener implementation.

View File

@ -47,8 +47,6 @@ class ResourceAdaptationProcessorInterface {
public:
virtual ~ResourceAdaptationProcessorInterface();
virtual void SetTaskQueue(TaskQueueBase* task_queue) = 0;
virtual void AddResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) = 0;
virtual void RemoveResourceLimitationsListener(

View File

@ -93,7 +93,6 @@ class ResourceAdaptationProcessorTest : public ::testing::Test {
&frame_rate_provider_)),
processor_(std::make_unique<ResourceAdaptationProcessor>(
video_stream_adapter_.get())) {
processor_->SetTaskQueue(TaskQueueBase::Current());
video_stream_adapter_->AddRestrictionsListener(&restrictions_listener_);
processor_->AddResource(resource_);
processor_->AddResource(other_resource_);

View File

@ -647,9 +647,6 @@ VideoStreamEncoder::VideoStreamEncoder(
video_stream_adapter_(
std::make_unique<VideoStreamAdapter>(&input_state_provider_,
encoder_stats_observer)),
resource_adaptation_processor_(
std::make_unique<ResourceAdaptationProcessor>(
video_stream_adapter_.get())),
degradation_preference_manager_(
std::make_unique<DegradationPreferenceManager>(
video_stream_adapter_.get())),
@ -677,10 +674,13 @@ VideoStreamEncoder::VideoStreamEncoder(
frame_cadence_adapter_->Initialize(&cadence_callback_);
stream_resource_manager_.Initialize(&encoder_queue_);
rtc::Event initialize_processor_event;
encoder_queue_.PostTask([this, &initialize_processor_event] {
encoder_queue_.PostTask([this] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
resource_adaptation_processor_->SetTaskQueue(encoder_queue_.Get());
resource_adaptation_processor_ =
std::make_unique<ResourceAdaptationProcessor>(
video_stream_adapter_.get());
stream_resource_manager_.SetAdaptationProcessor(
resource_adaptation_processor_.get(), video_stream_adapter_.get());
resource_adaptation_processor_->AddResourceLimitationsListener(
@ -694,9 +694,7 @@ VideoStreamEncoder::VideoStreamEncoder(
for (auto* constraint : adaptation_constraints_) {
video_stream_adapter_->AddAdaptationConstraint(constraint);
}
initialize_processor_event.Set();
});
initialize_processor_event.Wait(rtc::Event::kForever);
}
VideoStreamEncoder::~VideoStreamEncoder() {
@ -760,22 +758,31 @@ void VideoStreamEncoder::AddAdaptationResource(
// of this MapResourceToReason() call.
TRACE_EVENT_ASYNC_BEGIN0(
"webrtc", "VideoStreamEncoder::AddAdaptationResource(latency)", this);
rtc::Event map_resource_event;
encoder_queue_.PostTask([this, resource, &map_resource_event] {
encoder_queue_.PostTask([this, resource = std::move(resource)] {
TRACE_EVENT_ASYNC_END0(
"webrtc", "VideoStreamEncoder::AddAdaptationResource(latency)", this);
RTC_DCHECK_RUN_ON(&encoder_queue_);
additional_resources_.push_back(resource);
stream_resource_manager_.AddResource(resource, VideoAdaptationReason::kCpu);
map_resource_event.Set();
});
map_resource_event.Wait(rtc::Event::kForever);
}
std::vector<rtc::scoped_refptr<Resource>>
VideoStreamEncoder::GetAdaptationResources() {
RTC_DCHECK_RUN_ON(worker_queue_);
return resource_adaptation_processor_->GetResources();
// In practice, this method is only called by tests to verify operations that
// run on the encoder queue. So rather than force PostTask() operations to
// be accompanied by an event and a `Wait()`, we'll use PostTask + Wait()
// here.
rtc::Event event;
std::vector<rtc::scoped_refptr<Resource>> resources;
encoder_queue_.PostTask([&] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
resources = resource_adaptation_processor_->GetResources();
event.Set();
});
event.Wait(rtc::Event::kForever);
return resources;
}
void VideoStreamEncoder::SetSource(
@ -2349,14 +2356,11 @@ void VideoStreamEncoder::CheckForAnimatedContent(
void VideoStreamEncoder::InjectAdaptationResource(
rtc::scoped_refptr<Resource> resource,
VideoAdaptationReason reason) {
rtc::Event map_resource_event;
encoder_queue_.PostTask([this, resource, reason, &map_resource_event] {
encoder_queue_.PostTask([this, resource = std::move(resource), reason] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
additional_resources_.push_back(resource);
stream_resource_manager_.AddResource(resource, reason);
map_resource_event.Set();
});
map_resource_event.Wait(rtc::Event::kForever);
}
void VideoStreamEncoder::InjectAdaptationConstraint(

View File

@ -394,13 +394,13 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
// Provides video stream input states: current resolution and frame rate.
VideoStreamInputStateProvider input_state_provider_;
std::unique_ptr<VideoStreamAdapter> video_stream_adapter_
const std::unique_ptr<VideoStreamAdapter> video_stream_adapter_
RTC_GUARDED_BY(&encoder_queue_);
// Responsible for adapting input resolution or frame rate to ensure resources
// (e.g. CPU or bandwidth) are not overused. Adding resources can occur on any
// thread.
std::unique_ptr<ResourceAdaptationProcessorInterface>
resource_adaptation_processor_;
resource_adaptation_processor_ RTC_GUARDED_BY(&encoder_queue_);
std::unique_ptr<DegradationPreferenceManager> degradation_preference_manager_
RTC_GUARDED_BY(&encoder_queue_);
std::vector<AdaptationConstraint*> adaptation_constraints_

View File

@ -8863,6 +8863,58 @@ TEST_F(ReconfigureEncoderTest, ReconfiguredIfScalabilityModeChanges) {
RunTest({config1, config2}, /*expected_num_init_encode=*/2);
}
// Simple test that just creates and then immediately destroys an encoder.
// The purpose of the test is to make sure that nothing bad happens if the
// initialization step on the encoder queue, doesn't run.
TEST(VideoStreamEncoderSimpleTest, CreateDestroy) {
class SuperLazyTaskQueue : public webrtc::TaskQueueBase {
public:
SuperLazyTaskQueue() = default;
~SuperLazyTaskQueue() override = default;
private:
void Delete() override { delete this; }
void PostTask(std::unique_ptr<QueuedTask> task) override {
// meh.
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override {
ASSERT_TRUE(false);
}
};
// Lots of boiler plate.
GlobalSimulatedTimeController time_controller(Timestamp::Millis(0));
auto stats_proxy = std::make_unique<MockableSendStatisticsProxy>(
time_controller.GetClock(), VideoSendStream::Config(nullptr),
webrtc::VideoEncoderConfig::ContentType::kRealtimeVideo);
SimpleVideoStreamEncoderFactory::MockFakeEncoder mock_fake_encoder(
time_controller.GetClock());
test::VideoEncoderProxyFactory encoder_factory(&mock_fake_encoder);
std::unique_ptr<VideoBitrateAllocatorFactory> bitrate_allocator_factory =
CreateBuiltinVideoBitrateAllocatorFactory();
VideoStreamEncoderSettings encoder_settings{
VideoEncoder::Capabilities(/*loss_notification=*/false)};
encoder_settings.encoder_factory = &encoder_factory;
encoder_settings.bitrate_allocator_factory = bitrate_allocator_factory.get();
auto adapter = std::make_unique<MockFrameCadenceAdapter>();
EXPECT_CALL((*adapter.get()), Initialize).WillOnce(Return());
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
encoder_queue(new SuperLazyTaskQueue());
// Construct a VideoStreamEncoder instance and let it go out of scope without
// doing anything else (including calling Stop()). This should be fine since
// the posted init task will simply be deleted.
auto encoder = std::make_unique<VideoStreamEncoder>(
time_controller.GetClock(), 1, stats_proxy.get(), encoder_settings,
std::make_unique<CpuOveruseDetectorProxy>(stats_proxy.get()),
std::move(adapter), std::move(encoder_queue),
VideoStreamEncoder::BitrateAllocationCallbackType::
kVideoBitrateAllocation);
}
TEST(VideoStreamEncoderFrameCadenceTest, ActivatesFrameCadenceOnContentType) {
auto adapter = std::make_unique<MockFrameCadenceAdapter>();
auto* adapter_ptr = adapter.get();