diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc index e95ca260a4..e5545e761c 100644 --- a/video/video_send_stream.cc +++ b/video/video_send_stream.cc @@ -149,7 +149,8 @@ VideoSendStream::VideoSendStream( const std::map& suspended_payload_states, std::unique_ptr fec_controller, const FieldTrialsView& field_trials) - : transport_(transport), + : rtp_transport_queue_(transport->GetWorkerQueue()), + transport_(transport), stats_proxy_(clock, config, encoder_config.content_type, field_trials), config_(std::move(config)), content_type_(encoder_config.content_type), @@ -236,7 +237,12 @@ void VideoSendStream::StartPerRtpStream(const std::vector active_layers) { } active_layers_string << "}"; RTC_LOG(LS_INFO) << "StartPerRtpStream: " << active_layers_string.str(); - send_stream_.StartPerRtpStream(active_layers); + + rtp_transport_queue_->RunOrPost( + SafeTask(transport_queue_safety_, [this, active_layers] { + send_stream_.StartPerRtpStream(active_layers); + })); + running_ = running; } @@ -246,7 +252,13 @@ void VideoSendStream::Stop() { return; RTC_DLOG(LS_INFO) << "VideoSendStream::Stop"; running_ = false; - send_stream_.Stop(); + rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] { + // As the stream can get re-used and implicitly restarted via changing + // the state of the active layers, we do not mark the + // `transport_queue_safety_` flag with `SetNotAlive()` here. That's only + // done when we stop permanently via `StopPermanentlyAndGetRtpStates()`. + send_stream_.Stop(); + })); } bool VideoSendStream::started() { @@ -288,7 +300,9 @@ void VideoSendStream::ReconfigureVideoEncoder(VideoEncoderConfig config, } VideoSendStream::Stats VideoSendStream::GetStats() { - RTC_DCHECK_RUN_ON(&thread_checker_); + // TODO(perkj, solenberg): Some test cases in EndToEndTest call GetStats from + // a network thread. See comment in Call::GetStats(). + // RTC_DCHECK_RUN_ON(&thread_checker_); return stats_proxy_.GetStats(); } @@ -306,9 +320,13 @@ void VideoSendStream::StopPermanentlyAndGetRtpStates( // Always run these cleanup steps regardless of whether running_ was set // or not. This will unregister callbacks before destruction. // See `VideoSendStreamImpl::StopVideoSendStream` for more. - send_stream_.Stop(); - *rtp_state_map = send_stream_.GetRtpStates(); - *payload_state_map = send_stream_.GetRtpPayloadStates(); + rtp_transport_queue_->RunSynchronous( + [this, rtp_state_map, payload_state_map]() { + transport_queue_safety_->SetNotAlive(); + send_stream_.Stop(); + *rtp_state_map = send_stream_.GetRtpStates(); + *payload_state_map = send_stream_.GetRtpPayloadStates(); + }); } void VideoSendStream::DeliverRtcp(const uint8_t* packet, size_t length) { @@ -317,7 +335,6 @@ void VideoSendStream::DeliverRtcp(const uint8_t* packet, size_t length) { } void VideoSendStream::GenerateKeyFrame(const std::vector& rids) { - RTC_DCHECK_RUN_ON(&thread_checker_); // Map rids to layers. If rids is empty, generate a keyframe for all layers. std::vector next_frames(config_.rtp.ssrcs.size(), VideoFrameType::kVideoFrameKey); diff --git a/video/video_send_stream.h b/video/video_send_stream.h index 7a3454a3a0..a7ce112b21 100644 --- a/video/video_send_stream.h +++ b/video/video_send_stream.h @@ -104,7 +104,11 @@ class VideoSendStream : public webrtc::VideoSendStream { absl::optional GetPacingFactorOverride() const; RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_; + MaybeWorkerThread* const rtp_transport_queue_; RtpTransportControllerSendInterface* const transport_; + rtc::Event thread_sync_event_; + rtc::scoped_refptr transport_queue_safety_ = + PendingTaskSafetyFlag::CreateDetached(); SendStatisticsProxy stats_proxy_; const VideoSendStream::Config config_; diff --git a/video/video_send_stream_impl.cc b/video/video_send_stream_impl.cc index 597bf39198..5fa2af398d 100644 --- a/video/video_send_stream_impl.cc +++ b/video/video_send_stream_impl.cc @@ -233,7 +233,7 @@ VideoSendStreamImpl::VideoSendStreamImpl( pacing_config_(PacingConfig(field_trials)), stats_proxy_(stats_proxy), config_(config), - worker_queue_(TaskQueueBase::Current()), + rtp_transport_queue_(transport->GetWorkerQueue()), timed_out_(false), transport_(transport), bitrate_allocator_(bitrate_allocator), @@ -298,26 +298,33 @@ VideoSendStreamImpl::VideoSendStreamImpl( transport->EnablePeriodicAlrProbing(*enable_alr_bw_probing); } - if (configured_pacing_factor_) - transport_->SetPacingFactor(*configured_pacing_factor_); + rtp_transport_queue_->RunOrPost(SafeTask(transport_queue_safety_, [this] { + if (configured_pacing_factor_) + transport_->SetPacingFactor(*configured_pacing_factor_); - video_stream_encoder_->SetStartBitrate( - bitrate_allocator_->GetStartBitrate(this)); + video_stream_encoder_->SetStartBitrate( + bitrate_allocator_->GetStartBitrate(this)); + })); } VideoSendStreamImpl::~VideoSendStreamImpl() { RTC_DCHECK_RUN_ON(&thread_checker_); RTC_LOG(LS_INFO) << "~VideoSendStreamImpl: " << config_->ToString(); + // TODO(webrtc:14502): Change `transport_queue_safety_` to be of type + // ScopedTaskSafety if experiment WebRTC-SendPacketsOnWorkerThread succeed. + if (rtp_transport_queue_->IsCurrent()) { + transport_queue_safety_->SetNotAlive(); + } } void VideoSendStreamImpl::DeliverRtcp(const uint8_t* packet, size_t length) { - RTC_DCHECK_RUN_ON(&thread_checker_); + // Runs on a worker thread. rtp_video_sender_->DeliverRtcp(packet, length); } void VideoSendStreamImpl::StartPerRtpStream( const std::vector active_layers) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); bool previously_active = rtp_video_sender_->IsActive(); rtp_video_sender_->SetActiveModules(active_layers); if (!rtp_video_sender_->IsActive() && previously_active) { @@ -328,7 +335,8 @@ void VideoSendStreamImpl::StartPerRtpStream( } void VideoSendStreamImpl::StartupVideoSendStream() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); + transport_queue_safety_->SetAlive(); bitrate_allocator_->AddObserver(this, GetAllocationConfig()); // Start monitoring encoder activity. @@ -338,8 +346,9 @@ void VideoSendStreamImpl::StartupVideoSendStream() { activity_ = false; timed_out_ = false; check_encoder_activity_task_ = RepeatingTaskHandle::DelayedStart( - worker_queue_, kEncoderTimeOut, [this] { - RTC_DCHECK_RUN_ON(&thread_checker_); + rtp_transport_queue_->TaskQueueForDelayedTasks(), kEncoderTimeOut, + [this] { + RTC_DCHECK_RUN_ON(rtp_transport_queue_); if (!activity_) { if (!timed_out_) { SignalEncoderTimedOut(); @@ -359,27 +368,29 @@ void VideoSendStreamImpl::StartupVideoSendStream() { } void VideoSendStreamImpl::Stop() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); RTC_LOG(LS_INFO) << "VideoSendStreamImpl::Stop"; if (!rtp_video_sender_->IsActive()) return; + RTC_DCHECK(transport_queue_safety_->alive()); TRACE_EVENT_INSTANT0("webrtc", "VideoSendStream::Stop"); rtp_video_sender_->Stop(); StopVideoSendStream(); } void VideoSendStreamImpl::StopVideoSendStream() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); bitrate_allocator_->RemoveObserver(this); check_encoder_activity_task_.Stop(); video_stream_encoder_->OnBitrateUpdated(DataRate::Zero(), DataRate::Zero(), DataRate::Zero(), 0, 0, 0); stats_proxy_->OnSetEncoderTargetRate(0); + transport_queue_safety_->SetNotAlive(); } void VideoSendStreamImpl::SignalEncoderTimedOut() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); // If the encoder has not produced anything the last kEncoderTimeOut and it // is supposed to, deregister as BitrateAllocatorObserver. This can happen // if a camera stops producing frames. @@ -392,9 +403,9 @@ void VideoSendStreamImpl::SignalEncoderTimedOut() { void VideoSendStreamImpl::OnBitrateAllocationUpdated( const VideoBitrateAllocation& allocation) { // OnBitrateAllocationUpdated is invoked from the encoder task queue or - // the worker_queue_. - auto task = [this, allocation] { - RTC_DCHECK_RUN_ON(&thread_checker_); + // the rtp_transport_queue_. + auto task = [=] { + RTC_DCHECK_RUN_ON(rtp_transport_queue_); if (encoder_target_rate_bps_ == 0) { return; } @@ -430,9 +441,9 @@ void VideoSendStreamImpl::OnBitrateAllocationUpdated( // Send bitrate allocation metadata only if encoder is not paused. rtp_video_sender_->OnBitrateAllocationUpdated(allocation); }; - if (!worker_queue_->IsCurrent()) { - worker_queue_->PostTask( - SafeTask(worker_queue_safety_.flag(), std::move(task))); + if (!rtp_transport_queue_->IsCurrent()) { + rtp_transport_queue_->TaskQueueForPost()->PostTask( + SafeTask(transport_queue_safety_, std::move(task))); } else { task(); } @@ -446,7 +457,7 @@ void VideoSendStreamImpl::OnVideoLayersAllocationUpdated( } void VideoSendStreamImpl::SignalEncoderActive() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); if (rtp_video_sender_->IsActive()) { RTC_LOG(LS_INFO) << "SignalEncoderActive, Encoder is active."; bitrate_allocator_->AddObserver(this, GetAllocationConfig()); @@ -469,12 +480,12 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged( VideoEncoderConfig::ContentType content_type, int min_transmit_bitrate_bps) { // Currently called on the encoder TQ - RTC_DCHECK(!worker_queue_->IsCurrent()); + RTC_DCHECK(!rtp_transport_queue_->IsCurrent()); auto closure = [this, streams = std::move(streams), is_svc, content_type, min_transmit_bitrate_bps]() mutable { RTC_DCHECK_GE(config_->rtp.ssrcs.size(), streams.size()); TRACE_EVENT0("webrtc", "VideoSendStream::OnEncoderConfigurationChanged"); - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); const VideoCodecType codec_type = PayloadStringToCodecType(config_->rtp.payload_name); @@ -526,8 +537,8 @@ void VideoSendStreamImpl::OnEncoderConfigurationChanged( } }; - worker_queue_->PostTask( - SafeTask(worker_queue_safety_.flag(), std::move(closure))); + rtp_transport_queue_->TaskQueueForPost()->PostTask( + SafeTask(transport_queue_safety_, std::move(closure))); } EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage( @@ -539,10 +550,10 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage( // Indicate that there still is activity going on. activity_ = true; - RTC_DCHECK(!worker_queue_->IsCurrent()); + RTC_DCHECK(!rtp_transport_queue_->IsCurrent()); auto task_to_run_on_worker = [this]() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); if (disable_padding_) { disable_padding_ = false; // To ensure that padding bitrate is propagated to the bitrate allocator. @@ -555,8 +566,8 @@ EncodedImageCallback::Result VideoSendStreamImpl::OnEncodedImage( OnBitrateAllocationUpdated(*context->throttled_allocation); } }; - worker_queue_->PostTask( - SafeTask(worker_queue_safety_.flag(), std::move(task_to_run_on_worker))); + rtp_transport_queue_->TaskQueueForPost()->PostTask( + SafeTask(transport_queue_safety_, std::move(task_to_run_on_worker))); return rtp_video_sender_->OnEncodedImage(encoded_image, codec_specific_info); } @@ -576,7 +587,7 @@ std::map VideoSendStreamImpl::GetRtpPayloadStates() } uint32_t VideoSendStreamImpl::OnBitrateUpdated(BitrateAllocationUpdate update) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(rtp_transport_queue_); RTC_DCHECK(rtp_video_sender_->IsActive()) << "VideoSendStream::Start has not been called."; diff --git a/video/video_send_stream_impl.h b/video/video_send_stream_impl.h index dc1bf89508..f145450655 100644 --- a/video/video_send_stream_impl.h +++ b/video/video_send_stream_impl.h @@ -121,14 +121,14 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, void StartupVideoSendStream(); // Removes the bitrate observer, stops monitoring and notifies the video // encoder of the bitrate update. - void StopVideoSendStream() RTC_RUN_ON(thread_checker_); + void StopVideoSendStream() RTC_RUN_ON(rtp_transport_queue_); void ConfigureProtection(); void ConfigureSsrcs(); void SignalEncoderTimedOut(); void SignalEncoderActive(); MediaStreamAllocationConfig GetAllocationConfig() const - RTC_RUN_ON(thread_checker_); + RTC_RUN_ON(rtp_transport_queue_); RTC_NO_UNIQUE_ADDRESS SequenceChecker thread_checker_; Clock* const clock_; @@ -138,30 +138,31 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, SendStatisticsProxy* const stats_proxy_; const VideoSendStream::Config* const config_; - TaskQueueBase* const worker_queue_; + MaybeWorkerThread* const rtp_transport_queue_; RepeatingTaskHandle check_encoder_activity_task_ - RTC_GUARDED_BY(thread_checker_); + RTC_GUARDED_BY(rtp_transport_queue_); std::atomic_bool activity_; - bool timed_out_ RTC_GUARDED_BY(thread_checker_); + bool timed_out_ RTC_GUARDED_BY(rtp_transport_queue_); RtpTransportControllerSendInterface* const transport_; BitrateAllocatorInterface* const bitrate_allocator_; - bool disable_padding_ RTC_GUARDED_BY(thread_checker_); - int max_padding_bitrate_ RTC_GUARDED_BY(thread_checker_); - int encoder_min_bitrate_bps_ RTC_GUARDED_BY(thread_checker_); - uint32_t encoder_max_bitrate_bps_ RTC_GUARDED_BY(thread_checker_); - uint32_t encoder_target_rate_bps_ RTC_GUARDED_BY(thread_checker_); - double encoder_bitrate_priority_ RTC_GUARDED_BY(thread_checker_); + bool disable_padding_; + int max_padding_bitrate_; + int encoder_min_bitrate_bps_; + uint32_t encoder_max_bitrate_bps_; + uint32_t encoder_target_rate_bps_; + double encoder_bitrate_priority_; VideoStreamEncoderInterface* const video_stream_encoder_; RtcpBandwidthObserver* const bandwidth_observer_; RtpVideoSenderInterface* const rtp_video_sender_; - ScopedTaskSafety worker_queue_safety_; + rtc::scoped_refptr transport_queue_safety_ = + PendingTaskSafetyFlag::CreateDetached(); // Context for the most recent and last sent video bitrate allocation. Used to // throttle sending of similar bitrate allocations. @@ -171,7 +172,7 @@ class VideoSendStreamImpl : public webrtc::BitrateAllocatorObserver, int64_t last_send_time_ms; }; absl::optional video_bitrate_allocation_context_ - RTC_GUARDED_BY(thread_checker_); + RTC_GUARDED_BY(rtp_transport_queue_); const absl::optional configured_pacing_factor_; }; } // namespace internal