diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 3cea1a3fa8..5ce38a3936 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -23,8 +23,6 @@ rtc_library("pacing") { "packet_router.h", "prioritized_packet_queue.cc", "prioritized_packet_queue.h", - "round_robin_packet_queue.cc", - "round_robin_packet_queue.h", "rtp_packet_pacer.h", "task_queue_paced_sender.cc", "task_queue_paced_sender.h", @@ -91,7 +89,6 @@ if (rtc_include_tests) { "pacing_controller_unittest.cc", "packet_router_unittest.cc", "prioritized_packet_queue_unittest.cc", - "round_robin_packet_queue_unittest.cc", "task_queue_paced_sender_unittest.cc", ] deps = [ diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index cdd908c9f8..942d4abce5 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -18,8 +18,6 @@ #include "absl/strings/match.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" -#include "modules/pacing/prioritized_packet_queue.h" -#include "modules/pacing/round_robin_packet_queue.h" #include "rtc_base/checks.h" #include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/logging.h" @@ -45,15 +43,6 @@ bool IsEnabled(const FieldTrialsView& field_trials, absl::string_view key) { return absl::StartsWith(field_trials.Lookup(key), "Enabled"); } -std::unique_ptr CreatePacketQueue( - const FieldTrialsView& field_trials, - Timestamp creation_time) { - if (field_trials.IsDisabled("WebRTC-Pacer-UsePrioritizedPacketQueue")) { - return std::make_unique(creation_time); - } - return std::make_unique(creation_time); -} - } // namespace const TimeDelta PacingController::kMaxExpectedQueueLength = @@ -93,7 +82,7 @@ PacingController::PacingController(Clock* clock, last_process_time_(clock->CurrentTime()), last_send_time_(last_process_time_), seen_first_packet_(false), - packet_queue_(CreatePacketQueue(field_trials_, last_process_time_)), + packet_queue_(/*creation_time=*/last_process_time_), congested_(false), queue_time_limit_(kMaxExpectedQueueLength), account_for_audio_(false), @@ -130,14 +119,14 @@ void PacingController::Pause() { if (!paused_) RTC_LOG(LS_INFO) << "PacedSender paused."; paused_ = true; - packet_queue_->SetPauseState(true, CurrentTime()); + packet_queue_.SetPauseState(true, CurrentTime()); } void PacingController::Resume() { if (paused_) RTC_LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; - packet_queue_->SetPauseState(false, CurrentTime()); + packet_queue_.SetPauseState(false, CurrentTime()); } bool PacingController::IsPaused() const { @@ -207,7 +196,7 @@ void PacingController::EnqueuePacket(std::unique_ptr packet) { prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size())); const Timestamp now = CurrentTime(); - if (packet_queue_->Empty()) { + if (packet_queue_.Empty()) { // If queue is empty, we need to "fast-forward" the last process time, // so that we don't use passed time as budget for sending the first new // packet. @@ -220,7 +209,7 @@ void PacingController::EnqueuePacket(std::unique_ptr packet) { } UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); } - packet_queue_->Push(now, std::move(packet)); + packet_queue_.Push(now, std::move(packet)); seen_first_packet_ = true; // Queue length has increased, check if we need to change the pacing rate. @@ -251,18 +240,18 @@ TimeDelta PacingController::ExpectedQueueTime() const { } size_t PacingController::QueueSizePackets() const { - return rtc::checked_cast(packet_queue_->SizeInPackets()); + return rtc::checked_cast(packet_queue_.SizeInPackets()); } const std::array& PacingController::SizeInPacketsPerRtpPacketMediaType() const { - return packet_queue_->SizeInPacketsPerRtpPacketMediaType(); + return packet_queue_.SizeInPacketsPerRtpPacketMediaType(); } DataSize PacingController::QueueSizeData() const { - DataSize size = packet_queue_->SizeInPayloadBytes(); + DataSize size = packet_queue_.SizeInPayloadBytes(); if (include_overhead_) { - size += static_cast(packet_queue_->SizeInPackets()) * + size += static_cast(packet_queue_.SizeInPackets()) * transport_overhead_per_packet_; } return size; @@ -277,7 +266,7 @@ absl::optional PacingController::FirstSentPacketTime() const { } Timestamp PacingController::OldestPacketEnqueueTime() const { - return packet_queue_->OldestEnqueueTime(); + return packet_queue_.OldestEnqueueTime(); } TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { @@ -328,7 +317,7 @@ Timestamp PacingController::NextSendTime() const { // time is the time at which it was enqueued. Timestamp unpaced_audio_time = pace_audio_ ? Timestamp::PlusInfinity() - : packet_queue_->LeadingAudioPacketEnqueueTime(); + : packet_queue_.LeadingAudioPacketEnqueueTime(); if (unpaced_audio_time.IsFinite()) { return unpaced_audio_time; } @@ -338,7 +327,7 @@ Timestamp PacingController::NextSendTime() const { return last_send_time_ + kCongestedPacketInterval; } - if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) { + if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { // If packets are allowed to be sent in a burst, the // debt is allowed to grow up to one packet more than what can be sent // during 'send_burst_period_'. @@ -346,7 +335,7 @@ Timestamp PacingController::NextSendTime() const { next_send_time = last_process_time_ + ((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time); - } else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) { + } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { // If we _don't_ have pending packets, check how long until we have // bandwidth for padding packets. Both media and padding debts must // have been drained to do this. @@ -539,7 +528,7 @@ void PacingController::ProcessPackets() { DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, DataSize data_sent) const { - if (!packet_queue_->Empty()) { + if (!packet_queue_.Empty()) { // Actual payload available, no need to add padding. return DataSize::Zero(); } @@ -588,7 +577,7 @@ std::unique_ptr PacingController::GetPendingPacket( } } - if (packet_queue_->Empty()) { + if (packet_queue_.Empty()) { return nullptr; } @@ -596,7 +585,7 @@ std::unique_ptr PacingController::GetPendingPacket( // Unpaced audio packets and probes are exempted from send checks. bool unpaced_audio_packet = - !pace_audio_ && packet_queue_->LeadingAudioPacketEnqueueTime().IsFinite(); + !pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().IsFinite(); if (!unpaced_audio_packet && !is_probe) { if (congested_) { // Don't send anything if congested. @@ -616,7 +605,7 @@ std::unique_ptr PacingController::GetPendingPacket( } } - return packet_queue_->Pop(); + return packet_queue_.Pop(); } void PacingController::OnPacketSent(RtpPacketMediaType packet_type, @@ -665,10 +654,10 @@ void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packet_queue_->UpdateAverageQueueTime(now); + packet_queue_.UpdateAverageQueueTime(now); TimeDelta avg_time_left = std::max(TimeDelta::Millis(1), - queue_time_limit_ - packet_queue_->AverageQueueTime()); + queue_time_limit_ - packet_queue_.AverageQueueTime()); DataRate min_rate_needed = queue_size_data / avg_time_left; if (min_rate_needed > pacing_rate_) { adjusted_media_rate_ = min_rate_needed; diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index c0a69266a0..5689780f17 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -26,6 +26,7 @@ #include "api/transport/network_types.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" +#include "modules/pacing/prioritized_packet_queue.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/include/rtp_packet_sender.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -53,52 +54,6 @@ class PacingController { DataSize size) = 0; }; - // Interface for class hanlding storage of and prioritization of packets - // pending to be sent by the pacer. - // Note that for the methods taking a Timestamp as parameter, the parameter - // will never decrease between two subsequent calls. - class PacketQueue { - public: - virtual ~PacketQueue() = default; - - virtual void Push(Timestamp enqueue_time, - std::unique_ptr packet) = 0; - virtual std::unique_ptr Pop() = 0; - - virtual int SizeInPackets() const = 0; - bool Empty() const { return SizeInPackets() == 0; } - virtual DataSize SizeInPayloadBytes() const = 0; - - // Total packets in the queue per media type (RtpPacketMediaType values are - // used as lookup index). - virtual const std::array& - SizeInPacketsPerRtpPacketMediaType() const = 0; - - // If the next packet, that would be returned by Pop() if called - // now, is an audio packet this method returns the enqueue time - // of that packet. If queue is empty or top packet is not audio, - // returns Timestamp::MinusInfinity(). - virtual Timestamp LeadingAudioPacketEnqueueTime() const = 0; - - // Enqueue time of the oldest packet in the queue, - // Timestamp::MinusInfinity() if queue is empty. - virtual Timestamp OldestEnqueueTime() const = 0; - - // Average queue time for the packets currently in the queue. - // The queuing time is calculated from Push() to the last UpdateQueueTime() - // call - with any time spent in a paused state subtracted. - // Returns TimeDelta::Zero() for an empty queue. - virtual TimeDelta AverageQueueTime() const = 0; - - // Called during packet processing or when pause stats changes. Since the - // AverageQueueTime() method does not look at the wall time, this method - // needs to be called before querying queue time. - virtual void UpdateAverageQueueTime(Timestamp now) = 0; - - // Set the pause state, while `paused` is true queuing time is not counted. - virtual void SetPauseState(bool paused, Timestamp now) = 0; - }; - // Expected max pacer delay. If ExpectedQueueTime() is higher than // this value, the packet producers should wait (eg drop frames rather than // encoding them). Bitrate sent may temporarily exceed target set by @@ -260,7 +215,7 @@ class PacingController { absl::optional first_sent_packet_time_; bool seen_first_packet_; - std::unique_ptr packet_queue_; + PrioritizedPacketQueue packet_queue_; bool congested_; diff --git a/modules/pacing/prioritized_packet_queue.cc b/modules/pacing/prioritized_packet_queue.cc index 83ec77da28..261d3132cb 100644 --- a/modules/pacing/prioritized_packet_queue.cc +++ b/modules/pacing/prioritized_packet_queue.cc @@ -216,6 +216,10 @@ DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const { return size_payload_; } +bool PrioritizedPacketQueue::Empty() const { + return size_packets_ == 0; +} + const std::array& PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { return size_packets_per_media_type_; diff --git a/modules/pacing/prioritized_packet_queue.h b/modules/pacing/prioritized_packet_queue.h index c770435aa1..f730a37d93 100644 --- a/modules/pacing/prioritized_packet_queue.h +++ b/modules/pacing/prioritized_packet_queue.h @@ -21,29 +21,63 @@ #include "api/units/data_size.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" -#include "modules/pacing/pacing_controller.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" namespace webrtc { -class PrioritizedPacketQueue : public PacingController::PacketQueue { +class PrioritizedPacketQueue { public: explicit PrioritizedPacketQueue(Timestamp creation_time); PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete; PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete; - void Push(Timestamp enqueue_time, - std::unique_ptr packet) override; - std::unique_ptr Pop() override; - int SizeInPackets() const override; - DataSize SizeInPayloadBytes() const override; + // Add a packet to the queue. The enqueue time is used for queue time stats + // and to report the leading packet enqueue time per packet type. + void Push(Timestamp enqueue_time, std::unique_ptr packet); + + // Remove the next packet from the queue. Packets a prioritized first + // according to packet type, in the following order: + // - audio, retransmissions, video / fec, padding + // For each packet type, we use one FIFO-queue per SSRC and emit from + // those queues in a round-robin fashion. + std::unique_ptr Pop(); + + // Number of packets in the queue. + int SizeInPackets() const; + + // Sum of all payload bytes in the queue, where the payload is calculated + // as `packet->payload_size() + packet->padding_size()`. + DataSize SizeInPayloadBytes() const; + + // Convenience method for `SizeInPackets() == 0`. + bool Empty() const; + + // Total packets in the queue per media type (RtpPacketMediaType values are + // used as lookup index). const std::array& SizeInPacketsPerRtpPacketMediaType() - const override; - Timestamp LeadingAudioPacketEnqueueTime() const override; - Timestamp OldestEnqueueTime() const override; - TimeDelta AverageQueueTime() const override; - void UpdateAverageQueueTime(Timestamp now) override; - void SetPauseState(bool paused, Timestamp now) override; + const; + + // The enqueue time of the next audio packet this queue will return via the + // Pop() method. If queue has no audio packets, returns MinusInfinity(). + Timestamp LeadingAudioPacketEnqueueTime() const; + + // Enqueue time of the oldest packet in the queue, + // Timestamp::MinusInfinity() if queue is empty. + Timestamp OldestEnqueueTime() const; + + // Average queue time for the packets currently in the queue. + // The queuing time is calculated from Push() to the last UpdateQueueTime() + // call - with any time spent in a paused state subtracted. + // Returns TimeDelta::Zero() for an empty queue. + TimeDelta AverageQueueTime() const; + + // Called during packet processing or when pause stats changes. Since the + // AverageQueueTime() method does not look at the wall time, this method + // needs to be called before querying queue time. + void UpdateAverageQueueTime(Timestamp now); + + // Set the pause state, while `paused` is true queuing time is not counted. + void SetPauseState(bool paused, Timestamp now); private: static constexpr int kNumPriorityLevels = 4; diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc deleted file mode 100644 index d7525e9d5a..0000000000 --- a/modules/pacing/round_robin_packet_queue.cc +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/pacing/round_robin_packet_queue.h" - -#include -#include -#include - -#include "absl/strings/match.h" -#include "rtc_base/checks.h" - -namespace webrtc { -namespace { -static constexpr DataSize kMaxLeadingSize = DataSize::Bytes(1400); - -int GetPriorityForType(RtpPacketMediaType type) { - // Lower number takes priority over higher. - switch (type) { - case RtpPacketMediaType::kAudio: - // Audio is always prioritized over other packet types. - return 0; - case RtpPacketMediaType::kRetransmission: - // Send retransmissions before new media. - return 1; - case RtpPacketMediaType::kVideo: - case RtpPacketMediaType::kForwardErrorCorrection: - // Video has "normal" priority, in the old speak. - // Send redundancy concurrently to video. If it is delayed it might have a - // lower chance of being useful. - return 2; - case RtpPacketMediaType::kPadding: - // Packets that are in themselves likely useless, only sent to keep the - // BWE high. - return 3; - } - RTC_CHECK_NOTREACHED(); -} - -} // namespace - -RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) = - default; -RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default; - -RoundRobinPacketQueue::QueuedPacket::QueuedPacket( - int priority, - Timestamp enqueue_time, - int64_t enqueue_order, - std::multiset::iterator enqueue_time_it, - std::unique_ptr packet) - : priority_(priority), - enqueue_time_(enqueue_time), - enqueue_order_(enqueue_order), - is_retransmission_(packet->packet_type() == - RtpPacketMediaType::kRetransmission), - enqueue_time_it_(enqueue_time_it), - owned_packet_(packet.release()) {} - -bool RoundRobinPacketQueue::QueuedPacket::operator<( - const RoundRobinPacketQueue::QueuedPacket& other) const { - if (priority_ != other.priority_) - return priority_ > other.priority_; - if (is_retransmission_ != other.is_retransmission_) - return other.is_retransmission_; - - return enqueue_order_ > other.enqueue_order_; -} - -int RoundRobinPacketQueue::QueuedPacket::Priority() const { - return priority_; -} - -RtpPacketMediaType RoundRobinPacketQueue::QueuedPacket::Type() const { - return *owned_packet_->packet_type(); -} - -uint32_t RoundRobinPacketQueue::QueuedPacket::Ssrc() const { - return owned_packet_->Ssrc(); -} - -Timestamp RoundRobinPacketQueue::QueuedPacket::EnqueueTime() const { - return enqueue_time_; -} - -bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const { - return Type() == RtpPacketMediaType::kRetransmission; -} - -int64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const { - return enqueue_order_; -} - -RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const { - return owned_packet_; -} - -void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator( - std::multiset::iterator it) { - enqueue_time_it_ = it; -} - -std::multiset::iterator -RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const { - return enqueue_time_it_; -} - -void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime( - TimeDelta pause_time_sum) { - enqueue_time_ -= pause_time_sum; -} - -RoundRobinPacketQueue::PriorityPacketQueue::const_iterator -RoundRobinPacketQueue::PriorityPacketQueue::begin() const { - return c.begin(); -} - -RoundRobinPacketQueue::PriorityPacketQueue::const_iterator -RoundRobinPacketQueue::PriorityPacketQueue::end() const { - return c.end(); -} - -RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {} -RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; -RoundRobinPacketQueue::Stream::~Stream() = default; - -RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time) - : transport_overhead_per_packet_(DataSize::Zero()), - time_last_updated_(start_time), - enqueue_count_(0), - paused_(false), - size_packets_(0), - size_packets_per_media_type_({}), - size_(DataSize::Zero()), - max_size_(kMaxLeadingSize), - queue_time_sum_(TimeDelta::Zero()), - pause_time_sum_(TimeDelta::Zero()), - include_overhead_(false) {} - -RoundRobinPacketQueue::~RoundRobinPacketQueue() { - // Make sure to release any packets owned by raw pointer in QueuedPacket. - while (size_packets_ > 0) { - Pop(); - } -} - -void RoundRobinPacketQueue::Push(Timestamp enqueue_time, - std::unique_ptr packet) { - RTC_CHECK(packet->packet_type().has_value()); - RtpPacketMediaType packet_type = packet->packet_type().value(); - int priority = GetPriorityForType(packet_type); - if (size_packets_ == 0) { - // Single packet fast-path. - single_packet_queue_.emplace( - QueuedPacket(priority, enqueue_time, enqueue_count_++, - enqueue_times_.end(), std::move(packet))); - UpdateAverageQueueTime(enqueue_time); - single_packet_queue_->SubtractPauseTime(pause_time_sum_); - size_packets_ = 1; - ++size_packets_per_media_type_[static_cast(packet_type)]; - size_ += PacketSize(*single_packet_queue_); - } else { - MaybePromoteSinglePacketToNormalQueue(); - Push(QueuedPacket(priority, enqueue_time, enqueue_count_++, - enqueue_times_.insert(enqueue_time), std::move(packet))); - } -} - -std::unique_ptr RoundRobinPacketQueue::Pop() { - if (single_packet_queue_.has_value()) { - RTC_DCHECK(stream_priorities_.empty()); - std::unique_ptr rtp_packet( - single_packet_queue_->RtpPacket()); - single_packet_queue_.reset(); - queue_time_sum_ = TimeDelta::Zero(); - size_packets_ = 0; - RTC_CHECK(rtp_packet->packet_type().has_value()); - RtpPacketMediaType packet_type = rtp_packet->packet_type().value(); - size_packets_per_media_type_[static_cast(packet_type)] -= 1; - RTC_CHECK_GE(size_packets_per_media_type_[static_cast(packet_type)], - 0); - size_ = DataSize::Zero(); - return rtp_packet; - } - - RTC_DCHECK_GT(size_packets_, 0); - Stream* stream = GetHighestPriorityStream(); - const QueuedPacket& queued_packet = stream->packet_queue.top(); - - stream_priorities_.erase(stream->priority_it); - - // Calculate the total amount of time spent by this packet in the queue - // while in a non-paused state. Note that the `pause_time_sum_ms_` was - // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and - // by subtracting it now we effectively remove the time spent in in the - // queue while in a paused state. - TimeDelta time_in_non_paused_state = - time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_; - queue_time_sum_ -= time_in_non_paused_state; - - RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end()); - enqueue_times_.erase(queued_packet.EnqueueTimeIterator()); - - // Update `bytes` of this stream. The general idea is that the stream that - // has sent the least amount of bytes should have the highest priority. - // The problem with that is if streams send with different rates, in which - // case a "budget" will be built up for the stream sending at the lower - // rate. To avoid building a too large budget we limit `bytes` to be within - // kMaxLeading bytes of the stream that has sent the most amount of bytes. - DataSize packet_size = PacketSize(queued_packet); - stream->size = - std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize); - max_size_ = std::max(max_size_, stream->size); - - size_ -= packet_size; - size_packets_ -= 1; - size_packets_per_media_type_[static_cast(queued_packet.Type())] -= 1; - RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero()); - RTC_CHECK_GE( - size_packets_per_media_type_[static_cast(queued_packet.Type())], - 0); - - std::unique_ptr rtp_packet(queued_packet.RtpPacket()); - stream->packet_queue.pop(); - - // If there are packets left to be sent, schedule the stream again. - RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); - if (stream->packet_queue.empty()) { - stream->priority_it = stream_priorities_.end(); - } else { - int priority = stream->packet_queue.top().Priority(); - stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(priority, stream->size), stream->ssrc); - } - - return rtp_packet; -} - -int RoundRobinPacketQueue::SizeInPackets() const { - return size_packets_; -} - -DataSize RoundRobinPacketQueue::SizeInPayloadBytes() const { - return size_; -} - -const std::array& -RoundRobinPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { - return size_packets_per_media_type_; -} - -Timestamp RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() const { - if (single_packet_queue_.has_value()) { - if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) { - return single_packet_queue_->EnqueueTime(); - } - return Timestamp::MinusInfinity(); - } - - if (stream_priorities_.empty()) { - return Timestamp::MinusInfinity(); - } - uint32_t ssrc = stream_priorities_.begin()->second; - - const auto& top_packet = streams_.find(ssrc)->second.packet_queue.top(); - if (top_packet.Type() == RtpPacketMediaType::kAudio) { - return top_packet.EnqueueTime(); - } - return Timestamp::MinusInfinity(); -} - -Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { - if (single_packet_queue_.has_value()) { - return single_packet_queue_->EnqueueTime(); - } - - if (size_packets_ == 0) - return Timestamp::MinusInfinity(); - RTC_CHECK(!enqueue_times_.empty()); - return *enqueue_times_.begin(); -} - -void RoundRobinPacketQueue::UpdateAverageQueueTime(Timestamp now) { - RTC_CHECK_GE(now, time_last_updated_); - if (now == time_last_updated_) - return; - - TimeDelta delta = now - time_last_updated_; - - if (paused_) { - pause_time_sum_ += delta; - } else { - queue_time_sum_ += delta * size_packets_; - } - - time_last_updated_ = now; -} - -void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) { - if (paused_ == paused) - return; - UpdateAverageQueueTime(now); - paused_ = paused; -} - -TimeDelta RoundRobinPacketQueue::AverageQueueTime() const { - if (size_packets_ == 0) - return TimeDelta::Zero(); - return queue_time_sum_ / size_packets_; -} - -void RoundRobinPacketQueue::Push(QueuedPacket packet) { - auto stream_info_it = streams_.find(packet.Ssrc()); - if (stream_info_it == streams_.end()) { - stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first; - stream_info_it->second.priority_it = stream_priorities_.end(); - stream_info_it->second.ssrc = packet.Ssrc(); - } - - Stream* stream = &stream_info_it->second; - - if (stream->priority_it == stream_priorities_.end()) { - // If the SSRC is not currently scheduled, add it to `stream_priorities_`. - RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); - stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc()); - } else if (packet.Priority() < stream->priority_it->first.priority) { - // If the priority of this SSRC increased, remove the outdated StreamPrioKey - // and insert a new one with the new priority. Note that `priority_` uses - // lower ordinal for higher priority. - stream_priorities_.erase(stream->priority_it); - stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc()); - } - RTC_CHECK(stream->priority_it != stream_priorities_.end()); - - if (packet.EnqueueTimeIterator() == enqueue_times_.end()) { - // Promotion from single-packet queue. Just add to enqueue times. - packet.UpdateEnqueueTimeIterator( - enqueue_times_.insert(packet.EnqueueTime())); - } else { - // In order to figure out how much time a packet has spent in the queue - // while not in a paused state, we subtract the total amount of time the - // queue has been paused so far, and when the packet is popped we subtract - // the total amount of time the queue has been paused at that moment. This - // way we subtract the total amount of time the packet has spent in the - // queue while in a paused state. - UpdateAverageQueueTime(packet.EnqueueTime()); - packet.SubtractPauseTime(pause_time_sum_); - - size_packets_ += 1; - size_packets_per_media_type_[static_cast(packet.Type())] += 1; - size_ += PacketSize(packet); - } - - stream->packet_queue.push(packet); -} - -DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const { - DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() + - packet.RtpPacket()->padding_size()); - if (include_overhead_) { - packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) + - transport_overhead_per_packet_; - } - return packet_size; -} - -void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() { - if (single_packet_queue_.has_value()) { - Push(*single_packet_queue_); - single_packet_queue_.reset(); - } -} - -RoundRobinPacketQueue::Stream* -RoundRobinPacketQueue::GetHighestPriorityStream() { - RTC_CHECK(!stream_priorities_.empty()); - uint32_t ssrc = stream_priorities_.begin()->second; - - auto stream_info_it = streams_.find(ssrc); - RTC_CHECK(stream_info_it != streams_.end()); - RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin()); - RTC_CHECK(!stream_info_it->second.packet_queue.empty()); - return &stream_info_it->second; -} - -bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const { - for (const auto& scheduled_stream : stream_priorities_) { - if (scheduled_stream.second == ssrc) - return true; - } - return false; -} - -} // namespace webrtc diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h deleted file mode 100644 index 052b98b16b..0000000000 --- a/modules/pacing/round_robin_packet_queue.h +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ -#define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "absl/types/optional.h" -#include "api/units/data_size.h" -#include "api/units/time_delta.h" -#include "api/units/timestamp.h" -#include "modules/pacing/pacing_controller.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" - -namespace webrtc { - -class RoundRobinPacketQueue : public PacingController::PacketQueue { - public: - explicit RoundRobinPacketQueue(Timestamp start_time); - ~RoundRobinPacketQueue(); - - void Push(Timestamp enqueue_time, - std::unique_ptr packet) override; - std::unique_ptr Pop() override; - - int SizeInPackets() const override; - DataSize SizeInPayloadBytes() const override; - const std::array& SizeInPacketsPerRtpPacketMediaType() - const override; - Timestamp LeadingAudioPacketEnqueueTime() const override; - Timestamp OldestEnqueueTime() const override; - TimeDelta AverageQueueTime() const override; - void UpdateAverageQueueTime(Timestamp now) override; - void SetPauseState(bool paused, Timestamp now) override; - - private: - struct QueuedPacket { - public: - QueuedPacket(int priority, - Timestamp enqueue_time, - int64_t enqueue_order, - std::multiset::iterator enqueue_time_it, - std::unique_ptr packet); - QueuedPacket(const QueuedPacket& rhs); - ~QueuedPacket(); - - bool operator<(const QueuedPacket& other) const; - - int Priority() const; - RtpPacketMediaType Type() const; - uint32_t Ssrc() const; - Timestamp EnqueueTime() const; - bool IsRetransmission() const; - int64_t EnqueueOrder() const; - RtpPacketToSend* RtpPacket() const; - - std::multiset::iterator EnqueueTimeIterator() const; - void UpdateEnqueueTimeIterator(std::multiset::iterator it); - void SubtractPauseTime(TimeDelta pause_time_sum); - - private: - int priority_; - Timestamp enqueue_time_; // Absolute time of pacer queue entry. - int64_t enqueue_order_; - bool is_retransmission_; // Cached for performance. - std::multiset::iterator enqueue_time_it_; - // Raw pointer since priority_queue doesn't allow for moving - // out of the container. - RtpPacketToSend* owned_packet_; - }; - - class PriorityPacketQueue : public std::priority_queue { - public: - using const_iterator = container_type::const_iterator; - const_iterator begin() const; - const_iterator end() const; - }; - - struct StreamPrioKey { - StreamPrioKey(int priority, DataSize size) - : priority(priority), size(size) {} - - bool operator<(const StreamPrioKey& other) const { - if (priority != other.priority) - return priority < other.priority; - return size < other.size; - } - - const int priority; - const DataSize size; - }; - - struct Stream { - Stream(); - Stream(const Stream&); - - virtual ~Stream(); - - DataSize size; - uint32_t ssrc; - - PriorityPacketQueue packet_queue; - - // Whenever a packet is inserted for this stream we check if `priority_it` - // points to an element in `stream_priorities_`, and if it does it means - // this stream has already been scheduled, and if the scheduled priority is - // lower than the priority of the incoming packet we reschedule this stream - // with the higher priority. - std::multimap::iterator priority_it; - }; - - void Push(QueuedPacket packet); - - DataSize PacketSize(const QueuedPacket& packet) const; - void MaybePromoteSinglePacketToNormalQueue(); - - Stream* GetHighestPriorityStream(); - - // Just used to verify correctness. - bool IsSsrcScheduled(uint32_t ssrc) const; - - DataSize transport_overhead_per_packet_; - - Timestamp time_last_updated_; - - int64_t enqueue_count_; - - bool paused_; - int size_packets_; - std::array size_packets_per_media_type_; - DataSize size_; - DataSize max_size_; - TimeDelta queue_time_sum_; - TimeDelta pause_time_sum_; - - // A map of streams used to prioritize from which stream to send next. We use - // a multimap instead of a priority_queue since the priority of a stream can - // change as a new packet is inserted, and a multimap allows us to remove and - // then reinsert a StreamPrioKey if the priority has increased. - std::multimap stream_priorities_; - - // A map of SSRCs to Streams. - std::unordered_map streams_; - - // The enqueue time of every packet currently in the queue. Used to figure out - // the age of the oldest packet in the queue. - std::multiset enqueue_times_; - - absl::optional single_packet_queue_; - - bool include_overhead_; -}; -} // namespace webrtc - -#endif // MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ diff --git a/modules/pacing/round_robin_packet_queue_unittest.cc b/modules/pacing/round_robin_packet_queue_unittest.cc deleted file mode 100644 index 86f07be429..0000000000 --- a/modules/pacing/round_robin_packet_queue_unittest.cc +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/pacing/round_robin_packet_queue.h" - -#include - -#include "api/units/timestamp.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" -#include "rtc_base/checks.h" -#include "test/gmock.h" -#include "test/gtest.h" - -namespace webrtc { - -namespace { - -constexpr uint32_t kDefaultSsrc = 123; -constexpr int kDefaultPayloadSize = 321; - -std::unique_ptr CreatePacket(RtpPacketMediaType type, - uint16_t sequence_number) { - auto packet = std::make_unique(/*extensions=*/nullptr); - packet->set_packet_type(type); - packet->SetSsrc(kDefaultSsrc); - packet->SetSequenceNumber(sequence_number); - packet->SetPayloadSize(kDefaultPayloadSize); - return packet; -} - -} // namespace - -TEST(RoundRobinPacketQueueTest, - PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) { - Timestamp now = Timestamp::Zero(); - RoundRobinPacketQueue queue(now); - - // Initially all sizes are zero. - for (size_t i = 0; i < kNumMediaTypes; ++i) { - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); - } - - // Push packets. - queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, 1)); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - RtpPacketMediaType::kAudio)], - 1); - - queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, 2)); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - RtpPacketMediaType::kVideo)], - 1); - - queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3)); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - RtpPacketMediaType::kRetransmission)], - 1); - - queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, 4)); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - RtpPacketMediaType::kForwardErrorCorrection)], - 1); - - queue.Push(now, CreatePacket(RtpPacketMediaType::kPadding, 5)); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - RtpPacketMediaType::kPadding)], - 1); - - // Now all sizes are 1. - for (size_t i = 0; i < kNumMediaTypes; ++i) { - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 1); - } - - // Popping happens in a priority order based on media type. This test does not - // assert what this order is, only that the counter for the popped packet's - // media type is decremented. - for (size_t i = 0; i < kNumMediaTypes; ++i) { - auto popped_packet = queue.Pop(); - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( - popped_packet->packet_type().value())], - 0); - } - - // We've popped all packets, so all sizes are zero. - for (size_t i = 0; i < kNumMediaTypes; ++i) { - EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); - } -} - -} // namespace webrtc