Add new prioritized packet queue.
This queue is a more strict round robing queue, unlike the class named RoundRobinPacketQueue. That is, we don't have the same logic to prioritize lower-bitrate streams. The queue time mechanism is essentially directly copied from the previous implementation however. Bug: webrtc:11340 Change-Id: Ie38ba8ce27c985f5f1e907cec068d6a365089bcc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/260562 Reviewed-by: Danil Chapovalov <danilchap@webrtc.org> Commit-Queue: Erik Språng <sprang@webrtc.org> Cr-Commit-Position: refs/heads/main@{#36737}
This commit is contained in:
parent
869c87a2b9
commit
b73c058702
@ -23,6 +23,8 @@ rtc_library("pacing") {
|
|||||||
"pacing_controller.h",
|
"pacing_controller.h",
|
||||||
"packet_router.cc",
|
"packet_router.cc",
|
||||||
"packet_router.h",
|
"packet_router.h",
|
||||||
|
"prioritized_packet_queue.cc",
|
||||||
|
"prioritized_packet_queue.h",
|
||||||
"round_robin_packet_queue.cc",
|
"round_robin_packet_queue.cc",
|
||||||
"round_robin_packet_queue.h",
|
"round_robin_packet_queue.h",
|
||||||
"rtp_packet_pacer.h",
|
"rtp_packet_pacer.h",
|
||||||
@ -93,6 +95,7 @@ if (rtc_include_tests) {
|
|||||||
"paced_sender_unittest.cc",
|
"paced_sender_unittest.cc",
|
||||||
"pacing_controller_unittest.cc",
|
"pacing_controller_unittest.cc",
|
||||||
"packet_router_unittest.cc",
|
"packet_router_unittest.cc",
|
||||||
|
"prioritized_packet_queue_unittest.cc",
|
||||||
"task_queue_paced_sender_unittest.cc",
|
"task_queue_paced_sender_unittest.cc",
|
||||||
]
|
]
|
||||||
deps = [
|
deps = [
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
#include "absl/strings/match.h"
|
#include "absl/strings/match.h"
|
||||||
#include "modules/pacing/bitrate_prober.h"
|
#include "modules/pacing/bitrate_prober.h"
|
||||||
#include "modules/pacing/interval_budget.h"
|
#include "modules/pacing/interval_budget.h"
|
||||||
|
#include "modules/pacing/prioritized_packet_queue.h"
|
||||||
#include "modules/pacing/round_robin_packet_queue.h"
|
#include "modules/pacing/round_robin_packet_queue.h"
|
||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
#include "rtc_base/experiments/field_trial_parser.h"
|
#include "rtc_base/experiments/field_trial_parser.h"
|
||||||
@ -55,6 +56,15 @@ TimeDelta GetDynamicPaddingTarget(const FieldTrialsView& field_trials) {
|
|||||||
return padding_target.Get();
|
return padding_target.Get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<PacingController::PacketQueue> CreatePacketQueue(
|
||||||
|
const FieldTrialsView& field_trials,
|
||||||
|
Timestamp creation_time) {
|
||||||
|
if (field_trials.IsEnabled("WebRTC-Pacer-UsePrioritizedPacketQueue")) {
|
||||||
|
return std::make_unique<PrioritizedPacketQueue>(creation_time);
|
||||||
|
}
|
||||||
|
return std::make_unique<RoundRobinPacketQueue>(creation_time);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
const TimeDelta PacingController::kMaxExpectedQueueLength =
|
const TimeDelta PacingController::kMaxExpectedQueueLength =
|
||||||
@ -98,8 +108,7 @@ PacingController::PacingController(Clock* clock,
|
|||||||
last_process_time_(clock->CurrentTime()),
|
last_process_time_(clock->CurrentTime()),
|
||||||
last_send_time_(last_process_time_),
|
last_send_time_(last_process_time_),
|
||||||
seen_first_packet_(false),
|
seen_first_packet_(false),
|
||||||
packet_queue_(
|
packet_queue_(CreatePacketQueue(field_trials_, last_process_time_)),
|
||||||
std::make_unique<RoundRobinPacketQueue>(last_process_time_)),
|
|
||||||
congested_(false),
|
congested_(false),
|
||||||
queue_time_limit_(kMaxExpectedQueueLength),
|
queue_time_limit_(kMaxExpectedQueueLength),
|
||||||
account_for_audio_(false),
|
account_for_audio_(false),
|
||||||
@ -237,7 +246,7 @@ TimeDelta PacingController::ExpectedQueueTime() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
size_t PacingController::QueueSizePackets() const {
|
size_t PacingController::QueueSizePackets() const {
|
||||||
return packet_queue_->SizeInPackets();
|
return rtc::checked_cast<size_t>(packet_queue_->SizeInPackets());
|
||||||
}
|
}
|
||||||
|
|
||||||
DataSize PacingController::QueueSizeData() const {
|
DataSize PacingController::QueueSizeData() const {
|
||||||
|
|||||||
@ -71,7 +71,7 @@ class PacingController {
|
|||||||
std::unique_ptr<RtpPacketToSend> packet) = 0;
|
std::unique_ptr<RtpPacketToSend> packet) = 0;
|
||||||
virtual std::unique_ptr<RtpPacketToSend> Pop() = 0;
|
virtual std::unique_ptr<RtpPacketToSend> Pop() = 0;
|
||||||
|
|
||||||
virtual size_t SizeInPackets() const = 0;
|
virtual int SizeInPackets() const = 0;
|
||||||
bool Empty() const { return SizeInPackets() == 0; }
|
bool Empty() const { return SizeInPackets() == 0; }
|
||||||
virtual DataSize SizeInPayloadBytes() const = 0;
|
virtual DataSize SizeInPayloadBytes() const = 0;
|
||||||
|
|
||||||
@ -88,6 +88,7 @@ class PacingController {
|
|||||||
// Average queue time for the packets currently in the queue.
|
// Average queue time for the packets currently in the queue.
|
||||||
// The queuing time is calculated from Push() to the last UpdateQueueTime()
|
// The queuing time is calculated from Push() to the last UpdateQueueTime()
|
||||||
// call - with any time spent in a paused state subtracted.
|
// call - with any time spent in a paused state subtracted.
|
||||||
|
// Returns TimeDelta::Zero() for an empty queue.
|
||||||
virtual TimeDelta AverageQueueTime() const = 0;
|
virtual TimeDelta AverageQueueTime() const = 0;
|
||||||
|
|
||||||
// Called during packet processing or when pause stats changes. Since the
|
// Called during packet processing or when pause stats changes. Since the
|
||||||
|
|||||||
253
modules/pacing/prioritized_packet_queue.cc
Normal file
253
modules/pacing/prioritized_packet_queue.cc
Normal file
@ -0,0 +1,253 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "modules/pacing/prioritized_packet_queue.h"
|
||||||
|
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||||
|
#include "rtc_base/checks.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
constexpr int kAudioPrioLevel = 0;
|
||||||
|
|
||||||
|
int GetPriorityForType(RtpPacketMediaType type) {
|
||||||
|
// Lower number takes priority over higher.
|
||||||
|
switch (type) {
|
||||||
|
case RtpPacketMediaType::kAudio:
|
||||||
|
// Audio is always prioritized over other packet types.
|
||||||
|
return kAudioPrioLevel;
|
||||||
|
case RtpPacketMediaType::kRetransmission:
|
||||||
|
// Send retransmissions before new media.
|
||||||
|
return kAudioPrioLevel + 1;
|
||||||
|
case RtpPacketMediaType::kVideo:
|
||||||
|
case RtpPacketMediaType::kForwardErrorCorrection:
|
||||||
|
// Video has "normal" priority, in the old speak.
|
||||||
|
// Send redundancy concurrently to video. If it is delayed it might have a
|
||||||
|
// lower chance of being useful.
|
||||||
|
return kAudioPrioLevel + 2;
|
||||||
|
case RtpPacketMediaType::kPadding:
|
||||||
|
// Packets that are in themselves likely useless, only sent to keep the
|
||||||
|
// BWE high.
|
||||||
|
return kAudioPrioLevel + 3;
|
||||||
|
}
|
||||||
|
RTC_CHECK_NOTREACHED();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
|
||||||
|
return DataSize::Bytes(packet->payload_size() + packet->padding_size());
|
||||||
|
}
|
||||||
|
|
||||||
|
PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time)
|
||||||
|
: last_enqueue_time_(creation_time) {}
|
||||||
|
|
||||||
|
bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
|
||||||
|
int priority_level) {
|
||||||
|
bool first_packet_at_level = packets_[priority_level].empty();
|
||||||
|
packets_[priority_level].push_back(std::move(packet));
|
||||||
|
return first_packet_at_level;
|
||||||
|
}
|
||||||
|
|
||||||
|
PrioritizedPacketQueue::QueuedPacket
|
||||||
|
PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
|
||||||
|
RTC_DCHECK(!packets_[priority_level].empty());
|
||||||
|
QueuedPacket packet = std::move(packets_[priority_level].front());
|
||||||
|
packets_[priority_level].pop_front();
|
||||||
|
return packet;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio(
|
||||||
|
int priority_level) const {
|
||||||
|
return !packets_[priority_level].empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const {
|
||||||
|
for (const std::deque<QueuedPacket>& queue : packets_) {
|
||||||
|
if (!queue.empty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timestamp PrioritizedPacketQueue::StreamQueue::LeadingAudioPacketEnqueueTime()
|
||||||
|
const {
|
||||||
|
RTC_DCHECK(!packets_[kAudioPrioLevel].empty());
|
||||||
|
return packets_[kAudioPrioLevel].begin()->enqueue_time;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
|
||||||
|
return last_enqueue_time_;
|
||||||
|
}
|
||||||
|
|
||||||
|
PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
|
||||||
|
: queue_time_sum_(TimeDelta::Zero()),
|
||||||
|
pause_time_sum_(TimeDelta::Zero()),
|
||||||
|
size_packets_(0),
|
||||||
|
size_payload_(DataSize::Zero()),
|
||||||
|
last_update_time_(creation_time),
|
||||||
|
paused_(false),
|
||||||
|
last_culling_time_(creation_time),
|
||||||
|
top_active_prio_level_(-1) {}
|
||||||
|
|
||||||
|
void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
|
||||||
|
std::unique_ptr<RtpPacketToSend> packet) {
|
||||||
|
StreamQueue* stream_queue;
|
||||||
|
auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr);
|
||||||
|
if (inserted) {
|
||||||
|
it->second = std::make_unique<StreamQueue>(enqueue_time);
|
||||||
|
}
|
||||||
|
stream_queue = it->second.get();
|
||||||
|
|
||||||
|
auto enqueue_time_iterator =
|
||||||
|
enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
|
||||||
|
int prio_level = GetPriorityForType(*packet->packet_type());
|
||||||
|
RTC_DCHECK_GE(prio_level, 0);
|
||||||
|
RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
|
||||||
|
QueuedPacket queued_packed = {.packet = std::move(packet),
|
||||||
|
.enqueue_time = enqueue_time,
|
||||||
|
.enqueue_time_iterator = enqueue_time_iterator};
|
||||||
|
// In order to figure out how much time a packet has spent in the queue
|
||||||
|
// while not in a paused state, we subtract the total amount of time the
|
||||||
|
// queue has been paused so far, and when the packet is popped we subtract
|
||||||
|
// the total amount of time the queue has been paused at that moment. This
|
||||||
|
// way we subtract the total amount of time the packet has spent in the
|
||||||
|
// queue while in a paused state.
|
||||||
|
UpdateAverageQueueTime(enqueue_time);
|
||||||
|
queued_packed.enqueue_time -= pause_time_sum_;
|
||||||
|
++size_packets_;
|
||||||
|
size_payload_ += queued_packed.PacketSize();
|
||||||
|
|
||||||
|
if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) {
|
||||||
|
// Number packets at `prio_level` for this steam is now non-zero.
|
||||||
|
streams_by_prio_[prio_level].push_back(stream_queue);
|
||||||
|
}
|
||||||
|
if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) {
|
||||||
|
top_active_prio_level_ = prio_level;
|
||||||
|
}
|
||||||
|
|
||||||
|
static constexpr TimeDelta kTimeout = TimeDelta::Millis(500);
|
||||||
|
if (enqueue_time - last_culling_time_ > kTimeout) {
|
||||||
|
for (auto it = streams_.begin(); it != streams_.end();) {
|
||||||
|
if (it->second->IsEmpty() &&
|
||||||
|
it->second->LastEnqueueTime() + kTimeout < enqueue_time) {
|
||||||
|
streams_.erase(it++);
|
||||||
|
} else {
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
last_culling_time_ = enqueue_time;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
|
||||||
|
if (size_packets_ == 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
RTC_DCHECK_GE(top_active_prio_level_, 0);
|
||||||
|
StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
|
||||||
|
QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
|
||||||
|
--size_packets_;
|
||||||
|
size_payload_ -= packet.PacketSize();
|
||||||
|
|
||||||
|
// Calculate the total amount of time spent by this packet in the queue
|
||||||
|
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
|
||||||
|
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
|
||||||
|
// by subtracting it now we effectively remove the time spent in in the
|
||||||
|
// queue while in a paused state.
|
||||||
|
TimeDelta time_in_non_paused_state =
|
||||||
|
last_update_time_ - packet.enqueue_time - pause_time_sum_;
|
||||||
|
queue_time_sum_ -= time_in_non_paused_state;
|
||||||
|
|
||||||
|
RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
|
||||||
|
|
||||||
|
RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
|
||||||
|
enqueue_times_.erase(packet.enqueue_time_iterator);
|
||||||
|
|
||||||
|
// Remove StreamQueue from head of fifo-queue for this prio level, and
|
||||||
|
// and add it to the end if it still has packets.
|
||||||
|
streams_by_prio_[top_active_prio_level_].pop_front();
|
||||||
|
if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
|
||||||
|
streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
|
||||||
|
} else if (streams_by_prio_[top_active_prio_level_].empty()) {
|
||||||
|
// No stream queues have packets at this prio level, find top priority
|
||||||
|
// that is not empty.
|
||||||
|
if (size_packets_ == 0) {
|
||||||
|
top_active_prio_level_ = -1;
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < kNumPriorityLevels; ++i) {
|
||||||
|
if (!streams_by_prio_[i].empty()) {
|
||||||
|
top_active_prio_level_ = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::move(packet.packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
int PrioritizedPacketQueue::SizeInPackets() const {
|
||||||
|
return size_packets_;
|
||||||
|
}
|
||||||
|
|
||||||
|
DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
|
||||||
|
return size_payload_;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timestamp PrioritizedPacketQueue::LeadingAudioPacketEnqueueTime() const {
|
||||||
|
if (streams_by_prio_[kAudioPrioLevel].empty()) {
|
||||||
|
return Timestamp::MinusInfinity();
|
||||||
|
}
|
||||||
|
return streams_by_prio_[kAudioPrioLevel]
|
||||||
|
.front()
|
||||||
|
->LeadingAudioPacketEnqueueTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
|
||||||
|
return enqueue_times_.empty() ? Timestamp::MinusInfinity()
|
||||||
|
: enqueue_times_.front();
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeDelta PrioritizedPacketQueue::AverageQueueTime() const {
|
||||||
|
if (size_packets_ == 0) {
|
||||||
|
return TimeDelta::Zero();
|
||||||
|
}
|
||||||
|
return queue_time_sum_ / size_packets_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) {
|
||||||
|
RTC_CHECK_GE(now, last_update_time_);
|
||||||
|
if (now == last_update_time_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeDelta delta = now - last_update_time_;
|
||||||
|
|
||||||
|
if (paused_) {
|
||||||
|
pause_time_sum_ += delta;
|
||||||
|
} else {
|
||||||
|
queue_time_sum_ += delta * size_packets_;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_update_time_ = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
|
||||||
|
UpdateAverageQueueTime(now);
|
||||||
|
paused_ = paused;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace webrtc
|
||||||
124
modules/pacing/prioritized_packet_queue.h
Normal file
124
modules/pacing/prioritized_packet_queue.h
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
|
||||||
|
#define MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#include <deque>
|
||||||
|
#include <list>
|
||||||
|
#include <memory>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
|
#include "api/units/data_size.h"
|
||||||
|
#include "api/units/time_delta.h"
|
||||||
|
#include "api/units/timestamp.h"
|
||||||
|
#include "modules/pacing/pacing_controller.h"
|
||||||
|
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
|
||||||
|
class PrioritizedPacketQueue : public PacingController::PacketQueue {
|
||||||
|
public:
|
||||||
|
explicit PrioritizedPacketQueue(Timestamp creation_time);
|
||||||
|
PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete;
|
||||||
|
PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete;
|
||||||
|
|
||||||
|
void Push(Timestamp enqueue_time,
|
||||||
|
std::unique_ptr<RtpPacketToSend> packet) override;
|
||||||
|
std::unique_ptr<RtpPacketToSend> Pop() override;
|
||||||
|
int SizeInPackets() const override;
|
||||||
|
DataSize SizeInPayloadBytes() const override;
|
||||||
|
Timestamp LeadingAudioPacketEnqueueTime() const override;
|
||||||
|
Timestamp OldestEnqueueTime() const override;
|
||||||
|
TimeDelta AverageQueueTime() const override;
|
||||||
|
void UpdateAverageQueueTime(Timestamp now) override;
|
||||||
|
void SetPauseState(bool paused, Timestamp now) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
static constexpr int kNumPriorityLevels = 4;
|
||||||
|
|
||||||
|
class QueuedPacket {
|
||||||
|
public:
|
||||||
|
QueuedPacket(QueuedPacket&&) = default;
|
||||||
|
QueuedPacket& operator=(QueuedPacket&&) = default;
|
||||||
|
|
||||||
|
QueuedPacket(const QueuedPacket&) = delete;
|
||||||
|
QueuedPacket& operator=(const QueuedPacket&) = delete;
|
||||||
|
|
||||||
|
DataSize PacketSize() const;
|
||||||
|
|
||||||
|
std::unique_ptr<RtpPacketToSend> packet;
|
||||||
|
Timestamp enqueue_time;
|
||||||
|
std::list<Timestamp>::iterator enqueue_time_iterator;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Class containing packets for an RTP stream.
|
||||||
|
// For each priority level, packets are simply stored in a fifo queue.
|
||||||
|
class StreamQueue {
|
||||||
|
public:
|
||||||
|
explicit StreamQueue(Timestamp creation_time);
|
||||||
|
StreamQueue(StreamQueue&&) = default;
|
||||||
|
StreamQueue& operator=(StreamQueue&&) = default;
|
||||||
|
|
||||||
|
StreamQueue(const StreamQueue&) = delete;
|
||||||
|
StreamQueue& operator=(const StreamQueue&) = delete;
|
||||||
|
|
||||||
|
// Enqueue packet at the given priority level. Returns true if the packet
|
||||||
|
// count for that priority level went from zero to non-zero.
|
||||||
|
bool EnqueuePacket(QueuedPacket packet, int priority_level);
|
||||||
|
|
||||||
|
QueuedPacket DequePacket(int priority_level);
|
||||||
|
|
||||||
|
bool HasPacketsAtPrio(int priority_level) const;
|
||||||
|
bool IsEmpty() const;
|
||||||
|
Timestamp LeadingAudioPacketEnqueueTime() const;
|
||||||
|
Timestamp LastEnqueueTime() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
|
||||||
|
Timestamp last_enqueue_time_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Cumulative sum, over all packets, of time spent in the queue.
|
||||||
|
TimeDelta queue_time_sum_;
|
||||||
|
// Cumulative sum of time the queue has spent in a paused state.
|
||||||
|
TimeDelta pause_time_sum_;
|
||||||
|
// Total number of packets stored in this queue.
|
||||||
|
int size_packets_;
|
||||||
|
// Sum of payload sizes for all packts stored in this queue.
|
||||||
|
DataSize size_payload_;
|
||||||
|
// The last time queue/pause time sums were updated.
|
||||||
|
Timestamp last_update_time_;
|
||||||
|
bool paused_;
|
||||||
|
|
||||||
|
// Last time `streams_` was culled for inactive streams.
|
||||||
|
Timestamp last_culling_time_;
|
||||||
|
|
||||||
|
// Map from SSRC to packet queues for the associated RTP stream.
|
||||||
|
std::unordered_map<uint32_t, std::unique_ptr<StreamQueue>> streams_;
|
||||||
|
|
||||||
|
// For each priority level, a queue of StreamQueues which have at least one
|
||||||
|
// packet pending for that prio level.
|
||||||
|
std::deque<StreamQueue*> streams_by_prio_[kNumPriorityLevels];
|
||||||
|
|
||||||
|
// The first index into `stream_by_prio_` that is non-empty.
|
||||||
|
int top_active_prio_level_;
|
||||||
|
|
||||||
|
// Ordered list of enqueue times. Additions are always increasing and added to
|
||||||
|
// the end. QueuedPacket instances have a iterators into this list for fast
|
||||||
|
// removal.
|
||||||
|
std::list<Timestamp> enqueue_times_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace webrtc
|
||||||
|
|
||||||
|
#endif // MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
|
||||||
233
modules/pacing/prioritized_packet_queue_unittest.cc
Normal file
233
modules/pacing/prioritized_packet_queue_unittest.cc
Normal file
@ -0,0 +1,233 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "modules/pacing/prioritized_packet_queue.h"
|
||||||
|
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "api/units/time_delta.h"
|
||||||
|
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
|
||||||
|
#include "rtc_base/checks.h"
|
||||||
|
#include "test/gmock.h"
|
||||||
|
#include "test/gtest.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
constexpr uint32_t kDefaultSsrc = 123;
|
||||||
|
constexpr int kDefaultPayloadSize = 789;
|
||||||
|
|
||||||
|
std::unique_ptr<RtpPacketToSend> CreatePacket(RtpPacketMediaType type,
|
||||||
|
uint16_t sequence_number,
|
||||||
|
uint32_t ssrc = kDefaultSsrc) {
|
||||||
|
auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
|
||||||
|
packet->set_packet_type(type);
|
||||||
|
packet->SetSsrc(ssrc);
|
||||||
|
packet->SetSequenceNumber(sequence_number);
|
||||||
|
packet->SetPayloadSize(kDefaultPayloadSize);
|
||||||
|
return packet;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) {
|
||||||
|
Timestamp now = Timestamp::Zero();
|
||||||
|
PrioritizedPacketQueue queue(now);
|
||||||
|
|
||||||
|
// Add packets in low to high packet order.
|
||||||
|
queue.Push(now, CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/1));
|
||||||
|
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
|
||||||
|
queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection,
|
||||||
|
/*seq=*/3));
|
||||||
|
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4));
|
||||||
|
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/5));
|
||||||
|
|
||||||
|
// Packets should be returned in high to low order.
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 5);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
|
||||||
|
// Video and FEC prioritized equally - but video was enqueued first.
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 2);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReturnsEqualPrioPacketsInRoundRobinOrder) {
|
||||||
|
Timestamp now = Timestamp::Zero();
|
||||||
|
PrioritizedPacketQueue queue(now);
|
||||||
|
|
||||||
|
// Insert video packets (prioritized equally), simulating a simulcast-type use
|
||||||
|
// case.
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/1, /*ssrc=*/100));
|
||||||
|
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2, /*ssrc=*/101));
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, /*ssrc=*/101));
|
||||||
|
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/4, /*ssrc=*/102));
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/5, /*ssrc=*/102));
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/6, /*ssrc=*/102));
|
||||||
|
queue.Push(now,
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/7, /*ssrc=*/102));
|
||||||
|
|
||||||
|
// First packet from each SSRC.
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 1);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 2);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
|
||||||
|
|
||||||
|
// Second packets from streams that have packets left.
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 5);
|
||||||
|
|
||||||
|
// Only packets from last stream remaining.
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 6);
|
||||||
|
EXPECT_EQ(queue.Pop()->SequenceNumber(), 7);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsSizeInPackets) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.SizeInPackets(), 0);
|
||||||
|
|
||||||
|
queue.Push(/*enqueue_time=*/Timestamp::Zero(),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo,
|
||||||
|
/*seq_no=*/1));
|
||||||
|
EXPECT_EQ(queue.SizeInPackets(), 1);
|
||||||
|
|
||||||
|
queue.Pop();
|
||||||
|
EXPECT_EQ(queue.SizeInPackets(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsPayloadSize) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), DataSize::Zero());
|
||||||
|
|
||||||
|
queue.Push(/*enqueue_time=*/Timestamp::Zero(),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo,
|
||||||
|
/*seq_no=*/1));
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), DataSize::Bytes(kDefaultPayloadSize));
|
||||||
|
|
||||||
|
queue.Pop();
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), DataSize::Zero());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsPaddingSize) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), DataSize::Zero());
|
||||||
|
static constexpr DataSize kPaddingSize = DataSize::Bytes(190);
|
||||||
|
|
||||||
|
auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
|
||||||
|
packet->set_packet_type(RtpPacketMediaType::kPadding);
|
||||||
|
packet->SetSsrc(kDefaultSsrc);
|
||||||
|
packet->SetSequenceNumber(/*seq=*/1);
|
||||||
|
packet->SetPadding(kPaddingSize.bytes());
|
||||||
|
queue.Push(/*enqueue_time=*/Timestamp::Zero(), std::move(packet));
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), kPaddingSize);
|
||||||
|
|
||||||
|
queue.Pop();
|
||||||
|
EXPECT_EQ(queue.SizeInPayloadBytes(), DataSize::Zero());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsOldestEnqueueTime) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.OldestEnqueueTime(), Timestamp::MinusInfinity());
|
||||||
|
|
||||||
|
// Add three packets, with the middle packet having higher prio.
|
||||||
|
queue.Push(Timestamp::Millis(10),
|
||||||
|
CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/1));
|
||||||
|
queue.Push(Timestamp::Millis(20),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
|
||||||
|
queue.Push(Timestamp::Millis(30),
|
||||||
|
CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/3));
|
||||||
|
EXPECT_EQ(queue.OldestEnqueueTime(), Timestamp::Millis(10));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 20.
|
||||||
|
EXPECT_EQ(queue.OldestEnqueueTime(), Timestamp::Millis(10));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 10.
|
||||||
|
EXPECT_EQ(queue.OldestEnqueueTime(), Timestamp::Millis(30));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 30, queue empty again.
|
||||||
|
EXPECT_EQ(queue.OldestEnqueueTime(), Timestamp::MinusInfinity());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsAverageQueueTime) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Zero());
|
||||||
|
|
||||||
|
// Add three packets, with the middle packet having higher prio.
|
||||||
|
queue.Push(Timestamp::Millis(10),
|
||||||
|
CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/1));
|
||||||
|
queue.Push(Timestamp::Millis(20),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
|
||||||
|
queue.Push(Timestamp::Millis(30),
|
||||||
|
CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/3));
|
||||||
|
|
||||||
|
queue.UpdateAverageQueueTime(Timestamp::Millis(40));
|
||||||
|
// Packets have waited 30, 20, 10 ms -> average = 20ms.
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(20));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 20.
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(20));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 10.
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(10));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop packet with enqueue time 30, queue empty again.
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Zero());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, SubtractsPusedTimeFromAverageQueueTime) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Zero());
|
||||||
|
|
||||||
|
// Add a packet and then enable paused state.
|
||||||
|
queue.Push(Timestamp::Millis(100),
|
||||||
|
CreatePacket(RtpPacketMediaType::kPadding, /*seq=*/1));
|
||||||
|
queue.SetPauseState(true, Timestamp::Millis(600));
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(500));
|
||||||
|
|
||||||
|
// Enqueue a packet 500ms into the paused state. Queue time of
|
||||||
|
// original packet is still seen as 500ms and new one has 0ms giving
|
||||||
|
// an average of 250ms.
|
||||||
|
queue.Push(Timestamp::Millis(1100),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2));
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(250));
|
||||||
|
|
||||||
|
// Unpause some time later, queue time still unchanged.
|
||||||
|
queue.SetPauseState(false, Timestamp::Millis(1600));
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(250));
|
||||||
|
|
||||||
|
// Update queue time 500ms after pause state ended.
|
||||||
|
queue.UpdateAverageQueueTime(Timestamp::Millis(2100));
|
||||||
|
EXPECT_EQ(queue.AverageQueueTime(), TimeDelta::Millis(750));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PrioritizedPacketQueue, ReportsLeadingAudioEnqueueTime) {
|
||||||
|
PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero());
|
||||||
|
EXPECT_EQ(queue.LeadingAudioPacketEnqueueTime(), Timestamp::MinusInfinity());
|
||||||
|
|
||||||
|
queue.Push(Timestamp::Millis(10),
|
||||||
|
CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/1));
|
||||||
|
EXPECT_EQ(queue.LeadingAudioPacketEnqueueTime(), Timestamp::MinusInfinity());
|
||||||
|
|
||||||
|
queue.Push(Timestamp::Millis(20),
|
||||||
|
CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/2));
|
||||||
|
|
||||||
|
EXPECT_EQ(queue.LeadingAudioPacketEnqueueTime(), Timestamp::Millis(20));
|
||||||
|
|
||||||
|
queue.Pop(); // Pop audio packet.
|
||||||
|
EXPECT_EQ(queue.LeadingAudioPacketEnqueueTime(), Timestamp::MinusInfinity());
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace webrtc
|
||||||
@ -182,7 +182,7 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
|||||||
return rtp_packet;
|
return rtp_packet;
|
||||||
}
|
}
|
||||||
|
|
||||||
RTC_DCHECK_GT(size_packets_, 0u);
|
RTC_DCHECK_GT(size_packets_, 0);
|
||||||
Stream* stream = GetHighestPriorityStream();
|
Stream* stream = GetHighestPriorityStream();
|
||||||
const QueuedPacket& queued_packet = stream->packet_queue.top();
|
const QueuedPacket& queued_packet = stream->packet_queue.top();
|
||||||
|
|
||||||
@ -231,7 +231,7 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
|||||||
return rtp_packet;
|
return rtp_packet;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t RoundRobinPacketQueue::SizeInPackets() const {
|
int RoundRobinPacketQueue::SizeInPackets() const {
|
||||||
return size_packets_;
|
return size_packets_;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,7 +280,7 @@ void RoundRobinPacketQueue::UpdateAverageQueueTime(Timestamp now) {
|
|||||||
if (paused_) {
|
if (paused_) {
|
||||||
pause_time_sum_ += delta;
|
pause_time_sum_ += delta;
|
||||||
} else {
|
} else {
|
||||||
queue_time_sum_ += TimeDelta::Micros(delta.us() * size_packets_);
|
queue_time_sum_ += delta * size_packets_;
|
||||||
}
|
}
|
||||||
|
|
||||||
time_last_updated_ = now;
|
time_last_updated_ = now;
|
||||||
|
|||||||
@ -28,7 +28,6 @@
|
|||||||
#include "modules/pacing/pacing_controller.h"
|
#include "modules/pacing/pacing_controller.h"
|
||||||
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||||
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
|
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
|
||||||
#include "system_wrappers/include/clock.h"
|
|
||||||
|
|
||||||
namespace webrtc {
|
namespace webrtc {
|
||||||
|
|
||||||
@ -41,7 +40,7 @@ class RoundRobinPacketQueue : public PacingController::PacketQueue {
|
|||||||
std::unique_ptr<RtpPacketToSend> packet) override;
|
std::unique_ptr<RtpPacketToSend> packet) override;
|
||||||
std::unique_ptr<RtpPacketToSend> Pop() override;
|
std::unique_ptr<RtpPacketToSend> Pop() override;
|
||||||
|
|
||||||
size_t SizeInPackets() const override;
|
int SizeInPackets() const override;
|
||||||
DataSize SizeInPayloadBytes() const override;
|
DataSize SizeInPayloadBytes() const override;
|
||||||
Timestamp LeadingAudioPacketEnqueueTime() const override;
|
Timestamp LeadingAudioPacketEnqueueTime() const override;
|
||||||
Timestamp OldestEnqueueTime() const override;
|
Timestamp OldestEnqueueTime() const override;
|
||||||
@ -142,7 +141,7 @@ class RoundRobinPacketQueue : public PacingController::PacketQueue {
|
|||||||
int64_t enqueue_count_;
|
int64_t enqueue_count_;
|
||||||
|
|
||||||
bool paused_;
|
bool paused_;
|
||||||
size_t size_packets_;
|
int size_packets_;
|
||||||
DataSize size_;
|
DataSize size_;
|
||||||
DataSize max_size_;
|
DataSize max_size_;
|
||||||
TimeDelta queue_time_sum_;
|
TimeDelta queue_time_sum_;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user