diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 7f7767f411..c9689d929b 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -15,10 +15,6 @@ rtc_static_library("pacing") { "paced_sender.cc", "paced_sender.h", "pacer.h", - "packet_queue.cc", - "packet_queue.h", - "packet_queue_interface.cc", - "packet_queue_interface.h", "packet_router.cc", "packet_router.h", "round_robin_packet_queue.cc", diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index eb690f72f1..0862d1d01e 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -22,7 +22,6 @@ #include "modules/include/module_common_types.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" -#include "modules/pacing/round_robin_packet_queue.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" @@ -51,15 +50,6 @@ const float PacedSender::kDefaultPaceMultiplier = 2.5f; PacedSender::PacedSender(const Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log) - : PacedSender(clock, - packet_sender, - event_log, - absl::make_unique(clock)) {} - -PacedSender::PacedSender(const Clock* clock, - PacketSender* packet_sender, - RtcEventLog* event_log, - std::unique_ptr packets) : clock_(clock), packet_sender_(packet_sender), alr_detector_(absl::make_unique(event_log)), @@ -80,7 +70,7 @@ PacedSender::PacedSender(const Clock* clock, time_last_process_us_(clock->TimeInMicroseconds()), last_send_time_us_(clock->TimeInMicroseconds()), first_sent_packet_ms_(-1), - packets_(std::move(packets)), + packets_(clock), packet_counter_(0), pacing_factor_(kDefaultPaceMultiplier), queue_time_limit(kMaxQueueLengthMs), @@ -104,7 +94,7 @@ void PacedSender::Pause() { if (!paused_) RTC_LOG(LS_INFO) << "PacedSender paused."; paused_ = true; - packets_->SetPauseState(true, TimeMilliseconds()); + packets_.SetPauseState(true, TimeMilliseconds()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to get @@ -119,7 +109,7 @@ void PacedSender::Resume() { if (paused_) RTC_LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; - packets_->SetPauseState(false, TimeMilliseconds()); + packets_.SetPauseState(false, TimeMilliseconds()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to @@ -212,7 +202,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, if (capture_time_ms < 0) capture_time_ms = now_ms; - packets_->Push(PacketQueueInterface::Packet( + packets_.Push(RoundRobinPacketQueue::Packet( priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes, retransmission, packet_counter_++)); } @@ -225,7 +215,7 @@ void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { int64_t PacedSender::ExpectedQueueTimeMs() const { rtc::CritScope cs(&critsect_); RTC_DCHECK_GT(pacing_bitrate_kbps_, 0); - return static_cast(packets_->SizeInBytes() * 8 / + return static_cast(packets_.SizeInBytes() * 8 / pacing_bitrate_kbps_); } @@ -237,7 +227,7 @@ absl::optional PacedSender::GetApplicationLimitedRegionStartTime() size_t PacedSender::QueueSizePackets() const { rtc::CritScope cs(&critsect_); - return packets_->SizeInPackets(); + return packets_.SizeInPackets(); } int64_t PacedSender::FirstSentPacketTimeMs() const { @@ -248,7 +238,7 @@ int64_t PacedSender::FirstSentPacketTimeMs() const { int64_t PacedSender::QueueInMs() const { rtc::CritScope cs(&critsect_); - int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); + int64_t oldest_packet = packets_.OldestEnqueueTimeMs(); if (oldest_packet == 0) return 0; @@ -303,15 +293,15 @@ void PacedSender::Process() { if (elapsed_time_ms > 0) { int target_bitrate_kbps = pacing_bitrate_kbps_; - size_t queue_size_bytes = packets_->SizeInBytes(); + size_t queue_size_bytes = packets_.SizeInBytes(); if (queue_size_bytes > 0) { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packets_->UpdateQueueTime(TimeMilliseconds()); + packets_.UpdateQueueTime(TimeMilliseconds()); if (drain_large_queues_) { int64_t avg_time_left_ms = std::max( - 1, queue_time_limit - packets_->AverageQueueTimeMs()); + 1, queue_time_limit - packets_.AverageQueueTimeMs()); int min_bitrate_needed_kbps = static_cast(queue_size_bytes * 8 / avg_time_left_ms); if (min_bitrate_needed_kbps > target_bitrate_kbps) @@ -333,26 +323,26 @@ void PacedSender::Process() { } // The paused state is checked in the loop since SendPacket leaves the // critical section allowing the paused state to be changed from other code. - while (!packets_->Empty() && !paused_) { + while (!packets_.Empty() && !paused_) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. - const PacketQueueInterface::Packet& packet = packets_->BeginPop(); + const RoundRobinPacketQueue::Packet& packet = packets_.BeginPop(); if (SendPacket(packet, pacing_info)) { bytes_sent += packet.bytes; // Send succeeded, remove it from the queue. - packets_->FinalizePop(packet); + packets_.FinalizePop(packet); if (is_probing && bytes_sent > recommended_probe_size) break; } else { // Send failed, put it back into the queue. - packets_->CancelPop(packet); + packets_.CancelPop(packet); break; } } - if (packets_->Empty() && !Congested()) { + if (packets_.Empty() && !Congested()) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. if (packet_counter_ > 0) { @@ -378,7 +368,7 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -bool PacedSender::SendPacket(const PacketQueueInterface::Packet& packet, +bool PacedSender::SendPacket(const RoundRobinPacketQueue::Packet& packet, const PacedPacketInfo& pacing_info) { RTC_DCHECK(!paused_); bool audio_packet = packet.priority == kHighPriority; diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index c5ca5a6b9a..f12917408a 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -15,7 +15,7 @@ #include "absl/types/optional.h" #include "modules/pacing/pacer.h" -#include "modules/pacing/packet_queue_interface.h" +#include "modules/pacing/round_robin_packet_queue.h" #include "rtc_base/criticalsection.h" #include "rtc_base/thread_annotations.h" @@ -65,11 +65,6 @@ class PacedSender : public Pacer { PacketSender* packet_sender, RtcEventLog* event_log); - PacedSender(const Clock* clock, - PacketSender* packet_sender, - RtcEventLog* event_log, - std::unique_ptr packets); - ~PacedSender() override; virtual void CreateProbeCluster(int bitrate_bps); @@ -149,7 +144,7 @@ class PacedSender : public Pacer { void UpdateBudgetWithBytesSent(size_t bytes) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool SendPacket(const PacketQueueInterface::Packet& packet, + bool SendPacket(const RoundRobinPacketQueue::Packet& packet, const PacedPacketInfo& cluster_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) @@ -195,8 +190,7 @@ class PacedSender : public Pacer { int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_); int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); - const std::unique_ptr packets_ - RTC_PT_GUARDED_BY(critsect_); + RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_); uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) = diff --git a/modules/pacing/packet_queue.cc b/modules/pacing/packet_queue.cc deleted file mode 100644 index eb0f822d50..0000000000 --- a/modules/pacing/packet_queue.cc +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/pacing/packet_queue.h" - -#include -#include -#include - -#include "modules/include/module_common_types.h" -#include "modules/pacing/bitrate_prober.h" -#include "modules/pacing/interval_budget.h" -#include "modules/utility/include/process_thread.h" -#include "rtc_base/checks.h" -#include "rtc_base/logging.h" -#include "system_wrappers/include/clock.h" -#include "system_wrappers/include/field_trial.h" - -namespace webrtc { - -PacketQueue::PacketQueue(const Clock* clock) - : bytes_(0), - clock_(clock), - queue_time_sum_(0), - time_last_updated_(clock_->TimeInMilliseconds()), - paused_(false) {} - -PacketQueue::~PacketQueue() {} - -void PacketQueue::Push(const Packet& packet) { - UpdateQueueTime(packet.enqueue_time_ms); - - // Store packet in list, use pointers in priority queue for cheaper moves. - // Packets have a handle to its own iterator in the list, for easy removal - // when popping from queue. - packet_list_.push_front(packet); - std::list::iterator it = packet_list_.begin(); - it->this_it = it; // Handle for direct removal from list. - prio_queue_.push(&(*it)); // Pointer into list. - bytes_ += packet.bytes; -} - -const PacketQueueInterface::Packet& PacketQueue::BeginPop() { - const Packet& packet = *prio_queue_.top(); - prio_queue_.pop(); - return packet; -} - -void PacketQueue::CancelPop(const Packet& packet) { - prio_queue_.push(&(*packet.this_it)); -} - -void PacketQueue::FinalizePop(const Packet& packet) { - bytes_ -= packet.bytes; - int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; - RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); - packet_queue_time_ms -= packet.sum_paused_ms; - RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); - queue_time_sum_ -= packet_queue_time_ms; - packet_list_.erase(packet.this_it); - RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); - if (packet_list_.empty()) - RTC_DCHECK_EQ(0, queue_time_sum_); -} - -bool PacketQueue::Empty() const { - return prio_queue_.empty(); -} - -size_t PacketQueue::SizeInPackets() const { - return prio_queue_.size(); -} - -uint64_t PacketQueue::SizeInBytes() const { - return bytes_; -} - -int64_t PacketQueue::OldestEnqueueTimeMs() const { - auto it = packet_list_.rbegin(); - if (it == packet_list_.rend()) - return 0; - return it->enqueue_time_ms; -} - -void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) { - RTC_DCHECK_GE(timestamp_ms, time_last_updated_); - if (timestamp_ms == time_last_updated_) - return; - - int64_t delta_ms = timestamp_ms - time_last_updated_; - - if (paused_) { - // Increase per-packet accumulators of time spent in queue while paused, - // so that we can disregard that when subtracting main accumulator when - // popping packet from the queue. - for (auto& it : packet_list_) { - it.sum_paused_ms += delta_ms; - } - } else { - // Use packet packet_list_.size() not prio_queue_.size() here, as there - // might be an outstanding element popped from prio_queue_ currently in - // the SendPacket() call, while packet_list_ will always be correct. - queue_time_sum_ += delta_ms * packet_list_.size(); - } - time_last_updated_ = timestamp_ms; -} - -void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { - if (paused_ == paused) - return; - UpdateQueueTime(timestamp_ms); - paused_ = paused; -} - -int64_t PacketQueue::AverageQueueTimeMs() const { - if (prio_queue_.empty()) - return 0; - return queue_time_sum_ / packet_list_.size(); -} - -} // namespace webrtc diff --git a/modules/pacing/packet_queue.h b/modules/pacing/packet_queue.h deleted file mode 100644 index 8041e08057..0000000000 --- a/modules/pacing/packet_queue.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_PACING_PACKET_QUEUE_H_ -#define MODULES_PACING_PACKET_QUEUE_H_ - -#include -#include -#include -#include - -#include "modules/pacing/packet_queue_interface.h" -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" - -namespace webrtc { - -class PacketQueue : public PacketQueueInterface { - public: - explicit PacketQueue(const Clock* clock); - ~PacketQueue() override; - - using Packet = PacketQueueInterface::Packet; - - void Push(const Packet& packet) override; - const Packet& BeginPop() override; - void CancelPop(const Packet& packet) override; - void FinalizePop(const Packet& packet) override; - bool Empty() const override; - size_t SizeInPackets() const override; - uint64_t SizeInBytes() const override; - int64_t OldestEnqueueTimeMs() const override; - void UpdateQueueTime(int64_t timestamp_ms) override; - void SetPauseState(bool paused, int64_t timestamp_ms) override; - int64_t AverageQueueTimeMs() const override; - - private: - // Try to add a packet to the set of ssrc/seqno identifiers currently in the - // queue. Return true if inserted, false if this is a duplicate. - bool AddToDupeSet(const Packet& packet); - - void RemoveFromDupeSet(const Packet& packet); - - // Used by priority queue to sort packets. - struct Comparator { - bool operator()(const Packet* first, const Packet* second) { - // Highest prio = 0. - if (first->priority != second->priority) - return first->priority > second->priority; - - // Retransmissions go first. - if (second->retransmission != first->retransmission) - return second->retransmission; - - // Older frames have higher prio. - if (first->capture_time_ms != second->capture_time_ms) - return first->capture_time_ms > second->capture_time_ms; - - return first->enqueue_order > second->enqueue_order; - } - }; - - // List of packets, in the order the were enqueued. Since dequeueing may - // occur out of order, use list instead of vector. - std::list packet_list_; - // Priority queue of the packets, sorted according to Comparator. - // Use pointers into list, to avodi moving whole struct within heap. - std::priority_queue, Comparator> prio_queue_; - // Total number of bytes in the queue. - uint64_t bytes_; - const Clock* const clock_; - int64_t queue_time_sum_; - int64_t time_last_updated_; - bool paused_; -}; -} // namespace webrtc - -#endif // MODULES_PACING_PACKET_QUEUE_H_ diff --git a/modules/pacing/packet_queue_interface.cc b/modules/pacing/packet_queue_interface.cc deleted file mode 100644 index a82d7df4f2..0000000000 --- a/modules/pacing/packet_queue_interface.cc +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "modules/pacing/packet_queue_interface.h" - -namespace webrtc { - -PacketQueueInterface::Packet::Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order) - : priority(priority), - ssrc(ssrc), - sequence_number(seq_number), - capture_time_ms(capture_time_ms), - enqueue_time_ms(enqueue_time_ms), - sum_paused_ms(0), - bytes(length_in_bytes), - retransmission(retransmission), - enqueue_order(enqueue_order) {} - -PacketQueueInterface::Packet::Packet(const Packet& other) = default; - -PacketQueueInterface::Packet::~Packet() {} - -bool PacketQueueInterface::Packet::operator<( - const PacketQueueInterface::Packet& other) const { - if (priority != other.priority) - return priority > other.priority; - if (retransmission != other.retransmission) - return other.retransmission; - - return enqueue_order > other.enqueue_order; -} -} // namespace webrtc diff --git a/modules/pacing/packet_queue_interface.h b/modules/pacing/packet_queue_interface.h deleted file mode 100644 index 81a8b05b1b..0000000000 --- a/modules/pacing/packet_queue_interface.h +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ -#define MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ - -#include - -#include -#include -#include - -#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" - -namespace webrtc { - -class PacketQueueInterface { - public: - PacketQueueInterface() = default; - virtual ~PacketQueueInterface() = default; - - struct Packet { - Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order); - Packet(const Packet& other); - virtual ~Packet(); - bool operator<(const Packet& other) const; - - RtpPacketSender::Priority priority; - uint32_t ssrc; - uint16_t sequence_number; - int64_t capture_time_ms; // Absolute time of frame capture. - int64_t enqueue_time_ms; // Absolute time of pacer queue entry. - int64_t sum_paused_ms; - size_t bytes; - bool retransmission; - uint64_t enqueue_order; - std::list::iterator this_it; - std::multiset::iterator enqueue_time_it; - }; - - virtual void Push(const Packet& packet) = 0; - virtual const Packet& BeginPop() = 0; - virtual void CancelPop(const Packet& packet) = 0; - virtual void FinalizePop(const Packet& packet) = 0; - virtual bool Empty() const = 0; - virtual size_t SizeInPackets() const = 0; - virtual uint64_t SizeInBytes() const = 0; - virtual int64_t OldestEnqueueTimeMs() const = 0; - virtual void UpdateQueueTime(int64_t timestamp_ms) = 0; - virtual void SetPauseState(bool paused, int64_t timestamp_ms) = 0; - virtual int64_t AverageQueueTimeMs() const = 0; -}; -} // namespace webrtc - -#endif // MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index 0989b45966..e892ddb4ea 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -17,6 +17,38 @@ namespace webrtc { +RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + sum_paused_ms(0), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + +RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default; + +RoundRobinPacketQueue::Packet::~Packet() {} + +bool RoundRobinPacketQueue::Packet::operator<( + const RoundRobinPacketQueue::Packet& other) const { + if (priority != other.priority) + return priority > other.priority; + if (retransmission != other.retransmission) + return other.retransmission; + + return enqueue_order > other.enqueue_order; +} + RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; RoundRobinPacketQueue::Stream::~Stream() {} @@ -69,7 +101,7 @@ void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) { size_bytes_ += packet.bytes; } -const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() { +const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() { RTC_CHECK(!pop_packet_ && !pop_stream_); Stream* stream = GetHighestPriorityStream(); diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index e8cb19e9c7..2017eb64c0 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -11,35 +11,59 @@ #ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ #define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ +#include #include #include #include -#include "modules/pacing/packet_queue_interface.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" namespace webrtc { -class RoundRobinPacketQueue : public PacketQueueInterface { +class RoundRobinPacketQueue { public: explicit RoundRobinPacketQueue(const Clock* clock); - ~RoundRobinPacketQueue() override; + ~RoundRobinPacketQueue(); - using Packet = PacketQueueInterface::Packet; + struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + Packet(const Packet& other); + virtual ~Packet(); + bool operator<(const Packet& other) const; - void Push(const Packet& packet) override; - const Packet& BeginPop() override; - void CancelPop(const Packet& packet) override; - void FinalizePop(const Packet& packet) override; + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + int64_t sum_paused_ms; + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::list::iterator this_it; + std::multiset::iterator enqueue_time_it; + }; - bool Empty() const override; - size_t SizeInPackets() const override; - uint64_t SizeInBytes() const override; + void Push(const Packet& packet); + const Packet& BeginPop(); + void CancelPop(const Packet& packet); + void FinalizePop(const Packet& packet); - int64_t OldestEnqueueTimeMs() const override; - int64_t AverageQueueTimeMs() const override; - void UpdateQueueTime(int64_t timestamp_ms) override; - void SetPauseState(bool paused, int64_t timestamp_ms) override; + bool Empty() const; + size_t SizeInPackets() const; + uint64_t SizeInBytes() const; + + int64_t OldestEnqueueTimeMs() const; + int64_t AverageQueueTimeMs() const; + void UpdateQueueTime(int64_t timestamp_ms); + void SetPauseState(bool paused, int64_t timestamp_ms); private: struct StreamPrioKey {