From 9981bd928f01eb77b882185a063934d3dbba8d8a Mon Sep 17 00:00:00 2001 From: philipel Date: Tue, 26 Sep 2017 17:16:06 +0200 Subject: [PATCH] Move PacketQueue out of paced_sender.cc to its own packet_queue.{cc,h}. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: webrtc:8287, webrtc:8288 Change-Id: If8937458c5b8f5a75b3de441aa409ae873f4bda2 Reviewed-on: https://webrtc-review.googlesource.com/3761 Reviewed-by: Erik Språng Commit-Queue: Philip Eliasson Cr-Commit-Position: refs/heads/master@{#20003} --- modules/pacing/BUILD.gn | 2 + modules/pacing/paced_sender.cc | 206 +-------------------------------- modules/pacing/paced_sender.h | 13 +-- modules/pacing/packet_queue.cc | 174 ++++++++++++++++++++++++++++ modules/pacing/packet_queue.h | 109 +++++++++++++++++ 5 files changed, 294 insertions(+), 210 deletions(-) create mode 100644 modules/pacing/packet_queue.cc create mode 100644 modules/pacing/packet_queue.h diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 4882d4f714..f72d3f990f 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -19,6 +19,8 @@ rtc_static_library("pacing") { "paced_sender.cc", "paced_sender.h", "pacer.h", + "packet_queue.cc", + "packet_queue.h", "packet_router.cc", "packet_router.h", ] diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 785a2de0cf..96355b30c0 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -37,201 +37,7 @@ const int64_t kMaxIntervalTimeMs = 30; } // namespace -// TODO(sprang): Move at least PacketQueue out to separate files, so that we can -// more easily test them. - namespace webrtc { -namespace paced_sender { -struct Packet { - Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order) - : priority(priority), - ssrc(ssrc), - sequence_number(seq_number), - capture_time_ms(capture_time_ms), - enqueue_time_ms(enqueue_time_ms), - sum_paused_ms(0), - bytes(length_in_bytes), - retransmission(retransmission), - enqueue_order(enqueue_order) {} - - RtpPacketSender::Priority priority; - uint32_t ssrc; - uint16_t sequence_number; - int64_t capture_time_ms; // Absolute time of frame capture. - int64_t enqueue_time_ms; // Absolute time of pacer queue entry. - int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused. - size_t bytes; - bool retransmission; - uint64_t enqueue_order; - std::list::iterator this_it; -}; - -// Used by priority queue to sort packets. -struct Comparator { - bool operator()(const Packet* first, const Packet* second) { - // Highest prio = 0. - if (first->priority != second->priority) - return first->priority > second->priority; - - // Retransmissions go first. - if (second->retransmission != first->retransmission) - return second->retransmission; - - // Older frames have higher prio. - if (first->capture_time_ms != second->capture_time_ms) - return first->capture_time_ms > second->capture_time_ms; - - return first->enqueue_order > second->enqueue_order; - } -}; - -// Class encapsulating a priority queue with some extensions. -class PacketQueue { - public: - explicit PacketQueue(const Clock* clock) - : bytes_(0), - clock_(clock), - queue_time_sum_(0), - time_last_updated_(clock_->TimeInMilliseconds()), - paused_(false) {} - virtual ~PacketQueue() {} - - void Push(const Packet& packet) { - if (!AddToDupeSet(packet)) - return; - - UpdateQueueTime(packet.enqueue_time_ms); - - // Store packet in list, use pointers in priority queue for cheaper moves. - // Packets have a handle to its own iterator in the list, for easy removal - // when popping from queue. - packet_list_.push_front(packet); - std::list::iterator it = packet_list_.begin(); - it->this_it = it; // Handle for direct removal from list. - prio_queue_.push(&(*it)); // Pointer into list. - bytes_ += packet.bytes; - } - - const Packet& BeginPop() { - const Packet& packet = *prio_queue_.top(); - prio_queue_.pop(); - return packet; - } - - void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } - - void FinalizePop(const Packet& packet) { - RemoveFromDupeSet(packet); - bytes_ -= packet.bytes; - int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; - RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); - packet_queue_time_ms -= packet.sum_paused_ms; - RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); - queue_time_sum_ -= packet_queue_time_ms; - packet_list_.erase(packet.this_it); - RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); - if (packet_list_.empty()) - RTC_DCHECK_EQ(0, queue_time_sum_); - } - - bool Empty() const { return prio_queue_.empty(); } - - size_t SizeInPackets() const { return prio_queue_.size(); } - - uint64_t SizeInBytes() const { return bytes_; } - - int64_t OldestEnqueueTimeMs() const { - auto it = packet_list_.rbegin(); - if (it == packet_list_.rend()) - return 0; - return it->enqueue_time_ms; - } - - void UpdateQueueTime(int64_t timestamp_ms) { - RTC_DCHECK_GE(timestamp_ms, time_last_updated_); - if (timestamp_ms == time_last_updated_) - return; - - int64_t delta_ms = timestamp_ms - time_last_updated_; - - if (paused_) { - // Increase per-packet accumulators of time spent in queue while paused, - // so that we can disregard that when subtracting main accumulator when - // popping packet from the queue. - for (auto& it : packet_list_) { - it.sum_paused_ms += delta_ms; - } - } else { - // Use packet packet_list_.size() not prio_queue_.size() here, as there - // might be an outstanding element popped from prio_queue_ currently in - // the SendPacket() call, while packet_list_ will always be correct. - queue_time_sum_ += delta_ms * packet_list_.size(); - } - time_last_updated_ = timestamp_ms; - } - - void SetPauseState(bool paused, int64_t timestamp_ms) { - if (paused_ == paused) - return; - UpdateQueueTime(timestamp_ms); - paused_ = paused; - } - - int64_t AverageQueueTimeMs() const { - if (prio_queue_.empty()) - return 0; - return queue_time_sum_ / packet_list_.size(); - } - - private: - // Try to add a packet to the set of ssrc/seqno identifiers currently in the - // queue. Return true if inserted, false if this is a duplicate. - bool AddToDupeSet(const Packet& packet) { - SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); - if (it == dupe_map_.end()) { - // First for this ssrc, just insert. - dupe_map_[packet.ssrc].insert(packet.sequence_number); - return true; - } - - // Insert returns a pair, where second is a bool set to true if new element. - return it->second.insert(packet.sequence_number).second; - } - - void RemoveFromDupeSet(const Packet& packet) { - SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); - RTC_DCHECK(it != dupe_map_.end()); - it->second.erase(packet.sequence_number); - if (it->second.empty()) { - dupe_map_.erase(it); - } - } - - // List of packets, in the order the were enqueued. Since dequeueing may - // occur out of order, use list instead of vector. - std::list packet_list_; - // Priority queue of the packets, sorted according to Comparator. - // Use pointers into list, to avoid moving whole struct within heap. - std::priority_queue, Comparator> prio_queue_; - // Total number of bytes in the queue. - uint64_t bytes_; - // Map >, for checking duplicates. - typedef std::map > SsrcSeqNoMap; - SsrcSeqNoMap dupe_map_; - const Clock* const clock_; - int64_t queue_time_sum_; - int64_t time_last_updated_; - bool paused_; -}; - -} // namespace paced_sender const int64_t PacedSender::kMaxQueueLengthMs = 2000; const float PacedSender::kDefaultPaceMultiplier = 2.5f; @@ -253,7 +59,7 @@ PacedSender::PacedSender(const Clock* clock, pacing_bitrate_kbps_(0), time_last_update_us_(clock->TimeInMicroseconds()), first_sent_packet_ms_(-1), - packets_(new paced_sender::PacketQueue(clock)), + packets_(new PacketQueue(clock)), packet_counter_(0), pacing_factor_(kDefaultPaceMultiplier), queue_time_limit(kMaxQueueLengthMs) { @@ -342,9 +148,9 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, if (capture_time_ms < 0) capture_time_ms = now_ms; - packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number, - capture_time_ms, now_ms, bytes, - retransmission, packet_counter_++)); + packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number, + capture_time_ms, now_ms, bytes, + retransmission, packet_counter_++)); } int64_t PacedSender::ExpectedQueueTimeMs() const { @@ -455,7 +261,7 @@ void PacedSender::Process() { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. - const paced_sender::Packet& packet = packets_->BeginPop(); + const PacketQueue::Packet& packet = packets_->BeginPop(); if (SendPacket(packet, pacing_info)) { // Send succeeded, remove it from the queue. @@ -496,7 +302,7 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -bool PacedSender::SendPacket(const paced_sender::Packet& packet, +bool PacedSender::SendPacket(const PacketQueue::Packet& packet, const PacedPacketInfo& pacing_info) { RTC_DCHECK(!paused_); if (media_budget_->bytes_remaining() == 0 && diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 754d750be7..499f1d90ae 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -11,12 +11,11 @@ #ifndef MODULES_PACING_PACED_SENDER_H_ #define MODULES_PACING_PACED_SENDER_H_ -#include #include -#include #include "api/optional.h" #include "modules/pacing/pacer.h" +#include "modules/pacing/packet_queue.h" #include "rtc_base/criticalsection.h" #include "rtc_base/thread_annotations.h" #include "typedefs.h" // NOLINT(build/include) @@ -29,12 +28,6 @@ class ProbeClusterCreatedObserver; class RtcEventLog; class IntervalBudget; -namespace paced_sender { -class IntervalBudget; -struct Packet; -class PacketQueue; -} // namespace paced_sender - class PacedSender : public Pacer { public: class PacketSender { @@ -159,7 +152,7 @@ class PacedSender : public Pacer { void UpdateBudgetWithBytesSent(size_t bytes) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool SendPacket(const paced_sender::Packet& packet, + bool SendPacket(const PacketQueue::Packet& packet, const PacedPacketInfo& cluster_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) @@ -191,7 +184,7 @@ class PacedSender : public Pacer { int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_); int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); - std::unique_ptr packets_ RTC_GUARDED_BY(critsect_); + std::unique_ptr packets_ RTC_GUARDED_BY(critsect_); uint64_t packet_counter_; ProcessThread* process_thread_ = nullptr; diff --git a/modules/pacing/packet_queue.cc b/modules/pacing/packet_queue.cc new file mode 100644 index 0000000000..765e43fcf6 --- /dev/null +++ b/modules/pacing/packet_queue.cc @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_queue.h" + +#include +#include +#include + +#include "modules/include/module_common_types.h" +#include "modules/pacing/alr_detector.h" +#include "modules/pacing/bitrate_prober.h" +#include "modules/pacing/interval_budget.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "system_wrappers/include/clock.h" +#include "system_wrappers/include/field_trial.h" + +namespace webrtc { + +PacketQueue::Packet::Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + sum_paused_ms(0), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + +PacketQueue::Packet::~Packet() {} + +PacketQueue::PacketQueue(const Clock* clock) + : bytes_(0), + clock_(clock), + queue_time_sum_(0), + time_last_updated_(clock_->TimeInMilliseconds()), + paused_(false) {} + +PacketQueue::~PacketQueue() {} + +void PacketQueue::Push(const Packet& packet) { + if (!AddToDupeSet(packet)) + return; + + UpdateQueueTime(packet.enqueue_time_ms); + + // Store packet in list, use pointers in priority queue for cheaper moves. + // Packets have a handle to its own iterator in the list, for easy removal + // when popping from queue. + packet_list_.push_front(packet); + std::list::iterator it = packet_list_.begin(); + it->this_it = it; // Handle for direct removal from list. + prio_queue_.push(&(*it)); // Pointer into list. + bytes_ += packet.bytes; +} + +const PacketQueue::Packet& PacketQueue::BeginPop() { + const PacketQueue::Packet& packet = *prio_queue_.top(); + prio_queue_.pop(); + return packet; +} + +void PacketQueue::CancelPop(const PacketQueue::Packet& packet) { + prio_queue_.push(&(*packet.this_it)); +} + +void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) { + RemoveFromDupeSet(packet); + bytes_ -= packet.bytes; + int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; + RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); + packet_queue_time_ms -= packet.sum_paused_ms; + RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); + queue_time_sum_ -= packet_queue_time_ms; + packet_list_.erase(packet.this_it); + RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); + if (packet_list_.empty()) + RTC_DCHECK_EQ(0, queue_time_sum_); +} + +bool PacketQueue::Empty() const { + return prio_queue_.empty(); +} + +size_t PacketQueue::SizeInPackets() const { + return prio_queue_.size(); +} + +uint64_t PacketQueue::SizeInBytes() const { + return bytes_; +} + +int64_t PacketQueue::OldestEnqueueTimeMs() const { + auto it = packet_list_.rbegin(); + if (it == packet_list_.rend()) + return 0; + return it->enqueue_time_ms; +} + +void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) { + RTC_DCHECK_GE(timestamp_ms, time_last_updated_); + if (timestamp_ms == time_last_updated_) + return; + + int64_t delta_ms = timestamp_ms - time_last_updated_; + + if (paused_) { + // Increase per-packet accumulators of time spent in queue while paused, + // so that we can disregard that when subtracting main accumulator when + // popping packet from the queue. + for (auto& it : packet_list_) { + it.sum_paused_ms += delta_ms; + } + } else { + // Use packet packet_list_.size() not prio_queue_.size() here, as there + // might be an outstanding element popped from prio_queue_ currently in + // the SendPacket() call, while packet_list_ will always be correct. + queue_time_sum_ += delta_ms * packet_list_.size(); + } + time_last_updated_ = timestamp_ms; +} + +void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { + if (paused_ == paused) + return; + UpdateQueueTime(timestamp_ms); + paused_ = paused; +} + +int64_t PacketQueue::AverageQueueTimeMs() const { + if (prio_queue_.empty()) + return 0; + return queue_time_sum_ / packet_list_.size(); +} + +bool PacketQueue::AddToDupeSet(const PacketQueue::Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + if (it == dupe_map_.end()) { + // First for this ssrc, just insert. + dupe_map_[packet.ssrc].insert(packet.sequence_number); + return true; + } + + // Insert returns a pair, where second is a bool set to true if new element. + return it->second.insert(packet.sequence_number).second; +} + +void PacketQueue::RemoveFromDupeSet(const PacketQueue::Packet& packet) { + SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); + RTC_DCHECK(it != dupe_map_.end()); + it->second.erase(packet.sequence_number); + if (it->second.empty()) { + dupe_map_.erase(it); + } +} + +} // namespace webrtc diff --git a/modules/pacing/packet_queue.h b/modules/pacing/packet_queue.h new file mode 100644 index 0000000000..c6aa8433b4 --- /dev/null +++ b/modules/pacing/packet_queue.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_QUEUE_H_ +#define MODULES_PACING_PACKET_QUEUE_H_ + +#include +#include +#include +#include +#include + +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { + +class PacketQueue { + public: + explicit PacketQueue(const Clock* clock); + virtual ~PacketQueue(); + + struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + + virtual ~Packet(); + + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused. + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::list::iterator this_it; + }; + + void Push(const Packet& packet); + const Packet& BeginPop(); + void CancelPop(const Packet& packet); + void FinalizePop(const Packet& packet); + bool Empty() const; + size_t SizeInPackets() const; + uint64_t SizeInBytes() const; + int64_t OldestEnqueueTimeMs() const; + void UpdateQueueTime(int64_t timestamp_ms); + void SetPauseState(bool paused, int64_t timestamp_ms); + int64_t AverageQueueTimeMs() const; + + private: + // Try to add a packet to the set of ssrc/seqno identifiers currently in the + // queue. Return true if inserted, false if this is a duplicate. + bool AddToDupeSet(const Packet& packet); + + void RemoveFromDupeSet(const Packet& packet); + + // Used by priority queue to sort packets. + struct Comparator { + bool operator()(const Packet* first, const Packet* second) { + // Highest prio = 0. + if (first->priority != second->priority) + return first->priority > second->priority; + + // Retransmissions go first. + if (second->retransmission != first->retransmission) + return second->retransmission; + + // Older frames have higher prio. + if (first->capture_time_ms != second->capture_time_ms) + return first->capture_time_ms > second->capture_time_ms; + + return first->enqueue_order > second->enqueue_order; + } + }; + + // List of packets, in the order the were enqueued. Since dequeueing may + // occur out of order, use list instead of vector. + std::list packet_list_; + // Priority queue of the packets, sorted according to Comparator. + // Use pointers into list, to avodi moving whole struct within heap. + std::priority_queue, Comparator> prio_queue_; + // Total number of bytes in the queue. + uint64_t bytes_; + // Map >, for checking duplicates. + typedef std::map > SsrcSeqNoMap; + SsrcSeqNoMap dupe_map_; + const Clock* const clock_; + int64_t queue_time_sum_; + int64_t time_last_updated_; + bool paused_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACKET_QUEUE_H_