Move ownership of congestion window state to rtp sender controller.

When congestion window is used, two different mechanisms can currently
update the outstanding data state in the pacer:
* OnPacketSent() withing the pacer itself, when a packet is sent
* UpdateOutstandingData(), when RtpTransportControllerSend either:
  a. Receives an OnPacketSent() callback (increase outstanding data)
  b. Receives transport feedback (decrease outstanding data)

This creates a lot of calls to UpdateOutstandingData(), more than one
per sent packet. Each requires locking and/or thread jumps. To avoid
that, this CL moves the congestion window state to
RtpTransportController send - and we only post a congested flag down
the the pacer when the state is changed.

The only benefit I can see is of the old way is we prevent sending
new packets immedately when the window is full, rather than in some
edge cases queue extra packets on the network task queue before the
congestion signal is received. That should be rare and benign.
I think this simplified logic, which is easier to read and more
performant, is a better tradeoff.

Bug: webrtc:13417
Change-Id: I326dd88db86dc0d6dc685c61920654ac024e57ef
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/255600
Auto-Submit: Erik Språng <sprang@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36220}
This commit is contained in:
Erik Språng 2022-03-16 14:20:49 +01:00 committed by WebRTC LUCI CQ
parent e66e6a845b
commit 6673437775
10 changed files with 58 additions and 157 deletions

View File

@ -129,6 +129,8 @@ RtpTransportControllerSend::RtpTransportControllerSend(
relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
congestion_window_size_(DataSize::PlusInfinity()),
is_congested_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_(task_queue_factory->CreateTaskQueue(
"rtp_send_controller",
@ -202,6 +204,15 @@ void RtpTransportControllerSend::UpdateControlState() {
observer_->OnTargetTransferRate(*update);
}
void RtpTransportControllerSend::UpdateCongestedState() {
bool congested = transport_feedback_adapter_.GetOutstandingData() >=
congestion_window_size_;
if (congested != is_congested_) {
is_congested_ = congested;
pacer()->SetCongested(congested);
}
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
@ -361,7 +372,8 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
} else {
UpdateInitialConstraints(msg.constraints);
}
pacer()->UpdateOutstandingData(DataSize::Zero());
is_congested_ = false;
pacer()->SetCongested(false);
});
}
}
@ -382,7 +394,8 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
} else {
pacer()->Pause();
}
pacer()->UpdateOutstandingData(DataSize::Zero());
is_congested_ = false;
pacer()->SetCongested(false);
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
@ -421,12 +434,11 @@ void RtpTransportControllerSend::OnSentPacket(
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
// Only update outstanding data in pacer if:
// Only update outstanding data if:
// 1. Packet feadback is used.
// 2. The packet has not yet received an acknowledgement.
// 3. It is not a retransmission of an earlier packet.
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
UpdateCongestedState();
if (controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
}
@ -583,10 +595,8 @@ void RtpTransportControllerSend::OnTransportFeedback(
if (controller_)
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
// Only update outstanding data in pacer if any packet is first time
// acked.
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
// Only update outstanding data if any packet is first time acked.
UpdateCongestedState();
}
});
}
@ -678,7 +688,8 @@ void RtpTransportControllerSend::UpdateStreamsConfig() {
void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
if (update.congestion_window) {
pacer()->SetCongestionWindow(*update.congestion_window);
congestion_window_size_ = *update.congestion_window;
UpdateCongestedState();
}
if (update.pacer_config) {
pacer()->SetPacingRates(update.pacer_config->data_rate(),

View File

@ -158,6 +158,7 @@ class RtpTransportControllerSend final
RTC_RUN_ON(task_queue_);
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_);
void UpdateControlState() RTC_RUN_ON(task_queue_);
void UpdateCongestedState() RTC_RUN_ON(task_queue_);
RtpPacketPacer* pacer();
const RtpPacketPacer* pacer() const;
@ -211,6 +212,9 @@ class RtpTransportControllerSend final
RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_);
RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_);
DataSize congestion_window_size_ RTC_GUARDED_BY(task_queue_);
bool is_congested_ RTC_GUARDED_BY(task_queue_);
// Protected by internal locks.
RateLimiter retransmission_rate_limiter_;

View File

@ -87,18 +87,10 @@ void PacedSender::Resume() {
}
}
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
void PacedSender::SetCongested(bool congested) {
{
MutexLock lock(&mutex_);
pacing_controller_.SetCongestionWindow(congestion_window_size);
}
MaybeWakupProcessThread();
}
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
{
MutexLock lock(&mutex_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
pacing_controller_.SetCongested(congested);
}
MaybeWakupProcessThread();
}

View File

