Reland "Pacer: Reduce TQ wake up and improve packet size estimation"

Update `early_execute_margin` after process packets, and the test case.

Original change's description:
>Pacer: Reduce TQ wake up and improve packet size estimation
>
>The TQ Pacer schedules delayed task according to target time of
>PacingController. It drains all valid ProcessPackets() in single loop,
>denies retired scheduled tasks, and round up the timeout to 1ms.
>
>This CL also improves packet size estimation in TQ Pacer by removing
>zero initialization, and introduces `include_overhead_` configuration.
>
>Tests:
>1. webrtc_perf_tests: MaybeProcessPackets() calls
>  2075147 -> 2007995
>
>2. module_unittests: MaybeProcessPackets() calls
>  203393 -> 183563
>
>3. peerconnection_unittests: MaybeProcessPackets() calls
>  66713-> 64333
>
>Bug: webrtc:13417, webrtc:13437
>Change-Id: I18eb0a36dbe063c606b1f27014df74a65ebfc486
>Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/242962
>Reviewed-by: Erik Språng <sprang@webrtc.org>
>Reviewed-by: Henrik Boström <hbos@webrtc.org>
>Commit-Queue: Erik Språng <sprang@webrtc.org>
>Cr-Commit-Position: refs/heads/main@{#36179}

Bug: webrtc:13417, webrtc:13437
Change-Id: I79f2554cf02364b67ce7073698611a3ae337a73b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/256145
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Owners-Override: Erik Språng <sprang@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36283}
This commit is contained in:
Jianhui Dai 2022-03-19 15:38:51 +08:00 committed by WebRTC LUCI CQ
parent 98274ee948
commit df59e53818
6 changed files with 279 additions and 255 deletions

View File

@ -77,7 +77,8 @@ RtpTransportControllerSend::PacerSettings::PacerSettings(
const WebRtcKeyValueConfig& trials)
: tq_disabled("Disabled"),
holdback_window("holdback_window", PacingController::kMinSleepTime),
holdback_packets("holdback_packets", -1) {
holdback_packets("holdback_packets",
TaskQueuePacedSender::kNoPacketHoldback) {
ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
trials.Lookup("WebRTC-TaskQueuePacer"));
}

View File

