diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index bf1c6eceee..2442bf0723 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -55,6 +55,7 @@ struct Packet { sequence_number(seq_number), capture_time_ms(capture_time_ms), enqueue_time_ms(enqueue_time_ms), + sum_paused_ms(0), bytes(length_in_bytes), retransmission(retransmission), enqueue_order(enqueue_order) {} @@ -62,8 +63,9 @@ struct Packet { RtpPacketSender::Priority priority; uint32_t ssrc; uint16_t sequence_number; - int64_t capture_time_ms; - int64_t enqueue_time_ms; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused. size_t bytes; bool retransmission; uint64_t enqueue_order; @@ -96,7 +98,8 @@ class PacketQueue { : bytes_(0), clock_(clock), queue_time_sum_(0), - time_last_updated_(clock_->TimeInMilliseconds()) {} + time_last_updated_(clock_->TimeInMilliseconds()), + paused_(false) {} virtual ~PacketQueue() {} void Push(const Packet& packet) { @@ -126,7 +129,11 @@ class PacketQueue { void FinalizePop(const Packet& packet) { RemoveFromDupeSet(packet); bytes_ -= packet.bytes; - queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); + int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; + RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); + packet_queue_time_ms -= packet.sum_paused_ms; + RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); + queue_time_sum_ -= packet_queue_time_ms; packet_list_.erase(packet.this_it); RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); if (packet_list_.empty()) @@ -148,14 +155,34 @@ class PacketQueue { void UpdateQueueTime(int64_t timestamp_ms) { RTC_DCHECK_GE(timestamp_ms, time_last_updated_); - int64_t delta = timestamp_ms - time_last_updated_; - // Use packet packet_list_.size() not prio_queue_.size() here, as there - // might be an outstanding element popped from prio_queue_ currently in the - // SendPacket() call, while packet_list_ will always be correct. - queue_time_sum_ += delta * packet_list_.size(); + if (timestamp_ms == time_last_updated_) + return; + + int64_t delta_ms = timestamp_ms - time_last_updated_; + + if (paused_) { + // Increase per-packet accumulators of time spent in queue while paused, + // so that we can disregard that when subtracting main accumulator when + // popping packet from the queue. + for (auto& it : packet_list_) { + it.sum_paused_ms += delta_ms; + } + } else { + // Use packet packet_list_.size() not prio_queue_.size() here, as there + // might be an outstanding element popped from prio_queue_ currently in + // the SendPacket() call, while packet_list_ will always be correct. + queue_time_sum_ += delta_ms * packet_list_.size(); + } time_last_updated_ = timestamp_ms; } + void SetPauseState(bool paused, int64_t timestamp_ms) { + if (paused_ == paused) + return; + UpdateQueueTime(timestamp_ms); + paused_ = paused; + } + int64_t AverageQueueTimeMs() const { if (prio_queue_.empty()) return 0; @@ -200,6 +227,7 @@ class PacketQueue { const Clock* const clock_; int64_t queue_time_sum_; int64_t time_last_updated_; + bool paused_; }; } // namespace paced_sender @@ -243,6 +271,7 @@ void PacedSender::Pause() { { rtc::CritScope cs(&critsect_); paused_ = true; + packets_->SetPauseState(true, clock_->TimeInMilliseconds()); } // Tell the process thread to call our TimeUntilNextProcess() method to get // a new (longer) estimate for when to call Process(). @@ -255,6 +284,7 @@ void PacedSender::Resume() { { rtc::CritScope cs(&critsect_); paused_ = false; + packets_->SetPauseState(false, clock_->TimeInMilliseconds()); } // Tell the process thread to call our TimeUntilNextProcess() method to // refresh the estimate for when to call Process(). diff --git a/webrtc/modules/pacing/paced_sender.h b/webrtc/modules/pacing/paced_sender.h index 1d38d0956d..a1f7ebefec 100644 --- a/webrtc/modules/pacing/paced_sender.h +++ b/webrtc/modules/pacing/paced_sender.h @@ -136,7 +136,8 @@ class PacedSender : public Module, public RtpPacketSender { virtual rtc::Optional GetApplicationLimitedRegionStartTime() const; // Returns the average time since being enqueued, in milliseconds, for all - // packets currently in the pacer queue, or 0 if queue is empty. + // packets currently in the pacer queue, excluding any time the pacer has been + // paused. Returns 0 if queue is empty. virtual int64_t AverageQueueTimeMs(); // Returns the number of milliseconds until the module want a worker thread diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 5b814621c3..0b2ac1c987 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -1094,5 +1094,69 @@ TEST_F(PacedSenderTest, AvoidBusyLoopOnSendFailure) { EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); } +TEST_F(PacedSenderTest, QueueTimeWithPause) { + const size_t kPacketSize = 1200; + const uint32_t kSsrc = 12346; + uint16_t sequence_number = 1234; + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Pause(); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + // In paused state, queue time should not increase. + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Resume(); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(200, send_bucket_->AverageQueueTimeMs()); +} + +TEST_F(PacedSenderTest, QueueTimePausedDuringPush) { + const size_t kPacketSize = 1200; + const uint32_t kSsrc = 12346; + uint16_t sequence_number = 1234; + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + clock_.AdvanceTimeMilliseconds(100); + send_bucket_->Pause(); + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + // Add a new packet during paused phase. + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + // From a queue time perspective, packet inserted during pause will have zero + // queue time. Average queue time will then be (0 + 100) / 2 = 50. + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Resume(); + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(150, send_bucket_->AverageQueueTimeMs()); +} + +// TODO(sprang): Extract PacketQueue from PacedSender so that we can test +// removing elements while paused. (This is possible, but only because of semi- +// racy condition so can't easily be tested). + } // namespace test } // namespace webrtc