diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 862f4e189f..c35c50c67b 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -85,7 +85,8 @@ PacedSender::PacedSender(const Clock* clock, prober_(rtc::MakeUnique(event_log)), probing_send_failure_(false), pacing_bitrate_kbps_(0), - time_last_update_us_(clock->TimeInMicroseconds()), + time_last_process_us_(clock->TimeInMicroseconds()), + last_send_time_us_(clock->TimeInMicroseconds()), first_sent_packet_ms_(-1), packets_(std::move(packets)), packet_counter_(0), @@ -200,7 +201,8 @@ int64_t PacedSender::QueueInMs() const { int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); - int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; + int64_t elapsed_time_us = + clock_->TimeInMicroseconds() - time_last_process_us_; int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. @@ -218,21 +220,23 @@ int64_t PacedSender::TimeUntilNextProcess() { void PacedSender::Process() { int64_t now_us = clock_->TimeInMicroseconds(); rtc::CritScope cs(&critsect_); - int64_t elapsed_time_ms = std::min( - kMaxIntervalTimeMs, (now_us - time_last_update_us_ + 500) / 1000); - int target_bitrate_kbps = pacing_bitrate_kbps_; + time_last_process_us_ = now_us; + int64_t elapsed_time_ms = (now_us - last_send_time_us_ + 500) / 1000; + // When paused we send a padding packet every 500 ms to ensure we won't get + // stuck in the paused state due to no feedback being received. if (paused_) { - PacedPacketInfo pacing_info; - time_last_update_us_ = now_us; // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. - if (packet_counter_ == 0) - return; - SendPadding(1, pacing_info); + if (elapsed_time_ms >= kPausedPacketIntervalMs && packet_counter_ > 0) { + PacedPacketInfo pacing_info; + SendPadding(1, pacing_info); + last_send_time_us_ = clock_->TimeInMicroseconds(); + } return; } + int target_bitrate_kbps = pacing_bitrate_kbps_; if (elapsed_time_ms > 0) { size_t queue_size_bytes = packets_->SizeInBytes(); if (queue_size_bytes > 0) { @@ -252,7 +256,7 @@ void PacedSender::Process() { UpdateBudgetWithElapsedTime(elapsed_time_ms); } - time_last_update_us_ = now_us; + last_send_time_us_ = clock_->TimeInMicroseconds(); bool is_probing = prober_->IsProbing(); PacedPacketInfo pacing_info; @@ -262,6 +266,8 @@ void PacedSender::Process() { pacing_info = prober_->CurrentCluster(); recommended_probe_size = prober_->RecommendedMinProbeSize(); } + // The paused state is checked in the loop since SendPacket leaves the + // critical section allowing the paused state to be changed from other code. while (!packets_->Empty() && !paused_) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can @@ -269,10 +275,8 @@ void PacedSender::Process() { const PacketQueue::Packet& packet = packets_->BeginPop(); if (SendPacket(packet, pacing_info)) { - // Send succeeded, remove it from the queue. - if (first_sent_packet_ms_ == -1) - first_sent_packet_ms_ = clock_->TimeInMilliseconds(); bytes_sent += packet.bytes; + // Send succeeded, remove it from the queue. packets_->FinalizePop(packet); if (is_probing && bytes_sent > recommended_probe_size) break; @@ -323,6 +327,8 @@ bool PacedSender::SendPacket(const PacketQueue::Packet& packet, critsect_.Enter(); if (success) { + if (first_sent_packet_ms_ == -1) + first_sent_packet_ms_ = clock_->TimeInMilliseconds(); if (packet.priority != kHighPriority || account_for_audio_) { // Update media bytes sent. // TODO(eladalon): TimeToSendPacket() can also return |true| in some @@ -351,6 +357,7 @@ size_t PacedSender::SendPadding(size_t padding_needed, } void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { + delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms); media_budget_->IncreaseBudget(delta_time_ms); padding_budget_->IncreaseBudget(delta_time_ms); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 0501bbb691..13f3307b4f 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -156,7 +156,8 @@ class PacedSender : public Pacer { // order to meet pace time constraint). uint32_t pacing_bitrate_kbps_ RTC_GUARDED_BY(critsect_); - int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_); + int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_); + int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_); int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); const std::unique_ptr packets_ diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 263495f64e..4180aa8a8c 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -599,16 +599,17 @@ TEST_P(PacedSenderTest, Pause) { send_bucket_->Process(); int64_t expected_time_until_send = 500; - EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); - while (expected_time_until_send >= 0) { - // TimeUntilNextProcess must not return 0 when paused. If it does, - // we risk running a busy loop, so ideally it should return a large value. - EXPECT_EQ(expected_time_until_send, send_bucket_->TimeUntilNextProcess()); - if (expected_time_until_send == 0) - send_bucket_->Process(); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + while (expected_time_until_send >= 5) { + send_bucket_->Process(); clock_.AdvanceTimeMilliseconds(5); expected_time_until_send -= 5; } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + testing::Mock::VerifyAndClearExpectations(&callback_); // Expect high prio packets to come out first followed by normal // prio packets and low prio packets (all in capture order). @@ -649,6 +650,11 @@ TEST_P(PacedSenderTest, Pause) { } send_bucket_->Resume(); + // The pacer was resumed directly after the previous process call finished. It + // will therefore wait 5 ms until next process. + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + for (size_t i = 0; i < 4; i++) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); send_bucket_->Process();