RtpTransportControllerSend::ProcessSentPacket: remove PostTask.
This CL removes a PostTask in response to packet receipt reception. This is made possible due to PacketRouter lock removal in https://webrtc-review.googlesource.com/c/src/+/300964. Depending on how transport code is organized, this may lead to possibility of packet receipts arriving in RtpTransportControllerSend which may re-enter the PacingController's ProcessPackets method, leading to out-of-order packet sends. Fix this by detecting re-entry and avoiding a second ProcessPackets call in the TaskQueuePacedSender. Bug: chromium:1373439 Change-Id: I24928f2d28a240d0860fe7e4a114cedf1f13d2bd Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/304580 Reviewed-by: Erik Språng <sprang@webrtc.org> Commit-Queue: Markus Handell <handellm@webrtc.org> Reviewed-by: Stefan Holmer <stefan@webrtc.org> Reviewed-by: Per Kjellander <perkj@webrtc.org> Cr-Commit-Position: refs/heads/main@{#40017}
This commit is contained in:
parent
9be593f340
commit
e32b6228d3
@ -372,25 +372,24 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
|
||||
}
|
||||
void RtpTransportControllerSend::OnSentPacket(
|
||||
const rtc::SentPacket& sent_packet) {
|
||||
// Normally called on the network thread !
|
||||
// TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and
|
||||
// simplify task posting logic when the combined network/worker project
|
||||
// Normally called on the network thread!
|
||||
// TODO(crbug.com/1373439): Clarify other thread contexts calling in,
|
||||
// and simplify task posting logic when the combined network/worker project
|
||||
// launches.
|
||||
if (TaskQueueBase::Current() != task_queue_) {
|
||||
task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() {
|
||||
RTC_DCHECK_RUN_ON(&sequence_checker_);
|
||||
ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
|
||||
ProcessSentPacket(sent_packet);
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
RTC_DCHECK_RUN_ON(&sequence_checker_);
|
||||
ProcessSentPacket(sent_packet, /*posted_to_worker=*/false);
|
||||
ProcessSentPacket(sent_packet);
|
||||
}
|
||||
|
||||
void RtpTransportControllerSend::ProcessSentPacket(
|
||||
const rtc::SentPacket& sent_packet,
|
||||
bool posted_to_worker) {
|
||||
const rtc::SentPacket& sent_packet) {
|
||||
RTC_DCHECK_RUN_ON(&sequence_checker_);
|
||||
absl::optional<SentPacket> packet_msg =
|
||||
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
|
||||
@ -403,24 +402,7 @@ void RtpTransportControllerSend::ProcessSentPacket(
|
||||
control_update = controller_->OnSentPacket(*packet_msg);
|
||||
if (!congestion_update && !control_update.has_updates())
|
||||
return;
|
||||
if (posted_to_worker) {
|
||||
ProcessSentPacketUpdates(std::move(control_update));
|
||||
} else {
|
||||
// TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit
|
||||
// removing this PostTask.
|
||||
// At least in test situations (and possibly in production environments), we
|
||||
// may get here synchronously with locks taken in PacketRouter::SendPacket.
|
||||
// Because the pacer may at times synchronously re-enter
|
||||
// PacketRouter::SendPacket, we need to break the chain here and PostTask to
|
||||
// get out of the lock. In testing, having updates to process happens pretty
|
||||
// rarely so we do not usually get here.
|
||||
task_queue_->PostTask(
|
||||
SafeTask(safety_.flag(),
|
||||
[this, control_update = std::move(control_update)]() mutable {
|
||||
RTC_DCHECK_RUN_ON(&sequence_checker_);
|
||||
ProcessSentPacketUpdates(std::move(control_update));
|
||||
}));
|
||||
}
|
||||
ProcessSentPacketUpdates(std::move(control_update));
|
||||
}
|
||||
|
||||
// RTC_RUN_ON(task_queue_)
|
||||
|
||||
@ -141,8 +141,8 @@ class RtpTransportControllerSend final
|
||||
void UpdateCongestedState() RTC_RUN_ON(sequence_checker_);
|
||||
absl::optional<bool> GetCongestedStateUpdate() const
|
||||
RTC_RUN_ON(sequence_checker_);
|
||||
void ProcessSentPacket(const rtc::SentPacket& sent_packet,
|
||||
bool posted_to_worker) RTC_RUN_ON(sequence_checker_);
|
||||
void ProcessSentPacket(const rtc::SentPacket& sent_packet)
|
||||
RTC_RUN_ON(sequence_checker_);
|
||||
void ProcessSentPacketUpdates(NetworkControlUpdate updates)
|
||||
RTC_RUN_ON(sequence_checker_);
|
||||
|
||||
|
||||
@ -1774,7 +1774,7 @@ void WebRtcVideoChannel::OnPacketReceived(
|
||||
// depending on configuration set at object initialization.
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
|
||||
// TODO(bugs.webrtc.org/137439): Stop posting to the worker thread when the
|
||||
// TODO(crbug.com/1373439): Stop posting to the worker thread when the
|
||||
// combined network/worker project launches.
|
||||
if (webrtc::TaskQueueBase::Current() != worker_thread_) {
|
||||
worker_thread_->PostTask(
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
#include <algorithm>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/cleanup/cleanup.h"
|
||||
#include "api/task_queue/pending_task_safety_flag.h"
|
||||
#include "api/transport/network_types.h"
|
||||
#include "rtc_base/checks.h"
|
||||
@ -83,7 +84,7 @@ void TaskQueuePacedSender::CreateProbeClusters(
|
||||
std::vector<ProbeClusterConfig> probe_cluster_configs) {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
|
||||
MaybeProcessPackets(Timestamp::MinusInfinity());
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
void TaskQueuePacedSender::Pause() {
|
||||
@ -100,14 +101,14 @@ void TaskQueuePacedSender::Resume() {
|
||||
void TaskQueuePacedSender::SetCongested(bool congested) {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
pacing_controller_.SetCongested(congested);
|
||||
MaybeProcessPackets(Timestamp::MinusInfinity());
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
|
||||
DataRate padding_rate) {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
|
||||
MaybeProcessPackets(Timestamp::MinusInfinity());
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
void TaskQueuePacedSender::EnqueuePackets(
|
||||
@ -200,6 +201,12 @@ void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
|
||||
current_stats_ = stats;
|
||||
}
|
||||
|
||||
// RTC_RUN_ON(task_queue_)
|
||||
void TaskQueuePacedSender::MaybeScheduleProcessPackets() {
|
||||
if (!processing_packets_)
|
||||
MaybeProcessPackets(Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
void TaskQueuePacedSender::MaybeProcessPackets(
|
||||
Timestamp scheduled_process_time) {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
@ -211,6 +218,15 @@ void TaskQueuePacedSender::MaybeProcessPackets(
|
||||
return;
|
||||
}
|
||||
|
||||
// Protects against re-entry from transport feedback calling into the task
|
||||
// queue pacer.
|
||||
RTC_DCHECK(!processing_packets_);
|
||||
processing_packets_ = true;
|
||||
absl::Cleanup cleanup = [this] {
|
||||
RTC_DCHECK_RUN_ON(task_queue_);
|
||||
processing_packets_ = false;
|
||||
};
|
||||
|
||||
Timestamp next_send_time = pacing_controller_.NextSendTime();
|
||||
RTC_DCHECK(next_send_time.IsFinite());
|
||||
const Timestamp now = clock_->CurrentTime();
|
||||
|
||||
@ -131,9 +131,12 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
|
||||
void OnStatsUpdated(const Stats& stats);
|
||||
|
||||
private:
|
||||
// Call in response to state updates that could warrant sending out packets.
|
||||
// Protected against re-entry from packet sent receipts.
|
||||
void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_);
|
||||
// Check if it is time to send packets, or schedule a delayed task if not.
|
||||
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
|
||||
// been scheduled by the pacing controller. If this is the case, check if
|
||||
// been scheduled by the pacing controller. If this is the case, check if we
|
||||
// can execute immediately otherwise schedule a delay task that calls this
|
||||
// method again with desired (finite) scheduled process time.
|
||||
void MaybeProcessPackets(Timestamp scheduled_process_time);
|
||||
@ -180,6 +183,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
|
||||
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
|
||||
|
||||
Stats current_stats_ RTC_GUARDED_BY(task_queue_);
|
||||
// Protects against ProcessPackets reentry from packet sent receipts.
|
||||
bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false;
|
||||
|
||||
ScopedTaskSafety safety_;
|
||||
TaskQueueBase* task_queue_;
|
||||
|
||||
@ -297,7 +297,7 @@ void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
|
||||
RtpPacketMediaType packet_type = *packet->packet_type();
|
||||
RtpPacketCounter counter(*packet);
|
||||
size_t size = packet->size();
|
||||
// TODO(bugs.webrtc.org/137439): clean up task posting when the combined
|
||||
// TODO(crbug.com/1373439): clean up task posting when the combined
|
||||
// network/worker project launches.
|
||||
if (TaskQueueBase::Current() != worker_queue_) {
|
||||
worker_queue_->PostTask(SafeTask(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user