diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 5f3039065a..9e8117f9ce 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -372,25 +372,24 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - // Normally called on the network thread ! - // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and - // simplify task posting logic when the combined network/worker project + // Normally called on the network thread! + // TODO(crbug.com/1373439): Clarify other thread contexts calling in, + // and simplify task posting logic when the combined network/worker project // launches. if (TaskQueueBase::Current() != task_queue_) { task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() { RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacket(sent_packet, /*posted_to_worker=*/true); + ProcessSentPacket(sent_packet); })); return; } RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacket(sent_packet, /*posted_to_worker=*/false); + ProcessSentPacket(sent_packet); } void RtpTransportControllerSend::ProcessSentPacket( - const rtc::SentPacket& sent_packet, - bool posted_to_worker) { + const rtc::SentPacket& sent_packet) { RTC_DCHECK_RUN_ON(&sequence_checker_); absl::optional packet_msg = transport_feedback_adapter_.ProcessSentPacket(sent_packet); @@ -403,24 +402,7 @@ void RtpTransportControllerSend::ProcessSentPacket( control_update = controller_->OnSentPacket(*packet_msg); if (!congestion_update && !control_update.has_updates()) return; - if (posted_to_worker) { - ProcessSentPacketUpdates(std::move(control_update)); - } else { - // TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit - // removing this PostTask. - // At least in test situations (and possibly in production environments), we - // may get here synchronously with locks taken in PacketRouter::SendPacket. - // Because the pacer may at times synchronously re-enter - // PacketRouter::SendPacket, we need to break the chain here and PostTask to - // get out of the lock. In testing, having updates to process happens pretty - // rarely so we do not usually get here. - task_queue_->PostTask( - SafeTask(safety_.flag(), - [this, control_update = std::move(control_update)]() mutable { - RTC_DCHECK_RUN_ON(&sequence_checker_); - ProcessSentPacketUpdates(std::move(control_update)); - })); - } + ProcessSentPacketUpdates(std::move(control_update)); } // RTC_RUN_ON(task_queue_) diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 02a7f524a4..b5134a523e 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -141,8 +141,8 @@ class RtpTransportControllerSend final void UpdateCongestedState() RTC_RUN_ON(sequence_checker_); absl::optional GetCongestedStateUpdate() const RTC_RUN_ON(sequence_checker_); - void ProcessSentPacket(const rtc::SentPacket& sent_packet, - bool posted_to_worker) RTC_RUN_ON(sequence_checker_); + void ProcessSentPacket(const rtc::SentPacket& sent_packet) + RTC_RUN_ON(sequence_checker_); void ProcessSentPacketUpdates(NetworkControlUpdate updates) RTC_RUN_ON(sequence_checker_); diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 04056da7b9..5e6e3033a0 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -1774,7 +1774,7 @@ void WebRtcVideoChannel::OnPacketReceived( // depending on configuration set at object initialization. RTC_DCHECK_RUN_ON(&network_thread_checker_); - // TODO(bugs.webrtc.org/137439): Stop posting to the worker thread when the + // TODO(crbug.com/1373439): Stop posting to the worker thread when the // combined network/worker project launches. if (webrtc::TaskQueueBase::Current() != worker_thread_) { worker_thread_->PostTask( diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index b5dfdd466a..afa36ea88d 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -13,6 +13,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/network_types.h" #include "rtc_base/checks.h" @@ -83,7 +84,7 @@ void TaskQueuePacedSender::CreateProbeClusters( std::vector probe_cluster_configs) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.CreateProbeClusters(probe_cluster_configs); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::Pause() { @@ -100,14 +101,14 @@ void TaskQueuePacedSender::Resume() { void TaskQueuePacedSender::SetCongested(bool congested) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.SetCongested(congested); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { RTC_DCHECK_RUN_ON(task_queue_); pacing_controller_.SetPacingRates(pacing_rate, padding_rate); - MaybeProcessPackets(Timestamp::MinusInfinity()); + MaybeScheduleProcessPackets(); } void TaskQueuePacedSender::EnqueuePackets( @@ -200,6 +201,12 @@ void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { current_stats_ = stats; } +// RTC_RUN_ON(task_queue_) +void TaskQueuePacedSender::MaybeScheduleProcessPackets() { + if (!processing_packets_) + MaybeProcessPackets(Timestamp::MinusInfinity()); +} + void TaskQueuePacedSender::MaybeProcessPackets( Timestamp scheduled_process_time) { RTC_DCHECK_RUN_ON(task_queue_); @@ -211,6 +218,15 @@ void TaskQueuePacedSender::MaybeProcessPackets( return; } + // Protects against re-entry from transport feedback calling into the task + // queue pacer. + RTC_DCHECK(!processing_packets_); + processing_packets_ = true; + absl::Cleanup cleanup = [this] { + RTC_DCHECK_RUN_ON(task_queue_); + processing_packets_ = false; + }; + Timestamp next_send_time = pacing_controller_.NextSendTime(); RTC_DCHECK(next_send_time.IsFinite()); const Timestamp now = clock_->CurrentTime(); diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 7bb00572d0..fd71be1654 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -131,9 +131,12 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { void OnStatsUpdated(const Stats& stats); private: + // Call in response to state updates that could warrant sending out packets. + // Protected against re-entry from packet sent receipts. + void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_); // Check if it is time to send packets, or schedule a delayed task if not. // Use Timestamp::MinusInfinity() to indicate that this call has _not_ - // been scheduled by the pacing controller. If this is the case, check if + // been scheduled by the pacing controller. If this is the case, check if we // can execute immediately otherwise schedule a delay task that calls this // method again with desired (finite) scheduled process time. void MaybeProcessPackets(Timestamp scheduled_process_time); @@ -180,6 +183,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { bool include_overhead_ RTC_GUARDED_BY(task_queue_); Stats current_stats_ RTC_GUARDED_BY(task_queue_); + // Protects against ProcessPackets reentry from packet sent receipts. + bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false; ScopedTaskSafety safety_; TaskQueueBase* task_queue_; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index fdc2792793..22da99b786 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -297,7 +297,7 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, RtpPacketMediaType packet_type = *packet->packet_type(); RtpPacketCounter counter(*packet); size_t size = packet->size(); - // TODO(bugs.webrtc.org/137439): clean up task posting when the combined + // TODO(crbug.com/1373439): clean up task posting when the combined // network/worker project launches. if (TaskQueueBase::Current() != worker_queue_) { worker_queue_->PostTask(SafeTask(