Optimizes thread usage with task queue pacer.

The TaskQueuePacedSender today has some inefficiencies:
* Enqueuing a packet will trigger a MaybeProcessPackets() call, but it
  won't actually run immediately even if it should - instead it will
  schedule a new call in at least 1ms. This incurs delays and extra
  CPU overhead.
* Sometimes thread wakeups are scheduled simply in order to do
  book-keeping: ProcessPackets() will be called when the media debt has
  gone down to 0 even if there is no packet in the queue, in order to
  check if we should send padding.

This CL fixes that by called ProcessPackets() immediately if it is
actually time to do so, and by immediately determining when padding
should be sent without having a separate call to drain media debt.

Bug: webrtc:10809
Change-Id: I4870e86e6de2ce4197463fd5b788ad4717fc7177
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/172842
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31010}
This commit is contained in:
Erik Språng 2020-04-06 16:30:23 +02:00 committed by Commit Bot
parent d339bde338
commit be152f5f9e
4 changed files with 104 additions and 14 deletions

View File

@ -285,7 +285,7 @@ void PacingController::EnqueuePacketInternal(
}
if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() &&
media_debt_.IsZero()) {
NextSendTime() <= now) {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
}
@ -360,20 +360,20 @@ Timestamp PacingController::NextSendTime() const {
return last_send_time_ + kCongestedPacketInterval;
}
// Check how long until media buffer has drained. We schedule a call
// for when the last packet in the queue drains as otherwise we may
// be late in starting padding.
if (media_rate_ > DataRate::Zero() &&
(!packet_queue_.Empty() || !media_debt_.IsZero())) {
// Check how long until we can send the next media packet.
if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + media_debt_ / media_rate_);
}
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets.
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
TimeDelta drain_time =
std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + padding_debt_ / padding_rate_);
last_process_time_ + drain_time);
}
if (send_padding_if_silent_) {

View File

@ -1921,6 +1921,68 @@ TEST_P(PacingControllerTest, AccountsForAudioEnqueuTime) {
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime);
}
TEST_P(PacingControllerTest, NextSendTimeAccountsForPadding) {
if (PeriodicProcess()) {
// This test applies only when NOT using interval budget.
return;
}
const uint32_t kSsrc = 12345;
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(130);
const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
uint32_t sequnce_number = 1;
// Start with no padding.
pacer_->SetPacingRates(kPacingDataRate, DataRate::Zero());
// Send a single packet.
SendAndExpectPacket(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++,
clock_.TimeInMilliseconds(), kPacketSize.bytes());
pacer_->ProcessPackets();
::testing::Mock::VerifyAndClearExpectations(&callback_);
// With current conditions, no need to wake until next keep-alive.
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(),
PacingController::kPausedProcessInterval);
// Enqueue a new packet, that can't be sent until previous buffer has
// drained.
SendAndExpectPacket(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++,
clock_.TimeInMilliseconds(), kPacketSize.bytes());
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime);
clock_.AdvanceTime(kPacketPacingTime);
pacer_->ProcessPackets();
::testing::Mock::VerifyAndClearExpectations(&callback_);
// With current conditions, again no need to wake until next keep-alive.
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(),
PacingController::kPausedProcessInterval);
// Set a non-zero padding rate. Padding also can't be sent until
// previous debt has cleared. Since padding was disabled before, there
// currently is no padding debt.
pacer_->SetPacingRates(kPacingDataRate, kPacingDataRate / 2);
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime);
// Advance time, expect padding.
EXPECT_CALL(callback_, SendPadding).WillOnce(Return(kPacketSize.bytes()));
clock_.AdvanceTime(kPacketPacingTime);
pacer_->ProcessPackets();
::testing::Mock::VerifyAndClearExpectations(&callback_);
// Since padding rate is half of pacing rate, next time we can send
// padding is double the packet pacing time.
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(),
kPacketPacingTime * 2);
// Insert a packet to be sent, this take precedence again.
Send(RtpPacketMediaType::kVideo, kSsrc, sequnce_number++,
clock_.TimeInMilliseconds(), kPacketSize.bytes());
EXPECT_EQ(pacer_->NextSendTime() - clock_.CurrentTime(), kPacketPacingTime);
}
INSTANTIATE_TEST_SUITE_P(
WithAndWithoutIntervalBudget,
PacingControllerTest,

View File

@ -182,19 +182,24 @@ void TaskQueuePacedSender::MaybeProcessPackets(
return;
}
// Normally, run ProcessPackets() only if this is the scheduled task.
// If it is not but it is already time to process and there either is
// no scheduled task or the schedule has shifted forward in time, run
// anyway and clear any schedule.
Timestamp next_process_time = pacing_controller_.NextSendTime();
const Timestamp now = clock_->CurrentTime();
// Run ProcessPackets() only if this is the schedules task, or if there is
// no scheduled task and we need to process immediately.
if ((scheduled_process_time.IsFinite() &&
scheduled_process_time == next_process_time_) ||
(next_process_time_.IsInfinite() &&
pacing_controller_.NextSendTime() <= now)) {
(now >= next_process_time && (next_process_time_.IsInfinite() ||
next_process_time < next_process_time_))) {
pacing_controller_.ProcessPackets();
next_process_time_ = Timestamp::MinusInfinity();
next_process_time = pacing_controller_.NextSendTime();
}
Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime,
pacing_controller_.NextSendTime());
next_process_time =
std::max(now + PacingController::kMinSleepTime, next_process_time);
TimeDelta sleep_time = next_process_time - now;
if (next_process_time_.IsMinusInfinity() ||
next_process_time <=

View File

@ -173,5 +173,28 @@ TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
1.0);
}
TEST_F(TaskQueuePacedSenderTest, SendsAudioImmediately) {
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
pacer_.SetPacingRates(kPacingDataRate, DataRate::Zero());
// Add some initial video packets, only one should be sent.
EXPECT_CALL(packet_router_, SendPacket);
pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router_);
// Advance time, but still before next packet should be sent.
time_controller_.AdvanceTime(kPacketPacingTime / 2);
// Insert an audio packet, it should be sent immediately.
EXPECT_CALL(packet_router_, SendPacket);
pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router_);
}
} // namespace test
} // namespace webrtc