From 881829b145820aefc1f0b33dc69a7ea35e33f4d2 Mon Sep 17 00:00:00 2001 From: philipel Date: Fri, 13 Oct 2017 13:27:23 +0200 Subject: [PATCH] New PacketQueue2 which prioritize packets in a round-robin fashion. Bug: webrtc:8287, webrtc:8288 Change-Id: I69ab846851c308d51e23e2b3bc3b55b19d1a07e8 Reviewed-on: https://webrtc-review.googlesource.com/7300 Commit-Queue: Philip Eliasson Reviewed-by: Stefan Holmer Cr-Commit-Position: refs/heads/master@{#20281} --- modules/pacing/BUILD.gn | 2 + modules/pacing/packet_queue2.cc | 212 ++++++++++++++++++++++++++++++++ modules/pacing/packet_queue2.h | 138 +++++++++++++++++++++ 3 files changed, 352 insertions(+) create mode 100644 modules/pacing/packet_queue2.cc create mode 100644 modules/pacing/packet_queue2.h diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index fd350a4026..e9d8348e8f 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -21,6 +21,8 @@ rtc_static_library("pacing") { "pacer.h", "packet_queue.cc", "packet_queue.h", + "packet_queue2.cc", + "packet_queue2.h", "packet_router.cc", "packet_router.h", ] diff --git a/modules/pacing/packet_queue2.cc b/modules/pacing/packet_queue2.cc new file mode 100644 index 0000000000..46f65e0e8f --- /dev/null +++ b/modules/pacing/packet_queue2.cc @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_queue2.h" + +#include + +#include "rtc_base/checks.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { + +PacketQueue2::Stream::Stream() : bytes(0) {} +PacketQueue2::Stream::~Stream() {} + +PacketQueue2::Packet::Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + +PacketQueue2::Packet::Packet(const Packet& other) = default; + +PacketQueue2::Packet::~Packet() {} + +PacketQueue2::PacketQueue2(const Clock* clock) + : clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {} + +PacketQueue2::~PacketQueue2() {} + +void PacketQueue2::Push(const Packet& packet_to_insert) { + Packet packet(packet_to_insert); + + auto stream_info_it = streams_.find(packet.ssrc); + if (stream_info_it == streams_.end()) { + stream_info_it = streams_.emplace(packet.ssrc, Stream()).first; + stream_info_it->second.priority_it = stream_priorities_.end(); + stream_info_it->second.ssrc = packet.ssrc; + } + + Stream* streams_ = &stream_info_it->second; + + if (streams_->priority_it == stream_priorities_.end()) { + // If the SSRC is not currently scheduled, add it to |stream_priorities_|. + RTC_CHECK(!IsSsrcScheduled(streams_->ssrc)); + streams_->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); + } else if (packet.priority < streams_->priority_it->first.priority) { + // If the priority of this SSRC increased, remove the outdated StreamPrioKey + // and insert a new one with the new priority. Note that + // RtpPacketSender::Priority uses lower ordinal for higher priority. + stream_priorities_.erase(streams_->priority_it); + streams_->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); + } + RTC_CHECK(streams_->priority_it != stream_priorities_.end()); + + packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms); + + // In order to figure out how much time a packet has spent in the queue while + // not in a paused state, we subtract the total amount of time the queue has + // been paused so far, and when the packet is poped we subtract the total + // amount of time the queue has been paused at that moment. This way we + // subtract the total amount of time the packet has spent in the queue while + // in a paused state. + UpdateQueueTime(packet.enqueue_time_ms); + packet.enqueue_time_ms -= pause_time_sum_ms_; + streams_->packet_queue.push(packet); + + size_packets_ += 1; + size_bytes_ += packet.bytes; +} + +const PacketQueue2::Packet& PacketQueue2::Top() { + return GetHighestPriorityStream()->packet_queue.top(); +} + +void PacketQueue2::Pop() { + RTC_CHECK(!paused_); + if (!Empty()) { + Stream* streams_ = GetHighestPriorityStream(); + stream_priorities_.erase(streams_->priority_it); + const Packet& packet = streams_->packet_queue.top(); + + // Calculate the total amount of time spent by this packet in the queue + // while in a non-paused state. Note that the |pause_time_sum_ms_| was + // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and + // by subtracting it now we effectively remove the time spent in in the + // queue while in a paused state. + int64_t time_in_non_paused_state_ms = + time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_; + queue_time_sum_ms_ -= time_in_non_paused_state_ms; + + RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end()); + enqueue_times_.erase(packet.enqueue_time_it); + + // Update |bytes| of this stream. The general idea is that the stream that + // has sent the least amount of bytes should have the highest priority. + // The problem with that is if streams send with different rates, in which + // case a "budget" will be built up for the stream sending at the lower + // rate. To avoid building a too large budget we limit |bytes| to be within + // kMaxLeading bytes of the stream that has sent the most amount of bytes. + streams_->bytes = + std::max(streams_->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes); + max_bytes_ = std::max(max_bytes_, streams_->bytes); + + size_bytes_ -= packet.bytes; + size_packets_ -= 1; + RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); + streams_->packet_queue.pop(); + + // If there are packets left to be sent, schedule the stream again. + RTC_CHECK(!IsSsrcScheduled(streams_->ssrc)); + if (streams_->packet_queue.empty()) { + streams_->priority_it = stream_priorities_.end(); + } else { + RtpPacketSender::Priority priority = + streams_->packet_queue.top().priority; + streams_->priority_it = stream_priorities_.emplace( + StreamPrioKey(priority, streams_->bytes), streams_->ssrc); + } + } +} + +bool PacketQueue2::Empty() const { + RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || + (stream_priorities_.empty() && size_packets_ == 0)); + return stream_priorities_.empty(); +} + +size_t PacketQueue2::SizeInPackets() const { + return size_packets_; +} + +uint64_t PacketQueue2::SizeInBytes() const { + return size_bytes_; +} + +int64_t PacketQueue2::OldestEnqueueTimeMs() const { + if (Empty()) + return 0; + RTC_CHECK(!enqueue_times_.empty()); + return *enqueue_times_.begin(); +} + +void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) { + RTC_CHECK_GE(timestamp_ms, time_last_updated_); + if (timestamp_ms == time_last_updated_) + return; + + int64_t delta_ms = timestamp_ms - time_last_updated_; + + if (paused_) { + pause_time_sum_ms_ += delta_ms; + } else { + queue_time_sum_ms_ += delta_ms * size_packets_; + } + + time_last_updated_ = timestamp_ms; +} + +void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) { + if (paused_ == paused) + return; + UpdateQueueTime(timestamp_ms); + paused_ = paused; +} + +int64_t PacketQueue2::AverageQueueTimeMs() const { + if (Empty()) + return 0; + return queue_time_sum_ms_ / size_packets_; +} + +PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() { + RTC_CHECK(!stream_priorities_.empty()); + uint32_t ssrc = stream_priorities_.begin()->second; + + auto stream_info_it = streams_.find(ssrc); + RTC_CHECK(stream_info_it != streams_.end()); + RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin()); + RTC_CHECK(!stream_info_it->second.packet_queue.empty()); + return &stream_info_it->second; +} + +bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const { + for (const auto& scheduled_stream : stream_priorities_) { + if (scheduled_stream.second == ssrc) + return true; + } + return false; +} + +} // namespace webrtc diff --git a/modules/pacing/packet_queue2.h b/modules/pacing/packet_queue2.h new file mode 100644 index 0000000000..c7a23371b5 --- /dev/null +++ b/modules/pacing/packet_queue2.h @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_QUEUE2_H_ +#define MODULES_PACING_PACKET_QUEUE2_H_ + +#include +#include +#include + +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { + +class PacketQueue2 { + public: + explicit PacketQueue2(const Clock* clock); + virtual ~PacketQueue2(); + + struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + + Packet(const Packet& other); + + virtual ~Packet(); + + bool operator<(const Packet& other) const { + if (priority != other.priority) + return priority > other.priority; + if (retransmission != other.retransmission) + return other.retransmission; + + return enqueue_order > other.enqueue_order; + } + + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::multiset::iterator enqueue_time_it; + }; + + void Push(const Packet& packet); + const Packet& Top(); + void Pop(); + + bool Empty() const; + size_t SizeInPackets() const; + uint64_t SizeInBytes() const; + + int64_t OldestEnqueueTimeMs() const; + int64_t AverageQueueTimeMs() const; + void UpdateQueueTime(int64_t timestamp_ms); + void SetPauseState(bool paused, int64_t timestamp_ms); + + struct StreamPrioKey { + StreamPrioKey() = default; + StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes) + : priority(priority), bytes(bytes) {} + + bool operator<(const StreamPrioKey& other) const { + if (priority != other.priority) + return priority < other.priority; + return bytes > other.bytes; + } + + const RtpPacketSender::Priority priority; + const size_t bytes; + }; + + struct Stream { + Stream(); + + virtual ~Stream(); + + size_t bytes; + uint32_t ssrc; + std::priority_queue packet_queue; + + // Whenever a packet is inserted for this stream we check if |priority_it| + // points to an element in |stream_priorities_|, and if it does it means + // this stream has already been scheduled, and if the scheduled priority is + // lower than the priority of the incoming packet we reschedule this stream + // with the higher priority. + std::multimap::iterator priority_it; + }; + + private: + static constexpr size_t kMaxLeadingBytes = 1400; + + Stream* GetHighestPriorityStream(); + + // Just used to verify correctness. + bool IsSsrcScheduled(uint32_t ssrc) const; + + const Clock* const clock_; + bool paused_ = false; + size_t size_packets_ = 0; + size_t size_bytes_ = 0; + size_t max_bytes_ = kMaxLeadingBytes; + int64_t time_last_updated_; + int64_t queue_time_sum_ms_ = 0; + int64_t pause_time_sum_ms_ = 0; + + // A map of streams used to prioritize from which stream to send next. We use + // a multimap instead of a priority_queue since the priority of a stream can + // change as a new packet is inserted, and a multimap allows us to remove and + // then reinsert a StreamPrioKey if the priority has increased. + std::multimap stream_priorities_; + + // A map of SSRCs to Streams. + std::map streams_; + + // The enqueue time of every packet currently in the queue. Used to figure out + // the age of the oldest packet in the queue. + std::multiset enqueue_times_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACKET_QUEUE2_H_