From 667343777521c71d022ab7e67839f14e19e5932a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Wed, 16 Mar 2022 14:20:49 +0100 Subject: [PATCH] Move ownership of congestion window state to rtp sender controller. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When congestion window is used, two different mechanisms can currently update the outstanding data state in the pacer: * OnPacketSent() withing the pacer itself, when a packet is sent * UpdateOutstandingData(), when RtpTransportControllerSend either: a. Receives an OnPacketSent() callback (increase outstanding data) b. Receives transport feedback (decrease outstanding data) This creates a lot of calls to UpdateOutstandingData(), more than one per sent packet. Each requires locking and/or thread jumps. To avoid that, this CL moves the congestion window state to RtpTransportController send - and we only post a congested flag down the the pacer when the state is changed. The only benefit I can see is of the old way is we prevent sending new packets immedately when the window is full, rather than in some edge cases queue extra packets on the network task queue before the congestion signal is received. That should be rare and benign. I think this simplified logic, which is easier to read and more performant, is a better tradeoff. Bug: webrtc:13417 Change-Id: I326dd88db86dc0d6dc685c61920654ac024e57ef Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/255600 Auto-Submit: Erik Språng Reviewed-by: Henrik Boström Commit-Queue: Henrik Boström Cr-Commit-Position: refs/heads/main@{#36220} --- call/rtp_transport_controller_send.cc | 31 ++++--- call/rtp_transport_controller_send.h | 4 + modules/pacing/paced_sender.cc | 12 +-- modules/pacing/paced_sender.h | 3 +- modules/pacing/pacing_controller.cc | 38 ++------- modules/pacing/pacing_controller.h | 8 +- modules/pacing/pacing_controller_unittest.cc | 89 +++----------------- modules/pacing/rtp_packet_pacer.h | 3 +- modules/pacing/task_queue_paced_sender.cc | 24 +----- modules/pacing/task_queue_paced_sender.h | 3 +- 10 files changed, 58 insertions(+), 157 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 569ba9d730..2bf24e1994 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -129,6 +129,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()), transport_overhead_bytes_per_packet_(0), network_available_(false), + congestion_window_size_(DataSize::PlusInfinity()), + is_congested_(false), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), task_queue_(task_queue_factory->CreateTaskQueue( "rtp_send_controller", @@ -202,6 +204,15 @@ void RtpTransportControllerSend::UpdateControlState() { observer_->OnTargetTransferRate(*update); } +void RtpTransportControllerSend::UpdateCongestedState() { + bool congested = transport_feedback_adapter_.GetOutstandingData() >= + congestion_window_size_; + if (congested != is_congested_) { + is_congested_ = congested; + pacer()->SetCongested(congested); + } +} + RtpPacketPacer* RtpTransportControllerSend::pacer() { if (pacer_settings_.use_task_queue_pacer()) { return task_queue_pacer_.get(); @@ -361,7 +372,8 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( } else { UpdateInitialConstraints(msg.constraints); } - pacer()->UpdateOutstandingData(DataSize::Zero()); + is_congested_ = false; + pacer()->SetCongested(false); }); } } @@ -382,7 +394,8 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { } else { pacer()->Pause(); } - pacer()->UpdateOutstandingData(DataSize::Zero()); + is_congested_ = false; + pacer()->SetCongested(false); if (controller_) { control_handler_->SetNetworkAvailability(network_available_); @@ -421,12 +434,11 @@ void RtpTransportControllerSend::OnSentPacket( absl::optional packet_msg = transport_feedback_adapter_.ProcessSentPacket(sent_packet); if (packet_msg) { - // Only update outstanding data in pacer if: + // Only update outstanding data if: // 1. Packet feadback is used. // 2. The packet has not yet received an acknowledgement. // 3. It is not a retransmission of an earlier packet. - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + UpdateCongestedState(); if (controller_) PostUpdates(controller_->OnSentPacket(*packet_msg)); } @@ -583,10 +595,8 @@ void RtpTransportControllerSend::OnTransportFeedback( if (controller_) PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - // Only update outstanding data in pacer if any packet is first time - // acked. - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + // Only update outstanding data if any packet is first time acked. + UpdateCongestedState(); } }); } @@ -678,7 +688,8 @@ void RtpTransportControllerSend::UpdateStreamsConfig() { void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) { if (update.congestion_window) { - pacer()->SetCongestionWindow(*update.congestion_window); + congestion_window_size_ = *update.congestion_window; + UpdateCongestedState(); } if (update.pacer_config) { pacer()->SetPacingRates(update.pacer_config->data_rate(), diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 471ee7f5f4..ba14fdd24f 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -158,6 +158,7 @@ class RtpTransportControllerSend final RTC_RUN_ON(task_queue_); void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); void UpdateControlState() RTC_RUN_ON(task_queue_); + void UpdateCongestedState() RTC_RUN_ON(task_queue_); RtpPacketPacer* pacer(); const RtpPacketPacer* pacer() const; @@ -211,6 +212,9 @@ class RtpTransportControllerSend final RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); + DataSize congestion_window_size_ RTC_GUARDED_BY(task_queue_); + bool is_congested_ RTC_GUARDED_BY(task_queue_); + // Protected by internal locks. RateLimiter retransmission_rate_limiter_; diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 5eed6c2607..9b419f0380 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -87,18 +87,10 @@ void PacedSender::Resume() { } } -void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { +void PacedSender::SetCongested(bool congested) { { MutexLock lock(&mutex_); - pacing_controller_.SetCongestionWindow(congestion_window_size); - } - MaybeWakupProcessThread(); -} - -void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { - { - MutexLock lock(&mutex_); - pacing_controller_.UpdateOutstandingData(outstanding_data); + pacing_controller_.SetCongested(congested); } MaybeWakupProcessThread(); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 88fd79697d..bf82e78dbe 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -80,8 +80,7 @@ class PacedSender : public RtpPacketPacer, public RtpPacketSender { // Resume sending packets. void Resume() override; - void SetCongestionWindow(DataSize congestion_window_size) override; - void UpdateOutstandingData(DataSize outstanding_data) override; + void SetCongested(bool congested) override; // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override; diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 9bfe85c0a8..c9628e0eb5 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -129,8 +129,7 @@ PacingController::PacingController(Clock* clock, last_send_time_(last_process_time_), packet_queue_(last_process_time_), packet_counter_(0), - congestion_window_size_(DataSize::PlusInfinity()), - outstanding_data_(DataSize::Zero()), + congested_(false), queue_time_limit(kMaxExpectedQueueLength), account_for_audio_(false), include_overhead_(false) { @@ -169,29 +168,11 @@ bool PacingController::IsPaused() const { return paused_; } -void PacingController::SetCongestionWindow(DataSize congestion_window_size) { - const bool was_congested = Congested(); - congestion_window_size_ = congestion_window_size; - if (was_congested && !Congested()) { - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); - UpdateBudgetWithElapsedTime(elapsed_time); +void PacingController::SetCongested(bool congested) { + if (congested_ && !congested) { + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime())); } -} - -void PacingController::UpdateOutstandingData(DataSize outstanding_data) { - const bool was_congested = Congested(); - outstanding_data_ = outstanding_data; - if (was_congested && !Congested()) { - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); - UpdateBudgetWithElapsedTime(elapsed_time); - } -} - -bool PacingController::Congested() const { - if (congestion_window_size_.IsFinite()) { - return outstanding_data_ >= congestion_window_size_; - } - return false; + congested_ = congested; } bool PacingController::IsProbing() const { @@ -327,7 +308,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { } bool PacingController::ShouldSendKeepalive(Timestamp now) const { - if (send_padding_if_silent_ || paused_ || Congested() || + if (send_padding_if_silent_ || paused_ || congested_ || packet_counter_ == 0) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. @@ -373,7 +354,7 @@ Timestamp PacingController::NextSendTime() const { } } - if (Congested() || packet_counter_ == 0) { + if (congested_ || packet_counter_ == 0) { // We need to at least send keep-alive packets with some interval. return last_send_time_ + kCongestedPacketInterval; } @@ -623,7 +604,7 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, return DataSize::Zero(); } - if (Congested()) { + if (congested_) { // Don't add padding if congested, even if requested for probing. return DataSize::Zero(); } @@ -665,7 +646,7 @@ std::unique_ptr PacingController::GetPendingPacket( !pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value(); bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe; if (!unpaced_audio_packet && !is_probe) { - if (Congested()) { + if (congested_) { // Don't send anything if congested. return nullptr; } @@ -728,7 +709,6 @@ void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { } void PacingController::UpdateBudgetWithSentData(DataSize size) { - outstanding_data_ += size; if (mode_ == ProcessMode::kPeriodic) { media_budget_.UseBudget(size.bytes()); padding_budget_.UseBudget(size.bytes()); diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index f7c5601c91..d0c2e731ff 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -97,8 +97,7 @@ class PacingController { void Resume(); // Resume sending packets. bool IsPaused() const; - void SetCongestionWindow(DataSize congestion_window_size); - void UpdateOutstandingData(DataSize outstanding_data); + void SetCongested(bool congested); // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); @@ -145,8 +144,6 @@ class PacingController { // is available. void ProcessPackets(); - bool Congested() const; - bool IsProbing() const; private: @@ -225,8 +222,7 @@ class PacingController { RoundRobinPacketQueue packet_queue_; uint64_t packet_counter_; - DataSize congestion_window_size_; - DataSize outstanding_data_; + bool congested_; TimeDelta queue_time_limit; bool account_for_audio_; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index af2ce548e0..c3ab266853 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -399,12 +399,11 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) { EXPECT_CALL(callback_, SendPadding).Times(0); PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam()); pacer.SetPacingRates(DataRate::KilobitsPerSec(10000), DataRate::Zero()); - pacer.SetCongestionWindow(DataSize::Bytes(video.packet_size - 100)); - pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); + pacer.SetCongested(true); // Audio packet blocked due to congestion. InsertPacket(&pacer, &audio); EXPECT_CALL(callback_, SendPacket).Times(0); @@ -416,7 +415,7 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) { ProcessNext(&pacer); // Audio packet unblocked when congestion window clear. ::testing::Mock::VerifyAndClearExpectations(&callback_); - pacer.UpdateOutstandingData(DataSize::Zero()); + pacer.SetCongested(false); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); } @@ -427,12 +426,11 @@ TEST_P(PacingControllerFieldTrialTest, const test::ExplicitKeyValueConfig trials(""); PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam()); pacer.SetPacingRates(DataRate::BitsPerSec(10000000), DataRate::Zero()); - pacer.SetCongestionWindow(DataSize::Bytes(800)); - pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); + pacer.SetCongested(true); // Audio not blocked due to congestion. InsertPacket(&pacer, &audio); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -1062,21 +1060,18 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { uint32_t ssrc = 202020; uint16_t sequence_number = 1000; int kPacketSize = 250; - int kCongestionWindow = kPacketSize * 10; - pacer_->UpdateOutstandingData(DataSize::Zero()); - pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow)); - int sent_data = 0; - while (sent_data < kCongestionWindow) { - sent_data += kPacketSize; - SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - AdvanceTimeAndProcess(); - } + // Send an initial packet so we have a last send time. + SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + AdvanceTimeAndProcess(); ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // Set congested state, we should not send anything until the 500ms since + // last send time limit for keep-alives is triggered. EXPECT_CALL(callback_, SendPacket).Times(0); EXPECT_CALL(callback_, SendPadding).Times(0); - + pacer_->SetCongested(true); size_t blocked_packets = 0; int64_t expected_time_until_padding = 500; while (expected_time_until_padding > 5) { @@ -1087,6 +1082,7 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { pacer_->ProcessPackets(); expected_time_until_padding -= 5; } + ::testing::Mock::VerifyAndClearExpectations(&callback_); EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); @@ -1105,15 +1101,13 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { // to be sent in a row. pacer_->SetPacingRates(DataRate::BitsPerSec(400 * 8 * 1000 / 5), DataRate::Zero()); - // The congestion window is small enough to only let one packet through. - pacer_->SetCongestionWindow(DataSize::Bytes(800)); - pacer_->UpdateOutstandingData(DataSize::Zero()); // Not yet budget limited or congested, packet is sent. Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); pacer_->ProcessPackets(); // Packet blocked due to congestion. + pacer_->SetCongested(true); Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(0); clock_.AdvanceTimeMilliseconds(5); @@ -1127,7 +1121,7 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); - pacer_->UpdateOutstandingData(DataSize::Zero()); + pacer_->SetCongested(false); pacer_->ProcessPackets(); // Should be blocked due to budget limitation as congestion has be removed. Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); @@ -1136,61 +1130,6 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { pacer_->ProcessPackets(); } -TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) { - uint32_t ssrc = 202020; - uint16_t sequence_number = 1000; - int64_t kPacketSize = 250; - int64_t kCongestionCount = 10; - int64_t kCongestionWindow = kPacketSize * kCongestionCount; - int64_t kCongestionTimeMs = 1000; - - pacer_->UpdateOutstandingData(DataSize::Zero()); - pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow)); - int sent_data = 0; - while (sent_data < kCongestionWindow) { - sent_data += kPacketSize; - SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - ::testing::Mock::VerifyAndClearExpectations(&callback_); - EXPECT_CALL(callback_, SendPacket).Times(0); - int unacked_packets = 0; - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - Send(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - unacked_packets++; - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - ::testing::Mock::VerifyAndClearExpectations(&callback_); - - // First mark half of the congested packets as cleared and make sure that just - // as many are sent - int ack_count = kCongestionCount / 2; - EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count); - pacer_->UpdateOutstandingData( - DataSize::Bytes(kCongestionWindow - kPacketSize * ack_count)); - - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - unacked_packets -= ack_count; - ::testing::Mock::VerifyAndClearExpectations(&callback_); - - // Second make sure all packets are sent if sent packets are continuously - // marked as acked. - EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)) - .Times(unacked_packets); - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - pacer_->UpdateOutstandingData(DataSize::Zero()); - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } -} - TEST_P(PacingControllerTest, Pause) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346; diff --git a/modules/pacing/rtp_packet_pacer.h b/modules/pacing/rtp_packet_pacer.h index 3dc2b27612..a201838858 100644 --- a/modules/pacing/rtp_packet_pacer.h +++ b/modules/pacing/rtp_packet_pacer.h @@ -34,8 +34,7 @@ class RtpPacketPacer { // Resume sending packets. virtual void Resume() = 0; - virtual void SetCongestionWindow(DataSize congestion_window_size) = 0; - virtual void UpdateOutstandingData(DataSize outstanding_data) = 0; + virtual void SetCongested(bool congested) = 0; // Sets the pacing rates. Must be called once before packets can be sent. virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0; diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 620a54135e..c2b376cfa7 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -105,28 +105,10 @@ void TaskQueuePacedSender::Resume() { }); } -void TaskQueuePacedSender::SetCongestionWindow( - DataSize congestion_window_size) { - task_queue_.PostTask([this, congestion_window_size]() { +void TaskQueuePacedSender::SetCongested(bool congested) { + task_queue_.PostTask([this, congested]() { RTC_DCHECK_RUN_ON(&task_queue_); - pacing_controller_.SetCongestionWindow(congestion_window_size); - MaybeProcessPackets(Timestamp::MinusInfinity()); - }); -} - -void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) { - if (task_queue_.IsCurrent()) { - RTC_DCHECK_RUN_ON(&task_queue_); - // Fast path since this can be called once per sent packet while on the - // task queue. - pacing_controller_.UpdateOutstandingData(outstanding_data); - MaybeProcessPackets(Timestamp::MinusInfinity()); - return; - } - - task_queue_.PostTask([this, outstanding_data]() { - RTC_DCHECK_RUN_ON(&task_queue_); - pacing_controller_.UpdateOutstandingData(outstanding_data); + pacing_controller_.SetCongested(congested); MaybeProcessPackets(Timestamp::MinusInfinity()); }); } diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 61a625521d..33d7b5e3f4 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -86,8 +86,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Resume sending packets. void Resume() override; - void SetCongestionWindow(DataSize congestion_window_size) override; - void UpdateOutstandingData(DataSize outstanding_data) override; + void SetCongested(bool congested) override; // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;