From 45d9c1de9cfda4afd3cb2b8de09fda712ec1235d Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 9 Mar 2018 12:48:01 +0100 Subject: [PATCH] Added congestion control functionality to pacer. This adds the ability to the pacer to apply a congestion window by tracking sent data. This makes it more reliable when the congestion window is small enough to be filled at a high rate as there are less thread context switches that might affect the timing and performance. Outstanding data is not reduced by the pacer as it has no information about acknowledged packet feedback. This is by design as the pacer would also need to keep track of on which connection packets were sent or received, requiring a larger, more complex, change to the pacer. Bug: webrtc:8415 Change-Id: I4ecd303e835552ced042cd21186da910288a8258 Reviewed-on: https://webrtc-review.googlesource.com/51764 Reviewed-by: Philip Eliasson Commit-Queue: Sebastian Jansson Cr-Commit-Position: refs/heads/master@{#22371} --- .../rtp/pacer_controller.cc | 28 ++---- .../rtp/pacer_controller.h | 3 - modules/pacing/paced_sender.cc | 36 +++++-- modules/pacing/paced_sender.h | 10 ++ modules/pacing/paced_sender_unittest.cc | 97 +++++++++++++++++++ 5 files changed, 143 insertions(+), 31 deletions(-) diff --git a/modules/congestion_controller/rtp/pacer_controller.cc b/modules/congestion_controller/rtp/pacer_controller.cc index 1da67e0df9..a577f85e84 100644 --- a/modules/congestion_controller/rtp/pacer_controller.cc +++ b/modules/congestion_controller/rtp/pacer_controller.cc @@ -25,26 +25,22 @@ PacerController::~PacerController() = default; void PacerController::OnCongestionWindow(CongestionWindow congestion_window) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - if (congestion_window.enabled) { - congestion_window_ = congestion_window; - } else { - congestion_window_ = rtc::nullopt; - congested_ = false; - UpdatePacerState(); - } + if (congestion_window.enabled) + pacer_->SetCongestionWindow(congestion_window.data_window.bytes()); + else + pacer_->SetCongestionWindow(PacedSender::kNoCongestionWindow); } void PacerController::OnNetworkAvailability(NetworkAvailability msg) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); network_available_ = msg.network_available; - congested_ = false; - UpdatePacerState(); + pacer_->UpdateOutstandingData(0); + SetPacerState(!msg.network_available); } void PacerController::OnNetworkRouteChange(NetworkRouteChange) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - congested_ = false; - UpdatePacerState(); + pacer_->UpdateOutstandingData(0); } void PacerController::OnPacerConfig(PacerConfig msg) { @@ -62,15 +58,7 @@ void PacerController::OnProbeClusterConfig(ProbeClusterConfig config) { void PacerController::OnOutstandingData(OutstandingData msg) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); - if (congestion_window_.has_value()) { - congested_ = msg.in_flight_data > congestion_window_->data_window; - } - UpdatePacerState(); -} - -void PacerController::UpdatePacerState() { - bool pause = congested_ || !network_available_; - SetPacerState(pause); + pacer_->UpdateOutstandingData(msg.in_flight_data.bytes()); } void PacerController::SetPacerState(bool paused) { diff --git a/modules/congestion_controller/rtp/pacer_controller.h b/modules/congestion_controller/rtp/pacer_controller.h index e25bc6f061..6aa029ca71 100644 --- a/modules/congestion_controller/rtp/pacer_controller.h +++ b/modules/congestion_controller/rtp/pacer_controller.h @@ -38,13 +38,10 @@ class PacerController { void OnProbeClusterConfig(ProbeClusterConfig msg); private: - void UpdatePacerState(); void SetPacerState(bool paused); PacedSender* const pacer_; rtc::Optional current_pacer_config_; - rtc::Optional congestion_window_; - bool congested_ = false; bool pacer_paused_ = false; bool network_available_ = true; diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index c9e30ca08e..afec7d0094 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -33,7 +33,8 @@ namespace { // Time limit in milliseconds between packet bursts. const int64_t kMinPacketLimitMs = 5; -const int64_t kPausedPacketIntervalMs = 500; +const int64_t kCongestedPacketIntervalMs = 500; +const int64_t kPausedProcessIntervalMs = kCongestedPacketIntervalMs; const int64_t kMaxElapsedTimeMs = 2000; // Upper cap on process interval, in case process has not been called in a long @@ -119,6 +120,22 @@ void PacedSender::Resume() { process_thread_->WakeUp(this); } +void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) { + rtc::CritScope cs(&critsect_); + congestion_window_bytes_ = congestion_window_bytes; +} + +void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) { + rtc::CritScope cs(&critsect_); + outstanding_bytes_ = outstanding_bytes; +} + +bool PacedSender::Congested() const { + if (congestion_window_bytes_ == kNoCongestionWindow) + return false; + return outstanding_bytes_ >= congestion_window_bytes_; +} + void PacedSender::SetProbingEnabled(bool enabled) { rtc::CritScope cs(&critsect_); RTC_CHECK_EQ(0, packet_counter_); @@ -225,7 +242,7 @@ int64_t PacedSender::TimeUntilNextProcess() { // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. if (paused_) - return std::max(kPausedPacketIntervalMs - elapsed_time_ms, 0); + return std::max(kPausedProcessIntervalMs - elapsed_time_ms, 0); if (prober_->IsProbing()) { int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); @@ -246,12 +263,14 @@ void PacedSender::Process() { << kMaxElapsedTimeMs << " ms"; elapsed_time_ms = kMaxElapsedTimeMs; } - // When paused we send a padding packet every 500 ms to ensure we won't get - // stuck in the paused state due to no feedback being received. - if (paused_) { + // When congested we send a padding packet every 500 ms to ensure we won't get + // stuck in the congested state due to no feedback being received. + // TODO(srte): Stop sending packet in paused state when pause is no longer + // used for congestion windows. + if (paused_ || Congested()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. - if (elapsed_time_ms >= kPausedPacketIntervalMs && packet_counter_ > 0) { + if (elapsed_time_ms >= kCongestedPacketIntervalMs && packet_counter_ > 0) { PacedPacketInfo pacing_info; size_t bytes_sent = SendPadding(1, pacing_info); alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); @@ -292,7 +311,7 @@ void PacedSender::Process() { } // The paused state is checked in the loop since SendPacket leaves the // critical section allowing the paused state to be changed from other code. - while (!packets_->Empty() && !paused_) { + while (!packets_->Empty() && !paused_ && !Congested()) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. @@ -311,7 +330,7 @@ void PacedSender::Process() { } } - if (packets_->Empty()) { + if (packets_->Empty() && !Congested()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. if (packet_counter_ > 0) { @@ -388,6 +407,7 @@ void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { } void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) { + outstanding_bytes_ += bytes_sent; media_budget_->UseBudget(bytes_sent); padding_budget_->UseBudget(bytes_sent); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 179101e3b2..888bedfb5d 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -49,6 +49,7 @@ class PacedSender : public Pacer { protected: virtual ~PacketSender() {} }; + static constexpr int64_t kNoCongestionWindow = -1; // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than // this value, the packet producers should wait (eg drop frames rather than @@ -81,6 +82,9 @@ class PacedSender : public Pacer { // Resume sending packets. void Resume(); + void SetCongestionWindow(int64_t congestion_window_bytes); + void UpdateOutstandingData(int64_t outstanding_bytes); + // Enable bitrate probing. Enabled by default, mostly here to simplify // testing. Must be called before any packets are being sent to have an // effect. @@ -153,6 +157,9 @@ class PacedSender : public Pacer { size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + const Clock* const clock_; PacketSender* const packet_sender_; const std::unique_ptr alr_detector_ RTC_PT_GUARDED_BY(critsect_); @@ -186,6 +193,9 @@ class PacedSender : public Pacer { RTC_PT_GUARDED_BY(critsect_); uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); + int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) = + kNoCongestionWindow; + int64_t outstanding_bytes_ RTC_GUARDED_BY(critsect_) = 0; float pacing_factor_ RTC_GUARDED_BY(critsect_); // Lock to avoid race when attaching process thread. This can happen due to // the Call class setting network state on SendSideCongestionController, which diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index b46c358a1e..44914b7c63 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -562,6 +562,103 @@ TEST_F(PacedSenderTest, HighPrioDoesntAffectBudget) { EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); } +TEST_F(PacedSenderTest, SendsOnlyPaddingWhenCongested) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int kPacketSize = 250; + int kCongestionWindow = kPacketSize * 10; + + send_bucket_->UpdateOutstandingData(0); + send_bucket_->SetCongestionWindow(kCongestionWindow); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + + size_t blocked_packets = 0; + int64_t expected_time_until_padding = 500; + while (expected_time_until_padding > 5) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + blocked_packets++; + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + expected_time_until_padding -= 5; + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + EXPECT_EQ(blocked_packets, send_bucket_->QueueSizePackets()); +} + +TEST_F(PacedSenderTest, ResumesSendingWhenCongestionEnds) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int64_t kPacketSize = 250; + int64_t kCongestionCount = 10; + int64_t kCongestionWindow = kPacketSize * kCongestionCount; + int64_t kCongestionTimeMs = 1000; + + send_bucket_->UpdateOutstandingData(0); + send_bucket_->SetCongestionWindow(kCongestionWindow); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0); + int unacked_packets = 0; + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + unacked_packets++; + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + testing::Mock::VerifyAndClearExpectations(&callback_); + + // First mark half of the congested packets as cleared and make sure that just + // as many are sent + int ack_count = kCongestionCount / 2; + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _)) + .Times(ack_count) + .WillRepeatedly(Return(true)); + send_bucket_->UpdateOutstandingData(kCongestionWindow - + kPacketSize * ack_count); + + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } + unacked_packets -= ack_count; + testing::Mock::VerifyAndClearExpectations(&callback_); + + // Second make sure all packets are sent if sent packets are continuously + // marked as acked. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _)) + .Times(unacked_packets) + .WillRepeatedly(Return(true)); + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + send_bucket_->UpdateOutstandingData(0); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + } +} + TEST_F(PacedSenderTest, Pause) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346;