From e32b6228d35f63626aa934f8a538caaefe0e8031 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Tue, 9 May 2023 00:59:46 +0200 Subject: [PATCH] RtpTransportControllerSend::ProcessSentPacket: remove PostTask. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL removes a PostTask in response to packet receipt reception. This is made possible due to PacketRouter lock removal in https://webrtc-review.googlesource.com/c/src/+/300964. Depending on how transport code is organized, this may lead to possibility of packet receipts arriving in RtpTransportControllerSend which may re-enter the PacingController's ProcessPackets method, leading to out-of-order packet sends. Fix this by detecting re-entry and avoiding a second ProcessPackets call in the TaskQueuePacedSender. Bug: chromium:1373439 Change-Id: I24928f2d28a240d0860fe7e4a114cedf1f13d2bd Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/304580 Reviewed-by: Erik Språng Commit-Queue: Markus Handell Reviewed-by: Stefan Holmer Reviewed-by: Per Kjellander Cr-Commit-Position: refs/heads/main@{#40017} --- call/rtp_transport_controller_send.cc | 32 +++++--------------- call/rtp_transport_controller_send.h | 4 +-- media/engine/webrtc_video_engine.cc | 2 +- modules/pacing/task_queue_paced_sender.cc | 22 ++++++++++++-- modules/pacing/task_queue_paced_sender.h | 7 ++++- modules/rtp_rtcp/source/rtp_sender_egress.cc | 2 +- 6 files changed, 36 insertions(+), 33 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 5f3039065a..9e8117f9ce 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -372,25 +372,24 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - // Normally called on the network thread ! - // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and - // simplify task posting logic when the combined network/worker project + // Normally called on the network thread! + // TODO(crbug.com/1373439): Clarify other thread contexts calling in, + // and simplify task posting logic when the combined network/worker project // launches. if (TaskQueueBase::Current() != task_queue_) { task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() { RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacket(sent_packet, /*posted_to_worker=*/true); + ProcessSentPacket(sent_packet); })); return; } RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacket(sent_packet, /*posted_to_worker=*/false); + ProcessSentPacket(sent_packet); } void RtpTransportControllerSend::ProcessSentPacket( - const rtc::SentPacket& sent_packet, - bool posted_to_worker) { + const rtc::SentPacket& sent_packet) { RTC_DCHECK_RUN_ON(&sequence_checker_); absl::optional packet_msg = transport_feedback_adapter_.ProcessSentPacket(sent_packet); @@ -403,24 +402,7 @@ void RtpTransportControllerSend::ProcessSentPacket( control_update = controller_->OnSentPacket(*packet_msg); if (!congestion_update && !control_update.has_updates()) return; - if (posted_to_worker) { - ProcessSentPacketUpdates(std::move(control_update)); - } else { - // TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit - // removing this PostTask. - // At least in test situations (and possibly in production environments), we - // may get here synchronously with locks taken in PacketRouter::SendPacket. - // Because the pacer may at times synchronously re-enter - // PacketRouter::SendPacket, we need to break the chain here and PostTask to - // get out of the lock. In testing, having updates to process happens pretty - // rarely so we do not usually get here. - task_queue_->PostTask( - SafeTask(safety_.flag(), - [this, control_update = std::move(control_update)]() mutable { - RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacketUpdates(std::move(control_update)); - })); - } + ProcessSentPacketUpdates(std::move(control_update)); } // RTC_RUN_ON(task_queue_) diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 02a7f524a4..b5134a523e 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -141,8 +141,8 @@ class RtpTransportControllerSend final void UpdateCongestedState() RTC_RUN_ON(sequence_checker_); absl::optional GetCongestedStateUpdate() const RTC_RUN_ON(sequence_checker_); - void ProcessSentPacket(const rtc::SentPacket& sent_packet, - bool posted_to_worker) RTC_RUN_ON(sequence_checker_); + void ProcessSentPacket(const rtc::SentPacket& sent_packet) + RTC_RUN_ON(sequence_checker_); void ProcessSentPacketUpdates(NetworkControlUpdate updates) RTC_RUN_ON(sequence_checker_); diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 04056da7b9..5e6e3033a0 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1774,7 +1774,7 @@ void WebRtcVideoChannel::OnPacketReceived( // depending on configuration set at object initialization. RTC_DCHECK_RUN_ON(&network_thread_checker_); - // TODO(bugs.webrtc.org/137439): Stop posting to the worker thread when the + // TODO(crbug.com/1373439): Stop posting to the worker thread when the // combined network/worker project launches. if (webrtc::TaskQueueBase::Current() != worker_thread_) { worker_thread_->PostTask( diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index b5dfdd466a..afa36ea88d 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -13,6 +13,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/network_types.h" #include "rtc_base/checks.h" @@ -83,7 +84,7 @@ void TaskQueuePacedSender::CreateProbeClusters( std::vector probe_cluster_configs) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.CreateProbeClusters(probe_cluster_configs); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::Pause() { @@ -100,14 +101,14 @@ void TaskQueuePacedSender::Resume() { void TaskQueuePacedSender::SetCongested(bool congested) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.SetCongested(congested); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.SetPacingRates(pacing_rate, padding_rate); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::EnqueuePackets( @@ -200,6 +201,12 @@ void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { current_stats_ = stats; } +// RTC_RUN_ON(task_queue_) +void TaskQueuePacedSender::MaybeScheduleProcessPackets() { + if (!processing_packets_) + MaybeProcessPackets(Timestamp::MinusInfinity()); +} + void TaskQueuePacedSender::MaybeProcessPackets( Timestamp scheduled_process_time) { RTC_DCHECK_RUN_ON(task_queue_); @@ -211,6 +218,15 @@ void TaskQueuePacedSender::MaybeProcessPackets( return; } + // Protects against re-entry from transport feedback calling into the task + // queue pacer. + RTC_DCHECK(!processing_packets_); + processing_packets_ = true; + absl::Cleanup cleanup = [this] { + RTC_DCHECK_RUN_ON(task_queue_); + processing_packets_ = false; + }; + Timestamp next_send_time = pacing_controller_.NextSendTime(); RTC_DCHECK(next_send_time.IsFinite()); const Timestamp now = clock_->CurrentTime(); diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 7bb00572d0..fd71be1654 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -131,9 +131,12 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { void OnStatsUpdated(const Stats& stats); private: + // Call in response to state updates that could warrant sending out packets. + // Protected against re-entry from packet sent receipts. + void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_); // Check if it is time to send packets, or schedule a delayed task if not. // Use Timestamp::MinusInfinity() to indicate that this call has _not_ - // been scheduled by the pacing controller. If this is the case, check if + // been scheduled by the pacing controller. If this is the case, check if we // can execute immediately otherwise schedule a delay task that calls this // method again with desired (finite) scheduled process time. void MaybeProcessPackets(Timestamp scheduled_process_time); @@ -180,6 +183,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { bool include_overhead_ RTC_GUARDED_BY(task_queue_); Stats current_stats_ RTC_GUARDED_BY(task_queue_); + // Protects against ProcessPackets reentry from packet sent receipts. + bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false; ScopedTaskSafety safety_; TaskQueueBase* task_queue_; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index fdc2792793..22da99b786 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -297,7 +297,7 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, RtpPacketMediaType packet_type = *packet->packet_type(); RtpPacketCounter counter(*packet); size_t size = packet->size(); - // TODO(bugs.webrtc.org/137439): clean up task posting when the combined + // TODO(crbug.com/1373439): clean up task posting when the combined // network/worker project launches. if (TaskQueueBase::Current() != worker_queue_) { worker_queue_->PostTask(SafeTask(