diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 8ee347b74a..f1a6201ce7 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -79,8 +79,7 @@ RtpTransportControllerSend::PacerSettings::PacerSettings( const WebRtcKeyValueConfig* trials) : tq_disabled("Disabled"), holdback_window("holdback_window", PacingController::kMinSleepTime), - holdback_packets("holdback_packets", - TaskQueuePacedSender::kNoPacketHoldback) { + holdback_packets("holdback_packets", -1) { ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets}, trials->Lookup("WebRTC-TaskQueuePacer")); } diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 4fb33ff072..9215462239 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -38,6 +38,11 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2); // time. Applies only to periodic mode. constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30); +// Allow probes to be processed slightly ahead of inteded send time. Currently +// set to 1ms as this is intended to allow times be rounded down to the nearest +// millisecond. +constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1); + constexpr int kFirstPriority = 0; bool IsDisabled(const WebRtcKeyValueConfig& field_trials, @@ -89,8 +94,6 @@ const float PacingController::kDefaultPaceMultiplier = 2.5f; const TimeDelta PacingController::kPausedProcessInterval = kCongestedPacketInterval; const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); -const TimeDelta PacingController::kMaxEarlyProbeProcessing = - TimeDelta::Millis(1); PacingController::PacingController(Clock* clock, PacketSender* packet_sender, @@ -130,7 +133,7 @@ PacingController::PacingController(Clock* clock, packet_counter_(0), congestion_window_size_(DataSize::PlusInfinity()), outstanding_data_(DataSize::Zero()), - queue_time_limit_(kMaxExpectedQueueLength), + queue_time_limit(kMaxExpectedQueueLength), account_for_audio_(false), include_overhead_(false) { if (!drain_large_queues_) { @@ -221,7 +224,6 @@ void PacingController::SetPacingRates(DataRate pacing_rate, media_rate_ = pacing_rate; padding_rate_ = padding_rate; pacing_bitrate_ = pacing_rate; - media_budget_.set_target_rate_kbps(pacing_rate.kbps()); padding_budget_.set_target_rate_kbps(padding_rate.kbps()); RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" @@ -300,7 +302,10 @@ void PacingController::EnqueuePacketInternal( // Use that as last process time only if it's prior to now. target_process_time = std::min(now, next_send_time); } - UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); + + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_process_time); + UpdateBudgetWithElapsedTime(elapsed_time); + last_process_time_ = target_process_time; } packet_queue_.Push(priority, now, packet_counter_++, std::move(packet)); } @@ -311,6 +316,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { if (last_process_time_.IsMinusInfinity() || now < last_process_time_) { return TimeDelta::Zero(); } + RTC_DCHECK_GE(now, last_process_time_); TimeDelta elapsed_time = now - last_process_time_; last_process_time_ = now; if (elapsed_time > kMaxElapsedTime) { @@ -327,7 +333,8 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const { packet_counter_ == 0) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. - if (now - last_send_time_ >= kCongestedPacketInterval) { + TimeDelta elapsed_since_last_send = now - last_send_time_; + if (elapsed_since_last_send >= kCongestedPacketInterval) { return true; } } @@ -336,17 +343,17 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const { Timestamp PacingController::NextSendTime() const { const Timestamp now = CurrentTime(); - Timestamp next_send_time = Timestamp::PlusInfinity(); if (paused_) { return last_send_time_ + kPausedProcessInterval; } // If probing is active, that always takes priority. - if (prober_.is_probing() && !probing_send_failure_) { + if (prober_.is_probing()) { Timestamp probe_time = prober_.NextProbeTime(now); - if (!probe_time.IsPlusInfinity()) { - return probe_time.IsMinusInfinity() ? now : probe_time; + // `probe_time` == PlusInfinity indicates no probe scheduled. + if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) { + return probe_time; } } @@ -358,53 +365,86 @@ Timestamp PacingController::NextSendTime() const { // In dynamic mode, figure out when the next packet should be sent, // given the current conditions. - // Not pacing audio, if leading packet is audio its target send - // time is the time at which it was enqueued. - absl::optional unpaced_audio_time = - pace_audio_ ? absl::nullopt - : packet_queue_.LeadingAudioPacketEnqueueTime(); - if (unpaced_audio_time) { - return *unpaced_audio_time; + if (!pace_audio_) { + // Not pacing audio, if leading packet is audio its target send + // time is the time at which it was enqueued. + absl::optional audio_enqueue_time = + packet_queue_.LeadingAudioPacketEnqueueTime(); + if (audio_enqueue_time.has_value()) { + return *audio_enqueue_time; + } } - // We need to at least send keep-alive packets with some interval. if (Congested() || packet_counter_ == 0) { + // We need to at least send keep-alive packets with some interval. return last_send_time_ + kCongestedPacketInterval; } + // Check how long until we can send the next media packet. if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { - // Check how long until we can send the next media packet. - next_send_time = last_process_time_ + media_debt_ / media_rate_; - } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { - // If we _don't_ have pending packets, check how long until we have - // bandwidth for padding packets. Both media and padding debts must - // have been drained to do this. - RTC_DCHECK_GT(media_rate_, DataRate::Zero()); + return std::min(last_send_time_ + kPausedProcessInterval, + last_process_time_ + media_debt_ / media_rate_); + } + + // If we _don't_ have pending packets, check how long until we have + // bandwidth for padding packets. Both media and padding debts must + // have been drained to do this. + if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { TimeDelta drain_time = std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_); - next_send_time = last_process_time_ + drain_time; - } else { - // Nothing to do. - next_send_time = last_process_time_ + kPausedProcessInterval; + return std::min(last_send_time_ + kPausedProcessInterval, + last_process_time_ + drain_time); } if (send_padding_if_silent_) { - next_send_time = - std::min(next_send_time, last_send_time_ + kPausedProcessInterval); + return last_send_time_ + kPausedProcessInterval; } - - return next_send_time; + return last_process_time_ + kPausedProcessInterval; } void PacingController::ProcessPackets() { Timestamp now = CurrentTime(); Timestamp target_send_time = now; + if (mode_ == ProcessMode::kDynamic) { + target_send_time = NextSendTime(); + TimeDelta early_execute_margin = + prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); + if (target_send_time.IsMinusInfinity()) { + target_send_time = now; + } else if (now < target_send_time - early_execute_margin) { + // We are too early, but if queue is empty still allow draining some debt. + // Probing is allowed to be sent up to kMinSleepTime early. + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); + UpdateBudgetWithElapsedTime(elapsed_time); + return; + } + + if (target_send_time < last_process_time_) { + // After the last process call, at time X, the target send time + // shifted to be earlier than X. This should normally not happen + // but we want to make sure rounding errors or erratic behavior + // of NextSendTime() does not cause issue. In particular, if the + // buffer reduction of + // rate * (target_send_time - previous_process_time) + // in the main loop doesn't clean up the existing debt we may not + // be able to send again. We don't want to check this reordering + // there as it is the normal exit condtion when the buffer is + // exhausted and there are packets in the queue. + UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time); + target_send_time = last_process_time_; + } + } + + Timestamp previous_process_time = last_process_time_; + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); if (ShouldSendKeepalive(now)) { - DataSize keepalive_data_sent = DataSize::Zero(); // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. - if (packet_counter_ > 0) { + if (packet_counter_ == 0) { + last_send_time_ = now; + } else { + DataSize keepalive_data_sent = DataSize::Zero(); std::vector> keepalive_packets = packet_sender_->GeneratePadding(DataSize::Bytes(1)); for (auto& packet : keepalive_packets) { @@ -415,29 +455,14 @@ void PacingController::ProcessPackets() { EnqueuePacket(std::move(packet)); } } + OnPaddingSent(keepalive_data_sent); } - OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now); } if (paused_) { return; } - if (mode_ == ProcessMode::kDynamic) { - TimeDelta early_execute_margin = - prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); - - target_send_time = NextSendTime(); - if (now + early_execute_margin < target_send_time) { - // We are too early, but if queue is empty still allow draining some debt. - // Probing is allowed to be sent up to kMinSleepTime early. - UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); - return; - } - } - - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); - if (elapsed_time > TimeDelta::Zero()) { DataRate target_rate = pacing_bitrate_; DataSize queue_size_data = packet_queue_.Size(); @@ -449,7 +474,7 @@ void PacingController::ProcessPackets() { if (drain_large_queues_) { TimeDelta avg_time_left = std::max(TimeDelta::Millis(1), - queue_time_limit_ - packet_queue_.AverageQueueTime()); + queue_time_limit - packet_queue_.AverageQueueTime()); DataRate min_rate_needed = queue_size_data / avg_time_left; if (min_rate_needed > target_rate) { target_rate = min_rate_needed; @@ -464,12 +489,13 @@ void PacingController::ProcessPackets() { // up to (process interval duration) * (target rate), so we only need to // update it once before the packet sending loop. media_budget_.set_target_rate_kbps(target_rate.kbps()); + UpdateBudgetWithElapsedTime(elapsed_time); } else { media_rate_ = target_rate; } - UpdateBudgetWithElapsedTime(elapsed_time); } + bool first_packet_in_probe = false; PacedPacketInfo pacing_info; DataSize recommended_probe_size = DataSize::Zero(); bool is_probing = prober_.is_probing(); @@ -478,23 +504,9 @@ void PacingController::ProcessPackets() { // use actual send time rather than target. pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo()); if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) { + first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0; recommended_probe_size = prober_.RecommendedMinProbeSize(); RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero()); - - // If first packet in probe, insert a small padding packet so we have a - // more reliable start window for the rate estimation. - if (pacing_info.probe_cluster_bytes_sent == 0) { - auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); - // If no RTP modules sending media are registered, we may not get a - // padding packet back. - if (!padding.empty()) { - // Insert with high priority so larger media packets don't preempt it. - EnqueuePacketInternal(std::move(padding[0]), kFirstPriority); - // We should never get more than one padding packets with a requested - // size of 1 byte. - RTC_DCHECK_EQ(padding.size(), 1u); - } - } } else { // No valid probe cluster returned, probe might have timed out. is_probing = false; @@ -502,74 +514,102 @@ void PacingController::ProcessPackets() { } DataSize data_sent = DataSize::Zero(); - while (true) { - // Fetch packet, so long as queue is not empty or budget is not + + // The paused state is checked in the loop since it leaves the critical + // section allowing the paused state to be changed from other code. + while (!paused_) { + if (first_packet_in_probe) { + // If first packet in probe, insert a small padding packet so we have a + // more reliable start window for the rate estimation. + auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); + // If no RTP modules sending media are registered, we may not get a + // padding packet back. + if (!padding.empty()) { + // Insert with high priority so larger media packets don't preempt it. + EnqueuePacketInternal(std::move(padding[0]), kFirstPriority); + // We should never get more than one padding packets with a requested + // size of 1 byte. + RTC_DCHECK_EQ(padding.size(), 1u); + } + first_packet_in_probe = false; + } + + if (mode_ == ProcessMode::kDynamic && + previous_process_time < target_send_time) { + // Reduce buffer levels with amount corresponding to time between last + // process and target send time for the next packet. + // If the process call is late, that may be the time between the optimal + // send times for two packets we should already have sent. + UpdateBudgetWithElapsedTime(target_send_time - previous_process_time); + previous_process_time = target_send_time; + } + + // Fetch the next packet, so long as queue is not empty or budget is not // exhausted. std::unique_ptr rtp_packet = GetPendingPacket(pacing_info, target_send_time, now); + if (rtp_packet == nullptr) { // No packet available to send, check if we should send padding. DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent); if (padding_to_add > DataSize::Zero()) { std::vector> padding_packets = packet_sender_->GeneratePadding(padding_to_add); - if (!padding_packets.empty()) { - for (auto& packet : padding_packets) { - EnqueuePacket(std::move(packet)); - } - // Continue loop to send the padding that was just added. - continue; - } else { - // Can't generate padding, still update padding budget for next send - // time. - UpdatePaddingBudgetWithSentData(padding_to_add); + if (padding_packets.empty()) { + // No padding packets were generated, quite send loop. + break; } + for (auto& packet : padding_packets) { + EnqueuePacket(std::move(packet)); + } + // Continue loop to send the padding that was just added. + continue; } + // Can't fetch new packet and no padding to send, exit send loop. break; - } else { - RTC_DCHECK(rtp_packet); - RTC_DCHECK(rtp_packet->packet_type().has_value()); - const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); - DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + - rtp_packet->padding_size()); + } - if (include_overhead_) { - packet_size += DataSize::Bytes(rtp_packet->headers_size()) + - transport_overhead_per_packet_; - } + RTC_DCHECK(rtp_packet); + RTC_DCHECK(rtp_packet->packet_type().has_value()); + const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); + DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + + rtp_packet->padding_size()); - packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); - for (auto& packet : packet_sender_->FetchFec()) { - EnqueuePacket(std::move(packet)); - } - data_sent += packet_size; + if (include_overhead_) { + packet_size += DataSize::Bytes(rtp_packet->headers_size()) + + transport_overhead_per_packet_; + } - // Send done, update send time. - OnPacketSent(packet_type, packet_size, now); + packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); + for (auto& packet : packet_sender_->FetchFec()) { + EnqueuePacket(std::move(packet)); + } + data_sent += packet_size; - // If we are currently probing, we need to stop the send loop when we - // have reached the send target. - if (is_probing && data_sent >= recommended_probe_size) { - break; - } + // Send done, update send/process time to the target send time. + OnPacketSent(packet_type, packet_size, target_send_time); + // If we are currently probing, we need to stop the send loop when we have + // reached the send target. + if (is_probing && data_sent >= recommended_probe_size) { + break; + } + + if (mode_ == ProcessMode::kDynamic) { // Update target send time in case that are more packets that we are late // in processing. - if (mode_ == ProcessMode::kDynamic) { - target_send_time = NextSendTime(); - if (target_send_time > now) { - // Exit loop if not probing. - if (!is_probing) { - break; - } - target_send_time = now; - } - UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time)); + Timestamp next_send_time = NextSendTime(); + if (next_send_time.IsMinusInfinity()) { + target_send_time = now; + } else { + target_send_time = std::min(now, next_send_time); } } } + last_process_time_ = std::max(last_process_time_, previous_process_time); + if (is_probing) { probing_send_failure_ = data_sent == DataSize::Zero(); if (!probing_send_failure_) { @@ -591,8 +631,8 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, } if (packet_counter_ == 0) { - // We can not send padding unless a normal packet has first been sent. If - // we do, timestamps get messed up. + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. return DataSize::Zero(); } @@ -657,16 +697,25 @@ std::unique_ptr PacingController::GetPendingPacket( void PacingController::OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, Timestamp send_time) { - if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) { + if (!first_sent_packet_time_) { first_sent_packet_time_ = send_time; } - bool audio_packet = packet_type == RtpPacketMediaType::kAudio; - if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) { + if (!audio_packet || account_for_audio_) { + // Update media bytes sent. UpdateBudgetWithSentData(packet_size); } - last_send_time_ = send_time; + last_process_time_ = send_time; +} + +void PacingController::OnPaddingSent(DataSize data_sent) { + if (data_sent > DataSize::Zero()) { + UpdateBudgetWithSentData(data_sent); + } + Timestamp now = CurrentTime(); + last_send_time_ = now; + last_process_time_ = now; } void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { @@ -684,24 +733,17 @@ void PacingController::UpdateBudgetWithSentData(DataSize size) { outstanding_data_ += size; if (mode_ == ProcessMode::kPeriodic) { media_budget_.UseBudget(size.bytes()); + padding_budget_.UseBudget(size.bytes()); } else { media_debt_ += size; media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime); - } - UpdatePaddingBudgetWithSentData(size); -} - -void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) { - if (mode_ == ProcessMode::kPeriodic) { - padding_budget_.UseBudget(size.bytes()); - } else { padding_debt_ += size; padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime); } } void PacingController::SetQueueTimeLimit(TimeDelta limit) { - queue_time_limit_ = limit; + queue_time_limit = limit; } } // namespace webrtc diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index e7ef201a77..5d6d26b917 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -79,11 +79,6 @@ class PacingController { static const TimeDelta kMinSleepTime; - // Allow probes to be processed slightly ahead of inteded send time. Currently - // set to 1ms as this is intended to allow times be rounded down to the - // nearest millisecond. - static const TimeDelta kMaxEarlyProbeProcessing; - PacingController(Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, @@ -163,7 +158,6 @@ class PacingController { // Updates the number of bytes that can be sent for the next time interval. void UpdateBudgetWithElapsedTime(TimeDelta delta); void UpdateBudgetWithSentData(DataSize size); - void UpdatePaddingBudgetWithSentData(DataSize size); DataSize PaddingToAdd(DataSize recommended_probe_size, DataSize data_sent) const; @@ -175,6 +169,7 @@ class PacingController { void OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, Timestamp send_time); + void OnPaddingSent(DataSize padding_sent); Timestamp CurrentTime() const; @@ -201,9 +196,9 @@ class PacingController { mutable Timestamp last_timestamp_; bool paused_; - // In periodic mode, `media_budget_` and `padding_budget_` will be used to + // In dynamic mode, `media_budget_` and `padding_budget_` will be used to // track when packets can be sent. - // In dynamic mode, `media_debt_` and `padding_debt_` will be used together + // In periodic mode, `media_debt_` and `padding_debt_` will be used together // with the target rates. // This is the media budget, keeping track of how many bits of media @@ -234,7 +229,7 @@ class PacingController { DataSize congestion_window_size_; DataSize outstanding_data_; - TimeDelta queue_time_limit_; + TimeDelta queue_time_limit; bool account_for_audio_; bool include_overhead_; }; diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 18e53d1ad7..c392a88720 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -12,14 +12,15 @@ #include #include - +#include "absl/memory/memory.h" #include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" namespace webrtc { -const int TaskQueuePacedSender::kNoPacketHoldback = -1; - TaskQueuePacedSender::TaskQueuePacedSender( Clock* clock, PacingController::PacketSender* packet_sender, @@ -40,11 +41,10 @@ TaskQueuePacedSender::TaskQueuePacedSender( is_started_(false), is_shutdown_(false), packet_size_(/*alpha=*/0.95), - include_overhead_(false), task_queue_(task_queue_factory->CreateTaskQueue( "TaskQueuePacedSender", TaskQueueFactory::Priority::NORMAL)) { - RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime); + packet_size_.Apply(1, 0); } TaskQueuePacedSender::~TaskQueuePacedSender() { @@ -140,11 +140,7 @@ void TaskQueuePacedSender::EnqueuePackets( task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable { RTC_DCHECK_RUN_ON(&task_queue_); for (auto& packet : packets_) { - size_t packet_size = packet->payload_size() + packet->padding_size(); - if (include_overhead_) { - packet_size += packet->headers_size(); - } - packet_size_.Apply(1, packet_size); + packet_size_.Apply(1, packet->size()); RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero()); pacing_controller_.EnqueuePacket(std::move(packet)); } @@ -163,7 +159,6 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) { void TaskQueuePacedSender::SetIncludeOverhead() { task_queue_.PostTask([this]() { RTC_DCHECK_RUN_ON(&task_queue_); - include_overhead_ = true; pacing_controller_.SetIncludeOverhead(); MaybeProcessPackets(Timestamp::MinusInfinity()); }); @@ -199,16 +194,13 @@ absl::optional TaskQueuePacedSender::FirstSentPacketTime() const { TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; - if (oldest_packet.IsInfinite()) { + if (oldest_packet.IsInfinite()) return TimeDelta::Zero(); - } // (webrtc:9716): The clock is not always monotonic. Timestamp current = clock_->CurrentTime(); - if (current < oldest_packet) { + if (current < oldest_packet) return TimeDelta::Zero(); - } - return current - oldest_packet; } @@ -221,70 +213,70 @@ void TaskQueuePacedSender::MaybeProcessPackets( Timestamp scheduled_process_time) { RTC_DCHECK_RUN_ON(&task_queue_); -#if RTC_TRACE_EVENTS_ENABLED - TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"), - "TaskQueuePacedSender::MaybeProcessPackets"); -#endif - if (is_shutdown_ || !is_started_) { return; } - Timestamp next_send_time = pacing_controller_.NextSendTime(); - RTC_DCHECK(next_send_time.IsFinite()); + // Normally, run ProcessPackets() only if this is the scheduled task. + // If it is not but it is already time to process and there either is + // no scheduled task or the schedule has shifted forward in time, run + // anyway and clear any schedule. + Timestamp next_process_time = pacing_controller_.NextSendTime(); const Timestamp now = clock_->CurrentTime(); - - TimeDelta early_execute_margin = - pacing_controller_.IsProbing() - ? PacingController::kMaxEarlyProbeProcessing - : TimeDelta::Zero(); - - // Process packets and update stats. - while (next_send_time <= now + early_execute_margin) { - pacing_controller_.ProcessPackets(); - next_send_time = pacing_controller_.NextSendTime(); - RTC_DCHECK(next_send_time.IsFinite()); - } - UpdateStats(); - - // Ignore retired scheduled task, otherwise reset `next_process_time_`. - if (scheduled_process_time.IsFinite()) { - if (scheduled_process_time != next_process_time_) { - return; - } + const bool is_scheduled_call = next_process_time_ == scheduled_process_time; + if (is_scheduled_call) { + // Indicate no pending scheduled call. next_process_time_ = Timestamp::MinusInfinity(); } + if (is_scheduled_call || + (now >= next_process_time && (next_process_time_.IsInfinite() || + next_process_time < next_process_time_))) { + pacing_controller_.ProcessPackets(); + next_process_time = pacing_controller_.NextSendTime(); + } - // Do not hold back in probing. - TimeDelta hold_back_window = TimeDelta::Zero(); - if (!pacing_controller_.IsProbing()) { - hold_back_window = max_hold_back_window_; - DataRate pacing_rate = pacing_controller_.pacing_rate(); - if (max_hold_back_window_in_packets_ != kNoPacketHoldback && - !pacing_rate.IsZero() && - packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) { - TimeDelta avg_packet_send_time = - DataSize::Bytes(packet_size_.filtered()) / pacing_rate; - hold_back_window = - std::min(hold_back_window, - avg_packet_send_time * max_hold_back_window_in_packets_); + TimeDelta hold_back_window = max_hold_back_window_; + DataRate pacing_rate = pacing_controller_.pacing_rate(); + DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered()); + if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() && + !avg_packet_size.IsZero()) { + TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate; + hold_back_window = + std::min(hold_back_window, + avg_packet_send_time * max_hold_back_window_in_packets_); + } + + absl::optional time_to_next_process; + if (pacing_controller_.IsProbing() && + next_process_time != next_process_time_) { + // If we're probing and there isn't already a wakeup scheduled for the next + // process time, always post a task and just round sleep time down to + // nearest millisecond. + if (next_process_time.IsMinusInfinity()) { + time_to_next_process = TimeDelta::Zero(); + } else { + time_to_next_process = + std::max(TimeDelta::Zero(), + (next_process_time - now).RoundDownTo(TimeDelta::Millis(1))); } + } else if (next_process_time_.IsMinusInfinity() || + next_process_time <= next_process_time_ - hold_back_window) { + // Schedule a new task since there is none currently scheduled + // (`next_process_time_` is infinite), or the new process time is at least + // one holdback window earlier than whatever is currently scheduled. + time_to_next_process = std::max(next_process_time - now, hold_back_window); } - // Calculate next process time. - TimeDelta time_to_next_process = - std::max(hold_back_window, next_send_time - now - early_execute_margin); - next_send_time = now + time_to_next_process; + if (time_to_next_process) { + // Set a new scheduled process time and post a delayed task. + next_process_time_ = next_process_time; - // If no in flight task or in flight task is later than `next_send_time`, - // schedule a new one. Previous in flight task will be retired. - if (next_process_time_.IsMinusInfinity() || - next_process_time_ > next_send_time) { task_queue_.PostDelayedHighPrecisionTask( - [this, next_send_time]() { MaybeProcessPackets(next_send_time); }, - time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms()); - next_process_time_ = next_send_time; + [this, next_process_time]() { MaybeProcessPackets(next_process_time); }, + time_to_next_process->ms()); } + + UpdateStats(); } void TaskQueuePacedSender::UpdateStats() { diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index bf17de52f9..353f137963 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include #include "absl/types/optional.h" @@ -27,6 +29,7 @@ #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "rtc_base/numerics/exp_filter.h" +#include "rtc_base/synchronization/mutex.h" #include "rtc_base/task_queue.h" #include "rtc_base/thread_annotations.h" @@ -36,8 +39,6 @@ class RtcEventLog; class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { public: - static const int kNoPacketHoldback; - // The `hold_back_window` parameter sets a lower bound on time to sleep if // there is currently a pacer queue and packets can't immediately be // processed. Increasing this reduces thread wakeups at the expense of higher @@ -50,7 +51,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { const WebRtcKeyValueConfig* field_trials, TaskQueueFactory* task_queue_factory, TimeDelta max_hold_back_window = PacingController::kMinSleepTime, - int max_hold_back_window_in_packets = kNoPacketHoldback); + int max_hold_back_window_in_packets = -1); ~TaskQueuePacedSender() override; @@ -155,7 +156,6 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { // Filtered size of enqueued packets, in bytes. rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_); - bool include_overhead_ RTC_GUARDED_BY(task_queue_); mutable Mutex stats_mutex_; Stats current_stats_ RTC_GUARDED_BY(stats_mutex_); diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index c2d1fbd095..d78365d499 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -37,6 +37,7 @@ constexpr uint32_t kVideoSsrc = 234565; constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kFlexFecSsrc = 45678; constexpr size_t kDefaultPacketSize = 1234; +constexpr int kNoPacketHoldback = -1; class MockPacketRouter : public PacketRouter { public: @@ -119,7 +120,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); // Insert a number of packets, covering one second. static constexpr size_t kPacketsToSend = 42; @@ -159,7 +160,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); // Insert a number of packets to be sent 200ms apart. const size_t kPacketsPerSecond = 5; @@ -211,7 +212,7 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -240,11 +241,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -277,11 +278,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; - TaskQueuePacedSender pacer( - time_controller.GetClock(), &packet_router, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow, kNoPacketHoldback); // Set rates so one packet adds one ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -305,7 +306,7 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); } -TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) { +TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; @@ -313,7 +314,7 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); // Set rates so one packet adds 4ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -338,16 +339,15 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) { // Advance to less than 3ms before next packet send time. time_controller.AdvanceTime(TimeDelta::Micros(1001)); - // Trigger a probe at 2x the current pacing rate and insert the number of + // Trigger a probe at 4x the current pacing rate and insert the number of // packets the probe needs. const DataRate kProbeRate = 2 * kPacingDataRate; const int kProbeClusterId = 1; pacer.CreateProbeCluster(kProbeRate, kProbeClusterId); - // Expected size for each probe in a cluster is twice the expected bits sent - // during min_probe_delta. - // Expect one additional call since probe always starts with a small (1 byte) - // padding packet that's not counted into the probe rate here. + // Expected size for each probe in a cluster is twice the expected bits + // sent during min_probe_delta. + // Expect one additional call since probe always starts with a small const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; const size_t kNumPacketsInProbe = @@ -381,7 +381,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); // Set rates so one packet adds 4ms of buffer level. const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); @@ -423,8 +423,8 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { // Verify the amount of probing data sent. // Probe always starts with a small (1 byte) padding packet that's not // counted into the probe rate here. - const DataSize kMinProbeSize = 2 * kMinProbeDelta * kProbingRate; - EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize); + EXPECT_EQ(data_sent, + kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); } TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) { @@ -534,7 +534,7 @@ TEST(TaskQueuePacedSenderTest, Stats) { time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), - PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback); + PacingController::kMinSleepTime, kNoPacketHoldback); // Simulate ~2mbps video stream, covering one second. static constexpr size_t kPacketsToSend = 200;