@ -80,8 +80,7 @@ class PacedSender : public RtpPacketPacer, public RtpPacketSender {
// Resume sending packets.
void Resume() override;
void SetCongestionWindow(DataSize congestion_window_size) override;
void UpdateOutstandingData(DataSize outstanding_data) override;
void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;

View File

@ -129,8 +129,7 @@ PacingController::PacingController(Clock* clock,
last_send_time_(last_process_time_),
packet_queue_(last_process_time_),
packet_counter_(0),
congestion_window_size_(DataSize::PlusInfinity()),
outstanding_data_(DataSize::Zero()),
congested_(false),
queue_time_limit(kMaxExpectedQueueLength),
account_for_audio_(false),
include_overhead_(false) {
@ -169,29 +168,11 @@ bool PacingController::IsPaused() const {
return paused_;
}
void PacingController::SetCongestionWindow(DataSize congestion_window_size) {
const bool was_congested = Congested();
congestion_window_size_ = congestion_window_size;
if (was_congested && !Congested()) {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime());
UpdateBudgetWithElapsedTime(elapsed_time);
void PacingController::SetCongested(bool congested) {
if (congested_ && !congested) {
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime()));
}
}
void PacingController::UpdateOutstandingData(DataSize outstanding_data) {
const bool was_congested = Congested();
outstanding_data_ = outstanding_data;
if (was_congested && !Congested()) {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime());
UpdateBudgetWithElapsedTime(elapsed_time);
}
}
bool PacingController::Congested() const {
if (congestion_window_size_.IsFinite()) {
return outstanding_data_ >= congestion_window_size_;
}
return false;
congested_ = congested;
}
bool PacingController::IsProbing() const {
@ -327,7 +308,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
}
bool PacingController::ShouldSendKeepalive(Timestamp now) const {
if (send_padding_if_silent_ || paused_ || Congested() ||
if (send_padding_if_silent_ || paused_ || congested_ ||
packet_counter_ == 0) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
@ -373,7 +354,7 @@ Timestamp PacingController::NextSendTime() const {
}
}
if (Congested() || packet_counter_ == 0) {
if (congested_ || packet_counter_ == 0) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
@ -623,7 +604,7 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
return DataSize::Zero();
}
if (Congested()) {
if (congested_) {
// Don't add padding if congested, even if requested for probing.
return DataSize::Zero();
}
@ -665,7 +646,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();
bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;
if (!unpaced_audio_packet && !is_probe) {
if (Congested()) {
if (congested_) {
// Don't send anything if congested.
return nullptr;
}
@ -728,7 +709,6 @@ void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
}
void PacingController::UpdateBudgetWithSentData(DataSize size) {
outstanding_data_ += size;
if (mode_ == ProcessMode::kPeriodic) {
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());

View File

@ -97,8 +97,7 @@ class PacingController {
void Resume(); // Resume sending packets.
bool IsPaused() const;
void SetCongestionWindow(DataSize congestion_window_size);
void UpdateOutstandingData(DataSize outstanding_data);
void SetCongested(bool congested);
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
@ -145,8 +144,6 @@ class PacingController {
// is available.
void ProcessPackets();
bool Congested() const;
bool IsProbing() const;
private:
@ -225,8 +222,7 @@ class PacingController {
RoundRobinPacketQueue packet_queue_;
uint64_t packet_counter_;
DataSize congestion_window_size_;
DataSize outstanding_data_;
bool congested_;
TimeDelta queue_time_limit;
bool account_for_audio_;

View File

@ -399,12 +399,11 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) {
EXPECT_CALL(callback_, SendPadding).Times(0);
PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam());
pacer.SetPacingRates(DataRate::KilobitsPerSec(10000), DataRate::Zero());
pacer.SetCongestionWindow(DataSize::Bytes(video.packet_size - 100));
pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
pacer.SetCongested(true);
// Audio packet blocked due to congestion.
InsertPacket(&pacer, &audio);
EXPECT_CALL(callback_, SendPacket).Times(0);
@ -416,7 +415,7 @@ TEST_P(PacingControllerFieldTrialTest, CongestionWindowAffectsAudioInTrial) {
ProcessNext(&pacer);
// Audio packet unblocked when congestion window clear.
::testing::Mock::VerifyAndClearExpectations(&callback_);
pacer.UpdateOutstandingData(DataSize::Zero());
pacer.SetCongested(false);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
}
@ -427,12 +426,11 @@ TEST_P(PacingControllerFieldTrialTest,
const test::ExplicitKeyValueConfig trials("");
PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam());
pacer.SetPacingRates(DataRate::BitsPerSec(10000000), DataRate::Zero());
pacer.SetCongestionWindow(DataSize::Bytes(800));
pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
pacer.SetCongested(true);
// Audio not blocked due to congestion.
InsertPacket(&pacer, &audio);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -1062,21 +1060,18 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) {
uint32_t ssrc = 202020;
uint16_t sequence_number = 1000;
int kPacketSize = 250;
int kCongestionWindow = kPacketSize * 10;
pacer_->UpdateOutstandingData(DataSize::Zero());
pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow));
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
// Send an initial packet so we have a last send time.
SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
AdvanceTimeAndProcess();
}
::testing::Mock::VerifyAndClearExpectations(&callback_);
// Set congested state, we should not send anything until the 500ms since
// last send time limit for keep-alives is triggered.
EXPECT_CALL(callback_, SendPacket).Times(0);
EXPECT_CALL(callback_, SendPadding).Times(0);
pacer_->SetCongested(true);
size_t blocked_packets = 0;
int64_t expected_time_until_padding = 500;
while (expected_time_until_padding > 5) {
@ -1087,6 +1082,7 @@ TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) {
pacer_->ProcessPackets();
expected_time_until_padding -= 5;
}
::testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1);
@ -1105,15 +1101,13 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) {
// to be sent in a row.
pacer_->SetPacingRates(DataRate::BitsPerSec(400 * 8 * 1000 / 5),
DataRate::Zero());
// The congestion window is small enough to only let one packet through.
pacer_->SetCongestionWindow(DataSize::Bytes(800));
pacer_->UpdateOutstandingData(DataSize::Zero());
// Not yet budget limited or congested, packet is sent.
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
// Packet blocked due to congestion.
pacer_->SetCongested(true);
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(0);
clock_.AdvanceTimeMilliseconds(5);
@ -1127,7 +1121,7 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) {
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
clock_.AdvanceTimeMilliseconds(5);
pacer_->UpdateOutstandingData(DataSize::Zero());
pacer_->SetCongested(false);
pacer_->ProcessPackets();
// Should be blocked due to budget limitation as congestion has be removed.
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
@ -1136,61 +1130,6 @@ TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) {
pacer_->ProcessPackets();
}
TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) {
uint32_t ssrc = 202020;
uint16_t sequence_number = 1000;
int64_t kPacketSize = 250;
int64_t kCongestionCount = 10;
int64_t kCongestionWindow = kPacketSize * kCongestionCount;
int64_t kCongestionTimeMs = 1000;
pacer_->UpdateOutstandingData(DataSize::Zero());
pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow));
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
}
::testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, SendPacket).Times(0);
int unacked_packets = 0;
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
Send(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
unacked_packets++;
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
}
::testing::Mock::VerifyAndClearExpectations(&callback_);
// First mark half of the congested packets as cleared and make sure that just
// as many are sent
int ack_count = kCongestionCount / 2;
EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count);
pacer_->UpdateOutstandingData(
DataSize::Bytes(kCongestionWindow - kPacketSize * ack_count));
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
}
unacked_packets -= ack_count;
::testing::Mock::VerifyAndClearExpectations(&callback_);
// Second make sure all packets are sent if sent packets are continuously
// marked as acked.
EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _))
.Times(unacked_packets);
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
pacer_->UpdateOutstandingData(DataSize::Zero());
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
}
}
TEST_P(PacingControllerTest, Pause) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;

