VideoStreamEncoder: clean up threading constraints.

The sequences of threads entering the VideoStreamEncoder has been
unclear. Fix this by renaming the uninformational |main_queue_| to
|worker_queue_|, and introduce a new |network_queue_| which is set
on construction.

Bug: chromium:1255737
Change-Id: Ic4d3a5b8188b8cc98e60b72aee2c09c9afbc7356
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/236523
Reviewed-by: Henrik Andreassson <henrika@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35283}
This commit is contained in:
Markus Handell 2021-10-28 15:29:42 +02:00 committed by WebRTC LUCI CQ
parent 3cff171333
commit 2b10c479ce
6 changed files with 48 additions and 36 deletions

View File

@ -1029,7 +1029,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
std::vector<uint32_t> ssrcs = config.rtp.ssrcs; std::vector<uint32_t> ssrcs = config.rtp.ssrcs;
VideoSendStream* send_stream = new VideoSendStream( VideoSendStream* send_stream = new VideoSendStream(
clock_, num_cpu_cores_, task_queue_factory_, clock_, num_cpu_cores_, task_queue_factory_, network_thread_,
call_stats_->AsRtcpRttStats(), transport_send_.get(), call_stats_->AsRtcpRttStats(), transport_send_.get(),
bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,

View File

@ -113,6 +113,7 @@ VideoSendStream::VideoSendStream(
Clock* clock, Clock* clock,
int num_cpu_cores, int num_cpu_cores,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TaskQueueBase* network_queue,
RtcpRttStats* call_stats, RtcpRttStats* call_stats,
RtpTransportControllerSendInterface* transport, RtpTransportControllerSendInterface* transport,
BitrateAllocatorInterface* bitrate_allocator, BitrateAllocatorInterface* bitrate_allocator,
@ -135,6 +136,7 @@ VideoSendStream::VideoSendStream(
config_.encoder_settings, config_.encoder_settings,
std::make_unique<OveruseFrameDetector>(&stats_proxy_), std::make_unique<OveruseFrameDetector>(&stats_proxy_),
task_queue_factory, task_queue_factory,
network_queue,
GetBitrateAllocationCallbackType(config_))), GetBitrateAllocationCallbackType(config_))),
encoder_feedback_( encoder_feedback_(
clock, clock,

View File

@ -58,6 +58,7 @@ class VideoSendStream : public webrtc::VideoSendStream {
Clock* clock, Clock* clock,
int num_cpu_cores, int num_cpu_cores,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TaskQueueBase* network_queue,
RtcpRttStats* call_stats, RtcpRttStats* call_stats,
RtpTransportControllerSendInterface* transport, RtpTransportControllerSendInterface* transport,
BitrateAllocatorInterface* bitrate_allocator, BitrateAllocatorInterface* bitrate_allocator,

View File

@ -593,8 +593,10 @@ VideoStreamEncoder::VideoStreamEncoder(
const VideoStreamEncoderSettings& settings, const VideoStreamEncoderSettings& settings,
std::unique_ptr<OveruseFrameDetector> overuse_detector, std::unique_ptr<OveruseFrameDetector> overuse_detector,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TaskQueueBase* network_queue,
BitrateAllocationCallbackType allocation_cb_type) BitrateAllocationCallbackType allocation_cb_type)
: main_queue_(TaskQueueBase::Current()), : worker_queue_(TaskQueueBase::Current()),
network_queue_(network_queue),
number_of_cores_(number_of_cores), number_of_cores_(number_of_cores),
sink_(nullptr), sink_(nullptr),
settings_(settings), settings_(settings),
@ -665,7 +667,7 @@ VideoStreamEncoder::VideoStreamEncoder(
"EncoderQueue", "EncoderQueue",
TaskQueueFactory::Priority::NORMAL)) { TaskQueueFactory::Priority::NORMAL)) {
TRACE_EVENT0("webrtc", "VideoStreamEncoder::VideoStreamEncoder"); TRACE_EVENT0("webrtc", "VideoStreamEncoder::VideoStreamEncoder");
RTC_DCHECK(main_queue_); RTC_DCHECK(worker_queue_);
RTC_DCHECK(encoder_stats_observer); RTC_DCHECK(encoder_stats_observer);
RTC_DCHECK_GE(number_of_cores, 1); RTC_DCHECK_GE(number_of_cores, 1);
@ -694,13 +696,13 @@ VideoStreamEncoder::VideoStreamEncoder(
} }
VideoStreamEncoder::~VideoStreamEncoder() { VideoStreamEncoder::~VideoStreamEncoder() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(!video_source_sink_controller_.HasSource()) RTC_DCHECK(!video_source_sink_controller_.HasSource())
<< "Must call ::Stop() before destruction."; << "Must call ::Stop() before destruction.";
} }
void VideoStreamEncoder::Stop() { void VideoStreamEncoder::Stop() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetSource(nullptr); video_source_sink_controller_.SetSource(nullptr);
rtc::Event shutdown_event; rtc::Event shutdown_event;
@ -746,7 +748,7 @@ void VideoStreamEncoder::SetFecControllerOverride(
void VideoStreamEncoder::AddAdaptationResource( void VideoStreamEncoder::AddAdaptationResource(
rtc::scoped_refptr<Resource> resource) { rtc::scoped_refptr<Resource> resource) {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
TRACE_EVENT0("webrtc", "VideoStreamEncoder::AddAdaptationResource"); TRACE_EVENT0("webrtc", "VideoStreamEncoder::AddAdaptationResource");
// Map any externally added resources as kCpu for the sake of stats reporting. // Map any externally added resources as kCpu for the sake of stats reporting.
// TODO(hbos): Make the manager map any unknown resources to kCpu and get rid // TODO(hbos): Make the manager map any unknown resources to kCpu and get rid
@ -767,14 +769,14 @@ void VideoStreamEncoder::AddAdaptationResource(
std::vector<rtc::scoped_refptr<Resource>> std::vector<rtc::scoped_refptr<Resource>>
VideoStreamEncoder::GetAdaptationResources() { VideoStreamEncoder::GetAdaptationResources() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
return resource_adaptation_processor_->GetResources(); return resource_adaptation_processor_->GetResources();
} }
void VideoStreamEncoder::SetSource( void VideoStreamEncoder::SetSource(
rtc::VideoSourceInterface<VideoFrame>* source, rtc::VideoSourceInterface<VideoFrame>* source,
const DegradationPreference& degradation_preference) { const DegradationPreference& degradation_preference) {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetSource(source); video_source_sink_controller_.SetSource(source);
input_state_provider_.OnHasInputChanged(source); input_state_provider_.OnHasInputChanged(source);
@ -794,7 +796,7 @@ void VideoStreamEncoder::SetSource(
} }
void VideoStreamEncoder::SetSink(EncoderSink* sink, bool rotation_applied) { void VideoStreamEncoder::SetSink(EncoderSink* sink, bool rotation_applied) {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetRotationApplied(rotation_applied); video_source_sink_controller_.SetRotationApplied(rotation_applied);
video_source_sink_controller_.PushSourceSinkSettings(); video_source_sink_controller_.PushSourceSinkSettings();
@ -818,6 +820,7 @@ void VideoStreamEncoder::SetStartBitrate(int start_bitrate_bps) {
void VideoStreamEncoder::ConfigureEncoder(VideoEncoderConfig config, void VideoStreamEncoder::ConfigureEncoder(VideoEncoderConfig config,
size_t max_data_payload_length) { size_t max_data_payload_length) {
RTC_DCHECK_RUN_ON(worker_queue_);
encoder_queue_.PostTask( encoder_queue_.PostTask(
[this, config = std::move(config), max_data_payload_length]() mutable { [this, config = std::move(config), max_data_payload_length]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_); RTC_DCHECK_RUN_ON(&encoder_queue_);
@ -1099,10 +1102,10 @@ void VideoStreamEncoder::ReconfigureEncoder() {
encoder_resolutions.emplace_back(simulcastStream.width, encoder_resolutions.emplace_back(simulcastStream.width,
simulcastStream.height); simulcastStream.height);
} }
main_queue_->PostTask(ToQueuedTask( worker_queue_->PostTask(ToQueuedTask(
task_safety_, [this, max_framerate, alignment, task_safety_, [this, max_framerate, alignment,
encoder_resolutions = std::move(encoder_resolutions)]() { encoder_resolutions = std::move(encoder_resolutions)]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
if (max_framerate != if (max_framerate !=
video_source_sink_controller_.frame_rate_upper_limit() || video_source_sink_controller_.frame_rate_upper_limit() ||
alignment != video_source_sink_controller_.resolution_alignment() || alignment != video_source_sink_controller_.resolution_alignment() ||
@ -1269,6 +1272,8 @@ void VideoStreamEncoder::OnEncoderSettingsChanged() {
} }
void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) { void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {
// Threading context here under Chromium is the network thread. Test
// environments may currently call in from other alien contexts.
RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_); RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_);
VideoFrame incoming_frame = video_frame; VideoFrame incoming_frame = video_frame;
@ -1379,12 +1384,12 @@ void VideoStreamEncoder::OnDiscardedFrame() {
void VideoStreamEncoder::OnConstraintsChanged( void VideoStreamEncoder::OnConstraintsChanged(
const webrtc::VideoTrackSourceConstraints& constraints) { const webrtc::VideoTrackSourceConstraints& constraints) {
// This method is called on the network thread. RTC_DCHECK_RUN_ON(network_queue_);
RTC_LOG(LS_INFO) << __func__ << " min_fps " RTC_LOG(LS_INFO) << __func__ << " min_fps "
<< constraints.min_fps.value_or(-1) << " max_fps " << constraints.min_fps.value_or(-1) << " max_fps "
<< constraints.max_fps.value_or(-1); << constraints.max_fps.value_or(-1);
main_queue_->PostTask(ToQueuedTask(task_safety_, [this, constraints] { worker_queue_->PostTask(ToQueuedTask(task_safety_, [this, constraints] {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
source_constraints_ = constraints; source_constraints_ = constraints;
})); }));
} }
@ -1809,8 +1814,8 @@ void VideoStreamEncoder::EncodeVideoFrame(const VideoFrame& video_frame,
} }
} else { } else {
encoder_failed_ = true; encoder_failed_ = true;
main_queue_->PostTask(ToQueuedTask(task_safety_, [this]() { worker_queue_->PostTask(ToQueuedTask(task_safety_, [this]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
settings_.encoder_switch_request_callback->RequestEncoderFallback(); settings_.encoder_switch_request_callback->RequestEncoderFallback();
})); }));
} }
@ -2171,9 +2176,9 @@ void VideoStreamEncoder::OnVideoSourceRestrictionsUpdated(
RTC_LOG(INFO) << "Updating sink restrictions from " RTC_LOG(INFO) << "Updating sink restrictions from "
<< (reason ? reason->Name() : std::string("<null>")) << " to " << (reason ? reason->Name() : std::string("<null>")) << " to "
<< restrictions.ToString(); << restrictions.ToString();
main_queue_->PostTask(ToQueuedTask( worker_queue_->PostTask(ToQueuedTask(
task_safety_, [this, restrictions = std::move(restrictions)]() { task_safety_, [this, restrictions = std::move(restrictions)]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetRestrictions(std::move(restrictions)); video_source_sink_controller_.SetRestrictions(std::move(restrictions));
video_source_sink_controller_.PushSourceSinkSettings(); video_source_sink_controller_.PushSourceSinkSettings();
})); }));
@ -2329,22 +2334,23 @@ void VideoStreamEncoder::CheckForAnimatedContent(
RTC_LOG(LS_INFO) << "Removing resolution cap due to no consistent " RTC_LOG(LS_INFO) << "Removing resolution cap due to no consistent "
"animation detection."; "animation detection.";
} }
main_queue_->PostTask(ToQueuedTask(task_safety_, [this, worker_queue_->PostTask(
should_cap_resolution]() { ToQueuedTask(task_safety_, [this, should_cap_resolution]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
video_source_sink_controller_.SetPixelsPerFrameUpperLimit( video_source_sink_controller_.SetPixelsPerFrameUpperLimit(
should_cap_resolution ? absl::optional<size_t>(kMaxAnimationPixels) should_cap_resolution
: absl::nullopt); ? absl::optional<size_t>(kMaxAnimationPixels)
video_source_sink_controller_.PushSourceSinkSettings(); : absl::nullopt);
})); video_source_sink_controller_.PushSourceSinkSettings();
}));
} }
} }
// RTC_RUN_ON(&encoder_queue_) // RTC_RUN_ON(&encoder_queue_)
void VideoStreamEncoder::QueueRequestEncoderSwitch( void VideoStreamEncoder::QueueRequestEncoderSwitch(
const EncoderSwitchRequestCallback::Config& conf) { const EncoderSwitchRequestCallback::Config& conf) {
main_queue_->PostTask(ToQueuedTask(task_safety_, [this, conf]() { worker_queue_->PostTask(ToQueuedTask(task_safety_, [this, conf]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
settings_.encoder_switch_request_callback->RequestEncoderSwitch(conf); settings_.encoder_switch_request_callback->RequestEncoderSwitch(conf);
})); }));
} }
@ -2352,8 +2358,8 @@ void VideoStreamEncoder::QueueRequestEncoderSwitch(
// RTC_RUN_ON(&encoder_queue_) // RTC_RUN_ON(&encoder_queue_)
void VideoStreamEncoder::QueueRequestEncoderSwitch( void VideoStreamEncoder::QueueRequestEncoderSwitch(
const webrtc::SdpVideoFormat& format) { const webrtc::SdpVideoFormat& format) {
main_queue_->PostTask(ToQueuedTask(task_safety_, [this, format]() { worker_queue_->PostTask(ToQueuedTask(task_safety_, [this, format]() {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
settings_.encoder_switch_request_callback->RequestEncoderSwitch(format); settings_.encoder_switch_request_callback->RequestEncoderSwitch(format);
})); }));
} }
@ -2367,8 +2373,8 @@ void VideoStreamEncoder::MaybeReportFrameRateConstraintUmas() {
encoder_config_.content_type == VideoEncoderConfig::ContentType::kScreen; encoder_config_.content_type == VideoEncoderConfig::ContentType::kScreen;
if (!is_screenshare) if (!is_screenshare)
return; return;
main_queue_->PostTask(ToQueuedTask(task_safety_, [this] { worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] {
RTC_DCHECK_RUN_ON(main_queue_); RTC_DCHECK_RUN_ON(worker_queue_);
RTC_HISTOGRAM_BOOLEAN("WebRTC.Screenshare.FrameRateConstraints.Exists", RTC_HISTOGRAM_BOOLEAN("WebRTC.Screenshare.FrameRateConstraints.Exists",
source_constraints_.has_value()); source_constraints_.has_value());
if (source_constraints_.has_value()) { if (source_constraints_.has_value()) {

View File

@ -75,6 +75,7 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
const VideoStreamEncoderSettings& settings, const VideoStreamEncoderSettings& settings,
std::unique_ptr<OveruseFrameDetector> overuse_detector, std::unique_ptr<OveruseFrameDetector> overuse_detector,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TaskQueueBase* network_queue,
BitrateAllocationCallbackType allocation_cb_type); BitrateAllocationCallbackType allocation_cb_type);
~VideoStreamEncoder() override; ~VideoStreamEncoder() override;
@ -231,7 +232,8 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
// Reports UMAs on frame rate constraints usage on the first call. // Reports UMAs on frame rate constraints usage on the first call.
void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&encoder_queue_); void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&encoder_queue_);
TaskQueueBase* const main_queue_; TaskQueueBase* const worker_queue_;
TaskQueueBase* const network_queue_;
const uint32_t number_of_cores_; const uint32_t number_of_cores_;
@ -246,7 +248,7 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
// The source's constraints. // The source's constraints.
absl::optional<VideoTrackSourceConstraints> source_constraints_ absl::optional<VideoTrackSourceConstraints> source_constraints_
RTC_GUARDED_BY(main_queue_); RTC_GUARDED_BY(worker_queue_);
bool has_reported_screenshare_frame_rate_umas_ bool has_reported_screenshare_frame_rate_umas_
RTC_GUARDED_BY(&encoder_queue_) = false; RTC_GUARDED_BY(&encoder_queue_) = false;
@ -410,7 +412,7 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
// to provide us with different resolution or frame rate. // to provide us with different resolution or frame rate.
// This class is thread-safe. // This class is thread-safe.
VideoSourceSinkController video_source_sink_controller_ VideoSourceSinkController video_source_sink_controller_
RTC_GUARDED_BY(main_queue_); RTC_GUARDED_BY(worker_queue_);
// Default bitrate limits in EncoderInfoSettings allowed. // Default bitrate limits in EncoderInfoSettings allowed.
const bool default_limits_allowed_; const bool default_limits_allowed_;
@ -424,7 +426,7 @@ class VideoStreamEncoder : public VideoStreamEncoderInterface,
// first to make sure no tasks run that use other members. // first to make sure no tasks run that use other members.
rtc::TaskQueue encoder_queue_; rtc::TaskQueue encoder_queue_;
// Used to cancel any potentially pending tasks to the main thread. // Used to cancel any potentially pending tasks to the worker thread.
ScopedTaskSafety task_safety_; ScopedTaskSafety task_safety_;
RTC_DISALLOW_COPY_AND_ASSIGN(VideoStreamEncoder); RTC_DISALLOW_COPY_AND_ASSIGN(VideoStreamEncoder);

View File

@ -356,6 +356,7 @@ class VideoStreamEncoderUnderTest : public VideoStreamEncoder {
overuse_detector_proxy_ = overuse_detector_proxy_ =
new CpuOveruseDetectorProxy(stats_proxy)), new CpuOveruseDetectorProxy(stats_proxy)),
task_queue_factory, task_queue_factory,
TaskQueueBase::Current(),
allocation_callback_type), allocation_callback_type),
time_controller_(time_controller), time_controller_(time_controller),
fake_cpu_resource_(FakeResource::Create("FakeResource[CPU]")), fake_cpu_resource_(FakeResource::Create("FakeResource[CPU]")),