From be152f5f9e9106bb3f33be8cb794761ed76c1297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Mon, 6 Apr 2020 16:30:23 +0200 Subject: [PATCH] Optimizes thread usage with task queue pacer. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TaskQueuePacedSender today has some inefficiencies: * Enqueuing a packet will trigger a MaybeProcessPackets() call, but it won't actually run immediately even if it should - instead it will schedule a new call in at least 1ms. This incurs delays and extra CPU overhead. * Sometimes thread wakeups are scheduled simply in order to do book-keeping: ProcessPackets() will be called when the media debt has gone down to 0 even if there is no packet in the queue, in order to check if we should send padding. This CL fixes that by called ProcessPackets() immediately if it is actually time to do so, and by immediately determining when padding should be sent without having a separate call to drain media debt. Bug: webrtc:10809 Change-Id: I4870e86e6de2ce4197463fd5b788ad4717fc7177 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/172842 Reviewed-by: Ilya Nikolaevskiy Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#31010} --- modules/pacing/pacing_controller.cc | 16 ++--- modules/pacing/pacing_controller_unittest.cc | 62 +++++++++++++++++++ modules/pacing/task_queue_paced_sender.cc | 17 +++-- .../task_queue_paced_sender_unittest.cc | 23 +++++++ 4 files changed, 104 insertions(+), 14 deletions(-) diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 1dde8d29d4..14feacf3b8 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -285,7 +285,7 @@ void PacingController::EnqueuePacketInternal( } if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() && - media_debt_.IsZero()) { + NextSendTime() <= now) { TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); UpdateBudgetWithElapsedTime(elapsed_time); } @@ -360,20 +360,20 @@ Timestamp PacingController::NextSendTime() const { return last_send_time_ + kCongestedPacketInterval; } - // Check how long until media buffer has drained. We schedule a call - // for when the last packet in the queue drains as otherwise we may - // be late in starting padding. - if (media_rate_ > DataRate::Zero() && - (!packet_queue_.Empty() || !media_debt_.IsZero())) { + // Check how long until we can send the next media packet. + if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { return std::min(last_send_time_ + kPausedProcessInterval, last_process_time_ + media_debt_ / media_rate_); } // If we _don't_ have pending packets, check how long until we have - // bandwidth for padding packets. + // bandwidth for padding packets. Both media and padding debts must + // have been drained to do this. if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + TimeDelta drain_time = + std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_); return std::min(last_send_time_ + kPausedProcessInterval, - last_process_time_ + padding_debt_ / padding_rate_); + last_process_time_ + drain_time); } if (send_padding_if_silent_) { diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index fb56c98051..3226c02d8a 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -1921,6 +1921,68 @@ TEST_P(PacingControllerTest, AccountsForAudioEnqueuTime) { EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime); } +TEST_P(PacingControllerTest, NextSendTimeAccountsForPadding) { + if (PeriodicProcess()) { + // This test applies only when NOT using interval budget. + return; + } + + const uint32_t kSsrc = 12345; + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); + const DataSize kPacketSize = DataSize::Bytes(130); + const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; + + uint32_t sequnce_number = 1; + + // Start with no padding. + pacer_->SetPacingRates(kPacingDataRate, DataRate::Zero()); + + // Send a single packet. + SendAndExpectPacket(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++, + clock_.TimeInMilliseconds(), kPacketSize.bytes()); + pacer_->ProcessPackets(); + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // With current conditions, no need to wake until next keep-alive. + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), + PacingController::kPausedProcessInterval); + + // Enqueue a new packet, that can't be sent until previous buffer has + // drained. + SendAndExpectPacket(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++, + clock_.TimeInMilliseconds(), kPacketSize.bytes()); + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime); + clock_.AdvanceTime(kPacketPacingTime); + pacer_->ProcessPackets(); + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // With current conditions, again no need to wake until next keep-alive. + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), + PacingController::kPausedProcessInterval); + + // Set a non-zero padding rate. Padding also can't be sent until + // previous debt has cleared. Since padding was disabled before, there + // currently is no padding debt. + pacer_->SetPacingRates(kPacingDataRate, kPacingDataRate / 2); + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime); + + // Advance time, expect padding. + EXPECT_CALL(callback_, SendPadding).WillOnce(Return(kPacketSize.bytes())); + clock_.AdvanceTime(kPacketPacingTime); + pacer_->ProcessPackets(); + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // Since padding rate is half of pacing rate, next time we can send + // padding is double the packet pacing time. + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), + kPacketPacingTime * 2); + + // Insert a packet to be sent, this take precedence again. + Send(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++, + clock_.TimeInMilliseconds(), kPacketSize.bytes()); + EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime); +} + INSTANTIATE_TEST_SUITE_P( WithAndWithoutIntervalBudget, PacingControllerTest, diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index c4eac821e6..778d79f843 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -182,19 +182,24 @@ void TaskQueuePacedSender::MaybeProcessPackets( return; } + // Normally, run ProcessPackets() only if this is the scheduled task. + // If it is not but it is already time to process and there either is + // no scheduled task or the schedule has shifted forward in time, run + // anyway and clear any schedule. + Timestamp next_process_time = pacing_controller_.NextSendTime(); const Timestamp now = clock_->CurrentTime(); - // Run ProcessPackets() only if this is the schedules task, or if there is - // no scheduled task and we need to process immediately. if ((scheduled_process_time.IsFinite() && scheduled_process_time == next_process_time_) || - (next_process_time_.IsInfinite() && - pacing_controller_.NextSendTime() <= now)) { + (now >= next_process_time && (next_process_time_.IsInfinite() || + next_process_time < next_process_time_))) { pacing_controller_.ProcessPackets(); next_process_time_ = Timestamp::MinusInfinity(); + next_process_time = pacing_controller_.NextSendTime(); } - Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime, - pacing_controller_.NextSendTime()); + next_process_time = + std::max(now + PacingController::kMinSleepTime, next_process_time); + TimeDelta sleep_time = next_process_time - now; if (next_process_time_.IsMinusInfinity() || next_process_time <= diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index b7e6483c1c..ba2aad21ff 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -173,5 +173,28 @@ TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { 1.0); } +TEST_F(TaskQueuePacedSenderTest, SendsAudioImmediately) { + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; + + pacer_.SetPacingRates(kPacingDataRate, DataRate::Zero()); + + // Add some initial video packets, only one should be sent. + EXPECT_CALL(packet_router_, SendPacket); + pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router_); + + // Advance time, but still before next packet should be sent. + time_controller_.AdvanceTime(kPacketPacingTime / 2); + + // Insert an audio packet, it should be sent immediately. + EXPECT_CALL(packet_router_, SendPacket); + pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); + time_controller_.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router_); +} + } // namespace test } // namespace webrtc