Added congestion control functionality to pacer.

This adds the ability to the pacer to apply a congestion window by
tracking sent data. This makes it more reliable when the congestion
window is small enough to be filled at a high rate as there are less
thread context switches that might affect the timing and performance.

Outstanding data is not reduced by the pacer as it has no information
about acknowledged packet feedback. This is by design as the pacer would
also need to keep track of on which connection packets were sent or
received, requiring a larger, more complex, change to the pacer.

Bug: webrtc:8415
Change-Id: I4ecd303e835552ced042cd21186da910288a8258
Reviewed-on: https://webrtc-review.googlesource.com/51764
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22371}
This commit is contained in:
Sebastian Jansson 2018-03-09 12:48:01 +01:00 committed by Commit Bot
parent dae6aad6e7
commit 45d9c1de9c
5 changed files with 143 additions and 31 deletions

View File

@ -25,26 +25,22 @@ PacerController::~PacerController() = default;
void PacerController::OnCongestionWindow(CongestionWindow congestion_window) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (congestion_window.enabled) {
congestion_window_ = congestion_window;
} else {
congestion_window_ = rtc::nullopt;
congested_ = false;
UpdatePacerState();
}
if (congestion_window.enabled)
pacer_->SetCongestionWindow(congestion_window.data_window.bytes());
else
pacer_->SetCongestionWindow(PacedSender::kNoCongestionWindow);
}
void PacerController::OnNetworkAvailability(NetworkAvailability msg) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
network_available_ = msg.network_available;
congested_ = false;
UpdatePacerState();
pacer_->UpdateOutstandingData(0);
SetPacerState(!msg.network_available);
}
void PacerController::OnNetworkRouteChange(NetworkRouteChange) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
congested_ = false;
UpdatePacerState();
pacer_->UpdateOutstandingData(0);
}
void PacerController::OnPacerConfig(PacerConfig msg) {
@ -62,15 +58,7 @@ void PacerController::OnProbeClusterConfig(ProbeClusterConfig config) {
void PacerController::OnOutstandingData(OutstandingData msg) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (congestion_window_.has_value()) {
congested_ = msg.in_flight_data > congestion_window_->data_window;
}
UpdatePacerState();
}
void PacerController::UpdatePacerState() {
bool pause = congested_ || !network_available_;
SetPacerState(pause);
pacer_->UpdateOutstandingData(msg.in_flight_data.bytes());
}
void PacerController::SetPacerState(bool paused) {

View File

@ -38,13 +38,10 @@ class PacerController {
void OnProbeClusterConfig(ProbeClusterConfig msg);
private:
void UpdatePacerState();
void SetPacerState(bool paused);
PacedSender* const pacer_;
rtc::Optional<PacerConfig> current_pacer_config_;
rtc::Optional<CongestionWindow> congestion_window_;
bool congested_ = false;
bool pacer_paused_ = false;
bool network_available_ = true;

View File

@ -33,7 +33,8 @@
namespace {
// Time limit in milliseconds between packet bursts.
const int64_t kMinPacketLimitMs = 5;
const int64_t kPausedPacketIntervalMs = 500;
const int64_t kCongestedPacketIntervalMs = 500;
const int64_t kPausedProcessIntervalMs = kCongestedPacketIntervalMs;
const int64_t kMaxElapsedTimeMs = 2000;
// Upper cap on process interval, in case process has not been called in a long
@ -119,6 +120,22 @@ void PacedSender::Resume() {
process_thread_->WakeUp(this);
}
void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) {
rtc::CritScope cs(&critsect_);
congestion_window_bytes_ = congestion_window_bytes;
}
void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) {
rtc::CritScope cs(&critsect_);
outstanding_bytes_ = outstanding_bytes;
}
bool PacedSender::Congested() const {
if (congestion_window_bytes_ == kNoCongestionWindow)
return false;
return outstanding_bytes_ >= congestion_window_bytes_;
}
void PacedSender::SetProbingEnabled(bool enabled) {
rtc::CritScope cs(&critsect_);
RTC_CHECK_EQ(0, packet_counter_);
@ -225,7 +242,7 @@ int64_t PacedSender::TimeUntilNextProcess() {
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
if (paused_)
return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
return std::max<int64_t>(kPausedProcessIntervalMs - elapsed_time_ms, 0);
if (prober_->IsProbing()) {
int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
@ -246,12 +263,14 @@ void PacedSender::Process() {
<< kMaxElapsedTimeMs << " ms";
elapsed_time_ms = kMaxElapsedTimeMs;
}
// When paused we send a padding packet every 500 ms to ensure we won't get
// stuck in the paused state due to no feedback being received.
if (paused_) {
// When congested we send a padding packet every 500 ms to ensure we won't get
// stuck in the congested state due to no feedback being received.
// TODO(srte): Stop sending packet in paused state when pause is no longer
// used for congestion windows.
if (paused_ || Congested()) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (elapsed_time_ms >= kPausedPacketIntervalMs && packet_counter_ > 0) {
if (elapsed_time_ms >= kCongestedPacketIntervalMs && packet_counter_ > 0) {
PacedPacketInfo pacing_info;
size_t bytes_sent = SendPadding(1, pacing_info);
alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
@ -292,7 +311,7 @@ void PacedSender::Process() {
}
// The paused state is checked in the loop since SendPacket leaves the
// critical section allowing the paused state to be changed from other code.
while (!packets_->Empty() && !paused_) {
while (!packets_->Empty() && !paused_ && !Congested()) {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
@ -311,7 +330,7 @@ void PacedSender::Process() {
}
}
if (packets_->Empty()) {
if (packets_->Empty() && !Congested()) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
@ -388,6 +407,7 @@ void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
}
void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
outstanding_bytes_ += bytes_sent;
media_budget_->UseBudget(bytes_sent);
padding_budget_->UseBudget(bytes_sent);
}

View File

@ -49,6 +49,7 @@ class PacedSender : public Pacer {
protected:
virtual ~PacketSender() {}
};
static constexpr int64_t kNoCongestionWindow = -1;
// Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than
// this value, the packet producers should wait (eg drop frames rather than
@ -81,6 +82,9 @@ class PacedSender : public Pacer {
// Resume sending packets.
void Resume();
void SetCongestionWindow(int64_t congestion_window_bytes);
void UpdateOutstandingData(int64_t outstanding_bytes);
// Enable bitrate probing. Enabled by default, mostly here to simplify
// testing. Must be called before any packets are being sent to have an
// effect.
@ -153,6 +157,9 @@ class PacedSender : public Pacer {
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
const Clock* const clock_;
PacketSender* const packet_sender_;
const std::unique_ptr<AlrDetector> alr_detector_ RTC_PT_GUARDED_BY(critsect_);
@ -186,6 +193,9 @@ class PacedSender : public Pacer {
RTC_PT_GUARDED_BY(critsect_);
uint64_t packet_counter_ RTC_GUARDED_BY(critsect_);
int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) =
kNoCongestionWindow;
int64_t outstanding_bytes_ RTC_GUARDED_BY(critsect_) = 0;
float pacing_factor_ RTC_GUARDED_BY(critsect_);
// Lock to avoid race when attaching process thread. This can happen due to
// the Call class setting network state on SendSideCongestionController, which

View File

@ -562,6 +562,103 @@ TEST_F(PacedSenderTest, HighPrioDoesntAffectBudget) {
EXPECT_EQ(0u, send_bucket_->QueueSizePackets());
}
TEST_F(PacedSenderTest, SendsOnlyPaddingWhenCongested) {
uint32_t ssrc = 202020;
uint16_t sequence_number = 1000;
int kPacketSize = 250;
int kCongestionWindow = kPacketSize * 10;
send_bucket_->UpdateOutstandingData(0);
send_bucket_->SetCongestionWindow(kCongestionWindow);
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize, false);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0);
EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0);
size_t blocked_packets = 0;
int64_t expected_time_until_padding = 500;
while (expected_time_until_padding > 5) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
blocked_packets++;
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
expected_time_until_padding -= 5;
}
testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
EXPECT_EQ(blocked_packets, send_bucket_->QueueSizePackets());
}
TEST_F(PacedSenderTest, ResumesSendingWhenCongestionEnds) {
uint32_t ssrc = 202020;
uint16_t sequence_number = 1000;
int64_t kPacketSize = 250;
int64_t kCongestionCount = 10;
int64_t kCongestionWindow = kPacketSize * kCongestionCount;
int64_t kCongestionTimeMs = 1000;
send_bucket_->UpdateOutstandingData(0);
send_bucket_->SetCongestionWindow(kCongestionWindow);
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize, false);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)).Times(0);
int unacked_packets = 0;
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
unacked_packets++;
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
testing::Mock::VerifyAndClearExpectations(&callback_);
// First mark half of the congested packets as cleared and make sure that just
// as many are sent
int ack_count = kCongestionCount / 2;
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _))
.Times(ack_count)
.WillRepeatedly(Return(true));
send_bucket_->UpdateOutstandingData(kCongestionWindow -
kPacketSize * ack_count);
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
unacked_packets -= ack_count;
testing::Mock::VerifyAndClearExpectations(&callback_);
// Second make sure all packets are sent if sent packets are continuously
// marked as acked.
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _))
.Times(unacked_packets)
.WillRepeatedly(Return(true));
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
send_bucket_->UpdateOutstandingData(0);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
}
TEST_F(PacedSenderTest, Pause) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;