diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 77f21bedbe..07e265b0da 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -403,7 +403,9 @@ void PacingController::ProcessPackets() { if (target_send_time.IsMinusInfinity()) { target_send_time = now; } else if (now < target_send_time) { - // We are too early, abort and regroup! + // We are too early, but if queue is empty still allow draining some debt. + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); + UpdateBudgetWithElapsedTime(elapsed_time); return; } diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index d058e0371d..e817f1b708 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -185,6 +185,11 @@ TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { return GetStats().oldest_packet_wait_time; } +void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { + rtc::CritScope cs(&stats_crit_); + current_stats_ = stats; +} + void TaskQueuePacedSender::MaybeProcessPackets( Timestamp scheduled_process_time) { RTC_DCHECK_RUN_ON(&task_queue_); @@ -232,40 +237,61 @@ void TaskQueuePacedSender::MaybeProcessPackets( void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) { if (is_shutdown_) { + if (is_scheduled_call) { + stats_update_scheduled_ = false; + } return; } Timestamp now = clock_->CurrentTime(); - if (!is_scheduled_call && - now - last_stats_time_ < kMinTimeBetweenStatsUpdates) { - // Too frequent unscheduled stats update, return early. - return; + if (is_scheduled_call) { + // Allow scheduled task to process packets to clear up an remaining debt + // level in an otherwise empty queue. + pacing_controller_.ProcessPackets(); + } else { + if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) { + // Too frequent unscheduled stats update, return early. + return; + } } - rtc::CritScope cs(&stats_crit_); - current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime(); - current_stats_.first_sent_packet_time = - pacing_controller_.FirstSentPacketTime(); - current_stats_.oldest_packet_wait_time = - pacing_controller_.OldestPacketWaitTime(); - current_stats_.queue_size = pacing_controller_.QueueSizeData(); + Stats new_stats; + new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); + new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); + new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime(); + new_stats.queue_size = pacing_controller_.QueueSizeData(); + OnStatsUpdated(new_stats); + last_stats_time_ = now; bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 && pacing_controller_.CurrentBufferLevel().IsZero(); // If there's anything interesting to get from the pacer and this is a - // scheduled call (no scheduled call in flight), post a new scheduled stats + // scheduled call (or no scheduled call in flight), post a new scheduled stats // update. - if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) { - task_queue_.PostDelayedTask( - [this]() { - RTC_DCHECK_RUN_ON(&task_queue_); - MaybeUpdateStats(true); - }, - kMaxTimeBetweenStatsUpdates.ms()); - stats_update_scheduled_ = true; - } else { + if (!pacer_drained) { + if (!stats_update_scheduled_) { + // There is no pending delayed task to update stats, add one. + // Treat this call as being scheduled in order to bootstrap scheduling + // loop. + stats_update_scheduled_ = true; + is_scheduled_call = true; + } + + // Only if on the scheduled call loop do we want to schedule a new delayed + // task. + if (is_scheduled_call) { + task_queue_.PostDelayedTask( + [this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + MaybeUpdateStats(true); + }, + kMaxTimeBetweenStatsUpdates.ms()); + } + } else if (is_scheduled_call) { + // This is a scheduled call, signing out since there's nothing interesting + // left to check. stats_update_scheduled_ = false; } } diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 5e6a1770c2..c4ee5466e7 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -104,7 +104,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // specified by SetPacingRates() if needed to achieve this goal. void SetQueueTimeLimit(TimeDelta limit) override; - private: + protected: + // Exposed as protected for test. struct Stats { Stats() : oldest_packet_wait_time(TimeDelta::Zero()), @@ -115,7 +116,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { TimeDelta expected_queue_time; absl::optional first_sent_packet_time; }; + virtual void OnStatsUpdated(const Stats& stats); + private: // Check if it is time to send packets, or schedule a delayed task if not. // Use Timestamp::MinusInfinity() to indicate that this call has _not_ // been scheduled by the pacing controller. If this is the case, check if diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index 50fceea99a..876cd96cfd 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -48,6 +48,38 @@ class MockPacketRouter : public PacketRouter { (DataSize target_size), (override)); }; + +class StatsUpdateObserver { + public: + StatsUpdateObserver() = default; + virtual ~StatsUpdateObserver() = default; + + virtual void OnStatsUpdated() = 0; +}; + +class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { + public: + TaskQueuePacedSenderForTest( + Clock* clock, + PacketRouter* packet_router, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials, + TaskQueueFactory* task_queue_factory, + TimeDelta hold_back_window = PacingController::kMinSleepTime) + : TaskQueuePacedSender(clock, + packet_router, + event_log, + field_trials, + task_queue_factory, + hold_back_window) {} + + void OnStatsUpdated(const Stats& stats) override { + ++num_stats_updates_; + TaskQueuePacedSender::OnStatsUpdated(stats); + } + + size_t num_stats_updates_ = 0; +}; } // namespace namespace test { @@ -88,11 +120,11 @@ namespace test { TEST(TaskQueuePacedSenderTest, PacesPackets) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); // Insert a number of packets, covering one second. static constexpr size_t kPacketsToSend = 42; @@ -127,11 +159,11 @@ namespace test { TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); // Insert a number of packets to be sent 200ms apart. const size_t kPacketsPerSecond = 5; @@ -178,11 +210,11 @@ namespace test { TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime); + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -210,11 +242,11 @@ namespace test { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow); + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -246,11 +278,11 @@ namespace test { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller.GetTaskQueueFactory(), - kCoalescingWindow); + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -273,5 +305,102 @@ namespace test { time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); } + TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow); + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + + const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + + // Nothing inserted, no stats updates yet. + EXPECT_EQ(pacer.num_stats_updates_, 0u); + + // Insert one packet, stats should be updated. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 1u); + + // Advance time half of the min stats update interval, and trigger a + // refresh - stats should not be updated yet. + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); + pacer.EnqueuePackets({}); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 1u); + + // Advance time the next half, now stats update is triggered. + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); + pacer.EnqueuePackets({}); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, 2u); + } + + TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + kCoalescingWindow); + + // Set rates so one packet adds 10ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(10); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); + const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); + + // Nothing inserted, no stats updates yet. + size_t num_expected_stats_updates = 0; + EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); + // Updating pacing rates refreshes stats. + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); + + // Record time when we insert first packet, this triggers the scheduled + // stats updating. + Clock* const clock = time_controller.GetClock(); + const Timestamp start_time = clock->CurrentTime(); + + while (clock->CurrentTime() - start_time <= + kMaxTimeBetweenStatsUpdates - kPacketPacingTime) { + // Enqueue packet, expect stats update. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); + + // Advance time to halfway through pacing time, expect another stats + // update. + time_controller.AdvanceTime(kPacketPacingTime / 2); + pacer.EnqueuePackets({}); + time_controller.AdvanceTime(TimeDelta::Zero()); + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); + + // Advance time the rest of the way. + time_controller.AdvanceTime(kPacketPacingTime / 2); + } + + // At this point, the pace queue is drained so there is no more intersting + // update to be made - but there is still as schduled task that should run + // |kMaxTimeBetweenStatsUpdates| after the first update. + time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates - + clock->CurrentTime()); + EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); + + // Advance time a significant time - don't expect any more calls as stats + // updating does not happen when queue is drained. + time_controller.AdvanceTime(TimeDelta::Millis(400)); + EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); + } + } // namespace test } // namespace webrtc