diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 3c299f827e..e38405a6a2 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -10,12 +10,11 @@ #include "webrtc/modules/pacing/paced_sender.h" -#include - #include #include #include +#include "webrtc/base/checks.h" #include "webrtc/modules/include/module_common_types.h" #include "webrtc/modules/pacing/bitrate_prober.h" #include "webrtc/system_wrappers/include/clock.h" @@ -86,7 +85,11 @@ struct Comparator { // Class encapsulating a priority queue with some extensions. class PacketQueue { public: - PacketQueue() : bytes_(0) {} + explicit PacketQueue(Clock* clock) + : bytes_(0), + clock_(clock), + queue_time_sum_(0), + time_last_updated_(clock_->TimeInMilliseconds()) {} virtual ~PacketQueue() {} void Push(const Packet& packet) { @@ -114,6 +117,7 @@ class PacketQueue { void FinalizePop(const Packet& packet) { RemoveFromDupeSet(packet); bytes_ -= packet.bytes; + queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); packet_list_.erase(packet.this_it); } @@ -123,13 +127,22 @@ class PacketQueue { uint64_t SizeInBytes() const { return bytes_; } - int64_t OldestEnqueueTime() const { - std::list::const_reverse_iterator it = packet_list_.rbegin(); + int64_t OldestEnqueueTimeMs() const { + auto it = packet_list_.rbegin(); if (it == packet_list_.rend()) return 0; return it->enqueue_time_ms; } + int64_t AverageQueueTimeMs() { + int64_t now = clock_->TimeInMilliseconds(); + RTC_DCHECK_GE(now, time_last_updated_); + int64_t delta = now - time_last_updated_; + queue_time_sum_ += delta * prio_queue_.size(); + time_last_updated_ = now; + return queue_time_sum_ / prio_queue_.size(); + } + private: // Try to add a packet to the set of ssrc/seqno identifiers currently in the // queue. Return true if inserted, false if this is a duplicate. @@ -147,7 +160,7 @@ class PacketQueue { void RemoveFromDupeSet(const Packet& packet) { SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); - assert(it != dupe_map_.end()); + RTC_DCHECK(it != dupe_map_.end()); it->second.erase(packet.sequence_number); if (it->second.empty()) { dupe_map_.erase(it); @@ -165,6 +178,9 @@ class PacketQueue { // Map >, for checking duplicates. typedef std::map > SsrcSeqNoMap; SsrcSeqNoMap dupe_map_; + Clock* const clock_; + int64_t queue_time_sum_; + int64_t time_last_updated_; }; class IntervalBudget { @@ -209,6 +225,7 @@ class IntervalBudget { }; } // namespace paced_sender +const int64_t PacedSender::kMaxQueueLengthMs = 2000; const float PacedSender::kDefaultPaceMultiplier = 2.5f; PacedSender::PacedSender(Clock* clock, @@ -225,8 +242,9 @@ PacedSender::PacedSender(Clock* clock, padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), prober_(new BitrateProber()), bitrate_bps_(1000 * bitrate_kbps), + max_bitrate_kbps_(max_bitrate_kbps), time_last_update_us_(clock->TimeInMicroseconds()), - packets_(new paced_sender::PacketQueue()), + packets_(new paced_sender::PacketQueue(clock)), packet_counter_(0) { UpdateBytesPerInterval(kMinPacketLimitMs); } @@ -244,7 +262,7 @@ void PacedSender::Resume() { } void PacedSender::SetProbingEnabled(bool enabled) { - assert(packet_counter_ == 0); + RTC_CHECK_EQ(0u, packet_counter_); probing_enabled_ = enabled; } @@ -252,9 +270,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps, int max_bitrate_kbps, int min_bitrate_kbps) { CriticalSectionScoped cs(critsect_.get()); - media_budget_->set_target_rate_kbps(max_bitrate_kbps); + // Don't set media bitrate here as it may be boosted in order to meet max + // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_ + // be updated in Process(). padding_budget_->set_target_rate_kbps(min_bitrate_kbps); bitrate_bps_ = 1000 * bitrate_kbps; + max_bitrate_kbps_ = max_bitrate_kbps; } void PacedSender::InsertPacket(RtpPacketSender::Priority priority, @@ -265,14 +286,12 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, bool retransmission) { CriticalSectionScoped cs(critsect_.get()); - if (probing_enabled_ && !prober_->IsProbing()) { + if (probing_enabled_ && !prober_->IsProbing()) prober_->SetEnabled(true); - } prober_->MaybeInitializeProbe(bitrate_bps_); - if (capture_time_ms < 0) { + if (capture_time_ms < 0) capture_time_ms = clock_->TimeInMilliseconds(); - } packets_->Push(paced_sender::Packet( priority, ssrc, sequence_number, capture_time_ms, @@ -281,9 +300,8 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, int64_t PacedSender::ExpectedQueueTimeMs() const { CriticalSectionScoped cs(critsect_.get()); - int target_rate = media_budget_->target_rate_kbps(); - assert(target_rate > 0); - return static_cast(packets_->SizeInBytes() * 8 / target_rate); + RTC_DCHECK_GT(max_bitrate_kbps_, 0); + return static_cast(packets_->SizeInBytes() * 8 / max_bitrate_kbps_); } size_t PacedSender::QueueSizePackets() const { @@ -294,7 +312,7 @@ size_t PacedSender::QueueSizePackets() const { int64_t PacedSender::QueueInMs() const { CriticalSectionScoped cs(critsect_.get()); - int64_t oldest_packet = packets_->OldestEnqueueTime(); + int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); if (oldest_packet == 0) return 0; @@ -305,9 +323,8 @@ int64_t PacedSender::TimeUntilNextProcess() { CriticalSectionScoped cs(critsect_.get()); if (prober_->IsProbing()) { int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); - if (ret >= 0) { + if (ret >= 0) return ret; - } } int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; @@ -321,14 +338,29 @@ int32_t PacedSender::Process() { time_last_update_us_ = now_us; if (paused_) return 0; + int target_bitrate_kbps = max_bitrate_kbps_; if (elapsed_time_ms > 0) { + size_t queue_size_bytes = packets_->SizeInBytes(); + if (queue_size_bytes > 0) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + int64_t avg_time_left_ms = std::max( + 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); + int min_bitrate_needed_kbps = + static_cast(queue_size_bytes * 8 / avg_time_left_ms); + if (min_bitrate_needed_kbps > target_bitrate_kbps) + target_bitrate_kbps = min_bitrate_needed_kbps; + } + + media_budget_->set_target_rate_kbps(target_bitrate_kbps); + int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); UpdateBytesPerInterval(delta_time_ms); } while (!packets_->Empty()) { - if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) { + if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) return 0; - } // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can @@ -337,9 +369,8 @@ int32_t PacedSender::Process() { if (SendPacket(packet)) { // Send succeeded, remove it from the queue. packets_->FinalizePop(packet); - if (prober_->IsProbing()) { + if (prober_->IsProbing()) return 0; - } } else { // Send failed, put it back into the queue. packets_->CancelPop(packet); @@ -351,10 +382,11 @@ int32_t PacedSender::Process() { return 0; size_t padding_needed; - if (prober_->IsProbing()) + if (prober_->IsProbing()) { padding_needed = prober_->RecommendedPacketSize(); - else + } else { padding_needed = padding_budget_->bytes_remaining(); + } if (padding_needed > 0) SendPadding(static_cast(padding_needed)); diff --git a/webrtc/modules/pacing/paced_sender.h b/webrtc/modules/pacing/paced_sender.h index fa9f59a3d7..d1e5ce341d 100644 --- a/webrtc/modules/pacing/paced_sender.h +++ b/webrtc/modules/pacing/paced_sender.h @@ -51,7 +51,11 @@ class PacedSender : public Module, public RtpPacketSender { virtual ~Callback() {} }; - static const int64_t kDefaultMaxQueueLengthMs = 2000; + // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than + // this value, the packet producers should wait (eg drop frames rather than + // encoding them). Bitrate sent may temporarily exceed target set by + // UpdateBitrate() so that this limit will be upheld. + static const int64_t kMaxQueueLengthMs; // Pace in kbits/s until we receive first estimate. static const int kDefaultInitialPaceKbps = 2000; // Pacing-rate relative to our target send rate. @@ -142,7 +146,10 @@ class PacedSender : public Module, public RtpPacketSender { GUARDED_BY(critsect_); rtc::scoped_ptr prober_ GUARDED_BY(critsect_); + // Actual configured bitrates (media_budget_ may temporarily be higher in + // order to meet pace time constraint). int bitrate_bps_ GUARDED_BY(critsect_); + int max_bitrate_kbps_ GUARDED_BY(critsect_); int64_t time_last_update_us_ GUARDED_BY(critsect_); diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 78a49cd4c3..1a2936df53 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -560,6 +560,9 @@ TEST_F(PacedSenderTest, Pause) { EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms, false)) .Times(3) .WillRepeatedly(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false)) + .Times(1) + .WillRepeatedly(Return(true)); send_bucket_->Resume(); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); @@ -567,13 +570,6 @@ TEST_F(PacedSenderTest, Pause) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); - EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms, false)) - .Times(1) - .WillRepeatedly(Return(true)); - EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); - clock_.AdvanceTimeMilliseconds(5); - EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); - EXPECT_EQ(0, send_bucket_->Process()); EXPECT_EQ(0, send_bucket_->QueueInMs()); } @@ -664,10 +660,9 @@ TEST_F(PacedSenderTest, ExpectedQueueTimeMs) { EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); - // Allow for aliasing, duration should be in [expected(n - 1), expected(n)]. - EXPECT_LE(duration, queue_in_ms); - EXPECT_GE(duration, - queue_in_ms - static_cast(kPacketSize * 8 / kMaxBitrate)); + // Allow for aliasing, duration should be within one pack of max time limit. + EXPECT_NEAR(duration, PacedSender::kMaxQueueLengthMs, + static_cast(kPacketSize * 8 / kMaxBitrate)); } TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) { diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc index 94f5fdae66..9eb4a3e30c 100644 --- a/webrtc/video_engine/vie_encoder.cc +++ b/webrtc/video_engine/vie_encoder.cc @@ -327,7 +327,7 @@ bool ViEEncoder::EncoderPaused() const { static_cast(target_delay_ms_ * kEncoderPausePacerMargin), kMinPacingDelayMs); } - if (pacer_->ExpectedQueueTimeMs() > PacedSender::kDefaultMaxQueueLengthMs) { + if (pacer_->ExpectedQueueTimeMs() > PacedSender::kMaxQueueLengthMs) { // Too much data in pacer queue, drop frame. return true; }