diff --git a/modules/congestion_controller/rtp/pacer_controller.cc b/modules/congestion_controller/rtp/pacer_controller.cc index 1da67e0df9..a577f85e84 100644 --- a/modules/congestion_controller/rtp/pacer_controller.cc +++ b/modules/congestion_controller/rtp/pacer_controller.cc @@ -25,26 +25,22 @@ PacerController::~PacerController() = default; void PacerController::OnCongestionWindow(CongestionWindow congestion_window) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - if (congestion_window.enabled) { - congestion_window_ = congestion_window; - } else { - congestion_window_ = rtc::nullopt; - congested_ = false; - UpdatePacerState(); - } + if (congestion_window.enabled) + pacer_->SetCongestionWindow(congestion_window.data_window.bytes()); + else + pacer_->SetCongestionWindow(PacedSender::kNoCongestionWindow); } void PacerController::OnNetworkAvailability(NetworkAvailability msg) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); network_available_ = msg.network_available; - congested_ = false; - UpdatePacerState(); + pacer_->UpdateOutstandingData(0); + SetPacerState(!msg.network_available); } void PacerController::OnNetworkRouteChange(NetworkRouteChange) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - congested_ = false; - UpdatePacerState(); + pacer_->UpdateOutstandingData(0); } void PacerController::OnPacerConfig(PacerConfig msg) { @@ -62,15 +58,7 @@ void PacerController::OnProbeClusterConfig(ProbeClusterConfig config) { void PacerController::OnOutstandingData(OutstandingData msg) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - if (congestion_window_.has_value()) { - congested_ = msg.in_flight_data > congestion_window_->data_window; - } - UpdatePacerState(); -} - -void PacerController::UpdatePacerState() { - bool pause = congested_ || !network_available_; - SetPacerState(pause); + pacer_->UpdateOutstandingData(msg.in_flight_data.bytes()); } void PacerController::SetPacerState(bool paused) { diff --git a/modules/congestion_controller/rtp/pacer_controller.h b/modules/congestion_controller/rtp/pacer_controller.h index e25bc6f061..6aa029ca71 100644 --- a/modules/congestion_controller/rtp/pacer_controller.h +++ b/modules/congestion_controller/rtp/pacer_controller.h @@ -38,13 +38,10 @@ class PacerController { void OnProbeClusterConfig(ProbeClusterConfig msg); private: - void UpdatePacerState(); void SetPacerState(bool paused); PacedSender* const pacer_; rtc::Optional current_pacer_config_; - rtc::Optional congestion_window_; - bool congested_ = false; bool pacer_paused_ = false; bool network_available_ = true; diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index c9e30ca08e..afec7d0094 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -33,7 +33,8 @@ namespace { // Time limit in milliseconds between packet bursts. const int64_t kMinPacketLimitMs = 5; -const int64_t kPausedPacketIntervalMs = 500; +const int64_t kCongestedPacketIntervalMs = 500; +const int64_t kPausedProcessIntervalMs = kCongestedPacketIntervalMs; const int64_t kMaxElapsedTimeMs = 2000; // Upper cap on process interval, in case process has not been called in a long @@ -119,6 +120,22 @@ void PacedSender::Resume() { process_thread_->WakeUp(this); } +void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) { + rtc::CritScope cs(&critsect_); + congestion_window_bytes_ = congestion_window_bytes; +} + +void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) { + rtc::CritScope cs(&critsect_); + outstanding_bytes_ = outstanding_bytes; +} + +bool PacedSender::Congested() const { + if (congestion_window_bytes_ == kNoCongestionWindow) + return false; + return outstanding_bytes_ >= congestion_window_bytes_; +} + void PacedSender::SetProbingEnabled(bool enabled) { rtc::CritScope cs(&critsect_); RTC_CHECK_EQ(0, packet_counter_); @@ -225,7 +242,7 @@ int64_t PacedSender::TimeUntilNextProcess() { // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. if (paused_) - return std::max(kPausedPacketIntervalMs - elapsed_time_ms, 0); + return std::max(kPausedProcessIntervalMs - elapsed_time_ms, 0); if (prober_->IsProbing()) { int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); @@ -246,12 +263,14 @@ void PacedSender::Process() { << kMaxElapsedTimeMs << " ms"; elapsed_time_ms = kMaxElapsedTimeMs; } - // When paused we send a padding packet every 500 ms to ensure we won't get - // stuck in the paused state due to no feedback being received. - if (paused_) { + // When congested we send a padding packet every 500 ms to ensure we won't get + // stuck in the congested state due to no feedback being received. + // TODO(srte): Stop sending packet in paused state when pause is no longer + // used for congestion windows. + if (paused_ || Congested()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. - if (elapsed_time_ms >= kPausedPacketIntervalMs && packet_counter_ > 0) { + if (elapsed_time_ms >= kCongestedPacketIntervalMs && packet_counter_ > 0) { PacedPacketInfo pacing_info; size_t bytes_sent = SendPadding(1, pacing_info); alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); @@ -292,7 +311,7 @@ void PacedSender::Process() { } // The paused state is checked in the loop since SendPacket leaves the // critical section allowing the paused state to be changed from other code. - while (!packets_->Empty() && !paused_) { + while (!packets_->Empty() && !paused_ && !Congested()) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. @@ -311,7 +330,7 @@ void PacedSender::Process() { } } - if (packets_->Empty()) { + if (packets_->Empty() && !Congested()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. if (packet_counter_ > 0) { @@ -388,6 +407,7 @@ void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { } void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) { + outstanding_bytes_ += bytes_sent; media_budget_->UseBudget(bytes_sent); padding_budget_->UseBudget(bytes_sent); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 179101e3b2..888bedfb5d 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -49,6 +49,7 @@ class PacedSender : public Pacer { protected: virtual ~PacketSender() {} }; + static constexpr int64_t kNoCongestionWindow = -1; // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than // this value, the packet producers should wait (eg drop frames rather than @@ -81,6 +82,9 @@ class PacedSender : public Pacer { // Resume sending packets. void Resume(); + void SetCongestionWindow(int64_t congestion_window_bytes); + void UpdateOutstandingData(int64_t outstanding_bytes); + // Enable bitrate probing. Enabled by default, mostly here to simplify // testing. Must be called before any packets are being sent to have an // effect. @@ -153,6 +157,9 @@ class PacedSender : public Pacer { size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + const Clock* const clock_; PacketSender* const packet_sender_; const std::unique_ptr alr_detector_ RTC_PT_GUARDED_BY(critsect_); @@ -186,6 +193,9 @@ class PacedSender : public Pacer { RTC_PT_GUARDED_BY(critsect_); uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); + int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) = + kNoCongestionWindow; + int64_t outstanding_bytes_ RTC_GUARDED_BY(critsect_) = 0; float pacing_factor_ RTC_GUARDED_BY(critsect_); // Lock to avoid race when attaching process thread. This can happen due to // the Call class setting network state on SendSideCongestionController, which diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index b46c358a1e..44914b7c63 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -562,6 +562,103 @@ TEST_F(PacedSenderTest, HighPrioDoesntAffectBudget) { EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); } +TEST_F(PacedSenderTest, SendsOnlyPaddingWhenCongested) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int kPacketSize = 250; + int kCongestionWindow = kPacketSize * 10; + + send_bucket_->UpdateOutstandingData(0); + send_bucket_->SetCongestionWindow(kCongestionWindow); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + + size_t blocked_packets = 0; + int64_t expected_time_until_padding = 500; + while (expected_time_until_padding > 5) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + blocked_packets++; + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + expected_time_until_padding -= 5; + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + EXPECT_EQ(blocked_packets, send_bucket_->QueueSizePackets()); +} + +TEST_F(PacedSenderTest, ResumesSendingWhenCongestionEnds) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int64_t kPacketSize = 250; + int64_t kCongestionCount = 10; + int64_t kCongestionWindow = kPacketSize * kCongestionCount; + int64_t kCongestionTimeMs = 1000; + + send_bucket_->UpdateOutstandingData(0); + send_bucket_->SetCongestionWindow(kCongestionWindow); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0); + int unacked_packets = 0; + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + unacked_packets++; + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + + // First mark half of the congested packets as cleared and make sure that just + // as many are sent + int ack_count = kCongestionCount / 2; + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _)) + .Times(ack_count) + .WillRepeatedly(Return(true)); + send_bucket_->UpdateOutstandingData(kCongestionWindow - + kPacketSize * ack_count); + + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + unacked_packets -= ack_count; + testing::Mock::VerifyAndClearExpectations(&callback_); + + // Second make sure all packets are sent if sent packets are continuously + // marked as acked. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _)) + .Times(unacked_packets) + .WillRepeatedly(Return(true)); + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + send_bucket_->UpdateOutstandingData(0); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } +} + TEST_F(PacedSenderTest, Pause) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346;