diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h index a0c1f1dfc1..44e81447ab 100644 --- a/webrtc/modules/pacing/include/paced_sender.h +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -52,6 +52,9 @@ class PacedSender : public Module { protected: virtual ~Callback() {} }; + + static const int kDefaultMaxQueueLengthMs = 2000; + PacedSender(Callback* callback, int target_bitrate_kbps, float pace_multiplier); @@ -85,6 +88,10 @@ class PacedSender : public Module { int bytes, bool retransmission); + // Sets the max length of the pacer queue in milliseconds. + // A negative queue size is interpreted as infinite. + virtual void set_max_queue_length_ms(int max_queue_length_ms); + // Returns the time since the oldest queued packet was captured. virtual int QueueInMs() const; @@ -105,6 +112,8 @@ class PacedSender : public Module { uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms, bool* retransmission); + bool SendPacketFromList(paced_sender::PacketList* packet_list); + // Updates the number of bytes that can be sent for the next time interval. void UpdateBytesPerInterval(uint32_t delta_time_in_ms); @@ -115,6 +124,7 @@ class PacedSender : public Module { const float pace_multiplier_; bool enabled_; bool paused_; + int max_queue_length_ms_; scoped_ptr critsect_; // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 664b5e36dc..7db408996b 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -124,6 +124,7 @@ PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps, pace_multiplier_(pace_multiplier), enabled_(false), paused_(false), + max_queue_length_ms_(kDefaultMaxQueueLengthMs), critsect_(CriticalSectionWrapper::CreateCriticalSection()), media_budget_(new paced_sender::IntervalBudget( pace_multiplier_ * target_bitrate_kbps)), @@ -206,6 +207,11 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, return false; } +void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) { + CriticalSectionScoped cs(critsect_.get()); + max_queue_length_ms_ = max_queue_length_ms; +} + int PacedSender::QueueInMs() const { CriticalSectionScoped cs(critsect_.get()); int64_t now_ms = TickTime::MillisecondTimestamp(); @@ -254,36 +260,10 @@ int32_t PacedSender::Process() { uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); UpdateBytesPerInterval(delta_time_ms); } - uint32_t ssrc; - uint16_t sequence_number; - int64_t capture_time_ms; - bool retransmission; paced_sender::PacketList* packet_list; while (ShouldSendNextPacket(&packet_list)) { - GetNextPacketFromList(packet_list, &ssrc, &sequence_number, - &capture_time_ms, &retransmission); - critsect_->Leave(); - - const bool success = callback_->TimeToSendPacket(ssrc, sequence_number, - capture_time_ms, - retransmission); - critsect_->Enter(); - // If packet cannot be sent then keep it in packet list and exit early. - // There's no need to send more packets. - if (!success) { + if (!SendPacketFromList(packet_list)) return 0; - } - packet_list->pop_front(); - const bool last_packet = packet_list->empty() || - packet_list->front().capture_time_ms_ > capture_time_ms; - if (packet_list != high_priority_packets_.get()) { - if (capture_time_ms > capture_time_ms_last_sent_) { - capture_time_ms_last_sent_ = capture_time_ms; - } else if (capture_time_ms == capture_time_ms_last_sent_ && - last_packet) { - TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms); - } - } } if (high_priority_packets_->empty() && normal_priority_packets_->empty() && @@ -304,6 +284,39 @@ int32_t PacedSender::Process() { return 0; } +// MUST have critsect_ when calling. +bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) { + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; + bool retransmission; + GetNextPacketFromList(packet_list, &ssrc, &sequence_number, + &capture_time_ms, &retransmission); + critsect_->Leave(); + + const bool success = callback_->TimeToSendPacket(ssrc, sequence_number, + capture_time_ms, + retransmission); + critsect_->Enter(); + // If packet cannot be sent then keep it in packet list and exit early. + // There's no need to send more packets. + if (!success) { + return false; + } + packet_list->pop_front(); + const bool last_packet = packet_list->empty() || + packet_list->front().capture_time_ms_ > capture_time_ms; + if (packet_list != high_priority_packets_.get()) { + if (capture_time_ms > capture_time_ms_last_sent_) { + capture_time_ms_last_sent_ = capture_time_ms; + } else if (capture_time_ms == capture_time_ms_last_sent_ && + last_packet) { + TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms); + } + } + return true; +} + // MUST have critsect_ when calling. void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { media_budget_->IncreaseBudget(delta_time_ms); @@ -327,6 +340,21 @@ bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) { return true; } } + // Send any old packets to avoid queuing for too long. + if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) { + int64_t high_priority_capture_time = -1; + if (!high_priority_packets_->empty()) { + high_priority_capture_time = + high_priority_packets_->front().capture_time_ms_; + *packet_list = high_priority_packets_.get(); + } + if (!normal_priority_packets_->empty() && high_priority_capture_time > + normal_priority_packets_->front().capture_time_ms_) { + *packet_list = normal_priority_packets_.get(); + } + if (*packet_list) + return true; + } return false; } if (!high_priority_packets_->empty()) { diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 66a3383112..655c03da87 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -83,53 +83,50 @@ class PacedSenderTest : public ::testing::Test { TEST_F(PacedSenderTest, QueuePacket) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - int64_t capture_time_ms = 56789; // Due to the multiplicative factor we can send 3 packets not 2 packets. SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); + int64_t queued_packet_timestamp = TickTime::MillisecondTimestamp(); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number, capture_time_ms, 250, false)); + sequence_number, queued_packet_timestamp, 250, false)); send_bucket_->Process(); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); - EXPECT_CALL(callback_, - TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false)).Times(0); TickTime::AdvanceFakeClock(4); EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(1); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_CALL(callback_, TimeToSendPacket( - ssrc, sequence_number++, capture_time_ms, false)) + ssrc, sequence_number++, queued_packet_timestamp, false)) .Times(1) .WillRepeatedly(Return(true)); send_bucket_->Process(); sequence_number++; SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number++, capture_time_ms, 250, false)); + sequence_number++, TickTime::MillisecondTimestamp(), 250, false)); send_bucket_->Process(); } TEST_F(PacedSenderTest, PaceQueuedPackets) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - int64_t capture_time_ms = 56789; // Due to the multiplicative factor we can send 3 packets not 2 packets. for (int i = 0; i < 3; ++i) { SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); } for (int j = 0; j < 30; ++j) { EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number++, capture_time_ms, 250, false)); + sequence_number++, TickTime::MillisecondTimestamp(), 250, false)); } send_bucket_->Process(); EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); @@ -137,7 +134,7 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) { EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); EXPECT_CALL(callback_, - TimeToSendPacket(ssrc, _, capture_time_ms, false)) + TimeToSendPacket(ssrc, _, _, false)) .Times(3) .WillRepeatedly(Return(true)); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); @@ -148,35 +145,34 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number, capture_time_ms, 250, false)); + sequence_number, TickTime::MillisecondTimestamp(), 250, false)); send_bucket_->Process(); } TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - int64_t capture_time_ms = 56789; uint16_t queued_sequence_number; // Due to the multiplicative factor we can send 3 packets not 2 packets. for (int i = 0; i < 3; ++i) { SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); } queued_sequence_number = sequence_number; for (int j = 0; j < 30; ++j) { // Send in duplicate packets. EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number, capture_time_ms, 250, false)); + sequence_number, TickTime::MillisecondTimestamp(), 250, false)); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number++, capture_time_ms, 250, false)); + sequence_number++, TickTime::MillisecondTimestamp(), 250, false)); } EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); send_bucket_->Process(); @@ -186,7 +182,8 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) { for (int i = 0; i < 3; ++i) { EXPECT_CALL(callback_, TimeToSendPacket(ssrc, queued_sequence_number++, - capture_time_ms, false)) + _, + false)) .Times(1) .WillRepeatedly(Return(true)); } @@ -198,29 +195,28 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) { EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, - sequence_number++, capture_time_ms, 250, false)); + sequence_number++, TickTime::MillisecondTimestamp(), 250, false)); send_bucket_->Process(); } TEST_F(PacedSenderTest, Padding) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - int64_t capture_time_ms = 56789; send_bucket_->UpdateBitrate(kTargetBitrate, kTargetBitrate, kTargetBitrate); // Due to the multiplicative factor we can send 3 packets not 2 packets. SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, - capture_time_ms, 250, false); + TickTime::MillisecondTimestamp(), 250, false); // No padding is expected since we have sent too much already. EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); @@ -492,5 +488,33 @@ TEST_F(PacedSenderTest, ResendPacket) { EXPECT_EQ(0, send_bucket_->QueueInMs()); } +TEST_F(PacedSenderTest, MaxQueueLength) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + send_bucket_->UpdateBitrate(30, 0, 0); + for (int i = 0; i < 30; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + TickTime::MillisecondTimestamp(), + 1200, + false); + } + + TickTime::AdvanceFakeClock(2001); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + TickTime::MillisecondTimestamp(), + 1200, + false); + EXPECT_EQ(2001, send_bucket_->QueueInMs()); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); + TickTime::AdvanceFakeClock(31); + send_bucket_->Process(); +} } // namespace test } // namespace webrtc diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc index ce13b1a672..f226ada874 100644 --- a/webrtc/video_engine/vie_encoder.cc +++ b/webrtc/video_engine/vie_encoder.cc @@ -846,10 +846,15 @@ void ViEEncoder::SetSenderBufferingMode(int target_delay_ms) { // Disable external frame-droppers. vcm_.EnableFrameDropper(false); vpm_.EnableTemporalDecimation(false); + // We don't put any limits on the pacer queue when running in buffered mode + // since the encoder will be paused if the queue grow too large. + paced_sender_->set_max_queue_length_ms(-1); } else { // Real-time mode - enable frame droppers. vpm_.EnableTemporalDecimation(true); vcm_.EnableFrameDropper(true); + paced_sender_->set_max_queue_length_ms( + PacedSender::kDefaultMaxQueueLengthMs); } }