@ -38,11 +38,6 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2);
// time. Applies only to periodic mode.
constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30);
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the nearest
// millisecond.
constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1);
constexpr int kFirstPriority = 0;
bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
@ -94,6 +89,8 @@ const float PacingController::kDefaultPaceMultiplier = 2.5f;
const TimeDelta PacingController::kPausedProcessInterval =
kCongestedPacketInterval;
const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1);
const TimeDelta PacingController::kMaxEarlyProbeProcessing =
TimeDelta::Millis(1);
PacingController::PacingController(Clock* clock,
PacketSender* packet_sender,
@ -129,7 +126,7 @@ PacingController::PacingController(Clock* clock,
packet_queue_(last_process_time_),
packet_counter_(0),
congested_(false),
queue_time_limit(kMaxExpectedQueueLength),
queue_time_limit_(kMaxExpectedQueueLength),
account_for_audio_(false),
include_overhead_(false) {
if (!drain_large_queues_) {
@ -202,6 +199,7 @@ void PacingController::SetPacingRates(DataRate pacing_rate,
media_rate_ = pacing_rate;
padding_rate_ = padding_rate;
pacing_bitrate_ = pacing_rate;
media_budget_.set_target_rate_kbps(pacing_rate.kbps());
padding_budget_.set_target_rate_kbps(padding_rate.kbps());
RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
@ -280,10 +278,7 @@ void PacingController::EnqueuePacketInternal(
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_process_time);
UpdateBudgetWithElapsedTime(elapsed_time);
last_process_time_ = target_process_time;
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
@ -294,7 +289,6 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
if (last_process_time_.IsMinusInfinity() || now < last_process_time_) {
return TimeDelta::Zero();
}
RTC_DCHECK_GE(now, last_process_time_);
TimeDelta elapsed_time = now - last_process_time_;
last_process_time_ = now;
if (elapsed_time > kMaxElapsedTime) {
@ -311,8 +305,7 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
packet_counter_ == 0) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
TimeDelta elapsed_since_last_send = now - last_send_time_;
if (elapsed_since_last_send >= kCongestedPacketInterval) {
if (now - last_send_time_ >= kCongestedPacketInterval) {
return true;
}
}
@ -321,17 +314,17 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
Timestamp PacingController::NextSendTime() const {
const Timestamp now = CurrentTime();
Timestamp next_send_time = Timestamp::PlusInfinity();
if (paused_) {
return last_send_time_ + kPausedProcessInterval;
}
// If probing is active, that always takes priority.
if (prober_.is_probing()) {
if (prober_.is_probing() && !probing_send_failure_) {
Timestamp probe_time = prober_.NextProbeTime(now);
// `probe_time` == PlusInfinity indicates no probe scheduled.
if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) {
return probe_time;
if (!probe_time.IsPlusInfinity()) {
return probe_time.IsMinusInfinity() ? now : probe_time;
}
}
@ -343,14 +336,13 @@ Timestamp PacingController::NextSendTime() const {
// In dynamic mode, figure out when the next packet should be sent,
// given the current conditions.
if (!pace_audio_) {
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> audio_enqueue_time =
packet_queue_.LeadingAudioPacketEnqueueTime();
if (audio_enqueue_time.has_value()) {
return *audio_enqueue_time;
}
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> unpaced_audio_time =
pace_audio_ ? absl::nullopt
: packet_queue_.LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time) {
return *unpaced_audio_time;
}
if (congested_ || packet_counter_ == 0) {
@ -358,71 +350,39 @@ Timestamp PacingController::NextSendTime() const {
return last_send_time_ + kCongestedPacketInterval;
}
// Check how long until we can send the next media packet.
if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + media_debt_ / media_rate_);
}
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// Check how long until we can send the next media packet.
next_send_time = last_process_time_ + media_debt_ / media_rate_;
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
RTC_DCHECK_GT(media_rate_, DataRate::Zero());
TimeDelta drain_time =
std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + drain_time);
next_send_time = last_process_time_ + drain_time;
} else {
// Nothing to do.
next_send_time = last_process_time_ + kPausedProcessInterval;
}
if (send_padding_if_silent_) {
return last_send_time_ + kPausedProcessInterval;
next_send_time =
std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
}
return last_process_time_ + kPausedProcessInterval;
return next_send_time;
}
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
Timestamp target_send_time = now;
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
if (target_send_time.IsMinusInfinity()) {
target_send_time = now;
} else if (now < target_send_time - early_execute_margin) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
return;
}
if (target_send_time < last_process_time_) {
// After the last process call, at time X, the target send time
// shifted to be earlier than X. This should normally not happen
// but we want to make sure rounding errors or erratic behavior
// of NextSendTime() does not cause issue. In particular, if the
// buffer reduction of
// rate * (target_send_time - previous_process_time)
// in the main loop doesn't clean up the existing debt we may not
// be able to send again. We don't want to check this reordering
// there as it is the normal exit condtion when the buffer is
// exhausted and there are packets in the queue.
UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
target_send_time = last_process_time_;
}
}
Timestamp previous_process_time = last_process_time_;
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
if (ShouldSendKeepalive(now)) {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ == 0) {
last_send_time_ = now;
} else {
DataSize keepalive_data_sent = DataSize::Zero();
if (packet_counter_ > 0) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
@ -433,14 +393,29 @@ void PacingController::ProcessPackets() {
EnqueuePacket(std::move(packet));
}
}
OnPaddingSent(keepalive_data_sent);
}
OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now);
}
if (paused_) {
return;
}
if (mode_ == ProcessMode::kDynamic) {
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
target_send_time = NextSendTime();
if (now + early_execute_margin < target_send_time) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
return;
}
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
@ -452,7 +427,7 @@ void PacingController::ProcessPackets() {
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit - packet_queue_.AverageQueueTime());
queue_time_limit_ - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
@ -467,13 +442,12 @@ void PacingController::ProcessPackets() {
// up to (process interval duration) * (target rate), so we only need to
// update it once before the packet sending loop.
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
} else {
media_rate_ = target_rate;
}
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
@ -482,9 +456,23 @@ void PacingController::ProcessPackets() {
// use actual send time rather than target.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
if (pacing_info.probe_cluster_bytes_sent == 0) {
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
}
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
@ -492,102 +480,74 @@ void PacingController::ProcessPackets() {
}
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
if (first_packet_in_probe) {
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
first_packet_in_probe = false;
}
if (mode_ == ProcessMode::kDynamic &&
previous_process_time < target_send_time) {
// Reduce buffer levels with amount corresponding to time between last
// process and target send time for the next packet.
// If the process call is late, that may be the time between the optimal
// send times for two packets we should already have sent.
UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
previous_process_time = target_send_time;
}
// Fetch the next packet, so long as queue is not empty or budget is not
while (true) {
// Fetch packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
if (!padding_packets.empty()) {
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
} else {
// Can't generate padding, still update padding budget for next send
// time.
UpdatePaddingBudgetWithSentData(padding_to_add);
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
} else {
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// Send done, update send time.
OnPacketSent(packet_type, packet_size, now);
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
if (mode_ == ProcessMode::kDynamic) {
// Update target send time in case that are more packets that we are late
// in processing.
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsMinusInfinity()) {
target_send_time = now;
} else {
target_send_time = std::min(now, next_send_time);
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
if (target_send_time > now) {
// Exit loop if not probing.
if (!is_probing) {
break;
}
target_send_time = now;
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
}
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
@ -609,8 +569,8 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
}
if (packet_counter_ == 0) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
return DataSize::Zero();
}
@ -675,25 +635,16 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time) {
if (!first_sent_packet_time_) {
if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) {
first_sent_packet_time_ = send_time;
}
bool audio_packet = packet_type == RtpPacketMediaType::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) {
UpdateBudgetWithSentData(packet_size);
}
last_send_time_ = send_time;
last_process_time_ = send_time;
}
void PacingController::OnPaddingSent(DataSize data_sent) {
if (data_sent > DataSize::Zero()) {
UpdateBudgetWithSentData(data_sent);
}
Timestamp now = CurrentTime();
last_send_time_ = now;
last_process_time_ = now;
last_send_time_ = send_time;
}
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
@ -710,17 +661,24 @@ void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
void PacingController::UpdateBudgetWithSentData(DataSize size) {
if (mode_ == ProcessMode::kPeriodic) {
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());
} else {
media_debt_ += size;
media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime);
}
UpdatePaddingBudgetWithSentData(size);
}
void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) {
if (mode_ == ProcessMode::kPeriodic) {
padding_budget_.UseBudget(size.bytes());
} else {
padding_debt_ += size;
padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime);
}
}
void PacingController::SetQueueTimeLimit(TimeDelta limit) {
queue_time_limit = limit;
queue_time_limit_ = limit;
}
} // namespace webrtc

