From eb277527f039fbe2a301820430e10f111ed0b9e4 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Fri, 3 Mar 2023 16:38:10 +0100 Subject: [PATCH] Stop Posting tasks when we don't need to. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under the combined network/worker thread project, tasks are unnecessarily posted to the same thread. This CL reaps 90% overhead savings in sent packet feedback as measured with Perfetto under a 49p Meet call. The identity of the posted calls was uncovered with WebRTC/Chrome's new location-aware tracing. TESTED=presubmit + local Meet calls. Bug: chromium:1373439 Change-Id: I0c43aa4de884831838747d52b21c45fd360106e8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295780 Reviewed-by: Tomas Gunnarsson Commit-Queue: Markus Handell Reviewed-by: Harald Alvestrand Reviewed-by: Erik Språng Cr-Commit-Position: refs/heads/main@{#39484} --- api/transport/network_types.h | 6 ++ call/rtp_transport_controller_send.cc | 101 ++++++++++++++++++++------ call/rtp_transport_controller_send.h | 5 ++ 3 files changed, 88 insertions(+), 24 deletions(-) diff --git a/api/transport/network_types.h b/api/transport/network_types.h index 29a7cf7705..a62c350474 100644 --- a/api/transport/network_types.h +++ b/api/transport/network_types.h @@ -233,6 +233,12 @@ struct NetworkControlUpdate { NetworkControlUpdate(); NetworkControlUpdate(const NetworkControlUpdate&); ~NetworkControlUpdate(); + + bool has_updates() const { + return congestion_window.has_value() || pacer_config.has_value() || + !probe_cluster_configs.empty() || target_rate.has_value(); + } + absl::optional congestion_window; absl::optional pacer_config; std::vector probe_cluster_configs; diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 0b467b733a..2d871ceacd 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -181,12 +181,19 @@ void RtpTransportControllerSend::UpdateControlState() { } void RtpTransportControllerSend::UpdateCongestedState() { + if (auto update = GetCongestedStateUpdate()) { + is_congested_ = update.value(); + pacer_.SetCongested(update.value()); + } +} + +absl::optional RtpTransportControllerSend::GetCongestedStateUpdate() + const { bool congested = transport_feedback_adapter_.GetOutstandingData() >= congestion_window_size_; - if (congested != is_congested_) { - is_congested_ = congested; - pacer_.SetCongested(congested); - } + if (congested != is_congested_) + return congested; + return absl::nullopt; } MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() { @@ -392,27 +399,73 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { // Normally called on the network thread ! + // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and + // simplify task posting logic when the combined network/worker project + // launches. + if (TaskQueueBase::Current() != task_queue_.TaskQueueForPost()) { + // We can't use SafeTask here if we are using an owned task queue, because + // the safety flag will be destroyed when RtpTransportControllerSend is + // destroyed on the worker thread. But we must use SafeTask if we are using + // the worker thread, since the worker thread outlives + // RtpTransportControllerSend. + task_queue_.TaskQueueForPost()->PostTask( + task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() { + RTC_DCHECK_RUN_ON(&task_queue_); + ProcessSentPacket(sent_packet, /*posted_to_worker=*/true); + })); + return; + } - // We can not use SafeTask here if we are using an owned task queue, because - // the safety flag will be destroyed when RtpTransportControllerSend is - // destroyed on the worker thread. But we must use SafeTask if we are using - // the worker thread, since the worker thread outlive - // RtpTransportControllerSend. - task_queue_.TaskQueueForPost()->PostTask( - task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() { - RTC_DCHECK_RUN_ON(&task_queue_); - absl::optional packet_msg = - transport_feedback_adapter_.ProcessSentPacket(sent_packet); - if (packet_msg) { - // Only update outstanding data if: - // 1. Packet feedback is used. - // 2. The packet has not yet received an acknowledgement. - // 3. It is not a retransmission of an earlier packet. - UpdateCongestedState(); - if (controller_) - PostUpdates(controller_->OnSentPacket(*packet_msg)); - } - })); + RTC_DCHECK_RUN_ON(&task_queue_); + ProcessSentPacket(sent_packet, /*posted_to_worker=*/false); +} + +// RTC_RUN_ON(task_queue_) +void RtpTransportControllerSend::ProcessSentPacket( + const rtc::SentPacket& sent_packet, + bool posted_to_worker) { + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + if (!packet_msg) + return; + + auto congestion_update = GetCongestedStateUpdate(); + NetworkControlUpdate control_update; + if (controller_) + control_update = controller_->OnSentPacket(*packet_msg); + if (!congestion_update && !control_update.has_updates()) + return; + if (posted_to_worker) { + ProcessSentPacketUpdates(std::move(control_update)); + } else { + // TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit + // removing this PostTask. + // At least in test situations (and possibly in production environments), we + // may get here synchronously with locks taken in PacketRouter::SendPacket. + // Because the pacer may at times synchronously re-enter + // PacketRouter::SendPacket, we need to break the chain here and PostTask to + // get out of the lock. In testing, having updates to process happens pretty + // rarely so we do not usually get here. + task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask( + safety_.flag(), + [this, control_update = std::move(control_update)]() mutable { + RTC_DCHECK_RUN_ON(&task_queue_); + ProcessSentPacketUpdates(std::move(control_update)); + })); + } +} + +// RTC_RUN_ON(task_queue_) +void RtpTransportControllerSend::ProcessSentPacketUpdates( + NetworkControlUpdate updates) { + // Only update outstanding data if: + // 1. Packet feedback is used. + // 2. The packet has not yet received an acknowledgement. + // 3. It is not a retransmission of an earlier packet. + UpdateCongestedState(); + if (controller_) { + PostUpdates(std::move(updates)); + } } void RtpTransportControllerSend::OnReceivedPacket( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 9b41a0a413..eaac55f829 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -141,6 +141,11 @@ class RtpTransportControllerSend final void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); void UpdateControlState() RTC_RUN_ON(task_queue_); void UpdateCongestedState() RTC_RUN_ON(task_queue_); + absl::optional GetCongestedStateUpdate() const RTC_RUN_ON(task_queue_); + void ProcessSentPacket(const rtc::SentPacket& sent_packet, + bool posted_to_worker) RTC_RUN_ON(task_queue_); + void ProcessSentPacketUpdates(NetworkControlUpdate updates) + RTC_RUN_ON(task_queue_); Clock* const clock_; RtcEventLog* const event_log_;