diff --git a/call/BUILD.gn b/call/BUILD.gn index 1924e26780..cf3a592bcc 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -110,6 +110,7 @@ rtc_source_set("rtp_sender") { "../modules/utility", "../rtc_base:rtc_base", "../rtc_base:rtc_base_approved", + "../rtc_base:rtc_task_queue", "../system_wrappers:field_trial_api", ] } diff --git a/call/call.cc b/call/call.cc index 4f18330e2f..aadc27e123 100644 --- a/call/call.cc +++ b/call/call.cc @@ -166,11 +166,11 @@ std::unique_ptr CreateRtcLogStreamConfig( namespace internal { -class Call : public webrtc::Call, - public PacketReceiver, - public RecoveredPacketReceiver, - public TargetTransferRateObserver, - public BitrateAllocator::LimitObserver { +class Call final : public webrtc::Call, + public PacketReceiver, + public RecoveredPacketReceiver, + public TargetTransferRateObserver, + public BitrateAllocator::LimitObserver { public: Call(const Call::Config& config, std::unique_ptr transport_send); @@ -362,19 +362,21 @@ class Call : public webrtc::Call, AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_); RateLimiter retransmission_rate_limiter_; - std::unique_ptr transport_send_; ReceiveSideCongestionController receive_side_cc_; const std::unique_ptr receive_time_calculator_; const std::unique_ptr video_send_delay_stats_; const int64_t start_ms_; - // TODO(perkj): |worker_queue_| is supposed to replace - // |module_process_thread_|. - // |worker_queue| is defined last to ensure all pending tasks are cancelled - // and deleted before any other members. - rtc::TaskQueue worker_queue_; + // Caches transport_send_.get(), to avoid racing with destructor. + // Note that this is declared before transport_send_ to ensure that it is not + // invalidated until no more tasks can be running on the transport_send_ task + // queue. + RtpTransportControllerSendInterface* transport_send_ptr_; + // Declared last since it will issue callbacks from a task queue. Declaring it + // last ensures that it is destroyed first and any running tasks are finished. + std::unique_ptr transport_send_; RTC_DISALLOW_COPY_AND_ASSIGN(Call); }; } // namespace internal @@ -444,11 +446,11 @@ Call::Call(const Call::Config& config, receive_side_cc_(clock_, transport_send->packet_router()), receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()), video_send_delay_stats_(new SendDelayStats(clock_)), - start_ms_(clock_->TimeInMilliseconds()), - worker_queue_("call_worker_queue") { + start_ms_(clock_->TimeInMilliseconds()) { RTC_DCHECK(config.event_log != nullptr); transport_send->RegisterTargetTransferRateObserver(this); transport_send_ = std::move(transport_send); + transport_send_ptr_ = transport_send_.get(); call_stats_->RegisterStatsObserver(&receive_side_cc_); call_stats_->RegisterStatsObserver(transport_send_->GetCallStatsObserver()); @@ -591,10 +593,14 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } } + // TODO(srte): AudioSendStream should call GetWorkerQueue directly rather than + // having it injected. + AudioSendStream* send_stream = new AudioSendStream( - config, config_.audio_state, &worker_queue_, module_process_thread_.get(), - transport_send_.get(), bitrate_allocator_.get(), event_log_, - call_stats_.get(), suspended_rtp_state, &sent_rtp_audio_timer_ms_); + config, config_.audio_state, transport_send_ptr_->GetWorkerQueue(), + module_process_thread_.get(), transport_send_ptr_, + bitrate_allocator_.get(), event_log_, call_stats_.get(), + suspended_rtp_state, &sent_rtp_audio_timer_ms_); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -649,7 +655,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( event_log_->Log(rtc::MakeUnique( CreateRtcLogStreamConfig(config))); AudioReceiveStream* receive_stream = new AudioReceiveStream( - &audio_receiver_controller_, transport_send_->packet_router(), + &audio_receiver_controller_, transport_send_ptr_->packet_router(), module_process_thread_.get(), config, config_.audio_state, event_log_); { WriteLockScoped write_lock(*receive_crit_); @@ -718,9 +724,12 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( // Copy ssrcs from |config| since |config| is moved. std::vector ssrcs = config.rtp.ssrcs; + // TODO(srte): VideoSendStream should call GetWorkerQueue directly rather than + // having it injected. VideoSendStream* send_stream = new VideoSendStream( - num_cpu_cores_, module_process_thread_.get(), &worker_queue_, - call_stats_.get(), transport_send_.get(), bitrate_allocator_.get(), + num_cpu_cores_, module_process_thread_.get(), + transport_send_ptr_->GetWorkerQueue(), call_stats_.get(), + transport_send_ptr_, bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller), @@ -799,7 +808,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( VideoReceiveStream* receive_stream = new VideoReceiveStream( &video_receiver_controller_, num_cpu_cores_, - transport_send_->packet_router(), std::move(configuration), + transport_send_ptr_->packet_router(), std::move(configuration), module_process_thread_.get(), call_stats_.get()); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); @@ -907,7 +916,7 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { } RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { - return transport_send_.get(); + return transport_send_ptr_; } Call::Stats Call::GetStats() const { @@ -930,8 +939,9 @@ Call::Stats Call::GetStats() const { // available. { rtc::CritScope cs(&aggregate_network_up_crit_); - stats.pacer_delay_ms = - aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; + stats.pacer_delay_ms = aggregate_network_up_ + ? transport_send_ptr_->GetPacerQueuingDelayMs() + : 0; } stats.rtt_ms = call_stats_->LastProcessedRtt(); @@ -945,19 +955,19 @@ Call::Stats Call::GetStats() const { void Call::SetBitrateAllocationStrategy( std::unique_ptr bitrate_allocation_strategy) { - if (!worker_queue_.IsCurrent()) { - rtc::BitrateAllocationStrategy* strategy_raw = - bitrate_allocation_strategy.release(); - auto functor = [this, strategy_raw]() { - SetBitrateAllocationStrategy( - rtc::WrapUnique(strategy_raw)); - }; - worker_queue_.PostTask([functor] { functor(); }); - return; - } - RTC_DCHECK_RUN_ON(&worker_queue_); - bitrate_allocator_->SetBitrateAllocationStrategy( - std::move(bitrate_allocation_strategy)); + // TODO(srte): This function should be moved to RtpTransportControllerSend + // when BitrateAllocator is moved there. + struct Functor { + void operator()() { + bitrate_allocator_->SetBitrateAllocationStrategy( + std::move(bitrate_allocation_strategy_)); + } + BitrateAllocator* bitrate_allocator_; + std::unique_ptr + bitrate_allocation_strategy_; + }; + transport_send_ptr_->GetWorkerQueue()->PostTask(Functor{ + bitrate_allocator_.get(), std::move(bitrate_allocation_strategy)}); } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { @@ -1050,23 +1060,16 @@ void Call::UpdateAggregateNetworkState() { rtc::CritScope cs(&aggregate_network_up_crit_); aggregate_network_up_ = aggregate_network_up; } - transport_send_->OnNetworkAvailability(aggregate_network_up); + transport_send_ptr_->OnNetworkAvailability(aggregate_network_up); } void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { video_send_delay_stats_->OnSentPacket(sent_packet.packet_id, clock_->TimeInMilliseconds()); - transport_send_->OnSentPacket(sent_packet); + transport_send_ptr_->OnSentPacket(sent_packet); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - // TODO(perkj): Consider making sure CongestionController operates on - // |worker_queue_|. - if (!worker_queue_.IsCurrent()) { - worker_queue_.PostTask([this, msg] { OnTargetTransferRate(msg); }); - return; - } - RTC_DCHECK_RUN_ON(&worker_queue_); uint32_t target_bitrate_bps = msg.target_rate.bps(); int loss_ratio_255 = msg.network_estimate.loss_rate_ratio * 255; uint8_t fraction_loss = @@ -1116,9 +1119,9 @@ void Call::OnAllocationLimitsChanged(uint32_t min_send_bitrate_bps, uint32_t max_padding_bitrate_bps, uint32_t total_bitrate_bps, bool has_packet_feedback) { - transport_send_->SetAllocatedSendBitrateLimits( + transport_send_ptr_->SetAllocatedSendBitrateLimits( min_send_bitrate_bps, max_padding_bitrate_bps, total_bitrate_bps); - transport_send_->SetPerPacketFeedbackAvailable(has_packet_feedback); + transport_send_ptr_->SetPerPacketFeedbackAvailable(has_packet_feedback); rtc::CritScope lock(&bitrate_crit_); min_allocated_send_bitrate_bps_ = min_send_bitrate_bps; configured_max_padding_bitrate_bps_ = max_padding_bitrate_bps; diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 23d8e0ae9d..e86cca37c3 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -63,7 +63,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( event_log, &pacer_, bitrate_config, - TaskQueueExperimentEnabled())) { + TaskQueueExperimentEnabled())), + task_queue_("rtp_send_controller") { send_side_cc_ptr_ = send_side_cc_.get(); process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE); @@ -92,10 +93,24 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps, msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps); msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0; msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms); - rtc::CritScope cs(&observer_crit_); - // We wont register as observer until we have an observer. - RTC_DCHECK(observer_ != nullptr); - observer_->OnTargetTransferRate(msg); + + if (!task_queue_.IsCurrent()) { + task_queue_.PostTask([this, msg] { + rtc::CritScope cs(&observer_crit_); + // We won't register as observer until we have an observer. + RTC_DCHECK(observer_ != nullptr); + observer_->OnTargetTransferRate(msg); + }); + } else { + rtc::CritScope cs(&observer_crit_); + // We won't register as observer until we have an observer. + RTC_DCHECK(observer_ != nullptr); + observer_->OnTargetTransferRate(msg); + } +} + +rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { + return &task_queue_; } PacketRouter* RtpTransportControllerSend::packet_router() { diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index c7b9b037c9..ea4910da9f 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -23,6 +23,7 @@ #include "modules/utility/include/process_thread.h" #include "rtc_base/constructormagic.h" #include "rtc_base/networkroute.h" +#include "rtc_base/task_queue.h" namespace webrtc { class Clock; @@ -47,6 +48,7 @@ class RtpTransportControllerSend final int64_t probing_interval_ms) override; // Implements RtpTransportControllerSendInterface + rtc::TaskQueue* GetWorkerQueue() override; PacketRouter* packet_router() override; TransportFeedbackObserver* transport_feedback_observer() override; @@ -102,6 +104,10 @@ class RtpTransportControllerSend final // Declared last since it will issue callbacks from a task queue. Declaring it // last ensures that it is destroyed first. const std::unique_ptr send_side_cc_; + // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. + // |task_queue_| is defined last to ensure all pending tasks are cancelled + // and deleted before any other members. + rtc::TaskQueue task_queue_; RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend); }; diff --git a/call/rtp_transport_controller_send_interface.h b/call/rtp_transport_controller_send_interface.h index 7458b3790c..bb95bc9137 100644 --- a/call/rtp_transport_controller_send_interface.h +++ b/call/rtp_transport_controller_send_interface.h @@ -21,6 +21,7 @@ namespace rtc { struct SentPacket; struct NetworkRoute; +class TaskQueue; } // namespace rtc namespace webrtc { @@ -62,6 +63,7 @@ class TransportFeedbackObserver; class RtpTransportControllerSendInterface { public: virtual ~RtpTransportControllerSendInterface() {} + virtual rtc::TaskQueue* GetWorkerQueue() = 0; virtual PacketRouter* packet_router() = 0; virtual TransportFeedbackObserver* transport_feedback_observer() = 0; diff --git a/call/test/mock_rtp_transport_controller_send.h b/call/test/mock_rtp_transport_controller_send.h index 1878f1d09f..d903eef20b 100644 --- a/call/test/mock_rtp_transport_controller_send.h +++ b/call/test/mock_rtp_transport_controller_send.h @@ -27,6 +27,7 @@ namespace webrtc { class MockRtpTransportControllerSend : public RtpTransportControllerSendInterface { public: + MOCK_METHOD0(GetWorkerQueue, rtc::TaskQueue*()); MOCK_METHOD0(packet_router, PacketRouter*()); MOCK_METHOD0(transport_feedback_observer, TransportFeedbackObserver*()); MOCK_METHOD0(packet_sender, RtpPacketSender*());