diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index 6e4efb0799..c8da0cd478 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -78,6 +78,11 @@ RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const { return owned_packet_; } +void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator( + std::multiset::iterator it) { + enqueue_time_it_ = it; +} + std::multiset::iterator RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const { return enqueue_time_it_; @@ -134,11 +139,34 @@ void RoundRobinPacketQueue::Push(int priority, uint64_t enqueue_order, std::unique_ptr packet) { RTC_DCHECK(packet->packet_type().has_value()); - Push(QueuedPacket(priority, enqueue_time, enqueue_order, - enqueue_times_.insert(enqueue_time), std::move(packet))); + if (size_packets_ == 0) { + // Single packet fast-path. + single_packet_queue_.emplace( + QueuedPacket(priority, enqueue_time, enqueue_order, + enqueue_times_.end(), std::move(packet))); + UpdateQueueTime(enqueue_time); + single_packet_queue_->SubtractPauseTime(pause_time_sum_); + size_packets_ = 1; + size_ += PacketSize(*single_packet_queue_); + } else { + MaybePromoteSinglePacketToNormalQueue(); + Push(QueuedPacket(priority, enqueue_time, enqueue_order, + enqueue_times_.insert(enqueue_time), std::move(packet))); + } } std::unique_ptr RoundRobinPacketQueue::Pop() { + if (single_packet_queue_.has_value()) { + RTC_DCHECK(stream_priorities_.empty()); + std::unique_ptr rtp_packet( + single_packet_queue_->RtpPacket()); + single_packet_queue_.reset(); + queue_time_sum_ = TimeDelta::Zero(); + size_packets_ = 0; + size_ = DataSize::Zero(); + return rtp_packet; + } + RTC_DCHECK(!Empty()); Stream* stream = GetHighestPriorityStream(); const QueuedPacket& queued_packet = stream->packet_queue.top(); @@ -163,13 +191,7 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { // case a "budget" will be built up for the stream sending at the lower // rate. To avoid building a too large budget we limit |bytes| to be within // kMaxLeading bytes of the stream that has sent the most amount of bytes. - DataSize packet_size = - DataSize::Bytes(queued_packet.RtpPacket()->payload_size() + - queued_packet.RtpPacket()->padding_size()); - if (include_overhead_) { - packet_size += DataSize::Bytes(queued_packet.RtpPacket()->headers_size()) + - transport_overhead_per_packet_; - } + DataSize packet_size = PacketSize(queued_packet); stream->size = std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize); max_size_ = std::max(max_size_, stream->size); @@ -195,9 +217,12 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { } bool RoundRobinPacketQueue::Empty() const { - RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || - (stream_priorities_.empty() && size_packets_ == 0)); - return stream_priorities_.empty(); + if (size_packets_ == 0) { + RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty()); + return true; + } + RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty()); + return false; } size_t RoundRobinPacketQueue::SizeInPackets() const { @@ -209,6 +234,10 @@ DataSize RoundRobinPacketQueue::Size() const { } bool RoundRobinPacketQueue::NextPacketIsAudio() const { + if (single_packet_queue_.has_value()) { + return single_packet_queue_->Type() == RtpPacketMediaType::kAudio; + } + if (stream_priorities_.empty()) { return false; } @@ -220,6 +249,10 @@ bool RoundRobinPacketQueue::NextPacketIsAudio() const { } Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { + if (single_packet_queue_.has_value()) { + return single_packet_queue_->EnqueueTime(); + } + if (Empty()) return Timestamp::MinusInfinity(); RTC_CHECK(!enqueue_times_.empty()); @@ -250,6 +283,7 @@ void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) { } void RoundRobinPacketQueue::SetIncludeOverhead() { + MaybePromoteSinglePacketToNormalQueue(); include_overhead_ = true; // We need to update the size to reflect overhead for existing packets. for (const auto& stream : streams_) { @@ -261,6 +295,7 @@ void RoundRobinPacketQueue::SetIncludeOverhead() { } void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) { + MaybePromoteSinglePacketToNormalQueue(); if (include_overhead_) { DataSize previous_overhead = transport_overhead_per_packet_; // We need to update the size to reflect overhead for existing packets. @@ -304,26 +339,44 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) { } RTC_CHECK(stream->priority_it != stream_priorities_.end()); - // In order to figure out how much time a packet has spent in the queue while - // not in a paused state, we subtract the total amount of time the queue has - // been paused so far, and when the packet is popped we subtract the total - // amount of time the queue has been paused at that moment. This way we - // subtract the total amount of time the packet has spent in the queue while - // in a paused state. - UpdateQueueTime(packet.EnqueueTime()); - packet.SubtractPauseTime(pause_time_sum_); + if (packet.EnqueueTimeIterator() == enqueue_times_.end()) { + // Promotion from single-packet queue. Just add to enqueue times. + packet.UpdateEnqueueTimeIterator( + enqueue_times_.insert(packet.EnqueueTime())); + } else { + // In order to figure out how much time a packet has spent in the queue + // while not in a paused state, we subtract the total amount of time the + // queue has been paused so far, and when the packet is popped we subtract + // the total amount of time the queue has been paused at that moment. This + // way we subtract the total amount of time the packet has spent in the + // queue while in a paused state. + UpdateQueueTime(packet.EnqueueTime()); + packet.SubtractPauseTime(pause_time_sum_); - size_packets_ += 1; - size_ += DataSize::Bytes(packet.RtpPacket()->payload_size() + - packet.RtpPacket()->padding_size()); - if (include_overhead_) { - size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) + - transport_overhead_per_packet_; + size_packets_ += 1; + size_ += PacketSize(packet); } stream->packet_queue.push(packet); } +DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const { + DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() + + packet.RtpPacket()->padding_size()); + if (include_overhead_) { + packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) + + transport_overhead_per_packet_; + } + return packet_size; +} + +void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() { + if (single_packet_queue_.has_value()) { + Push(*single_packet_queue_); + single_packet_queue_.reset(); + } +} + RoundRobinPacketQueue::Stream* RoundRobinPacketQueue::GetHighestPriorityStream() { RTC_CHECK(!stream_priorities_.empty()); diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index c256679f7b..8e85347352 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -77,6 +77,7 @@ class RoundRobinPacketQueue { RtpPacketToSend* RtpPacket() const; std::multiset::iterator EnqueueTimeIterator() const; + void UpdateEnqueueTimeIterator(std::multiset::iterator it); void SubtractPauseTime(TimeDelta pause_time_sum); private: @@ -132,6 +133,9 @@ class RoundRobinPacketQueue { void Push(QueuedPacket packet); + DataSize PacketSize(const QueuedPacket& packet) const; + void MaybePromoteSinglePacketToNormalQueue(); + Stream* GetHighestPriorityStream(); // Just used to verify correctness. @@ -161,6 +165,8 @@ class RoundRobinPacketQueue { // the age of the oldest packet in the queue. std::multiset enqueue_times_; + absl::optional single_packet_queue_; + bool include_overhead_; }; } // namespace webrtc