Prioritize audio resend before video resend and implement TTL.

Adds separate priorities for audio and video retranmission.
Done by adding an original type to RtpPacketToSend.

Add possiblity to set TTL for audio nack, video nack and video packet separately.
Oldest packet for these types are dropped when a new packet of that type is pushed to the pacer, or when the pacer switch current priority type to that priority.

Effect is that:
   -pacer queue does not grow unlimited for these types if a TTL has been set.
   -an old packet is not sent.

Bug: webrtc:15740
Change-Id: I38718bc570aebca54eacbded69824905f3694f41
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331823
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41414}
This commit is contained in:
Per K 2023-12-19 12:42:55 +01:00 committed by WebRTC LUCI CQ
parent 70ad987c64
commit b9ba02c025
9 changed files with 382 additions and 45 deletions

View File

@ -63,6 +63,7 @@ rtc_library("pacing") {
] ]
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/cleanup", "//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/container:inlined_vector",
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional", "//third_party/abseil-cpp/absl/types:optional",

View File

@ -19,11 +19,11 @@
#include "absl/strings/match.h" #include "absl/strings/match.h"
#include "api/units/data_size.h" #include "api/units/data_size.h"
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/pacing/bitrate_prober.h" #include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
namespace webrtc { namespace webrtc {
@ -44,8 +44,6 @@ bool IsEnabled(const FieldTrialsView& field_trials, absl::string_view key) {
} // namespace } // namespace
const TimeDelta PacingController::kMaxExpectedQueueLength =
TimeDelta::Millis(2000);
const TimeDelta PacingController::kPausedProcessInterval = const TimeDelta PacingController::kPausedProcessInterval =
kCongestedPacketInterval; kCongestedPacketInterval;
const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1);
@ -57,11 +55,13 @@ const TimeDelta PacingController::kMaxEarlyProbeProcessing =
PacingController::PacingController(Clock* clock, PacingController::PacingController(Clock* clock,
PacketSender* packet_sender, PacketSender* packet_sender,
const FieldTrialsView& field_trials) const FieldTrialsView& field_trials,
Configuration configuration)
: clock_(clock), : clock_(clock),
packet_sender_(packet_sender), packet_sender_(packet_sender),
field_trials_(field_trials), field_trials_(field_trials),
drain_large_queues_( drain_large_queues_(
configuration.drain_large_queues &&
!IsDisabled(field_trials_, "WebRTC-Pacer-DrainQueue")), !IsDisabled(field_trials_, "WebRTC-Pacer-DrainQueue")),
send_padding_if_silent_( send_padding_if_silent_(
IsEnabled(field_trials_, "WebRTC-Pacer-PadInSilence")), IsEnabled(field_trials_, "WebRTC-Pacer-PadInSilence")),
@ -71,9 +71,10 @@ PacingController::PacingController(Clock* clock,
fast_retransmissions_( fast_retransmissions_(
IsEnabled(field_trials_, "WebRTC-Pacer-FastRetransmissions")), IsEnabled(field_trials_, "WebRTC-Pacer-FastRetransmissions")),
keyframe_flushing_( keyframe_flushing_(
configuration.keyframe_flushing ||
IsEnabled(field_trials_, "WebRTC-Pacer-KeyframeFlushing")), IsEnabled(field_trials_, "WebRTC-Pacer-KeyframeFlushing")),
transport_overhead_per_packet_(DataSize::Zero()), transport_overhead_per_packet_(DataSize::Zero()),
send_burst_interval_(kDefaultBurstInterval), send_burst_interval_(configuration.send_burst_interval),
last_timestamp_(clock_->CurrentTime()), last_timestamp_(clock_->CurrentTime()),
paused_(false), paused_(false),
media_debt_(DataSize::Zero()), media_debt_(DataSize::Zero()),
@ -86,9 +87,11 @@ PacingController::PacingController(Clock* clock,
last_process_time_(clock->CurrentTime()), last_process_time_(clock->CurrentTime()),
last_send_time_(last_process_time_), last_send_time_(last_process_time_),
seen_first_packet_(false), seen_first_packet_(false),
packet_queue_(/*creation_time=*/last_process_time_), packet_queue_(/*creation_time=*/last_process_time_,
configuration.prioritize_audio_retransmission,
configuration.packet_queue_ttl),
congested_(false), congested_(false),
queue_time_limit_(kMaxExpectedQueueLength), queue_time_limit_(configuration.queue_time_limit),
account_for_audio_(false), account_for_audio_(false),
include_overhead_(false), include_overhead_(false),
circuit_breaker_threshold_(1 << 16) { circuit_breaker_threshold_(1 << 16) {
@ -710,8 +713,7 @@ Timestamp PacingController::NextUnpacedSendTime() const {
} }
if (fast_retransmissions_) { if (fast_retransmissions_) {
Timestamp leading_retransmission_send_time = Timestamp leading_retransmission_send_time =
packet_queue_.LeadingPacketEnqueueTime( packet_queue_.LeadingPacketEnqueueTimeForRetransmission();
RtpPacketMediaType::kRetransmission);
if (leading_retransmission_send_time.IsFinite()) { if (leading_retransmission_send_time.IsFinite()) {
return leading_retransmission_send_time; return leading_retransmission_send_time;
} }

View File

@ -67,11 +67,6 @@ class PacingController {
} }
}; };
// Expected max pacer delay. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
// UpdateBitrate() so that this limit will be upheld.
static const TimeDelta kMaxExpectedQueueLength;
// If no media or paused, wake up at least every `kPausedProcessIntervalMs` in // If no media or paused, wake up at least every `kPausedProcessIntervalMs` in
// order to send a keep-alive packet so we don't get stuck in a bad state due // order to send a keep-alive packet so we don't get stuck in a bad state due
// to lack of feedback. // to lack of feedback.
@ -93,14 +88,45 @@ class PacingController {
// the send burst interval. // the send burst interval.
// Ex: max send burst interval = 63Kb / 10Mbit/s = 50ms. // Ex: max send burst interval = 63Kb / 10Mbit/s = 50ms.
static constexpr DataSize kMaxBurstSize = DataSize::Bytes(63 * 1000); static constexpr DataSize kMaxBurstSize = DataSize::Bytes(63 * 1000);
// The pacer is allowed to send enqued packets in bursts and can build up a
// packet "debt" that correspond to approximately the send rate during // Configuration default values.
// the burst interval.
static constexpr TimeDelta kDefaultBurstInterval = TimeDelta::Millis(40); static constexpr TimeDelta kDefaultBurstInterval = TimeDelta::Millis(40);
static constexpr TimeDelta kMaxExpectedQueueLength = TimeDelta::Millis(2000);
struct Configuration {
// If the pacer queue grows longer than the configured max queue limit,
// pacer sends at the minimum rate needed to keep the max queue limit and
// ignore the current bandwidth estimate.
bool drain_large_queues = true;
// Expected max pacer delay. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
// SetPacingRates() so that this limit will be upheld if
// `drain_large_queues` is set.
TimeDelta queue_time_limit = kMaxExpectedQueueLength;
// If the first packet of a keyframe is enqueued on a RTP stream, pacer
// skips forward to that packet and drops other enqueued packets on that
// stream, unless a keyframe is already being paced.
bool keyframe_flushing = false;
// Audio retransmission is prioritized before video retransmission packets.
bool prioritize_audio_retransmission = false;
// Configure separate timeouts per priority. After a timeout, a packet of
// that sort will not be paced and instead dropped.
// Note: to set TTL on audio retransmission,
// `prioritize_audio_retransmission` must be true.
PacketQueueTTL packet_queue_ttl;
// The pacer is allowed to send enqueued packets in bursts and can build up
// a packet "debt" that correspond to approximately the send rate during the
// burst interval.
TimeDelta send_burst_interval = kDefaultBurstInterval;
};
static Configuration DefaultConfiguration() { return Configuration{}; }
PacingController(Clock* clock, PacingController(Clock* clock,
PacketSender* packet_sender, PacketSender* packet_sender,
const FieldTrialsView& field_trials); const FieldTrialsView& field_trials,
Configuration configuration = DefaultConfiguration());
~PacingController(); ~PacingController();

View File

@ -2348,5 +2348,43 @@ TEST_F(PacingControllerTest, FlushesPacketsOnKeyFrames) {
pacer->ProcessPackets(); pacer->ProcessPackets();
} }
TEST_F(PacingControllerTest, CanControlQueueSizeUsingTtl) {
const uint32_t kSsrc = 12345;
const uint32_t kAudioSsrc = 2345;
uint16_t sequence_number = 1234;
PacingController::Configuration config;
config.drain_large_queues = false;
config.packet_queue_ttl.video = TimeDelta::Millis(500);
auto pacer =
std::make_unique<PacingController>(&clock_, &callback_, trials_, config);
pacer->SetPacingRates(DataRate::BitsPerSec(100'000), DataRate::Zero());
Timestamp send_time = Timestamp::Zero();
for (int i = 0; i < 100; ++i) {
// Enqueue a new audio and video frame every 33ms.
if (clock_.CurrentTime() - send_time > TimeDelta::Millis(33)) {
for (int j = 0; j < 3; ++j) {
auto packet = BuildPacket(RtpPacketMediaType::kVideo, kSsrc,
/*sequence_number=*/++sequence_number,
/*capture_time_ms=*/2,
/*size_bytes=*/1000);
pacer->EnqueuePacket(std::move(packet));
}
auto packet = BuildPacket(RtpPacketMediaType::kAudio, kAudioSsrc,
/*sequence_number=*/++sequence_number,
/*capture_time_ms=*/2,
/*size_bytes=*/100);
pacer->EnqueuePacket(std::move(packet));
send_time = clock_.CurrentTime();
}
EXPECT_LE(clock_.CurrentTime() - pacer->OldestPacketEnqueueTime(),
TimeDelta::Millis(500));
clock_.AdvanceTime(pacer->NextSendTime() - clock_.CurrentTime());
pacer->ProcessPackets();
}
}
} // namespace } // namespace
} // namespace webrtc } // namespace webrtc

View File

@ -10,41 +10,70 @@
#include "modules/pacing/prioritized_packet_queue.h" #include "modules/pacing/prioritized_packet_queue.h"
#include <algorithm>
#include <array>
#include <utility> #include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace webrtc { namespace webrtc {
namespace { namespace {
constexpr int kAudioPrioLevel = 0; constexpr int kAudioPrioLevel = 0;
int GetPriorityForType(RtpPacketMediaType type) { int GetPriorityForType(
RtpPacketMediaType type,
absl::optional<RtpPacketToSend::OriginalType> original_type) {
// Lower number takes priority over higher. // Lower number takes priority over higher.
switch (type) { switch (type) {
case RtpPacketMediaType::kAudio: case RtpPacketMediaType::kAudio:
// Audio is always prioritized over other packet types. // Audio is always prioritized over other packet types.
return kAudioPrioLevel; return kAudioPrioLevel;
case RtpPacketMediaType::kRetransmission: case RtpPacketMediaType::kRetransmission:
// Send retransmissions before new media. // Send retransmissions before new media. If original_type is set, audio
// retransmission is prioritized more than video retransmission.
if (original_type == RtpPacketToSend::OriginalType::kVideo) {
return kAudioPrioLevel + 2;
}
return kAudioPrioLevel + 1; return kAudioPrioLevel + 1;
case RtpPacketMediaType::kVideo: case RtpPacketMediaType::kVideo:
case RtpPacketMediaType::kForwardErrorCorrection: case RtpPacketMediaType::kForwardErrorCorrection:
// Video has "normal" priority, in the old speak. // Video has "normal" priority, in the old speak.
// Send redundancy concurrently to video. If it is delayed it might have a // Send redundancy concurrently to video. If it is delayed it might have a
// lower chance of being useful. // lower chance of being useful.
return kAudioPrioLevel + 2; return kAudioPrioLevel + 3;
case RtpPacketMediaType::kPadding: case RtpPacketMediaType::kPadding:
// Packets that are in themselves likely useless, only sent to keep the // Packets that are in themselves likely useless, only sent to keep the
// BWE high. // BWE high.
return kAudioPrioLevel + 3; return kAudioPrioLevel + 4;
} }
RTC_CHECK_NOTREACHED(); RTC_CHECK_NOTREACHED();
} }
} // namespace } // namespace
absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) {
absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
ttl_per_prio(kNumPriorityLevels, TimeDelta::PlusInfinity());
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kAudio)] =
packet_queue_ttl.audio_retransmission;
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kVideo)] =
packet_queue_ttl.video_retransmission;
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, absl::nullopt)] =
packet_queue_ttl.video;
return ttl_per_prio;
}
DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const { DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
return DataSize::Bytes(packet->payload_size() + packet->padding_size()); return DataSize::Bytes(packet->payload_size() + packet->padding_size());
} }
@ -109,8 +138,13 @@ PrioritizedPacketQueue::StreamQueue::DequeueAll() {
return packets_by_prio; return packets_by_prio;
} }
PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time) PrioritizedPacketQueue::PrioritizedPacketQueue(
: queue_time_sum_(TimeDelta::Zero()), Timestamp creation_time,
bool prioritize_audio_retransmission,
PacketQueueTTL packet_queue_ttl)
: prioritize_audio_retransmission_(prioritize_audio_retransmission),
time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)),
queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()), pause_time_sum_(TimeDelta::Zero()),
size_packets_(0), size_packets_(0),
size_packets_per_media_type_({}), size_packets_per_media_type_({}),
@ -133,7 +167,11 @@ void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
enqueue_times_.insert(enqueue_times_.end(), enqueue_time); enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
RTC_DCHECK(packet->packet_type().has_value()); RTC_DCHECK(packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet->packet_type().value(); RtpPacketMediaType packet_type = packet->packet_type().value();
int prio_level = GetPriorityForType(packet_type); int prio_level =
GetPriorityForType(packet_type, prioritize_audio_retransmission_
? packet->original_packet_type()
: absl::nullopt);
PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time);
RTC_DCHECK_GE(prio_level, 0); RTC_DCHECK_GE(prio_level, 0);
RTC_DCHECK_LT(prio_level, kNumPriorityLevels); RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
QueuedPacket queued_packed = {.packet = std::move(packet), QueuedPacket queued_packed = {.packet = std::move(packet),
@ -214,7 +252,8 @@ PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime( Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
RtpPacketMediaType type) const { RtpPacketMediaType type) const {
const int priority_level = GetPriorityForType(type); RTC_DCHECK(type != RtpPacketMediaType::kRetransmission);
const int priority_level = GetPriorityForType(type, absl::nullopt);
if (streams_by_prio_[priority_level].empty()) { if (streams_by_prio_[priority_level].empty()) {
return Timestamp::MinusInfinity(); return Timestamp::MinusInfinity();
} }
@ -222,6 +261,39 @@ Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
priority_level); priority_level);
} }
Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission()
const {
if (!prioritize_audio_retransmission_) {
const int priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission, absl::nullopt);
if (streams_by_prio_[priority_level].empty()) {
return Timestamp::PlusInfinity();
}
return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
priority_level);
}
const int audio_priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kAudio);
const int video_priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kVideo);
Timestamp next_audio =
streams_by_prio_[audio_priority_level].empty()
? Timestamp::PlusInfinity()
: streams_by_prio_[audio_priority_level]
.front()
->LeadingPacketEnqueueTime(audio_priority_level);
Timestamp next_video =
streams_by_prio_[video_priority_level].empty()
? Timestamp::PlusInfinity()
: streams_by_prio_[video_priority_level]
.front()
->LeadingPacketEnqueueTime(video_priority_level);
return std::min(next_audio, next_video);
}
Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const { Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
return enqueue_times_.empty() ? Timestamp::MinusInfinity() return enqueue_times_.empty() ? Timestamp::MinusInfinity()
: enqueue_times_.front(); : enqueue_times_.front();
@ -283,9 +355,6 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
// Update the global top prio level if neccessary. // Update the global top prio level if neccessary.
RTC_DCHECK(streams_by_prio_[i].front() == &queue); RTC_DCHECK(streams_by_prio_[i].front() == &queue);
streams_by_prio_[i].pop_front(); streams_by_prio_[i].pop_front();
if (i == top_active_prio_level_) {
MaybeUpdateTopPrioLevel();
}
} else { } else {
// More than stream had packets at this prio level, filter this one out. // More than stream had packets at this prio level, filter this one out.
std::deque<StreamQueue*> filtered_queue; std::deque<StreamQueue*> filtered_queue;
@ -298,6 +367,7 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
} }
} }
} }
MaybeUpdateTopPrioLevel();
} }
bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const { bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const {
@ -340,13 +410,15 @@ void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
} }
void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() { void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
if (streams_by_prio_[top_active_prio_level_].empty()) { if (top_active_prio_level_ == -1 ||
streams_by_prio_[top_active_prio_level_].empty()) {
// No stream queues have packets at this prio level, find top priority // No stream queues have packets at this prio level, find top priority
// that is not empty. // that is not empty.
if (size_packets_ == 0) { if (size_packets_ == 0) {
top_active_prio_level_ = -1; top_active_prio_level_ = -1;
} else { } else {
for (int i = 0; i < kNumPriorityLevels; ++i) { for (int i = 0; i < kNumPriorityLevels; ++i) {
PurgeOldPacketsAtPriorityLevel(i, last_update_time_);
if (!streams_by_prio_[i].empty()) { if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i; top_active_prio_level_ = i;
break; break;
@ -356,4 +428,35 @@ void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
} }
} }
void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level,
Timestamp now) {
RTC_DCHECK(prio_level >= 0 && prio_level < kNumPriorityLevels);
TimeDelta time_to_live = time_to_live_per_prio_[prio_level];
if (time_to_live.IsInfinite()) {
return;
}
std::deque<StreamQueue*>& queues = streams_by_prio_[prio_level];
auto iter = queues.begin();
while (iter != queues.end()) {
StreamQueue* queue_ptr = *iter;
while (queue_ptr->HasPacketsAtPrio(prio_level) &&
(now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) >
time_to_live) {
QueuedPacket packet = queue_ptr->DequeuePacket(prio_level);
RTC_LOG(LS_INFO) << "Dropping old packet on SSRC: "
<< packet.packet->Ssrc()
<< " seq:" << packet.packet->SequenceNumber()
<< " time in queue:" << (now - packet.enqueue_time).ms()
<< " ms";
DequeuePacketInternal(packet);
}
if (!queue_ptr->HasPacketsAtPrio(prio_level)) {
iter = queues.erase(iter);
} else {
++iter;
}
}
}
} // namespace webrtc } // namespace webrtc

View File

@ -18,8 +18,8 @@
#include <list> #include <list>
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include <vector>
#include "absl/container/inlined_vector.h"
#include "api/units/data_size.h" #include "api/units/data_size.h"
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
@ -27,9 +27,19 @@
namespace webrtc { namespace webrtc {
// Describes how long time a packet may stay in the queue before being dropped.
struct PacketQueueTTL {
TimeDelta audio_retransmission = TimeDelta::PlusInfinity();
TimeDelta video_retransmission = TimeDelta::PlusInfinity();
TimeDelta video = TimeDelta::PlusInfinity();
};
class PrioritizedPacketQueue { class PrioritizedPacketQueue {
public: public:
explicit PrioritizedPacketQueue(Timestamp creation_time); explicit PrioritizedPacketQueue(
Timestamp creation_time,
bool prioritize_audio_retransmission = false,
PacketQueueTTL packet_queue_ttl = PacketQueueTTL());
PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete; PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete;
PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete; PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete;
@ -63,6 +73,7 @@ class PrioritizedPacketQueue {
// method, for the given packet type. If queue has no packets, of that type, // method, for the given packet type. If queue has no packets, of that type,
// returns Timestamp::MinusInfinity(). // returns Timestamp::MinusInfinity().
Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const; Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const;
Timestamp LeadingPacketEnqueueTimeForRetransmission() const;
// Enqueue time of the oldest packet in the queue, // Enqueue time of the oldest packet in the queue,
// Timestamp::MinusInfinity() if queue is empty. // Timestamp::MinusInfinity() if queue is empty.
@ -90,7 +101,7 @@ class PrioritizedPacketQueue {
bool HasKeyframePackets(uint32_t ssrc) const; bool HasKeyframePackets(uint32_t ssrc) const;
private: private:
static constexpr int kNumPriorityLevels = 4; static constexpr int kNumPriorityLevels = 5;
class QueuedPacket { class QueuedPacket {
public: public:
@ -139,6 +150,15 @@ class PrioritizedPacketQueue {
// if so move it to the lowest non-empty index. // if so move it to the lowest non-empty index.
void MaybeUpdateTopPrioLevel(); void MaybeUpdateTopPrioLevel();
void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now);
static absl::InlinedVector<TimeDelta, kNumPriorityLevels> ToTtlPerPrio(
PacketQueueTTL);
const bool prioritize_audio_retransmission_;
const absl::InlinedVector<TimeDelta, kNumPriorityLevels>
time_to_live_per_prio_;
// Cumulative sum, over all packets, of time spent in the queue. // Cumulative sum, over all packets, of time spent in the queue.
TimeDelta queue_time_sum_; TimeDelta queue_time_sum_;
// Cumulative sum of time the queue has spent in a paused state. // Cumulative sum of time the queue has spent in a paused state.

View File

@ -10,6 +10,7 @@
#include "modules/pacing/prioritized_packet_queue.h" #include "modules/pacing/prioritized_packet_queue.h"
#include <memory>
#include <utility> #include <utility>
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
@ -26,18 +27,39 @@ constexpr uint32_t kDefaultSsrc = 123;
constexpr int kDefaultPayloadSize = 789; constexpr int kDefaultPayloadSize = 789;
std::unique_ptr<RtpPacketToSend> CreatePacket(RtpPacketMediaType type, std::unique_ptr<RtpPacketToSend> CreatePacket(RtpPacketMediaType type,
uint16_t sequence_number, uint16_t seq,
uint32_t ssrc = kDefaultSsrc, uint32_t ssrc = kDefaultSsrc,
bool is_key_frame = false) { bool is_key_frame = false) {
auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr); auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
packet->set_packet_type(type); packet->set_packet_type(type);
packet->SetSsrc(ssrc); packet->SetSsrc(ssrc);
packet->SetSequenceNumber(sequence_number); packet->SetSequenceNumber(seq);
packet->SetPayloadSize(kDefaultPayloadSize); packet->SetPayloadSize(kDefaultPayloadSize);
packet->set_is_key_frame(is_key_frame); packet->set_is_key_frame(is_key_frame);
return packet; return packet;
} }
std::unique_ptr<RtpPacketToSend> CreateRetransmissionPacket(
RtpPacketMediaType original_type,
uint16_t seq,
uint32_t ssrc = kDefaultSsrc) {
auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
packet->set_packet_type(original_type);
packet->set_packet_type(RtpPacketMediaType::kRetransmission);
RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kRetransmission);
if (original_type == RtpPacketMediaType::kVideo) {
RTC_DCHECK(packet->original_packet_type() ==
RtpPacketToSend::OriginalType::kVideo);
} else {
RTC_DCHECK(packet->original_packet_type() ==
RtpPacketToSend::OriginalType::kAudio);
}
packet->SetSsrc(ssrc);
packet->SetSequenceNumber(seq);
packet->SetPayloadSize(kDefaultPayloadSize);
return packet;
}
} // namespace } // namespace
TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) { TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) {
@ -49,18 +71,42 @@ TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) {
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2)); queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection,
/*seq=*/3)); /*seq=*/3));
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4)); queue.Push(now,
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/5)); CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/4));
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/5));
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/6));
// Packets should be returned in high to low order. // Packets should be returned in high to low order.
EXPECT_EQ(queue.Pop()->SequenceNumber(), 5); EXPECT_EQ(queue.Pop()->SequenceNumber(), 6);
// Audio and video retransmission has same prio, but video was enqueued first.
EXPECT_EQ(queue.Pop()->SequenceNumber(), 4); EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 5);
// Video and FEC prioritized equally - but video was enqueued first. // Video and FEC prioritized equally - but video was enqueued first.
EXPECT_EQ(queue.Pop()->SequenceNumber(), 2); EXPECT_EQ(queue.Pop()->SequenceNumber(), 2);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 3); EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 1); EXPECT_EQ(queue.Pop()->SequenceNumber(), 1);
} }
TEST(PrioritizedPacketQueue,
PrioritizeAudioRetransmissionBeforeVideoRetransmissionIfConfigured) {
Timestamp now = Timestamp::Zero();
PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true);
// Add packets in low to high packet order.
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3));
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/4));
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/5));
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/6));
// Packets should be returned in high to low order.
EXPECT_EQ(queue.Pop()->SequenceNumber(), 6);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 5);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
}
TEST(PrioritizedPacketQueue, ReturnsEqualPrioPacketsInRoundRobinOrder) { TEST(PrioritizedPacketQueue, ReturnsEqualPrioPacketsInRoundRobinOrder) {
Timestamp now = Timestamp::Zero(); Timestamp now = Timestamp::Zero();
PrioritizedPacketQueue queue(now); PrioritizedPacketQueue queue(now);
@ -251,6 +297,26 @@ TEST(PrioritizedPacketQueue, ReportsLeadingPacketEnqueueTime) {
Timestamp::MinusInfinity()); Timestamp::MinusInfinity());
} }
TEST(PrioritizedPacketQueue, ReportsLeadingPacketEnqueueTimeForRetransmission) {
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero(),
/*prioritize_audio_retransmission=*/true);
EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(),
Timestamp::PlusInfinity());
queue.Push(Timestamp::Millis(10),
CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/1));
queue.Push(Timestamp::Millis(11),
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/2));
EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(),
Timestamp::Millis(10));
queue.Pop(); // Pop audio retransmission since it has higher prio.
EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(),
Timestamp::Millis(10));
queue.Pop(); // Pop video retransmission.
EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(),
Timestamp::PlusInfinity());
}
TEST(PrioritizedPacketQueue, TEST(PrioritizedPacketQueue,
PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) { PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) {
Timestamp now = Timestamp::Zero(); Timestamp now = Timestamp::Zero();
@ -272,7 +338,7 @@ TEST(PrioritizedPacketQueue,
RtpPacketMediaType::kVideo)], RtpPacketMediaType::kVideo)],
1); 1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3)); queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo, 3));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>( EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kRetransmission)], RtpPacketMediaType::kRetransmission)],
1); 1);
@ -326,6 +392,8 @@ TEST(PrioritizedPacketQueue, ClearsPackets) {
// Remove all of them. // Remove all of them.
queue.RemovePacketsForSsrc(kSsrc); queue.RemovePacketsForSsrc(kSsrc);
EXPECT_TRUE(queue.Empty()); EXPECT_TRUE(queue.Empty());
queue.RemovePacketsForSsrc(kSsrc);
EXPECT_TRUE(queue.Empty());
} }
TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) { TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) {
@ -338,16 +406,16 @@ TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) {
// ensuring they are first in line. // ensuring they are first in line.
queue.Push( queue.Push(
now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/1, kRemovingSsrc)); now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/1, kRemovingSsrc));
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/2, queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo,
kRemovingSsrc)); /*seq=*/2, kRemovingSsrc));
// Add a video packet and a retransmission for the SSRC that will remain. // Add a video packet and a retransmission for the SSRC that will remain.
// The retransmission packets now both have pointers to their respective qeues // The retransmission packets now both have pointers to their respective qeues
// from the same prio level. // from the same prio level.
queue.Push(now, queue.Push(now,
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, kStayingSsrc)); CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, kStayingSsrc));
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4, queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo,
kStayingSsrc)); /*seq=*/4, kStayingSsrc));
EXPECT_EQ(queue.SizeInPackets(), 4); EXPECT_EQ(queue.SizeInPackets(), 4);
@ -413,4 +481,64 @@ TEST(PrioritizedPacketQueue, ReportsKeyframePackets) {
EXPECT_FALSE(queue.HasKeyframePackets(kVideoSsrc2)); EXPECT_FALSE(queue.HasKeyframePackets(kVideoSsrc2));
} }
TEST(PrioritizedPacketQueue, PacketsDroppedIfNotPulledWithinTttl) {
Timestamp now = Timestamp::Zero();
PacketQueueTTL ttls;
ttls.audio_retransmission = TimeDelta::Millis(200);
PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true,
ttls);
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1));
now += ttls.audio_retransmission + TimeDelta::Millis(1);
EXPECT_EQ(queue.SizeInPackets(), 1);
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/2));
EXPECT_EQ(queue.SizeInPackets(), 1);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 2);
}
TEST(PrioritizedPacketQueue, DontSendPacketsAfterTttl) {
Timestamp now = Timestamp::Zero();
PacketQueueTTL ttls;
ttls.audio_retransmission = TimeDelta::Millis(200);
PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true,
ttls);
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1));
now += ttls.audio_retransmission + TimeDelta::Millis(1);
EXPECT_EQ(queue.SizeInPackets(), 1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/3));
// Expect the old packet to have been removed since it was not popped in time.
EXPECT_EQ(queue.SizeInPackets(), 3);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
EXPECT_EQ(queue.SizeInPackets(), 1);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 2);
EXPECT_EQ(queue.SizeInPackets(), 0);
}
TEST(PrioritizedPacketQueue,
SendsPacketsAfterTttlIfPrioHigherThanPushedPackets) {
Timestamp now = Timestamp::Zero();
PacketQueueTTL ttls;
ttls.audio_retransmission = TimeDelta::Millis(200);
PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true,
ttls);
queue.Push(now,
CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1));
now += ttls.audio_retransmission + TimeDelta::Millis(1);
EXPECT_EQ(queue.SizeInPackets(), 1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
// This test just show that TTL is not enforced strictly. If a new audio
// packet had been queued before a packet was popped, the audio retransmission
// packet would have been dropped.
EXPECT_EQ(queue.SizeInPackets(), 2);
EXPECT_EQ(queue.Pop()->SequenceNumber(), 1);
EXPECT_EQ(queue.SizeInPackets(), 1);
}
} // namespace webrtc } // namespace webrtc

