From 94457795458e9babc245ded2c2bc15e92afb7132 Mon Sep 17 00:00:00 2001 From: Jianhui Dai Date: Tue, 7 Dec 2021 19:34:36 +0800 Subject: [PATCH] TaskQueuePacedSender: Remove pacer status update scheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pacer status is never changed unless MaybeProcessPackets() is called. This CL removes the scheduler, and updates pacer status after every MaybeProcessPackets(). Bug: webrtc:10809, webrtc:13417 Change-Id: Ib5f18decf44c1596c0a716d799600a72b2332abd Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/239120 Reviewed-by: Erik Språng Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/main@{#35489} --- modules/pacing/paced_sender.cc | 10 +- modules/pacing/pacing_controller.cc | 9 +- modules/pacing/pacing_controller.h | 14 +- modules/pacing/pacing_controller_unittest.cc | 18 +- modules/pacing/task_queue_paced_sender.cc | 84 ++---- modules/pacing/task_queue_paced_sender.h | 22 +- .../task_queue_paced_sender_unittest.cc | 282 ++++++------------ 7 files changed, 146 insertions(+), 293 deletions(-) diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 51d3edc301..acc492db92 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -163,7 +163,15 @@ absl::optional PacedSender::FirstSentPacketTime() const { TimeDelta PacedSender::OldestPacketWaitTime() const { MutexLock lock(&mutex_); - return pacing_controller_.OldestPacketWaitTime(); + Timestamp oldest_packet = pacing_controller_.OldestPacketEnqueueTime(); + if (oldest_packet.IsInfinite()) + return TimeDelta::Zero(); + + // (webrtc:9716): The clock is not always monotonic. + Timestamp current = clock_->CurrentTime(); + if (current < oldest_packet) + return TimeDelta::Zero(); + return current - oldest_packet; } int64_t PacedSender::TimeUntilNextProcess() { diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 896daa06a5..548540a208 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -281,13 +281,8 @@ absl::optional PacingController::FirstSentPacketTime() const { return first_sent_packet_time_; } -TimeDelta PacingController::OldestPacketWaitTime() const { - Timestamp oldest_packet = packet_queue_.OldestEnqueueTime(); - if (oldest_packet.IsInfinite()) { - return TimeDelta::Zero(); - } - - return CurrentTime() - oldest_packet; +Timestamp PacingController::OldestPacketEnqueueTime() const { + return packet_queue_.OldestEnqueueTime(); } void PacingController::EnqueuePacketInternal( diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index f8f31a8a10..5d6d26b917 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -41,7 +41,6 @@ namespace webrtc { // the processing is done externally (e.g. RtpPacketPacer). Furthermore, the // forwarding of packets when they are ready to be sent is also handled // externally, via the PacingController::PacketSender interface. -// class PacingController { public: // Periodic mode uses the IntervalBudget class for tracking bitrate @@ -114,8 +113,8 @@ class PacingController { void SetTransportOverhead(DataSize overhead_per_packet); - // Returns the time since the oldest queued packet was enqueued. - TimeDelta OldestPacketWaitTime() const; + // Returns the time when the oldest packet was queued. + Timestamp OldestPacketEnqueueTime() const; // Number of packets in the pacer queue. size_t QueueSizePackets() const; @@ -125,7 +124,7 @@ class PacingController { // Current buffer level, i.e. max of media and padding debt. DataSize CurrentBufferLevel() const; - // Returns the time when the first packet was sent; + // Returns the time when the first packet was sent. absl::optional FirstSentPacketTime() const; // Returns the number of milliseconds it will take to send the current @@ -197,9 +196,10 @@ class PacingController { mutable Timestamp last_timestamp_; bool paused_; - // If `use_interval_budget_` is true, `media_budget_` and `padding_budget_` - // will be used to track when packets can be sent. Otherwise the media and - // padding debt counters will be used together with the target rates. + // In dynamic mode, `media_budget_` and `padding_budget_` will be used to + // track when packets can be sent. + // In periodic mode, `media_debt_` and `padding_debt_` will be used together + // with the target rates. // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index d1f751ea8a..e7634cd8d5 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -296,7 +296,7 @@ class PacingControllerTest int64_t capture_time_ms = clock_.TimeInMilliseconds(); const size_t kPacketSize = 250; - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); // Due to the multiplicative factor we can send 5 packets during a send // interval. (network capacity * multiplier / (8 bits per byte * @@ -1193,7 +1193,7 @@ TEST_P(PacingControllerTest, Pause) { uint32_t ssrc_high_priority = 12347; uint16_t sequence_number = 1234; - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); ConsumeInitialBudget(); @@ -1222,8 +1222,7 @@ TEST_P(PacingControllerTest, Pause) { } // Expect everything to be queued. - EXPECT_EQ(TimeDelta::Millis(second_capture_time_ms - capture_time_ms), - pacer_->OldestPacketWaitTime()); + EXPECT_EQ(capture_time_ms, pacer_->OldestPacketEnqueueTime().ms()); // Process triggers keep-alive packet. EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { @@ -1299,7 +1298,7 @@ TEST_P(PacingControllerTest, Pause) { } } - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); } TEST_P(PacingControllerTest, InactiveFromStart) { @@ -1349,7 +1348,7 @@ TEST_P(PacingControllerTest, ExpectedQueueTimeMs) { const size_t kNumPackets = 60; const size_t kPacketSize = 1200; const int32_t kMaxBitrate = kPaceMultiplier * 30000; - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); pacer_->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier), DataRate::Zero()); @@ -1382,7 +1381,7 @@ TEST_P(PacingControllerTest, ExpectedQueueTimeMs) { TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); pacer_->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier), DataRate::Zero()); @@ -1390,9 +1389,10 @@ TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) { clock_.TimeInMilliseconds(), 1200); clock_.AdvanceTimeMilliseconds(500); - EXPECT_EQ(TimeDelta::Millis(500), pacer_->OldestPacketWaitTime()); + EXPECT_EQ(clock_.TimeInMilliseconds() - 500, + pacer_->OldestPacketEnqueueTime().ms()); pacer_->ProcessPackets(); - EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite()); } TEST_P(PacingControllerTest, ProbingWithInsertedPackets) { diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 41ae683ff4..16c2de58c6 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -20,15 +20,6 @@ #include "rtc_base/trace_event.h" namespace webrtc { -namespace { -// If no calls to MaybeProcessPackets() happen, make sure we update stats -// at least every `kMaxTimeBetweenStatsUpdates` as long as the pacer isn't -// completely drained. -constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); -// Don't call UpdateStats() more than `kMinTimeBetweenStatsUpdates` apart, -// for performance reasons. -constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); -} // namespace TaskQueuePacedSender::TaskQueuePacedSender( Clock* clock, @@ -47,8 +38,7 @@ TaskQueuePacedSender::TaskQueuePacedSender( field_trials, PacingController::ProcessMode::kDynamic), next_process_time_(Timestamp::MinusInfinity()), - stats_update_scheduled_(false), - last_stats_time_(Timestamp::MinusInfinity()), + is_started_(false), is_shutdown_(false), packet_size_(/*alpha=*/0.95), task_queue_(task_queue_factory->CreateTaskQueue( @@ -162,6 +152,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) { task_queue_.PostTask([this, account_for_audio]() { RTC_DCHECK_RUN_ON(&task_queue_); pacing_controller_.SetAccountForAudioPackets(account_for_audio); + MaybeProcessPackets(Timestamp::MinusInfinity()); }); } @@ -169,6 +160,7 @@ void TaskQueuePacedSender::SetIncludeOverhead() { task_queue_.PostTask([this]() { RTC_DCHECK_RUN_ON(&task_queue_); pacing_controller_.SetIncludeOverhead(); + MaybeProcessPackets(Timestamp::MinusInfinity()); }); } @@ -176,6 +168,7 @@ void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) { task_queue_.PostTask([this, overhead_per_packet]() { RTC_DCHECK_RUN_ON(&task_queue_); pacing_controller_.SetTransportOverhead(overhead_per_packet); + MaybeProcessPackets(Timestamp::MinusInfinity()); }); } @@ -200,7 +193,15 @@ absl::optional TaskQueuePacedSender::FirstSentPacketTime() const { } TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { - return GetStats().oldest_packet_wait_time; + Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; + if (oldest_packet.IsInfinite()) + return TimeDelta::Zero(); + + // (webrtc:9716): The clock is not always monotonic. + Timestamp current = clock_->CurrentTime(); + if (current < oldest_packet) + return TimeDelta::Zero(); + return current - oldest_packet; } void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { @@ -275,68 +276,17 @@ void TaskQueuePacedSender::MaybeProcessPackets( time_to_next_process->ms()); } - MaybeUpdateStats(false); + UpdateStats(); } -void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) { - if (is_shutdown_) { - if (is_scheduled_call) { - stats_update_scheduled_ = false; - } - return; - } - - Timestamp now = clock_->CurrentTime(); - if (is_scheduled_call) { - // Allow scheduled task to process packets to clear up an remaining debt - // level in an otherwise empty queue. - pacing_controller_.ProcessPackets(); - } else { - if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) { - // Too frequent unscheduled stats update, return early. - return; - } - } - +void TaskQueuePacedSender::UpdateStats() { Stats new_stats; new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); - new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime(); + new_stats.oldest_packet_enqueue_time = + pacing_controller_.OldestPacketEnqueueTime(); new_stats.queue_size = pacing_controller_.QueueSizeData(); OnStatsUpdated(new_stats); - - last_stats_time_ = now; - - bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 && - pacing_controller_.CurrentBufferLevel().IsZero(); - - // If there's anything interesting to get from the pacer and this is a - // scheduled call (or no scheduled call in flight), post a new scheduled stats - // update. - if (!pacer_drained) { - if (!stats_update_scheduled_) { - // There is no pending delayed task to update stats, add one. - // Treat this call as being scheduled in order to bootstrap scheduling - // loop. - stats_update_scheduled_ = true; - is_scheduled_call = true; - } - - // Only if on the scheduled call loop do we want to schedule a new delayed - // task. - if (is_scheduled_call) { - task_queue_.PostDelayedTask( - [this]() { - RTC_DCHECK_RUN_ON(&task_queue_); - MaybeUpdateStats(true); - }, - kMaxTimeBetweenStatsUpdates.ms()); - } - } else if (is_scheduled_call) { - // This is a scheduled call, signing out since there's nothing interesting - // left to check. - stats_update_scheduled_ = false; - } } TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const { diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index ebe9846075..cb7ca4def8 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -66,7 +66,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { void EnqueuePackets( std::vector> packets) override; - // Methods implementing RtpPacketPacer: + // Methods implementing RtpPacketPacer. void CreateProbeCluster(DataRate bitrate, int cluster_id) override; @@ -112,15 +112,15 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Exposed as protected for test. struct Stats { Stats() - : oldest_packet_wait_time(TimeDelta::Zero()), + : oldest_packet_enqueue_time(Timestamp::MinusInfinity()), queue_size(DataSize::Zero()), expected_queue_time(TimeDelta::Zero()) {} - TimeDelta oldest_packet_wait_time; + Timestamp oldest_packet_enqueue_time; DataSize queue_size; TimeDelta expected_queue_time; absl::optional first_sent_packet_time; }; - virtual void OnStatsUpdated(const Stats& stats); + void OnStatsUpdated(const Stats& stats); private: // Check if it is time to send packets, or schedule a delayed task if not. @@ -130,7 +130,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // method again with desired (finite) scheduled process time. void MaybeProcessPackets(Timestamp scheduled_process_time); - void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_); + void UpdateStats() RTC_RUN_ON(task_queue_); Stats GetStats() const; Clock* const clock_; @@ -146,19 +146,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Timestamp::MinusInfinity() indicates no valid pending task. Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_); - // Since we don't want to support synchronous calls that wait for a - // task execution, we poll the stats at some interval and update - // `current_stats_`, which can in turn be polled at any time. - - // True iff there is delayed task in flight that that will call - // UdpateStats(). - bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_); - // Last time stats were updated. - Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_); - // Indicates if this task queue is started. If not, don't allow // posting delayed tasks yet. - bool is_started_ RTC_GUARDED_BY(task_queue_) = false; + bool is_started_ RTC_GUARDED_BY(task_queue_); // Indicates if this task queue is shutting down. If so, don't allow // posting any more delayed tasks as that can cause the task queue to diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index b921331dd1..d78365d499 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -56,39 +56,6 @@ class MockPacketRouter : public PacketRouter { (override)); }; -class StatsUpdateObserver { - public: - StatsUpdateObserver() = default; - virtual ~StatsUpdateObserver() = default; - - virtual void OnStatsUpdated() = 0; -}; - -class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { - public: - TaskQueuePacedSenderForTest(Clock* clock, - PacketRouter* packet_router, - RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials, - TaskQueueFactory* task_queue_factory, - TimeDelta hold_back_window, - int max_hold_back_window_in_packets) - : TaskQueuePacedSender(clock, - packet_router, - event_log, - field_trials, - task_queue_factory, - hold_back_window, - max_hold_back_window_in_packets) {} - - void OnStatsUpdated(const Stats& stats) override { - ++num_stats_updates_; - TaskQueuePacedSender::OnStatsUpdated(stats); - } - - size_t num_stats_updates_ = 0; -}; - std::vector> GeneratePadding( DataSize target_size) { // 224 bytes is the max padding size for plain padding packets generated by @@ -149,7 +116,7 @@ std::vector> GeneratePackets( TEST(TaskQueuePacedSenderTest, PacesPackets) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( + TaskQueuePacedSender pacer( time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), @@ -189,7 +156,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) { TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( + TaskQueuePacedSender pacer( time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), @@ -241,7 +208,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( + TaskQueuePacedSender pacer( time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), @@ -274,11 +241,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -311,11 +278,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -339,110 +306,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); } -TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); - - // Nothing inserted, no stats updates yet. - EXPECT_EQ(pacer.num_stats_updates_, 0u); - - // Insert one packet, stats should be updated. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 1u); - - // Advance time half of the min stats update interval, and trigger a - // refresh - stats should not be updated yet. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); - pacer.EnqueuePackets({}); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 1u); - - // Advance time the next half, now stats update is triggered. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); - pacer.EnqueuePackets({}); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 2u); -} - -TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); - - // Set rates so one packet adds 10ms of buffer level. - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = TimeDelta::Millis(10); - const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); - const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); - - // Nothing inserted, no stats updates yet. - size_t num_expected_stats_updates = 0; - EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - pacer.EnsureStarted(); - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); - // Updating pacing rates refreshes stats. - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Record time when we insert first packet, this triggers the scheduled - // stats updating. - Clock* const clock = time_controller.GetClock(); - const Timestamp start_time = clock->CurrentTime(); - - while (clock->CurrentTime() - start_time <= - kMaxTimeBetweenStatsUpdates - kPacketPacingTime) { - // Enqueue packet, expect stats update. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time to halfway through pacing time, expect another stats - // update. - time_controller.AdvanceTime(kPacketPacingTime / 2); - pacer.EnqueuePackets({}); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time the rest of the way. - time_controller.AdvanceTime(kPacketPacingTime / 2); - } - - // At this point, the pace queue is drained so there is no more intersting - // update to be made - but there is still as schduled task that should run - // `kMaxTimeBetweenStatsUpdates` after the first update. - time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates - - clock->CurrentTime()); - EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); - - // Advance time a significant time - don't expect any more calls as stats - // updating does not happen when queue is drained. - time_controller.AdvanceTime(TimeDelta::Millis(400)); - EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); -} - TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( + TaskQueuePacedSender pacer( time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), @@ -496,7 +364,6 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { // The pacer should have scheduled the next probe to be sent in // kProbeTimeDelta. That there was existing scheduled call less than // PacingController::kMinSleepTime before this should not matter. - EXPECT_CALL(packet_router, SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, kProbeClusterId))) @@ -510,7 +377,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/"); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( + TaskQueuePacedSender pacer( time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), @@ -560,45 +427,17 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); } -TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) { - const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); - GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); - MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow, kNoPacketHoldback); - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); - pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - - const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); - - // Nothing inserted, no stats updates yet. - EXPECT_EQ(pacer.num_stats_updates_, 0u); - - // Insert one packet, stats should not be updated. - pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); - time_controller.AdvanceTime(TimeDelta::Zero()); - EXPECT_EQ(pacer.num_stats_updates_, 0u); - - // Advance time of the min stats update interval, and trigger a - // refresh - stats should not be updated still. - time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); - EXPECT_EQ(pacer.num_stats_updates_, 0u); -} - TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) { const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10); const int kPacketBasedHoldback = 5; GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kFixedCoalescingWindow, kPacketBasedHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kFixedCoalescingWindow, kPacketBasedHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -645,11 +484,11 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSenderForTest pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kFixedCoalescingWindow, kPacketBasedHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kFixedCoalescingWindow, kPacketBasedHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -687,5 +526,76 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) { time_controller.AdvanceTime(kFixedCoalescingWindow); } +TEST(TaskQueuePacedSenderTest, Stats) { + static constexpr Timestamp kStartTime = Timestamp::Millis(1234); + GlobalSimulatedTimeController time_controller(kStartTime); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, kNoPacketHoldback); + + // Simulate ~2mbps video stream, covering one second. + static constexpr size_t kPacketsToSend = 200; + static constexpr DataRate kPacingRate = + DataRate::BytesPerSec(kDefaultPacketSize * kPacketsToSend); + pacer.SetPacingRates(kPacingRate, DataRate::Zero()); + pacer.EnsureStarted(); + + // Allowed `QueueSizeData` and `ExpectedQueueTime` deviation. + static constexpr size_t kAllowedPacketsDeviation = 1; + static constexpr DataSize kAllowedQueueSizeDeviation = + DataSize::Bytes(kDefaultPacketSize * kAllowedPacketsDeviation); + static constexpr TimeDelta kAllowedQueueTimeDeviation = + kAllowedQueueSizeDeviation / kPacingRate; + + DataSize expected_queue_size = DataSize::MinusInfinity(); + TimeDelta expected_queue_time = TimeDelta::MinusInfinity(); + + EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend); + + // Stats before insert any packets. + EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero()); + EXPECT_FALSE(pacer.FirstSentPacketTime().has_value()); + EXPECT_TRUE(pacer.QueueSizeData().IsZero()); + EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero()); + + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + + // Advance to 200ms. + time_controller.AdvanceTime(TimeDelta::Millis(200)); + EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(200)); + EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); + + expected_queue_size = kPacingRate * TimeDelta::Millis(800); + expected_queue_time = expected_queue_size / kPacingRate; + EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(), + kAllowedQueueSizeDeviation.bytes()); + EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(), + kAllowedQueueTimeDeviation.ms()); + + // Advance to 500ms. + time_controller.AdvanceTime(TimeDelta::Millis(300)); + EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(500)); + EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); + + expected_queue_size = kPacingRate * TimeDelta::Millis(500); + expected_queue_time = expected_queue_size / kPacingRate; + EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(), + kAllowedQueueSizeDeviation.bytes()); + EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(), + kAllowedQueueTimeDeviation.ms()); + + // Advance to 1000ms+, expect all packets to be sent. + time_controller.AdvanceTime(TimeDelta::Millis(500) + + kAllowedQueueTimeDeviation); + EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero()); + EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); + EXPECT_TRUE(pacer.QueueSizeData().IsZero()); + EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero()); +} + } // namespace test } // namespace webrtc