diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h index 6d394c26eb..26aefe5244 100644 --- a/webrtc/modules/pacing/include/paced_sender.h +++ b/webrtc/modules/pacing/include/paced_sender.h @@ -28,6 +28,9 @@ class PacedSender : public Module { kNormalPriority = 2, // Put in back of the line. kLowPriority = 3, // Put in back of the low priority line. }; + // Low priority packets are mixed with the normal priority packets + // while we are paused. + class Callback { public: // Note: packets sent as a result of a callback should not pass by this @@ -47,6 +50,12 @@ class PacedSender : public Module { // Enable/disable pacing. void SetStatus(bool enable); + // Temporarily pause all sending. + void Pause(); + + // Resume sending packets. + void Resume(); + // Current total estimated bitrate. void UpdateBitrate(int target_bitrate_kbps); @@ -80,6 +89,10 @@ class PacedSender : public Module { bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms); + // Local helper function to GetNextPacket. + void GetNextPacketFromList(std::list* list, + uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms); + // Updates the number of bytes that can be sent for the next time interval. void UpdateBytesPerInterval(uint32_t delta_time_in_ms); @@ -88,6 +101,7 @@ class PacedSender : public Module { Callback* callback_; bool enable_; + bool paused_; scoped_ptr critsect_; int target_bitrate_kbytes_per_s_; int bytes_remaining_interval_; @@ -95,6 +109,7 @@ class PacedSender : public Module { TickTime time_last_update_; TickTime time_last_send_; + std::list high_priority_packets_; std::list normal_priority_packets_; std::list low_priority_packets_; }; diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc index 0bd21cb366..1001b33a90 100644 --- a/webrtc/modules/pacing/paced_sender.cc +++ b/webrtc/modules/pacing/paced_sender.cc @@ -39,6 +39,7 @@ namespace webrtc { PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps) : callback_(callback), enable_(false), + paused_(false), critsect_(CriticalSectionWrapper::CreateCriticalSection()), target_bitrate_kbytes_per_s_(target_bitrate_kbps >> 3), // Divide by 8. bytes_remaining_interval_(0), @@ -48,10 +49,21 @@ PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps) } PacedSender::~PacedSender() { + high_priority_packets_.clear(); normal_priority_packets_.clear(); low_priority_packets_.clear(); } +void PacedSender::Pause() { + CriticalSectionScoped cs(critsect_.get()); + paused_ = true; +} + +void PacedSender::Resume() { + CriticalSectionScoped cs(critsect_.get()); + paused_ = false; +} + void PacedSender::SetStatus(bool enable) { CriticalSectionScoped cs(critsect_.get()); enable_ = enable; @@ -70,12 +82,38 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, UpdateState(bytes); return true; // We can send now. } + if (paused_) { + // Queue all packets when we are paused. + switch (priority) { + case kHighPriority: + high_priority_packets_.push_back( + Packet(ssrc, sequence_number, capture_time_ms, bytes)); + break; + case kNormalPriority: + case kLowPriority: + // Queue the low priority packets in the normal priority queue when we + // are paused to avoid starvation. + normal_priority_packets_.push_back( + Packet(ssrc, sequence_number, capture_time_ms, bytes)); + break; + } + return false; + } + switch (priority) { case kHighPriority: - UpdateState(bytes); - return true; // We can send now. + if (high_priority_packets_.empty() && + bytes_remaining_interval_ > 0) { + UpdateState(bytes); + return true; // We can send now. + } + high_priority_packets_.push_back( + Packet(ssrc, sequence_number, capture_time_ms, bytes)); + return false; case kNormalPriority: - if (normal_priority_packets_.empty() && bytes_remaining_interval_ > 0) { + if (high_priority_packets_.empty() && + normal_priority_packets_.empty() && + bytes_remaining_interval_ > 0) { UpdateState(bytes); return true; // We can send now. } @@ -83,7 +121,8 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, Packet(ssrc, sequence_number, capture_time_ms, bytes)); return false; case kLowPriority: - if (normal_priority_packets_.empty() && + if (high_priority_packets_.empty() && + normal_priority_packets_.empty() && low_priority_packets_.empty() && bytes_remaining_interval_ > 0) { UpdateState(bytes); @@ -114,7 +153,7 @@ int32_t PacedSender::Process() { CriticalSectionScoped cs(critsect_.get()); int elapsed_time_ms = (now - time_last_update_).Milliseconds(); time_last_update_ = now; - if (elapsed_time_ms > 0) { + if (!paused_ && elapsed_time_ms > 0) { uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); UpdateBytesPerInterval(delta_time_ms); uint32_t ssrc; @@ -125,7 +164,8 @@ int32_t PacedSender::Process() { callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms); critsect_->Enter(); } - if (normal_priority_packets_.empty() && + if (high_priority_packets_.empty() && + normal_priority_packets_.empty() && low_priority_packets_.empty() && padding_bytes_remaining_interval_ > 0) { critsect_->Leave(); @@ -164,41 +204,49 @@ bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number, if (bytes_remaining_interval_ <= 0) { // All bytes consumed for this interval. // Check if we have not sent in a too long time. - if (!normal_priority_packets_.empty()) { - if ((TickTime::Now() - time_last_send_).Milliseconds() > - kMaxQueueTimeWithoutSendingMs) { - Packet packet = normal_priority_packets_.front(); - UpdateState(packet.bytes_); - *sequence_number = packet.sequence_number_; - *ssrc = packet.ssrc_; - *capture_time_ms = packet.capture_time_ms_; - normal_priority_packets_.pop_front(); + if ((TickTime::Now() - time_last_send_).Milliseconds() > + kMaxQueueTimeWithoutSendingMs) { + if (!high_priority_packets_.empty()) { + GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number, + capture_time_ms); + return true; + } + if (!normal_priority_packets_.empty()) { + GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number, + capture_time_ms); return true; } } return false; } + if (!high_priority_packets_.empty()) { + GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number, + capture_time_ms); + return true; + } if (!normal_priority_packets_.empty()) { - Packet packet = normal_priority_packets_.front(); - UpdateState(packet.bytes_); - *sequence_number = packet.sequence_number_; - *ssrc = packet.ssrc_; - *capture_time_ms = packet.capture_time_ms_; - normal_priority_packets_.pop_front(); + GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number, + capture_time_ms); return true; } if (!low_priority_packets_.empty()) { - Packet packet = low_priority_packets_.front(); - UpdateState(packet.bytes_); - *sequence_number = packet.sequence_number_; - *ssrc = packet.ssrc_; - *capture_time_ms = packet.capture_time_ms_; - low_priority_packets_.pop_front(); + GetNextPacketFromList(&low_priority_packets_, ssrc, sequence_number, + capture_time_ms); return true; } return false; } +void PacedSender::GetNextPacketFromList(std::list* list, + uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) { + Packet packet = list->front(); + UpdateState(packet.bytes_); + *sequence_number = packet.sequence_number_; + *ssrc = packet.ssrc_; + *capture_time_ms = packet.capture_time_ms_; + list->pop_front(); +} + // MUST have critsect_ when calling. void PacedSender::UpdateState(int num_bytes) { time_last_send_ = TickTime::Now(); diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc index 845ac40218..da6e673278 100644 --- a/webrtc/modules/pacing/paced_sender_unittest.cc +++ b/webrtc/modules/pacing/paced_sender_unittest.cc @@ -13,11 +13,12 @@ #include "webrtc/modules/pacing/include/paced_sender.h" -namespace { - const int kTargetBitrate = 800; -}; +using testing::_; namespace webrtc { +namespace test { + +static const int kTargetBitrate = 800; class MockPacedSenderCallback : public PacedSender::Callback { public: @@ -54,7 +55,7 @@ TEST_F(PacedSenderTest, QueuePacket) { EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, sequence_number, capture_time_ms, 250)); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); - EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0); TickTime::AdvanceFakeClock(4); @@ -87,12 +88,12 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) { EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, capture_time_ms, 250)); } - EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); for (int k = 0; k < 10; ++k) { EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); EXPECT_CALL(callback_, - TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(3); + TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); } @@ -159,21 +160,20 @@ TEST_F(PacedSenderTest, Priority) { ssrc, sequence_number++, capture_time_ms, 250)); EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, capture_time_ms, 250)); - EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kHighPriority, + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority, ssrc, sequence_number++, capture_time_ms, 250)); - // Expect all normal priority to be sent out first. - EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0); - EXPECT_CALL(callback_, - TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(2); + // Expect all high and normal priority to be sent out first. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(0, send_bucket_->Process()); - EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority, - testing::_, capture_time_ms_low_priority)).Times(1); + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); TickTime::AdvanceFakeClock(5); @@ -181,4 +181,61 @@ TEST_F(PacedSenderTest, Priority) { EXPECT_EQ(0, send_bucket_->Process()); } +TEST_F(PacedSenderTest, Pause) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + int64_t second_capture_time_ms = 67890; + + // Due to the multiplicative factor we can send 3 packets not 2 packets. + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kLowPriority, + ssrc_low_priority, sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + + send_bucket_->Pause(); + + // Expect everything to be queued. + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority, + ssrc_low_priority, sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, + ssrc, sequence_number++, second_capture_time_ms, 250)); + EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority, + ssrc, sequence_number++, capture_time_ms, 250)); + + // Expect no packet to come out while paused. + EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _)).Times(0); + + for (int i = 0; i < 10; ++i) { + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + } + // Expect high prio packets to come out first followed by all packets in the + // way they were added. + EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3); + + send_bucket_->Resume(); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); + + EXPECT_CALL(callback_, + TimeToSendPacket(_, _, second_capture_time_ms)).Times(1); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + TickTime::AdvanceFakeClock(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0, send_bucket_->Process()); +} + +} // namespace test } // namespace webrtc