From 72e6cb0b3f548900fd3b548b4b6966e3f5ee854f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Mon, 25 Nov 2019 18:22:09 +0100 Subject: [PATCH] Fixes dynamic mode pacing issues. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL fixes a few issues in the (default-disabled) dynamic pacing mode: * Slight update to sleep timing to avoid short spin loops * Removed support for early execution as that lead to time-travel contradictions that were difficult to solve. * Makes sure we schedule a process call when a packet is due to be drained even if the queue is empty, so that padding will start at the correct time. * While paused or empty, sleep relative last send time if we send padding while silent - otherwise just relative to last process time. * If target send time shifts so far back that packet should have been sent prior to the last process, make sure we don't let the buffer level remain. * Update the PacedSender test to _actually_ use dynamic processing when the param says so. Bug: webrtc:10809 Change-Id: Iebfde9769647d2390fd192a40bbe2d5bf1f6cc62 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/160407 Reviewed-by: Ilya Nikolaevskiy Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#29911} --- modules/pacing/paced_sender.cc | 2 +- modules/pacing/paced_sender_unittest.cc | 35 +++++++-- modules/pacing/pacing_controller.cc | 38 ++++++--- modules/pacing/pacing_controller_unittest.cc | 81 +++++++------------- 4 files changed, 85 insertions(+), 71 deletions(-) diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 0a3d3c0b34..f6c85d4ed3 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -153,7 +153,7 @@ int64_t PacedSender::TimeUntilNextProcess() { TimeDelta sleep_time = std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime()); if (process_mode_ == PacingController::ProcessMode::kDynamic) { - return sleep_time.RoundTo(TimeDelta::ms(1)).ms(); + return std::max(sleep_time, PacingController::kMinSleepTime).ms(); } return sleep_time.ms(); } diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 23f1d6014e..7d1b4cb92b 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -28,16 +28,13 @@ using ::testing::_; using ::testing::Return; using ::testing::SaveArg; +namespace webrtc { namespace { constexpr uint32_t kAudioSsrc = 12345; constexpr uint32_t kVideoSsrc = 234565; constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kFlexFecSsrc = 45678; constexpr size_t kDefaultPacketSize = 234; -} // namespace - -namespace webrtc { -namespace test { // Mock callback implementing the raw api. class MockCallback : public PacketRouter { @@ -50,17 +47,41 @@ class MockCallback : public PacketRouter { std::vector>(size_t target_size_bytes)); }; +class ProcessModeTrials : public WebRtcKeyValueConfig { + public: + explicit ProcessModeTrials(bool dynamic_process) : mode_(dynamic_process) {} + + std::string Lookup(absl::string_view key) const override { + if (key == "WebRTC-Pacer-DynamicProcess") { + return mode_ ? "Enabled" : "Disabled"; + } + return ""; + } + + private: + const bool mode_; +}; +} // namespace + +namespace test { + class PacedSenderTest : public ::testing::TestWithParam { public: - PacedSenderTest() : clock_(0), paced_module_(nullptr) {} + PacedSenderTest() + : clock_(0), + paced_module_(nullptr), + trials_(GetParam() == PacingController::ProcessMode::kDynamic) {} void SetUp() override { EXPECT_CALL(process_thread_, RegisterModule) .WillOnce(SaveArg<0>(&paced_module_)); pacer_ = std::make_unique(&clock_, &callback_, nullptr, - nullptr, &process_thread_); + &trials_, &process_thread_); + EXPECT_CALL(process_thread_, WakeUp).WillRepeatedly([&](Module* module) { + clock_.AdvanceTimeMilliseconds(module->TimeUntilNextProcess()); + }); EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1); } @@ -92,6 +113,7 @@ class PacedSenderTest MockCallback callback_; MockProcessThread process_thread_; Module* paced_module_; + ProcessModeTrials trials_; std::unique_ptr pacer_; }; @@ -108,7 +130,6 @@ TEST_P(PacedSenderTest, PacesPackets) { // Expect all of them to be sent. size_t packets_sent = 0; - clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess()); EXPECT_CALL(callback_, SendPacket) .WillRepeatedly( [&](std::unique_ptr packet, diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 985fb5c5ec..8be62090d6 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -276,6 +276,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { if (last_process_time_.IsMinusInfinity()) { return TimeDelta::Zero(); } + RTC_DCHECK_GE(now, last_process_time_); TimeDelta elapsed_time = now - last_process_time_; last_process_time_ = now; if (elapsed_time > kMaxElapsedTime) { @@ -334,9 +335,11 @@ Timestamp PacingController::NextSendTime() const { return last_send_time_ + kCongestedPacketInterval; } - // If there are pending packets, check how long it will take until buffers - // have emptied. - if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { + // Check how long until media buffer has drained. We schedule a call + // for when the last packet in the queue drains as otherwise we may + // be late in starting padding. + if (media_rate_ > DataRate::Zero() && + (!packet_queue_.Empty() || !media_debt_.IsZero())) { return std::min(last_send_time_ + kPausedProcessInterval, last_process_time_ + media_debt_ / media_rate_); } @@ -348,21 +351,38 @@ Timestamp PacingController::NextSendTime() const { last_process_time_ + padding_debt_ / padding_rate_); } - return last_send_time_ + kPausedProcessInterval; + if (send_padding_if_silent_) { + return last_send_time_ + kPausedProcessInterval; + } + return last_process_time_ + kPausedProcessInterval; } void PacingController::ProcessPackets() { Timestamp now = CurrentTime(); - RTC_DCHECK_GE(now, last_process_time_); Timestamp target_send_time = now; if (mode_ == ProcessMode::kDynamic) { target_send_time = NextSendTime(); if (target_send_time.IsMinusInfinity()) { target_send_time = now; - } else if (now + kMinSleepTime < target_send_time) { + } else if (now < target_send_time) { // We are too early, abort and regroup! return; } + + if (target_send_time < last_process_time_) { + // After the last process call, at time X, the target send time + // shifted to be earlier than X. This should normally not happen + // but we want to make sure rounding errors or erratic behavior + // of NextSendTime() does not cause issue. In particular, if the + // buffer reduction of + // rate * (target_send_time - previous_process_time) + // in the main loop doesn't clean up the existing debt we may not + // be able to send again. We don't want to check this reordering + // there as it is the normal exit condtion when the buffer is + // exhausted and there are packets in the queue. + UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time); + target_send_time = last_process_time_; + } } Timestamp previous_process_time = last_process_time_; @@ -585,6 +605,7 @@ std::unique_ptr PacingController::GetPendingPacket( return nullptr; } } else { + // Dynamic processing mode. if (now <= target_send_time) { // We allow sending slightly early if we think that we would actually // had been able to, had we been right on time - i.e. the current debt @@ -593,11 +614,6 @@ std::unique_ptr PacingController::GetPendingPacket( if (now + flush_time > target_send_time) { return nullptr; } - } else { - // In dynamic mode we should never try get a non-probe packet until - // the media debt is actually zero. Since there can be rounding errors, - // allow some discrepancy. - RTC_DCHECK_LE(media_debt_, media_rate_ * kMinSleepTime); } } } diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index 5b5f6e71c1..9337ad2f8a 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -732,33 +732,48 @@ TEST_P(PacingControllerTest, Padding) { EXPECT_LE((actual_pace_time - expected_pace_time).Abs(), PacingController::kMinSleepTime); - // Pacing media happens 2.5x factor, but padding was configured with 1.0x + // Pacing media happens at 2.5x, but padding was configured with 1.0x // factor. We have to wait until the padding debt is gone before we start // sending padding. const TimeDelta time_to_padding_debt_free = (expected_pace_time * kPaceMultiplier) - actual_pace_time; - TimeDelta time_to_next = pacer_->NextSendTime() - clock_.CurrentTime(); - EXPECT_EQ(time_to_next, time_to_padding_debt_free); - clock_.AdvanceTime(time_to_next); + clock_.AdvanceTime(time_to_padding_debt_free - + PacingController::kMinSleepTime); + pacer_->ProcessPackets(); // Send 10 padding packets. const size_t kPaddingPacketsToSend = 10; DataSize padding_sent = DataSize::Zero(); + size_t packets_sent = 0; + Timestamp first_send_time = Timestamp::MinusInfinity(); + Timestamp last_send_time = Timestamp::MinusInfinity(); + EXPECT_CALL(callback_, SendPadding) .Times(kPaddingPacketsToSend) .WillRepeatedly([&](size_t target_size) { - padding_sent += DataSize::bytes(target_size); + ++packets_sent; + if (packets_sent < kPaddingPacketsToSend) { + // Don't count bytes of last packet, instead just + // use this as the time the last packet finished + // sending. + padding_sent += DataSize::bytes(target_size); + } + if (first_send_time.IsInfinite()) { + first_send_time = clock_.CurrentTime(); + } else { + last_send_time = clock_.CurrentTime(); + } return target_size; }); EXPECT_CALL(callback_, SendPacket(_, _, _, false, true)) .Times(kPaddingPacketsToSend); - const Timestamp padding_start_time = clock_.CurrentTime(); - for (size_t i = 0; i < kPaddingPacketsToSend; ++i) { + + while (packets_sent < kPaddingPacketsToSend) { AdvanceTimeAndProcess(); } // Verify rate of sent padding. - TimeDelta padding_duration = pacer_->NextSendTime() - padding_start_time; + TimeDelta padding_duration = last_send_time - first_send_time; DataRate padding_rate = padding_sent / padding_duration; EXPECT_EQ(padding_rate, kTargetRate); } @@ -781,15 +796,18 @@ TEST_P(PacingControllerTest, NoPaddingBeforeNormalPacket) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, capture_time_ms, 250); - EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { + bool padding_sent = false; + EXPECT_CALL(callback_, SendPadding).WillOnce([&](size_t padding) { + padding_sent = true; return padding; }); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); if (PeriodicProcess()) { pacer_->ProcessPackets(); } else { - AdvanceTimeAndProcess(); // Media. - AdvanceTimeAndProcess(); // Padding. + while (!padding_sent) { + AdvanceTimeAndProcess(); + } } } @@ -1677,47 +1695,6 @@ TEST_P(PacingControllerTest, SmallFirstProbePacket) { } } -TEST_P(PacingControllerTest, TaskEarly) { - if (PeriodicProcess()) { - // This test applies only when NOT using interval budget. - return; - } - - // Set a low send rate to more easily test timing issues. - DataRate kSendRate = DataRate::kbps(30); - pacer_->SetPacingRates(kSendRate, DataRate::Zero()); - - // Add two packets. - pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); - pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); - - // Process packets, only first should be sent. - EXPECT_CALL(callback_, SendPacket).Times(1); - pacer_->ProcessPackets(); - - Timestamp next_send_time = pacer_->NextSendTime(); - - // Packets won't be sent if we try process more than one sleep time early. - ASSERT_GT(next_send_time - clock_.CurrentTime(), - PacingController::kMinSleepTime); - clock_.AdvanceTime(next_send_time - clock_.CurrentTime() - - (PacingController::kMinSleepTime + TimeDelta::ms(1))); - - EXPECT_CALL(callback_, SendPacket).Times(0); - pacer_->ProcessPackets(); - - // Assume timing is accurate within +-100us due to rounding. - const TimeDelta kErrorMargin = TimeDelta::us(100); - - // Check that next scheduled send time is still the same (within margin). - EXPECT_LT((pacer_->NextSendTime() - next_send_time).Abs(), kErrorMargin); - - // Advance to within error margin for execution. - clock_.AdvanceTime(TimeDelta::ms(1) + kErrorMargin); - EXPECT_CALL(callback_, SendPacket).Times(1); - pacer_->ProcessPackets(); -} - TEST_P(PacingControllerTest, TaskLate) { if (PeriodicProcess()) { // This test applies only when NOT using interval budget.