Don't boost pacing rate after pause.

The pacer has a mechanism to make sure all packets are sent within some
time limit. This is based on the average queue time of the packets in
the pacer queue.
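
Roughly, that mechanism raises the target pacing bitrate so that the queued
bytes can be drained before the time limit is hit. A simplified sketch of the
idea (illustrative only, not the exact PacedSender code; the function and
parameter names here are made up):

  #include <algorithm>
  #include <cstdint>

  // Minimum bitrate needed to drain |queue_size_bytes| before the average
  // queue time exceeds |max_queue_length_ms|.
  int MinBitrateToMeetQueueLimitKbps(int64_t avg_queue_time_ms,
                                     int64_t queue_size_bytes,
                                     int64_t max_queue_length_ms) {
    // Clamp to 1 ms so packets already past the limit demand a huge bitrate.
    int64_t time_left_ms =
        std::max<int64_t>(1, max_queue_length_ms - avg_queue_time_ms);
    return static_cast<int>(queue_size_bytes * 8 / time_left_ms);
  }

Once the average queue time reaches the limit, time_left_ms bottoms out at
1 ms and the required bitrate becomes effectively unbounded, which is what
produces the burst described below.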

If the pacer is paused while packets are still in the queue (for instance
because the underlying transport temporarily goes down), then on resume all
of those packets may already be past the time limit and will be burst out
onto the network in a tight loop.

This CL subtracts pause time from the queue time, effectively pausing
the clock for the queue while the pacer is paused, so that when we
resume the pacing bitrate will be the same as when we paused.
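
The effective queue time after this change is then simply (a sketch with
made-up names, not the real PacketQueue interface):

  #include <cstdint>

  // Queue time as the pacer should see it: wall-clock time spent in the
  // queue minus the time the pacer was paused while the packet was queued.
  int64_t EffectiveQueueTimeMs(int64_t enqueue_time_ms,
                               int64_t now_ms,
                               int64_t sum_paused_ms) {
    return (now_ms - enqueue_time_ms) - sum_paused_ms;
  }

  // E.g. enqueued at t=0, paused from t=100 to t=200, popped at t=300:
  // EffectiveQueueTimeMs(0, 300, 100) == 200, the same as without the pause.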

BUG=webrtc:7694

Review-Url: https://codereview.webrtc.org/2994323002
Cr-Commit-Position: refs/heads/master@{#19367}
Author: sprang (committed by Commit Bot)
Date:   2017-08-16 05:38:49 -07:00
Commit: ddcfb9fc6a (parent 5b9746ef10)
3 changed files with 105 additions and 10 deletions

@@ -55,6 +55,7 @@ struct Packet {
         sequence_number(seq_number),
         capture_time_ms(capture_time_ms),
         enqueue_time_ms(enqueue_time_ms),
+        sum_paused_ms(0),
         bytes(length_in_bytes),
         retransmission(retransmission),
         enqueue_order(enqueue_order) {}
@@ -62,8 +63,9 @@ struct Packet {
   RtpPacketSender::Priority priority;
   uint32_t ssrc;
   uint16_t sequence_number;
-  int64_t capture_time_ms;
-  int64_t enqueue_time_ms;
+  int64_t capture_time_ms;  // Absolute time of frame capture.
+  int64_t enqueue_time_ms;  // Absolute time of pacer queue entry.
+  int64_t sum_paused_ms;    // Sum of time spent in queue while pacer is paused.
   size_t bytes;
   bool retransmission;
   uint64_t enqueue_order;
@@ -96,7 +98,8 @@ class PacketQueue {
       : bytes_(0),
         clock_(clock),
         queue_time_sum_(0),
-        time_last_updated_(clock_->TimeInMilliseconds()) {}
+        time_last_updated_(clock_->TimeInMilliseconds()),
+        paused_(false) {}
   virtual ~PacketQueue() {}

   void Push(const Packet& packet) {
@@ -126,7 +129,11 @@
   void FinalizePop(const Packet& packet) {
     RemoveFromDupeSet(packet);
     bytes_ -= packet.bytes;
-    queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
+    int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
+    RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
+    packet_queue_time_ms -= packet.sum_paused_ms;
+    RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
+    queue_time_sum_ -= packet_queue_time_ms;
     packet_list_.erase(packet.this_it);
     RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
     if (packet_list_.empty())
@@ -148,14 +155,34 @@
   void UpdateQueueTime(int64_t timestamp_ms) {
     RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
-    int64_t delta = timestamp_ms - time_last_updated_;
-    // Use packet packet_list_.size() not prio_queue_.size() here, as there
-    // might be an outstanding element popped from prio_queue_ currently in the
-    // SendPacket() call, while packet_list_ will always be correct.
-    queue_time_sum_ += delta * packet_list_.size();
+    if (timestamp_ms == time_last_updated_)
+      return;
+
+    int64_t delta_ms = timestamp_ms - time_last_updated_;
+    if (paused_) {
+      // Increase per-packet accumulators of time spent in queue while paused,
+      // so that we can disregard that when subtracting main accumulator when
+      // popping packet from the queue.
+      for (auto& it : packet_list_) {
+        it.sum_paused_ms += delta_ms;
+      }
+    } else {
+      // Use packet packet_list_.size() not prio_queue_.size() here, as there
+      // might be an outstanding element popped from prio_queue_ currently in
+      // the SendPacket() call, while packet_list_ will always be correct.
+      queue_time_sum_ += delta_ms * packet_list_.size();
+    }
     time_last_updated_ = timestamp_ms;
   }

+  void SetPauseState(bool paused, int64_t timestamp_ms) {
+    if (paused_ == paused)
+      return;
+    UpdateQueueTime(timestamp_ms);
+    paused_ = paused;
+  }
+
   int64_t AverageQueueTimeMs() const {
     if (prio_queue_.empty())
       return 0;
@@ -200,6 +227,7 @@ class PacketQueue {
   const Clock* const clock_;
   int64_t queue_time_sum_;
   int64_t time_last_updated_;
+  bool paused_;
 };

 }  // namespace paced_sender
@@ -243,6 +271,7 @@ void PacedSender::Pause() {
   {
     rtc::CritScope cs(&critsect_);
     paused_ = true;
+    packets_->SetPauseState(true, clock_->TimeInMilliseconds());
   }
   // Tell the process thread to call our TimeUntilNextProcess() method to get
   // a new (longer) estimate for when to call Process().
@@ -255,6 +284,7 @@ void PacedSender::Resume() {
   {
     rtc::CritScope cs(&critsect_);
     paused_ = false;
+    packets_->SetPauseState(false, clock_->TimeInMilliseconds());
   }
   // Tell the process thread to call our TimeUntilNextProcess() method to
   // refresh the estimate for when to call Process().

@@ -136,7 +136,8 @@ class PacedSender : public Module, public RtpPacketSender {
   virtual rtc::Optional<int64_t> GetApplicationLimitedRegionStartTime() const;

   // Returns the average time since being enqueued, in milliseconds, for all
-  // packets currently in the pacer queue, or 0 if queue is empty.
+  // packets currently in the pacer queue, excluding any time the pacer has
+  // been paused. Returns 0 if queue is empty.
   virtual int64_t AverageQueueTimeMs();

   // Returns the number of milliseconds until the module want a worker thread

@@ -1094,5 +1094,69 @@ TEST_F(PacedSenderTest, AvoidBusyLoopOnSendFailure) {
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
 }

+TEST_F(PacedSenderTest, QueueTimeWithPause) {
+  const size_t kPacketSize = 1200;
+  const uint32_t kSsrc = 12346;
+  uint16_t sequence_number = 1234;
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+                             sequence_number++, clock_.TimeInMilliseconds(),
+                             kPacketSize, false);
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+                             sequence_number++, clock_.TimeInMilliseconds(),
+                             kPacketSize, false);
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Pause();
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  // In paused state, queue time should not increase.
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Resume();
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(200, send_bucket_->AverageQueueTimeMs());
+}
+
+TEST_F(PacedSenderTest, QueueTimePausedDuringPush) {
+  const size_t kPacketSize = 1200;
+  const uint32_t kSsrc = 12346;
+  uint16_t sequence_number = 1234;
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+                             sequence_number++, clock_.TimeInMilliseconds(),
+                             kPacketSize, false);
+
+  clock_.AdvanceTimeMilliseconds(100);
+  send_bucket_->Pause();
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  // Add a new packet during paused phase.
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+                             sequence_number++, clock_.TimeInMilliseconds(),
+                             kPacketSize, false);
+  // From a queue time perspective, packet inserted during pause will have zero
+  // queue time. Average queue time will then be (0 + 100) / 2 = 50.
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Resume();
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(150, send_bucket_->AverageQueueTimeMs());
+}
+
+// TODO(sprang): Extract PacketQueue from PacedSender so that we can test
+// removing elements while paused. (This is possible, but only because of semi-
+// racy condition so can't easily be tested).
+
 }  // namespace test
 }  // namespace webrtc