diff --git a/call/call.cc b/call/call.cc index aea03af8d3..acb49f24d4 100644 --- a/call/call.cc +++ b/call/call.cc @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -169,34 +170,6 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() { return current; } -// Called from the destructor of Call to report the collected send histograms. -void UpdateSendHistograms(Timestamp now, - Timestamp first_sent_packet, - AvgCounter& estimated_send_bitrate_kbps_counter, - AvgCounter& pacer_bitrate_kbps_counter) { - TimeDelta elapsed = now - first_sent_packet; - if (elapsed.seconds() < metrics::kMinRunTimeInSeconds) - return; - - const int kMinRequiredPeriodicSamples = 5; - AggregatedStats send_bitrate_stats = - estimated_send_bitrate_kbps_counter.ProcessAndGetStats(); - if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps", - send_bitrate_stats.average); - RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, " - << send_bitrate_stats.ToString(); - } - AggregatedStats pacer_bitrate_stats = - pacer_bitrate_kbps_counter.ProcessAndGetStats(); - if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps", - pacer_bitrate_stats.average); - RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, " - << pacer_bitrate_stats.ToString(); - } -} - } // namespace namespace internal { @@ -334,7 +307,7 @@ class Call final : public webrtc::Call, void AddReceivedVideoBytes(int bytes, webrtc::Timestamp arrival_time); private: - SequenceChecker sequence_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; RateCounter received_bytes_per_second_counter_ RTC_GUARDED_BY(sequence_checker_); RateCounter received_audio_bytes_per_second_counter_ @@ -353,6 +326,32 @@ class Call final : public webrtc::Call, RTC_GUARDED_BY(sequence_checker_); }; + // Thread-compatible class that collects sent packet stats and exposes + // them as UMA histograms on destruction, provided SetFirstPacketTime was + // called with a non-empty packet timestamp before the destructor. + class SendStats { + public: + explicit SendStats(Clock* clock); + ~SendStats(); + + void SetFirstPacketTime(absl::optional first_sent_packet_time); + void PauseSendAndPacerBitrateCounters(); + void AddTargetBitrateSample(uint32_t target_bitrate_bps); + void SetMinAllocatableRate(BitrateAllocationLimits limits); + + private: + RTC_NO_UNIQUE_ADDRESS SequenceChecker destructor_sequence_checker_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; + Clock* const clock_ RTC_GUARDED_BY(destructor_sequence_checker_); + AvgCounter estimated_send_bitrate_kbps_counter_ + RTC_GUARDED_BY(sequence_checker_); + AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(sequence_checker_); + uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(sequence_checker_){ + 0}; + absl::optional first_sent_packet_time_ + RTC_GUARDED_BY(destructor_sequence_checker_); + }; + void DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) RTC_RUN_ON(network_thread_); DeliveryStatus DeliverRtp(MediaType media_type, @@ -374,6 +373,7 @@ class Call final : public webrtc::Call, TaskQueueFactory* const task_queue_factory_; TaskQueueBase* const worker_thread_; TaskQueueBase* const network_thread_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_; const int num_cpu_cores_; const rtc::scoped_refptr module_process_thread_; @@ -445,6 +445,10 @@ class Call final : public webrtc::Call, std::map video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); std::set video_send_streams_ RTC_GUARDED_BY(worker_thread_); + // True if |video_send_streams_| is empty, false if not. The atomic variable + // is used to decide UMA send statistics behavior and enables avoiding a + // PostTask(). + std::atomic video_send_streams_empty_{true}; // Each forwarder wraps an adaptation resource that was added to the call. std::vector> @@ -460,23 +464,21 @@ class Call final : public webrtc::Call, webrtc::RtcEventLog* const event_log_; - // TODO(bugs.webrtc.org/11993) ready to move receive stats access to the - // network thread. + // TODO(bugs.webrtc.org/11993) ready to move stats access to the network + // thread. ReceiveStats receive_stats_ RTC_GUARDED_BY(worker_thread_); - - uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_); - uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); - uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); - AvgCounter estimated_send_bitrate_kbps_counter_ - RTC_GUARDED_BY(worker_thread_); - AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_); + SendStats send_stats_ RTC_GUARDED_BY(send_transport_sequence_checker_); + // |last_bandwidth_bps_| and |configured_max_padding_bitrate_bps_| being + // atomic avoids a PostTask. The variables are used for stats gathering. + std::atomic last_bandwidth_bps_{0}; + std::atomic configured_max_padding_bitrate_bps_{0}; ReceiveSideCongestionController receive_side_cc_; const std::unique_ptr receive_time_calculator_; const std::unique_ptr video_send_delay_stats_; - const int64_t start_ms_; + const Timestamp start_of_call_; // Note that |task_safety_| needs to be at a greater scope than the task queue // owned by |transport_send_| since calls might arrive on the network thread @@ -491,11 +493,10 @@ class Call final : public webrtc::Call, // https://webrtc-review.googlesource.com/c/src/+/63023/9/call/call.cc // https://bugs.chromium.org/p/chromium/issues/detail?id=992640 RtpTransportControllerSendInterface* const transport_send_ptr_ - RTC_GUARDED_BY(send_transport_queue_); + RTC_GUARDED_BY(send_transport_sequence_checker_); // Declared last since it will issue callbacks from a task queue. Declaring it // last ensures that it is destroyed first and any running tasks are finished. const std::unique_ptr transport_send_; - rtc::TaskQueue* const send_transport_queue_; bool is_started_ RTC_GUARDED_BY(worker_thread_) = false; @@ -743,6 +744,69 @@ Call::ReceiveStats::~ReceiveStats() { } } +Call::SendStats::SendStats(Clock* clock) + : clock_(clock), + estimated_send_bitrate_kbps_counter_(clock, nullptr, true), + pacer_bitrate_kbps_counter_(clock, nullptr, true) { + destructor_sequence_checker_.Detach(); + sequence_checker_.Detach(); +} + +Call::SendStats::~SendStats() { + RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); + if (!first_sent_packet_time_) + return; + + TimeDelta elapsed = clock_->CurrentTime() - *first_sent_packet_time_; + if (elapsed.seconds() < metrics::kMinRunTimeInSeconds) + return; + + const int kMinRequiredPeriodicSamples = 5; + AggregatedStats send_bitrate_stats = + estimated_send_bitrate_kbps_counter_.ProcessAndGetStats(); + if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps", + send_bitrate_stats.average); + RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, " + << send_bitrate_stats.ToString(); + } + AggregatedStats pacer_bitrate_stats = + pacer_bitrate_kbps_counter_.ProcessAndGetStats(); + if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { + RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps", + pacer_bitrate_stats.average); + RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, " + << pacer_bitrate_stats.ToString(); + } +} + +void Call::SendStats::SetFirstPacketTime( + absl::optional first_sent_packet_time) { + RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); + first_sent_packet_time_ = first_sent_packet_time; +} + +void Call::SendStats::PauseSendAndPacerBitrateCounters() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + estimated_send_bitrate_kbps_counter_.ProcessAndPause(); + pacer_bitrate_kbps_counter_.ProcessAndPause(); +} + +void Call::SendStats::AddTargetBitrateSample(uint32_t target_bitrate_bps) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); + // Pacer bitrate may be higher than bitrate estimate if enforcing min + // bitrate. + uint32_t pacer_bitrate_bps = + std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); + pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); +} + +void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); +} + Call::Call(Clock* clock, const Call::Config& config, std::unique_ptr transport_send, @@ -766,11 +830,7 @@ Call::Call(Clock* clock, aggregate_network_up_(false), event_log_(config.event_log), receive_stats_(clock_), - last_bandwidth_bps_(0), - min_allocated_send_bitrate_bps_(0), - configured_max_padding_bitrate_bps_(0), - estimated_send_bitrate_kbps_counter_(clock_, nullptr, true), - pacer_bitrate_kbps_counter_(clock_, nullptr, true), + send_stats_(clock_), receive_side_cc_(clock, absl::bind_front(&PacketRouter::SendCombinedRtcpPacket, transport_send->packet_router()), @@ -779,15 +839,15 @@ Call::Call(Clock* clock, /*network_state_estimator=*/nullptr), receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()), video_send_delay_stats_(new SendDelayStats(clock_)), - start_ms_(clock_->TimeInMilliseconds()), + start_of_call_(clock_->CurrentTime()), transport_send_ptr_(transport_send.get()), - transport_send_(std::move(transport_send)), - send_transport_queue_(transport_send_->GetWorkerQueue()) { + transport_send_(std::move(transport_send)) { RTC_DCHECK(config.event_log != nullptr); RTC_DCHECK(config.trials != nullptr); RTC_DCHECK(network_thread_); RTC_DCHECK(worker_thread_->IsCurrent()); - RTC_DCHECK(send_transport_queue_); + + send_transport_sequence_checker_.Detach(); // Do not remove this call; it is here to convince the compiler that the // WebRTC source timestamp string needs to be in the final binary. @@ -814,22 +874,11 @@ Call::~Call() { receive_side_cc_.GetRemoteBitrateEstimator(true)); module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_); call_stats_->DeregisterStatsObserver(&receive_side_cc_); + send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime()); - absl::optional first_sent_packet_time = - transport_send_->GetFirstPacketTime(); - - Timestamp now = clock_->CurrentTime(); - - // Only update histograms after process threads have been shut down, so that - // they won't try to concurrently update stats. - if (first_sent_packet_time) { - UpdateSendHistograms(now, *first_sent_packet_time, - estimated_send_bitrate_kbps_counter_, - pacer_bitrate_kbps_counter_); - } - - RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.LifetimeInSeconds", - (now.ms() - start_ms_) / 1000); + RTC_HISTOGRAM_COUNTS_100000( + "WebRTC.Call.LifetimeInSeconds", + (clock_->CurrentTime() - start_of_call_).seconds()); } void Call::EnsureStarted() { @@ -1027,6 +1076,8 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( video_send_ssrcs_[ssrc] = send_stream; } video_send_streams_.insert(send_stream); + video_send_streams_empty_.store(false, std::memory_order_relaxed); + // Forward resources that were previously added to the call to the new stream. for (const auto& resource_forwarder : adaptation_resource_forwarders_) { resource_forwarder->OnCreateVideoSendStream(send_stream); @@ -1075,6 +1126,8 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { resource_forwarder->OnDestroyVideoSendStream(send_stream_impl); } video_send_streams_.erase(send_stream_impl); + if (video_send_streams_.empty()) + video_send_streams_empty_.store(true, std::memory_order_relaxed); RTC_CHECK(send_stream_impl != nullptr); @@ -1248,8 +1301,10 @@ Call::Stats Call::GetStats() const { receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate( &ssrcs, &recv_bandwidth); stats.recv_bandwidth_bps = recv_bandwidth; - stats.send_bandwidth_bps = last_bandwidth_bps_; - stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; + stats.send_bandwidth_bps = + last_bandwidth_bps_.load(std::memory_order_relaxed); + stats.max_padding_bitrate_bps = + configured_max_padding_bitrate_bps_.load(std::memory_order_relaxed); return stats; } @@ -1353,50 +1408,40 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { } void Call::OnStartRateUpdate(DataRate start_rate) { - RTC_DCHECK_RUN_ON(send_transport_queue_); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); bitrate_allocator_->UpdateStartRate(start_rate.bps()); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - RTC_DCHECK_RUN_ON(send_transport_queue_); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); uint32_t target_bitrate_bps = msg.target_rate.bps(); // For controlling the rate of feedback messages. receive_side_cc_.OnBitrateChanged(target_bitrate_bps); bitrate_allocator_->OnNetworkEstimateChanged(msg); - worker_thread_->PostTask( - ToQueuedTask(task_safety_, [this, target_bitrate_bps]() { - RTC_DCHECK_RUN_ON(worker_thread_); - last_bandwidth_bps_ = target_bitrate_bps; + last_bandwidth_bps_.store(target_bitrate_bps, std::memory_order_relaxed); - // Ignore updates if bitrate is zero (the aggregate network state is - // down) or if we're not sending video. - if (target_bitrate_bps == 0 || video_send_streams_.empty()) { - estimated_send_bitrate_kbps_counter_.ProcessAndPause(); - pacer_bitrate_kbps_counter_.ProcessAndPause(); - return; - } - - estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); - // Pacer bitrate may be higher than bitrate estimate if enforcing min - // bitrate. - uint32_t pacer_bitrate_bps = - std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); - pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); - })); + // Ignore updates if bitrate is zero (the aggregate network state is + // down) or if we're not sending video. + // Using |video_send_streams_empty_| is racy but as the caller can't + // reasonably expect synchronize with changes in |video_send_streams_| (being + // on |send_transport_sequence_checker|), we can avoid a PostTask this way. + if (target_bitrate_bps == 0 || + video_send_streams_empty_.load(std::memory_order_relaxed)) { + send_stats_.PauseSendAndPacerBitrateCounters(); + } else { + send_stats_.AddTargetBitrateSample(target_bitrate_bps); + } } void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { - RTC_DCHECK_RUN_ON(send_transport_queue_); + RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); - - worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() { - RTC_DCHECK_RUN_ON(worker_thread_); - min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); - configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps(); - })); + send_stats_.SetMinAllocatableRate(limits); + configured_max_padding_bitrate_bps_.store(limits.max_padding_rate.bps(), + std::memory_order_relaxed); } void Call::ConfigureSync(const std::string& sync_group) {