diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index a45f7caced..fdfa3b271c 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -19,10 +19,12 @@ rtc_static_library("pacing") { "pacer.h", "packet_queue.cc", "packet_queue.h", - "packet_queue2.cc", - "packet_queue2.h", + "packet_queue_interface.cc", + "packet_queue_interface.h", "packet_router.cc", "packet_router.h", + "round_robin_packet_queue.cc", + "round_robin_packet_queue.h", ] if (!build_with_chromium && is_clang) { diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 51414c16d2..862f4e189f 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -20,6 +20,8 @@ #include "modules/include/module_common_types.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" +#include "modules/pacing/packet_queue.h" +#include "modules/pacing/round_robin_packet_queue.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" @@ -52,18 +54,29 @@ namespace webrtc { const int64_t PacedSender::kMaxQueueLengthMs = 2000; +namespace { +std::unique_ptr CreatePacketQueue(const Clock* clock, + bool round_robin) { + if (round_robin) { + return rtc::MakeUnique(clock); + } else { + return rtc::MakeUnique(clock); + } +} +} // namespace + PacedSender::PacedSender(const Clock* clock, PacketSender* packet_sender, - RtcEventLog* event_log) : - PacedSender(clock, packet_sender, event_log, - IsRoundRobinPacingEnabled() - ? rtc::MakeUnique(clock) - : rtc::MakeUnique(clock)) {} + RtcEventLog* event_log) + : PacedSender(clock, + packet_sender, + event_log, + CreatePacketQueue(clock, IsRoundRobinPacingEnabled())) {} PacedSender::PacedSender(const Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, - std::unique_ptr packets) + std::unique_ptr packets) : clock_(clock), packet_sender_(packet_sender), paused_(false), diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 1c73a79c72..0501bbb691 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -15,7 +15,7 @@ #include "api/optional.h" #include "modules/pacing/pacer.h" -#include "modules/pacing/packet_queue2.h" +#include "modules/pacing/packet_queue_interface.h" #include "rtc_base/criticalsection.h" #include "rtc_base/thread_annotations.h" #include "typedefs.h" // NOLINT(build/include) @@ -62,7 +62,7 @@ class PacedSender : public Pacer { PacedSender(const Clock* clock, PacketSender* packet_sender, RtcEventLog* event_log, - std::unique_ptr packets); + std::unique_ptr packets); ~PacedSender() override; @@ -129,7 +129,7 @@ class PacedSender : public Pacer { void UpdateBudgetWithBytesSent(size_t bytes) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool SendPacket(const PacketQueue::Packet& packet, + bool SendPacket(const PacketQueueInterface::Packet& packet, const PacedPacketInfo& cluster_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) @@ -159,7 +159,8 @@ class PacedSender : public Pacer { int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_); int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); - const std::unique_ptr packets_ RTC_PT_GUARDED_BY(critsect_); + const std::unique_ptr packets_ + RTC_PT_GUARDED_BY(critsect_); uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); // Lock to avoid race when attaching process thread. This can happen due to diff --git a/modules/pacing/packet_queue.cc b/modules/pacing/packet_queue.cc index 667394a0af..eb0f822d50 100644 --- a/modules/pacing/packet_queue.cc +++ b/modules/pacing/packet_queue.cc @@ -25,28 +25,6 @@ namespace webrtc { -PacketQueue::Packet::Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order) - : priority(priority), - ssrc(ssrc), - sequence_number(seq_number), - capture_time_ms(capture_time_ms), - enqueue_time_ms(enqueue_time_ms), - sum_paused_ms(0), - bytes(length_in_bytes), - retransmission(retransmission), - enqueue_order(enqueue_order) {} - -PacketQueue::Packet::Packet(const Packet& other) = default; - -PacketQueue::Packet::~Packet() {} - PacketQueue::PacketQueue(const Clock* clock) : bytes_(0), clock_(clock), @@ -69,17 +47,17 @@ void PacketQueue::Push(const Packet& packet) { bytes_ += packet.bytes; } -const PacketQueue::Packet& PacketQueue::BeginPop() { - const PacketQueue::Packet& packet = *prio_queue_.top(); +const PacketQueueInterface::Packet& PacketQueue::BeginPop() { + const Packet& packet = *prio_queue_.top(); prio_queue_.pop(); return packet; } -void PacketQueue::CancelPop(const PacketQueue::Packet& packet) { +void PacketQueue::CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } -void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) { +void PacketQueue::FinalizePop(const Packet& packet) { bytes_ -= packet.bytes; int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); diff --git a/modules/pacing/packet_queue.h b/modules/pacing/packet_queue.h index 240961601c..8041e08057 100644 --- a/modules/pacing/packet_queue.h +++ b/modules/pacing/packet_queue.h @@ -16,62 +16,29 @@ #include #include +#include "modules/pacing/packet_queue_interface.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" namespace webrtc { -class PacketQueue { +class PacketQueue : public PacketQueueInterface { public: explicit PacketQueue(const Clock* clock); - virtual ~PacketQueue(); + ~PacketQueue() override; - struct Packet { - Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order); + using Packet = PacketQueueInterface::Packet; - Packet(const Packet& other); - - virtual ~Packet(); - - bool operator<(const Packet& other) const { - if (priority != other.priority) - return priority > other.priority; - if (retransmission != other.retransmission) - return other.retransmission; - - return enqueue_order > other.enqueue_order; - } - - RtpPacketSender::Priority priority; - uint32_t ssrc; - uint16_t sequence_number; - int64_t capture_time_ms; // Absolute time of frame capture. - int64_t enqueue_time_ms; // Absolute time of pacer queue entry. - int64_t sum_paused_ms; - size_t bytes; - bool retransmission; - uint64_t enqueue_order; - std::list::iterator this_it; - std::multiset::iterator enqueue_time_it; - }; - - virtual void Push(const Packet& packet); - virtual const Packet& BeginPop(); - virtual void CancelPop(const Packet& packet); - virtual void FinalizePop(const Packet& packet); - virtual bool Empty() const; - virtual size_t SizeInPackets() const; - virtual uint64_t SizeInBytes() const; - virtual int64_t OldestEnqueueTimeMs() const; - virtual void UpdateQueueTime(int64_t timestamp_ms); - virtual void SetPauseState(bool paused, int64_t timestamp_ms); - virtual int64_t AverageQueueTimeMs() const; + void Push(const Packet& packet) override; + const Packet& BeginPop() override; + void CancelPop(const Packet& packet) override; + void FinalizePop(const Packet& packet) override; + bool Empty() const override; + size_t SizeInPackets() const override; + uint64_t SizeInBytes() const override; + int64_t OldestEnqueueTimeMs() const override; + void UpdateQueueTime(int64_t timestamp_ms) override; + void SetPauseState(bool paused, int64_t timestamp_ms) override; + int64_t AverageQueueTimeMs() const override; private: // Try to add a packet to the set of ssrc/seqno identifiers currently in the diff --git a/modules/pacing/packet_queue_interface.cc b/modules/pacing/packet_queue_interface.cc new file mode 100644 index 0000000000..a82d7df4f2 --- /dev/null +++ b/modules/pacing/packet_queue_interface.cc @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_queue_interface.h" + +namespace webrtc { + +PacketQueueInterface::Packet::Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + sum_paused_ms(0), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + +PacketQueueInterface::Packet::Packet(const Packet& other) = default; + +PacketQueueInterface::Packet::~Packet() {} + +bool PacketQueueInterface::Packet::operator<( + const PacketQueueInterface::Packet& other) const { + if (priority != other.priority) + return priority > other.priority; + if (retransmission != other.retransmission) + return other.retransmission; + + return enqueue_order > other.enqueue_order; +} +} // namespace webrtc diff --git a/modules/pacing/packet_queue_interface.h b/modules/pacing/packet_queue_interface.h new file mode 100644 index 0000000000..81a8b05b1b --- /dev/null +++ b/modules/pacing/packet_queue_interface.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ +#define MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ + +#include + +#include +#include +#include + +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { + +class PacketQueueInterface { + public: + PacketQueueInterface() = default; + virtual ~PacketQueueInterface() = default; + + struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + Packet(const Packet& other); + virtual ~Packet(); + bool operator<(const Packet& other) const; + + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + int64_t sum_paused_ms; + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::list::iterator this_it; + std::multiset::iterator enqueue_time_it; + }; + + virtual void Push(const Packet& packet) = 0; + virtual const Packet& BeginPop() = 0; + virtual void CancelPop(const Packet& packet) = 0; + virtual void FinalizePop(const Packet& packet) = 0; + virtual bool Empty() const = 0; + virtual size_t SizeInPackets() const = 0; + virtual uint64_t SizeInBytes() const = 0; + virtual int64_t OldestEnqueueTimeMs() const = 0; + virtual void UpdateQueueTime(int64_t timestamp_ms) = 0; + virtual void SetPauseState(bool paused, int64_t timestamp_ms) = 0; + virtual int64_t AverageQueueTimeMs() const = 0; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACKET_QUEUE_INTERFACE_H_ diff --git a/modules/pacing/packet_queue2.cc b/modules/pacing/round_robin_packet_queue.cc similarity index 84% rename from modules/pacing/packet_queue2.cc rename to modules/pacing/round_robin_packet_queue.cc index 6aee807af3..80f9ea799c 100644 --- a/modules/pacing/packet_queue2.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "modules/pacing/packet_queue2.h" +#include "modules/pacing/round_robin_packet_queue.h" #include @@ -17,17 +17,15 @@ namespace webrtc { -PacketQueue2::Stream::Stream() : bytes(0) {} -PacketQueue2::Stream::~Stream() {} +RoundRobinPacketQueue::Stream::Stream() : bytes(0) {} +RoundRobinPacketQueue::Stream::~Stream() {} -PacketQueue2::PacketQueue2(const Clock* clock) - : PacketQueue(clock), - clock_(clock), - time_last_updated_(clock_->TimeInMilliseconds()) {} +RoundRobinPacketQueue::RoundRobinPacketQueue(const Clock* clock) + : clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {} -PacketQueue2::~PacketQueue2() {} +RoundRobinPacketQueue::~RoundRobinPacketQueue() {} -void PacketQueue2::Push(const Packet& packet_to_insert) { +void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) { Packet packet(packet_to_insert); auto stream_info_it = streams_.find(packet.ssrc); @@ -70,7 +68,7 @@ void PacketQueue2::Push(const Packet& packet_to_insert) { size_bytes_ += packet.bytes; } -const PacketQueue2::Packet& PacketQueue2::BeginPop() { +const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() { RTC_CHECK(!pop_packet_ && !pop_stream_); Stream* stream = GetHighestPriorityStream(); @@ -81,14 +79,14 @@ const PacketQueue2::Packet& PacketQueue2::BeginPop() { return *pop_packet_; } -void PacketQueue2::CancelPop(const Packet& packet) { +void RoundRobinPacketQueue::CancelPop(const Packet& packet) { RTC_CHECK(pop_packet_ && pop_stream_); (*pop_stream_)->packet_queue.push(*pop_packet_); pop_packet_.reset(); pop_stream_.reset(); } -void PacketQueue2::FinalizePop(const Packet& packet) { +void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { RTC_CHECK(!paused_); if (!Empty()) { RTC_CHECK(pop_packet_ && pop_stream_); @@ -137,28 +135,28 @@ void PacketQueue2::FinalizePop(const Packet& packet) { } } -bool PacketQueue2::Empty() const { +bool RoundRobinPacketQueue::Empty() const { RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || (stream_priorities_.empty() && size_packets_ == 0)); return stream_priorities_.empty(); } -size_t PacketQueue2::SizeInPackets() const { +size_t RoundRobinPacketQueue::SizeInPackets() const { return size_packets_; } -uint64_t PacketQueue2::SizeInBytes() const { +uint64_t RoundRobinPacketQueue::SizeInBytes() const { return size_bytes_; } -int64_t PacketQueue2::OldestEnqueueTimeMs() const { +int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const { if (Empty()) return 0; RTC_CHECK(!enqueue_times_.empty()); return *enqueue_times_.begin(); } -void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) { +void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) { RTC_CHECK_GE(timestamp_ms, time_last_updated_); if (timestamp_ms == time_last_updated_) return; @@ -174,20 +172,21 @@ void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) { time_last_updated_ = timestamp_ms; } -void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) { +void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { if (paused_ == paused) return; UpdateQueueTime(timestamp_ms); paused_ = paused; } -int64_t PacketQueue2::AverageQueueTimeMs() const { +int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const { if (Empty()) return 0; return queue_time_sum_ms_ / size_packets_; } -PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() { +RoundRobinPacketQueue::Stream* +RoundRobinPacketQueue::GetHighestPriorityStream() { RTC_CHECK(!stream_priorities_.empty()); uint32_t ssrc = stream_priorities_.begin()->second; @@ -198,7 +197,7 @@ PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() { return &stream_info_it->second; } -bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const { +bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const { for (const auto& scheduled_stream : stream_priorities_) { if (scheduled_stream.second == ssrc) return true; diff --git a/modules/pacing/packet_queue2.h b/modules/pacing/round_robin_packet_queue.h similarity index 88% rename from modules/pacing/packet_queue2.h rename to modules/pacing/round_robin_packet_queue.h index 8c18a0fb60..fe0cf70cd4 100644 --- a/modules/pacing/packet_queue2.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -8,24 +8,24 @@ * be found in the AUTHORS file in the root of the source tree. */ -#ifndef MODULES_PACING_PACKET_QUEUE2_H_ -#define MODULES_PACING_PACKET_QUEUE2_H_ +#ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ +#define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_ #include #include #include -#include "modules/pacing/packet_queue.h" +#include "modules/pacing/packet_queue_interface.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" namespace webrtc { -class PacketQueue2 : public PacketQueue { +class RoundRobinPacketQueue : public PacketQueueInterface { public: - explicit PacketQueue2(const Clock* clock); - ~PacketQueue2() override; + explicit RoundRobinPacketQueue(const Clock* clock); + ~RoundRobinPacketQueue() override; - using Packet = PacketQueue::Packet; + using Packet = PacketQueueInterface::Packet; void Push(const Packet& packet) override; const Packet& BeginPop() override; @@ -41,6 +41,7 @@ class PacketQueue2 : public PacketQueue { void UpdateQueueTime(int64_t timestamp_ms) override; void SetPauseState(bool paused, int64_t timestamp_ms) override; + private: struct StreamPrioKey { StreamPrioKey() = default; StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes) @@ -73,7 +74,6 @@ class PacketQueue2 : public PacketQueue { std::multimap::iterator priority_it; }; - private: static constexpr size_t kMaxLeadingBytes = 1400; Stream* GetHighestPriorityStream(); @@ -108,4 +108,4 @@ class PacketQueue2 : public PacketQueue { }; } // namespace webrtc -#endif // MODULES_PACING_PACKET_QUEUE2_H_ +#endif // MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_