From ad113e50d251c95adcf501ed29f8312ad1193a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Thu, 26 Nov 2015 16:26:12 +0100 Subject: [PATCH] Fix bug in calculation of averge queue time in paced sender. Also work around a flaw in fake encoder which caused bogus perf regression in rampup tests. BUG=560434 R=mflodman@webrtc.org, stefan@webrtc.org Review URL: https://codereview.webrtc.org/1474533006 . Cr-Commit-Position: refs/heads/master@{#10811} --- webrtc/modules/pacing/paced_sender.cc | 49 ++++++++++++++----- webrtc/modules/pacing/paced_sender.h | 4 ++ .../modules/pacing/paced_sender_unittest.cc | 45 +++++++++++++++++ webrtc/test/fake_encoder.cc | 5 ++ 4 files changed, 90 insertions(+), 13 deletions(-) diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index e38405a6a2..dcdf64e97e 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -32,6 +32,9 @@ const int64_t kMaxIntervalTimeMs = 30; } // namespace +// TODO(sprang): Move at least PacketQueue and MediaBudget out to separate +// files, so that we can more easily test them. + namespace webrtc { namespace paced_sender { struct Packet { @@ -93,9 +96,11 @@ class PacketQueue { virtual ~PacketQueue() {} void Push(const Packet& packet) { - if (!AddToDupeSet(packet)) { + if (!AddToDupeSet(packet)) return; - } + + UpdateQueueTime(packet.enqueue_time_ms); + // Store packet in list, use pointers in priority queue for cheaper moves. // Packets have a handle to its own iterator in the list, for easy removal // when popping from queue. @@ -119,6 +124,9 @@ class PacketQueue { bytes_ -= packet.bytes; queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); packet_list_.erase(packet.this_it); + RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); + if (packet_list_.empty()) + RTC_DCHECK_EQ(0u, queue_time_sum_); } bool Empty() const { return prio_queue_.empty(); } @@ -134,13 +142,20 @@ class PacketQueue { return it->enqueue_time_ms; } - int64_t AverageQueueTimeMs() { - int64_t now = clock_->TimeInMilliseconds(); - RTC_DCHECK_GE(now, time_last_updated_); - int64_t delta = now - time_last_updated_; - queue_time_sum_ += delta * prio_queue_.size(); - time_last_updated_ = now; - return queue_time_sum_ / prio_queue_.size(); + void UpdateQueueTime(int64_t timestamp_ms) { + RTC_DCHECK_GE(timestamp_ms, time_last_updated_); + int64_t delta = timestamp_ms - time_last_updated_; + // Use packet packet_list_.size() not prio_queue_.size() here, as there + // might be an outstanding element popped from prio_queue_ currently in the + // SendPacket() call, while packet_list_ will always be correct. + queue_time_sum_ += delta * packet_list_.size(); + time_last_updated_ = timestamp_ms; + } + + int64_t AverageQueueTimeMs() const { + if (prio_queue_.empty()) + return 0; + return queue_time_sum_ / packet_list_.size(); } private: @@ -290,12 +305,13 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, prober_->SetEnabled(true); prober_->MaybeInitializeProbe(bitrate_bps_); + int64_t now_ms = clock_->TimeInMilliseconds(); if (capture_time_ms < 0) - capture_time_ms = clock_->TimeInMilliseconds(); + capture_time_ms = now_ms; - packets_->Push(paced_sender::Packet( - priority, ssrc, sequence_number, capture_time_ms, - clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); + packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number, + capture_time_ms, now_ms, bytes, + retransmission, packet_counter_++)); } int64_t PacedSender::ExpectedQueueTimeMs() const { @@ -319,6 +335,12 @@ int64_t PacedSender::QueueInMs() const { return clock_->TimeInMilliseconds() - oldest_packet; } +int64_t PacedSender::AverageQueueTimeMs() { + CriticalSectionScoped cs(critsect_.get()); + packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); + return packets_->AverageQueueTimeMs(); +} + int64_t PacedSender::TimeUntilNextProcess() { CriticalSectionScoped cs(critsect_.get()); if (prober_->IsProbing()) { @@ -345,6 +367,7 @@ int32_t PacedSender::Process() { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. + packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); int64_t avg_time_left_ms = std::max( 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); int min_bitrate_needed_kbps = diff --git a/webrtc/modules/pacing/paced_sender.h b/webrtc/modules/pacing/paced_sender.h index d1e5ce341d..62e794fdbc 100644 --- a/webrtc/modules/pacing/paced_sender.h +++ b/webrtc/modules/pacing/paced_sender.h @@ -113,6 +113,10 @@ class PacedSender : public Module, public RtpPacketSender { // packets in the queue, given the current size and bitrate, ignoring prio. virtual int64_t ExpectedQueueTimeMs() const; + // Returns the average time since being enqueued, in milliseconds, for all + // packets currently in the pacer queue, or 0 if queue is empty. + virtual int64_t AverageQueueTimeMs(); + // Returns the number of milliseconds until the module want a worker thread // to call Process. int64_t TimeUntilNextProcess() override; diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 1a2936df53..bf00a05237 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -825,5 +825,50 @@ TEST_F(PacedSenderTest, PaddingOveruse) { send_bucket_->Process(); } +TEST_F(PacedSenderTest, AverageQueueTime) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + const int kBitrateBps = 10 * kPacketSize * 8; // 10 packets per second. + const int kBitrateKbps = (kBitrateBps + 500) / 1000; + + send_bucket_->UpdateBitrate(kBitrateKbps, kBitrateKbps, kBitrateKbps); + + EXPECT_EQ(0, send_bucket_->AverageQueueTimeMs()); + + int64_t first_capture_time = clock_.TimeInMilliseconds(); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, sequence_number, + first_capture_time, kPacketSize, false); + clock_.AdvanceTimeMilliseconds(10); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number + 1, clock_.TimeInMilliseconds(), + kPacketSize, false); + clock_.AdvanceTimeMilliseconds(10); + + EXPECT_EQ((20 + 10) / 2, send_bucket_->AverageQueueTimeMs()); + + // Only first packet (queued for 20ms) should be removed, leave the second + // packet (queued for 10ms) alone in the queue. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + first_capture_time, false)) + .Times(1) + .WillRepeatedly(Return(true)); + send_bucket_->Process(); + + EXPECT_EQ(10, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(10); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + first_capture_time + 10, false)) + .Times(1) + .WillRepeatedly(Return(true)); + for (int i = 0; i < 3; ++i) { + clock_.AdvanceTimeMilliseconds(30); // Max delta. + send_bucket_->Process(); + } + + EXPECT_EQ(0, send_bucket_->AverageQueueTimeMs()); +} + } // namespace test } // namespace webrtc diff --git a/webrtc/test/fake_encoder.cc b/webrtc/test/fake_encoder.cc index 44fb1c5882..165fd3e536 100644 --- a/webrtc/test/fake_encoder.cc +++ b/webrtc/test/fake_encoder.cc @@ -57,6 +57,11 @@ int32_t FakeEncoder::Encode(const VideoFrame& input_image, // at the display time of the previous frame. time_since_last_encode_ms = time_now_ms - last_encode_time_ms_; } + if (time_since_last_encode_ms > 3 * 1000 / config_.maxFramerate) { + // Rudimentary check to make sure we don't widely overshoot bitrate target + // when resuming encoding after a suspension. + time_since_last_encode_ms = 3 * 1000 / config_.maxFramerate; + } size_t bits_available = static_cast(target_bitrate_kbps_ * time_since_last_encode_ms);