diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index f1a6201ce7..8ee347b74a 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -79,7 +79,8 @@ RtpTransportControllerSend::PacerSettings::PacerSettings( const WebRtcKeyValueConfig* trials) : tq_disabled("Disabled"), holdback_window("holdback_window", PacingController::kMinSleepTime), - holdback_packets("holdback_packets", -1) { + holdback_packets("holdback_packets", + TaskQueuePacedSender::kNoPacketHoldback) { ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets}, trials->Lookup("WebRTC-TaskQueuePacer")); } diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 9215462239..4fb33ff072 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -38,11 +38,6 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2); // time. Applies only to periodic mode. constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30); -// Allow probes to be processed slightly ahead of inteded send time. Currently -// set to 1ms as this is intended to allow times be rounded down to the nearest -// millisecond. -constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1); - constexpr int kFirstPriority = 0; bool IsDisabled(const WebRtcKeyValueConfig& field_trials, @@ -94,6 +89,8 @@ const float PacingController::kDefaultPaceMultiplier = 2.5f; const TimeDelta PacingController::kPausedProcessInterval = kCongestedPacketInterval; const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); +const TimeDelta PacingController::kMaxEarlyProbeProcessing = + TimeDelta::Millis(1); PacingController::PacingController(Clock* clock, PacketSender* packet_sender, @@ -133,7 +130,7 @@ PacingController::PacingController(Clock* clock, packet_counter_(0), congestion_window_size_(DataSize::PlusInfinity()), outstanding_data_(DataSize::Zero()), - queue_time_limit(kMaxExpectedQueueLength), + queue_time_limit_(kMaxExpectedQueueLength), account_for_audio_(false), include_overhead_(false) { if (!drain_large_queues_) { @@ -224,6 +221,7 @@ void PacingController::SetPacingRates(DataRate pacing_rate, media_rate_ = pacing_rate; padding_rate_ = padding_rate; pacing_bitrate_ = pacing_rate; + media_budget_.set_target_rate_kbps(pacing_rate.kbps()); padding_budget_.set_target_rate_kbps(padding_rate.kbps()); RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" @@ -302,10 +300,7 @@ void PacingController::EnqueuePacketInternal( // Use that as last process time only if it's prior to now. target_process_time = std::min(now, next_send_time); } - - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_process_time); - UpdateBudgetWithElapsedTime(elapsed_time); - last_process_time_ = target_process_time; + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); } packet_queue_.Push(priority, now, packet_counter_++, std::move(packet)); } @@ -316,7 +311,6 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { if (last_process_time_.IsMinusInfinity() || now < last_process_time_) { return TimeDelta::Zero(); } - RTC_DCHECK_GE(now, last_process_time_); TimeDelta elapsed_time = now - last_process_time_; last_process_time_ = now; if (elapsed_time > kMaxElapsedTime) { @@ -333,8 +327,7 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const { packet_counter_ == 0) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. - TimeDelta elapsed_since_last_send = now - last_send_time_; - if (elapsed_since_last_send >= kCongestedPacketInterval) { + if (now - last_send_time_ >= kCongestedPacketInterval) { return true; } } @@ -343,17 +336,17 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const { Timestamp PacingController::NextSendTime() const { const Timestamp now = CurrentTime(); + Timestamp next_send_time = Timestamp::PlusInfinity(); if (paused_) { return last_send_time_ + kPausedProcessInterval; } // If probing is active, that always takes priority. - if (prober_.is_probing()) { + if (prober_.is_probing() && !probing_send_failure_) { Timestamp probe_time = prober_.NextProbeTime(now); - // `probe_time` == PlusInfinity indicates no probe scheduled. - if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) { - return probe_time; + if (!probe_time.IsPlusInfinity()) { + return probe_time.IsMinusInfinity() ? now : probe_time; } } @@ -365,86 +358,53 @@ Timestamp PacingController::NextSendTime() const { // In dynamic mode, figure out when the next packet should be sent, // given the current conditions. - if (!pace_audio_) { - // Not pacing audio, if leading packet is audio its target send - // time is the time at which it was enqueued. - absl::optional audio_enqueue_time = - packet_queue_.LeadingAudioPacketEnqueueTime(); - if (audio_enqueue_time.has_value()) { - return *audio_enqueue_time; - } + // Not pacing audio, if leading packet is audio its target send + // time is the time at which it was enqueued. + absl::optional unpaced_audio_time = + pace_audio_ ? absl::nullopt + : packet_queue_.LeadingAudioPacketEnqueueTime(); + if (unpaced_audio_time) { + return *unpaced_audio_time; } + // We need to at least send keep-alive packets with some interval. if (Congested() || packet_counter_ == 0) { - // We need to at least send keep-alive packets with some interval. return last_send_time_ + kCongestedPacketInterval; } - // Check how long until we can send the next media packet. if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { - return std::min(last_send_time_ + kPausedProcessInterval, - last_process_time_ + media_debt_ / media_rate_); - } - - // If we _don't_ have pending packets, check how long until we have - // bandwidth for padding packets. Both media and padding debts must - // have been drained to do this. - if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + // Check how long until we can send the next media packet. + next_send_time = last_process_time_ + media_debt_ / media_rate_; + } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + // If we _don't_ have pending packets, check how long until we have + // bandwidth for padding packets. Both media and padding debts must + // have been drained to do this. + RTC_DCHECK_GT(media_rate_, DataRate::Zero()); TimeDelta drain_time = std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_); - return std::min(last_send_time_ + kPausedProcessInterval, - last_process_time_ + drain_time); + next_send_time = last_process_time_ + drain_time; + } else { + // Nothing to do. + next_send_time = last_process_time_ + kPausedProcessInterval; } if (send_padding_if_silent_) { - return last_send_time_ + kPausedProcessInterval; + next_send_time = + std::min(next_send_time, last_send_time_ + kPausedProcessInterval); } - return last_process_time_ + kPausedProcessInterval; + + return next_send_time; } void PacingController::ProcessPackets() { Timestamp now = CurrentTime(); Timestamp target_send_time = now; - if (mode_ == ProcessMode::kDynamic) { - target_send_time = NextSendTime(); - TimeDelta early_execute_margin = - prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); - if (target_send_time.IsMinusInfinity()) { - target_send_time = now; - } else if (now < target_send_time - early_execute_margin) { - // We are too early, but if queue is empty still allow draining some debt. - // Probing is allowed to be sent up to kMinSleepTime early. - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); - UpdateBudgetWithElapsedTime(elapsed_time); - return; - } - - if (target_send_time < last_process_time_) { - // After the last process call, at time X, the target send time - // shifted to be earlier than X. This should normally not happen - // but we want to make sure rounding errors or erratic behavior - // of NextSendTime() does not cause issue. In particular, if the - // buffer reduction of - // rate * (target_send_time - previous_process_time) - // in the main loop doesn't clean up the existing debt we may not - // be able to send again. We don't want to check this reordering - // there as it is the normal exit condtion when the buffer is - // exhausted and there are packets in the queue. - UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time); - target_send_time = last_process_time_; - } - } - - Timestamp previous_process_time = last_process_time_; - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); if (ShouldSendKeepalive(now)) { + DataSize keepalive_data_sent = DataSize::Zero(); // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. - if (packet_counter_ == 0) { - last_send_time_ = now; - } else { - DataSize keepalive_data_sent = DataSize::Zero(); + if (packet_counter_ > 0) { std::vector> keepalive_packets = packet_sender_->GeneratePadding(DataSize::Bytes(1)); for (auto& packet : keepalive_packets) { @@ -455,14 +415,29 @@ void PacingController::ProcessPackets() { EnqueuePacket(std::move(packet)); } } - OnPaddingSent(keepalive_data_sent); } + OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now); } if (paused_) { return; } + if (mode_ == ProcessMode::kDynamic) { + TimeDelta early_execute_margin = + prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); + + target_send_time = NextSendTime(); + if (now + early_execute_margin < target_send_time) { + // We are too early, but if queue is empty still allow draining some debt. + // Probing is allowed to be sent up to kMinSleepTime early. + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); + return; + } + } + + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); + if (elapsed_time > TimeDelta::Zero()) { DataRate target_rate = pacing_bitrate_; DataSize queue_size_data = packet_queue_.Size(); @@ -474,7 +449,7 @@ void PacingController::ProcessPackets() { if (drain_large_queues_) { TimeDelta avg_time_left = std::max(TimeDelta::Millis(1), - queue_time_limit - packet_queue_.AverageQueueTime()); + queue_time_limit_ - packet_queue_.AverageQueueTime()); DataRate min_rate_needed = queue_size_data / avg_time_left; if (min_rate_needed > target_rate) { target_rate = min_rate_needed; @@ -489,13 +464,12 @@ void PacingController::ProcessPackets() { // up to (process interval duration) * (target rate), so we only need to // update it once before the packet sending loop. media_budget_.set_target_rate_kbps(target_rate.kbps()); - UpdateBudgetWithElapsedTime(elapsed_time); } else { media_rate_ = target_rate; } + UpdateBudgetWithElapsedTime(elapsed_time); } - bool first_packet_in_probe = false; PacedPacketInfo pacing_info; DataSize recommended_probe_size = DataSize::Zero(); bool is_probing = prober_.is_probing(); @@ -504,9 +478,23 @@ void PacingController::ProcessPackets() { // use actual send time rather than target. pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo()); if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) { - first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0; recommended_probe_size = prober_.RecommendedMinProbeSize(); RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero()); + + // If first packet in probe, insert a small padding packet so we have a + // more reliable start window for the rate estimation. + if (pacing_info.probe_cluster_bytes_sent == 0) { + auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); + // If no RTP modules sending media are registered, we may not get a + // padding packet back. + if (!padding.empty()) { + // Insert with high priority so larger media packets don't preempt it. + EnqueuePacketInternal(std::move(padding[0]), kFirstPriority); + // We should never get more than one padding packets with a requested + // size of 1 byte. + RTC_DCHECK_EQ(padding.size(), 1u); + } + } } else { // No valid probe cluster returned, probe might have timed out. is_probing = false; @@ -514,102 +502,74 @@ void PacingController::ProcessPackets() { } DataSize data_sent = DataSize::Zero(); - - // The paused state is checked in the loop since it leaves the critical - // section allowing the paused state to be changed from other code. - while (!paused_) { - if (first_packet_in_probe) { - // If first packet in probe, insert a small padding packet so we have a - // more reliable start window for the rate estimation. - auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); - // If no RTP modules sending media are registered, we may not get a - // padding packet back. - if (!padding.empty()) { - // Insert with high priority so larger media packets don't preempt it. - EnqueuePacketInternal(std::move(padding[0]), kFirstPriority); - // We should never get more than one padding packets with a requested - // size of 1 byte. - RTC_DCHECK_EQ(padding.size(), 1u); - } - first_packet_in_probe = false; - } - - if (mode_ == ProcessMode::kDynamic && - previous_process_time < target_send_time) { - // Reduce buffer levels with amount corresponding to time between last - // process and target send time for the next packet. - // If the process call is late, that may be the time between the optimal - // send times for two packets we should already have sent. - UpdateBudgetWithElapsedTime(target_send_time - previous_process_time); - previous_process_time = target_send_time; - } - - // Fetch the next packet, so long as queue is not empty or budget is not + while (true) { + // Fetch packet, so long as queue is not empty or budget is not // exhausted. std::unique_ptr rtp_packet = GetPendingPacket(pacing_info, target_send_time, now); - if (rtp_packet == nullptr) { // No packet available to send, check if we should send padding. DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent); if (padding_to_add > DataSize::Zero()) { std::vector> padding_packets = packet_sender_->GeneratePadding(padding_to_add); - if (padding_packets.empty()) { - // No padding packets were generated, quite send loop. - break; + if (!padding_packets.empty()) { + for (auto& packet : padding_packets) { + EnqueuePacket(std::move(packet)); + } + // Continue loop to send the padding that was just added. + continue; + } else { + // Can't generate padding, still update padding budget for next send + // time. + UpdatePaddingBudgetWithSentData(padding_to_add); } - for (auto& packet : padding_packets) { - EnqueuePacket(std::move(packet)); - } - // Continue loop to send the padding that was just added. - continue; } - // Can't fetch new packet and no padding to send, exit send loop. break; - } + } else { + RTC_DCHECK(rtp_packet); + RTC_DCHECK(rtp_packet->packet_type().has_value()); + const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); + DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + + rtp_packet->padding_size()); - RTC_DCHECK(rtp_packet); - RTC_DCHECK(rtp_packet->packet_type().has_value()); - const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); - DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + - rtp_packet->padding_size()); + if (include_overhead_) { + packet_size += DataSize::Bytes(rtp_packet->headers_size()) + + transport_overhead_per_packet_; + } - if (include_overhead_) { - packet_size += DataSize::Bytes(rtp_packet->headers_size()) + - transport_overhead_per_packet_; - } + packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); + for (auto& packet : packet_sender_->FetchFec()) { + EnqueuePacket(std::move(packet)); + } + data_sent += packet_size; - packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); - for (auto& packet : packet_sender_->FetchFec()) { - EnqueuePacket(std::move(packet)); - } - data_sent += packet_size; + // Send done, update send time. + OnPacketSent(packet_type, packet_size, now); - // Send done, update send/process time to the target send time. - OnPacketSent(packet_type, packet_size, target_send_time); + // If we are currently probing, we need to stop the send loop when we + // have reached the send target. + if (is_probing && data_sent >= recommended_probe_size) { + break; + } - // If we are currently probing, we need to stop the send loop when we have - // reached the send target. - if (is_probing && data_sent >= recommended_probe_size) { - break; - } - - if (mode_ == ProcessMode::kDynamic) { // Update target send time in case that are more packets that we are late // in processing. - Timestamp next_send_time = NextSendTime(); - if (next_send_time.IsMinusInfinity()) { - target_send_time = now; - } else { - target_send_time = std::min(now, next_send_time); + if (mode_ == ProcessMode::kDynamic) { + target_send_time = NextSendTime(); + if (target_send_time > now) { + // Exit loop if not probing. + if (!is_probing) { + break; + } + target_send_time = now; + } + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time)); } } } - last_process_time_ = std::max(last_process_time_, previous_process_time); - if (is_probing) { probing_send_failure_ = data_sent == DataSize::Zero(); if (!probing_send_failure_) { @@ -631,8 +591,8 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, } if (packet_counter_ == 0) { - // We can not send padding unless a normal packet has first been sent. If we - // do, timestamps get messed up. + // We can not send padding unless a normal packet has first been sent. If + // we do, timestamps get messed up. return DataSize::Zero(); } @@ -697,25 +657,16 @@ std::unique_ptr PacingController::GetPendingPacket( void PacingController::OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, Timestamp send_time) { - if (!first_sent_packet_time_) { + if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) { first_sent_packet_time_ = send_time; } + bool audio_packet = packet_type == RtpPacketMediaType::kAudio; - if (!audio_packet || account_for_audio_) { - // Update media bytes sent. + if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) { UpdateBudgetWithSentData(packet_size); } - last_send_time_ = send_time; - last_process_time_ = send_time; -} -void PacingController::OnPaddingSent(DataSize data_sent) { - if (data_sent > DataSize::Zero()) { - UpdateBudgetWithSentData(data_sent); - } - Timestamp now = CurrentTime(); - last_send_time_ = now; - last_process_time_ = now; + last_send_time_ = send_time; } void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { @@ -733,17 +684,24 @@ void PacingController::UpdateBudgetWithSentData(DataSize size) { outstanding_data_ += size; if (mode_ == ProcessMode::kPeriodic) { media_budget_.UseBudget(size.bytes()); - padding_budget_.UseBudget(size.bytes()); } else { media_debt_ += size; media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime); + } + UpdatePaddingBudgetWithSentData(size); +} + +void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) { + if (mode_ == ProcessMode::kPeriodic) { + padding_budget_.UseBudget(size.bytes()); + } else { padding_debt_ += size; padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime); } } void PacingController::SetQueueTimeLimit(TimeDelta limit) { - queue_time_limit = limit; + queue_time_limit_ = limit; } } // namespace webrtc diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 5d6d26b917..e7ef201a77 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -79,6 +79,11 @@ class PacingController { static const TimeDelta kMinSleepTime; + // Allow probes to be processed slightly ahead of inteded send time. Currently + // set to 1ms as this is intended to allow times be rounded down to the + // nearest millisecond. + static const TimeDelta kMaxEarlyProbeProcessing; + PacingController(Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, @@ -158,6 +163,7 @@ class PacingController { // Updates the number of bytes that can be sent for the next time interval. void UpdateBudgetWithElapsedTime(TimeDelta delta); void UpdateBudgetWithSentData(DataSize size); + void UpdatePaddingBudgetWithSentData(DataSize size); DataSize PaddingToAdd(DataSize recommended_probe_size, DataSize data_sent) const; @@ -169,7 +175,6 @@ class PacingController { void OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, Timestamp send_time); - void OnPaddingSent(DataSize padding_sent); Timestamp CurrentTime() const; @@ -196,9 +201,9 @@ class PacingController { mutable Timestamp last_timestamp_; bool paused_; - // In dynamic mode, `media_budget_` and `padding_budget_` will be used to + // In periodic mode, `media_budget_` and `padding_budget_` will be used to // track when packets can be sent. - // In periodic mode, `media_debt_` and `padding_debt_` will be used together + // In dynamic mode, `media_debt_` and `padding_debt_` will be used together // with the target rates. // This is the media budget, keeping track of how many bits of media @@ -229,7 +234,7 @@ class PacingController { DataSize congestion_window_size_; DataSize outstanding_data_; - TimeDelta queue_time_limit; + TimeDelta queue_time_limit_; bool account_for_audio_; bool include_overhead_; }; diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index c392a88720..18e53d1ad7 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -12,15 +12,14 @@ #include #include -#include "absl/memory/memory.h" + #include "rtc_base/checks.h" -#include "rtc_base/event.h" -#include "rtc_base/logging.h" -#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" namespace webrtc { +const int TaskQueuePacedSender::kNoPacketHoldback = -1; + TaskQueuePacedSender::TaskQueuePacedSender( Clock* clock, PacingController::PacketSender* packet_sender, @@ -41,10 +40,11 @@ TaskQueuePacedSender::TaskQueuePacedSender( is_started_(false), is_shutdown_(false), packet_size_(/*alpha=*/0.95), + include_overhead_(false), task_queue_(task_queue_factory->CreateTaskQueue( "TaskQueuePacedSender", TaskQueueFactory::Priority::NORMAL)) { - packet_size_.Apply(1, 0); + RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime); } TaskQueuePacedSender::~TaskQueuePacedSender() { @@ -140,7 +140,11 @@ void TaskQueuePacedSender::EnqueuePackets( task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable { RTC_DCHECK_RUN_ON(&task_queue_); for (auto& packet : packets_) { - packet_size_.Apply(1, packet->size()); + size_t packet_size = packet->payload_size() + packet->padding_size(); + if (include_overhead_) { + packet_size += packet->headers_size(); + } + packet_size_.Apply(1, packet_size); RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero()); pacing_controller_.EnqueuePacket(std::move(packet)); } @@ -159,6 +163,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) { void TaskQueuePacedSender::SetIncludeOverhead() { task_queue_.PostTask([this]() { RTC_DCHECK_RUN_ON(&task_queue_); + include_overhead_ = true; pacing_controller_.SetIncludeOverhead(); MaybeProcessPackets(Timestamp::MinusInfinity()); }); @@ -194,13 +199,16 @@ absl::optional TaskQueuePacedSender::FirstSentPacketTime() const { TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; - if (oldest_packet.IsInfinite()) + if (oldest_packet.IsInfinite()) { return TimeDelta::Zero(); + } // (webrtc:9716): The clock is not always monotonic. Timestamp current = clock_->CurrentTime(); - if (current < oldest_packet) + if (current < oldest_packet) { return TimeDelta::Zero(); + } + return current - oldest_packet; } @@ -213,70 +221,70 @@ void TaskQueuePacedSender::MaybeProcessPackets( Timestamp scheduled_process_time) { RTC_DCHECK_RUN_ON(&task_queue_); +#if RTC_TRACE_EVENTS_ENABLED + TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"), + "TaskQueuePacedSender::MaybeProcessPackets"); +#endif + if (is_shutdown_ || !is_started_) { return; } - // Normally, run ProcessPackets() only if this is the scheduled task. - // If it is not but it is already time to process and there either is - // no scheduled task or the schedule has shifted forward in time, run - // anyway and clear any schedule. - Timestamp next_process_time = pacing_controller_.NextSendTime(); + Timestamp next_send_time = pacing_controller_.NextSendTime(); + RTC_DCHECK(next_send_time.IsFinite()); const Timestamp now = clock_->CurrentTime(); - const bool is_scheduled_call = next_process_time_ == scheduled_process_time; - if (is_scheduled_call) { - // Indicate no pending scheduled call. + + TimeDelta early_execute_margin = + pacing_controller_.IsProbing() + ? PacingController::kMaxEarlyProbeProcessing + : TimeDelta::Zero(); + + // Process packets and update stats. + while (next_send_time <= now + early_execute_margin) { + pacing_controller_.ProcessPackets(); + next_send_time = pacing_controller_.NextSendTime(); + RTC_DCHECK(next_send_time.IsFinite()); + } + UpdateStats(); + + // Ignore retired scheduled task, otherwise reset `next_process_time_`. + if (scheduled_process_time.IsFinite()) { + if (scheduled_process_time != next_process_time_) { + return; + } next_process_time_ = Timestamp::MinusInfinity(); } - if (is_scheduled_call || - (now >= next_process_time && (next_process_time_.IsInfinite() || - next_process_time < next_process_time_))) { - pacing_controller_.ProcessPackets(); - next_process_time = pacing_controller_.NextSendTime(); - } - TimeDelta hold_back_window = max_hold_back_window_; - DataRate pacing_rate = pacing_controller_.pacing_rate(); - DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered()); - if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() && - !avg_packet_size.IsZero()) { - TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate; - hold_back_window = - std::min(hold_back_window, - avg_packet_send_time * max_hold_back_window_in_packets_); - } - - absl::optional time_to_next_process; - if (pacing_controller_.IsProbing() && - next_process_time != next_process_time_) { - // If we're probing and there isn't already a wakeup scheduled for the next - // process time, always post a task and just round sleep time down to - // nearest millisecond. - if (next_process_time.IsMinusInfinity()) { - time_to_next_process = TimeDelta::Zero(); - } else { - time_to_next_process = - std::max(TimeDelta::Zero(), - (next_process_time - now).RoundDownTo(TimeDelta::Millis(1))); + // Do not hold back in probing. + TimeDelta hold_back_window = TimeDelta::Zero(); + if (!pacing_controller_.IsProbing()) { + hold_back_window = max_hold_back_window_; + DataRate pacing_rate = pacing_controller_.pacing_rate(); + if (max_hold_back_window_in_packets_ != kNoPacketHoldback && + !pacing_rate.IsZero() && + packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) { + TimeDelta avg_packet_send_time = + DataSize::Bytes(packet_size_.filtered()) / pacing_rate; + hold_back_window = + std::min(hold_back_window, + avg_packet_send_time * max_hold_back_window_in_packets_); } - } else if (next_process_time_.IsMinusInfinity() || - next_process_time <= next_process_time_ - hold_back_window) { - // Schedule a new task since there is none currently scheduled - // (`next_process_time_` is infinite), or the new process time is at least - // one holdback window earlier than whatever is currently scheduled. - time_to_next_process = std::max(next_process_time - now, hold_back_window); } - if (time_to_next_process) { - // Set a new scheduled process time and post a delayed task. - next_process_time_ = next_process_time; + // Calculate next process time. + TimeDelta time_to_next_process = + std::max(hold_back_window, next_send_time - now - early_execute_margin); + next_send_time = now + time_to_next_process; + // If no in flight task or in flight task is later than `next_send_time`, + // schedule a new one. Previous in flight task will be retired. + if (next_process_time_.IsMinusInfinity() || + next_process_time_ > next_send_time) { task_queue_.PostDelayedHighPrecisionTask( - [this, next_process_time]() { MaybeProcessPackets(next_process_time); }, - time_to_next_process->ms()); + [this, next_send_time]() { MaybeProcessPackets(next_send_time); }, + time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms()); + next_process_time_ = next_send_time; } - - UpdateStats(); } void TaskQueuePacedSender::UpdateStats() { diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 353f137963..bf17de52f9 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -14,9 +14,7 @@ #include #include -#include #include -#include #include #include "absl/types/optional.h" @@ -29,7 +27,6 @@ #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "rtc_base/numerics/exp_filter.h" -#include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_queue.h" #include "rtc_base/thread_annotations.h" @@ -39,6 +36,8 @@ class RtcEventLog; class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { public: + static const int kNoPacketHoldback; + // The `hold_back_window` parameter sets a lower bound on time to sleep if // there is currently a pacer queue and packets can't immediately be // processed. Increasing this reduces thread wakeups at the expense of higher @@ -51,7 +50,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { const WebRtcKeyValueConfig* field_trials, TaskQueueFactory* task_queue_factory, TimeDelta max_hold_back_window = PacingController::kMinSleepTime, - int max_hold_back_window_in_packets = -1); + int max_hold_back_window_in_packets = kNoPacketHoldback); ~TaskQueuePacedSender() override; @@ -156,6 +155,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Filtered size of enqueued packets, in bytes. rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_); + bool include_overhead_ RTC_GUARDED_BY(task_queue_); mutable Mutex stats_mutex_; Stats current_stats_ RTC_GUARDED_BY(stats_mutex_); diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index d78365d499..c2d1fbd095 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -37,7 +37,6 @@ constexpr uint32_t kVideoSsrc = 234565; constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kFlexFecSsrc = 45678; constexpr size_t kDefaultPacketSize = 1234; -constexpr int kNoPacketHoldback = -1; class MockPacketRouter : public PacketRouter { public: @@ -120,7 +119,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); // Insert a number of packets, covering one second. static constexpr size_t kPacketsToSend = 42; @@ -160,7 +159,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); // Insert a number of packets to be sent 200ms apart. const size_t kPacketsPerSecond = 5; @@ -212,7 +211,7 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -241,11 +240,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); + TaskQueuePacedSender pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -278,11 +277,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); + TaskQueuePacedSender pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -306,7 +305,7 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); } -TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { +TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) { ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; @@ -314,7 +313,7 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); // Set rates so one packet adds 4ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -339,15 +338,16 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { // Advance to less than 3ms before next packet send time. time_controller.AdvanceTime(TimeDelta::Micros(1001)); - // Trigger a probe at 4x the current pacing rate and insert the number of + // Trigger a probe at 2x the current pacing rate and insert the number of // packets the probe needs. const DataRate kProbeRate = 2 * kPacingDataRate; const int kProbeClusterId = 1; pacer.CreateProbeCluster(kProbeRate, kProbeClusterId); - // Expected size for each probe in a cluster is twice the expected bits - // sent during min_probe_delta. - // Expect one additional call since probe always starts with a small + // Expected size for each probe in a cluster is twice the expected bits sent + // during min_probe_delta. + // Expect one additional call since probe always starts with a small (1 byte) + // padding packet that's not counted into the probe rate here. const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; const size_t kNumPacketsInProbe = @@ -381,7 +381,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); // Set rates so one packet adds 4ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -423,8 +423,8 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { // Verify the amount of probing data sent. // Probe always starts with a small (1 byte) padding packet that's not // counted into the probe rate here. - EXPECT_EQ(data_sent, - kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); + const DataSize kMinProbeSize = 2 * kMinProbeDelta * kProbingRate; + EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize); } TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) { @@ -534,7 +534,7 @@ TEST(TaskQueuePacedSenderTest, Stats) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, kNoPacketHoldback); + PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); // Simulate ~2mbps video stream, covering one second. static constexpr size_t kPacketsToSend = 200;