From 0f86c1f1253e4edca0ebcbbd49fb8d385e519ab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Tue, 26 Oct 2021 16:19:03 +0200 Subject: [PATCH] Add ability to control TaskQueuePacedSender holdback window. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Holdback window can be specified as absolute time and in terms of packet send times. Example: WebRTC-TaskQueuePacer/Enabled,holdback_window:20ms,holdback_packet:3/ If current conditions have us running with 2000kbps pacing rate and 1250byte (10kbit) packets, each packet send time is 5ms. The holdback window would then be min(20ms, 3*5ms) = 15ms. The default is like before 1ms and packets no take into account when TQ pacer is used, parameters have no effect with legacy process thread pacer. Bug: webrtc:10809 Change-Id: I800de05107e2d4df461eabaaf1ca04fb4c5de51e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/233421 Commit-Queue: Erik Språng Reviewed-by: Philip Eliasson Reviewed-by: Henrik Boström Cr-Commit-Position: refs/heads/main@{#35266} --- call/rtp_transport_controller_send.cc | 37 +- call/rtp_transport_controller_send.h | 12 +- modules/pacing/BUILD.gn | 1 + modules/pacing/pacing_controller.h | 1 + modules/pacing/task_queue_paced_sender.cc | 27 +- modules/pacing/task_queue_paced_sender.h | 13 +- .../task_queue_paced_sender_unittest.cc | 970 ++++++++++-------- 7 files changed, 602 insertions(+), 459 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 9d27b330cb..8825df2f7b 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -75,6 +75,15 @@ bool IsRelayed(const rtc::NetworkRoute& route) { } // namespace +RtpTransportControllerSend::PacerSettings::PacerSettings( + const WebRtcKeyValueConfig* trials) + : tq_disabled("Disabled"), + holdback_window("holdback_window", PacingController::kMinSleepTime), + holdback_packets("holdback_packets", -1) { + ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets}, + trials->Lookup("WebRTC-TaskQueuePacer")); +} + RtpTransportControllerSend::RtpTransportControllerSend( Clock* clock, webrtc::RtcEventLog* event_log, @@ -89,8 +98,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( bitrate_configurator_(bitrate_config), pacer_started_(false), process_thread_(std::move(process_thread)), - use_task_queue_pacer_(!IsDisabled(trials, "WebRTC-TaskQueuePacer")), - process_thread_pacer_(use_task_queue_pacer_ + pacer_settings_(trials), + process_thread_pacer_(pacer_settings_.use_task_queue_pacer() ? nullptr : new PacedSender(clock, &packet_router_, @@ -98,14 +107,14 @@ RtpTransportControllerSend::RtpTransportControllerSend( trials, process_thread_.get())), task_queue_pacer_( - use_task_queue_pacer_ - ? new TaskQueuePacedSender( - clock, - &packet_router_, - event_log, - trials, - task_queue_factory, - /*hold_back_window = */ PacingController::kMinSleepTime) + pacer_settings_.use_task_queue_pacer() + ? new TaskQueuePacedSender(clock, + &packet_router_, + event_log, + trials, + task_queue_factory, + pacer_settings_.holdback_window.Get(), + pacer_settings_.holdback_packets.Get()) : nullptr), observer_(nullptr), controller_factory_override_(controller_factory), @@ -194,14 +203,14 @@ void RtpTransportControllerSend::UpdateControlState() { } RtpPacketPacer* RtpTransportControllerSend::pacer() { - if (use_task_queue_pacer_) { + if (pacer_settings_.use_task_queue_pacer()) { return task_queue_pacer_.get(); } return process_thread_pacer_.get(); } const RtpPacketPacer* RtpTransportControllerSend::pacer() const { - if (use_task_queue_pacer_) { + if (pacer_settings_.use_task_queue_pacer()) { return task_queue_pacer_.get(); } return process_thread_pacer_.get(); @@ -226,7 +235,7 @@ RtpTransportControllerSend::transport_feedback_observer() { } RtpPacketSender* RtpTransportControllerSend::packet_sender() { - if (use_task_queue_pacer_) { + if (pacer_settings_.use_task_queue_pacer()) { return task_queue_pacer_.get(); } return process_thread_pacer_.get(); @@ -503,7 +512,7 @@ void RtpTransportControllerSend::IncludeOverheadInPacedSender() { void RtpTransportControllerSend::EnsureStarted() { if (!pacer_started_) { pacer_started_ = true; - if (use_task_queue_pacer_) { + if (pacer_settings_.use_task_queue_pacer()) { task_queue_pacer_->EnsureStarted(); } else { process_thread_->Start(); diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index f1b90c7f52..ac4213d568 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -128,6 +128,16 @@ class RtpTransportControllerSend final void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override; private: + struct PacerSettings { + explicit PacerSettings(const WebRtcKeyValueConfig* trials); + + bool use_task_queue_pacer() const { return !tq_disabled.Get(); } + + FieldTrialFlag tq_disabled; // Kill-switch not normally used. + FieldTrialParameter holdback_window; + FieldTrialParameter holdback_packets; + }; + void MaybeCreateControllers() RTC_RUN_ON(task_queue_); void UpdateInitialConstraints(TargetRateConstraints new_contraints) RTC_RUN_ON(task_queue_); @@ -158,7 +168,7 @@ class RtpTransportControllerSend final std::map network_routes_; bool pacer_started_; const std::unique_ptr process_thread_; - const bool use_task_queue_pacer_; + const PacerSettings pacer_settings_; std::unique_ptr process_thread_pacer_; std::unique_ptr task_queue_pacer_; diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 0787105f14..0653356c1d 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -48,6 +48,7 @@ rtc_library("pacing") { "../../logging:rtc_event_pacing", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", + "../../rtc_base:rtc_numerics", "../../rtc_base:rtc_task_queue", "../../rtc_base/experiments:field_trial_parser", "../../rtc_base/synchronization:mutex", diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 38bb9e543e..aade9322f2 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -103,6 +103,7 @@ class PacingController { // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); + DataRate pacing_rate() const { return pacing_bitrate_; } // Currently audio traffic is not accounted by pacer and passed through. // With the introduction of audio BWE audio traffic will be accounted for diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 515cba31dd..f2de9ecb64 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -36,9 +36,11 @@ TaskQueuePacedSender::TaskQueuePacedSender( RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials, TaskQueueFactory* task_queue_factory, - TimeDelta hold_back_window) + TimeDelta max_hold_back_window, + int max_hold_back_window_in_packets) : clock_(clock), - hold_back_window_(hold_back_window), + max_hold_back_window_(max_hold_back_window), + max_hold_back_window_in_packets_(max_hold_back_window_in_packets), pacing_controller_(clock, packet_sender, event_log, @@ -48,9 +50,12 @@ TaskQueuePacedSender::TaskQueuePacedSender( stats_update_scheduled_(false), last_stats_time_(Timestamp::MinusInfinity()), is_shutdown_(false), + packet_size_(/*alpha=*/0.95), task_queue_(task_queue_factory->CreateTaskQueue( "TaskQueuePacedSender", - TaskQueueFactory::Priority::NORMAL)) {} + TaskQueueFactory::Priority::NORMAL)) { + packet_size_.Apply(1, 0); +} TaskQueuePacedSender::~TaskQueuePacedSender() { // Post an immediate task to mark the queue as shutting down. @@ -144,6 +149,7 @@ void TaskQueuePacedSender::EnqueuePackets( task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable { RTC_DCHECK_RUN_ON(&task_queue_); for (auto& packet : packets_) { + packet_size_.Apply(1, packet->size()); RTC_DCHECK_GE(packet->capture_time_ms(), 0); pacing_controller_.EnqueuePacket(std::move(packet)); } @@ -227,6 +233,17 @@ void TaskQueuePacedSender::MaybeProcessPackets( next_process_time = pacing_controller_.NextSendTime(); } + TimeDelta hold_back_window = max_hold_back_window_; + DataRate pacing_rate = pacing_controller_.pacing_rate(); + DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered()); + if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() && + !avg_packet_size.IsZero()) { + TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate; + hold_back_window = + std::min(hold_back_window, + avg_packet_send_time * max_hold_back_window_in_packets_); + } + absl::optional time_to_next_process; if (pacing_controller_.IsProbing() && next_process_time != next_process_time_) { @@ -241,11 +258,11 @@ void TaskQueuePacedSender::MaybeProcessPackets( (next_process_time - now).RoundDownTo(TimeDelta::Millis(1))); } } else if (next_process_time_.IsMinusInfinity() || - next_process_time <= next_process_time_ - hold_back_window_) { + next_process_time <= next_process_time_ - hold_back_window) { // Schedule a new task since there is none currently scheduled // (`next_process_time_` is infinite), or the new process time is at least // one holdback window earlier than whatever is currently scheduled. - time_to_next_process = std::max(next_process_time - now, hold_back_window_); + time_to_next_process = std::max(next_process_time - now, hold_back_window); } if (time_to_next_process) { diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index d39417bf45..ebe9846075 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -29,6 +29,7 @@ #include "modules/pacing/pacing_controller.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" +#include "rtc_base/numerics/exp_filter.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_queue.h" #include "rtc_base/thread_annotations.h" @@ -43,14 +44,15 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // there is currently a pacer queue and packets can't immediately be // processed. Increasing this reduces thread wakeups at the expense of higher // latency. - // TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window. + // TODO(bugs.webrtc.org/10809): Remove default values. TaskQueuePacedSender( Clock* clock, PacingController::PacketSender* packet_sender, RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials, TaskQueueFactory* task_queue_factory, - TimeDelta hold_back_window = PacingController::kMinSleepTime); + TimeDelta max_hold_back_window = PacingController::kMinSleepTime, + int max_hold_back_window_in_packets = -1); ~TaskQueuePacedSender() override; @@ -132,7 +134,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { Stats GetStats() const; Clock* const clock_; - const TimeDelta hold_back_window_; + const TimeDelta max_hold_back_window_; + const int max_hold_back_window_in_packets_; + PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); // We want only one (valid) delayed process task in flight at a time. @@ -161,6 +165,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // never drain. bool is_shutdown_ RTC_GUARDED_BY(task_queue_); + // Filtered size of enqueued packets, in bytes. + rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_); + mutable Mutex stats_mutex_; Stats current_stats_ RTC_GUARDED_BY(stats_mutex_); diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index aca1ba0b4d..b921331dd1 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -37,6 +37,7 @@ constexpr uint32_t kVideoSsrc = 234565; constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kFlexFecSsrc = 45678; constexpr size_t kDefaultPacketSize = 1234; +constexpr int kNoPacketHoldback = -1; class MockPacketRouter : public PacketRouter { public: @@ -70,13 +71,15 @@ class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials, TaskQueueFactory* task_queue_factory, - TimeDelta hold_back_window) + TimeDelta hold_back_window, + int max_hold_back_window_in_packets) : TaskQueuePacedSender(clock, packet_router, event_log, field_trials, task_queue_factory, - hold_back_window) {} + hold_back_window, + max_hold_back_window_in_packets) {} void OnStatsUpdated(const Stats& stats) override { ++num_stats_updates_; @@ -110,484 +113,579 @@ std::vector> GeneratePadding( namespace test { - std::unique_ptr BuildRtpPacket(RtpPacketMediaType type) { - auto packet = std::make_unique(nullptr); - packet->set_packet_type(type); - switch (type) { - case RtpPacketMediaType::kAudio: - packet->SetSsrc(kAudioSsrc); - break; - case RtpPacketMediaType::kVideo: - packet->SetSsrc(kVideoSsrc); - break; - case RtpPacketMediaType::kRetransmission: - case RtpPacketMediaType::kPadding: - packet->SetSsrc(kVideoRtxSsrc); - break; - case RtpPacketMediaType::kForwardErrorCorrection: - packet->SetSsrc(kFlexFecSsrc); - break; - } - - packet->SetPayloadSize(kDefaultPacketSize); - return packet; +std::unique_ptr BuildRtpPacket(RtpPacketMediaType type) { + auto packet = std::make_unique(nullptr); + packet->set_packet_type(type); + switch (type) { + case RtpPacketMediaType::kAudio: + packet->SetSsrc(kAudioSsrc); + break; + case RtpPacketMediaType::kVideo: + packet->SetSsrc(kVideoSsrc); + break; + case RtpPacketMediaType::kRetransmission: + case RtpPacketMediaType::kPadding: + packet->SetSsrc(kVideoRtxSsrc); + break; + case RtpPacketMediaType::kForwardErrorCorrection: + packet->SetSsrc(kFlexFecSsrc); + break; } - std::vector> GeneratePackets( - RtpPacketMediaType type, - size_t num_packets) { - std::vector> packets; - for (size_t i = 0; i < num_packets; ++i) { - packets.push_back(BuildRtpPacket(type)); - } - return packets; + packet->SetPayloadSize(kDefaultPacketSize); + return packet; +} + +std::vector> GeneratePackets( + RtpPacketMediaType type, + size_t num_packets) { + std::vector> packets; + for (size_t i = 0; i < num_packets; ++i) { + packets.push_back(BuildRtpPacket(type)); } + return packets; +} - TEST(TaskQueuePacedSenderTest, PacesPackets) { - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); +TEST(TaskQueuePacedSenderTest, PacesPackets) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); - // Insert a number of packets, covering one second. - static constexpr size_t kPacketsToSend = 42; - pacer.SetPacingRates( - DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), - DataRate::Zero()); - pacer.EnsureStarted(); - pacer.EnqueuePackets( - GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + // Insert a number of packets, covering one second. + static constexpr size_t kPacketsToSend = 42; + pacer.SetPacingRates( + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), + DataRate::Zero()); + pacer.EnsureStarted(); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); - // Expect all of them to be sent. - size_t packets_sent = 0; - Timestamp end_time = Timestamp::PlusInfinity(); - EXPECT_CALL(packet_router, SendPacket) - .WillRepeatedly([&](std::unique_ptr packet, - const PacedPacketInfo& cluster_info) { - ++packets_sent; - if (packets_sent == kPacketsToSend) { - end_time = time_controller.GetClock()->CurrentTime(); - } - }); + // Expect all of them to be sent. + size_t packets_sent = 0; + Timestamp end_time = Timestamp::PlusInfinity(); + EXPECT_CALL(packet_router, SendPacket) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + ++packets_sent; + if (packets_sent == kPacketsToSend) { + end_time = time_controller.GetClock()->CurrentTime(); + } + }); - const Timestamp start_time = time_controller.GetClock()->CurrentTime(); + const Timestamp start_time = time_controller.GetClock()->CurrentTime(); - // Packets should be sent over a period of close to 1s. Expect a little - // lower than this since initial probing is a bit quicker. - time_controller.AdvanceTime(TimeDelta::Seconds(1)); - EXPECT_EQ(packets_sent, kPacketsToSend); - ASSERT_TRUE(end_time.IsFinite()); - EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); - } + // Packets should be sent over a period of close to 1s. Expect a little + // lower than this since initial probing is a bit quicker. + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + ASSERT_TRUE(end_time.IsFinite()); + EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); +} - TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); +TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); - // Insert a number of packets to be sent 200ms apart. - const size_t kPacketsPerSecond = 5; - const DataRate kPacingRate = - DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); - pacer.SetPacingRates(kPacingRate, DataRate::Zero()); - pacer.EnsureStarted(); + // Insert a number of packets to be sent 200ms apart. + const size_t kPacketsPerSecond = 5; + const DataRate kPacingRate = + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); + pacer.SetPacingRates(kPacingRate, DataRate::Zero()); + pacer.EnsureStarted(); - // Send some initial packets to be rid of any probes. - EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond); - pacer.EnqueuePackets( - GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); - time_controller.AdvanceTime(TimeDelta::Seconds(1)); + // Send some initial packets to be rid of any probes. + EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); - // Insert three packets, and record send time of each of them. - // After the second packet is sent, double the send rate so we can - // check the third packets is sent after half the wait time. - Timestamp first_packet_time = Timestamp::MinusInfinity(); - Timestamp second_packet_time = Timestamp::MinusInfinity(); - Timestamp third_packet_time = Timestamp::MinusInfinity(); + // Insert three packets, and record send time of each of them. + // After the second packet is sent, double the send rate so we can + // check the third packets is sent after half the wait time. + Timestamp first_packet_time = Timestamp::MinusInfinity(); + Timestamp second_packet_time = Timestamp::MinusInfinity(); + Timestamp third_packet_time = Timestamp::MinusInfinity(); - EXPECT_CALL(packet_router, SendPacket) - .Times(3) - .WillRepeatedly([&](std::unique_ptr packet, - const PacedPacketInfo& cluster_info) { - if (first_packet_time.IsInfinite()) { - first_packet_time = time_controller.GetClock()->CurrentTime(); - } else if (second_packet_time.IsInfinite()) { - second_packet_time = time_controller.GetClock()->CurrentTime(); - pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); - } else { - third_packet_time = time_controller.GetClock()->CurrentTime(); - } - }); + EXPECT_CALL(packet_router, SendPacket) + .Times(3) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + if (first_packet_time.IsInfinite()) { + first_packet_time = time_controller.GetClock()->CurrentTime(); + } else if (second_packet_time.IsInfinite()) { + second_packet_time = time_controller.GetClock()->CurrentTime(); + pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); + } else { + third_packet_time = time_controller.GetClock()->CurrentTime(); + } + }); - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); - time_controller.AdvanceTime(TimeDelta::Millis(500)); - ASSERT_TRUE(third_packet_time.IsFinite()); - EXPECT_NEAR((second_packet_time - first_packet_time).ms(), 200.0, - 1.0); - EXPECT_NEAR((third_packet_time - second_packet_time).ms(), 100.0, - 1.0); - } + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); + time_controller.AdvanceTime(TimeDelta::Millis(500)); + ASSERT_TRUE(third_packet_time.IsFinite()); + EXPECT_NEAR((second_packet_time - first_packet_time).ms(), 200.0, + 1.0); + EXPECT_NEAR((third_packet_time - second_packet_time).ms(), 100.0, + 1.0); +} - TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); +TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + pacer.EnsureStarted(); - // Add some initial video packets, only one should be sent. - EXPECT_CALL(packet_router, SendPacket); - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + // Add some initial video packets, only one should be sent. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + + // Advance time, but still before next packet should be sent. + time_controller.AdvanceTime(kPacketPacingTime / 2); + + // Insert an audio packet, it should be sent immediately. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); +} + +TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + pacer.EnsureStarted(); + + // Add 10 packets. The first should be sent immediately since the buffers + // are clear. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + + // Advance time to 1ms before the coalescing window ends. No packets should + // be sent. + EXPECT_CALL(packet_router, SendPacket).Times(0); + time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); + + // Advance time to where coalescing window ends. All packets that should + // have been sent up til now will be sent. + EXPECT_CALL(packet_router, SendPacket).Times(5); + time_controller.AdvanceTime(TimeDelta::Millis(1)); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); +} + +TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + pacer.EnsureStarted(); + + // Add 10 packets. The first should be sent immediately since the buffers + // are clear. This will also trigger the probe to start. + EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); + pacer.CreateProbeCluster(kPacingDataRate * 2, 17); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + + // Advance time to 1ms before the coalescing window ends. Packets should be + // flying. + EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); + time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); +} + +TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + pacer.EnsureStarted(); + + const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + + // Nothing inserted, no stats updates yet. + EXPECT_EQ(pacer.num_stats_updates_, 0u); + + // Insert one packet, stats should be updated. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 1u); + + // Advance time half of the min stats update interval, and trigger a + // refresh - stats should not be updated yet. + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); + pacer.EnqueuePackets({}); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 1u); + + // Advance time the next half, now stats update is triggered. + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); + pacer.EnqueuePackets({}); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 2u); +} + +TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); + + // Set rates so one packet adds 10ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(10); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); + + // Nothing inserted, no stats updates yet. + size_t num_expected_stats_updates = 0; + EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + pacer.EnsureStarted(); + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); + // Updating pacing rates refreshes stats. + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); + + // Record time when we insert first packet, this triggers the scheduled + // stats updating. + Clock* const clock = time_controller.GetClock(); + const Timestamp start_time = clock->CurrentTime(); + + while (clock->CurrentTime() - start_time <= + kMaxTimeBetweenStatsUpdates - kPacketPacingTime) { + // Enqueue packet, expect stats update. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); time_controller.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router); + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - // Advance time, but still before next packet should be sent. + // Advance time to halfway through pacing time, expect another stats + // update. time_controller.AdvanceTime(kPacketPacingTime / 2); - - // Insert an audio packet, it should be sent immediately. - EXPECT_CALL(packet_router, SendPacket); - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router); - } - - TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow); - - // Set rates so one packet adds one ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - - // Add 10 packets. The first should be sent immediately since the buffers - // are clear. - EXPECT_CALL(packet_router, SendPacket); - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); - time_controller.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router); - - // Advance time to 1ms before the coalescing window ends. No packets should - // be sent. - EXPECT_CALL(packet_router, SendPacket).Times(0); - time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); - - // Advance time to where coalescing window ends. All packets that should - // have been sent up til now will be sent. - EXPECT_CALL(packet_router, SendPacket).Times(5); - time_controller.AdvanceTime(TimeDelta::Millis(1)); - ::testing::Mock::VerifyAndClearExpectations(&packet_router); - } - - TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow); - - // Set rates so one packet adds one ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - - // Add 10 packets. The first should be sent immediately since the buffers - // are clear. This will also trigger the probe to start. - EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); - pacer.CreateProbeCluster(kPacingDataRate * 2, 17); - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); - time_controller.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router); - - // Advance time to 1ms before the coalescing window ends. Packets should be - // flying. - EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); - time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); - } - - TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow); - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); - - // Nothing inserted, no stats updates yet. - EXPECT_EQ(pacer.num_stats_updates_, 0u); - - // Insert one packet, stats should be updated. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 1u); - - // Advance time half of the min stats update interval, and trigger a - // refresh - stats should not be updated yet. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); pacer.EnqueuePackets({}); time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 1u); - - // Advance time the next half, now stats update is triggered. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); - pacer.EnqueuePackets({}); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 2u); - } - - TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow); - - // Set rates so one packet adds 10ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(10); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); - const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); - - // Nothing inserted, no stats updates yet. - size_t num_expected_stats_updates = 0; - EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); - // Updating pacing rates refreshes stats. EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - // Record time when we insert first packet, this triggers the scheduled - // stats updating. - Clock* const clock = time_controller.GetClock(); - const Timestamp start_time = clock->CurrentTime(); - - while (clock->CurrentTime() - start_time <= - kMaxTimeBetweenStatsUpdates - kPacketPacingTime) { - // Enqueue packet, expect stats update. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time to halfway through pacing time, expect another stats - // update. - time_controller.AdvanceTime(kPacketPacingTime / 2); - pacer.EnqueuePackets({}); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time the rest of the way. - time_controller.AdvanceTime(kPacketPacingTime / 2); - } - - // At this point, the pace queue is drained so there is no more intersting - // update to be made - but there is still as schduled task that should run - // `kMaxTimeBetweenStatsUpdates` after the first update. - time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates - - clock->CurrentTime()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time a significant time - don't expect any more calls as stats - // updating does not happen when queue is drained. - time_controller.AdvanceTime(TimeDelta::Millis(400)); - EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); + // Advance time the rest of the way. + time_controller.AdvanceTime(kPacketPacingTime / 2); } - TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { - ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); + // At this point, the pace queue is drained so there is no more intersting + // update to be made - but there is still as schduled task that should run + // `kMaxTimeBetweenStatsUpdates` after the first update. + time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates - + clock->CurrentTime()); + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - // Set rates so one packet adds 4ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); - pacer.EnsureStarted(); - EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { - return std::vector>(); - }); - EXPECT_CALL(packet_router, GeneratePadding(_)) - .WillRepeatedly( - [](DataSize target_size) { return GeneratePadding(target_size); }); + // Advance time a significant time - don't expect any more calls as stats + // updating does not happen when queue is drained. + time_controller.AdvanceTime(TimeDelta::Millis(400)); + EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); +} - // Enqueue two packets, only the first is sent immediately and the next - // will be scheduled for sending in 4ms. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2)); - const int kNotAProbe = PacedPacketInfo::kNotAProbe; - EXPECT_CALL( - packet_router, - SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, - kNotAProbe))); - // Advance to less than 3ms before next packet send time. - time_controller.AdvanceTime(TimeDelta::Micros(1001)); +TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { + ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); - // Trigger a probe at 4x the current pacing rate and insert the number of - // packets the probe needs. - const DataRate kProbeRate = 2 * kPacingDataRate; - const int kProbeClusterId = 1; - pacer.CreateProbeCluster(kProbeRate, kProbeClusterId); + // Set rates so one packet adds 4ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); + pacer.EnsureStarted(); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + EXPECT_CALL(packet_router, GeneratePadding(_)) + .WillRepeatedly( + [](DataSize target_size) { return GeneratePadding(target_size); }); - // Expected size for each probe in a cluster is twice the expected bits - // sent during min_probe_delta. - // Expect one additional call since probe always starts with a small - const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); - const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; - const size_t kNumPacketsInProbe = - (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize; - EXPECT_CALL( - packet_router, - SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, - kProbeClusterId))) - .Times(kNumPacketsInProbe + 1); + // Enqueue two packets, only the first is sent immediately and the next + // will be scheduled for sending in 4ms. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2)); + const int kNotAProbe = PacedPacketInfo::kNotAProbe; + EXPECT_CALL(packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kNotAProbe))); + // Advance to less than 3ms before next packet send time. + time_controller.AdvanceTime(TimeDelta::Micros(1001)); - pacer.EnqueuePackets( - GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe)); - time_controller.AdvanceTime(TimeDelta::Zero()); + // Trigger a probe at 4x the current pacing rate and insert the number of + // packets the probe needs. + const DataRate kProbeRate = 2 * kPacingDataRate; + const int kProbeClusterId = 1; + pacer.CreateProbeCluster(kProbeRate, kProbeClusterId); - // The pacer should have scheduled the next probe to be sent in - // kProbeTimeDelta. That there was existing scheduled call less than - // PacingController::kMinSleepTime before this should not matter. + // Expected size for each probe in a cluster is twice the expected bits + // sent during min_probe_delta. + // Expect one additional call since probe always starts with a small + const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); + const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; + const size_t kNumPacketsInProbe = + (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize; + EXPECT_CALL(packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(kNumPacketsInProbe + 1); - EXPECT_CALL( - packet_router, - SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, - kProbeClusterId))) - .Times(AtLeast(1)); - time_controller.AdvanceTime(TimeDelta::Millis(2)); - } + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe)); + time_controller.AdvanceTime(TimeDelta::Zero()); - TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { - // Set min_probe_delta to be less than kMinSleepTime (1ms). - const TimeDelta kMinProbeDelta = TimeDelta::Micros(100); - ScopedFieldTrials trials( - "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/"); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); + // The pacer should have scheduled the next probe to be sent in + // kProbeTimeDelta. That there was existing scheduled call less than + // PacingController::kMinSleepTime before this should not matter. - // Set rates so one packet adds 4ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); - pacer.EnsureStarted(); - EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { - return std::vector>(); - }); - EXPECT_CALL(packet_router, GeneratePadding) - .WillRepeatedly( - [](DataSize target_size) { return GeneratePadding(target_size); }); + EXPECT_CALL(packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(AtLeast(1)); + time_controller.AdvanceTime(TimeDelta::Millis(2)); +} - // Set a high probe rate. - const int kProbeClusterId = 1; - DataRate kProbingRate = kPacingDataRate * 10; - pacer.CreateProbeCluster(kProbingRate, kProbeClusterId); +TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { + // Set min_probe_delta to be less than kMinSleepTime (1ms). + const TimeDelta kMinProbeDelta = TimeDelta::Micros(100); + ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/"); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); - // Advance time less than PacingController::kMinSleepTime, probing packets - // for the first millisecond should be sent immediately. Min delta between - // probes is 2x 100us, meaning 4 times per ms we will get least one call to - // SendPacket(). - DataSize data_sent = DataSize::Zero(); - EXPECT_CALL( - packet_router, - SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, - kProbeClusterId))) - .Times(AtLeast(4)) - .WillRepeatedly([&](std::unique_ptr packet, - const PacedPacketInfo&) { - data_sent += - DataSize::Bytes(packet->payload_size() + packet->padding_size()); - }); + // Set rates so one packet adds 4ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); + pacer.EnsureStarted(); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + EXPECT_CALL(packet_router, GeneratePadding) + .WillRepeatedly( + [](DataSize target_size) { return GeneratePadding(target_size); }); - // Add one packet to kickstart probing, the rest will be padding packets. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(kMinProbeDelta); + // Set a high probe rate. + const int kProbeClusterId = 1; + DataRate kProbingRate = kPacingDataRate * 10; + pacer.CreateProbeCluster(kProbingRate, kProbeClusterId); - // Verify the amount of probing data sent. - // Probe always starts with a small (1 byte) padding packet that's not - // counted into the probe rate here. - EXPECT_EQ(data_sent, - kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); - } + // Advance time less than PacingController::kMinSleepTime, probing packets + // for the first millisecond should be sent immediately. Min delta between + // probes is 2x 100us, meaning 4 times per ms we will get least one call to + // SendPacket(). + DataSize data_sent = DataSize::Zero(); + EXPECT_CALL(packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(AtLeast(4)) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo&) { + data_sent += + DataSize::Bytes(packet->payload_size() + packet->padding_size()); + }); - TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow); - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + // Add one packet to kickstart probing, the rest will be padding packets. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(kMinProbeDelta); - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + // Verify the amount of probing data sent. + // Probe always starts with a small (1 byte) padding packet that's not + // counted into the probe rate here. + EXPECT_EQ(data_sent, + kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); +} - // Nothing inserted, no stats updates yet. - EXPECT_EQ(pacer.num_stats_updates_, 0u); +TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - // Insert one packet, stats should not be updated. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 0u); + const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + + // Nothing inserted, no stats updates yet. + EXPECT_EQ(pacer.num_stats_updates_, 0u); + + // Insert one packet, stats should not be updated. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 0u); + + // Advance time of the min stats update interval, and trigger a + // refresh - stats should not be updated still. + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); + EXPECT_EQ(pacer.num_stats_updates_, 0u); +} + +TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) { + const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10); + const int kPacketBasedHoldback = 5; + + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kFixedCoalescingWindow, kPacketBasedHoldback); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + const TimeDelta kExpectedHoldbackWindow = + kPacketPacingTime * kPacketBasedHoldback; + // `kFixedCoalescingWindow` sets the upper bound for the window. + ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow); + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + pacer.EnsureStarted(); + + // Add some packets and wait till all have been sent, so that the pacer + // has a valid estimate of packet size. + const int kNumWarmupPackets = 40; + EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets)); + // Wait until all packes have been sent, with a 2x margin. + time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2)); + + // Enqueue packets. Expect only the first one to be sent immediately. + EXPECT_CALL(packet_router, SendPacket).Times(1); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback)); + time_controller.AdvanceTime(TimeDelta::Zero()); + + // Advance time to 1ms before the coalescing window ends. + EXPECT_CALL(packet_router, SendPacket).Times(0); + time_controller.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1)); + + // Advance past where the coalescing window should end. + EXPECT_CALL(packet_router, SendPacket).Times(kPacketBasedHoldback - 1); + time_controller.AdvanceTime(TimeDelta::Millis(1)); +} + +TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) { + const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2); + const int kPacketBasedHoldback = 5; + + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kFixedCoalescingWindow, kPacketBasedHoldback); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + const TimeDelta kExpectedPacketHoldbackWindow = + kPacketPacingTime * kPacketBasedHoldback; + // |kFixedCoalescingWindow| sets the upper bound for the window. + ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow); + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + pacer.EnsureStarted(); + + // Add some packets and wait till all have been sent, so that the pacer + // has a valid estimate of packet size. + const int kNumWarmupPackets = 40; + EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets)); + // Wait until all packes have been sent, with a 2x margin. + time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2)); + + // Enqueue packets. Expect onlt the first one to be sent immediately. + EXPECT_CALL(packet_router, SendPacket).Times(1); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback)); + time_controller.AdvanceTime(TimeDelta::Zero()); + + // Advance time to the fixed coalescing window, that should take presedence so + // at least some of the packets should be sent. + EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); + time_controller.AdvanceTime(kFixedCoalescingWindow); +} - // Advance time of the min stats update interval, and trigger a - // refresh - stats should not be updated still. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); - EXPECT_EQ(pacer.num_stats_updates_, 0u); - } } // namespace test } // namespace webrtc