From 58ee187554a78ade9dbd95d76b5d8d892f84c7f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Tue, 18 Jun 2019 16:20:11 +0200 Subject: [PATCH] Add support within PacedSender and pacer queue for owning rtp packets. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL builds on https://webrtc-review.googlesource.com/c/src/+/142165 It adds the parts within the paced sender that uses those send methods. A follow-up will add the pre-pacer RTP sender parts. That CL will also add proper integration testing. Here, I mostly add coverage for the new send methods. When the old code-path is removed, all tests need to be converted to exclusively use the owned path. Bug: webrtc:10633 Change-Id: I870d9a2285f07a7b7b0ef6758aa310808f210f28 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/142179 Commit-Queue: Erik Språng Reviewed-by: Danil Chapovalov Reviewed-by: Sebastian Jansson Cr-Commit-Position: refs/heads/master@{#28308} --- modules/pacing/BUILD.gn | 1 + modules/pacing/paced_sender.cc | 99 +++++++-- modules/pacing/paced_sender.h | 16 +- modules/pacing/paced_sender_unittest.cc | 68 +++++++ modules/pacing/packet_router.cc | 25 +++ modules/pacing/packet_router.h | 7 +- modules/pacing/round_robin_packet_queue.cc | 211 ++++++++++++-------- modules/pacing/round_robin_packet_queue.h | 116 ++++++++--- modules/rtp_rtcp/include/rtp_packet_pacer.h | 4 +- modules/rtp_rtcp/include/rtp_rtcp.h | 1 + 10 files changed, 413 insertions(+), 135 deletions(-) diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index a35ba85e45..9c9f7d91f8 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -85,6 +85,7 @@ if (rtc_include_tests) { "../rtp_rtcp", "../rtp_rtcp:mock_rtp_rtcp", "../rtp_rtcp:rtp_rtcp_format", + "//third_party/abseil-cpp/absl/memory", ] } diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 3e36f14740..6177ca61fb 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -44,6 +44,27 @@ bool IsEnabled(const WebRtcKeyValueConfig& field_trials, return field_trials.Lookup(key).find("Enabled") == 0; } +int GetPriorityForType(RtpPacketToSend::Type type) { + switch (type) { + case RtpPacketToSend::Type::kAudio: + // Audio is always prioritized over other packet types. + return 0; + case RtpPacketToSend::Type::kRetransmission: + // Send retransmissions before new media. + return 1; + case RtpPacketToSend::Type::kVideo: + // Video has "normal" priority, in the old speak. + return 2; + case RtpPacketToSend::Type::kForwardErrorCorrection: + // Redundancy is OK to drop, but the content is hopefully not useless. + return 3; + case RtpPacketToSend::Type::kPadding: + // Packets that are in themselves likely useless, only sent to keep the + // BWE high. + return 4; + } +} + } // namespace const int64_t PacedSender::kMaxQueueLengthMs = 2000; const float PacedSender::kDefaultPaceMultiplier = 2.5f; @@ -186,9 +207,37 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, if (capture_time_ms < 0) capture_time_ms = now_ms; - packets_.Push(RoundRobinPacketQueue::Packet( - priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes, - retransmission, packet_counter_++)); + RtpPacketToSend::Type type; + switch (priority) { + case RtpPacketPacer::kHighPriority: + type = RtpPacketToSend::Type::kAudio; + break; + case RtpPacketPacer::kNormalPriority: + type = RtpPacketToSend::Type::kRetransmission; + break; + default: + type = RtpPacketToSend::Type::kVideo; + } + packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number, + capture_time_ms, now_ms, bytes, retransmission, + packet_counter_++); +} + +void PacedSender::EnqueuePacket(std::unique_ptr packet) { + rtc::CritScope cs(&critsect_); + RTC_DCHECK(pacing_bitrate_kbps_ > 0) + << "SetPacingRate must be called before InsertPacket."; + + int64_t now_ms = TimeMilliseconds(); + prober_.OnIncomingPacket(packet->payload_size()); + + if (packet->capture_time_ms() < 0) { + packet->set_capture_time_ms(now_ms); + } + + RTC_CHECK(packet->packet_type()); + int priority = GetPriorityForType(*packet->packet_type()); + packets_.Push(priority, now_ms, packet_counter_++, std::move(packet)); } void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { @@ -324,27 +373,43 @@ void PacedSender::Process() { // The paused state is checked in the loop since it leaves the critical // section allowing the paused state to be changed from other code. while (!packets_.Empty() && !paused_) { - const auto* packet = GetPendingPacket(pacing_info); + auto* packet = GetPendingPacket(pacing_info); if (packet == nullptr) break; + std::unique_ptr rtp_packet = packet->ReleasePacket(); + const bool owned_rtp_packet = rtp_packet != nullptr; + critsect_.Leave(); - RtpPacketSendResult success = packet_router_->TimeToSendPacket( - packet->ssrc, packet->sequence_number, packet->capture_time_ms, - packet->retransmission, pacing_info); + + RtpPacketSendResult success; + if (rtp_packet != nullptr) { + packet_router_->SendPacket(std::move(rtp_packet), pacing_info); + success = RtpPacketSendResult::kSuccess; + } else { + success = packet_router_->TimeToSendPacket( + packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(), + packet->is_retransmission(), pacing_info); + } + critsect_.Enter(); if (success == RtpPacketSendResult::kSuccess || success == RtpPacketSendResult::kPacketNotFound) { // Packet sent or invalid packet, remove it from queue. // TODO(webrtc:8052): Don't consume media budget on kInvalid. - bytes_sent += packet->bytes; + bytes_sent += packet->size_in_bytes(); // Send succeeded, remove it from the queue. OnPacketSent(packet); if (is_probing && bytes_sent > recommended_probe_size) break; + } else if (owned_rtp_packet) { + // Send failed, but we can't put it back in the queue, remove it without + // consuming budget. + packets_.FinalizePop(); + break; } else { // Send failed, put it back into the queue. - packets_.CancelPop(*packet); + packets_.CancelPop(); break; } } @@ -379,34 +444,34 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket( +RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket( const PacedPacketInfo& pacing_info) { // Since we need to release the lock in order to send, we first pop the // element from the priority queue but keep it in storage, so that we can // reinsert it if send fails. - const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop(); - bool audio_packet = packet->priority == kHighPriority; + RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop(); + bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; bool apply_pacing = !audio_packet || pace_audio_; if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe))) { - packets_.CancelPop(*packet); + packets_.CancelPop(); return nullptr; } return packet; } -void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) { +void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) { if (first_sent_packet_ms_ == -1) first_sent_packet_ms_ = TimeMilliseconds(); - bool audio_packet = packet->priority == kHighPriority; + bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; if (!audio_packet || account_for_audio_) { // Update media bytes sent. - UpdateBudgetWithBytesSent(packet->bytes); + UpdateBudgetWithBytesSent(packet->size_in_bytes()); last_send_time_us_ = clock_->TimeInMicroseconds(); } // Send succeeded, remove it from the queue. - packets_.FinalizePop(*packet); + packets_.FinalizePop(); } void PacedSender::OnPaddingSent(size_t bytes_sent) { diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index eb98ca2571..c67e162d4a 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -25,7 +25,9 @@ #include "modules/pacing/interval_budget.h" #include "modules/pacing/packet_router.h" #include "modules/pacing/round_robin_packet_queue.h" +#include "modules/rtp_rtcp/include/rtp_packet_pacer.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/critical_section.h" #include "rtc_base/experiments/field_trial_parser.h" @@ -35,7 +37,7 @@ namespace webrtc { class Clock; class RtcEventLog; -class PacedSender : public Module, public RtpPacketSender { +class PacedSender : public Module, public RtpPacketPacer { public: static constexpr int64_t kNoCongestionWindow = -1; @@ -77,8 +79,8 @@ class PacedSender : public Module, public RtpPacketSender { // Sets the pacing rates. Must be called once before packets can be sent. void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps); - // Returns true if we send the packet now, else it will add the packet - // information to the queue and call TimeToSendPacket when it's time to send. + // Adds the packet information to the queue and calls TimeToSendPacket + // when it's time to send. void InsertPacket(RtpPacketSender::Priority priority, uint32_t ssrc, uint16_t sequence_number, @@ -86,6 +88,10 @@ class PacedSender : public Module, public RtpPacketSender { size_t bytes, bool retransmission) override; + // Adds the packet to the queue and calls PacketRouter::SendPacket() when + // it's time to send. + void EnqueuePacket(std::unique_ptr packet) override; + // Currently audio traffic is not accounted by pacer and passed through. // With the introduction of audio BWE audio traffic will be accounted for // the pacer budget calculation. The audio traffic still will be injected @@ -129,10 +135,10 @@ class PacedSender : public Module, public RtpPacketSender { void UpdateBudgetWithBytesSent(size_t bytes) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - const RoundRobinPacketQueue::Packet* GetPendingPacket( + RoundRobinPacketQueue::QueuedPacket* GetPendingPacket( const PacedPacketInfo& pacing_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void OnPacketSent(const RoundRobinPacketQueue::Packet* packet) + void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); void OnPaddingSent(size_t padding_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index d991d61e31..d630980aa5 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -12,6 +12,7 @@ #include #include +#include "absl/memory/memory.h" #include "modules/pacing/paced_sender.h" #include "modules/pacing/packet_router.h" #include "system_wrappers/include/clock.h" @@ -22,6 +23,8 @@ using ::testing::_; using ::testing::Field; +using ::testing::Pointee; +using ::testing::Property; using ::testing::Return; namespace { @@ -34,6 +37,11 @@ constexpr unsigned kSecondClusterBps = 1800000; constexpr int kBitrateProbingError = 150000; const float kPaceMultiplier = 2.5f; + +constexpr uint32_t kAudioSsrc = 12345; +constexpr uint32_t kVideoSsrc = 234565; +constexpr uint32_t kVideoRtxSsrc = 34567; +constexpr uint32_t kFlexFecSsrc = 45678; } // namespace namespace webrtc { @@ -49,6 +57,9 @@ class MockPacedSenderCallback : public PacketRouter { int64_t capture_time_ms, bool retransmission, const PacedPacketInfo& pacing_info)); + MOCK_METHOD2(SendPacket, + void(std::unique_ptr packet, + const PacedPacketInfo& pacing_info)); MOCK_METHOD2(TimeToSendPadding, size_t(size_t bytes, const PacedPacketInfo& pacing_info)); }; @@ -139,6 +150,30 @@ class PacedSenderTest : public ::testing::TestWithParam { .Times(1) .WillRepeatedly(Return(RtpPacketSendResult::kSuccess)); } + + std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { + auto packet = absl::make_unique(nullptr); + packet->set_packet_type(type); + switch (type) { + case RtpPacketToSend::Type::kAudio: + packet->SetSsrc(kAudioSsrc); + break; + case RtpPacketToSend::Type::kVideo: + packet->SetSsrc(kVideoSsrc); + break; + case RtpPacketToSend::Type::kRetransmission: + case RtpPacketToSend::Type::kPadding: + packet->SetSsrc(kVideoRtxSsrc); + break; + case RtpPacketToSend::Type::kForwardErrorCorrection: + packet->SetSsrc(kFlexFecSsrc); + break; + } + + packet->SetPayloadSize(234); + return packet; + } + SimulatedClock clock_; MockPacedSenderCallback callback_; std::unique_ptr send_bucket_; @@ -1292,6 +1327,39 @@ TEST_F(PacedSenderTest, AvoidBusyLoopOnSendFailure) { EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); } +TEST_F(PacedSenderTest, OwnedPacketPrioritizedOnType) { + // Insert a packet of each type, from low to high priority. Since priority + // is weighted higher than insert order, these should come out of the pacer + // in backwards order. + for (RtpPacketToSend::Type type : + {RtpPacketToSend::Type::kPadding, + RtpPacketToSend::Type::kForwardErrorCorrection, + RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission, + RtpPacketToSend::Type::kAudio}) { + send_bucket_->EnqueuePacket(BuildRtpPacket(type)); + } + + ::testing::InSequence seq; + EXPECT_CALL( + callback_, + SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _)); + EXPECT_CALL( + callback_, + SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + EXPECT_CALL( + callback_, + SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _)); + EXPECT_CALL( + callback_, + SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _)); + EXPECT_CALL( + callback_, + SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _)); + + clock_.AdvanceTimeMilliseconds(200); + send_bucket_->Process(); +} + // TODO(philipel): Move to PacketQueue2 unittests. #if 0 TEST_F(PacedSenderTest, QueueTimeWithPause) { diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc index a7c2b939c1..6d2c7ff337 100644 --- a/modules/pacing/packet_router.cc +++ b/modules/pacing/packet_router.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include "absl/types/optional.h" #include "modules/rtp_rtcp/include/rtp_rtcp.h" @@ -20,6 +21,7 @@ #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" +#include "rtc_base/logging.h" #include "rtc_base/time_utils.h" namespace webrtc { @@ -125,6 +127,29 @@ RtpPacketSendResult PacketRouter::TimeToSendPacket( return RtpPacketSendResult::kPacketNotFound; } +void PacketRouter::SendPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + rtc::CritScope cs(&modules_crit_); + for (auto* rtp_module : rtp_send_modules_) { + if (rtp_module->TrySendPacket(packet.get(), cluster_info)) { + const bool can_send_padding = + (rtp_module->RtxSendStatus() & kRtxRedundantPayloads) && + rtp_module->HasBweExtensions(); + if (can_send_padding) { + // This is now the last module to send media, and has the desired + // properties needed for payload based padding. Cache it for later use. + last_send_module_ = rtp_module; + } + return; + } + } + + RTC_LOG(LS_WARNING) << "Failed to send packet, matching RTP module not found " + "or transport error. SSRC = " + << packet->Ssrc() << ", sequence number " + << packet->SequenceNumber(); +} + size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send, const PacedPacketInfo& pacing_info) { size_t total_bytes_sent = 0; diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h index 4ff5a0b476..9a51899438 100644 --- a/modules/pacing/packet_router.h +++ b/modules/pacing/packet_router.h @@ -14,11 +14,13 @@ #include #include #include +#include #include #include "api/transport/network_types.h" #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/critical_section.h" #include "rtc_base/thread_annotations.h" @@ -30,7 +32,7 @@ namespace rtcp { class TransportFeedback; } // namespace rtcp -// PacketRouter keeps track of RTP send modules to support the pacer. +// PacketRouter keeps track of rtp send modules to support the pacer. // In addition, it handles feedback messages, which are sent on a send // module if possible (sender report), otherwise on receive module // (receiver report). For the latter case, we also keep track of the @@ -56,6 +58,9 @@ class PacketRouter : public TransportSequenceNumberAllocator, bool retransmission, const PacedPacketInfo& packet_info); + virtual void SendPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info); + virtual size_t TimeToSendPadding(size_t bytes, const PacedPacketInfo& packet_info); diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index a87c47cee5..9f52a6a9c8 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -18,36 +18,53 @@ namespace webrtc { -RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order) - : priority(priority), - ssrc(ssrc), - sequence_number(seq_number), - capture_time_ms(capture_time_ms), - enqueue_time_ms(enqueue_time_ms), - sum_paused_ms(0), - bytes(length_in_bytes), - retransmission(retransmission), - enqueue_order(enqueue_order) {} +RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) = + default; +RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default; -RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default; +RoundRobinPacketQueue::QueuedPacket::QueuedPacket( + int priority, + RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order, + std::multiset::iterator enqueue_time_it, + absl::optional>::iterator> + packet_it) + : type_(type), + priority_(priority), + ssrc_(ssrc), + sequence_number_(seq_number), + capture_time_ms_(capture_time_ms), + enqueue_time_ms_(enqueue_time_ms), + bytes_(length_in_bytes), + retransmission_(retransmission), + enqueue_order_(enqueue_order), + enqueue_time_it_(enqueue_time_it), + packet_it_(packet_it) {} -RoundRobinPacketQueue::Packet::~Packet() {} +std::unique_ptr +RoundRobinPacketQueue::QueuedPacket::ReleasePacket() { + return packet_it_ ? std::move(**packet_it_) : nullptr; +} -bool RoundRobinPacketQueue::Packet::operator<( - const RoundRobinPacketQueue::Packet& other) const { - if (priority != other.priority) - return priority > other.priority; - if (retransmission != other.retransmission) - return other.retransmission; +void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs( + int64_t pause_time_sum_ms) { + enqueue_time_ms_ -= pause_time_sum_ms; +} - return enqueue_order > other.enqueue_order; +bool RoundRobinPacketQueue::QueuedPacket::operator<( + const RoundRobinPacketQueue::QueuedPacket& other) const { + if (priority_ != other.priority_) + return priority_ > other.priority_; + if (retransmission_ != other.retransmission_) + return other.retransmission_; + + return enqueue_order_ > other.enqueue_order_; } RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} @@ -59,50 +76,41 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(int64_t start_time_us) RoundRobinPacketQueue::~RoundRobinPacketQueue() {} -void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) { - Packet packet(packet_to_insert); - - auto stream_info_it = streams_.find(packet.ssrc); - if (stream_info_it == streams_.end()) { - stream_info_it = streams_.emplace(packet.ssrc, Stream()).first; - stream_info_it->second.priority_it = stream_priorities_.end(); - stream_info_it->second.ssrc = packet.ssrc; - } - - Stream* stream = &stream_info_it->second; - - if (stream->priority_it == stream_priorities_.end()) { - // If the SSRC is not currently scheduled, add it to |stream_priorities_|. - RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); - stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.priority, stream->bytes), packet.ssrc); - } else if (packet.priority < stream->priority_it->first.priority) { - // If the priority of this SSRC increased, remove the outdated StreamPrioKey - // and insert a new one with the new priority. Note that - // RtpPacketSender::Priority uses lower ordinal for higher priority. - stream_priorities_.erase(stream->priority_it); - stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.priority, stream->bytes), packet.ssrc); - } - RTC_CHECK(stream->priority_it != stream_priorities_.end()); - - packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms); - - // In order to figure out how much time a packet has spent in the queue while - // not in a paused state, we subtract the total amount of time the queue has - // been paused so far, and when the packet is poped we subtract the total - // amount of time the queue has been paused at that moment. This way we - // subtract the total amount of time the packet has spent in the queue while - // in a paused state. - UpdateQueueTime(packet.enqueue_time_ms); - packet.enqueue_time_ms -= pause_time_sum_ms_; - stream->packet_queue.push(packet); - - size_packets_ += 1; - size_bytes_ += packet.bytes; +void RoundRobinPacketQueue::Push(int priority, + RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) { + Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms, + enqueue_time_ms, length_in_bytes, retransmission, + enqueue_order, enqueue_times_.insert(enqueue_time_ms), + absl::nullopt)); } -const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() { +void RoundRobinPacketQueue::Push(int priority, + int64_t enqueue_time_ms, + uint64_t enqueue_order, + std::unique_ptr packet) { + uint32_t ssrc = packet->Ssrc(); + uint16_t sequence_number = packet->SequenceNumber(); + int64_t capture_time_ms = packet->capture_time_ms(); + size_t size_bytes = packet->payload_size(); + auto type = packet->packet_type(); + RTC_DCHECK(type.has_value()); + + rtp_packets_.push_front(std::move(packet)); + Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms, + enqueue_time_ms, size_bytes, + *type == RtpPacketToSend::Type::kRetransmission, + enqueue_order, enqueue_times_.insert(enqueue_time_ms), + rtp_packets_.begin())); +} + +RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() { RTC_CHECK(!pop_packet_ && !pop_stream_); Stream* stream = GetHighestPriorityStream(); @@ -110,22 +118,22 @@ const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() { pop_packet_.emplace(stream->packet_queue.top()); stream->packet_queue.pop(); - return *pop_packet_; + return &pop_packet_.value(); } -void RoundRobinPacketQueue::CancelPop(const Packet& packet) { +void RoundRobinPacketQueue::CancelPop() { RTC_CHECK(pop_packet_ && pop_stream_); (*pop_stream_)->packet_queue.push(*pop_packet_); pop_packet_.reset(); pop_stream_.reset(); } -void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { +void RoundRobinPacketQueue::FinalizePop() { if (!Empty()) { RTC_CHECK(pop_packet_ && pop_stream_); Stream* stream = *pop_stream_; stream_priorities_.erase(stream->priority_it); - const Packet& packet = *pop_packet_; + const QueuedPacket& packet = *pop_packet_; // Calculate the total amount of time spent by this packet in the queue // while in a non-paused state. Note that the |pause_time_sum_ms_| was @@ -133,11 +141,16 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { // by subtracting it now we effectively remove the time spent in in the // queue while in a paused state. int64_t time_in_non_paused_state_ms = - time_last_updated_ms_ - packet.enqueue_time_ms - pause_time_sum_ms_; + time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_; queue_time_sum_ms_ -= time_in_non_paused_state_ms; - RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end()); - enqueue_times_.erase(packet.enqueue_time_it); + RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end()); + enqueue_times_.erase(packet.EnqueueTimeIterator()); + + auto packet_it = packet.PacketIterator(); + if (packet_it) { + rtp_packets_.erase(*packet_it); + } // Update |bytes| of this stream. The general idea is that the stream that // has sent the least amount of bytes should have the highest priority. @@ -145,11 +158,11 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { // case a "budget" will be built up for the stream sending at the lower // rate. To avoid building a too large budget we limit |bytes| to be within // kMaxLeading bytes of the stream that has sent the most amount of bytes. - stream->bytes = - std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes); + stream->bytes = std::max(stream->bytes + packet.size_in_bytes(), + max_bytes_ - kMaxLeadingBytes); max_bytes_ = std::max(max_bytes_, stream->bytes); - size_bytes_ -= packet.bytes; + size_bytes_ -= packet.size_in_bytes(); size_packets_ -= 1; RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); @@ -158,7 +171,7 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { if (stream->packet_queue.empty()) { stream->priority_it = stream_priorities_.end(); } else { - RtpPacketSender::Priority priority = stream->packet_queue.top().priority; + int priority = stream->packet_queue.top().priority(); stream->priority_it = stream_priorities_.emplace( StreamPrioKey(priority, stream->bytes), stream->ssrc); } @@ -218,6 +231,46 @@ int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const { return queue_time_sum_ms_ / size_packets_; } +void RoundRobinPacketQueue::Push(QueuedPacket packet) { + auto stream_info_it = streams_.find(packet.ssrc()); + if (stream_info_it == streams_.end()) { + stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first; + stream_info_it->second.priority_it = stream_priorities_.end(); + stream_info_it->second.ssrc = packet.ssrc(); + } + + Stream* stream = &stream_info_it->second; + + if (stream->priority_it == stream_priorities_.end()) { + // If the SSRC is not currently scheduled, add it to |stream_priorities_|. + RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); + stream->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); + } else if (packet.priority() < stream->priority_it->first.priority) { + // If the priority of this SSRC increased, remove the outdated StreamPrioKey + // and insert a new one with the new priority. Note that |priority_| uses + // lower ordinal for higher priority. + stream_priorities_.erase(stream->priority_it); + stream->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); + } + RTC_CHECK(stream->priority_it != stream_priorities_.end()); + + // In order to figure out how much time a packet has spent in the queue while + // not in a paused state, we subtract the total amount of time the queue has + // been paused so far, and when the packet is popped we subtract the total + // amount of time the queue has been paused at that moment. This way we + // subtract the total amount of time the packet has spent in the queue while + // in a paused state. + UpdateQueueTime(packet.enqueue_time_ms()); + packet.SubtractPauseTimeMs(pause_time_sum_ms_); + + size_packets_ += 1; + size_bytes_ += packet.size_in_bytes(); + + stream->packet_queue.push(packet); +} + RoundRobinPacketQueue::Stream* RoundRobinPacketQueue::GetHighestPriorityStream() { RTC_CHECK(!stream_priorities_.empty()); diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index 74b855a483..812ae87ded 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -15,11 +15,13 @@ #include #include #include +#include #include #include #include "absl/types/optional.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -29,36 +31,80 @@ class RoundRobinPacketQueue { explicit RoundRobinPacketQueue(int64_t start_time_us); ~RoundRobinPacketQueue(); - struct Packet { - Packet(RtpPacketSender::Priority priority, - uint32_t ssrc, - uint16_t seq_number, - int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, - bool retransmission, - uint64_t enqueue_order); - Packet(const Packet& other); - virtual ~Packet(); - bool operator<(const Packet& other) const; + struct QueuedPacket { + public: + QueuedPacket( + int priority, + RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order, + std::multiset::iterator enqueue_time_it, + absl::optional>::iterator> + packet_it); + QueuedPacket(const QueuedPacket& rhs); + ~QueuedPacket(); - RtpPacketSender::Priority priority; - uint32_t ssrc; - uint16_t sequence_number; - int64_t capture_time_ms; // Absolute time of frame capture. - int64_t enqueue_time_ms; // Absolute time of pacer queue entry. - int64_t sum_paused_ms; - size_t bytes; - bool retransmission; - uint64_t enqueue_order; - std::list::iterator this_it; - std::multiset::iterator enqueue_time_it; + bool operator<(const QueuedPacket& other) const; + + int priority() const { return priority_; } + RtpPacketToSend::Type type() const { return type_; } + uint32_t ssrc() const { return ssrc_; } + uint16_t sequence_number() const { return sequence_number_; } + int64_t capture_time_ms() const { return capture_time_ms_; } + int64_t enqueue_time_ms() const { return enqueue_time_ms_; } + size_t size_in_bytes() const { return bytes_; } + bool is_retransmission() const { return retransmission_; } + uint64_t enqueue_order() const { return enqueue_order_; } + std::unique_ptr ReleasePacket(); + + // For internal use. + absl::optional>::iterator> + PacketIterator() const { + return packet_it_; + } + std::multiset::iterator EnqueueTimeIterator() const { + return enqueue_time_it_; + } + void SubtractPauseTimeMs(int64_t pause_time_sum_ms); + + private: + RtpPacketToSend::Type type_; + int priority_; + uint32_t ssrc_; + uint16_t sequence_number_; + int64_t capture_time_ms_; // Absolute time of frame capture. + int64_t enqueue_time_ms_; // Absolute time of pacer queue entry. + size_t bytes_; + bool retransmission_; + uint64_t enqueue_order_; + std::multiset::iterator enqueue_time_it_; + // Iterator into |rtp_packets_| where the memory for RtpPacket is owned, + // if applicable. + absl::optional>::iterator> + packet_it_; }; - void Push(const Packet& packet); - const Packet& BeginPop(); - void CancelPop(const Packet& packet); - void FinalizePop(const Packet& packet); + void Push(int priority, + RtpPacketToSend::Type type, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + void Push(int priority, + int64_t enqueue_time_ms, + uint64_t enqueue_order, + std::unique_ptr packet); + QueuedPacket* BeginPop(); + void CancelPop(); + void FinalizePop(); bool Empty() const; size_t SizeInPackets() const; @@ -71,7 +117,7 @@ class RoundRobinPacketQueue { private: struct StreamPrioKey { - StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes) + StreamPrioKey(int priority, int64_t bytes) : priority(priority), bytes(bytes) {} bool operator<(const StreamPrioKey& other) const { @@ -80,7 +126,7 @@ class RoundRobinPacketQueue { return bytes < other.bytes; } - const RtpPacketSender::Priority priority; + const int priority; const size_t bytes; }; @@ -92,7 +138,7 @@ class RoundRobinPacketQueue { size_t bytes; uint32_t ssrc; - std::priority_queue packet_queue; + std::priority_queue packet_queue; // Whenever a packet is inserted for this stream we check if |priority_it| // points to an element in |stream_priorities_|, and if it does it means @@ -104,13 +150,15 @@ class RoundRobinPacketQueue { static constexpr size_t kMaxLeadingBytes = 1400; + void Push(QueuedPacket packet); + Stream* GetHighestPriorityStream(); // Just used to verify correctness. bool IsSsrcScheduled(uint32_t ssrc) const; int64_t time_last_updated_ms_; - absl::optional pop_packet_; + absl::optional pop_packet_; absl::optional pop_stream_; bool paused_ = false; @@ -132,6 +180,12 @@ class RoundRobinPacketQueue { // The enqueue time of every packet currently in the queue. Used to figure out // the age of the oldest packet in the queue. std::multiset enqueue_times_; + + // List of RTP packets to be sent, not necessarily in the order they will be + // sent. PacketInfo.packet_it will point to an entry in this list, or the + // end iterator of this list if queue does not have direct ownership of the + // packet. + std::list> rtp_packets_; }; } // namespace webrtc diff --git a/modules/rtp_rtcp/include/rtp_packet_pacer.h b/modules/rtp_rtcp/include/rtp_packet_pacer.h index 9820fc2a46..180ddf735b 100644 --- a/modules/rtp_rtcp/include/rtp_packet_pacer.h +++ b/modules/rtp_rtcp/include/rtp_packet_pacer.h @@ -23,10 +23,10 @@ namespace webrtc { // TODO(bugs.webrtc.org/10633): Add things missing to this interface so that we // can use multiple different pacer implementations, and stop inheriting from // RtpPacketSender. -class RtpPacketPacer : RtpPacketSender { +class RtpPacketPacer : public RtpPacketSender { public: RtpPacketPacer() = default; - ~RtpPacketPacer() override; + ~RtpPacketPacer() override = default; // Insert packet into queue, for eventual transmission. Based on the type of // the packet, it will prioritized and scheduled relative to other packets and diff --git a/modules/rtp_rtcp/include/rtp_rtcp.h b/modules/rtp_rtcp/include/rtp_rtcp.h index 6734e6c04d..dbe6345a92 100644 --- a/modules/rtp_rtcp/include/rtp_rtcp.h +++ b/modules/rtp_rtcp/include/rtp_rtcp.h @@ -26,6 +26,7 @@ #include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/report_block_data.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/rtp_rtcp/source/rtp_sender.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/deprecation.h"