diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 569ba9d730..2bf24e1994 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -129,6 +129,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()), transport_overhead_bytes_per_packet_(0), network_available_(false), + congestion_window_size_(DataSize::PlusInfinity()), + is_congested_(false), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), task_queue_(task_queue_factory->CreateTaskQueue( "rtp_send_controller", @@ -202,6 +204,15 @@ void RtpTransportControllerSend::UpdateControlState() { observer_->OnTargetTransferRate(*update); } +void RtpTransportControllerSend::UpdateCongestedState() { + bool congested = transport_feedback_adapter_.GetOutstandingData() >= + congestion_window_size_; + if (congested != is_congested_) { + is_congested_ = congested; + pacer()->SetCongested(congested); + } +} + RtpPacketPacer* RtpTransportControllerSend::pacer() { if (pacer_settings_.use_task_queue_pacer()) { return task_queue_pacer_.get(); @@ -361,7 +372,8 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( } else { UpdateInitialConstraints(msg.constraints); } - pacer()->UpdateOutstandingData(DataSize::Zero()); + is_congested_ = false; + pacer()->SetCongested(false); }); } } @@ -382,7 +394,8 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { } else { pacer()->Pause(); } - pacer()->UpdateOutstandingData(DataSize::Zero()); + is_congested_ = false; + pacer()->SetCongested(false); if (controller_) { control_handler_->SetNetworkAvailability(network_available_); @@ -421,12 +434,11 @@ void RtpTransportControllerSend::OnSentPacket( absl::optional packet_msg = transport_feedback_adapter_.ProcessSentPacket(sent_packet); if (packet_msg) { - // Only update outstanding data in pacer if: + // Only update outstanding data if: // 1. Packet feadback is used. // 2. The packet has not yet received an acknowledgement. // 3. It is not a retransmission of an earlier packet. - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + UpdateCongestedState(); if (controller_) PostUpdates(controller_->OnSentPacket(*packet_msg)); } @@ -583,10 +595,8 @@ void RtpTransportControllerSend::OnTransportFeedback( if (controller_) PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - // Only update outstanding data in pacer if any packet is first time - // acked. - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + // Only update outstanding data if any packet is first time acked. + UpdateCongestedState(); } }); } @@ -678,7 +688,8 @@ void RtpTransportControllerSend::UpdateStreamsConfig() { void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) { if (update.congestion_window) { - pacer()->SetCongestionWindow(*update.congestion_window); + congestion_window_size_ = *update.congestion_window; + UpdateCongestedState(); } if (update.pacer_config) { pacer()->SetPacingRates(update.pacer_config->data_rate(), diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 471ee7f5f4..ba14fdd24f 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -158,6 +158,7 @@ class RtpTransportControllerSend final RTC_RUN_ON(task_queue_); void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); void UpdateControlState() RTC_RUN_ON(task_queue_); + void UpdateCongestedState() RTC_RUN_ON(task_queue_); RtpPacketPacer* pacer(); const RtpPacketPacer* pacer() const; @@ -211,6 +212,9 @@ class RtpTransportControllerSend final RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); + DataSize congestion_window_size_ RTC_GUARDED_BY(task_queue_); + bool is_congested_ RTC_GUARDED_BY(task_queue_); + // Protected by internal locks. RateLimiter retransmission_rate_limiter_; diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 5eed6c2607..9b419f0380 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -87,18 +87,10 @@ void PacedSender::Resume() { } } -void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { +void PacedSender::SetCongested(bool congested) { { MutexLock lock(&mutex_); - pacing_controller_.SetCongestionWindow(congestion_window_size); - } - MaybeWakupProcessThread(); -} - -void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { - { - MutexLock lock(&mutex_); - pacing_controller_.UpdateOutstandingData(outstanding_data); + pacing_controller_.SetCongested(congested); } MaybeWakupProcessThread(); } diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 88fd79697d..bf82e78dbe 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -80,8 +80,7 @@ class PacedSender : public RtpPacketPacer, public RtpPacketSender { // Resume sending packets. void Resume() override; - void SetCongestionWindow(DataSize congestion_window_size) override; - void UpdateOutstandingData(DataSize outstanding_data) override; + void SetCongested(bool congested) override; // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override; diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 9bfe85c0a8..c9628e0eb5 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -129,8 +129,7 @@ PacingController::PacingController(Clock* clock, last_send_time_(last_process_time_), packet_queue_(last_process_time_), packet_counter_(0), - congestion_window_size_(DataSize::PlusInfinity()), - outstanding_data_(DataSize::Zero()), + congested_(false), queue_time_limit(kMaxExpectedQueueLength), account_for_audio_(false), include_overhead_(false) { @@ -169,29 +168,11 @@ bool PacingController::IsPaused() const { return paused_; } -void PacingController::SetCongestionWindow(DataSize congestion_window_size) { - const bool was_congested = Congested(); - congestion_window_size_ = congestion_window_size; - if (was_congested && !Congested()) { - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); - UpdateBudgetWithElapsedTime(elapsed_time); +void PacingController::SetCongested(bool congested) { + if (congested_ && !congested) { + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime())); } -} - -void PacingController::UpdateOutstandingData(DataSize outstanding_data) { - const bool was_congested = Congested(); - outstanding_data_ = outstanding_data; - if (was_congested && !Congested()) { - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); - UpdateBudgetWithElapsedTime(elapsed_time); - } -} - -bool PacingController::Congested() const { - if (congestion_window_size_.IsFinite()) { - return outstanding_data_ >= congestion_window_size_; - } - return false; + congested_ = congested; } bool PacingController::IsProbing() const { @@ -327,7 +308,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { } bool PacingController::ShouldSendKeepalive(Timestamp now) const { - if (send_padding_if_silent_ || paused_ || Congested() || + if (send_padding_if_silent_ || paused_ || congested_ || packet_counter_ == 0) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. @@ -373,7 +354,7 @@ Timestamp PacingController::NextSendTime() const { } } - if (Congested() || packet_counter_ == 0) { + if (congested_ || packet_counter_ == 0) { // We need to at least send keep-alive packets with some interval. return last_send_time_ + kCongestedPacketInterval; } @@ -623,7 +604,7 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, return DataSize::Zero(); } - if (Congested()) { + if (congested_) { // Don't add padding if congested, even if requested for probing. return DataSize::Zero(); } @@ -665,7 +646,7 @@ std::unique_ptr PacingController::GetPendingPacket( !pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value(); bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe; if (!unpaced_audio_packet && !is_probe) { - if (Congested()) { + if (congested_) { // Don't send anything if congested. return nullptr; } @@ -728,7 +709,6 @@ void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { } void PacingController::UpdateBudgetWithSentData(DataSize size) { - outstanding_data_ += size; if (mode_ == ProcessMode::kPeriodic) { media_budget_.UseBudget(size.bytes()); padding_budget_.UseBudget(size.bytes()); diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index f7c5601c91..d0c2e731ff 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -97,8 +97,7 @@ class PacingController { void Resume(); // Resume sending packets. bool IsPaused() const; - void SetCongestionWindow(DataSize congestion_window_size); - void UpdateOutstandingData(DataSize outstanding_data); + void SetCongested(bool congested); // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); @@ -145,8 +144,6 @@ class PacingController { // is available. void ProcessPackets(); - bool Congested() const; - bool IsProbing() const; private: @@ -225,8 +222,7 @@ class PacingController { RoundRobinPacketQueue packet_queue_; uint64_t packet_counter_; - DataSize congestion_window_size_; - DataSize outstanding_data_; + bool congested_; TimeDelta queue_time_limit; bool account_for_audio_; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index af2ce548e0..c3ab266853 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -399,12 +399,11 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) { EXPECT_CALL(callback_, SendPadding).Times(0); PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam()); pacer.SetPacingRates(DataRate::KilobitsPerSec(10000), DataRate::Zero()); - pacer.SetCongestionWindow(DataSize::Bytes(video.packet_size - 100)); - pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); + pacer.SetCongested(true); // Audio packet blocked due to congestion. InsertPacket(&pacer, &audio); EXPECT_CALL(callback_, SendPacket).Times(0); @@ -416,7 +415,7 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) { ProcessNext(&pacer); // Audio packet unblocked when congestion window clear. ::testing::Mock::VerifyAndClearExpectations(&callback_); - pacer.UpdateOutstandingData(DataSize::Zero()); + pacer.SetCongested(false); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); } @@ -427,12 +426,11 @@ TEST_P(PacingControllerFieldTrialTest, const test::ExplicitKeyValueConfig trials(""); PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam()); pacer.SetPacingRates(DataRate::BitsPerSec(10000000), DataRate::Zero()); - pacer.SetCongestionWindow(DataSize::Bytes(800)); - pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); + pacer.SetCongested(true); // Audio not blocked due to congestion. InsertPacket(&pacer, &audio); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -1062,21 +1060,18 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { uint32_t ssrc = 202020; uint16_t sequence_number = 1000; int kPacketSize = 250; - int kCongestionWindow = kPacketSize * 10; - pacer_->UpdateOutstandingData(DataSize::Zero()); - pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow)); - int sent_data = 0; - while (sent_data < kCongestionWindow) { - sent_data += kPacketSize; - SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - AdvanceTimeAndProcess(); - } + // Send an initial packet so we have a last send time. + SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + AdvanceTimeAndProcess(); ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // Set congested state, we should not send anything until the 500ms since + // last send time limit for keep-alives is triggered. EXPECT_CALL(callback_, SendPacket).Times(0); EXPECT_CALL(callback_, SendPadding).Times(0); - + pacer_->SetCongested(true); size_t blocked_packets = 0; int64_t expected_time_until_padding = 500; while (expected_time_until_padding > 5) { @@ -1087,6 +1082,7 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { pacer_->ProcessPackets(); expected_time_until_padding -= 5; } + ::testing::Mock::VerifyAndClearExpectations(&callback_); EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); @@ -1105,15 +1101,13 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { // to be sent in a row. pacer_->SetPacingRates(DataRate::BitsPerSec(400 * 8 * 1000 / 5), DataRate::Zero()); - // The congestion window is small enough to only let one packet through. - pacer_->SetCongestionWindow(DataSize::Bytes(800)); - pacer_->UpdateOutstandingData(DataSize::Zero()); // Not yet budget limited or congested, packet is sent. Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); pacer_->ProcessPackets(); // Packet blocked due to congestion. + pacer_->SetCongested(true); Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(0); clock_.AdvanceTimeMilliseconds(5); @@ -1127,7 +1121,7 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); - pacer_->UpdateOutstandingData(DataSize::Zero()); + pacer_->SetCongested(false); pacer_->ProcessPackets(); // Should be blocked due to budget limitation as congestion has be removed. Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size); @@ -1136,61 +1130,6 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { pacer_->ProcessPackets(); } -TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) { - uint32_t ssrc = 202020; - uint16_t sequence_number = 1000; - int64_t kPacketSize = 250; - int64_t kCongestionCount = 10; - int64_t kCongestionWindow = kPacketSize * kCongestionCount; - int64_t kCongestionTimeMs = 1000; - - pacer_->UpdateOutstandingData(DataSize::Zero()); - pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow)); - int sent_data = 0; - while (sent_data < kCongestionWindow) { - sent_data += kPacketSize; - SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - ::testing::Mock::VerifyAndClearExpectations(&callback_); - EXPECT_CALL(callback_, SendPacket).Times(0); - int unacked_packets = 0; - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - Send(RtpPacketMediaType::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), kPacketSize); - unacked_packets++; - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - ::testing::Mock::VerifyAndClearExpectations(&callback_); - - // First mark half of the congested packets as cleared and make sure that just - // as many are sent - int ack_count = kCongestionCount / 2; - EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count); - pacer_->UpdateOutstandingData( - DataSize::Bytes(kCongestionWindow - kPacketSize * ack_count)); - - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } - unacked_packets -= ack_count; - ::testing::Mock::VerifyAndClearExpectations(&callback_); - - // Second make sure all packets are sent if sent packets are continuously - // marked as acked. - EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)) - .Times(unacked_packets); - for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - pacer_->UpdateOutstandingData(DataSize::Zero()); - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); - } -} - TEST_P(PacingControllerTest, Pause) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346; diff --git a/modules/pacing/rtp_packet_pacer.h b/modules/pacing/rtp_packet_pacer.h index 3dc2b27612..a201838858 100644 --- a/modules/pacing/rtp_packet_pacer.h +++ b/modules/pacing/rtp_packet_pacer.h @@ -34,8 +34,7 @@ class RtpPacketPacer { // Resume sending packets. virtual void Resume() = 0; - virtual void SetCongestionWindow(DataSize congestion_window_size) = 0; - virtual void UpdateOutstandingData(DataSize outstanding_data) = 0; + virtual void SetCongested(bool congested) = 0; // Sets the pacing rates. Must be called once before packets can be sent. virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0; diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 620a54135e..c2b376cfa7 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -105,28 +105,10 @@ void TaskQueuePacedSender::Resume() { }); } -void TaskQueuePacedSender::SetCongestionWindow( - DataSize congestion_window_size) { - task_queue_.PostTask([this, congestion_window_size]() { +void TaskQueuePacedSender::SetCongested(bool congested) { + task_queue_.PostTask([this, congested]() { RTC_DCHECK_RUN_ON(&task_queue_); - pacing_controller_.SetCongestionWindow(congestion_window_size); - MaybeProcessPackets(Timestamp::MinusInfinity()); - }); -} - -void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) { - if (task_queue_.IsCurrent()) { - RTC_DCHECK_RUN_ON(&task_queue_); - // Fast path since this can be called once per sent packet while on the - // task queue. - pacing_controller_.UpdateOutstandingData(outstanding_data); - MaybeProcessPackets(Timestamp::MinusInfinity()); - return; - } - - task_queue_.PostTask([this, outstanding_data]() { - RTC_DCHECK_RUN_ON(&task_queue_); - pacing_controller_.UpdateOutstandingData(outstanding_data); + pacing_controller_.SetCongested(congested); MaybeProcessPackets(Timestamp::MinusInfinity()); }); } diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 61a625521d..33d7b5e3f4 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -86,8 +86,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Resume sending packets. void Resume() override; - void SetCongestionWindow(DataSize congestion_window_size) override; - void UpdateOutstandingData(DataSize outstanding_data) override; + void SetCongested(bool congested) override; // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;