diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index 85689f9637..b369f402fe 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -23,6 +23,7 @@ namespace webrtc { constexpr size_t RtpPacketHistory::kMaxCapacity; +constexpr size_t RtpPacketHistory::kMaxPaddingtHistory; constexpr int64_t RtpPacketHistory::kMinPacketDurationMs; constexpr int RtpPacketHistory::kMinPacketDurationRtt; constexpr int RtpPacketHistory::kPacketCullingDelayFactor; @@ -130,18 +131,28 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, // Store packet. const uint16_t rtp_seq_no = packet->SequenceNumber(); - auto packet_it = packet_history_.emplace( - rtp_seq_no, - StoredPacket(std::move(packet), send_time_ms, packets_inserted_++)); - RTC_DCHECK(packet_it.second) << "Failed to insert packet in history."; - StoredPacket& stored_packet = packet_it.first->second; + int packet_index = GetPacketIndex(rtp_seq_no); + RTC_DCHECK_GE(packet_index, 0) << "Out-of-order inserts not supported."; + size_t index = packet_index; - if (!start_seqno_) { - start_seqno_ = rtp_seq_no; + while (packet_history_.size() < index) { + packet_history_.emplace_back(nullptr, absl::nullopt, 0); + } + RTC_DCHECK(packet_history_.size() == index || + packet_history_[index].packet_ == nullptr); + + if (packet_history_.size() <= index) { + packet_history_.emplace_back(std::move(packet), send_time_ms, + packets_inserted_++); + } else { + packet_history_[packet_index] = + StoredPacket(std::move(packet), send_time_ms, packets_inserted_++); } - // Store the sequence number of the last send packet with this size. - auto prio_it = padding_priority_.insert(&stored_packet); + if (padding_priority_.size() >= kMaxPaddingtHistory - 1) { + padding_priority_.erase(std::prev(padding_priority_.end())); + } + auto prio_it = padding_priority_.insert(&packet_history_[packet_index]); RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set."; } @@ -152,27 +163,26 @@ std::unique_ptr RtpPacketHistory::GetPacketAndSetSendTime( return nullptr; } + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { + return nullptr; + } + int64_t now_ms = clock_->TimeInMilliseconds(); - StoredPacketIterator rtp_it = packet_history_.find(sequence_number); - if (rtp_it == packet_history_.end()) { + if (!VerifyRtt(*packet, now_ms)) { return nullptr; } - StoredPacket& packet = rtp_it->second; - if (!VerifyRtt(rtp_it->second, now_ms)) { - return nullptr; - } - - if (packet.send_time_ms_) { - packet.IncrementTimesRetransmitted(&padding_priority_); + if (packet->send_time_ms_) { + packet->IncrementTimesRetransmitted(&padding_priority_); } // Update send-time and mark as no long in pacer queue. - packet.send_time_ms_ = now_ms; - packet.pending_transmission_ = false; + packet->send_time_ms_ = now_ms; + packet->pending_transmission_ = false; - // Return copy of packet instance since it may need to be retransmitted again. - return absl::make_unique(*packet.packet_); + // Return copy of packet instance since it may need to be retransmitted. + return absl::make_unique(*packet->packet_); } std::unique_ptr RtpPacketHistory::GetPacketAndMarkAsPending( @@ -192,29 +202,26 @@ std::unique_ptr RtpPacketHistory::GetPacketAndMarkAsPending( return nullptr; } - int64_t now_ms = clock_->TimeInMilliseconds(); - StoredPacketIterator rtp_it = packet_history_.find(sequence_number); - if (rtp_it == packet_history_.end()) { + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { return nullptr; } - StoredPacket& packet = rtp_it->second; - - if (packet.pending_transmission_) { + if (packet->pending_transmission_) { // Packet already in pacer queue, ignore this request. return nullptr; } - if (!VerifyRtt(rtp_it->second, now_ms)) { + if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) { // Packet already resent within too short a time window, ignore. return nullptr; } // Copy and/or encapsulate packet. std::unique_ptr encapsulated_packet = - encapsulate(*packet.packet_); + encapsulate(*packet->packet_); if (encapsulated_packet) { - packet.pending_transmission_ = true; + packet->pending_transmission_ = true; } return encapsulated_packet; @@ -226,20 +233,18 @@ void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) { return; } - int64_t now_ms = clock_->TimeInMilliseconds(); - StoredPacketIterator rtp_it = packet_history_.find(sequence_number); - if (rtp_it == packet_history_.end()) { + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { return; } - StoredPacket& packet = rtp_it->second; - RTC_DCHECK(packet.send_time_ms_); + RTC_DCHECK(packet->send_time_ms_); // Update send-time, mark as no longer in pacer queue, and increment // transmission count. - packet.send_time_ms_ = now_ms; - packet.pending_transmission_ = false; - packet.IncrementTimesRetransmitted(&padding_priority_); + packet->send_time_ms_ = clock_->TimeInMilliseconds(); + packet->pending_transmission_ = false; + packet->IncrementTimesRetransmitted(&padding_priority_); } absl::optional RtpPacketHistory::GetPacketState( @@ -249,16 +254,21 @@ absl::optional RtpPacketHistory::GetPacketState( return absl::nullopt; } - auto rtp_it = packet_history_.find(sequence_number); - if (rtp_it == packet_history_.end()) { + int packet_index = GetPacketIndex(sequence_number); + if (packet_index < 0 || + static_cast(packet_index) >= packet_history_.size()) { + return absl::nullopt; + } + const StoredPacket& packet = packet_history_[packet_index]; + if (packet.packet_ == nullptr) { return absl::nullopt; } - if (!VerifyRtt(rtp_it->second, clock_->TimeInMilliseconds())) { + if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) { return absl::nullopt; } - return StoredPacketToPacketState(rtp_it->second); + return StoredPacketToPacketState(packet); } bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet, @@ -317,15 +327,13 @@ std::unique_ptr RtpPacketHistory::GetPayloadPaddingPacket( void RtpPacketHistory::CullAcknowledgedPackets( rtc::ArrayView sequence_numbers) { rtc::CritScope cs(&lock_); - if (mode_ == StorageMode::kDisabled) { - return; - } - for (uint16_t sequence_number : sequence_numbers) { - auto stored_packet_it = packet_history_.find(sequence_number); - if (stored_packet_it != packet_history_.end()) { - RemovePacket(stored_packet_it); + int packet_index = GetPacketIndex(sequence_number); + if (packet_index < 0 || + static_cast(packet_index) >= packet_history_.size()) { + continue; } + RemovePacket(packet_index); } } @@ -335,12 +343,12 @@ bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) { return false; } - auto rtp_it = packet_history_.find(sequence_number); - if (rtp_it == packet_history_.end()) { + StoredPacket* packet = GetStoredPacket(sequence_number); + if (packet == nullptr) { return false; } - rtp_it->second.pending_transmission_ = true; + packet->pending_transmission_ = true; return true; } @@ -352,25 +360,21 @@ void RtpPacketHistory::Clear() { void RtpPacketHistory::Reset() { packet_history_.clear(); padding_priority_.clear(); - start_seqno_.reset(); } void RtpPacketHistory::CullOldPackets(int64_t now_ms) { int64_t packet_duration_ms = std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs); while (!packet_history_.empty()) { - auto stored_packet_it = packet_history_.find(*start_seqno_); - RTC_DCHECK(stored_packet_it != packet_history_.end()); - if (packet_history_.size() >= kMaxCapacity) { // We have reached the absolute max capacity, remove one packet // unconditionally. - RemovePacket(stored_packet_it); + RemovePacket(0); continue; } - const StoredPacket& stored_packet = stored_packet_it->second; - if (stored_packet_it->second.pending_transmission_) { + const StoredPacket& stored_packet = packet_history_.front(); + if (stored_packet.pending_transmission_) { // Don't remove packets in the pacer queue, pending tranmission. return; } @@ -386,7 +390,7 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { now_ms) { // Too many packets in history, or this packet has timed out. Remove it // and continue. - RemovePacket(stored_packet_it); + RemovePacket(0); } else { // No more packets can be removed right now. return; @@ -395,46 +399,57 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { } std::unique_ptr RtpPacketHistory::RemovePacket( - StoredPacketIterator packet_it) { + int packet_index) { // Move the packet out from the StoredPacket container. std::unique_ptr rtp_packet = - std::move(packet_it->second.packet_); - - // Check if this is the oldest packet in the history, as this must be updated - // in order to cull old packets. - const bool is_first_packet = packet_it->first == start_seqno_; + std::move(packet_history_[packet_index].packet_); // Erase from padding priority set, if eligible. - size_t num_erased = padding_priority_.erase(&packet_it->second); - RTC_DCHECK_EQ(num_erased, 1) - << "Failed to remove one packet from prio set, got " << num_erased; - if (num_erased != 1) { - RTC_LOG(LS_ERROR) << "RtpPacketHistory in inconsistent state, resetting."; - Reset(); - return nullptr; - } + padding_priority_.erase(&packet_history_[packet_index]); - // Erase the packet from the map, and capture iterator to the next one. - StoredPacketIterator next_it = packet_history_.erase(packet_it); - - if (is_first_packet) { - // |next_it| now points to the next element, or to the end. If the end, - // check if we can wrap around. - if (next_it == packet_history_.end()) { - next_it = packet_history_.begin(); - } - - // Update |start_seq_no| to the new oldest item. - if (next_it != packet_history_.end()) { - start_seqno_ = next_it->first; - } else { - start_seqno_.reset(); + if (packet_index == 0) { + while (!packet_history_.empty() && + packet_history_.front().packet_ == nullptr) { + packet_history_.pop_front(); } } return rtp_packet; } +int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const { + if (packet_history_.empty()) { + return 0; + } + + RTC_DCHECK(packet_history_.front().packet_ != nullptr); + int first_seq = packet_history_.front().packet_->SequenceNumber(); + if (first_seq == sequence_number) { + return 0; + } + + if (IsNewerSequenceNumber(sequence_number, first_seq)) { + // New packet is ahead of start of list. Find the delta. + int packet_index = sequence_number - first_seq; + if (packet_index < 0) { + // A wrap-around has occurred, unwrap to get a valid index. + packet_index += 1 << 16; + } + return packet_index; + } + + return -1; +} + +RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket( + uint16_t sequence_number) { + int index = GetPacketIndex(sequence_number); + if (index < 0 || static_cast(index) >= packet_history_.size()) { + return nullptr; + } + return &packet_history_[index]; +} + RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState( const RtpPacketHistory::StoredPacket& stored_packet) { RtpPacketHistory::PacketState state; diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index 4850c7538c..9253ede4fa 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -11,6 +11,7 @@ #ifndef MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_ #define MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_ +#include #include #include #include @@ -53,6 +54,8 @@ class RtpPacketHistory { // Maximum number of packets we ever allow in the history. static constexpr size_t kMaxCapacity = 9600; + // Maximum number of entries in prioritized queue of padding packets. + static constexpr size_t kMaxPaddingtHistory = 63; // Don't remove packets within max(1000ms, 3x RTT). static constexpr int64_t kMinPacketDurationMs = 1000; static constexpr int kMinPacketDurationRtt = 3; @@ -171,8 +174,6 @@ class RtpPacketHistory { bool operator()(StoredPacket* lhs, StoredPacket* rhs) const; }; - using StoredPacketIterator = std::map::iterator; - // Helper method used by GetPacketAndSetSendTime() and GetPacketState() to // check if packet has too recently been sent. bool VerifyRtt(const StoredPacket& packet, int64_t now_ms) const @@ -181,7 +182,11 @@ class RtpPacketHistory { void CullOldPackets(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); // Removes the packet from the history, and context/mapping that has been // stored. Returns the RTP packet instance contained within the StoredPacket. - std::unique_ptr RemovePacket(StoredPacketIterator packet) + std::unique_ptr RemovePacket(int packet_index) + RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + int GetPacketIndex(uint16_t sequence_number) const + RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + StoredPacket* GetStoredPacket(uint16_t sequence_number) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); static PacketState StoredPacketToPacketState( const StoredPacket& stored_packet); @@ -192,8 +197,13 @@ class RtpPacketHistory { StorageMode mode_ RTC_GUARDED_BY(lock_); int64_t rtt_ms_ RTC_GUARDED_BY(lock_); - // Map from rtp sequence numbers to stored packet. - std::map packet_history_ RTC_GUARDED_BY(lock_); + // Queue of stored packets, ordered by sequence number, with older packets in + // the front and new packets being added to the back. Note that there may be + // wrap-arounds so the back may have a lower sequence number. + // Packets may also be removed out-of-order, in which case there will be + // instances of StoredPacket with |packet_| set to nullptr. The first and last + // entry in the queue will however always be populated. + std::deque packet_history_ RTC_GUARDED_BY(lock_); // Total number of packets with inserted. uint64_t packets_inserted_ RTC_GUARDED_BY(lock_); @@ -201,10 +211,6 @@ class RtpPacketHistory { // in GetPayloadPaddingPacket(). PacketPrioritySet padding_priority_ RTC_GUARDED_BY(lock_); - // The earliest packet in the history. This might not be the lowest sequence - // number, in case there is a wraparound. - absl::optional start_seqno_ RTC_GUARDED_BY(lock_); - RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtpPacketHistory); }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index 0523ed2ba9..242af16ed8 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -291,6 +291,38 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) { EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } +TEST_F(RtpPacketHistoryTest, RemovesLowestPrioPaddingWhenAtMaxCapacity) { + // Tests the absolute upper bound on number of packets in the prioritized + // set of potential padding packets. + const size_t kMaxNumPackets = RtpPacketHistory::kMaxPaddingtHistory; + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets * 2); + hist_.SetRtt(1); + + // Add packets until the max is reached, and then yet another one. + for (size_t i = 0; i < kMaxNumPackets + 1; ++i) { + std::unique_ptr packet = + CreateRtpPacket(To16u(kStartSeqNum + i)); + // Don't mark packets as sent, preventing them from being removed. + hist_.PutRtpPacket(std::move(packet), fake_clock_.TimeInMilliseconds()); + } + + // Advance time to allow retransmission/padding. + fake_clock_.AdvanceTimeMilliseconds(1); + + // The oldest packet will be least prioritized and has fallen out of the + // priority set. + for (size_t i = kMaxNumPackets - 1; i > 0; --i) { + auto packet = hist_.GetPayloadPaddingPacket(); + ASSERT_TRUE(packet); + EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + i + 1)); + } + + // Wrap around to newest padding packet again. + auto packet = hist_.GetPayloadPaddingPacket(); + ASSERT_TRUE(packet); + EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + kMaxNumPackets)); +} + TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) { const size_t kMaxNumPackets = 10; hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets);