diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index d637b27ea4..179c788243 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -91,7 +91,8 @@ PacedSender::PacedSender(const Clock* clock, min_send_bitrate_kbps_(0u), max_padding_bitrate_kbps_(0u), pacing_bitrate_kbps_(0), - time_last_update_us_(clock->TimeInMicroseconds()), + time_last_process_us_(clock->TimeInMicroseconds()), + last_send_time_us_(clock->TimeInMicroseconds()), first_sent_packet_ms_(-1), packets_(std::move(packets)), packet_counter_(0), @@ -238,7 +239,8 @@ int64_t PacedSender::QueueInMs() const { int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); - int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; + int64_t elapsed_time_us = + clock_->TimeInMicroseconds() - time_last_process_us_; int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. @@ -256,22 +258,24 @@ int64_t PacedSender::TimeUntilNextProcess() { void PacedSender::Process() { int64_t now_us = clock_->TimeInMicroseconds(); rtc::CritScope cs(&critsect_); - int64_t elapsed_time_ms = std::min( - kMaxIntervalTimeMs, (now_us - time_last_update_us_ + 500) / 1000); - int target_bitrate_kbps = pacing_bitrate_kbps_; + time_last_process_us_ = now_us; + int64_t elapsed_time_ms = (now_us - last_send_time_us_ + 500) / 1000; + // When paused we send a padding packet every 500 ms to ensure we won't get + // stuck in the paused state due to no feedback being received. if (paused_) { - PacedPacketInfo pacing_info; - time_last_update_us_ = now_us; // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. - if (packet_counter_ == 0) - return; - size_t bytes_sent = SendPadding(1, pacing_info); - alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); + if (elapsed_time_ms >= kPausedPacketIntervalMs && packet_counter_ > 0) { + PacedPacketInfo pacing_info; + size_t bytes_sent = SendPadding(1, pacing_info); + alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); + last_send_time_us_ = clock_->TimeInMicroseconds(); + } return; } + int target_bitrate_kbps = pacing_bitrate_kbps_; if (elapsed_time_ms > 0) { size_t queue_size_bytes = packets_->SizeInBytes(); if (queue_size_bytes > 0) { @@ -291,7 +295,7 @@ void PacedSender::Process() { UpdateBudgetWithElapsedTime(elapsed_time_ms); } - time_last_update_us_ = now_us; + last_send_time_us_ = clock_->TimeInMicroseconds(); bool is_probing = prober_->IsProbing(); PacedPacketInfo pacing_info; @@ -301,6 +305,8 @@ void PacedSender::Process() { pacing_info = prober_->CurrentCluster(); recommended_probe_size = prober_->RecommendedMinProbeSize(); } + // The paused state is checked in the loop since SendPacket leaves the + // critical section allowing the paused state to be changed from other code. while (!packets_->Empty() && !paused_) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can @@ -308,10 +314,8 @@ void PacedSender::Process() { const PacketQueue::Packet& packet = packets_->BeginPop(); if (SendPacket(packet, pacing_info)) { - // Send succeeded, remove it from the queue. - if (first_sent_packet_ms_ == -1) - first_sent_packet_ms_ = clock_->TimeInMilliseconds(); bytes_sent += packet.bytes; + // Send succeeded, remove it from the queue. packets_->FinalizePop(packet); if (is_probing && bytes_sent > recommended_probe_size) break; @@ -363,6 +367,8 @@ bool PacedSender::SendPacket(const PacketQueue::Packet& packet, critsect_.Enter(); if (success) { + if (first_sent_packet_ms_ == -1) + first_sent_packet_ms_ = clock_->TimeInMilliseconds(); if (packet.priority != kHighPriority || account_for_audio_) { // Update media bytes sent. // TODO(eladalon): TimeToSendPacket() can also return |true| in some @@ -391,6 +397,7 @@ size_t PacedSender::SendPadding(size_t padding_needed, } void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { + delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms); media_budget_->IncreaseBudget(delta_time_ms); padding_budget_->IncreaseBudget(delta_time_ms); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 7c0954777a..179101e3b2 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -178,7 +178,8 @@ class PacedSender : public Pacer { uint32_t max_padding_bitrate_kbps_ RTC_GUARDED_BY(critsect_); uint32_t pacing_bitrate_kbps_ RTC_GUARDED_BY(critsect_); - int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_); + int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_); + int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_); int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); const std::unique_ptr packets_ diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 4507f3f929..47a85a653d 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -622,16 +622,17 @@ TEST_P(PacedSenderTest, Pause) { send_bucket_->Process(); int64_t expected_time_until_send = 500; - EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); - while (expected_time_until_send >= 0) { - // TimeUntilNextProcess must not return 0 when paused. If it does, - // we risk running a busy loop, so ideally it should return a large value. - EXPECT_EQ(expected_time_until_send, send_bucket_->TimeUntilNextProcess()); - if (expected_time_until_send == 0) - send_bucket_->Process(); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + while (expected_time_until_send >= 5) { + send_bucket_->Process(); clock_.AdvanceTimeMilliseconds(5); expected_time_until_send -= 5; } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + testing::Mock::VerifyAndClearExpectations(&callback_); // Expect high prio packets to come out first followed by normal // prio packets and low prio packets (all in capture order). @@ -672,6 +673,11 @@ TEST_P(PacedSenderTest, Pause) { } send_bucket_->Resume(); + // The pacer was resumed directly after the previous process call finished. It + // will therefore wait 5 ms until next process. + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + for (size_t i = 0; i < 4; i++) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); send_bucket_->Process();