View File

@ -78,6 +78,11 @@ class PacingController {
static const TimeDelta kMinSleepTime;
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the
// nearest millisecond.
static const TimeDelta kMaxEarlyProbeProcessing;
PacingController(Clock* clock,
PacketSender* packet_sender,
const WebRtcKeyValueConfig& field_trials,
@ -153,6 +158,7 @@ class PacingController {
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(TimeDelta delta);
void UpdateBudgetWithSentData(DataSize size);
void UpdatePaddingBudgetWithSentData(DataSize size);
DataSize PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const;
@ -164,7 +170,6 @@ class PacingController {
void OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time);
void OnPaddingSent(DataSize padding_sent);
Timestamp CurrentTime() const;
@ -190,9 +195,9 @@ class PacingController {
mutable Timestamp last_timestamp_;
bool paused_;
// In dynamic mode, `media_budget_` and `padding_budget_` will be used to
// In periodic mode, `media_budget_` and `padding_budget_` will be used to
// track when packets can be sent.
// In periodic mode, `media_debt_` and `padding_debt_` will be used together
// In dynamic mode, `media_debt_` and `padding_debt_` will be used together
// with the target rates.
// This is the media budget, keeping track of how many bits of media
@ -222,7 +227,7 @@ class PacingController {
bool congested_;
TimeDelta queue_time_limit;
TimeDelta queue_time_limit_;
bool account_for_audio_;
bool include_overhead_;
};

View File

@ -15,9 +15,6 @@
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/trace_event.h"
namespace webrtc {
@ -29,6 +26,8 @@ constexpr const char* kSlackedTaskQueuePacedSenderFieldTrial =
} // namespace
const int TaskQueuePacedSender::kNoPacketHoldback = -1;
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
PacingController::PacketSender* packet_sender,
@ -52,10 +51,11 @@ TaskQueuePacedSender::TaskQueuePacedSender(
is_started_(false),
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {
packet_size_.Apply(1, 0);
RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
}
TaskQueuePacedSender::~TaskQueuePacedSender() {
@ -133,7 +133,11 @@ void TaskQueuePacedSender::EnqueuePackets(
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
packet_size_.Apply(1, packet->size());
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
@ -152,6 +156,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
@ -187,13 +192,16 @@ absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite())
if (oldest_packet.IsInfinite()) {
return TimeDelta::Zero();
}
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet)
if (current < oldest_packet) {
return TimeDelta::Zero();
}
return current - oldest_packet;
}
@ -206,64 +214,69 @@ void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(&task_queue_);
#if RTC_TRACE_EVENTS_ENABLED
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::MaybeProcessPackets");
#endif
if (is_shutdown_ || !is_started_) {
return;
}
// Normally, run ProcessPackets() only if this is the scheduled task.
// If it is not but it is already time to process and there either is
// no scheduled task or the schedule has shifted forward in time, run
// anyway and clear any schedule.
Timestamp next_process_time = pacing_controller_.NextSendTime();
Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
const Timestamp now = clock_->CurrentTime();
const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
if (is_scheduled_call) {
// Indicate no pending scheduled call.
TimeDelta early_execute_margin =
pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
// Probing state could change. Get margin after process packets.
early_execute_margin = pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
}
UpdateStats();
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
if (scheduled_process_time != next_process_time_) {
return;
}
next_process_time_ = Timestamp::MinusInfinity();
}
if (is_scheduled_call ||
(now >= next_process_time && (next_process_time_.IsInfinite() ||
next_process_time < next_process_time_))) {
pacing_controller_.ProcessPackets();
next_process_time = pacing_controller_.NextSendTime();
}
TimeDelta hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered());
if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() &&
!avg_packet_size.IsZero()) {
TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) {
// If we're probing and there isn't already a wakeup scheduled for the next
// process time, always post a task and just round sleep time down to
// nearest millisecond.
if (next_process_time.IsMinusInfinity()) {
time_to_next_process = TimeDelta::Zero();
} else {
time_to_next_process =
std::max(TimeDelta::Zero(),
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
// Do not hold back in probing.
TimeDelta hold_back_window = TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
!pacing_rate.IsZero() &&
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
TimeDelta avg_packet_send_time =
DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
} else if (next_process_time_.IsMinusInfinity() ||
next_process_time <= next_process_time_ - hold_back_window) {
// Schedule a new task since there is none currently scheduled
// (`next_process_time_` is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled.
time_to_next_process = std::max(next_process_time - now, hold_back_window);
}
if (time_to_next_process) {
// Set a new scheduled process time and post a delayed task.
next_process_time_ = next_process_time;
// Calculate next process time.
TimeDelta time_to_next_process =
std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;
// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
// Prefer low precision if allowed and not probing.
TaskQueueBase::DelayPrecision precision =
allow_low_precision_ && !pacing_controller_.IsProbing()
@ -272,11 +285,10 @@ void TaskQueuePacedSender::MaybeProcessPackets(
task_queue_.PostDelayedTaskWithPrecision(
precision,
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
time_to_next_process->ms<uint32_t>());
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>());
next_process_time_ = next_send_time;
}
UpdateStats();
}
void TaskQueuePacedSender::UpdateStats() {

View File

@ -14,9 +14,7 @@
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <memory>
#include <queue>
#include <vector>
#include "absl/base/attributes.h"
@ -31,7 +29,6 @@
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
@ -40,6 +37,8 @@ class Clock;
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
static const int kNoPacketHoldback;
// The `hold_back_window` parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
@ -163,6 +162,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);

View File

@ -41,7 +41,6 @@ constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234;
constexpr int kNoPacketHoldback = -1;
class MockPacketRouter : public PacketRouter {
public:
@ -206,7 +205,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
@ -246,7 +245,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
@ -298,7 +297,7 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -330,7 +329,8 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
ScopedKeyValueConfig trials;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
kCoalescingWindow,
TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -366,7 +366,8 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
ScopedKeyValueConfig trials;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
kCoalescingWindow,
TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -390,7 +391,7 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
ScopedKeyValueConfig trials(
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
@ -398,7 +399,7 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -423,15 +424,16 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
// Advance to less than 3ms before next packet send time.
time_controller.AdvanceTime(TimeDelta::Micros(1001));
// Trigger a probe at 4x the current pacing rate and insert the number of
// Trigger a probe at 2x the current pacing rate and insert the number of
// packets the probe needs.
const DataRate kProbeRate = 2 * kPacingDataRate;
const int kProbeClusterId = 1;
pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
// Expected size for each probe in a cluster is twice the expected bits
// sent during min_probe_delta.
// Expect one additional call since probe always starts with a small
// Expected size for each probe in a cluster is twice the expected bits sent
// during min_probe_delta.
// Expect one additional call since probe always starts with a small (1 byte)
// padding packet that's not counted into the probe rate here.
const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
const size_t kNumPacketsInProbe =
@ -465,7 +467,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -507,8 +509,8 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
// Verify the amount of probing data sent.
// Probe always starts with a small (1 byte) padding packet that's not
// counted into the probe rate here.
EXPECT_EQ(data_sent,
kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
const DataSize kMinProbeSize = 2 * kMinProbeDelta * kProbingRate;
EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
}
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
@ -608,6 +610,50 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
time_controller.AdvanceTime(kFixedCoalescingWindow);
}
TEST(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
// Set a low `min_probe_delta` to let probing finish during send loop.
ScopedKeyValueConfig trials(
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so 2 packets adds 1ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
const DataRate kPacingDataRate = 2 * kPacketSize / kPacketPacingTime;
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
pacer.EnsureStarted();
EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
return std::vector<std::unique_ptr<RtpPacketToSend>>();
});
EXPECT_CALL(packet_router, GeneratePadding(_))
.WillRepeatedly(
[](DataSize target_size) { return GeneratePadding(target_size); });
// Set probe rate.
const int kProbeClusterId = 1;
const DataRate kProbingRate = kPacingDataRate;
pacer.CreateProbeCluster(kProbingRate, kProbeClusterId);
const int kPacketsToSend = 100;
const TimeDelta kPacketsPacedTime =
std::max(kPacketsToSend * kPacketSize / kPacingDataRate,
kPacketsToSend * kPacketSize / kProbingRate);
// Expect all packets and one padding packet sent.
EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend + 1);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
time_controller.AdvanceTime(kPacketsPacedTime + TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, Stats) {
static constexpr Timestamp kStartTime = Timestamp::Millis(1234);
GlobalSimulatedTimeController time_controller(kStartTime);
@ -616,7 +662,7 @@ TEST(TaskQueuePacedSenderTest, Stats) {
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
kNoPacketHoldback);
TaskQueuePacedSender::kNoPacketHoldback);
// Simulate ~2mbps video stream, covering one second.
static constexpr size_t kPacketsToSend = 200;
@ -688,9 +734,10 @@ TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) {
time_controller.GetTaskQueueFactory());
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router, experiments,
&task_queue_factory, PacingController::kMinSleepTime, kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
experiments, &task_queue_factory,
PacingController::kMinSleepTime,
TaskQueuePacedSender::kNoPacketHoldback);
// Send enough packets (covering one second) that pacing is triggered, i.e.
// delayed tasks being scheduled.
@ -731,9 +778,10 @@ TEST(TaskQueuePacedSenderTest, LowPrecisionPacingWhenSlackIsEnabled) {
time_controller.GetTaskQueueFactory());
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router, experiments,
&task_queue_factory, PacingController::kMinSleepTime, kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
experiments, &task_queue_factory,
PacingController::kMinSleepTime,
TaskQueuePacedSender::kNoPacketHoldback);
// Send enough packets (covering one second) that pacing is triggered, i.e.
// delayed tasks being scheduled.