diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index a961f5b21b..fad90186b7 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -35,7 +35,8 @@ PacedSender::PacedSender(Clock* clock, : pacing_controller_(clock, static_cast(this), event_log, - field_trials), + field_trials, + PacingController::ProcessMode::kPeriodic), clock_(clock), packet_router_(packet_router), process_thread_(process_thread) { @@ -128,22 +129,9 @@ TimeDelta PacedSender::OldestPacketWaitTime() const { int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); - // When paused we wake up every 500 ms to send a padding packet to ensure - // we won't get stuck in the paused state due to no feedback being received. - TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess(); - if (pacing_controller_.IsPaused()) { - return std::max(PacingController::kPausedProcessInterval - elapsed_time, - TimeDelta::Zero()) - .ms(); - } - - Timestamp next_probe = pacing_controller_.NextProbeTime(); - if (next_probe != Timestamp::PlusInfinity()) { - return std::max(TimeDelta::Zero(), next_probe - clock_->CurrentTime()).ms(); - } - - const TimeDelta min_packet_limit = TimeDelta::ms(5); - return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms(); + Timestamp next_send_time = pacing_controller_.NextSendTime(); + return std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime()) + .ms(); } void PacedSender::Process() { diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 2d73247c10..6a8e203758 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -28,7 +28,11 @@ namespace { // Time limit in milliseconds between packet bursts. constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>(); constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>(); +// TODO(sprang): Consider dropping this limit. +// The maximum debt level, in terms of time, capped when sending packets. +constexpr TimeDelta kMaxDebtInTime = TimeDelta::Millis<500>(); constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>(); +constexpr DataSize kDefaultPaddingTarget = DataSize::Bytes<50>(); // Upper cap on process interval, in case process has not been called in a long // time. @@ -75,12 +79,15 @@ const TimeDelta PacingController::kMaxExpectedQueueLength = const float PacingController::kDefaultPaceMultiplier = 2.5f; const TimeDelta PacingController::kPausedProcessInterval = kCongestedPacketInterval; +const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis<1>(); PacingController::PacingController(Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials) - : clock_(clock), + const WebRtcKeyValueConfig* field_trials, + ProcessMode mode) + : mode_(mode), + clock_(clock), packet_sender_(packet_sender), fallback_field_trials_( !field_trials ? std::make_unique() : nullptr), @@ -97,13 +104,16 @@ PacingController::PacingController(Clock* clock, paused_(false), media_budget_(0), padding_budget_(0), + media_debt_(DataSize::Zero()), + padding_debt_(DataSize::Zero()), + media_rate_(DataRate::Zero()), + padding_rate_(DataRate::Zero()), prober_(*field_trials_), probing_send_failure_(false), - padding_failure_state_(false), pacing_bitrate_(DataRate::Zero()), - time_last_process_(clock->CurrentTime()), - last_send_time_(time_last_process_), - packet_queue_(time_last_process_, field_trials), + last_process_time_(clock->CurrentTime()), + last_send_time_(last_process_time_), + packet_queue_(last_process_time_, field_trials), packet_counter_(0), congestion_window_size_(DataSize::PlusInfinity()), outstanding_data_(DataSize::Zero()), @@ -145,11 +155,21 @@ bool PacingController::IsPaused() const { } void PacingController::SetCongestionWindow(DataSize congestion_window_size) { + const bool was_congested = Congested(); congestion_window_size_ = congestion_window_size; + if (was_congested && !Congested()) { + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); + UpdateBudgetWithElapsedTime(elapsed_time); + } } void PacingController::UpdateOutstandingData(DataSize outstanding_data) { + const bool was_congested = Congested(); outstanding_data_ = outstanding_data; + if (was_congested && !Congested()) { + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime()); + UpdateBudgetWithElapsedTime(elapsed_time); + } } bool PacingController::Congested() const { @@ -180,6 +200,8 @@ void PacingController::SetProbingEnabled(bool enabled) { void PacingController::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { RTC_DCHECK_GT(pacing_rate, DataRate::Zero()); + media_rate_ = pacing_rate; + padding_rate_ = padding_rate; pacing_bitrate_ = pacing_rate; padding_budget_.set_target_rate_kbps(padding_rate.kbps()); @@ -241,12 +263,19 @@ void PacingController::EnqueuePacketInternal( packet->set_capture_time_ms(now.ms()); } + if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() && + media_debt_ == DataSize::Zero()) { + last_process_time_ = CurrentTime(); + } packet_queue_.Push(priority, now, packet_counter_++, std::move(packet)); } TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { - TimeDelta elapsed_time = now - time_last_process_; - time_last_process_ = now; + if (last_process_time_.IsMinusInfinity()) { + return TimeDelta::Zero(); + } + TimeDelta elapsed_time = now - last_process_time_; + last_process_time_ = now; if (elapsed_time > kMaxElapsedTime) { RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms() << " ms) longer than expected, limiting to " @@ -257,60 +286,107 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { } bool PacingController::ShouldSendKeepalive(Timestamp now) const { - if (send_padding_if_silent_ || paused_ || Congested()) { + if (send_padding_if_silent_ || paused_ || Congested() || + packet_counter_ == 0) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. TimeDelta elapsed_since_last_send = now - last_send_time_; if (elapsed_since_last_send >= kCongestedPacketInterval) { - // We can not send padding unless a normal packet has first been sent. If - // we do, timestamps get messed up. - if (packet_counter_ > 0) { - return true; - } + return true; } } return false; } -Timestamp PacingController::NextProbeTime() { - if (!prober_.IsProbing()) { - return Timestamp::PlusInfinity(); - } - +Timestamp PacingController::NextSendTime() const { Timestamp now = CurrentTime(); - Timestamp probe_time = prober_.NextProbeTime(now); - if (probe_time.IsInfinite()) { - return probe_time; + + // If probing is active, that always takes priority. + if (prober_.IsProbing()) { + Timestamp probe_time = prober_.NextProbeTime(now); + // |probe_time| == PlusInfinity indicates no probe scheduled. + if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) { + return probe_time; + } } - if (probe_time <= now && probing_send_failure_) { - return Timestamp::PlusInfinity(); + if (mode_ == ProcessMode::kPeriodic) { + // In periodc non-probing mode, we just have a fixed interval. + if (paused_) { + return last_send_time_ + kPausedProcessInterval; + } + return last_process_time_ + min_packet_limit_; } - return probe_time; -} + // In dynamic mode, figure out when the next packet should be sent, + // given the current conditions. -TimeDelta PacingController::TimeElapsedSinceLastProcess() const { - return CurrentTime() - time_last_process_; + if (Congested() || packet_counter_ == 0) { + // If congested, we only send keep-alive or audio (if audio is + // configured in pass-through mode). + if (!pace_audio_ && packet_queue_.NextPacketIsAudio()) { + return now; + } + + // We need to at least send keep-alive packets with some interval. + return last_send_time_ + kCongestedPacketInterval; + } + + // If there are pending packets, check how long it will take until buffers + // have emptied. + if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { + return std::min(last_send_time_ + kPausedProcessInterval, + last_process_time_ + media_debt_ / media_rate_); + } + + // If we _don't_ have pending packets, check how long until we have + // bandwidth for padding packets. + if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + return std::min(last_send_time_ + kPausedProcessInterval, + last_process_time_ + padding_debt_ / padding_rate_); + } + + return last_send_time_ + kPausedProcessInterval; } void PacingController::ProcessPackets() { Timestamp now = CurrentTime(); - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); - if (ShouldSendKeepalive(now)) { - DataSize keepalive_data_sent = DataSize::Zero(); - std::vector> keepalive_packets = - packet_sender_->GeneratePadding(DataSize::bytes(1)); - for (auto& packet : keepalive_packets) { - keepalive_data_sent += - DataSize::bytes(packet->payload_size() + packet->padding_size()); - packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo()); + RTC_DCHECK_GE(now, last_process_time_); + Timestamp target_send_time = now; + if (mode_ == ProcessMode::kDynamic) { + target_send_time = NextSendTime(); + if (target_send_time.IsMinusInfinity()) { + target_send_time = now; + } else if (now + kMinSleepTime < target_send_time) { + // We are too early, abort and regroup! + return; } - OnPaddingSent(keepalive_data_sent); } - if (paused_) + Timestamp previous_process_time = last_process_time_; + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); + + if (ShouldSendKeepalive(now)) { + // We can not send padding unless a normal packet has first been sent. If + // we do, timestamps get messed up. + if (packet_counter_ == 0) { + last_send_time_ = now; + } else { + DataSize keepalive_data_sent = DataSize::Zero(); + std::vector> keepalive_packets = + packet_sender_->GeneratePadding(DataSize::bytes(1)); + for (auto& packet : keepalive_packets) { + keepalive_data_sent += + DataSize::bytes(packet->payload_size() + packet->padding_size()); + packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo()); + } + OnPaddingSent(keepalive_data_sent); + } + } + + if (paused_) { return; + } if (elapsed_time > TimeDelta::Zero()) { DataRate target_rate = pacing_bitrate_; @@ -319,7 +395,7 @@ void PacingController::ProcessPackets() { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packet_queue_.UpdateQueueTime(CurrentTime()); + packet_queue_.UpdateQueueTime(now); if (drain_large_queues_) { TimeDelta avg_time_left = std::max(TimeDelta::ms(1), @@ -333,8 +409,15 @@ void PacingController::ProcessPackets() { } } - media_budget_.set_target_rate_kbps(target_rate.kbps()); - UpdateBudgetWithElapsedTime(elapsed_time); + if (mode_ == ProcessMode::kPeriodic) { + // In periodic processing mode, the IntevalBudget allows positive budget + // up to (process interval duration) * (target rate), so we only need to + // update it once before the packet sending loop. + media_budget_.set_target_rate_kbps(target_rate.kbps()); + UpdateBudgetWithElapsedTime(elapsed_time); + } else { + media_rate_ = target_rate; + } } bool first_packet_in_probe = false; @@ -348,6 +431,7 @@ void PacingController::ProcessPackets() { } DataSize data_sent = DataSize::Zero(); + // The paused state is checked in the loop since it leaves the critical // section allowing the paused state to be changed from other code. while (!paused_) { @@ -367,7 +451,19 @@ void PacingController::ProcessPackets() { first_packet_in_probe = false; } - auto* packet = GetPendingPacket(pacing_info); + if (mode_ == ProcessMode::kDynamic && + previous_process_time < target_send_time) { + // Reduce buffer levels with amount corresponding to time between last + // process and target send time for the next packet. + // If the process call is late, that may be the time between the optimal + // send times for two packets we should already have sent. + UpdateBudgetWithElapsedTime(target_send_time - previous_process_time); + previous_process_time = target_send_time; + } + + // Fetch the next packet, so long as queue is not empty or budget is not + // exhausted. + auto* packet = GetPendingPacket(pacing_info, target_send_time, now); if (packet == nullptr) { // No packet available to send, check if we should send padding. DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent); @@ -394,10 +490,22 @@ void PacingController::ProcessPackets() { packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info); data_sent += packet->size(); - // Send succeeded, remove it from the queue. - OnPacketSent(packet); + // Send succeeded, remove it from the queue and update send/process time to + // the target send time. + OnPacketSent(packet, target_send_time); if (recommended_probe_size && data_sent > *recommended_probe_size) break; + + if (mode_ == ProcessMode::kDynamic) { + // Update target send time in case that are more packets that we are late + // in processing. + Timestamp next_send_time = NextSendTime(); + if (next_send_time.IsMinusInfinity()) { + target_send_time = now; + } else { + target_send_time = std::min(now, next_send_time); + } + } } if (is_probing) { @@ -410,7 +518,7 @@ void PacingController::ProcessPackets() { DataSize PacingController::PaddingToAdd( absl::optional recommended_probe_size, - DataSize data_sent) { + DataSize data_sent) const { if (!packet_queue_.Empty()) { // Actual payload available, no need to add padding. return DataSize::Zero(); @@ -434,66 +542,105 @@ DataSize PacingController::PaddingToAdd( return DataSize::Zero(); } - return DataSize::bytes(padding_budget_.bytes_remaining()); + if (mode_ == ProcessMode::kPeriodic) { + return DataSize::bytes(padding_budget_.bytes_remaining()); + } else if (padding_rate_ > DataRate::Zero() && + padding_debt_ == DataSize::Zero()) { + return kDefaultPaddingTarget; + } + return DataSize::Zero(); } RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket( - const PacedPacketInfo& pacing_info) { + const PacedPacketInfo& pacing_info, + Timestamp target_send_time, + Timestamp now) { if (packet_queue_.Empty()) { return nullptr; } - // Since we need to release the lock in order to send, we first pop the - // element from the priority queue but keep it in storage, so that we can - // reinsert it if send fails. - RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop(); - bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; - bool apply_pacing = !audio_packet || pace_audio_; - if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && - pacing_info.probe_cluster_id == - PacedPacketInfo::kNotAProbe))) { - packet_queue_.CancelPop(); - return nullptr; + // First, check if there is any reason _not_ to send the next queued packet. + + // Unpaced audio packets and probes are exempted from send checks. + bool unpaced_audio_packet = !pace_audio_ && packet_queue_.NextPacketIsAudio(); + bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe; + if (!unpaced_audio_packet && !is_probe) { + if (Congested()) { + // Don't send anyting if congested. + return nullptr; + } + + if (mode_ == ProcessMode::kPeriodic) { + if (media_budget_.bytes_remaining() <= 0) { + // Not enough budget. + return nullptr; + } + } else { + if (now <= target_send_time) { + // We allow sending slightly early if we think that we would actually + // had been able to, had we been right on time - i.e. the current debt + // is not more than would be reduced to zero at the target sent time. + TimeDelta flush_time = media_debt_ / media_rate_; + if (now + flush_time > target_send_time) { + return nullptr; + } + } else { + // In dynamic mode we should never try get a non-probe packet until + // the media debt is actually zero. + RTC_DCHECK(media_debt_.IsZero()); + } + } } - return packet; + + return packet_queue_.BeginPop(); } -void PacingController::OnPacketSent( - RoundRobinPacketQueue::QueuedPacket* packet) { - Timestamp now = CurrentTime(); +void PacingController::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet, + Timestamp send_time) { if (!first_sent_packet_time_) { - first_sent_packet_time_ = now; + first_sent_packet_time_ = send_time; } bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; if (!audio_packet || account_for_audio_) { // Update media bytes sent. UpdateBudgetWithSentData(packet->size()); - last_send_time_ = now; } + last_send_time_ = send_time; + last_process_time_ = send_time; // Send succeeded, remove it from the queue. packet_queue_.FinalizePop(); - padding_failure_state_ = false; } void PacingController::OnPaddingSent(DataSize data_sent) { if (data_sent > DataSize::Zero()) { UpdateBudgetWithSentData(data_sent); - } else { - padding_failure_state_ = true; } last_send_time_ = CurrentTime(); + last_process_time_ = CurrentTime(); } void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { - delta = std::min(kMaxProcessingInterval, delta); - media_budget_.IncreaseBudget(delta.ms()); - padding_budget_.IncreaseBudget(delta.ms()); + if (mode_ == ProcessMode::kPeriodic) { + delta = std::min(kMaxProcessingInterval, delta); + media_budget_.IncreaseBudget(delta.ms()); + padding_budget_.IncreaseBudget(delta.ms()); + } else { + media_debt_ -= std::min(media_debt_, media_rate_ * delta); + padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta); + } } void PacingController::UpdateBudgetWithSentData(DataSize size) { outstanding_data_ += size; - media_budget_.UseBudget(size.bytes()); - padding_budget_.UseBudget(size.bytes()); + if (mode_ == ProcessMode::kPeriodic) { + media_budget_.UseBudget(size.bytes()); + padding_budget_.UseBudget(size.bytes()); + } else { + media_debt_ += size; + media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime); + padding_debt_ += size; + padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime); + } } void PacingController::SetQueueTimeLimit(TimeDelta limit) { diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 6f3f9fb487..d6b5abfdf4 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -44,6 +44,13 @@ namespace webrtc { // class PacingController { public: + // Periodic mode uses the IntervalBudget class for tracking bitrate + // budgets, and expected ProcessPackets() to be called a fixed rate, + // e.g. every 5ms as implemented by PacedSender. + // Dynamic mode allows for arbitrary time delta between calls to + // ProcessPackets. + enum class ProcessMode { kPeriodic, kDynamic }; + class PacketSender { public: virtual ~PacketSender() = default; @@ -69,10 +76,13 @@ class PacingController { // to lack of feedback. static const TimeDelta kPausedProcessInterval; + static const TimeDelta kMinSleepTime; + PacingController(Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials); + const WebRtcKeyValueConfig* field_trials, + ProcessMode mode); ~PacingController(); @@ -118,16 +128,8 @@ class PacingController { // effect. void SetProbingEnabled(bool enabled); - // Time at which next probe should be sent. If this value is set, it should be - // respected - i.e. don't call ProcessPackets() before this specified time as - // that can have unintended side effects. - // If no scheduled probe, Timestamp::PlusInifinity() is returned. - Timestamp NextProbeTime(); - - // Time since ProcessPackets() was last executed. - TimeDelta TimeElapsedSinceLastProcess() const; - - TimeDelta TimeUntilAvailableBudget() const; + // Returns the next time we expect ProcessPackets() to be called. + Timestamp NextSendTime() const; // Check queue of pending packets and send them or padding packets, if budget // is available. @@ -146,15 +148,19 @@ class PacingController { void UpdateBudgetWithSentData(DataSize size); DataSize PaddingToAdd(absl::optional recommended_probe_size, - DataSize data_sent); + DataSize data_sent) const; RoundRobinPacketQueue::QueuedPacket* GetPendingPacket( - const PacedPacketInfo& pacing_info); - void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet); + const PacedPacketInfo& pacing_info, + Timestamp target_send_time, + Timestamp now); + void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet, + Timestamp send_time); void OnPaddingSent(DataSize padding_sent); Timestamp CurrentTime() const; + const ProcessMode mode_; Clock* const clock_; PacketSender* const packet_sender_; const std::unique_ptr fallback_field_trials_; @@ -164,12 +170,18 @@ class PacingController { const bool send_padding_if_silent_; const bool pace_audio_; const bool small_first_probe_packet_; + TimeDelta min_packet_limit_; // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. // The last millisecond timestamp returned by |clock_|. mutable Timestamp last_timestamp_; bool paused_; + + // If |use_interval_budget_| is true, |media_budget_| and |padding_budget_| + // will be used to track when packets can be sent. Otherwise the media and + // padding debt counters will be used together with the target rates. + // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. IntervalBudget media_budget_; @@ -178,13 +190,17 @@ class PacingController { // utilized when there's no media to send. IntervalBudget padding_budget_; + DataSize media_debt_; + DataSize padding_debt_; + DataRate media_rate_; + DataRate padding_rate_; + BitrateProber prober_; bool probing_send_failure_; - bool padding_failure_state_; DataRate pacing_bitrate_; - Timestamp time_last_process_; + Timestamp last_process_time_; Timestamp last_send_time_; absl::optional first_sent_packet_time_; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index bd2dd1de02..5b5f6e71c1 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -114,10 +114,12 @@ class PacingControllerPadding : public PacingController::PacketSender { public: static const size_t kPaddingPacketSize = 224; - PacingControllerPadding() : padding_sent_(0) {} + PacingControllerPadding() : padding_sent_(0), total_bytes_sent_(0) {} void SendRtpPacket(std::unique_ptr packet, - const PacedPacketInfo& pacing_info) override {} + const PacedPacketInfo& pacing_info) override { + total_bytes_sent_ += packet->payload_size(); + } std::vector> GeneratePadding( DataSize target_size) override { @@ -134,9 +136,11 @@ class PacingControllerPadding : public PacingController::PacketSender { } size_t padding_sent() { return padding_sent_; } + size_t total_bytes_sent() { return total_bytes_sent_; } private: size_t padding_sent_; + size_t total_bytes_sent_; }; class PacingControllerProbing : public PacingController::PacketSender { @@ -177,16 +181,21 @@ class PacingControllerProbing : public PacingController::PacketSender { int padding_sent_; }; -class PacingControllerTest : public ::testing::Test { +class PacingControllerTest + : public ::testing::TestWithParam { protected: PacingControllerTest() : clock_(123456) { srand(0); // Need to initialize PacingController after we initialize clock. pacer_ = std::make_unique(&clock_, &callback_, nullptr, - nullptr); + nullptr, GetParam()); Init(); } + bool PeriodicProcess() const { + return GetParam() == PacingController::ProcessMode::kPeriodic; + } + void Init() { pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0); pacer_->CreateProbeCluster(kSecondClusterRate, /*cluster_id=*/1); @@ -245,22 +254,43 @@ class PacingControllerTest : public ::testing::Test { } TimeDelta TimeUntilNextProcess() { - // TODO(bugs.webrtc.org/10809): Replace this with TimeUntilAvailableBudget() - // once ported from WIP code. For now, emulate PacedSender method. + Timestamp now = clock_.CurrentTime(); + return std::max(pacer_->NextSendTime() - now, TimeDelta::Zero()); + } - TimeDelta elapsed_time = pacer_->TimeElapsedSinceLastProcess(); - if (pacer_->IsPaused()) { - return std::max(PacingController::kPausedProcessInterval - elapsed_time, - TimeDelta::Zero()); + void AdvanceTimeAndProcess() { + Timestamp now = clock_.CurrentTime(); + Timestamp next_send_time = pacer_->NextSendTime(); + clock_.AdvanceTime(std::max(TimeDelta::Zero(), next_send_time - now)); + pacer_->ProcessPackets(); + } + + void ConsumeInitialBudget() { + const uint32_t kSsrc = 54321; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + const size_t kPacketSize = 250; + + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * kPacketSize * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, + sequence_number++, capture_time_ms, kPacketSize); } - Timestamp next_probe = pacer_->NextProbeTime(); - if (next_probe != Timestamp::PlusInfinity()) { - return std::max(TimeDelta::Zero(), next_probe - clock_.CurrentTime()); + while (pacer_->QueueSizePackets() > 0) { + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } } - - const TimeDelta min_packet_limit = TimeDelta::ms(5); - return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()); } SimulatedClock clock_; @@ -268,7 +298,8 @@ class PacingControllerTest : public ::testing::Test { std::unique_ptr pacer_; }; -class PacingControllerFieldTrialTest : public ::testing::Test { +class PacingControllerFieldTrialTest + : public ::testing::TestWithParam { protected: struct MediaStream { const RtpPacketToSend::Type type; @@ -286,7 +317,17 @@ class PacingControllerFieldTrialTest : public ::testing::Test { clock_.TimeInMilliseconds(), stream->packet_size)); } void ProcessNext(PacingController* pacer) { - clock_.AdvanceTimeMilliseconds(5); + if (GetParam() == PacingController::ProcessMode::kPeriodic) { + TimeDelta process_interval = TimeDelta::ms(5); + clock_.AdvanceTime(process_interval); + pacer->ProcessPackets(); + return; + } + + Timestamp now = clock_.CurrentTime(); + Timestamp next_send_time = pacer->NextSendTime(); + TimeDelta wait_time = std::max(TimeDelta::Zero(), next_send_time - now); + clock_.AdvanceTime(wait_time); pacer->ProcessPackets(); } MediaStream audio{/*type*/ RtpPacketToSend::Type::kAudio, @@ -297,8 +338,8 @@ class PacingControllerFieldTrialTest : public ::testing::Test { MockPacingControllerCallback callback_; }; -TEST_F(PacingControllerFieldTrialTest, DefaultNoPaddingInSilence) { - PacingController pacer(&clock_, &callback_, nullptr, nullptr); +TEST_P(PacingControllerFieldTrialTest, DefaultNoPaddingInSilence) { + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); pacer.SetPacingRates(kTargetRate, DataRate::Zero()); // Video packet to reset last send time and provide padding data. InsertPacket(&pacer, &video); @@ -311,9 +352,9 @@ TEST_F(PacingControllerFieldTrialTest, DefaultNoPaddingInSilence) { pacer.ProcessPackets(); } -TEST_F(PacingControllerFieldTrialTest, PaddingInSilenceWithTrial) { +TEST_P(PacingControllerFieldTrialTest, PaddingInSilenceWithTrial) { ScopedFieldTrials trial("WebRTC-Pacer-PadInSilence/Enabled/"); - PacingController pacer(&clock_, &callback_, nullptr, nullptr); + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); pacer.SetPacingRates(kTargetRate, DataRate::Zero()); // Video packet to reset last send time and provide padding data. InsertPacket(&pacer, &video); @@ -326,11 +367,11 @@ TEST_F(PacingControllerFieldTrialTest, PaddingInSilenceWithTrial) { pacer.ProcessPackets(); } -TEST_F(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) { +TEST_P(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) { EXPECT_CALL(callback_, SendPadding).Times(0); - PacingController pacer(&clock_, &callback_, nullptr, nullptr); - pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); - pacer.SetCongestionWindow(DataSize::bytes(800)); + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); + pacer.SetPacingRates(DataRate::kbps(10000), DataRate::Zero()); + pacer.SetCongestionWindow(DataSize::bytes(video.packet_size - 100)); pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); @@ -339,6 +380,10 @@ TEST_F(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) { // Audio packet blocked due to congestion. InsertPacket(&pacer, &audio); EXPECT_CALL(callback_, SendPacket).Times(0); + if (GetParam() == PacingController::ProcessMode::kDynamic) { + // Without interval budget we'll forward time to where we send keep-alive. + EXPECT_CALL(callback_, SendPadding(1)).Times(2); + } ProcessNext(&pacer); ProcessNext(&pacer); // Audio packet unblocked when congestion window clear. @@ -348,11 +393,11 @@ TEST_F(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) { ProcessNext(&pacer); } -TEST_F(PacingControllerFieldTrialTest, +TEST_P(PacingControllerFieldTrialTest, CongestionWindowDoesNotAffectAudioInTrial) { ScopedFieldTrials trial("WebRTC-Pacer-BlockAudio/Disabled/"); EXPECT_CALL(callback_, SendPadding).Times(0); - PacingController pacer(&clock_, &callback_, nullptr, nullptr); + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); pacer.SetCongestionWindow(DataSize::bytes(800)); pacer.UpdateOutstandingData(DataSize::Zero()); @@ -366,31 +411,39 @@ TEST_F(PacingControllerFieldTrialTest, ProcessNext(&pacer); } -TEST_F(PacingControllerFieldTrialTest, DefaultBudgetAffectsAudio) { - PacingController pacer(&clock_, &callback_, nullptr, nullptr); - pacer.SetPacingRates( - DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), - DataRate::Zero()); +TEST_P(PacingControllerFieldTrialTest, DefaultBudgetAffectsAudio) { + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); + DataRate pacing_rate = + DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond); + pacer.SetPacingRates(pacing_rate, DataRate::Zero()); // Video fills budget for following process periods. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); // Audio packet blocked due to budget limit. - EXPECT_CALL(callback_, SendPacket).Times(0); InsertPacket(&pacer, &audio); - ProcessNext(&pacer); - ProcessNext(&pacer); - ::testing::Mock::VerifyAndClearExpectations(&callback_); - // Audio packet unblocked when the budget has recovered. - EXPECT_CALL(callback_, SendPacket).Times(1); - ProcessNext(&pacer); - ProcessNext(&pacer); + Timestamp wait_start_time = clock_.CurrentTime(); + Timestamp wait_end_time = Timestamp::MinusInfinity(); + EXPECT_CALL(callback_, SendPacket) + .WillOnce([&](uint32_t ssrc, uint16_t sequence_number, + int64_t capture_timestamp, bool retransmission, + bool padding) { wait_end_time = clock_.CurrentTime(); }); + while (!wait_end_time.IsFinite()) { + ProcessNext(&pacer); + } + const TimeDelta expected_wait_time = + DataSize::bytes(video.packet_size) / pacing_rate; + // Verify delay is near expectation, within timing margin. + EXPECT_LT(((wait_end_time - wait_start_time) - expected_wait_time).Abs(), + GetParam() == PacingController::ProcessMode::kPeriodic + ? TimeDelta::ms(5) + : PacingController::kMinSleepTime); } -TEST_F(PacingControllerFieldTrialTest, BudgetDoesNotAffectAudioInTrial) { +TEST_P(PacingControllerFieldTrialTest, BudgetDoesNotAffectAudioInTrial) { ScopedFieldTrials trial("WebRTC-Pacer-BlockAudio/Disabled/"); EXPECT_CALL(callback_, SendPadding).Times(0); - PacingController pacer(&clock_, &callback_, nullptr, nullptr); + PacingController pacer(&clock_, &callback_, nullptr, nullptr, GetParam()); pacer.SetPacingRates( DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), DataRate::Zero()); @@ -404,7 +457,11 @@ TEST_F(PacingControllerFieldTrialTest, BudgetDoesNotAffectAudioInTrial) { ProcessNext(&pacer); } -TEST_F(PacingControllerTest, FirstSentPacketTimeIsSet) { +INSTANTIATE_TEST_SUITE_P(WithAndWithoutIntervalBudget, + PacingControllerFieldTrialTest, + ::testing::Values(false, true)); + +TEST_P(PacingControllerTest, FirstSentPacketTimeIsSet) { uint16_t sequence_number = 1234; const uint32_t kSsrc = 12345; const size_t kSizeBytes = 250; @@ -417,33 +474,44 @@ TEST_F(PacingControllerTest, FirstSentPacketTimeIsSet) { for (size_t i = 0; i < kPacketToSend; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++, clock_.TimeInMilliseconds(), kSizeBytes); - pacer_->ProcessPackets(); clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); } EXPECT_EQ(kStartTime, pacer_->FirstSentPacketTime()); } -TEST_F(PacingControllerTest, QueuePacket) { +TEST_P(PacingControllerTest, QueuePacket) { + if (!PeriodicProcess()) { + // This test checks behavior applicable only when using interval budget. + return; + } + uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - // Due to the multiplicative factor we can send 5 packets during a send + // Due to the multiplicative factor we can send 5 packets during a 5ms send // interval. (network capacity * multiplier / (8 bits per byte * // (packet size * #send intervals per second) - const size_t packets_to_send = + const size_t kPacketsToSend = kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); - for (size_t i = 0; i < packets_to_send; ++i) { + for (size_t i = 0; i < kPacketsToSend; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), 250); } + EXPECT_CALL(callback_, SendPadding).Times(0); + // Enqueue one extra packet. int64_t queued_packet_timestamp = clock_.TimeInMilliseconds(); Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, queued_packet_timestamp, 250); - EXPECT_EQ(packets_to_send + 1, pacer_->QueueSizePackets()); + EXPECT_EQ(kPacketsToSend + 1, pacer_->QueueSizePackets()); + + // The first kPacketsToSend packets will be sent with budget from the + // initial 5ms interval. pacer_->ProcessPackets(); - EXPECT_CALL(callback_, SendPadding).Times(0); - clock_.AdvanceTimeMilliseconds(5); EXPECT_EQ(1u, pacer_->QueueSizePackets()); + + // Advance time to next interval, make sure the last packet is sent. + clock_.AdvanceTimeMilliseconds(5); EXPECT_CALL(callback_, SendPacket(ssrc, sequence_number++, queued_packet_timestamp, false, false)) .Times(1); @@ -453,62 +521,133 @@ TEST_F(PacingControllerTest, QueuePacket) { // We can send packets_to_send -1 packets of size 250 during the current // interval since one packet has already been sent. - for (size_t i = 0; i < packets_to_send - 1; ++i) { + for (size_t i = 0; i < kPacketsToSend - 1; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), 250); } Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), 250); - EXPECT_EQ(packets_to_send, pacer_->QueueSizePackets()); + EXPECT_EQ(kPacketsToSend, pacer_->QueueSizePackets()); pacer_->ProcessPackets(); EXPECT_EQ(1u, pacer_->QueueSizePackets()); } -TEST_F(PacingControllerTest, PaceQueuedPackets) { +TEST_P(PacingControllerTest, QueueAndPacePackets) { + if (PeriodicProcess()) { + // This test checks behavior when not using interval budget. + return; + } + + const uint32_t kSsrc = 12345; + uint16_t sequence_number = 1234; + const DataSize kPackeSize = DataSize::bytes(250); + const TimeDelta kSendInterval = TimeDelta::ms(5); + + // Due to the multiplicative factor we can send 5 packets during a 5ms send + // interval. (send interval * network capacity * multiplier / packet size) + const size_t kPacketsToSend = (kSendInterval * kTargetRate).bytes() * + kPaceMultiplier / kPackeSize.bytes(); + + for (size_t i = 0; i < kPacketsToSend; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++, + clock_.TimeInMilliseconds(), kPackeSize.bytes()); + } + EXPECT_CALL(callback_, SendPadding).Times(0); + + // Enqueue one extra packet. + int64_t queued_packet_timestamp = clock_.TimeInMilliseconds(); + Send(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number, + queued_packet_timestamp, kPackeSize.bytes()); + EXPECT_EQ(kPacketsToSend + 1, pacer_->QueueSizePackets()); + + // Send packets until the initial kPacketsToSend packets are done. + Timestamp start_time = clock_.CurrentTime(); + while (pacer_->QueueSizePackets() > 1) { + AdvanceTimeAndProcess(); + } + EXPECT_LT(clock_.CurrentTime() - start_time, kSendInterval); + + // Proceed till last packet can be sent. + EXPECT_CALL(callback_, SendPacket(kSsrc, sequence_number, + queued_packet_timestamp, false, false)) + .Times(1); + AdvanceTimeAndProcess(); + EXPECT_GE(clock_.CurrentTime() - start_time, kSendInterval); + EXPECT_EQ(pacer_->QueueSizePackets(), 0u); +} + +TEST_P(PacingControllerTest, PaceQueuedPackets) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; + const size_t kPacketSize = 250; // Due to the multiplicative factor we can send 5 packets during a send // interval. (network capacity * multiplier / (8 bits per byte * // (packet size * #send intervals per second) const size_t packets_to_send_per_interval = - kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + kTargetRate.bps() * kPaceMultiplier / (8 * kPacketSize * 200); for (size_t i = 0; i < packets_to_send_per_interval; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), 250); + clock_.TimeInMilliseconds(), kPacketSize); } for (size_t j = 0; j < packets_to_send_per_interval * 10; ++j) { Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), 250); + clock_.TimeInMilliseconds(), kPacketSize); } EXPECT_EQ(packets_to_send_per_interval + packets_to_send_per_interval * 10, pacer_->QueueSizePackets()); - pacer_->ProcessPackets(); - EXPECT_EQ(packets_to_send_per_interval * 10, pacer_->QueueSizePackets()); - EXPECT_CALL(callback_, SendPadding).Times(0); - for (int k = 0; k < 10; ++k) { - clock_.AdvanceTime(TimeUntilNextProcess()); - EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, false)) - .Times(packets_to_send_per_interval); + if (PeriodicProcess()) { pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > packets_to_send_per_interval * 10) { + AdvanceTimeAndProcess(); + } } + EXPECT_EQ(pacer_->QueueSizePackets(), packets_to_send_per_interval * 10); + EXPECT_CALL(callback_, SendPadding).Times(0); + + EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, false)) + .Times(pacer_->QueueSizePackets()); + const TimeDelta expected_pace_time = + DataSize::bytes(pacer_->QueueSizePackets() * kPacketSize) / + (kPaceMultiplier * kTargetRate); + Timestamp start_time = clock_.CurrentTime(); + while (pacer_->QueueSizePackets() > 0) { + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } + } + const TimeDelta actual_pace_time = clock_.CurrentTime() - start_time; + EXPECT_LT( + (actual_pace_time - expected_pace_time).Abs(), + PeriodicProcess() ? TimeDelta::ms(5) : PacingController::kMinSleepTime); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); clock_.AdvanceTime(TimeUntilNextProcess()); EXPECT_EQ(0u, pacer_->QueueSizePackets()); pacer_->ProcessPackets(); + // Send some more packet, just show that we can..? for (size_t i = 0; i < packets_to_send_per_interval; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), 250); } - Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, - clock_.TimeInMilliseconds(), 250); - pacer_->ProcessPackets(); - EXPECT_EQ(1u, pacer_->QueueSizePackets()); + EXPECT_EQ(packets_to_send_per_interval, pacer_->QueueSizePackets()); + if (PeriodicProcess()) { + pacer_->ProcessPackets(); + } else { + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + AdvanceTimeAndProcess(); + } + } + EXPECT_EQ(0u, pacer_->QueueSizePackets()); } -TEST_F(PacingControllerTest, RepeatedRetransmissionsAllowed) { +TEST_P(PacingControllerTest, RepeatedRetransmissionsAllowed) { // Send one packet, then two retransmissions of that packet. for (size_t i = 0; i < 3; i++) { constexpr uint32_t ssrc = 333; @@ -521,10 +660,16 @@ TEST_F(PacingControllerTest, RepeatedRetransmissionsAllowed) { ssrc, sequence_number, clock_.TimeInMilliseconds(), bytes); clock_.AdvanceTimeMilliseconds(5); } - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > 0) { + AdvanceTimeAndProcess(); + } + } } -TEST_F(PacingControllerTest, +TEST_P(PacingControllerTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; @@ -537,46 +682,93 @@ TEST_F(PacingControllerTest, clock_.TimeInMilliseconds(), 250); clock_.AdvanceTimeMilliseconds(1000); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > 0) { + AdvanceTimeAndProcess(); + } + } } -TEST_F(PacingControllerTest, Padding) { +TEST_P(PacingControllerTest, Padding) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; + const size_t kPacketSize = 250; pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); - // Due to the multiplicative factor we can send 5 packets during a send - // interval. (network capacity * multiplier / (8 bits per byte * - // (packet size * #send intervals per second) - const size_t packets_to_send_per_interval = - kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); - for (size_t i = 0; i < packets_to_send_per_interval; ++i) { - SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), 250); + if (PeriodicProcess()) { + ConsumeInitialBudget(); + + // 5 milliseconds later should not send padding since we filled the buffers + // initially. + EXPECT_CALL(callback_, SendPadding(kPacketSize)).Times(0); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + + // 5 milliseconds later we have enough budget to send some padding. + EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(kPacketSize)); + EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + const size_t kPacketsToSend = 20; + for (size_t i = 0; i < kPacketsToSend; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize); + } + const TimeDelta expected_pace_time = + DataSize::bytes(pacer_->QueueSizePackets() * kPacketSize) / + (kPaceMultiplier * kTargetRate); + EXPECT_CALL(callback_, SendPadding).Times(0); + // Only the media packets should be sent. + Timestamp start_time = clock_.CurrentTime(); + while (pacer_->QueueSizePackets() > 0) { + AdvanceTimeAndProcess(); + } + const TimeDelta actual_pace_time = clock_.CurrentTime() - start_time; + EXPECT_LE((actual_pace_time - expected_pace_time).Abs(), + PacingController::kMinSleepTime); + + // Pacing media happens 2.5x factor, but padding was configured with 1.0x + // factor. We have to wait until the padding debt is gone before we start + // sending padding. + const TimeDelta time_to_padding_debt_free = + (expected_pace_time * kPaceMultiplier) - actual_pace_time; + TimeDelta time_to_next = pacer_->NextSendTime() - clock_.CurrentTime(); + EXPECT_EQ(time_to_next, time_to_padding_debt_free); + clock_.AdvanceTime(time_to_next); + + // Send 10 padding packets. + const size_t kPaddingPacketsToSend = 10; + DataSize padding_sent = DataSize::Zero(); + EXPECT_CALL(callback_, SendPadding) + .Times(kPaddingPacketsToSend) + .WillRepeatedly([&](size_t target_size) { + padding_sent += DataSize::bytes(target_size); + return target_size; + }); + EXPECT_CALL(callback_, SendPacket(_, _, _, false, true)) + .Times(kPaddingPacketsToSend); + const Timestamp padding_start_time = clock_.CurrentTime(); + for (size_t i = 0; i < kPaddingPacketsToSend; ++i) { + AdvanceTimeAndProcess(); + } + + // Verify rate of sent padding. + TimeDelta padding_duration = pacer_->NextSendTime() - padding_start_time; + DataRate padding_rate = padding_sent / padding_duration; + EXPECT_EQ(padding_rate, kTargetRate); } - // No padding is expected since we have sent too much already. - EXPECT_CALL(callback_, SendPadding).Times(0); - pacer_->ProcessPackets(); - EXPECT_EQ(0u, pacer_->QueueSizePackets()); - - // 5 milliseconds later should not send padding since we filled the buffers - // initially. - EXPECT_CALL(callback_, SendPadding(250)).Times(0); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); - - // 5 milliseconds later we have enough budget to send some padding. - EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250)); - EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); } -TEST_F(PacingControllerTest, NoPaddingBeforeNormalPacket) { +TEST_P(PacingControllerTest, NoPaddingBeforeNormalPacket) { pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); EXPECT_CALL(callback_, SendPadding).Times(0); + pacer_->ProcessPackets(); clock_.AdvanceTime(TimeUntilNextProcess()); @@ -589,12 +781,24 @@ TEST_F(PacingControllerTest, NoPaddingBeforeNormalPacket) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, capture_time_ms, 250); - EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250)); + EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { + return padding; + }); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); // Media. + AdvanceTimeAndProcess(); // Padding. + } } -TEST_F(PacingControllerTest, VerifyPaddingUpToBitrate) { +TEST_P(PacingControllerTest, VerifyPaddingUpToBitrate) { + if (!PeriodicProcess()) { + // Already tested in PacingControllerTest.Padding. + return; + } + uint32_t ssrc = 12345; uint16_t sequence_number = 1234; int64_t capture_time_ms = 56789; @@ -613,58 +817,62 @@ TEST_F(PacingControllerTest, VerifyPaddingUpToBitrate) { } } -TEST_F(PacingControllerTest, VerifyAverageBitrateVaryingMediaPayload) { +TEST_P(PacingControllerTest, VerifyAverageBitrateVaryingMediaPayload) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; int64_t capture_time_ms = 56789; const int kTimeStep = 5; - const int64_t kBitrateWindow = 10000; + const TimeDelta kAveragingWindowLength = TimeDelta::seconds(10); PacingControllerPadding callback; - pacer_ = - std::make_unique(&clock_, &callback, nullptr, nullptr); + pacer_ = std::make_unique(&clock_, &callback, nullptr, + nullptr, GetParam()); pacer_->SetProbingEnabled(false); pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); - int64_t start_time = clock_.TimeInMilliseconds(); + Timestamp start_time = clock_.CurrentTime(); size_t media_bytes = 0; - while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + while (clock_.CurrentTime() - start_time < kAveragingWindowLength) { + // Maybe add some new media packets corresponding to expected send rate. int rand_value = rand(); // NOLINT (rand_r instead of rand) - size_t media_payload = rand_value % 100 + 200; // [200, 300] bytes. - Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - capture_time_ms, media_payload); - media_bytes += media_payload; - clock_.AdvanceTimeMilliseconds(kTimeStep); - pacer_->ProcessPackets(); + while ( + media_bytes < + (kTargetRate * (clock_.CurrentTime() - start_time)).bytes()) { + size_t media_payload = rand_value % 400 + 800; // [400, 1200] bytes. + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + capture_time_ms, media_payload); + media_bytes += media_payload; + } + + if (PeriodicProcess()) { + clock_.AdvanceTimeMilliseconds(kTimeStep); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } } - EXPECT_NEAR(kTargetRate.kbps(), - static_cast(8 * (media_bytes + callback.padding_sent()) / - kBitrateWindow), - 1); + + EXPECT_NEAR( + kTargetRate.bps(), + (DataSize::bytes(callback.total_bytes_sent()) / kAveragingWindowLength) + .bps(), + (kTargetRate * 0.01 /* 1% error marging */).bps()); } -TEST_F(PacingControllerTest, Priority) { +TEST_P(PacingControllerTest, Priority) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346; uint16_t sequence_number = 1234; int64_t capture_time_ms = 56789; int64_t capture_time_ms_low_priority = 1234567; - // Due to the multiplicative factor we can send 5 packets during a send - // interval. (network capacity * multiplier / (8 bits per byte * - // (packet size * #send intervals per second) - const size_t packets_to_send_per_interval = - kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); - for (size_t i = 0; i < packets_to_send_per_interval; ++i) { - SendAndExpectPacket(RtpPacketToSend::Type::kRetransmission, ssrc, - sequence_number++, clock_.TimeInMilliseconds(), 250); - } - pacer_->ProcessPackets(); - EXPECT_EQ(0u, pacer_->QueueSizePackets()); + ConsumeInitialBudget(); // Expect normal and low priority to be queued and high to pass through. Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++, capture_time_ms_low_priority, 250); + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); for (size_t i = 0; i < packets_to_send_per_interval; ++i) { Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++, capture_time_ms, 250); @@ -677,19 +885,29 @@ TEST_F(PacingControllerTest, Priority) { EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, _, _)) .Times(packets_to_send_per_interval + 1); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > 1) { + AdvanceTimeAndProcess(); + } + } + EXPECT_EQ(1u, pacer_->QueueSizePackets()); EXPECT_CALL(callback_, SendPacket(ssrc_low_priority, _, capture_time_ms_low_priority, _, _)) .Times(1); - - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } } -TEST_F(PacingControllerTest, RetransmissionPriority) { +TEST_P(PacingControllerTest, RetransmissionPriority) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; int64_t capture_time_ms = 45678; @@ -720,8 +938,14 @@ TEST_F(PacingControllerTest, RetransmissionPriority) { SendPacket(ssrc, _, capture_time_ms_retransmission, true, _)) .Times(packets_to_send_per_interval); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > packets_to_send_per_interval) { + AdvanceTimeAndProcess(); + } + } EXPECT_EQ(packets_to_send_per_interval, pacer_->QueueSizePackets()); // Expect the remaining (non-retransmission) packets to be sent. @@ -730,48 +954,65 @@ TEST_F(PacingControllerTest, RetransmissionPriority) { EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, false, _)) .Times(packets_to_send_per_interval); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + while (pacer_->QueueSizePackets() > 0) { + AdvanceTimeAndProcess(); + } + } EXPECT_EQ(0u, pacer_->QueueSizePackets()); } -TEST_F(PacingControllerTest, HighPrioDoesntAffectBudget) { +TEST_P(PacingControllerTest, HighPrioDoesntAffectBudget) { + const size_t kPacketSize = 250; uint32_t ssrc = 12346; uint16_t sequence_number = 1234; int64_t capture_time_ms = 56789; // As high prio packets doesn't affect the budget, we should be able to send // a high number of them at once. - for (int i = 0; i < 25; ++i) { + const size_t kNumAudioPackets = 25; + for (size_t i = 0; i < kNumAudioPackets; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kAudio, ssrc, sequence_number++, - capture_time_ms, 250); + capture_time_ms, kPacketSize); } pacer_->ProcessPackets(); // Low prio packets does affect the budget. // Due to the multiplicative factor we can send 5 packets during a send // interval. (network capacity * multiplier / (8 bits per byte * // (packet size * #send intervals per second) - const size_t packets_to_send_per_interval = - kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); - for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + const size_t kPacketsToSendPerInterval = + kTargetRate.bps() * kPaceMultiplier / (8 * kPacketSize * 200); + for (size_t i = 0; i < kPacketsToSendPerInterval; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), 250); + clock_.TimeInMilliseconds(), kPacketSize); } - Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, capture_time_ms, - 250); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); - EXPECT_EQ(1u, pacer_->QueueSizePackets()); - EXPECT_CALL(callback_, - SendPacket(ssrc, sequence_number++, capture_time_ms, false, _)) - .Times(1); - clock_.AdvanceTime(TimeUntilNextProcess()); - pacer_->ProcessPackets(); - EXPECT_EQ(0u, pacer_->QueueSizePackets()); + + // Send all packets and measure pace time. + Timestamp start_time = clock_.CurrentTime(); + while (pacer_->QueueSizePackets() > 0) { + if (PeriodicProcess()) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } + } + + // Measure pacing time. Expect only low-prio packets to affect this. + TimeDelta pacing_time = clock_.CurrentTime() - start_time; + TimeDelta expected_pacing_time = + DataSize::bytes(kPacketsToSendPerInterval * kPacketSize) / + (kTargetRate * kPaceMultiplier); + EXPECT_NEAR(pacing_time.us(), expected_pacing_time.us(), + PeriodicProcess() ? 5000.0 + : PacingController::kMinSleepTime.us()); } -TEST_F(PacingControllerTest, SendsOnlyPaddingWhenCongested) { +TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { uint32_t ssrc = 202020; uint16_t sequence_number = 1000; int kPacketSize = 250; @@ -784,8 +1025,7 @@ TEST_F(PacingControllerTest, SendsOnlyPaddingWhenCongested) { sent_data += kPacketSize; SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), kPacketSize); - clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); + AdvanceTimeAndProcess(); } ::testing::Mock::VerifyAndClearExpectations(&callback_); EXPECT_CALL(callback_, SendPacket).Times(0); @@ -809,7 +1049,7 @@ TEST_F(PacingControllerTest, SendsOnlyPaddingWhenCongested) { EXPECT_EQ(blocked_packets, pacer_->QueueSizePackets()); } -TEST_F(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { +TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { uint32_t ssrc = 202020; uint16_t seq_num = 1000; int size = 1000; @@ -836,13 +1076,12 @@ TEST_F(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { EXPECT_CALL(callback_, SendPacket).Times(0); clock_.AdvanceTimeMilliseconds(5); pacer_->ProcessPackets(); - pacer_->UpdateOutstandingData(DataSize::Zero()); // Congestion removed and budget has recovered, packet is sent. Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); - pacer_->ProcessPackets(); pacer_->UpdateOutstandingData(DataSize::Zero()); + pacer_->ProcessPackets(); // Should be blocked due to budget limitation as congestion has be removed. Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(0); @@ -850,7 +1089,7 @@ TEST_F(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { pacer_->ProcessPackets(); } -TEST_F(PacingControllerTest, ResumesSendingWhenCongestionEnds) { +TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) { uint32_t ssrc = 202020; uint16_t sequence_number = 1000; int64_t kPacketSize = 250; @@ -905,29 +1144,21 @@ TEST_F(PacingControllerTest, ResumesSendingWhenCongestionEnds) { } } -TEST_F(PacingControllerTest, Pause) { +TEST_P(PacingControllerTest, Pause) { uint32_t ssrc_low_priority = 12345; uint32_t ssrc = 12346; uint32_t ssrc_high_priority = 12347; uint16_t sequence_number = 1234; - int64_t capture_time_ms = clock_.TimeInMilliseconds(); EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); - // Due to the multiplicative factor we can send 5 packets during a send - // interval. (network capacity * multiplier / (8 bits per byte * - // (packet size * #send intervals per second) - const size_t packets_to_send_per_interval = - kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); - for (size_t i = 0; i < packets_to_send_per_interval; ++i) { - SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, - clock_.TimeInMilliseconds(), 250); - } - - pacer_->ProcessPackets(); + ConsumeInitialBudget(); pacer_->Pause(); + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); for (size_t i = 0; i < packets_to_send_per_interval; ++i) { Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++, capture_time_ms, 250); @@ -951,22 +1182,30 @@ TEST_F(PacingControllerTest, Pause) { EXPECT_EQ(TimeDelta::ms(second_capture_time_ms - capture_time_ms), pacer_->OldestPacketWaitTime()); - EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); + // Process triggers keep-alive packet. + EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { + return padding; + }); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); pacer_->ProcessPackets(); - int64_t expected_time_until_send = 500; + // Verify no packets sent for the rest of the paused process interval. + const TimeDelta kProcessInterval = TimeDelta::ms(5); + TimeDelta expected_time_until_send = PacingController::kPausedProcessInterval; EXPECT_CALL(callback_, SendPadding).Times(0); - while (expected_time_until_send >= 5) { + while (expected_time_until_send >= kProcessInterval) { pacer_->ProcessPackets(); - clock_.AdvanceTimeMilliseconds(5); - expected_time_until_send -= 5; + clock_.AdvanceTime(kProcessInterval); + expected_time_until_send -= kProcessInterval; } + // New keep-alive packet. ::testing::Mock::VerifyAndClearExpectations(&callback_); - EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); + EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) { + return padding; + }); EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); - clock_.AdvanceTimeMilliseconds(5); + clock_.AdvanceTime(kProcessInterval); pacer_->ProcessPackets(); ::testing::Mock::VerifyAndClearExpectations(&callback_); @@ -1002,19 +1241,66 @@ TEST_F(PacingControllerTest, Pause) { } pacer_->Resume(); - // The pacer was resumed directly after the previous process call finished. It - // will therefore wait 5 ms until next process. - clock_.AdvanceTime(TimeUntilNextProcess()); - - for (size_t i = 0; i < 4; i++) { - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + // The pacer was resumed directly after the previous process call finished. + // It will therefore wait 5 ms until next process. clock_.AdvanceTime(TimeUntilNextProcess()); + + for (size_t i = 0; i < 4; i++) { + pacer_->ProcessPackets(); + clock_.AdvanceTime(TimeUntilNextProcess()); + } + } else { + while (pacer_->QueueSizePackets() > 0) { + AdvanceTimeAndProcess(); + } } EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); } -TEST_F(PacingControllerTest, ExpectedQueueTimeMs) { +TEST_P(PacingControllerTest, InactiveFromStart) { + // Recreate the pacer without the inital time forwarding. + pacer_ = std::make_unique(&clock_, &callback_, nullptr, + nullptr, GetParam()); + pacer_->SetProbingEnabled(false); + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + + if (PeriodicProcess()) { + // In period mode, pause the pacer to check the same idle behavior as + // dynamic. + pacer_->Pause(); + } + + // No packets sent, there should be no keep-alives sent either. + EXPECT_CALL(callback_, SendPadding).Times(0); + EXPECT_CALL(callback_, SendPacket).Times(0); + pacer_->ProcessPackets(); + + const Timestamp start_time = clock_.CurrentTime(); + + // Determine the margin need so we can advance to the last possible moment + // that will not cause a process event. + const TimeDelta time_margin = + (GetParam() == PacingController::ProcessMode::kDynamic + ? PacingController::kMinSleepTime + : TimeDelta::Zero()) + + TimeDelta::us(1); + + EXPECT_EQ(pacer_->NextSendTime() - start_time, + PacingController::kPausedProcessInterval); + clock_.AdvanceTime(PacingController::kPausedProcessInterval - time_margin); + pacer_->ProcessPackets(); + EXPECT_EQ(pacer_->NextSendTime() - start_time, + PacingController::kPausedProcessInterval); + + clock_.AdvanceTime(time_margin); + pacer_->ProcessPackets(); + EXPECT_EQ(pacer_->NextSendTime() - start_time, + 2 * PacingController::kPausedProcessInterval); +} + +TEST_P(PacingControllerTest, ExpectedQueueTimeMs) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; const size_t kNumPackets = 60; @@ -1050,7 +1336,7 @@ TEST_F(PacingControllerTest, ExpectedQueueTimeMs) { TimeDelta::ms(1000 * kPacketSize * 8 / kMaxBitrate)); } -TEST_F(PacingControllerTest, QueueTimeGrowsOverTime) { +TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); @@ -1066,7 +1352,7 @@ TEST_F(PacingControllerTest, QueueTimeGrowsOverTime) { EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); } -TEST_F(PacingControllerTest, ProbingWithInsertedPackets) { +TEST_P(PacingControllerTest, ProbingWithInsertedPackets) { const size_t kPacketSize = 1200; const int kInitialBitrateBps = 300000; uint32_t ssrc = 12346; @@ -1074,7 +1360,7 @@ TEST_F(PacingControllerTest, ProbingWithInsertedPackets) { PacingControllerProbing packet_sender; pacer_ = std::make_unique(&clock_, &packet_sender, nullptr, - nullptr); + nullptr, GetParam()); pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0); pacer_->CreateProbeCluster(kSecondClusterRate, @@ -1113,7 +1399,7 @@ TEST_F(PacingControllerTest, ProbingWithInsertedPackets) { kSecondClusterRate.bps(), kProbingErrorMargin.bps()); } -TEST_F(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) { +TEST_P(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) { const size_t kPacketSize = 1200; const int kInitialBitrateBps = 300000; uint32_t ssrc = 12346; @@ -1121,7 +1407,7 @@ TEST_F(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) { PacingControllerProbing packet_sender; pacer_ = std::make_unique(&clock_, &packet_sender, nullptr, - nullptr); + nullptr, GetParam()); pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier), DataRate::Zero()); @@ -1159,22 +1445,22 @@ TEST_F(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) { EXPECT_EQ(packet_sender.packets_sent(), packets_sent_before_probe + 2); // We're exactly where we should be for the next probe. - EXPECT_TRUE(pacer_->NextProbeTime().IsFinite()); + const Timestamp probe_time = clock_.CurrentTime(); + EXPECT_EQ(pacer_->NextSendTime(), clock_.CurrentTime()); FieldTrialBasedConfig field_trial_config; BitrateProberConfig probing_config(&field_trial_config); EXPECT_GT(probing_config.max_probe_delay.Get(), TimeDelta::Zero()); - - // Advance to within max probe delay. + // Advance to within max probe delay, should still return same target. clock_.AdvanceTime(probing_config.max_probe_delay.Get()); - EXPECT_TRUE(pacer_->NextProbeTime().IsFinite()); + EXPECT_EQ(pacer_->NextSendTime(), probe_time); // Too high probe delay, drop it! clock_.AdvanceTime(TimeDelta::us(1)); - EXPECT_EQ(pacer_->NextProbeTime(), Timestamp::PlusInfinity()); + EXPECT_GT(pacer_->NextSendTime(), probe_time); } -TEST_F(PacingControllerTest, ProbingWithPaddingSupport) { +TEST_P(PacingControllerTest, ProbingWithPaddingSupport) { const size_t kPacketSize = 1200; const int kInitialBitrateBps = 300000; uint32_t ssrc = 12346; @@ -1182,7 +1468,7 @@ TEST_F(PacingControllerTest, ProbingWithPaddingSupport) { PacingControllerProbing packet_sender; pacer_ = std::make_unique(&clock_, &packet_sender, nullptr, - nullptr); + nullptr, GetParam()); pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0); pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier), @@ -1211,11 +1497,12 @@ TEST_F(PacingControllerTest, ProbingWithPaddingSupport) { kFirstClusterRate.bps(), kProbingErrorMargin.bps()); } -TEST_F(PacingControllerTest, PaddingOveruse) { +TEST_P(PacingControllerTest, PaddingOveruse) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; const size_t kPacketSize = 1200; + // Initially no padding rate. pacer_->ProcessPackets(); pacer_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier), DataRate::Zero()); @@ -1235,14 +1522,18 @@ TEST_F(PacingControllerTest, PaddingOveruse) { EXPECT_LT(TimeDelta::ms(5), pacer_->ExpectedQueueTime()); // Don't send padding if queue is non-empty, even if padding budget > 0. EXPECT_CALL(callback_, SendPadding).Times(0); - pacer_->ProcessPackets(); + if (PeriodicProcess()) { + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } } -TEST_F(PacingControllerTest, ProbeClusterId) { +TEST_P(PacingControllerTest, ProbeClusterId) { MockPacketSender callback; - pacer_ = - std::make_unique(&clock_, &callback, nullptr, nullptr); + pacer_ = std::make_unique(&clock_, &callback, nullptr, + nullptr, GetParam()); Init(); uint32_t ssrc = 12346; @@ -1262,8 +1553,7 @@ TEST_F(PacingControllerTest, ProbeClusterId) { .Times(5); for (int i = 0; i < 5; ++i) { - clock_.AdvanceTimeMilliseconds(20); - pacer_->ProcessPackets(); + AdvanceTimeAndProcess(); } // Second probing cluster. @@ -1272,8 +1562,7 @@ TEST_F(PacingControllerTest, ProbeClusterId) { .Times(5); for (int i = 0; i < 5; ++i) { - clock_.AdvanceTimeMilliseconds(20); - pacer_->ProcessPackets(); + AdvanceTimeAndProcess(); } // Needed for the Field comparer below. @@ -1286,17 +1575,22 @@ TEST_F(PacingControllerTest, ProbeClusterId) { clock_.TimeInMilliseconds(), padding_size.bytes())); return padding_packets; }); - EXPECT_CALL( - callback, - SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe))) - .Times(1); - pacer_->ProcessPackets(); + bool non_probe_packet_seen = false; + EXPECT_CALL(callback, SendRtpPacket) + .WillOnce([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + EXPECT_EQ(cluster_info.probe_cluster_id, kNotAProbe); + non_probe_packet_seen = true; + }); + while (!non_probe_packet_seen) { + AdvanceTimeAndProcess(); + } } -TEST_F(PacingControllerTest, OwnedPacketPrioritizedOnType) { +TEST_P(PacingControllerTest, OwnedPacketPrioritizedOnType) { MockPacketSender callback; - pacer_ = - std::make_unique(&clock_, &callback, nullptr, nullptr); + pacer_ = std::make_unique(&clock_, &callback, nullptr, + nullptr, GetParam()); Init(); // Insert a packet of each type, from low to high priority. Since priority @@ -1331,15 +1625,21 @@ TEST_F(PacingControllerTest, OwnedPacketPrioritizedOnType) { SendRtpPacket( Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); - clock_.AdvanceTimeMilliseconds(200); - pacer_->ProcessPackets(); + while (pacer_->QueueSizePackets() > 0) { + if (PeriodicProcess()) { + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } else { + AdvanceTimeAndProcess(); + } + } } -TEST_F(PacingControllerTest, SmallFirstProbePacket) { +TEST_P(PacingControllerTest, SmallFirstProbePacket) { ScopedFieldTrials trial("WebRTC-Pacer-SmallFirstProbePacket/Enabled/"); MockPacketSender callback; - pacer_ = - std::make_unique(&clock_, &callback, nullptr, nullptr); + pacer_ = std::make_unique(&clock_, &callback, nullptr, + nullptr, GetParam()); pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0); pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero()); @@ -1376,5 +1676,95 @@ TEST_F(PacingControllerTest, SmallFirstProbePacket) { clock_.AdvanceTimeMilliseconds(5); } } + +TEST_P(PacingControllerTest, TaskEarly) { + if (PeriodicProcess()) { + // This test applies only when NOT using interval budget. + return; + } + + // Set a low send rate to more easily test timing issues. + DataRate kSendRate = DataRate::kbps(30); + pacer_->SetPacingRates(kSendRate, DataRate::Zero()); + + // Add two packets. + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + + // Process packets, only first should be sent. + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); + + Timestamp next_send_time = pacer_->NextSendTime(); + + // Packets won't be sent if we try process more than one sleep time early. + ASSERT_GT(next_send_time - clock_.CurrentTime(), + PacingController::kMinSleepTime); + clock_.AdvanceTime(next_send_time - clock_.CurrentTime() - + (PacingController::kMinSleepTime + TimeDelta::ms(1))); + + EXPECT_CALL(callback_, SendPacket).Times(0); + pacer_->ProcessPackets(); + + // Assume timing is accurate within +-100us due to rounding. + const TimeDelta kErrorMargin = TimeDelta::us(100); + + // Check that next scheduled send time is still the same (within margin). + EXPECT_LT((pacer_->NextSendTime() - next_send_time).Abs(), kErrorMargin); + + // Advance to within error margin for execution. + clock_.AdvanceTime(TimeDelta::ms(1) + kErrorMargin); + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, TaskLate) { + if (PeriodicProcess()) { + // This test applies only when NOT using interval budget. + return; + } + + // Set a low send rate to more easily test timing issues. + DataRate kSendRate = DataRate::kbps(30); + pacer_->SetPacingRates(kSendRate, DataRate::Zero()); + + // Add four packets of equal size and priority. + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); + + // Process packets, only first should be sent. + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); + + Timestamp next_send_time = pacer_->NextSendTime(); + const TimeDelta time_between_packets = next_send_time - clock_.CurrentTime(); + + // Simulate a late process call, executed just before we allow sending the + // fourth packet. + clock_.AdvanceTime((time_between_packets * 3) - + (PacingController::kMinSleepTime + TimeDelta::ms(1))); + + EXPECT_CALL(callback_, SendPacket).Times(2); + pacer_->ProcessPackets(); + + // Check that next scheduled send time is within sleep-time + 1ms. + next_send_time = pacer_->NextSendTime(); + EXPECT_LE(next_send_time - clock_.CurrentTime(), + PacingController::kMinSleepTime + TimeDelta::ms(1)); + + // Advance to within error margin for execution. + clock_.AdvanceTime(TimeDelta::ms(1)); + EXPECT_CALL(callback_, SendPacket).Times(1); + pacer_->ProcessPackets(); +} + +INSTANTIATE_TEST_SUITE_P( + WithAndWithoutIntervalBudget, + PacingControllerTest, + ::testing::Values(PacingController::ProcessMode::kPeriodic, + PacingController::ProcessMode::kDynamic)); + } // namespace test } // namespace webrtc diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index 7b5eb9e304..02e9cd75a0 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -216,6 +216,17 @@ DataSize RoundRobinPacketQueue::Size() const { return size_; } +bool RoundRobinPacketQueue::NextPacketIsAudio() const { + if (stream_priorities_.empty()) { + return false; + } + uint32_t ssrc = stream_priorities_.begin()->second; + + auto stream_info_it = streams_.find(ssrc); + return stream_info_it->second.packet_queue.top().type() == + RtpPacketToSend::Type::kAudio; +} + Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { if (Empty()) return Timestamp::MinusInfinity(); diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index abb6e3a46d..dcd25ad2ee 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -115,6 +115,7 @@ class RoundRobinPacketQueue { bool Empty() const; size_t SizeInPackets() const; DataSize Size() const; + bool NextPacketIsAudio() const; Timestamp OldestEnqueueTime() const; TimeDelta AverageQueueTime() const;