diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index ca33b12b75..bb39f1f553 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -19,6 +19,8 @@ rtc_static_library("pacing") { "bitrate_prober.h", "paced_sender.cc", "paced_sender.h", + "pacing_controller.cc", + "pacing_controller.h", "packet_router.cc", "packet_router.h", "round_robin_packet_queue.cc", @@ -75,11 +77,13 @@ if (rtc_include_tests) { "bitrate_prober_unittest.cc", "interval_budget_unittest.cc", "paced_sender_unittest.cc", + "pacing_controller_unittest.cc", "packet_router_unittest.cc", ] deps = [ ":interval_budget", ":pacing", + "../../api/units:data_rate", "../../api/units:time_delta", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 25a15a129b..665b070339 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -16,8 +16,6 @@ #include "absl/memory/memory.h" #include "api/rtc_event_log/rtc_event_log.h" -#include "modules/pacing/bitrate_prober.h" -#include "modules/pacing/interval_budget.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" @@ -25,50 +23,6 @@ #include "system_wrappers/include/clock.h" namespace webrtc { -namespace { -// Time limit in milliseconds between packet bursts. -constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>(); -constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>(); -constexpr TimeDelta kPausedProcessInterval = kCongestedPacketInterval; -constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>(); - -// Upper cap on process interval, in case process has not been called in a long -// time. -constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>(); - -bool IsDisabled(const WebRtcKeyValueConfig& field_trials, - absl::string_view key) { - return field_trials.Lookup(key).find("Disabled") == 0; -} - -bool IsEnabled(const WebRtcKeyValueConfig& field_trials, - absl::string_view key) { - return field_trials.Lookup(key).find("Enabled") == 0; -} - -int GetPriorityForType(RtpPacketToSend::Type type) { - switch (type) { - case RtpPacketToSend::Type::kAudio: - // Audio is always prioritized over other packet types. - return 0; - case RtpPacketToSend::Type::kRetransmission: - // Send retransmissions before new media. - return 1; - case RtpPacketToSend::Type::kVideo: - // Video has "normal" priority, in the old speak. - return 2; - case RtpPacketToSend::Type::kForwardErrorCorrection: - // Send redundancy concurrently to video. If it is delayed it might have a - // lower chance of being useful. - return 2; - case RtpPacketToSend::Type::kPadding: - // Packets that are in themselves likely useless, only sent to keep the - // BWE high. - return 3; - } -} - -} // namespace const int64_t PacedSender::kMaxQueueLengthMs = 2000; const float PacedSender::kDefaultPaceMultiplier = 2.5f; @@ -76,60 +30,24 @@ PacedSender::PacedSender(Clock* clock, PacketRouter* packet_router, RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials) - : clock_(clock), + : pacing_controller_(clock, + static_cast(this), + event_log, + field_trials), packet_router_(packet_router), - fallback_field_trials_( - !field_trials ? absl::make_unique() : nullptr), - field_trials_(field_trials ? field_trials : fallback_field_trials_.get()), - drain_large_queues_( - !IsDisabled(*field_trials_, "WebRTC-Pacer-DrainQueue")), - send_padding_if_silent_( - IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")), - pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")), - min_packet_limit_(kDefaultMinPacketLimit), - last_timestamp_(clock_->CurrentTime()), - paused_(false), - media_budget_(0), - padding_budget_(0), - prober_(*field_trials_), - probing_send_failure_(false), - pacing_bitrate_(DataRate::Zero()), - time_last_process_(clock->CurrentTime()), - last_send_time_(time_last_process_), - packets_(time_last_process_, field_trials), - packet_counter_(0), - congestion_window_size_(DataSize::PlusInfinity()), - outstanding_data_(DataSize::Zero()), - process_thread_(nullptr), - queue_time_limit(TimeDelta::ms(kMaxQueueLengthMs)), - account_for_audio_(false), - legacy_packet_referencing_( - IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) { - if (!drain_large_queues_) { - RTC_LOG(LS_WARNING) << "Pacer queues will not be drained," - "pushback experiment must be enabled."; - } - FieldTrialParameter min_packet_limit_ms("", min_packet_limit_.ms()); - ParseFieldTrial({&min_packet_limit_ms}, - field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs")); - min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get()); - UpdateBudgetWithElapsedTime(min_packet_limit_); -} + process_thread_(nullptr) {} -PacedSender::~PacedSender() {} +PacedSender::~PacedSender() = default; void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { rtc::CritScope cs(&critsect_); - prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id); + return pacing_controller_.CreateProbeCluster(bitrate, cluster_id); } void PacedSender::Pause() { { rtc::CritScope cs(&critsect_); - if (!paused_) - RTC_LOG(LS_INFO) << "PacedSender paused."; - paused_ = true; - packets_.SetPauseState(true, CurrentTime()); + pacing_controller_.Pause(); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to get @@ -141,10 +59,7 @@ void PacedSender::Pause() { void PacedSender::Resume() { { rtc::CritScope cs(&critsect_); - if (paused_) - RTC_LOG(LS_INFO) << "PacedSender resumed."; - paused_ = false; - packets_.SetPauseState(false, CurrentTime()); + pacing_controller_.Resume(); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to @@ -155,49 +70,22 @@ void PacedSender::Resume() { void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { rtc::CritScope cs(&critsect_); - congestion_window_size_ = congestion_window_size; + pacing_controller_.SetCongestionWindow(congestion_window_size); } void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { rtc::CritScope cs(&critsect_); - outstanding_data_ = outstanding_data; -} - -bool PacedSender::Congested() const { - if (congestion_window_size_.IsFinite()) { - return outstanding_data_ >= congestion_window_size_; - } - return false; -} - -Timestamp PacedSender::CurrentTime() const { - Timestamp time = clock_->CurrentTime(); - if (time < last_timestamp_) { - RTC_LOG(LS_WARNING) - << "Non-monotonic clock behavior observed. Previous timestamp: " - << last_timestamp_.ms() << ", new timestamp: " << time.ms(); - RTC_DCHECK_GE(time, last_timestamp_); - time = last_timestamp_; - } - last_timestamp_ = time; - return time; + pacing_controller_.UpdateOutstandingData(outstanding_data); } void PacedSender::SetProbingEnabled(bool enabled) { rtc::CritScope cs(&critsect_); - RTC_CHECK_EQ(0, packet_counter_); - prober_.SetEnabled(enabled); + pacing_controller_.SetProbingEnabled(enabled); } void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { rtc::CritScope cs(&critsect_); - RTC_DCHECK_GT(pacing_rate, DataRate::Zero()); - pacing_bitrate_ = pacing_rate; - padding_budget_.set_target_rate_kbps(padding_rate.kbps()); - - RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" - << pacing_bitrate_.kbps() - << " padding_budget_kbps=" << padding_rate.kbps(); + pacing_controller_.SetPacingRates(pacing_rate, padding_rate); } void PacedSender::InsertPacket(RtpPacketSender::Priority priority, @@ -207,288 +95,69 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, size_t bytes, bool retransmission) { rtc::CritScope cs(&critsect_); - RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) - << "SetPacingRate must be called before InsertPacket."; - - Timestamp now = CurrentTime(); - prober_.OnIncomingPacket(bytes); - - if (capture_time_ms < 0) - capture_time_ms = now.ms(); - - RtpPacketToSend::Type type; - switch (priority) { - case RtpPacketSender::kHighPriority: - type = RtpPacketToSend::Type::kAudio; - break; - case RtpPacketSender::kNormalPriority: - type = RtpPacketToSend::Type::kRetransmission; - break; - default: - type = RtpPacketToSend::Type::kVideo; - } - packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number, - capture_time_ms, now, DataSize::bytes(bytes), retransmission, - packet_counter_++); + pacing_controller_.InsertPacket(priority, ssrc, sequence_number, + capture_time_ms, bytes, retransmission); } void PacedSender::EnqueuePacket(std::unique_ptr packet) { rtc::CritScope cs(&critsect_); - RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) - << "SetPacingRate must be called before InsertPacket."; - - Timestamp now = CurrentTime(); - prober_.OnIncomingPacket(packet->payload_size()); - - if (packet->capture_time_ms() < 0) { - packet->set_capture_time_ms(now.ms()); - } - - RTC_CHECK(packet->packet_type()); - int priority = GetPriorityForType(*packet->packet_type()); - packets_.Push(priority, now, packet_counter_++, std::move(packet)); + pacing_controller_.EnqueuePacket(std::move(packet)); } void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { rtc::CritScope cs(&critsect_); - account_for_audio_ = account_for_audio; + pacing_controller_.SetAccountForAudioPackets(account_for_audio); } TimeDelta PacedSender::ExpectedQueueTime() const { rtc::CritScope cs(&critsect_); - RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero()); - return TimeDelta::ms( - (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) / - pacing_bitrate_.bps()); + return pacing_controller_.ExpectedQueueTime(); } size_t PacedSender::QueueSizePackets() const { rtc::CritScope cs(&critsect_); - return packets_.SizeInPackets(); + return pacing_controller_.QueueSizePackets(); } DataSize PacedSender::QueueSizeData() const { rtc::CritScope cs(&critsect_); - return packets_.Size(); + return pacing_controller_.QueueSizeData(); } absl::optional PacedSender::FirstSentPacketTime() const { rtc::CritScope cs(&critsect_); - return first_sent_packet_time_; + return pacing_controller_.FirstSentPacketTime(); } TimeDelta PacedSender::OldestPacketWaitTime() const { rtc::CritScope cs(&critsect_); - Timestamp oldest_packet = packets_.OldestEnqueueTime(); - if (oldest_packet.IsInfinite()) { - return TimeDelta::Zero(); - } - - return CurrentTime() - oldest_packet; + return pacing_controller_.OldestPacketWaitTime(); } int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); - TimeDelta elapsed_time = CurrentTime() - time_last_process_; + // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. - if (paused_) { - return std::max(kPausedProcessInterval - elapsed_time, TimeDelta::Zero()) + TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess(); + if (pacing_controller_.IsPaused()) { + return std::max(PacingController::kPausedProcessInterval - elapsed_time, + TimeDelta::Zero()) .ms(); } - if (prober_.IsProbing()) { - int64_t ret = prober_.TimeUntilNextProbe(CurrentTime().ms()); - if (ret > 0 || (ret == 0 && !probing_send_failure_)) - return ret; + auto next_probe = pacing_controller_.TimeUntilNextProbe(); + if (next_probe) { + return next_probe->ms(); } - return std::max(min_packet_limit_ - elapsed_time, TimeDelta::Zero()).ms(); -} -TimeDelta PacedSender::UpdateTimeAndGetElapsed(Timestamp now) { - TimeDelta elapsed_time = now - time_last_process_; - time_last_process_ = now; - if (elapsed_time > kMaxElapsedTime) { - RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms() - << " ms) longer than expected, limiting to " - << kMaxElapsedTime.ms(); - elapsed_time = kMaxElapsedTime; - } - return elapsed_time; -} - -bool PacedSender::ShouldSendKeepalive(Timestamp now) const { - if (send_padding_if_silent_ || paused_ || Congested()) { - // We send a padding packet every 500 ms to ensure we won't get stuck in - // congested state due to no feedback being received. - TimeDelta elapsed_since_last_send = now - last_send_time_; - if (elapsed_since_last_send >= kCongestedPacketInterval) { - // We can not send padding unless a normal packet has first been sent. If - // we do, timestamps get messed up. - if (packet_counter_ > 0) { - return true; - } - } - } - return false; + const TimeDelta min_packet_limit = TimeDelta::ms(5); + return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms(); } void PacedSender::Process() { rtc::CritScope cs(&critsect_); - Timestamp now = CurrentTime(); - TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); - if (ShouldSendKeepalive(now)) { - if (legacy_packet_referencing_) { - critsect_.Leave(); - size_t bytes_sent = - packet_router_->TimeToSendPadding(1, PacedPacketInfo()); - critsect_.Enter(); - OnPaddingSent(DataSize::bytes(bytes_sent)); - } else { - DataSize keepalive_data_sent = DataSize::Zero(); - critsect_.Leave(); - std::vector> keepalive_packets = - packet_router_->GeneratePadding(1); - for (auto& packet : keepalive_packets) { - keepalive_data_sent += - DataSize::bytes(packet->payload_size() + packet->padding_size()); - packet_router_->SendPacket(std::move(packet), PacedPacketInfo()); - } - critsect_.Enter(); - OnPaddingSent(keepalive_data_sent); - } - } - - if (paused_) - return; - - if (elapsed_time > TimeDelta::Zero()) { - DataRate target_rate = pacing_bitrate_; - DataSize queue_size_data = packets_.Size(); - if (queue_size_data > DataSize::Zero()) { - // Assuming equal size packets and input/output rate, the average packet - // has avg_time_left_ms left to get queue_size_bytes out of the queue, if - // time constraint shall be met. Determine bitrate needed for that. - packets_.UpdateQueueTime(CurrentTime()); - if (drain_large_queues_) { - TimeDelta avg_time_left = std::max( - TimeDelta::ms(1), queue_time_limit - packets_.AverageQueueTime()); - DataRate min_rate_needed = queue_size_data / avg_time_left; - if (min_rate_needed > target_rate) { - target_rate = min_rate_needed; - RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" - << target_rate.kbps(); - } - } - } - - media_budget_.set_target_rate_kbps(target_rate.kbps()); - UpdateBudgetWithElapsedTime(elapsed_time); - } - - bool is_probing = prober_.IsProbing(); - PacedPacketInfo pacing_info; - absl::optional recommended_probe_size; - if (is_probing) { - pacing_info = prober_.CurrentCluster(); - recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize()); - } - - DataSize data_sent = DataSize::Zero(); - // The paused state is checked in the loop since it leaves the critical - // section allowing the paused state to be changed from other code. - while (!paused_) { - auto* packet = GetPendingPacket(pacing_info); - if (packet == nullptr) { - // No packet available to send, check if we should send padding. - if (!legacy_packet_referencing_) { - DataSize padding_to_add = - PaddingToAdd(recommended_probe_size, data_sent); - if (padding_to_add > DataSize::Zero()) { - critsect_.Leave(); - std::vector> padding_packets = - packet_router_->GeneratePadding(padding_to_add.bytes()); - critsect_.Enter(); - if (padding_packets.empty()) { - // No padding packets were generated, quite send loop. - break; - } - for (auto& packet : padding_packets) { - EnqueuePacket(std::move(packet)); - } - // Continue loop to send the padding that was just added. - continue; - } - } - - // Can't fetch new packet and no padding to send, exit send loop. - break; - } - - std::unique_ptr rtp_packet = packet->ReleasePacket(); - const bool owned_rtp_packet = rtp_packet != nullptr; - RtpPacketSendResult success; - - if (rtp_packet != nullptr) { - critsect_.Leave(); - packet_router_->SendPacket(std::move(rtp_packet), pacing_info); - critsect_.Enter(); - success = RtpPacketSendResult::kSuccess; - } else { - critsect_.Leave(); - success = packet_router_->TimeToSendPacket( - packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(), - packet->is_retransmission(), pacing_info); - critsect_.Enter(); - } - - if (success == RtpPacketSendResult::kSuccess || - success == RtpPacketSendResult::kPacketNotFound) { - // Packet sent or invalid packet, remove it from queue. - // TODO(webrtc:8052): Don't consume media budget on kInvalid. - data_sent += packet->size(); - // Send succeeded, remove it from the queue. - OnPacketSent(packet); - if (recommended_probe_size && data_sent > *recommended_probe_size) - break; - } else if (owned_rtp_packet) { - // Send failed, but we can't put it back in the queue, remove it without - // consuming budget. - packets_.FinalizePop(); - break; - } else { - // Send failed, put it back into the queue. - packets_.CancelPop(); - break; - } - } - - if (legacy_packet_referencing_ && packets_.Empty() && !Congested()) { - // We can not send padding unless a normal packet has first been sent. If we - // do, timestamps get messed up. - if (packet_counter_ > 0) { - DataSize padding_needed = - (recommended_probe_size && *recommended_probe_size > data_sent) - ? (*recommended_probe_size - data_sent) - : DataSize::bytes(padding_budget_.bytes_remaining()); - if (padding_needed > DataSize::Zero()) { - DataSize padding_sent = DataSize::Zero(); - critsect_.Leave(); - padding_sent = DataSize::bytes(packet_router_->TimeToSendPadding( - padding_needed.bytes(), pacing_info)); - critsect_.Enter(); - data_sent += padding_sent; - OnPaddingSent(padding_sent); - } - } - } - - if (is_probing) { - probing_send_failure_ = data_sent == DataSize::Zero(); - if (!probing_send_failure_) { - prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes()); - } - } + pacing_controller_.ProcessPackets(); } void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { @@ -497,93 +166,49 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -DataSize PacedSender::PaddingToAdd( - absl::optional recommended_probe_size, - DataSize data_sent) { - if (!packets_.Empty()) { - // Actual payload available, no need to add padding. - return DataSize::Zero(); - } - - if (Congested()) { - // Don't add padding if congested, even if requested for probing. - return DataSize::Zero(); - } - - if (packet_counter_ == 0) { - // We can not send padding unless a normal packet has first been sent. If we - // do, timestamps get messed up. - return DataSize::Zero(); - } - - if (recommended_probe_size) { - if (*recommended_probe_size > data_sent) { - return *recommended_probe_size - data_sent; - } - return DataSize::Zero(); - } - - return DataSize::bytes(padding_budget_.bytes_remaining()); -} - -RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket( - const PacedPacketInfo& pacing_info) { - if (packets_.Empty()) { - return nullptr; - } - - // Since we need to release the lock in order to send, we first pop the - // element from the priority queue but keep it in storage, so that we can - // reinsert it if send fails. - RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop(); - bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; - bool apply_pacing = !audio_packet || pace_audio_; - if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && - pacing_info.probe_cluster_id == - PacedPacketInfo::kNotAProbe))) { - packets_.CancelPop(); - return nullptr; - } - return packet; -} - -void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) { - Timestamp now = CurrentTime(); - if (!first_sent_packet_time_) { - first_sent_packet_time_ = now; - } - bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; - if (!audio_packet || account_for_audio_) { - // Update media bytes sent. - UpdateBudgetWithSentData(packet->size()); - last_send_time_ = now; - } - // Send succeeded, remove it from the queue. - packets_.FinalizePop(); -} - -void PacedSender::OnPaddingSent(DataSize data_sent) { - if (data_sent > DataSize::Zero()) { - UpdateBudgetWithSentData(data_sent); - } - last_send_time_ = CurrentTime(); -} - -void PacedSender::UpdateBudgetWithElapsedTime(TimeDelta delta) { - delta = std::min(kMaxProcessingInterval, delta); - media_budget_.IncreaseBudget(delta.ms()); - padding_budget_.IncreaseBudget(delta.ms()); -} - -void PacedSender::UpdateBudgetWithSentData(DataSize size) { - outstanding_data_ += size; - media_budget_.UseBudget(size.bytes()); - padding_budget_.UseBudget(size.bytes()); -} - void PacedSender::SetQueueTimeLimit(TimeDelta limit) { rtc::CritScope cs(&critsect_); - queue_time_limit = limit; + pacing_controller_.SetQueueTimeLimit(limit); +} + +void PacedSender::SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + critsect_.Leave(); + packet_router_->SendPacket(std::move(packet), cluster_info); + critsect_.Enter(); +} + +std::vector> PacedSender::GeneratePadding( + DataSize size) { + std::vector> padding_packets; + critsect_.Leave(); + padding_packets = packet_router_->GeneratePadding(size.bytes()); + critsect_.Enter(); + return padding_packets; +} + +RtpPacketSendResult PacedSender::TimeToSendPacket( + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& packet_info) { + RtpPacketSendResult result; + critsect_.Leave(); + result = packet_router_->TimeToSendPacket( + ssrc, sequence_number, capture_timestamp, retransmission, packet_info); + critsect_.Enter(); + return result; +} + +DataSize PacedSender::TimeToSendPadding(DataSize size, + const PacedPacketInfo& pacing_info) { + size_t padding_bytes_sent; + critsect_.Leave(); + padding_bytes_sent = + packet_router_->TimeToSendPadding(size.bytes(), pacing_info); + critsect_.Enter(); + return DataSize::bytes(padding_bytes_sent); } } // namespace webrtc diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 07c249f2c8..71e826d394 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -16,6 +16,7 @@ #include #include +#include #include "absl/types/optional.h" #include "api/function_view.h" @@ -25,14 +26,13 @@ #include "modules/include/module.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" +#include "modules/pacing/pacing_controller.h" #include "modules/pacing/packet_router.h" -#include "modules/pacing/round_robin_packet_queue.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/include/rtp_packet_sender.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/critical_section.h" -#include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -41,7 +41,8 @@ class RtcEventLog; class PacedSender : public Module, public RtpPacketPacer, - public RtpPacketSender { + public RtpPacketSender, + private PacingController::PacketSender { public: // Expected max pacer delay in ms. If ExpectedQueueTime() is higher than // this value, the packet producers should wait (eg drop frames rather than @@ -116,6 +117,7 @@ class PacedSender : public Module, // Below are methods specific to this implementation, such as things related // to module processing thread specifics or methods exposed for test. + // TODO(bugs.webrtc.org/10809): Remove when cleanup up unit tests. // Enable bitrate probing. Enabled by default, mostly here to simplify // testing. Must be called before any packets are being sent to have an // effect. @@ -134,69 +136,30 @@ class PacedSender : public Module, void ProcessThreadAttached(ProcessThread* process_thread) override; private: - TimeDelta UpdateTimeAndGetElapsed(Timestamp now) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool ShouldSendKeepalive(Timestamp now) const + // Methods implementing PacedSenderController:PacketSender. + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - // Updates the number of bytes that can be sent for the next time interval. - void UpdateBudgetWithElapsedTime(TimeDelta delta) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void UpdateBudgetWithSentData(DataSize size) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + std::vector> GeneratePadding( + DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - DataSize PaddingToAdd(absl::optional recommended_probe_size, - DataSize data_sent) + // TODO(bugs.webrtc.org/10633): Remove these when old code path is gone. + RtpPacketSendResult TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& packet_info) + override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + DataSize TimeToSendPadding(DataSize size, + const PacedPacketInfo& pacing_info) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - RoundRobinPacketQueue::QueuedPacket* GetPendingPacket( - const PacedPacketInfo& pacing_info) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void OnPaddingSent(DataSize padding_sent) - RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - - bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - Timestamp CurrentTime() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - - Clock* const clock_; - PacketRouter* const packet_router_; - const std::unique_ptr fallback_field_trials_; - const WebRtcKeyValueConfig* field_trials_; - - const bool drain_large_queues_; - const bool send_padding_if_silent_; - const bool pace_audio_; - TimeDelta min_packet_limit_; - rtc::CriticalSection critsect_; - // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. - // The last millisecond timestamp returned by |clock_|. - mutable Timestamp last_timestamp_ RTC_GUARDED_BY(critsect_); - bool paused_ RTC_GUARDED_BY(critsect_); - // This is the media budget, keeping track of how many bits of media - // we can pace out during the current interval. - IntervalBudget media_budget_ RTC_GUARDED_BY(critsect_); - // This is the padding budget, keeping track of how many bits of padding we're - // allowed to send out during the current interval. This budget will be - // utilized when there's no media to send. - IntervalBudget padding_budget_ RTC_GUARDED_BY(critsect_); + PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); - BitrateProber prober_ RTC_GUARDED_BY(critsect_); - bool probing_send_failure_ RTC_GUARDED_BY(critsect_); - - DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_); - - Timestamp time_last_process_ RTC_GUARDED_BY(critsect_); - Timestamp last_send_time_ RTC_GUARDED_BY(critsect_); - absl::optional first_sent_packet_time_ RTC_GUARDED_BY(critsect_); - - RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_); - uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); - - DataSize congestion_window_size_ RTC_GUARDED_BY(critsect_); - DataSize outstanding_data_ RTC_GUARDED_BY(critsect_); + PacketRouter* const packet_router_; // Lock to avoid race when attaching process thread. This can happen due to // the Call class setting network state on RtpTransportControllerSend, which @@ -205,14 +168,6 @@ class PacedSender : public Module, // queue separate from the thread used by Call, this causes a race. rtc::CriticalSection process_thread_lock_; ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_); - - TimeDelta queue_time_limit RTC_GUARDED_BY(critsect_); - bool account_for_audio_ RTC_GUARDED_BY(critsect_); - - // If true, PacedSender should only reference packets as in legacy mode. - // If false, PacedSender may have direct ownership of RtpPacketToSend objects. - // Defaults to true, will be changed to default false soon. - const bool legacy_packet_referencing_; }; } // namespace webrtc #endif // MODULES_PACING_PACED_SENDER_H_ diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc new file mode 100644 index 0000000000..233a3facf0 --- /dev/null +++ b/modules/pacing/pacing_controller.cc @@ -0,0 +1,552 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/pacing_controller.h" + +#include +#include +#include + +#include "absl/memory/memory.h" +#include "modules/pacing/bitrate_prober.h" +#include "modules/pacing/interval_budget.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/time_utils.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { +namespace { +// Time limit in milliseconds between packet bursts. +constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>(); +constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>(); +constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>(); + +// Upper cap on process interval, in case process has not been called in a long +// time. +constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>(); + +bool IsDisabled(const WebRtcKeyValueConfig& field_trials, + absl::string_view key) { + return field_trials.Lookup(key).find("Disabled") == 0; +} + +bool IsEnabled(const WebRtcKeyValueConfig& field_trials, + absl::string_view key) { + return field_trials.Lookup(key).find("Enabled") == 0; +} + +int GetPriorityForType(RtpPacketToSend::Type type) { + switch (type) { + case RtpPacketToSend::Type::kAudio: + // Audio is always prioritized over other packet types. + return 0; + case RtpPacketToSend::Type::kRetransmission: + // Send retransmissions before new media. + return 1; + case RtpPacketToSend::Type::kVideo: + // Video has "normal" priority, in the old speak. + return 2; + case RtpPacketToSend::Type::kForwardErrorCorrection: + // Send redundancy concurrently to video. If it is delayed it might have a + // lower chance of being useful. + return 2; + case RtpPacketToSend::Type::kPadding: + // Packets that are in themselves likely useless, only sent to keep the + // BWE high. + return 3; + } +} + +} // namespace + +const TimeDelta PacingController::kMaxExpectedQueueLength = + TimeDelta::Millis<2000>(); +const float PacingController::kDefaultPaceMultiplier = 2.5f; +const TimeDelta PacingController::kPausedProcessInterval = + kCongestedPacketInterval; + +PacingController::PacingController(Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials) + : clock_(clock), + packet_sender_(packet_sender), + fallback_field_trials_( + !field_trials ? absl::make_unique() : nullptr), + field_trials_(field_trials ? field_trials : fallback_field_trials_.get()), + drain_large_queues_( + !IsDisabled(*field_trials_, "WebRTC-Pacer-DrainQueue")), + send_padding_if_silent_( + IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")), + pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")), + min_packet_limit_(kDefaultMinPacketLimit), + last_timestamp_(clock_->CurrentTime()), + paused_(false), + media_budget_(0), + padding_budget_(0), + prober_(*field_trials_), + probing_send_failure_(false), + padding_failure_state_(false), + pacing_bitrate_(DataRate::Zero()), + time_last_process_(clock->CurrentTime()), + last_send_time_(time_last_process_), + packet_queue_(time_last_process_, field_trials), + packet_counter_(0), + congestion_window_size_(DataSize::PlusInfinity()), + outstanding_data_(DataSize::Zero()), + queue_time_limit(kMaxExpectedQueueLength), + account_for_audio_(false), + legacy_packet_referencing_( + IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) { + if (!drain_large_queues_) { + RTC_LOG(LS_WARNING) << "Pacer queues will not be drained," + "pushback experiment must be enabled."; + } + FieldTrialParameter min_packet_limit_ms("", min_packet_limit_.ms()); + ParseFieldTrial({&min_packet_limit_ms}, + field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs")); + min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get()); + UpdateBudgetWithElapsedTime(min_packet_limit_); +} + +PacingController::~PacingController() = default; + +void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) { + prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id); +} + +void PacingController::Pause() { + if (!paused_) + RTC_LOG(LS_INFO) << "PacedSender paused."; + paused_ = true; + packet_queue_.SetPauseState(true, CurrentTime()); +} + +void PacingController::Resume() { + if (paused_) + RTC_LOG(LS_INFO) << "PacedSender resumed."; + paused_ = false; + packet_queue_.SetPauseState(false, CurrentTime()); +} + +bool PacingController::IsPaused() const { + return paused_; +} + +void PacingController::SetCongestionWindow(DataSize congestion_window_size) { + congestion_window_size_ = congestion_window_size; +} + +void PacingController::UpdateOutstandingData(DataSize outstanding_data) { + outstanding_data_ = outstanding_data; +} + +bool PacingController::Congested() const { + if (congestion_window_size_.IsFinite()) { + return outstanding_data_ >= congestion_window_size_; + } + return false; +} + +Timestamp PacingController::CurrentTime() const { + Timestamp time = clock_->CurrentTime(); + if (time < last_timestamp_) { + RTC_LOG(LS_WARNING) + << "Non-monotonic clock behavior observed. Previous timestamp: " + << last_timestamp_.ms() << ", new timestamp: " << time.ms(); + RTC_DCHECK_GE(time, last_timestamp_); + time = last_timestamp_; + } + last_timestamp_ = time; + return time; +} + +void PacingController::SetProbingEnabled(bool enabled) { + RTC_CHECK_EQ(0, packet_counter_); + prober_.SetEnabled(enabled); +} + +void PacingController::SetPacingRates(DataRate pacing_rate, + DataRate padding_rate) { + RTC_DCHECK_GT(pacing_rate, DataRate::Zero()); + pacing_bitrate_ = pacing_rate; + padding_budget_.set_target_rate_kbps(padding_rate.kbps()); + + RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" + << pacing_bitrate_.kbps() + << " padding_budget_kbps=" << padding_rate.kbps(); +} + +void PacingController::InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) { + RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) + << "SetPacingRate must be called before InsertPacket."; + + Timestamp now = CurrentTime(); + prober_.OnIncomingPacket(bytes); + + if (capture_time_ms < 0) + capture_time_ms = now.ms(); + + RtpPacketToSend::Type type; + switch (priority) { + case RtpPacketSender::kHighPriority: + type = RtpPacketToSend::Type::kAudio; + break; + case RtpPacketSender::kNormalPriority: + type = RtpPacketToSend::Type::kRetransmission; + break; + default: + type = RtpPacketToSend::Type::kVideo; + } + packet_queue_.Push(GetPriorityForType(type), type, ssrc, sequence_number, + capture_time_ms, now, DataSize::bytes(bytes), + retransmission, packet_counter_++); +} + +void PacingController::EnqueuePacket(std::unique_ptr packet) { + RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) + << "SetPacingRate must be called before InsertPacket."; + + Timestamp now = CurrentTime(); + prober_.OnIncomingPacket(packet->payload_size()); + + if (packet->capture_time_ms() < 0) { + packet->set_capture_time_ms(now.ms()); + } + + RTC_CHECK(packet->packet_type()); + int priority = GetPriorityForType(*packet->packet_type()); + packet_queue_.Push(priority, now, packet_counter_++, std::move(packet)); +} + +void PacingController::SetAccountForAudioPackets(bool account_for_audio) { + account_for_audio_ = account_for_audio; +} + +TimeDelta PacingController::ExpectedQueueTime() const { + RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero()); + return TimeDelta::ms( + (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) / + pacing_bitrate_.bps()); +} + +size_t PacingController::QueueSizePackets() const { + return packet_queue_.SizeInPackets(); +} + +DataSize PacingController::QueueSizeData() const { + return packet_queue_.Size(); +} + +absl::optional PacingController::FirstSentPacketTime() const { + return first_sent_packet_time_; +} + +TimeDelta PacingController::OldestPacketWaitTime() const { + Timestamp oldest_packet = packet_queue_.OldestEnqueueTime(); + if (oldest_packet.IsInfinite()) { + return TimeDelta::Zero(); + } + + return CurrentTime() - oldest_packet; +} + +TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { + TimeDelta elapsed_time = now - time_last_process_; + time_last_process_ = now; + if (elapsed_time > kMaxElapsedTime) { + RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms() + << " ms) longer than expected, limiting to " + << kMaxElapsedTime.ms(); + elapsed_time = kMaxElapsedTime; + } + return elapsed_time; +} + +bool PacingController::ShouldSendKeepalive(Timestamp now) const { + if (send_padding_if_silent_ || paused_ || Congested()) { + // We send a padding packet every 500 ms to ensure we won't get stuck in + // congested state due to no feedback being received. + TimeDelta elapsed_since_last_send = now - last_send_time_; + if (elapsed_since_last_send >= kCongestedPacketInterval) { + // We can not send padding unless a normal packet has first been sent. If + // we do, timestamps get messed up. + if (packet_counter_ > 0) { + return true; + } + } + } + return false; +} + +absl::optional PacingController::TimeUntilNextProbe() { + if (!prober_.IsProbing()) { + return absl::nullopt; + } + + TimeDelta time_delta = + TimeDelta::ms(prober_.TimeUntilNextProbe(CurrentTime().ms())); + if (time_delta > TimeDelta::Zero() || + (time_delta == TimeDelta::Zero() && !probing_send_failure_)) { + return time_delta; + } + + return absl::nullopt; +} + +TimeDelta PacingController::TimeElapsedSinceLastProcess() const { + return CurrentTime() - time_last_process_; +} + +void PacingController::ProcessPackets() { + Timestamp now = CurrentTime(); + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); + if (ShouldSendKeepalive(now)) { + if (legacy_packet_referencing_) { + OnPaddingSent(packet_sender_->TimeToSendPadding(DataSize::bytes(1), + PacedPacketInfo())); + } else { + DataSize keepalive_data_sent = DataSize::Zero(); + std::vector> keepalive_packets = + packet_sender_->GeneratePadding(DataSize::bytes(1)); + for (auto& packet : keepalive_packets) { + keepalive_data_sent += + DataSize::bytes(packet->payload_size() + packet->padding_size()); + packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo()); + } + OnPaddingSent(keepalive_data_sent); + } + } + + if (paused_) + return; + + if (elapsed_time > TimeDelta::Zero()) { + DataRate target_rate = pacing_bitrate_; + DataSize queue_size_data = packet_queue_.Size(); + if (queue_size_data > DataSize::Zero()) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + packet_queue_.UpdateQueueTime(CurrentTime()); + if (drain_large_queues_) { + TimeDelta avg_time_left = + std::max(TimeDelta::ms(1), + queue_time_limit - packet_queue_.AverageQueueTime()); + DataRate min_rate_needed = queue_size_data / avg_time_left; + if (min_rate_needed > target_rate) { + target_rate = min_rate_needed; + RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" + << target_rate.kbps(); + } + } + } + + media_budget_.set_target_rate_kbps(target_rate.kbps()); + UpdateBudgetWithElapsedTime(elapsed_time); + } + + bool is_probing = prober_.IsProbing(); + PacedPacketInfo pacing_info; + absl::optional recommended_probe_size; + if (is_probing) { + pacing_info = prober_.CurrentCluster(); + recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize()); + } + + DataSize data_sent = DataSize::Zero(); + // The paused state is checked in the loop since it leaves the critical + // section allowing the paused state to be changed from other code. + while (!paused_) { + auto* packet = GetPendingPacket(pacing_info); + if (packet == nullptr) { + // No packet available to send, check if we should send padding. + if (!legacy_packet_referencing_) { + DataSize padding_to_add = + PaddingToAdd(recommended_probe_size, data_sent); + if (padding_to_add > DataSize::Zero()) { + std::vector> padding_packets = + packet_sender_->GeneratePadding(padding_to_add); + if (padding_packets.empty()) { + // No padding packets were generated, quite send loop. + break; + } + for (auto& packet : padding_packets) { + EnqueuePacket(std::move(packet)); + } + // Continue loop to send the padding that was just added. + continue; + } + } + + // Can't fetch new packet and no padding to send, exit send loop. + break; + } + + std::unique_ptr rtp_packet = packet->ReleasePacket(); + const bool owned_rtp_packet = rtp_packet != nullptr; + RtpPacketSendResult success; + + if (rtp_packet != nullptr) { + packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info); + success = RtpPacketSendResult::kSuccess; + } else { + success = packet_sender_->TimeToSendPacket( + packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(), + packet->is_retransmission(), pacing_info); + } + + if (success == RtpPacketSendResult::kSuccess || + success == RtpPacketSendResult::kPacketNotFound) { + // Packet sent or invalid packet, remove it from queue. + // TODO(webrtc:8052): Don't consume media budget on kInvalid. + data_sent += packet->size(); + // Send succeeded, remove it from the queue. + OnPacketSent(packet); + if (recommended_probe_size && data_sent > *recommended_probe_size) + break; + } else if (owned_rtp_packet) { + // Send failed, but we can't put it back in the queue, remove it without + // consuming budget. + packet_queue_.FinalizePop(); + break; + } else { + // Send failed, put it back into the queue. + packet_queue_.CancelPop(); + break; + } + } + + if (legacy_packet_referencing_ && packet_queue_.Empty() && !Congested()) { + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. + if (packet_counter_ > 0) { + DataSize padding_needed = + (recommended_probe_size && *recommended_probe_size > data_sent) + ? (*recommended_probe_size - data_sent) + : DataSize::bytes(padding_budget_.bytes_remaining()); + if (padding_needed > DataSize::Zero()) { + DataSize padding_sent = DataSize::Zero(); + padding_sent = + packet_sender_->TimeToSendPadding(padding_needed, pacing_info); + data_sent += padding_sent; + OnPaddingSent(padding_sent); + } + } + } + + if (is_probing) { + probing_send_failure_ = data_sent == DataSize::Zero(); + if (!probing_send_failure_) { + prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes()); + } + } +} + +DataSize PacingController::PaddingToAdd( + absl::optional recommended_probe_size, + DataSize data_sent) { + if (!packet_queue_.Empty()) { + // Actual payload available, no need to add padding. + return DataSize::Zero(); + } + + if (Congested()) { + // Don't add padding if congested, even if requested for probing. + return DataSize::Zero(); + } + + if (packet_counter_ == 0) { + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. + return DataSize::Zero(); + } + + if (recommended_probe_size) { + if (*recommended_probe_size > data_sent) { + return *recommended_probe_size - data_sent; + } + return DataSize::Zero(); + } + + return DataSize::bytes(padding_budget_.bytes_remaining()); +} + +RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket( + const PacedPacketInfo& pacing_info) { + if (packet_queue_.Empty()) { + return nullptr; + } + + // Since we need to release the lock in order to send, we first pop the + // element from the priority queue but keep it in storage, so that we can + // reinsert it if send fails. + RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop(); + bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; + bool apply_pacing = !audio_packet || pace_audio_; + if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && + pacing_info.probe_cluster_id == + PacedPacketInfo::kNotAProbe))) { + packet_queue_.CancelPop(); + return nullptr; + } + return packet; +} + +void PacingController::OnPacketSent( + RoundRobinPacketQueue::QueuedPacket* packet) { + Timestamp now = CurrentTime(); + if (!first_sent_packet_time_) { + first_sent_packet_time_ = now; + } + bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; + if (!audio_packet || account_for_audio_) { + // Update media bytes sent. + UpdateBudgetWithSentData(packet->size()); + last_send_time_ = now; + } + // Send succeeded, remove it from the queue. + packet_queue_.FinalizePop(); + padding_failure_state_ = false; +} + +void PacingController::OnPaddingSent(DataSize data_sent) { + if (data_sent > DataSize::Zero()) { + UpdateBudgetWithSentData(data_sent); + } else { + padding_failure_state_ = true; + } + last_send_time_ = CurrentTime(); +} + +void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { + delta = std::min(kMaxProcessingInterval, delta); + media_budget_.IncreaseBudget(delta.ms()); + padding_budget_.IncreaseBudget(delta.ms()); +} + +void PacingController::UpdateBudgetWithSentData(DataSize size) { + outstanding_data_ += size; + media_budget_.UseBudget(size.bytes()); + padding_budget_.UseBudget(size.bytes()); +} + +void PacingController::SetQueueTimeLimit(TimeDelta limit) { + queue_time_limit = limit; +} + +} // namespace webrtc diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h new file mode 100644 index 0000000000..0948616919 --- /dev/null +++ b/modules/pacing/pacing_controller.h @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACING_CONTROLLER_H_ +#define MODULES_PACING_PACING_CONTROLLER_H_ + +#include +#include + +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/function_view.h" +#include "api/rtc_event_log/rtc_event_log.h" +#include "api/transport/field_trial_based_config.h" +#include "api/transport/network_types.h" +#include "api/transport/webrtc_key_value_config.h" +#include "modules/pacing/bitrate_prober.h" +#include "modules/pacing/interval_budget.h" +#include "modules/pacing/round_robin_packet_queue.h" +#include "modules/pacing/rtp_packet_pacer.h" +#include "modules/rtp_rtcp/include/rtp_packet_sender.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/experiments/field_trial_parser.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +// This class implements a leaky-buck packet pacing algorithm. It handles the +// logic of determining which packets to send when, but the actual timing of +// the processing is done externally (e.g. PacedSender). Furthermore, the +// forwarding of packets when they are ready to be sent is also handled +// externally, via the PacedSendingController::PacketSender interface. +// +class PacingController { + public: + class PacketSender { + public: + virtual ~PacketSender() = default; + virtual void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) = 0; + virtual std::vector> GeneratePadding( + DataSize size) = 0; + + // TODO(bugs.webrtc.org/10633): Remove these when old code path is gone. + virtual RtpPacketSendResult TimeToSendPacket( + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& packet_info) = 0; + virtual DataSize TimeToSendPadding(DataSize size, + const PacedPacketInfo& pacing_info) = 0; + }; + + // Expected max pacer delay. If ExpectedQueueTime() is higher than + // this value, the packet producers should wait (eg drop frames rather than + // encoding them). Bitrate sent may temporarily exceed target set by + // UpdateBitrate() so that this limit will be upheld. + static const TimeDelta kMaxExpectedQueueLength; + // Pacing-rate relative to our target send rate. + // Multiplicative factor that is applied to the target bitrate to calculate + // the number of bytes that can be transmitted per interval. + // Increasing this factor will result in lower delays in cases of bitrate + // overshoots from the encoder. + static const float kDefaultPaceMultiplier; + // If no media or paused, wake up at least every |kPausedProcessIntervalMs| in + // order to send a keep-alive packet so we don't get stuck in a bad state due + // to lack of feedback. + static const TimeDelta kPausedProcessInterval; + + PacingController(Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials); + + ~PacingController(); + + // Adds the packet information to the queue and calls TimeToSendPacket + // when it's time to send. + void InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission); + // Adds the packet to the queue and calls PacketRouter::SendPacket() when + // it's time to send. + void EnqueuePacket(std::unique_ptr packet); + + void CreateProbeCluster(DataRate bitrate, int cluster_id); + + void Pause(); // Temporarily pause all sending. + void Resume(); // Resume sending packets. + bool IsPaused() const; + + void SetCongestionWindow(DataSize congestion_window_size); + void UpdateOutstandingData(DataSize outstanding_data); + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio); + + // Returns the time since the oldest queued packet was enqueued. + TimeDelta OldestPacketWaitTime() const; + + size_t QueueSizePackets() const; + DataSize QueueSizeData() const; + + // Returns the time when the first packet was sent; + absl::optional FirstSentPacketTime() const; + + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + TimeDelta ExpectedQueueTime() const; + + void SetQueueTimeLimit(TimeDelta limit); + + // Enable bitrate probing. Enabled by default, mostly here to simplify + // testing. Must be called before any packets are being sent to have an + // effect. + void SetProbingEnabled(bool enabled); + + // Time until next probe should be sent. If this value is set, it should be + // respected - i.e. don't call ProcessPackets() before this specified time as + // that can have unintended side effects. + absl::optional TimeUntilNextProbe(); + + // Time since ProcessPackets() was last executed. + TimeDelta TimeElapsedSinceLastProcess() const; + + TimeDelta TimeUntilAvailableBudget() const; + + // Check queue of pending packets and send them or padding packets, if budget + // is available. + void ProcessPackets(); + + bool Congested() const; + + private: + TimeDelta UpdateTimeAndGetElapsed(Timestamp now); + bool ShouldSendKeepalive(Timestamp now) const; + + // Updates the number of bytes that can be sent for the next time interval. + void UpdateBudgetWithElapsedTime(TimeDelta delta); + void UpdateBudgetWithSentData(DataSize size); + + DataSize PaddingToAdd(absl::optional recommended_probe_size, + DataSize data_sent); + + RoundRobinPacketQueue::QueuedPacket* GetPendingPacket( + const PacedPacketInfo& pacing_info); + void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet); + void OnPaddingSent(DataSize padding_sent); + + Timestamp CurrentTime() const; + + Clock* const clock_; + PacketSender* const packet_sender_; + const std::unique_ptr fallback_field_trials_; + const WebRtcKeyValueConfig* field_trials_; + + const bool drain_large_queues_; + const bool send_padding_if_silent_; + const bool pace_audio_; + TimeDelta min_packet_limit_; + + // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. + // The last millisecond timestamp returned by |clock_|. + mutable Timestamp last_timestamp_; + bool paused_; + // This is the media budget, keeping track of how many bits of media + // we can pace out during the current interval. + IntervalBudget media_budget_; + // This is the padding budget, keeping track of how many bits of padding we're + // allowed to send out during the current interval. This budget will be + // utilized when there's no media to send. + IntervalBudget padding_budget_; + + BitrateProber prober_; + bool probing_send_failure_; + bool padding_failure_state_; + + DataRate pacing_bitrate_; + + Timestamp time_last_process_; + Timestamp last_send_time_; + absl::optional first_sent_packet_time_; + + RoundRobinPacketQueue packet_queue_; + uint64_t packet_counter_; + + DataSize congestion_window_size_; + DataSize outstanding_data_; + + TimeDelta queue_time_limit; + bool account_for_audio_; + + // If true, PacedSender should only reference packets as in legacy mode. + // If false, PacedSender may have direct ownership of RtpPacketToSend objects. + // Defaults to true, will be changed to default false soon. + const bool legacy_packet_referencing_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACING_CONTROLLER_H_ diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc new file mode 100644 index 0000000000..a092e01cf9 --- /dev/null +++ b/modules/pacing/pacing_controller_unittest.cc @@ -0,0 +1,1490 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/pacing_controller.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "api/units/data_rate.h" +#include "modules/pacing/packet_router.h" +#include "system_wrappers/include/clock.h" +#include "test/field_trial.h" +#include "test/gmock.h" +#include "test/gtest.h" + +using ::testing::_; +using ::testing::Field; +using ::testing::Pointee; +using ::testing::Property; +using ::testing::Return; + +namespace webrtc { +namespace test { +namespace { +constexpr DataRate kFirstClusterRate = DataRate::KilobitsPerSec<900>(); +constexpr DataRate kSecondClusterRate = DataRate::KilobitsPerSec<1800>(); + +// The error stems from truncating the time interval of probe packets to integer +// values. This results in probing slightly higher than the target bitrate. +// For 1.8 Mbps, this comes to be about 120 kbps with 1200 probe packets. +constexpr DataRate kProbingErrorMargin = DataRate::KilobitsPerSec<150>(); + +const float kPaceMultiplier = 2.5f; + +constexpr uint32_t kAudioSsrc = 12345; +constexpr uint32_t kVideoSsrc = 234565; +constexpr uint32_t kVideoRtxSsrc = 34567; +constexpr uint32_t kFlexFecSsrc = 45678; + +constexpr DataRate kTargetRate = DataRate::KilobitsPerSec<800>(); + +enum class PacerMode { kReferencePackets, kOwnPackets }; +std::string GetFieldTrialStirng(PacerMode mode) { + std::string field_trial = "WebRTC-Pacer-LegacyPacketReferencing/"; + switch (mode) { + case PacerMode::kOwnPackets: + field_trial += "Disabled"; + break; + case PacerMode::kReferencePackets: + field_trial += "Enabled"; + break; + } + field_trial += "/"; + return field_trial; +} + +// TODO(bugs.webrtc.org/10633): Remove when packets are always owned by pacer. +RtpPacketSender::Priority PacketTypeToPriority(RtpPacketToSend::Type type) { + switch (type) { + case RtpPacketToSend::Type::kAudio: + return RtpPacketSender::Priority::kHighPriority; + case RtpPacketToSend::Type::kVideo: + return RtpPacketSender::Priority::kLowPriority; + case RtpPacketToSend::Type::kRetransmission: + return RtpPacketSender::Priority::kNormalPriority; + case RtpPacketToSend::Type::kForwardErrorCorrection: + return RtpPacketSender::Priority::kLowPriority; + break; + case RtpPacketToSend::Type::kPadding: + RTC_NOTREACHED() << "Unexpected type for legacy path: kPadding"; + break; + } + return RtpPacketSender::Priority::kLowPriority; +} + +std::unique_ptr BuildPacket(RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t size) { + auto packet = absl::make_unique(nullptr); + packet->set_packet_type(type); + packet->SetSsrc(ssrc); + packet->SetSequenceNumber(sequence_number); + packet->set_capture_time_ms(capture_time_ms); + packet->SetPayloadSize(size); + return packet; +} +} // namespace + +// Mock callback proxy, where both new and old api redirects to common mock +// methods that focus on core aspects. +class MockPacingControllerCallback : public PacingController::PacketSender { + public: + RtpPacketSendResult TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& packet_info) { + SendPacket(ssrc, sequence_number, capture_timestamp, retransmission, false); + return RtpPacketSendResult::kSuccess; + } + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override { + SendPacket(packet->Ssrc(), packet->SequenceNumber(), + packet->capture_time_ms(), + packet->packet_type() == RtpPacketToSend::Type::kRetransmission, + packet->packet_type() == RtpPacketToSend::Type::kPadding); + } + + DataSize TimeToSendPadding(DataSize size, + const PacedPacketInfo& packet_info) override { + return DataSize::bytes(SendPadding(size.bytes())); + } + + std::vector> GeneratePadding( + DataSize target_size) override { + std::vector> ret; + size_t padding_size = SendPadding(target_size.bytes()); + if (padding_size > 0) { + auto packet = absl::make_unique(nullptr); + packet->SetPayloadSize(padding_size); + packet->set_packet_type(RtpPacketToSend::Type::kPadding); + ret.emplace_back(std::move(packet)); + } + return ret; + } + + MOCK_METHOD5(SendPacket, + void(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + bool padding)); + MOCK_METHOD1(SendPadding, size_t(size_t target_size)); +}; + +// Mock callback implementing the raw api. +class MockPacketSender : public PacingController::PacketSender { + public: + MOCK_METHOD5(TimeToSendPacket, + RtpPacketSendResult(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info)); + MOCK_METHOD2(TimeToSendPadding, + DataSize(DataSize size, const PacedPacketInfo& pacing_info)); + + MOCK_METHOD2(SendRtpPacket, + void(std::unique_ptr packet, + const PacedPacketInfo& cluster_info)); + MOCK_METHOD1( + GeneratePadding, + std::vector>(DataSize target_size)); +}; + +class PacingControllerPadding : public PacingController::PacketSender { + public: + static const size_t kPaddingPacketSize = 224; + + PacingControllerPadding() : padding_sent_(0) {} + + RtpPacketSendResult TimeToSendPacket( + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info) override { + return RtpPacketSendResult::kSuccess; + } + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& pacing_info) override {} + + DataSize TimeToSendPadding(DataSize size, + const PacedPacketInfo& pacing_info) override { + size_t num_packets = + (size.bytes() + kPaddingPacketSize - 1) / kPaddingPacketSize; + padding_sent_ += kPaddingPacketSize * num_packets; + return DataSize::bytes(kPaddingPacketSize * num_packets); + } + + std::vector> GeneratePadding( + DataSize target_size) override { + size_t num_packets = + (target_size.bytes() + kPaddingPacketSize - 1) / kPaddingPacketSize; + std::vector> packets; + for (size_t i = 0; i < num_packets; ++i) { + packets.emplace_back(absl::make_unique(nullptr)); + packets.back()->SetPadding(kPaddingPacketSize); + packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding); + padding_sent_ += kPaddingPacketSize; + } + return packets; + } + + size_t padding_sent() { return padding_sent_; } + + private: + size_t padding_sent_; +}; + +class PacingControllerProbing : public PacingController::PacketSender { + public: + PacingControllerProbing() : packets_sent_(0), padding_sent_(0) {} + + RtpPacketSendResult TimeToSendPacket( + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info) override { + ++packets_sent_; + return RtpPacketSendResult::kSuccess; + } + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& pacing_info) override { + if (packet->packet_type() != RtpPacketToSend::Type::kPadding) { + ++packets_sent_; + } + } + + DataSize TimeToSendPadding(DataSize size, + const PacedPacketInfo& pacing_info) override { + padding_sent_ += size.bytes(); + return DataSize::bytes(padding_sent_); + } + + std::vector> GeneratePadding( + DataSize target_size) override { + std::vector> packets; + packets.emplace_back(absl::make_unique(nullptr)); + packets.back()->SetPadding(target_size.bytes()); + packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding); + padding_sent_ += target_size.bytes(); + return packets; + } + + int packets_sent() const { return packets_sent_; } + + int padding_sent() const { return padding_sent_; } + + private: + int packets_sent_; + int padding_sent_; +}; + +class PacingControllerTest : public ::testing::TestWithParam { + protected: + PacingControllerTest() + : clock_(123456), field_trial_(GetFieldTrialStirng(GetParam())) { + srand(0); + // Need to initialize PacingController after we initialize clock. + pacer_ = absl::make_unique(&clock_, &callback_, nullptr, + nullptr); + Init(); + } + + void Init() { + pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0); + pacer_->CreateProbeCluster(kSecondClusterRate, /*cluster_id=*/1); + // Default to bitrate probing disabled for testing purposes. Probing tests + // have to enable probing, either by creating a new PacingController + // instance or by calling SetProbingEnabled(true). + pacer_->SetProbingEnabled(false); + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero()); + + clock_.AdvanceTime(TimeUntilNextProcess()); + } + + void Send(RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t size) { + if (GetParam() == PacerMode::kReferencePackets) { + pacer_->InsertPacket(PacketTypeToPriority(type), ssrc, sequence_number, + capture_time_ms, size, + type == RtpPacketToSend::Type::kRetransmission); + } else { + pacer_->EnqueuePacket( + BuildPacket(type, ssrc, sequence_number, capture_time_ms, size)); + } + } + + void SendAndExpectPacket(RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t size) { + Send(type, ssrc, sequence_number, capture_time_ms, size); + EXPECT_CALL( + callback_, + SendPacket(ssrc, sequence_number, capture_time_ms, + type == RtpPacketToSend::Type::kRetransmission, false)) + .Times(1); + } + + void ExpectSendPadding() { + if (GetParam() == PacerMode::kOwnPackets) { + EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1); + } + } + + std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { + auto packet = absl::make_unique(nullptr); + packet->set_packet_type(type); + switch (type) { + case RtpPacketToSend::Type::kAudio: + packet->SetSsrc(kAudioSsrc); + break; + case RtpPacketToSend::Type::kVideo: + packet->SetSsrc(kVideoSsrc); + break; + case RtpPacketToSend::Type::kRetransmission: + case RtpPacketToSend::Type::kPadding: + packet->SetSsrc(kVideoRtxSsrc); + break; + case RtpPacketToSend::Type::kForwardErrorCorrection: + packet->SetSsrc(kFlexFecSsrc); + break; + } + + packet->SetPayloadSize(234); + return packet; + } + + TimeDelta TimeUntilNextProcess() { + // TODO(bugs.webrtc.org/10809): Replace this with TimeUntilAvailableBudget() + // once ported from WIP code. For now, emulate PacedSender method. + + TimeDelta elapsed_time = pacer_->TimeElapsedSinceLastProcess(); + if (pacer_->IsPaused()) { + return std::max(PacingController::kPausedProcessInterval - elapsed_time, + TimeDelta::Zero()); + } + + auto next_probe = pacer_->TimeUntilNextProbe(); + if (next_probe) { + return *next_probe; + } + + const TimeDelta min_packet_limit = TimeDelta::ms(5); + return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()); + } + + SimulatedClock clock_; + ScopedFieldTrials field_trial_; + MockPacingControllerCallback callback_; + std::unique_ptr pacer_; +}; + +class PacingControllerFieldTrialTest + : public ::testing::TestWithParam { + protected: + struct MediaStream { + const RtpPacketToSend::Type type; + const uint32_t ssrc; + const size_t packet_size; + uint16_t seq_num; + }; + + const int kProcessIntervalsPerSecond = 1000 / 5; + + PacingControllerFieldTrialTest() : clock_(123456) {} + void InsertPacket(PacingController* pacer, MediaStream* stream) { + if (GetParam() == PacerMode::kReferencePackets) { + pacer->InsertPacket(PacketTypeToPriority(stream->type), stream->ssrc, + stream->seq_num++, clock_.TimeInMilliseconds(), + stream->packet_size, false); + } else { + pacer->EnqueuePacket( + BuildPacket(stream->type, stream->ssrc, stream->seq_num++, + clock_.TimeInMilliseconds(), stream->packet_size)); + } + } + void ProcessNext(PacingController* pacer) { + clock_.AdvanceTimeMilliseconds(5); + pacer->ProcessPackets(); + } + MediaStream audio{/*type*/ RtpPacketToSend::Type::kAudio, + /*ssrc*/ 3333, /*packet_size*/ 100, /*seq_num*/ 1000}; + MediaStream video{/*type*/ RtpPacketToSend::Type::kVideo, + /*ssrc*/ 4444, /*packet_size*/ 1000, /*seq_num*/ 1000}; + SimulatedClock clock_; + MockPacingControllerCallback callback_; +}; + +TEST_P(PacingControllerFieldTrialTest, DefaultNoPaddingInSilence) { + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates(kTargetRate, DataRate::Zero()); + // Video packet to reset last send time and provide padding data. + InsertPacket(&pacer, &video); + EXPECT_CALL(callback_, SendPacket).Times(1); + clock_.AdvanceTimeMilliseconds(5); + pacer.ProcessPackets(); + EXPECT_CALL(callback_, SendPadding).Times(0); + // Waiting 500 ms should not trigger sending of padding. + clock_.AdvanceTimeMilliseconds(500); + pacer.ProcessPackets(); +} + +TEST_P(PacingControllerFieldTrialTest, PaddingInSilenceWithTrial) { + ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) + + "WebRTC-Pacer-PadInSilence/Enabled/"); + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates(kTargetRate, DataRate::Zero()); + // Video packet to reset last send time and provide padding data. + InsertPacket(&pacer, &video); + if (GetParam() == PacerMode::kReferencePackets) { + // Only payload, not padding, sent by pacer in legacy mode. + EXPECT_CALL(callback_, SendPacket).Times(1); + } else { + EXPECT_CALL(callback_, SendPacket).Times(2); + } + clock_.AdvanceTimeMilliseconds(5); + pacer.ProcessPackets(); + EXPECT_CALL(callback_, SendPadding).WillOnce(Return(1000)); + // Waiting 500 ms should trigger sending of padding. + clock_.AdvanceTimeMilliseconds(500); + pacer.ProcessPackets(); +} + +TEST_P(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) { + EXPECT_CALL(callback_, SendPadding).Times(0); + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); + pacer.SetCongestionWindow(DataSize::bytes(800)); + pacer.UpdateOutstandingData(DataSize::Zero()); + // Video packet fills congestion window. + InsertPacket(&pacer, &video); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); + // Audio packet blocked due to congestion. + InsertPacket(&pacer, &audio); + EXPECT_CALL(callback_, SendPacket).Times(0); + ProcessNext(&pacer); + ProcessNext(&pacer); + // Audio packet unblocked when congestion window clear. + ::testing::Mock::VerifyAndClearExpectations(&callback_); + pacer.UpdateOutstandingData(DataSize::Zero()); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); +} + +TEST_P(PacingControllerFieldTrialTest, + CongestionWindowDoesNotAffectAudioInTrial) { + ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) + + "WebRTC-Pacer-BlockAudio/Disabled/"); + EXPECT_CALL(callback_, SendPadding).Times(0); + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); + pacer.SetCongestionWindow(DataSize::bytes(800)); + pacer.UpdateOutstandingData(DataSize::Zero()); + // Video packet fills congestion window. + InsertPacket(&pacer, &video); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); + // Audio not blocked due to congestion. + InsertPacket(&pacer, &audio); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); +} + +TEST_P(PacingControllerFieldTrialTest, DefaultBudgetAffectsAudio) { + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates( + DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), + DataRate::Zero()); + // Video fills budget for following process periods. + InsertPacket(&pacer, &video); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); + // Audio packet blocked due to budget limit. + EXPECT_CALL(callback_, SendPacket).Times(0); + InsertPacket(&pacer, &audio); + ProcessNext(&pacer); + ProcessNext(&pacer); + ::testing::Mock::VerifyAndClearExpectations(&callback_); + // Audio packet unblocked when the budget has recovered. + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); + ProcessNext(&pacer); +} + +TEST_P(PacingControllerFieldTrialTest, BudgetDoesNotAffectAudioInTrial) { + ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) + + "WebRTC-Pacer-BlockAudio/Disabled/"); + EXPECT_CALL(callback_, SendPadding).Times(0); + PacingController pacer(&clock_, &callback_, nullptr, nullptr); + pacer.SetPacingRates( + DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), + DataRate::Zero()); + // Video fills budget for following process periods. + InsertPacket(&pacer, &video); + EXPECT_CALL(callback_, SendPacket).Times(1); + ProcessNext(&pacer); + // Audio packet not blocked due to budget limit. + EXPECT_CALL(callback_, SendPacket).Times(1); + InsertPacket(&pacer, &audio); + ProcessNext(&pacer); +} + +INSTANTIATE_TEST_SUITE_P(ReferencingAndOwningPackets, + PacingControllerFieldTrialTest, + ::testing::Values(PacerMode::kReferencePackets, + PacerMode::kOwnPackets)); + +TEST_P(PacingControllerTest, FirstSentPacketTimeIsSet) { + uint16_t sequence_number = 1234; + const uint32_t kSsrc = 12345; + const size_t kSizeBytes = 250; + const size_t kPacketToSend = 3; + const Timestamp kStartTime = clock_.CurrentTime(); + + // No packet sent. + EXPECT_FALSE(pacer_->FirstSentPacketTime().has_value()); + + for (size_t i = 0; i < kPacketToSend; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++, + clock_.TimeInMilliseconds(), kSizeBytes); + pacer_->ProcessPackets(); + clock_.AdvanceTime(TimeUntilNextProcess()); + } + EXPECT_EQ(kStartTime, pacer_->FirstSentPacketTime()); +} + +TEST_P(PacingControllerTest, QueuePacket) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + + int64_t queued_packet_timestamp = clock_.TimeInMilliseconds(); + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, + queued_packet_timestamp, 250); + EXPECT_EQ(packets_to_send + 1, pacer_->QueueSizePackets()); + pacer_->ProcessPackets(); + EXPECT_CALL(callback_, SendPadding).Times(0); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(1u, pacer_->QueueSizePackets()); + EXPECT_CALL(callback_, SendPacket(ssrc, sequence_number++, + queued_packet_timestamp, false, false)) + .Times(1); + pacer_->ProcessPackets(); + sequence_number++; + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + + // We can send packets_to_send -1 packets of size 250 during the current + // interval since one packet has already been sent. + for (size_t i = 0; i < packets_to_send - 1; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + EXPECT_EQ(packets_to_send, pacer_->QueueSizePackets()); + pacer_->ProcessPackets(); + EXPECT_EQ(1u, pacer_->QueueSizePackets()); +} + +TEST_P(PacingControllerTest, PaceQueuedPackets) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + + for (size_t j = 0; j < packets_to_send_per_interval * 10; ++j) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + EXPECT_EQ(packets_to_send_per_interval + packets_to_send_per_interval * 10, + pacer_->QueueSizePackets()); + pacer_->ProcessPackets(); + EXPECT_EQ(packets_to_send_per_interval * 10, pacer_->QueueSizePackets()); + EXPECT_CALL(callback_, SendPadding).Times(0); + for (int k = 0; k < 10; ++k) { + clock_.AdvanceTime(TimeUntilNextProcess()); + EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, false)) + .Times(packets_to_send_per_interval); + pacer_->ProcessPackets(); + } + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + clock_.AdvanceTime(TimeUntilNextProcess()); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + pacer_->ProcessPackets(); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, + clock_.TimeInMilliseconds(), 250); + pacer_->ProcessPackets(); + EXPECT_EQ(1u, pacer_->QueueSizePackets()); +} + +TEST_P(PacingControllerTest, RepeatedRetransmissionsAllowed) { + // Send one packet, then two retransmissions of that packet. + for (size_t i = 0; i < 3; i++) { + constexpr uint32_t ssrc = 333; + constexpr uint16_t sequence_number = 444; + constexpr size_t bytes = 250; + bool is_retransmission = (i != 0); // Original followed by retransmissions. + SendAndExpectPacket( + is_retransmission ? RtpPacketToSend::Type::kRetransmission + : RtpPacketToSend::Type::kVideo, + ssrc, sequence_number, clock_.TimeInMilliseconds(), bytes); + clock_.AdvanceTimeMilliseconds(5); + } + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, + CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, + clock_.TimeInMilliseconds(), 250); + + // Expect packet on second ssrc to be queued and sent as well. + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc + 1, sequence_number, + clock_.TimeInMilliseconds(), 250); + + clock_.AdvanceTimeMilliseconds(1000); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, Padding) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + // No padding is expected since we have sent too much already. + EXPECT_CALL(callback_, SendPadding).Times(0); + pacer_->ProcessPackets(); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + + // 5 milliseconds later should not send padding since we filled the buffers + // initially. + EXPECT_CALL(callback_, SendPadding(250)).Times(0); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + + // 5 milliseconds later we have enough budget to send some padding. + EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250)); + ExpectSendPadding(); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, NoPaddingBeforeNormalPacket) { + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + + EXPECT_CALL(callback_, SendPadding).Times(0); + pacer_->ProcessPackets(); + clock_.AdvanceTime(TimeUntilNextProcess()); + + pacer_->ProcessPackets(); + clock_.AdvanceTime(TimeUntilNextProcess()); + + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + capture_time_ms, 250); + EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250)); + ExpectSendPadding(); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, VerifyPaddingUpToBitrate) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 100; + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + + int64_t start_time = clock_.TimeInMilliseconds(); + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + capture_time_ms, 250); + EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250)); + ExpectSendPadding(); + pacer_->ProcessPackets(); + clock_.AdvanceTimeMilliseconds(kTimeStep); + } +} + +TEST_P(PacingControllerTest, VerifyAverageBitrateVaryingMediaPayload) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 10000; + PacingControllerPadding callback; + pacer_ = + absl::make_unique(&clock_, &callback, nullptr, nullptr); + pacer_->SetProbingEnabled(false); + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + + int64_t start_time = clock_.TimeInMilliseconds(); + size_t media_bytes = 0; + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + int rand_value = rand(); // NOLINT (rand_r instead of rand) + size_t media_payload = rand_value % 100 + 200; // [200, 300] bytes. + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + capture_time_ms, media_payload); + media_bytes += media_payload; + clock_.AdvanceTimeMilliseconds(kTimeStep); + pacer_->ProcessPackets(); + } + EXPECT_NEAR(kTargetRate.kbps(), + static_cast(8 * (media_bytes + callback.padding_sent()) / + kBitrateWindow), + 1); +} + +TEST_P(PacingControllerTest, Priority) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + int64_t capture_time_ms_low_priority = 1234567; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kRetransmission, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), 250); + } + pacer_->ProcessPackets(); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + + // Expect normal and low priority to be queued and high to pass through. + Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++, + capture_time_ms_low_priority, 250); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++, + capture_time_ms, 250); + } + Send(RtpPacketToSend::Type::kAudio, ssrc, sequence_number++, capture_time_ms, + 250); + + // Expect all high and normal priority to be sent out first. + EXPECT_CALL(callback_, SendPadding).Times(0); + EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, _, _)) + .Times(packets_to_send_per_interval + 1); + + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + EXPECT_EQ(1u, pacer_->QueueSizePackets()); + + EXPECT_CALL(callback_, SendPacket(ssrc_low_priority, _, + capture_time_ms_low_priority, _, _)) + .Times(1); + + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, RetransmissionPriority) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 45678; + int64_t capture_time_ms_retransmission = 56789; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + pacer_->ProcessPackets(); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); + + // Alternate retransmissions and normal packets. + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + capture_time_ms, 250); + Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++, + capture_time_ms_retransmission, 250); + } + EXPECT_EQ(2 * packets_to_send_per_interval, pacer_->QueueSizePackets()); + + // Expect all retransmissions to be sent out first despite having a later + // capture time. + EXPECT_CALL(callback_, SendPadding).Times(0); + EXPECT_CALL(callback_, SendPacket(_, _, _, false, _)).Times(0); + EXPECT_CALL(callback_, + SendPacket(ssrc, _, capture_time_ms_retransmission, true, _)) + .Times(packets_to_send_per_interval); + + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + EXPECT_EQ(packets_to_send_per_interval, pacer_->QueueSizePackets()); + + // Expect the remaining (non-retransmission) packets to be sent. + EXPECT_CALL(callback_, SendPadding).Times(0); + EXPECT_CALL(callback_, SendPacket(_, _, _, true, _)).Times(0); + EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, false, _)) + .Times(packets_to_send_per_interval); + + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + + EXPECT_EQ(0u, pacer_->QueueSizePackets()); +} + +TEST_P(PacingControllerTest, HighPrioDoesntAffectBudget) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + // As high prio packets doesn't affect the budget, we should be able to send + // a high number of them at once. + for (int i = 0; i < 25; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kAudio, ssrc, sequence_number++, + capture_time_ms, 250); + } + pacer_->ProcessPackets(); + // Low prio packets does affect the budget. + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, capture_time_ms, + 250); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + EXPECT_EQ(1u, pacer_->QueueSizePackets()); + EXPECT_CALL(callback_, + SendPacket(ssrc, sequence_number++, capture_time_ms, false, _)) + .Times(1); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + EXPECT_EQ(0u, pacer_->QueueSizePackets()); +} + +TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int kPacketSize = 250; + int kCongestionWindow = kPacketSize * 10; + + pacer_->UpdateOutstandingData(DataSize::Zero()); + pacer_->SetCongestionWindow(DataSize::bytes(kCongestionWindow)); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } + ::testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, SendPacket).Times(0); + EXPECT_CALL(callback_, SendPadding).Times(0); + + size_t blocked_packets = 0; + int64_t expected_time_until_padding = 500; + while (expected_time_until_padding > 5) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + blocked_packets++; + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + expected_time_until_padding -= 5; + } + ::testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); + ExpectSendPadding(); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + EXPECT_EQ(blocked_packets, pacer_->QueueSizePackets()); +} + +TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) { + uint32_t ssrc = 202020; + uint16_t seq_num = 1000; + int size = 1000; + auto now_ms = [this] { return clock_.TimeInMilliseconds(); }; + EXPECT_CALL(callback_, SendPadding).Times(0); + // The pacing rate is low enough that the budget should not allow two packets + // to be sent in a row. + pacer_->SetPacingRates(DataRate::bps(400 * 8 * 1000 / 5), DataRate::Zero()); + // The congestion window is small enough to only let one packet through. + pacer_->SetCongestionWindow(DataSize::bytes(800)); + pacer_->UpdateOutstandingData(DataSize::Zero()); + // Not yet budget limited or congested, packet is sent. + Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); + EXPECT_CALL(callback_, SendPacket).Times(1); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + // Packet blocked due to congestion. + Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); + EXPECT_CALL(callback_, SendPacket).Times(0); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + // Packet blocked due to congestion. + Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); + EXPECT_CALL(callback_, SendPacket).Times(0); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + pacer_->UpdateOutstandingData(DataSize::Zero()); + // Congestion removed and budget has recovered, packet is sent. + Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); + EXPECT_CALL(callback_, SendPacket).Times(1); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + pacer_->UpdateOutstandingData(DataSize::Zero()); + // Should be blocked due to budget limitation as congestion has be removed. + Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); + EXPECT_CALL(callback_, SendPacket).Times(0); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) { + uint32_t ssrc = 202020; + uint16_t sequence_number = 1000; + int64_t kPacketSize = 250; + int64_t kCongestionCount = 10; + int64_t kCongestionWindow = kPacketSize * kCongestionCount; + int64_t kCongestionTimeMs = 1000; + + pacer_->UpdateOutstandingData(DataSize::Zero()); + pacer_->SetCongestionWindow(DataSize::bytes(kCongestionWindow)); + int sent_data = 0; + while (sent_data < kCongestionWindow) { + sent_data += kPacketSize; + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } + ::testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, SendPacket).Times(0); + int unacked_packets = 0; + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + unacked_packets++; + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // First mark half of the congested packets as cleared and make sure that just + // as many are sent + int ack_count = kCongestionCount / 2; + EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count); + pacer_->UpdateOutstandingData( + DataSize::bytes(kCongestionWindow - kPacketSize * ack_count)); + + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } + unacked_packets -= ack_count; + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // Second make sure all packets are sent if sent packets are continuously + // marked as acked. + EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)) + .Times(unacked_packets); + for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { + pacer_->UpdateOutstandingData(DataSize::Zero()); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + } +} + +TEST_P(PacingControllerTest, Pause) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint32_t ssrc_high_priority = 12347; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250); + } + + pacer_->ProcessPackets(); + + pacer_->Pause(); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++, + capture_time_ms, 250); + Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++, + capture_time_ms, 250); + Send(RtpPacketToSend::Type::kAudio, ssrc_high_priority, sequence_number++, + capture_time_ms, 250); + } + clock_.AdvanceTimeMilliseconds(10000); + int64_t second_capture_time_ms = clock_.TimeInMilliseconds(); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++, + second_capture_time_ms, 250); + Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++, + second_capture_time_ms, 250); + Send(RtpPacketToSend::Type::kAudio, ssrc_high_priority, sequence_number++, + second_capture_time_ms, 250); + } + + // Expect everything to be queued. + EXPECT_EQ(TimeDelta::ms(second_capture_time_ms - capture_time_ms), + pacer_->OldestPacketWaitTime()); + + EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); + ExpectSendPadding(); + pacer_->ProcessPackets(); + + int64_t expected_time_until_send = 500; + EXPECT_CALL(callback_, SendPadding).Times(0); + while (expected_time_until_send >= 5) { + pacer_->ProcessPackets(); + clock_.AdvanceTimeMilliseconds(5); + expected_time_until_send -= 5; + } + + ::testing::Mock::VerifyAndClearExpectations(&callback_); + EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); + ExpectSendPadding(); + clock_.AdvanceTimeMilliseconds(5); + pacer_->ProcessPackets(); + ::testing::Mock::VerifyAndClearExpectations(&callback_); + + // Expect high prio packets to come out first followed by normal + // prio packets and low prio packets (all in capture order). + { + ::testing::InSequence sequence; + EXPECT_CALL(callback_, + SendPacket(ssrc_high_priority, _, capture_time_ms, _, _)) + .Times(packets_to_send_per_interval); + EXPECT_CALL(callback_, + SendPacket(ssrc_high_priority, _, second_capture_time_ms, _, _)) + .Times(packets_to_send_per_interval); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, _, _)) + .Times(1); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, SendPacket(ssrc, _, second_capture_time_ms, _, _)) + .Times(1); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, + SendPacket(ssrc_low_priority, _, capture_time_ms, _, _)) + .Times(1); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, SendPacket(ssrc_low_priority, _, + second_capture_time_ms, _, _)) + .Times(1); + } + } + pacer_->Resume(); + + // The pacer was resumed directly after the previous process call finished. It + // will therefore wait 5 ms until next process. + clock_.AdvanceTime(TimeUntilNextProcess()); + + for (size_t i = 0; i < 4; i++) { + pacer_->ProcessPackets(); + clock_.AdvanceTime(TimeUntilNextProcess()); + } + + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); +} + +TEST_P(PacingControllerTest, ResendPacket) { + if (GetParam() == PacerMode::kOwnPackets) { + // This test only makes sense when re-sending is supported. + return; + } + + MockPacketSender callback; + + // Need to initialize PacedSender after we initialize clock. + pacer_ = + absl::make_unique(&clock_, &callback, nullptr, nullptr); + Init(); + + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + + pacer_->InsertPacket(RtpPacketSender::kNormalPriority, ssrc, sequence_number, + capture_time_ms, 250, false); + clock_.AdvanceTimeMilliseconds(1); + pacer_->InsertPacket(RtpPacketSender::kNormalPriority, ssrc, + sequence_number + 1, capture_time_ms + 1, 250, false); + clock_.AdvanceTimeMilliseconds(9999); + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms), + pacer_->OldestPacketWaitTime()); + // Fails to send first packet so only one call. + EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms, + false, _)) + .Times(1) + .WillOnce(Return(RtpPacketSendResult::kTransportUnavailable)); + clock_.AdvanceTimeMilliseconds(10000); + pacer_->ProcessPackets(); + + // Queue remains unchanged. + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms), + pacer_->OldestPacketWaitTime()); + + // Fails to send second packet. + EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms, + false, _)) + .WillOnce(Return(RtpPacketSendResult::kSuccess)); + EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1, + capture_time_ms + 1, false, _)) + .WillOnce(Return(RtpPacketSendResult::kTransportUnavailable)); + clock_.AdvanceTimeMilliseconds(10000); + pacer_->ProcessPackets(); + + // Queue is reduced by 1 packet. + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms - 1), + pacer_->OldestPacketWaitTime()); + + // Send second packet and queue becomes empty. + EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1, + capture_time_ms + 1, false, _)) + .WillOnce(Return(RtpPacketSendResult::kSuccess)); + clock_.AdvanceTimeMilliseconds(10000); + pacer_->ProcessPackets(); + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); +} + +TEST_P(PacingControllerTest, ExpectedQueueTimeMs) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kNumPackets = 60; + const size_t kPacketSize = 1200; + const int32_t kMaxBitrate = kPaceMultiplier * 30000; + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + + pacer_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier), + DataRate::Zero()); + for (size_t i = 0; i < kNumPackets; ++i) { + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + } + + // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second) + TimeDelta queue_time = + TimeDelta::ms(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate); + EXPECT_EQ(queue_time, pacer_->ExpectedQueueTime()); + + const Timestamp time_start = clock_.CurrentTime(); + while (pacer_->QueueSizePackets() > 0) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } + TimeDelta duration = clock_.CurrentTime() - time_start; + + EXPECT_EQ(TimeDelta::Zero(), pacer_->ExpectedQueueTime()); + + // Allow for aliasing, duration should be within one pack of max time limit. + const TimeDelta deviation = + duration - PacingController::kMaxExpectedQueueLength; + EXPECT_LT(deviation.Abs(), + TimeDelta::ms(1000 * kPacketSize * 8 / kMaxBitrate)); +} + +TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); + + pacer_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier), + DataRate::Zero()); + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, + clock_.TimeInMilliseconds(), 1200); + + clock_.AdvanceTimeMilliseconds(500); + EXPECT_EQ(TimeDelta::ms(500), pacer_->OldestPacketWaitTime()); + pacer_->ProcessPackets(); + EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime()); +} + +TEST_P(PacingControllerTest, ProbingWithInsertedPackets) { + const size_t kPacketSize = 1200; + const int kInitialBitrateBps = 300000; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + + PacingControllerProbing packet_sender; + pacer_ = absl::make_unique(&clock_, &packet_sender, nullptr, + nullptr); + pacer_->CreateProbeCluster(kFirstClusterRate, + /*cluster_id=*/0); + pacer_->CreateProbeCluster(kSecondClusterRate, + /*cluster_id=*/1); + pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier), + DataRate::Zero()); + + for (int i = 0; i < 10; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + } + + int64_t start = clock_.TimeInMilliseconds(); + while (packet_sender.packets_sent() < 5) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } + int packets_sent = packet_sender.packets_sent(); + // Validate first cluster bitrate. Note that we have to account for number + // of intervals and hence (packets_sent - 1) on the first cluster. + EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 / + (clock_.TimeInMilliseconds() - start), + kFirstClusterRate.bps(), kProbingErrorMargin.bps()); + EXPECT_EQ(0, packet_sender.padding_sent()); + + clock_.AdvanceTime(TimeUntilNextProcess()); + start = clock_.TimeInMilliseconds(); + while (packet_sender.packets_sent() < 10) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } + packets_sent = packet_sender.packets_sent() - packets_sent; + // Validate second cluster bitrate. + EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 / + (clock_.TimeInMilliseconds() - start), + kSecondClusterRate.bps(), kProbingErrorMargin.bps()); +} + +TEST_P(PacingControllerTest, ProbingWithPaddingSupport) { + const size_t kPacketSize = 1200; + const int kInitialBitrateBps = 300000; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + + PacingControllerProbing packet_sender; + pacer_ = absl::make_unique(&clock_, &packet_sender, nullptr, + nullptr); + pacer_->CreateProbeCluster(kFirstClusterRate, + /*cluster_id=*/0); + pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier), + DataRate::Zero()); + + for (int i = 0; i < 3; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + } + + int64_t start = clock_.TimeInMilliseconds(); + int process_count = 0; + while (process_count < 5) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + ++process_count; + } + int packets_sent = packet_sender.packets_sent(); + int padding_sent = packet_sender.padding_sent(); + EXPECT_GT(packets_sent, 0); + EXPECT_GT(padding_sent, 0); + // Note that the number of intervals here for kPacketSize is + // packets_sent due to padding in the same cluster. + EXPECT_NEAR((packets_sent * kPacketSize * 8000 + padding_sent) / + (clock_.TimeInMilliseconds() - start), + kFirstClusterRate.bps(), kProbingErrorMargin.bps()); +} + +TEST_P(PacingControllerTest, PaddingOveruse) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + pacer_->ProcessPackets(); + pacer_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier), + DataRate::Zero()); + + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + pacer_->ProcessPackets(); + + // Add 30kbit padding. When increasing budget, media budget will increase from + // negative (overuse) while padding budget will increase from 0. + clock_.AdvanceTimeMilliseconds(5); + pacer_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier), + DataRate::bps(30000)); + + SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + EXPECT_LT(TimeDelta::ms(5), pacer_->ExpectedQueueTime()); + // Don't send padding if queue is non-empty, even if padding budget > 0. + EXPECT_CALL(callback_, SendPadding).Times(0); + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, ProbeClusterId) { + MockPacketSender callback; + + pacer_ = + absl::make_unique(&clock_, &callback, nullptr, nullptr); + Init(); + + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate); + pacer_->SetProbingEnabled(true); + for (int i = 0; i < 10; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + } + + // First probing cluster. + if (GetParam() == PacerMode::kReferencePackets) { + EXPECT_CALL(callback, + TimeToSendPacket(_, _, _, _, + Field(&PacedPacketInfo::probe_cluster_id, 0))) + .Times(5) + .WillRepeatedly(Return(RtpPacketSendResult::kSuccess)); + } else { + EXPECT_CALL(callback, + SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0))) + .Times(5); + } + + for (int i = 0; i < 5; ++i) { + clock_.AdvanceTimeMilliseconds(20); + pacer_->ProcessPackets(); + } + + // Second probing cluster. + if (GetParam() == PacerMode::kReferencePackets) { + EXPECT_CALL(callback, + TimeToSendPacket(_, _, _, _, + Field(&PacedPacketInfo::probe_cluster_id, 1))) + .Times(5) + .WillRepeatedly(Return(RtpPacketSendResult::kSuccess)); + EXPECT_CALL(callback, TimeToSendPadding).WillOnce(Return(DataSize::Zero())); + } else { + EXPECT_CALL(callback, + SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1))) + .Times(5); + } + + for (int i = 0; i < 5; ++i) { + clock_.AdvanceTimeMilliseconds(20); + pacer_->ProcessPackets(); + } + + // Needed for the Field comparer below. + const int kNotAProbe = PacedPacketInfo::kNotAProbe; + // No more probing packets. + if (GetParam() == PacerMode::kReferencePackets) { + EXPECT_CALL(callback, + TimeToSendPadding( + _, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe))) + .WillOnce(Return(DataSize::bytes(500))); + } else { + EXPECT_CALL(callback, GeneratePadding).WillOnce([&](DataSize padding_size) { + std::vector> padding_packets; + padding_packets.emplace_back( + BuildPacket(RtpPacketToSend::Type::kPadding, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), padding_size.bytes())); + return padding_packets; + }); + EXPECT_CALL( + callback, + SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe))) + .Times(1); + } + pacer_->ProcessPackets(); +} + +TEST_P(PacingControllerTest, OwnedPacketPrioritizedOnType) { + if (GetParam() != PacerMode::kOwnPackets) { + // This test only makes sense when using the new code path. + return; + } + + MockPacketSender callback; + pacer_ = + absl::make_unique(&clock_, &callback, nullptr, nullptr); + Init(); + + // Insert a packet of each type, from low to high priority. Since priority + // is weighted higher than insert order, these should come out of the pacer + // in backwards order with the exception of FEC and Video. + for (RtpPacketToSend::Type type : + {RtpPacketToSend::Type::kPadding, + RtpPacketToSend::Type::kForwardErrorCorrection, + RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission, + RtpPacketToSend::Type::kAudio}) { + pacer_->EnqueuePacket(BuildRtpPacket(type)); + } + + ::testing::InSequence seq; + EXPECT_CALL( + callback, + SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _)); + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + + // FEC and video actually have the same priority, so will come out in + // insertion order. + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _)); + EXPECT_CALL( + callback, + SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _)); + + EXPECT_CALL(callback, + SendRtpPacket( + Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + + clock_.AdvanceTimeMilliseconds(200); + pacer_->ProcessPackets(); +} + +INSTANTIATE_TEST_SUITE_P(ReferencingAndOwningPackets, + PacingControllerTest, + ::testing::Values(PacerMode::kReferencePackets, + PacerMode::kOwnPackets)); + +} // namespace test +} // namespace webrtc