View File

@ -34,8 +34,7 @@ class RtpPacketPacer {
// Resume sending packets.
virtual void Resume() = 0;
virtual void SetCongestionWindow(DataSize congestion_window_size) = 0;
virtual void UpdateOutstandingData(DataSize outstanding_data) = 0;
virtual void SetCongested(bool congested) = 0;
// Sets the pacing rates. Must be called once before packets can be sent.
virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0;

View File

@ -105,28 +105,10 @@ void TaskQueuePacedSender::Resume() {
});
}
void TaskQueuePacedSender::SetCongestionWindow(
DataSize congestion_window_size) {
task_queue_.PostTask([this, congestion_window_size]() {
void TaskQueuePacedSender::SetCongested(bool congested) {
task_queue_.PostTask([this, congested]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetCongestionWindow(congestion_window_size);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
if (task_queue_.IsCurrent()) {
RTC_DCHECK_RUN_ON(&task_queue_);
// Fast path since this can be called once per sent packet while on the
// task queue.
pacing_controller_.UpdateOutstandingData(outstanding_data);
MaybeProcessPackets(Timestamp::MinusInfinity());
return;
}
task_queue_.PostTask([this, outstanding_data]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
pacing_controller_.SetCongested(congested);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}

View File

@ -86,8 +86,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Resume sending packets.
void Resume() override;
void SetCongestionWindow(DataSize congestion_window_size) override;
void UpdateOutstandingData(DataSize outstanding_data) override;
void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;