From 8d847f077c8d053d2266232e761a37801a58b754 Mon Sep 17 00:00:00 2001 From: Per Kjellander Date: Wed, 1 Jun 2022 20:21:58 +0200 Subject: [PATCH] Introduce PacerController::SendBurstInterval Allows the PacerController to send packets in bursts. If there are enqued packets, or a packet is enqueued while the pacer have a small media debt, an enqued packet is allowed to be sent immediately as long as the debt is smaller than the set burst interval. Bug: b/233850913 Change-Id: Ibb0fa63c97409ca23b9fa7148b5ff6ce8c4517e2 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264462 Reviewed-by: Philip Eliasson Commit-Queue: Per Kjellander Cr-Commit-Position: refs/heads/main@{#37098} --- modules/pacing/pacing_controller.cc | 50 ++++++++++------ modules/pacing/pacing_controller.h | 6 +- modules/pacing/pacing_controller_unittest.cc | 62 ++++++++++++++++++++ 3 files changed, 98 insertions(+), 20 deletions(-) diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 83f79ddc5f..a0f4ecb12e 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -88,6 +88,7 @@ PacingController::PacingController(Clock* clock, padding_target_duration_(GetDynamicPaddingTarget(field_trials_)), min_packet_limit_(kDefaultMinPacketLimit), transport_overhead_per_packet_(DataSize::Zero()), + send_burst_interval_(TimeDelta::Zero()), last_timestamp_(clock_->CurrentTime()), paused_(false), media_debt_(DataSize::Zero()), @@ -243,6 +244,10 @@ void PacingController::SetTransportOverhead(DataSize overhead_per_packet) { transport_overhead_per_packet_ = overhead_per_packet; } +void PacingController::SetSendBurstInterval(TimeDelta burst_interval) { + send_burst_interval_ = burst_interval; +} + TimeDelta PacingController::ExpectedQueueTime() const { RTC_DCHECK_GT(media_rate_, DataRate::Zero()); return TimeDelta::Millis( @@ -334,8 +339,13 @@ Timestamp PacingController::NextSendTime() const { } if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) { - // Check how long until we can send the next media packet. - next_send_time = last_process_time_ + media_debt_ / media_rate_; + // If packets are allowed to be sent in a burst, the + // debt is allowed to grow up to one packet more than what can be sent + // during 'send_burst_period_'. + TimeDelta drain_time = media_debt_ / media_rate_; + next_send_time = + last_process_time_ + + ((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time); } else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) { // If we _don't_ have pending packets, check how long until we have // bandwidth for padding packets. Both media and padding debts must @@ -391,16 +401,16 @@ void PacingController::ProcessPackets() { return; } - TimeDelta early_execute_margin = - prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); + TimeDelta early_execute_margin = + prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); - target_send_time = NextSendTime(); - if (now + early_execute_margin < target_send_time) { - // We are too early, but if queue is empty still allow draining some debt. - // Probing is allowed to be sent up to kMinSleepTime early. - UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); - return; - } + target_send_time = NextSendTime(); + if (now + early_execute_margin < target_send_time) { + // We are too early, but if queue is empty still allow draining some debt. + // Probing is allowed to be sent up to kMinSleepTime early. + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); + return; + } TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); @@ -609,15 +619,17 @@ std::unique_ptr PacingController::GetPendingPacket( return nullptr; } - if (now <= target_send_time) { - // We allow sending slightly early if we think that we would actually - // had been able to, had we been right on time - i.e. the current debt - // is not more than would be reduced to zero at the target sent time. - TimeDelta flush_time = media_debt_ / media_rate_; - if (now + flush_time > target_send_time) { - return nullptr; - } + if (now <= target_send_time && send_burst_interval_.IsZero()) { + // We allow sending slightly early if we think that we would actually + // had been able to, had we been right on time - i.e. the current debt + // is not more than would be reduced to zero at the target sent time. + // If we allow packets to be sent in a burst, packet are allowed to be + // sent early. + TimeDelta flush_time = media_debt_ / media_rate_; + if (now + flush_time > target_send_time) { + return nullptr; } + } } return packet_queue_->Pop(); diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index b1686c277f..f7c0939619 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -149,6 +149,10 @@ class PacingController { void SetIncludeOverhead(); void SetTransportOverhead(DataSize overhead_per_packet); + // The pacer is allowed to send enqued packets in bursts and can build up a + // packet "debt" that correspond to approximately the send rate during + // 'burst_interval'. + void SetSendBurstInterval(TimeDelta burst_interval); // Returns the time when the oldest packet was queued. Timestamp OldestPacketEnqueueTime() const; @@ -219,8 +223,8 @@ class PacingController { const TimeDelta padding_target_duration_; TimeDelta min_packet_limit_; - DataSize transport_overhead_per_packet_; + TimeDelta send_burst_interval_; // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. // The last millisecond timestamp returned by `clock_`. diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index 44f39bf3e4..b9ec80e874 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -2001,5 +2001,67 @@ TEST_F(PacingControllerTest, HandlesSubMicrosecondPaddingInterval) { EXPECT_GT(pacer->NextSendTime(), clock_.CurrentTime()); } +TEST_F(PacingControllerTest, SendsPacketsInBurstImmediately) { + constexpr TimeDelta kMaxDelay = TimeDelta::Millis(20); + PacingController pacer(&clock_, &callback_, trials_); + pacer.SetSendBurstInterval(kMaxDelay); + pacer.SetPacingRates(DataRate::BytesPerSec(10000), DataRate::Zero()); + + // Max allowed send burst size is 100000*20/1000) = 200byte + pacer.EnqueuePacket(video_.BuildNextPacket(100)); + pacer.EnqueuePacket(video_.BuildNextPacket(100)); + pacer.EnqueuePacket(video_.BuildNextPacket(100)); + pacer.ProcessPackets(); + EXPECT_EQ(pacer.QueueSizePackets(), 1u); + EXPECT_EQ(pacer.NextSendTime(), clock_.CurrentTime() + kMaxDelay); + + AdvanceTimeUntil(pacer.NextSendTime()); + pacer.ProcessPackets(); + EXPECT_EQ(pacer.QueueSizePackets(), 0u); +} + +TEST_F(PacingControllerTest, SendsPacketsInBurstEvenIfNotEnqueedAtSameTime) { + constexpr TimeDelta kMaxDelay = TimeDelta::Millis(20); + PacingController pacer(&clock_, &callback_, trials_); + pacer.SetSendBurstInterval(kMaxDelay); + pacer.SetPacingRates(DataRate::BytesPerSec(10000), DataRate::Zero()); + pacer.EnqueuePacket(video_.BuildNextPacket(200)); + EXPECT_EQ(pacer.NextSendTime(), clock_.CurrentTime()); + pacer.ProcessPackets(); + clock_.AdvanceTime(TimeDelta::Millis(1)); + pacer.EnqueuePacket(video_.BuildNextPacket(200)); + EXPECT_EQ(pacer.NextSendTime(), clock_.CurrentTime()); + pacer.ProcessPackets(); + EXPECT_EQ(pacer.QueueSizePackets(), 0u); +} + +TEST_F(PacingControllerTest, RespectsTargetRateWhenSendingPacketsInBursts) { + PacingController pacer(&clock_, &callback_, trials_); + pacer.SetSendBurstInterval(TimeDelta::Millis(20)); + pacer.SetAccountForAudioPackets(true); + pacer.SetPacingRates(DataRate::KilobitsPerSec(1000), DataRate::Zero()); + Timestamp start_time = clock_.CurrentTime(); + // Inject 100 packets, with size 1000bytes over 100ms. + // Expect only 1Mbps / (8*1000) / 10 = 12 packets to be sent. + // Packets are sent in burst. Each burst is then 3 packets * 1000bytes at + // 1Mbits = 24ms long. Thus, expect 4 bursts. + EXPECT_CALL(callback_, SendPacket).Times(12); + int number_of_bursts = 0; + while (clock_.CurrentTime() < start_time + TimeDelta::Millis(100)) { + pacer.EnqueuePacket(video_.BuildNextPacket(1000)); + pacer.EnqueuePacket(video_.BuildNextPacket(1000)); + pacer.EnqueuePacket(video_.BuildNextPacket(1000)); + pacer.EnqueuePacket(video_.BuildNextPacket(1000)); + pacer.EnqueuePacket(video_.BuildNextPacket(1000)); + if (pacer.NextSendTime() <= clock_.CurrentTime()) { + pacer.ProcessPackets(); + ++number_of_bursts; + } + clock_.AdvanceTime(TimeDelta::Millis(5)); + } + EXPECT_EQ(pacer.QueueSizePackets(), 88u); + EXPECT_EQ(number_of_bursts, 4); +} + } // namespace } // namespace webrtc