This field trial was read in RTPSender, and the altered packet size passed along to the pacer. Now, the pacer packet queue looks directly at the packet instance, so it needs to be aware of the experiment flag in order to make the right decision. Bug: webrtc:10633, b/138582168 Change-Id: If1148f39c463e11ad49a659913465f131cf9b526 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/147270 Commit-Queue: Erik Språng <sprang@webrtc.org> Reviewed-by: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#28714}
308 lines
11 KiB
C++
308 lines
11 KiB
C++
/*
|
|
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#include "modules/pacing/round_robin_packet_queue.h"
|
|
|
|
#include <algorithm>
|
|
#include <cstdint>
|
|
#include <utility>
|
|
|
|
#include "rtc_base/checks.h"
|
|
|
|
namespace webrtc {
|
|
|
|
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
|
|
default;
|
|
RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
|
|
|
|
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
|
|
int priority,
|
|
RtpPacketToSend::Type type,
|
|
uint32_t ssrc,
|
|
uint16_t seq_number,
|
|
int64_t capture_time_ms,
|
|
int64_t enqueue_time_ms,
|
|
size_t length_in_bytes,
|
|
bool retransmission,
|
|
uint64_t enqueue_order,
|
|
std::multiset<int64_t>::iterator enqueue_time_it,
|
|
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
|
|
packet_it)
|
|
: type_(type),
|
|
priority_(priority),
|
|
ssrc_(ssrc),
|
|
sequence_number_(seq_number),
|
|
capture_time_ms_(capture_time_ms),
|
|
enqueue_time_ms_(enqueue_time_ms),
|
|
bytes_(length_in_bytes),
|
|
retransmission_(retransmission),
|
|
enqueue_order_(enqueue_order),
|
|
enqueue_time_it_(enqueue_time_it),
|
|
packet_it_(packet_it) {}
|
|
|
|
std::unique_ptr<RtpPacketToSend>
|
|
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
|
|
return packet_it_ ? std::move(**packet_it_) : nullptr;
|
|
}
|
|
|
|
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs(
|
|
int64_t pause_time_sum_ms) {
|
|
enqueue_time_ms_ -= pause_time_sum_ms;
|
|
}
|
|
|
|
bool RoundRobinPacketQueue::QueuedPacket::operator<(
|
|
const RoundRobinPacketQueue::QueuedPacket& other) const {
|
|
if (priority_ != other.priority_)
|
|
return priority_ > other.priority_;
|
|
if (retransmission_ != other.retransmission_)
|
|
return other.retransmission_;
|
|
|
|
return enqueue_order_ > other.enqueue_order_;
|
|
}
|
|
|
|
RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
|
|
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
|
|
RoundRobinPacketQueue::Stream::~Stream() {}
|
|
|
|
bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
|
|
if (!field_trials) {
|
|
return false;
|
|
}
|
|
return field_trials->Lookup(name).find("Enabled") == 0;
|
|
}
|
|
|
|
RoundRobinPacketQueue::RoundRobinPacketQueue(
|
|
int64_t start_time_us,
|
|
const WebRtcKeyValueConfig* field_trials)
|
|
: time_last_updated_ms_(start_time_us / 1000),
|
|
send_side_bwe_with_overhead_(
|
|
IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
|
|
|
|
RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
|
|
|
|
void RoundRobinPacketQueue::Push(int priority,
|
|
RtpPacketToSend::Type type,
|
|
uint32_t ssrc,
|
|
uint16_t seq_number,
|
|
int64_t capture_time_ms,
|
|
int64_t enqueue_time_ms,
|
|
size_t length_in_bytes,
|
|
bool retransmission,
|
|
uint64_t enqueue_order) {
|
|
Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
|
|
enqueue_time_ms, length_in_bytes, retransmission,
|
|
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
|
|
absl::nullopt));
|
|
}
|
|
|
|
void RoundRobinPacketQueue::Push(int priority,
|
|
int64_t enqueue_time_ms,
|
|
uint64_t enqueue_order,
|
|
std::unique_ptr<RtpPacketToSend> packet) {
|
|
uint32_t ssrc = packet->Ssrc();
|
|
uint16_t sequence_number = packet->SequenceNumber();
|
|
int64_t capture_time_ms = packet->capture_time_ms();
|
|
size_t size_bytes = send_side_bwe_with_overhead_
|
|
? packet->size()
|
|
: packet->payload_size() + packet->padding_size();
|
|
auto type = packet->packet_type();
|
|
RTC_DCHECK(type.has_value());
|
|
|
|
rtp_packets_.push_front(std::move(packet));
|
|
Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms,
|
|
enqueue_time_ms, size_bytes,
|
|
*type == RtpPacketToSend::Type::kRetransmission,
|
|
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
|
|
rtp_packets_.begin()));
|
|
}
|
|
|
|
RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
|
|
RTC_CHECK(!pop_packet_ && !pop_stream_);
|
|
|
|
Stream* stream = GetHighestPriorityStream();
|
|
pop_stream_.emplace(stream);
|
|
pop_packet_.emplace(stream->packet_queue.top());
|
|
stream->packet_queue.pop();
|
|
|
|
return &pop_packet_.value();
|
|
}
|
|
|
|
void RoundRobinPacketQueue::CancelPop() {
|
|
RTC_CHECK(pop_packet_ && pop_stream_);
|
|
(*pop_stream_)->packet_queue.push(*pop_packet_);
|
|
pop_packet_.reset();
|
|
pop_stream_.reset();
|
|
}
|
|
|
|
void RoundRobinPacketQueue::FinalizePop() {
|
|
if (!Empty()) {
|
|
RTC_CHECK(pop_packet_ && pop_stream_);
|
|
Stream* stream = *pop_stream_;
|
|
stream_priorities_.erase(stream->priority_it);
|
|
const QueuedPacket& packet = *pop_packet_;
|
|
|
|
// Calculate the total amount of time spent by this packet in the queue
|
|
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
|
|
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
|
|
// by subtracting it now we effectively remove the time spent in in the
|
|
// queue while in a paused state.
|
|
int64_t time_in_non_paused_state_ms =
|
|
time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_;
|
|
queue_time_sum_ms_ -= time_in_non_paused_state_ms;
|
|
|
|
RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
|
|
enqueue_times_.erase(packet.EnqueueTimeIterator());
|
|
|
|
auto packet_it = packet.PacketIterator();
|
|
if (packet_it) {
|
|
rtp_packets_.erase(*packet_it);
|
|
}
|
|
|
|
// Update |bytes| of this stream. The general idea is that the stream that
|
|
// has sent the least amount of bytes should have the highest priority.
|
|
// The problem with that is if streams send with different rates, in which
|
|
// case a "budget" will be built up for the stream sending at the lower
|
|
// rate. To avoid building a too large budget we limit |bytes| to be within
|
|
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
|
|
stream->bytes = std::max(stream->bytes + packet.size_in_bytes(),
|
|
max_bytes_ - kMaxLeadingBytes);
|
|
max_bytes_ = std::max(max_bytes_, stream->bytes);
|
|
|
|
size_bytes_ -= packet.size_in_bytes();
|
|
size_packets_ -= 1;
|
|
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
|
|
|
|
// If there are packets left to be sent, schedule the stream again.
|
|
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
|
|
if (stream->packet_queue.empty()) {
|
|
stream->priority_it = stream_priorities_.end();
|
|
} else {
|
|
int priority = stream->packet_queue.top().priority();
|
|
stream->priority_it = stream_priorities_.emplace(
|
|
StreamPrioKey(priority, stream->bytes), stream->ssrc);
|
|
}
|
|
|
|
pop_packet_.reset();
|
|
pop_stream_.reset();
|
|
}
|
|
}
|
|
|
|
bool RoundRobinPacketQueue::Empty() const {
|
|
RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
|
|
(stream_priorities_.empty() && size_packets_ == 0));
|
|
return stream_priorities_.empty();
|
|
}
|
|
|
|
size_t RoundRobinPacketQueue::SizeInPackets() const {
|
|
return size_packets_;
|
|
}
|
|
|
|
uint64_t RoundRobinPacketQueue::SizeInBytes() const {
|
|
return size_bytes_;
|
|
}
|
|
|
|
int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const {
|
|
if (Empty())
|
|
return 0;
|
|
RTC_CHECK(!enqueue_times_.empty());
|
|
return *enqueue_times_.begin();
|
|
}
|
|
|
|
void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
|
|
RTC_CHECK_GE(timestamp_ms, time_last_updated_ms_);
|
|
if (timestamp_ms == time_last_updated_ms_)
|
|
return;
|
|
|
|
int64_t delta_ms = timestamp_ms - time_last_updated_ms_;
|
|
|
|
if (paused_) {
|
|
pause_time_sum_ms_ += delta_ms;
|
|
} else {
|
|
queue_time_sum_ms_ += delta_ms * size_packets_;
|
|
}
|
|
|
|
time_last_updated_ms_ = timestamp_ms;
|
|
}
|
|
|
|
void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
|
|
if (paused_ == paused)
|
|
return;
|
|
UpdateQueueTime(timestamp_ms);
|
|
paused_ = paused;
|
|
}
|
|
|
|
int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const {
|
|
if (Empty())
|
|
return 0;
|
|
return queue_time_sum_ms_ / size_packets_;
|
|
}
|
|
|
|
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
|
|
auto stream_info_it = streams_.find(packet.ssrc());
|
|
if (stream_info_it == streams_.end()) {
|
|
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
|
|
stream_info_it->second.priority_it = stream_priorities_.end();
|
|
stream_info_it->second.ssrc = packet.ssrc();
|
|
}
|
|
|
|
Stream* stream = &stream_info_it->second;
|
|
|
|
if (stream->priority_it == stream_priorities_.end()) {
|
|
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
|
|
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
|
|
stream->priority_it = stream_priorities_.emplace(
|
|
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
|
|
} else if (packet.priority() < stream->priority_it->first.priority) {
|
|
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
|
|
// and insert a new one with the new priority. Note that |priority_| uses
|
|
// lower ordinal for higher priority.
|
|
stream_priorities_.erase(stream->priority_it);
|
|
stream->priority_it = stream_priorities_.emplace(
|
|
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
|
|
}
|
|
RTC_CHECK(stream->priority_it != stream_priorities_.end());
|
|
|
|
// In order to figure out how much time a packet has spent in the queue while
|
|
// not in a paused state, we subtract the total amount of time the queue has
|
|
// been paused so far, and when the packet is popped we subtract the total
|
|
// amount of time the queue has been paused at that moment. This way we
|
|
// subtract the total amount of time the packet has spent in the queue while
|
|
// in a paused state.
|
|
UpdateQueueTime(packet.enqueue_time_ms());
|
|
packet.SubtractPauseTimeMs(pause_time_sum_ms_);
|
|
|
|
size_packets_ += 1;
|
|
size_bytes_ += packet.size_in_bytes();
|
|
|
|
stream->packet_queue.push(packet);
|
|
}
|
|
|
|
RoundRobinPacketQueue::Stream*
|
|
RoundRobinPacketQueue::GetHighestPriorityStream() {
|
|
RTC_CHECK(!stream_priorities_.empty());
|
|
uint32_t ssrc = stream_priorities_.begin()->second;
|
|
|
|
auto stream_info_it = streams_.find(ssrc);
|
|
RTC_CHECK(stream_info_it != streams_.end());
|
|
RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
|
|
RTC_CHECK(!stream_info_it->second.packet_queue.empty());
|
|
return &stream_info_it->second;
|
|
}
|
|
|
|
bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
|
|
for (const auto& scheduled_stream : stream_priorities_) {
|
|
if (scheduled_stream.second == ssrc)
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
} // namespace webrtc
|