diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index f6c85d4ed3..0a3d3c0b34 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -153,7 +153,7 @@ int64_t PacedSender::TimeUntilNextProcess() { TimeDelta sleep_time = std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime()); if (process_mode_ == PacingController::ProcessMode::kDynamic) { - return std::max(sleep_time, PacingController::kMinSleepTime).ms(); + return sleep_time.RoundTo(TimeDelta::ms(1)).ms(); } return sleep_time.ms(); } diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 7d1b4cb92b..23f1d6014e 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -28,13 +28,16 @@ using ::testing::_; using ::testing::Return; using ::testing::SaveArg; -namespace webrtc { namespace { constexpr uint32_t kAudioSsrc = 12345; constexpr uint32_t kVideoSsrc = 234565; constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kFlexFecSsrc = 45678; constexpr size_t kDefaultPacketSize = 234; +} // namespace + +namespace webrtc { +namespace test { // Mock callback implementing the raw api. class MockCallback : public PacketRouter { @@ -47,41 +50,17 @@ class MockCallback : public PacketRouter { std::vector>(size_t target_size_bytes)); }; -class ProcessModeTrials : public WebRtcKeyValueConfig { - public: - explicit ProcessModeTrials(bool dynamic_process) : mode_(dynamic_process) {} - - std::string Lookup(absl::string_view key) const override { - if (key == "WebRTC-Pacer-DynamicProcess") { - return mode_ ? "Enabled" : "Disabled"; - } - return ""; - } - - private: - const bool mode_; -}; -} // namespace - -namespace test { - class PacedSenderTest : public ::testing::TestWithParam { public: - PacedSenderTest() - : clock_(0), - paced_module_(nullptr), - trials_(GetParam() == PacingController::ProcessMode::kDynamic) {} + PacedSenderTest() : clock_(0), paced_module_(nullptr) {} void SetUp() override { EXPECT_CALL(process_thread_, RegisterModule) .WillOnce(SaveArg<0>(&paced_module_)); pacer_ = std::make_unique(&clock_, &callback_, nullptr, - &trials_, &process_thread_); - EXPECT_CALL(process_thread_, WakeUp).WillRepeatedly([&](Module* module) { - clock_.AdvanceTimeMilliseconds(module->TimeUntilNextProcess()); - }); + nullptr, &process_thread_); EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1); } @@ -113,7 +92,6 @@ class PacedSenderTest MockCallback callback_; MockProcessThread process_thread_; Module* paced_module_; - ProcessModeTrials trials_; std::unique_ptr pacer_; }; @@ -130,6 +108,7 @@ TEST_P(PacedSenderTest, PacesPackets) { // Expect all of them to be sent. size_t packets_sent = 0; + clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess()); EXPECT_CALL(callback_, SendPacket) .WillRepeatedly( [&](std::unique_ptr packet, diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 8be62090d6..985fb5c5ec 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -276,7 +276,6 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { if (last_process_time_.IsMinusInfinity()) { return TimeDelta::Zero(); } - RTC_DCHECK_GE(now, last_process_time_); TimeDelta elapsed_time = now - last_process_time_; last_process_time_ = now; if (elapsed_time > kMaxElapsedTime) { @@ -335,11 +334,9 @@ Timestamp PacingController::NextSendTime() const { return last_send_time_ + kCongestedPacketInterval; } - // Check how long until media buffer has drained. We schedule a call - // for when the last packet in the queue drains as otherwise we may - // be late in starting padding. - if (media_rate_ > DataRate::Zero() && - (!packet_queue_.Empty() || !media_debt_.IsZero())) { + // If there are pending packets, check how long it will take until buffers + // have emptied. + if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { return std::min(last_send_time_ + kPausedProcessInterval, last_process_time_ + media_debt_ / media_rate_); } @@ -351,38 +348,21 @@ Timestamp PacingController::NextSendTime() const { last_process_time_ + padding_debt_ / padding_rate_); } - if (send_padding_if_silent_) { - return last_send_time_ + kPausedProcessInterval; - } - return last_process_time_ + kPausedProcessInterval; + return last_send_time_ + kPausedProcessInterval; } void PacingController::ProcessPackets() { Timestamp now = CurrentTime(); + RTC_DCHECK_GE(now, last_process_time_); Timestamp target_send_time = now; if (mode_ == ProcessMode::kDynamic) { target_send_time = NextSendTime(); if (target_send_time.IsMinusInfinity()) { target_send_time = now; - } else if (now < target_send_time) { + } else if (now + kMinSleepTime < target_send_time) { // We are too early, abort and regroup! return; } - - if (target_send_time < last_process_time_) { - // After the last process call, at time X, the target send time - // shifted to be earlier than X. This should normally not happen - // but we want to make sure rounding errors or erratic behavior - // of NextSendTime() does not cause issue. In particular, if the - // buffer reduction of - // rate * (target_send_time - previous_process_time) - // in the main loop doesn't clean up the existing debt we may not - // be able to send again. We don't want to check this reordering - // there as it is the normal exit condtion when the buffer is - // exhausted and there are packets in the queue. - UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time); - target_send_time = last_process_time_; - } } Timestamp previous_process_time = last_process_time_; @@ -605,7 +585,6 @@ std::unique_ptr PacingController::GetPendingPacket( return nullptr; } } else { - // Dynamic processing mode. if (now <= target_send_time) { // We allow sending slightly early if we think that we would actually // had been able to, had we been right on time - i.e. the current debt @@ -614,6 +593,11 @@ std::unique_ptr PacingController::GetPendingPacket( if (now + flush_time > target_send_time) { return nullptr; } + } else { + // In dynamic mode we should never try get a non-probe packet until + // the media debt is actually zero. Since there can be rounding errors, + // allow some discrepancy. + RTC_DCHECK_LE(media_debt_, media_rate_ * kMinSleepTime); } } } diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index 9337ad2f8a..5b5f6e71c1 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -732,48 +732,33 @@ TEST_P(PacingControllerTest, Padding) { EXPECT_LE((actual_pace_time - expected_pace_time).Abs(), PacingController::kMinSleepTime); - // Pacing media happens at 2.5x, but padding was configured with 1.0x + // Pacing media happens 2.5x factor, but padding was configured with 1.0x // factor. We have to wait until the padding debt is gone before we start // sending padding. const TimeDelta time_to_padding_debt_free = (expected_pace_time * kPaceMultiplier) - actual_pace_time; - clock_.AdvanceTime(time_to_padding_debt_free - - PacingController::kMinSleepTime); - pacer_->ProcessPackets(); + TimeDelta time_to_next = pacer_->NextSendTime() - clock_.CurrentTime(); + EXPECT_EQ(time_to_next, time_to_padding_debt_free); + clock_.AdvanceTime(time_to_next); // Send 10 padding packets. const size_t kPaddingPacketsToSend = 10; DataSize padding_sent = DataSize::Zero(); - size_t packets_sent = 0; - Timestamp first_send_time = Timestamp::MinusInfinity(); - Timestamp last_send_time = Timestamp::MinusInfinity(); - EXPECT_CALL(callback_, SendPadding) .Times(kPaddingPacketsToSend) .WillRepeatedly([&](size_t target_size) { - ++packets_sent; - if (packets_sent < kPaddingPacketsToSend) { - // Don't count bytes of last packet, instead just - // use this as the time the last packet finished - // sending. - padding_sent += DataSize::bytes(target_size); - } - if (first_send_time.IsInfinite()) { - first_send_time = clock_.CurrentTime(); - } else { - last_send_time = clock_.CurrentTime(); - } + padding_sent += DataSize::bytes(target_size); return target_size; }); EXPECT_CALL(callback_, SendPacket(_, _, _, false, true)) .Times(kPaddingPacketsToSend); - - while (packets_sent < kPaddingPacketsToSend) { + const Timestamp padding_start_time = clock_.CurrentTime(); + for (size_t i = 0; i < kPaddingPacketsToSend; ++i) { AdvanceTimeAndProcess(); } // Verify rate of sent padding. - TimeDelta padding_duration = last_send_time - first_send_time; + TimeDelta padding_duration = pacer_->NextSendTime() - padding_start_time; DataRate padding_rate = padding_sent / padding_duration; EXPECT_EQ(padding_rate, kTargetRate); } @@ -796,18 +781,15 @@ TEST_P(PacingControllerTest, NoPaddingBeforeNormalPacket) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, capture_time_ms, 250); - bool padding_sent = false; - EXPECT_CALL(callback_, SendPadding).WillOnce([&](size_t padding) { - padding_sent = true; + EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { return padding; }); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); if (PeriodicProcess()) { pacer_->ProcessPackets(); } else { - while (!padding_sent) { - AdvanceTimeAndProcess(); - } + AdvanceTimeAndProcess(); // Media. + AdvanceTimeAndProcess(); // Padding. } } @@ -1695,6 +1677,47 @@ TEST_P(PacingControllerTest, SmallFirstProbePacket) { } } +TEST_P(PacingControllerTest, TaskEarly) { + if (PeriodicProcess()) { + // This test applies only when NOT using interval budget. + return; + } + + // Set a low send rate to more easily test timing issues. + DataRate kSendRate = DataRate::kbps(30); + pacer_->SetPacingRates(kSendRate, DataRate::Zero()); + + // Add two packets. + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + + // Process packets, only first should be sent. + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); + + Timestamp next_send_time = pacer_->NextSendTime(); + + // Packets won't be sent if we try process more than one sleep time early. + ASSERT_GT(next_send_time - clock_.CurrentTime(), + PacingController::kMinSleepTime); + clock_.AdvanceTime(next_send_time - clock_.CurrentTime() - + (PacingController::kMinSleepTime + TimeDelta::ms(1))); + + EXPECT_CALL(callback_, SendPacket).Times(0); + pacer_->ProcessPackets(); + + // Assume timing is accurate within +-100us due to rounding. + const TimeDelta kErrorMargin = TimeDelta::us(100); + + // Check that next scheduled send time is still the same (within margin). + EXPECT_LT((pacer_->NextSendTime() - next_send_time).Abs(), kErrorMargin); + + // Advance to within error margin for execution. + clock_.AdvanceTime(TimeDelta::ms(1) + kErrorMargin); + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); +} + TEST_P(PacingControllerTest, TaskLate) { if (PeriodicProcess()) { // This test applies only when NOT using interval budget.