From e6256055e7db560da560829a3e249898f9685d1f Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 4 May 2018 14:08:15 +0200 Subject: [PATCH] Moving task queue from Call to transport controller. Moving ownership of worker task queue in Call to RtpTransportControllerSend. This CL also ensures that the task queue is not destroyed until the process thread running SendSideCongestionController is stopped. The worker queue should be owned by RtpTransportControllerSend since it is mainly used for rtp and transport related tasks such as bitrate allocation and signaling network state. Bug: webrtc:9232 Change-Id: I211edf1a3b9f9b2572875d5584cb788cb2449ef9 Reviewed-on: https://webrtc-review.googlesource.com/63023 Commit-Queue: Sebastian Jansson Reviewed-by: Niels Moller Cr-Commit-Position: refs/heads/master@{#23119} --- call/BUILD.gn | 1 + call/call.cc | 97 ++++++++++--------- call/rtp_transport_controller_send.cc | 25 ++++- call/rtp_transport_controller_send.h | 6 ++ .../rtp_transport_controller_send_interface.h | 2 + .../test/mock_rtp_transport_controller_send.h | 1 + 6 files changed, 80 insertions(+), 52 deletions(-) diff --git a/call/BUILD.gn b/call/BUILD.gn index 1924e26780..cf3a592bcc 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -110,6 +110,7 @@ rtc_source_set("rtp_sender") { "../modules/utility", "../rtc_base:rtc_base", "../rtc_base:rtc_base_approved", + "../rtc_base:rtc_task_queue", "../system_wrappers:field_trial_api", ] } diff --git a/call/call.cc b/call/call.cc index 4f18330e2f..aadc27e123 100644 --- a/call/call.cc +++ b/call/call.cc @@ -166,11 +166,11 @@ std::unique_ptr CreateRtcLogStreamConfig( namespace internal { -class Call : public webrtc::Call, - public PacketReceiver, - public RecoveredPacketReceiver, - public TargetTransferRateObserver, - public BitrateAllocator::LimitObserver { +class Call final : public webrtc::Call, + public PacketReceiver, + public RecoveredPacketReceiver, + public TargetTransferRateObserver, + public BitrateAllocator::LimitObserver { public: Call(const Call::Config& config, std::unique_ptr transport_send); @@ -362,19 +362,21 @@ class Call : public webrtc::Call, AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_); RateLimiter retransmission_rate_limiter_; - std::unique_ptr transport_send_; ReceiveSideCongestionController receive_side_cc_; const std::unique_ptr receive_time_calculator_; const std::unique_ptr video_send_delay_stats_; const int64_t start_ms_; - // TODO(perkj): |worker_queue_| is supposed to replace - // |module_process_thread_|. - // |worker_queue| is defined last to ensure all pending tasks are cancelled - // and deleted before any other members. - rtc::TaskQueue worker_queue_; + // Caches transport_send_.get(), to avoid racing with destructor. + // Note that this is declared before transport_send_ to ensure that it is not + // invalidated until no more tasks can be running on the transport_send_ task + // queue. + RtpTransportControllerSendInterface* transport_send_ptr_; + // Declared last since it will issue callbacks from a task queue. Declaring it + // last ensures that it is destroyed first and any running tasks are finished. + std::unique_ptr transport_send_; RTC_DISALLOW_COPY_AND_ASSIGN(Call); }; } // namespace internal @@ -444,11 +446,11 @@ Call::Call(const Call::Config& config, receive_side_cc_(clock_, transport_send->packet_router()), receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()), video_send_delay_stats_(new SendDelayStats(clock_)), - start_ms_(clock_->TimeInMilliseconds()), - worker_queue_("call_worker_queue") { + start_ms_(clock_->TimeInMilliseconds()) { RTC_DCHECK(config.event_log != nullptr); transport_send->RegisterTargetTransferRateObserver(this); transport_send_ = std::move(transport_send); + transport_send_ptr_ = transport_send_.get(); call_stats_->RegisterStatsObserver(&receive_side_cc_); call_stats_->RegisterStatsObserver(transport_send_->GetCallStatsObserver()); @@ -591,10 +593,14 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } } + // TODO(srte): AudioSendStream should call GetWorkerQueue directly rather than + // having it injected. + AudioSendStream* send_stream = new AudioSendStream( - config, config_.audio_state, &worker_queue_, module_process_thread_.get(), - transport_send_.get(), bitrate_allocator_.get(), event_log_, - call_stats_.get(), suspended_rtp_state, &sent_rtp_audio_timer_ms_); + config, config_.audio_state, transport_send_ptr_->GetWorkerQueue(), + module_process_thread_.get(), transport_send_ptr_, + bitrate_allocator_.get(), event_log_, call_stats_.get(), + suspended_rtp_state, &sent_rtp_audio_timer_ms_); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -649,7 +655,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( event_log_->Log(rtc::MakeUnique( CreateRtcLogStreamConfig(config))); AudioReceiveStream* receive_stream = new AudioReceiveStream( - &audio_receiver_controller_, transport_send_->packet_router(), + &audio_receiver_controller_, transport_send_ptr_->packet_router(), module_process_thread_.get(), config, config_.audio_state, event_log_); { WriteLockScoped write_lock(*receive_crit_); @@ -718,9 +724,12 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( // Copy ssrcs from |config| since |config| is moved. std::vector ssrcs = config.rtp.ssrcs; + // TODO(srte): VideoSendStream should call GetWorkerQueue directly rather than + // having it injected. VideoSendStream* send_stream = new VideoSendStream( - num_cpu_cores_, module_process_thread_.get(), &worker_queue_, - call_stats_.get(), transport_send_.get(), bitrate_allocator_.get(), + num_cpu_cores_, module_process_thread_.get(), + transport_send_ptr_->GetWorkerQueue(), call_stats_.get(), + transport_send_ptr_, bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller), @@ -799,7 +808,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( VideoReceiveStream* receive_stream = new VideoReceiveStream( &video_receiver_controller_, num_cpu_cores_, - transport_send_->packet_router(), std::move(configuration), + transport_send_ptr_->packet_router(), std::move(configuration), module_process_thread_.get(), call_stats_.get()); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); @@ -907,7 +916,7 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { } RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { - return transport_send_.get(); + return transport_send_ptr_; } Call::Stats Call::GetStats() const { @@ -930,8 +939,9 @@ Call::Stats Call::GetStats() const { // available. { rtc::CritScope cs(&aggregate_network_up_crit_); - stats.pacer_delay_ms = - aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; + stats.pacer_delay_ms = aggregate_network_up_ + ? transport_send_ptr_->GetPacerQueuingDelayMs() + : 0; } stats.rtt_ms = call_stats_->LastProcessedRtt(); @@ -945,19 +955,19 @@ Call::Stats Call::GetStats() const { void Call::SetBitrateAllocationStrategy( std::unique_ptr bitrate_allocation_strategy) { - if (!worker_queue_.IsCurrent()) { - rtc::BitrateAllocationStrategy* strategy_raw = - bitrate_allocation_strategy.release(); - auto functor = [this, strategy_raw]() { - SetBitrateAllocationStrategy( - rtc::WrapUnique(strategy_raw)); - }; - worker_queue_.PostTask([functor] { functor(); }); - return; - } - RTC_DCHECK_RUN_ON(&worker_queue_); - bitrate_allocator_->SetBitrateAllocationStrategy( - std::move(bitrate_allocation_strategy)); + // TODO(srte): This function should be moved to RtpTransportControllerSend + // when BitrateAllocator is moved there. + struct Functor { + void operator()() { + bitrate_allocator_->SetBitrateAllocationStrategy( + std::move(bitrate_allocation_strategy_)); + } + BitrateAllocator* bitrate_allocator_; + std::unique_ptr + bitrate_allocation_strategy_; + }; + transport_send_ptr_->GetWorkerQueue()->PostTask(Functor{ + bitrate_allocator_.get(), std::move(bitrate_allocation_strategy)}); } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { @@ -1050,23 +1060,16 @@ void Call::UpdateAggregateNetworkState() { rtc::CritScope cs(&aggregate_network_up_crit_); aggregate_network_up_ = aggregate_network_up; } - transport_send_->OnNetworkAvailability(aggregate_network_up); + transport_send_ptr_->OnNetworkAvailability(aggregate_network_up); } void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { video_send_delay_stats_->OnSentPacket(sent_packet.packet_id, clock_->TimeInMilliseconds()); - transport_send_->OnSentPacket(sent_packet); + transport_send_ptr_->OnSentPacket(sent_packet); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - // TODO(perkj): Consider making sure CongestionController operates on - // |worker_queue_|. - if (!worker_queue_.IsCurrent()) { - worker_queue_.PostTask([this, msg] { OnTargetTransferRate(msg); }); - return; - } - RTC_DCHECK_RUN_ON(&worker_queue_); uint32_t target_bitrate_bps = msg.target_rate.bps(); int loss_ratio_255 = msg.network_estimate.loss_rate_ratio * 255; uint8_t fraction_loss = @@ -1116,9 +1119,9 @@ void Call::OnAllocationLimitsChanged(uint32_t min_send_bitrate_bps, uint32_t max_padding_bitrate_bps, uint32_t total_bitrate_bps, bool has_packet_feedback) { - transport_send_->SetAllocatedSendBitrateLimits( + transport_send_ptr_->SetAllocatedSendBitrateLimits( min_send_bitrate_bps, max_padding_bitrate_bps, total_bitrate_bps); - transport_send_->SetPerPacketFeedbackAvailable(has_packet_feedback); + transport_send_ptr_->SetPerPacketFeedbackAvailable(has_packet_feedback); rtc::CritScope lock(&bitrate_crit_); min_allocated_send_bitrate_bps_ = min_send_bitrate_bps; configured_max_padding_bitrate_bps_ = max_padding_bitrate_bps; diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 23d8e0ae9d..e86cca37c3 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -63,7 +63,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( event_log, &pacer_, bitrate_config, - TaskQueueExperimentEnabled())) { + TaskQueueExperimentEnabled())), + task_queue_("rtp_send_controller") { send_side_cc_ptr_ = send_side_cc_.get(); process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); @@ -92,10 +93,24 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps, msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps); msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0; msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms); - rtc::CritScope cs(&observer_crit_); - // We wont register as observer until we have an observer. - RTC_DCHECK(observer_ != nullptr); - observer_->OnTargetTransferRate(msg); + + if (!task_queue_.IsCurrent()) { + task_queue_.PostTask([this, msg] { + rtc::CritScope cs(&observer_crit_); + // We won't register as observer until we have an observer. + RTC_DCHECK(observer_ != nullptr); + observer_->OnTargetTransferRate(msg); + }); + } else { + rtc::CritScope cs(&observer_crit_); + // We won't register as observer until we have an observer. + RTC_DCHECK(observer_ != nullptr); + observer_->OnTargetTransferRate(msg); + } +} + +rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { + return &task_queue_; } PacketRouter* RtpTransportControllerSend::packet_router() { diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index c7b9b037c9..ea4910da9f 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -23,6 +23,7 @@ #include "modules/utility/include/process_thread.h" #include "rtc_base/constructormagic.h" #include "rtc_base/networkroute.h" +#include "rtc_base/task_queue.h" namespace webrtc { class Clock; @@ -47,6 +48,7 @@ class RtpTransportControllerSend final int64_t probing_interval_ms) override; // Implements RtpTransportControllerSendInterface + rtc::TaskQueue* GetWorkerQueue() override; PacketRouter* packet_router() override; TransportFeedbackObserver* transport_feedback_observer() override; @@ -102,6 +104,10 @@ class RtpTransportControllerSend final // Declared last since it will issue callbacks from a task queue. Declaring it // last ensures that it is destroyed first. const std::unique_ptr send_side_cc_; + // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. + // |task_queue_| is defined last to ensure all pending tasks are cancelled + // and deleted before any other members. + rtc::TaskQueue task_queue_; RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend); }; diff --git a/call/rtp_transport_controller_send_interface.h b/call/rtp_transport_controller_send_interface.h index 7458b3790c..bb95bc9137 100644 --- a/call/rtp_transport_controller_send_interface.h +++ b/call/rtp_transport_controller_send_interface.h @@ -21,6 +21,7 @@ namespace rtc { struct SentPacket; struct NetworkRoute; +class TaskQueue; } // namespace rtc namespace webrtc { @@ -62,6 +63,7 @@ class TransportFeedbackObserver; class RtpTransportControllerSendInterface { public: virtual ~RtpTransportControllerSendInterface() {} + virtual rtc::TaskQueue* GetWorkerQueue() = 0; virtual PacketRouter* packet_router() = 0; virtual TransportFeedbackObserver* transport_feedback_observer() = 0; diff --git a/call/test/mock_rtp_transport_controller_send.h b/call/test/mock_rtp_transport_controller_send.h index 1878f1d09f..d903eef20b 100644 --- a/call/test/mock_rtp_transport_controller_send.h +++ b/call/test/mock_rtp_transport_controller_send.h @@ -27,6 +27,7 @@ namespace webrtc { class MockRtpTransportControllerSend : public RtpTransportControllerSendInterface { public: + MOCK_METHOD0(GetWorkerQueue, rtc::TaskQueue*()); MOCK_METHOD0(packet_router, PacketRouter*()); MOCK_METHOD0(transport_feedback_observer, TransportFeedbackObserver*()); MOCK_METHOD0(packet_sender, RtpPacketSender*());