diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index e7c6a43a28..ed6d2bd792 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -94,6 +94,7 @@ if (rtc_include_tests) { "pacing_controller_unittest.cc", "packet_router_unittest.cc", "prioritized_packet_queue_unittest.cc", + "round_robin_packet_queue_unittest.cc", "task_queue_paced_sender_unittest.cc", ] deps = [ @@ -103,6 +104,7 @@ if (rtc_include_tests) { "../../api/transport:network_control", "../../api/units:data_rate", "../../api/units:time_delta", + "../../api/units:timestamp", "../../modules/utility:mock_process_thread", "../../rtc_base:checks", "../../rtc_base:rtc_base_tests_utils", diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index a0f4ecb12e..98123c8e88 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -259,6 +259,11 @@ size_t PacingController::QueueSizePackets() const { return rtc::checked_cast(packet_queue_->SizeInPackets()); } +const std::array& +PacingController::SizeInPacketsPerRtpPacketMediaType() const { + return packet_queue_->SizeInPacketsPerRtpPacketMediaType(); +} + DataSize PacingController::QueueSizeData() const { DataSize size = packet_queue_->SizeInPayloadBytes(); if (include_overhead_) { diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index f7c0939619..b3949b6ae1 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -68,6 +69,11 @@ class PacingController { bool Empty() const { return SizeInPackets() == 0; } virtual DataSize SizeInPayloadBytes() const = 0; + // Total packets in the queue per media type (RtpPacketMediaType values are + // used as lookup index). + virtual const std::array& + SizeInPacketsPerRtpPacketMediaType() const = 0; + // If the next packet, that would be returned by Pop() if called // now, is an audio packet this method returns the enqueue time // of that packet. If queue is empty or top packet is not audio, @@ -159,6 +165,10 @@ class PacingController { // Number of packets in the pacer queue. size_t QueueSizePackets() const; + // Number of packets in the pacer queue per media type (RtpPacketMediaType + // values are used as lookup index). + const std::array& SizeInPacketsPerRtpPacketMediaType() + const; // Totals size of packets in the pacer queue. DataSize QueueSizeData() const; diff --git a/modules/pacing/prioritized_packet_queue.cc b/modules/pacing/prioritized_packet_queue.cc index b5c05828d4..83ec77da28 100644 --- a/modules/pacing/prioritized_packet_queue.cc +++ b/modules/pacing/prioritized_packet_queue.cc @@ -95,6 +95,7 @@ PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time) : queue_time_sum_(TimeDelta::Zero()), pause_time_sum_(TimeDelta::Zero()), size_packets_(0), + size_packets_per_media_type_({}), size_payload_(DataSize::Zero()), last_update_time_(creation_time), paused_(false), @@ -112,7 +113,9 @@ void PrioritizedPacketQueue::Push(Timestamp enqueue_time, auto enqueue_time_iterator = enqueue_times_.insert(enqueue_times_.end(), enqueue_time); - int prio_level = GetPriorityForType(*packet->packet_type()); + RTC_DCHECK(packet->packet_type().has_value()); + RtpPacketMediaType packet_type = packet->packet_type().value(); + int prio_level = GetPriorityForType(packet_type); RTC_DCHECK_GE(prio_level, 0); RTC_DCHECK_LT(prio_level, kNumPriorityLevels); QueuedPacket queued_packed = {.packet = std::move(packet), @@ -127,6 +130,7 @@ void PrioritizedPacketQueue::Push(Timestamp enqueue_time, UpdateAverageQueueTime(enqueue_time); queued_packed.enqueue_time -= pause_time_sum_; ++size_packets_; + ++size_packets_per_media_type_[static_cast(packet_type)]; size_payload_ += queued_packed.PacketSize(); if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) { @@ -160,6 +164,11 @@ std::unique_ptr PrioritizedPacketQueue::Pop() { StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front(); QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_); --size_packets_; + RTC_DCHECK(packet.packet->packet_type().has_value()); + RtpPacketMediaType packet_type = packet.packet->packet_type().value(); + --size_packets_per_media_type_[static_cast(packet_type)]; + RTC_DCHECK_GE(size_packets_per_media_type_[static_cast(packet_type)], + 0); size_payload_ -= packet.PacketSize(); // Calculate the total amount of time spent by this packet in the queue @@ -207,6 +216,11 @@ DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const { return size_payload_; } +const std::array& +PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { + return size_packets_per_media_type_; +} + Timestamp PrioritizedPacketQueue::LeadingAudioPacketEnqueueTime() const { if (streams_by_prio_[kAudioPrioLevel].empty()) { return Timestamp::MinusInfinity(); diff --git a/modules/pacing/prioritized_packet_queue.h b/modules/pacing/prioritized_packet_queue.h index 2b65365261..c770435aa1 100644 --- a/modules/pacing/prioritized_packet_queue.h +++ b/modules/pacing/prioritized_packet_queue.h @@ -37,6 +37,8 @@ class PrioritizedPacketQueue : public PacingController::PacketQueue { std::unique_ptr Pop() override; int SizeInPackets() const override; DataSize SizeInPayloadBytes() const override; + const std::array& SizeInPacketsPerRtpPacketMediaType() + const override; Timestamp LeadingAudioPacketEnqueueTime() const override; Timestamp OldestEnqueueTime() const override; TimeDelta AverageQueueTime() const override; @@ -88,6 +90,8 @@ class PrioritizedPacketQueue : public PacingController::PacketQueue { TimeDelta pause_time_sum_; // Total number of packets stored in this queue. int size_packets_; + // Total number of packets stored in this queue per RtpPacketMediaType. + std::array size_packets_per_media_type_; // Sum of payload sizes for all packts stored in this queue. DataSize size_payload_; // The last time queue/pause time sums were updated. diff --git a/modules/pacing/prioritized_packet_queue_unittest.cc b/modules/pacing/prioritized_packet_queue_unittest.cc index d8732e2358..6e27ff018d 100644 --- a/modules/pacing/prioritized_packet_queue_unittest.cc +++ b/modules/pacing/prioritized_packet_queue_unittest.cc @@ -13,6 +13,7 @@ #include #include "api/units/time_delta.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "rtc_base/checks.h" #include "test/gmock.h" @@ -230,4 +231,61 @@ TEST(PrioritizedPacketQueue, ReportsLeadingAudioEnqueueTime) { EXPECT_EQ(queue.LeadingAudioPacketEnqueueTime(), Timestamp::MinusInfinity()); } +TEST(PrioritizedPacketQueue, + PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) { + Timestamp now = Timestamp::Zero(); + PrioritizedPacketQueue queue(now); + + // Initially all sizes are zero. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); + } + + // Push packets. + queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, 1)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kAudio)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, 2)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kVideo)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kRetransmission)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, 4)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kForwardErrorCorrection)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kPadding, 5)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kPadding)], + 1); + + // Now all sizes are 1. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 1); + } + + // Popping happens in a priority order based on media type. This test does not + // assert what this order is, only that the counter for the popped packet's + // media type is decremented. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + auto popped_packet = queue.Pop(); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + popped_packet->packet_type().value())], + 0); + } + + // We've popped all packets, so all sizes are zero. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); + } +} + } // namespace webrtc diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index 638ab07344..d7525e9d5a 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -137,6 +137,7 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time) enqueue_count_(0), paused_(false), size_packets_(0), + size_packets_per_media_type_({}), size_(DataSize::Zero()), max_size_(kMaxLeadingSize), queue_time_sum_(TimeDelta::Zero()), @@ -152,8 +153,9 @@ RoundRobinPacketQueue::~RoundRobinPacketQueue() { void RoundRobinPacketQueue::Push(Timestamp enqueue_time, std::unique_ptr packet) { - RTC_DCHECK(packet->packet_type().has_value()); - int priority = GetPriorityForType(*packet->packet_type()); + RTC_CHECK(packet->packet_type().has_value()); + RtpPacketMediaType packet_type = packet->packet_type().value(); + int priority = GetPriorityForType(packet_type); if (size_packets_ == 0) { // Single packet fast-path. single_packet_queue_.emplace( @@ -162,6 +164,7 @@ void RoundRobinPacketQueue::Push(Timestamp enqueue_time, UpdateAverageQueueTime(enqueue_time); single_packet_queue_->SubtractPauseTime(pause_time_sum_); size_packets_ = 1; + ++size_packets_per_media_type_[static_cast(packet_type)]; size_ += PacketSize(*single_packet_queue_); } else { MaybePromoteSinglePacketToNormalQueue(); @@ -178,6 +181,11 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { single_packet_queue_.reset(); queue_time_sum_ = TimeDelta::Zero(); size_packets_ = 0; + RTC_CHECK(rtp_packet->packet_type().has_value()); + RtpPacketMediaType packet_type = rtp_packet->packet_type().value(); + size_packets_per_media_type_[static_cast(packet_type)] -= 1; + RTC_CHECK_GE(size_packets_per_media_type_[static_cast(packet_type)], + 0); size_ = DataSize::Zero(); return rtp_packet; } @@ -213,7 +221,11 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { size_ -= packet_size; size_packets_ -= 1; + size_packets_per_media_type_[static_cast(queued_packet.Type())] -= 1; RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero()); + RTC_CHECK_GE( + size_packets_per_media_type_[static_cast(queued_packet.Type())], + 0); std::unique_ptr rtp_packet(queued_packet.RtpPacket()); stream->packet_queue.pop(); @@ -239,6 +251,11 @@ DataSize RoundRobinPacketQueue::SizeInPayloadBytes() const { return size_; } +const std::array& +RoundRobinPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { + return size_packets_per_media_type_; +} + Timestamp RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() const { if (single_packet_queue_.has_value()) { if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) { @@ -339,6 +356,7 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) { packet.SubtractPauseTime(pause_time_sum_); size_packets_ += 1; + size_packets_per_media_type_[static_cast(packet.Type())] += 1; size_ += PacketSize(packet); } diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index 4d6f93ba2d..052b98b16b 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -42,6 +42,8 @@ class RoundRobinPacketQueue : public PacingController::PacketQueue { int SizeInPackets() const override; DataSize SizeInPayloadBytes() const override; + const std::array& SizeInPacketsPerRtpPacketMediaType() + const override; Timestamp LeadingAudioPacketEnqueueTime() const override; Timestamp OldestEnqueueTime() const override; TimeDelta AverageQueueTime() const override; @@ -142,6 +144,7 @@ class RoundRobinPacketQueue : public PacingController::PacketQueue { bool paused_; int size_packets_; + std::array size_packets_per_media_type_; DataSize size_; DataSize max_size_; TimeDelta queue_time_sum_; diff --git a/modules/pacing/round_robin_packet_queue_unittest.cc b/modules/pacing/round_robin_packet_queue_unittest.cc new file mode 100644 index 0000000000..86f07be429 --- /dev/null +++ b/modules/pacing/round_robin_packet_queue_unittest.cc @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/round_robin_packet_queue.h" + +#include + +#include "api/units/timestamp.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" +#include "rtc_base/checks.h" +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { + +namespace { + +constexpr uint32_t kDefaultSsrc = 123; +constexpr int kDefaultPayloadSize = 321; + +std::unique_ptr CreatePacket(RtpPacketMediaType type, + uint16_t sequence_number) { + auto packet = std::make_unique(/*extensions=*/nullptr); + packet->set_packet_type(type); + packet->SetSsrc(kDefaultSsrc); + packet->SetSequenceNumber(sequence_number); + packet->SetPayloadSize(kDefaultPayloadSize); + return packet; +} + +} // namespace + +TEST(RoundRobinPacketQueueTest, + PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) { + Timestamp now = Timestamp::Zero(); + RoundRobinPacketQueue queue(now); + + // Initially all sizes are zero. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); + } + + // Push packets. + queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, 1)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kAudio)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, 2)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kVideo)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kRetransmission)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, 4)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kForwardErrorCorrection)], + 1); + + queue.Push(now, CreatePacket(RtpPacketMediaType::kPadding, 5)); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + RtpPacketMediaType::kPadding)], + 1); + + // Now all sizes are 1. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 1); + } + + // Popping happens in a priority order based on media type. This test does not + // assert what this order is, only that the counter for the popped packet's + // media type is decremented. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + auto popped_packet = queue.Pop(); + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( + popped_packet->packet_type().value())], + 0); + } + + // We've popped all packets, so all sizes are zero. + for (size_t i = 0; i < kNumMediaTypes; ++i) { + EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0); + } +} + +} // namespace webrtc diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 5f268a76a4..43c4c965e4 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -297,14 +297,23 @@ void TaskQueuePacedSender::MaybeProcessPackets( !pacing_controller_.IsProbing() ? TaskQueueBase::DelayPrecision::kLow : TaskQueueBase::DelayPrecision::kHigh; - // Optionally disable low precision if the expected queue time is greater - // than `max_low_precision_expected_queue_time`. - if (precision == TaskQueueBase::DelayPrecision::kLow && - slacked_pacer_flags_.max_low_precision_expected_queue_time && - pacing_controller_.ExpectedQueueTime() >= - slacked_pacer_flags_.max_low_precision_expected_queue_time - .Value()) { - precision = TaskQueueBase::DelayPrecision::kHigh; + // Check for cases where we need high precision. + if (precision == TaskQueueBase::DelayPrecision::kLow) { + auto& packets_per_type = + pacing_controller_.SizeInPacketsPerRtpPacketMediaType(); + bool audio_or_retransmission_packets_in_queue = + packets_per_type[static_cast(RtpPacketMediaType::kAudio)] > + 0 || + packets_per_type[static_cast( + RtpPacketMediaType::kRetransmission)] > 0; + bool queue_time_too_large = + slacked_pacer_flags_.max_low_precision_expected_queue_time && + pacing_controller_.ExpectedQueueTime() >= + slacked_pacer_flags_.max_low_precision_expected_queue_time + .Value(); + if (audio_or_retransmission_packets_in_queue || queue_time_too_large) { + precision = TaskQueueBase::DelayPrecision::kHigh; + } } task_queue_.PostDelayedTaskWithPrecision(