View File

@ -12,6 +12,8 @@
#include <cstdint> #include <cstdint>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc { namespace webrtc {
RtpPacketToSend::RtpPacketToSend(const ExtensionManager* extensions) RtpPacketToSend::RtpPacketToSend(const ExtensionManager* extensions)
@ -28,4 +30,13 @@ RtpPacketToSend& RtpPacketToSend::operator=(RtpPacketToSend&& packet) = default;
RtpPacketToSend::~RtpPacketToSend() = default; RtpPacketToSend::~RtpPacketToSend() = default;
void RtpPacketToSend::set_packet_type(RtpPacketMediaType type) {
if (packet_type_ == RtpPacketMediaType::kAudio) {
original_packet_type_ = OriginalType::kAudio;
} else if (packet_type_ == RtpPacketMediaType::kVideo) {
original_packet_type_ = OriginalType::kVideo;
}
packet_type_ = type;
}
} // namespace webrtc } // namespace webrtc

View File

@ -49,11 +49,18 @@ class RtpPacketToSend : public RtpPacket {
webrtc::Timestamp capture_time() const { return capture_time_; } webrtc::Timestamp capture_time() const { return capture_time_; }
void set_capture_time(webrtc::Timestamp time) { capture_time_ = time; } void set_capture_time(webrtc::Timestamp time) { capture_time_ = time; }
void set_packet_type(RtpPacketMediaType type) { packet_type_ = type; } void set_packet_type(RtpPacketMediaType type);
absl::optional<RtpPacketMediaType> packet_type() const { absl::optional<RtpPacketMediaType> packet_type() const {
return packet_type_; return packet_type_;
} }
enum class OriginalType { kAudio, kVideo };
// Original type does not change if packet type is changed to kRetransmission.
absl::optional<OriginalType> original_packet_type() const {
return original_packet_type_;
}
// If this is a retransmission, indicates the sequence number of the original // If this is a retransmission, indicates the sequence number of the original
// media packet that this packet represents. If RTX is used this will likely // media packet that this packet represents. If RTX is used this will likely
// be different from SequenceNumber(). // be different from SequenceNumber().
@ -133,6 +140,7 @@ class RtpPacketToSend : public RtpPacket {
private: private:
webrtc::Timestamp capture_time_ = webrtc::Timestamp::Zero(); webrtc::Timestamp capture_time_ = webrtc::Timestamp::Zero();
absl::optional<RtpPacketMediaType> packet_type_; absl::optional<RtpPacketMediaType> packet_type_;
absl::optional<OriginalType> original_packet_type_;
bool allow_retransmission_ = false; bool allow_retransmission_ = false;
absl::optional<uint16_t> retransmitted_sequence_number_; absl::optional<uint16_t> retransmitted_sequence_number_;
rtc::scoped_refptr<rtc::RefCountedBase> additional_data_; rtc::scoped_refptr<rtc::RefCountedBase> additional_data_;