From d2a634447f42d6856656a9fcdb65d5845b736941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Fri, 3 May 2019 10:58:50 -0400 Subject: [PATCH] RtpPacketHistory: StoreAndCull default on, support ack removals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for potentially out-of-order removals of packets, using a vector of sequence numbers that have been acknowledges as received. Additionally, make kStoreAndCull storage method by default with a field-trial kill-switch if things go wrong unexpectedly. Bug: webrtc:8975 Change-Id: I6da8b92d85fc362c12db82976f115626cb1d32d4 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/134307 Reviewed-by: Åsa Persson Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#27850} --- modules/rtp_rtcp/source/rtp_packet_history.cc | 44 +++++-- modules/rtp_rtcp/source/rtp_packet_history.h | 4 + .../source/rtp_packet_history_unittest.cc | 110 +++++++++++++----- modules/rtp_rtcp/source/rtp_sender.cc | 22 +++- modules/rtp_rtcp/source/rtp_sender.h | 1 + 5 files changed, 140 insertions(+), 41 deletions(-) diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index ac39e22391..b2dacc270d 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -79,6 +79,10 @@ void RtpPacketHistory::SetRtt(int64_t rtt_ms) { rtc::CritScope cs(&lock_); RTC_DCHECK_GE(rtt_ms, 0); rtt_ms_ = rtt_ms; + // If kStoreAndCull mode is used, packets will be removed after a timeout + // that depends on the RTT. Changing the RTT may thus cause some packets + // become "old" and subject to removal. + CullOldPackets(clock_->TimeInMilliseconds()); } void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, @@ -232,6 +236,19 @@ std::unique_ptr RtpPacketHistory::GetBestFittingPacket( return absl::make_unique(*best_packet); } +void RtpPacketHistory::CullAcknowledgedPackets( + rtc::ArrayView sequence_numbers) { + rtc::CritScope cs(&lock_); + if (mode_ == StorageMode::kStoreAndCull) { + for (uint16_t sequence_number : sequence_numbers) { + auto stored_packet_it = packet_history_.find(sequence_number); + if (stored_packet_it != packet_history_.end()) { + RemovePacket(stored_packet_it); + } + } + } +} + void RtpPacketHistory::Reset() { packet_history_.clear(); packet_size_.clear(); @@ -283,20 +300,27 @@ std::unique_ptr RtpPacketHistory::RemovePacket( // Move the packet out from the StoredPacket container. std::unique_ptr rtp_packet = std::move(packet_it->second.packet); + + // Check if this is the oldest packet in the history, as this must be updated + // in order to cull old packets. + const bool is_first_packet = packet_it->first == start_seqno_; + // Erase the packet from the map, and capture iterator to the next one. StoredPacketIterator next_it = packet_history_.erase(packet_it); - // |next_it| now points to the next element, or to the end. If the end, - // check if we can wrap around. - if (next_it == packet_history_.end()) { - next_it = packet_history_.begin(); - } + if (is_first_packet) { + // |next_it| now points to the next element, or to the end. If the end, + // check if we can wrap around. + if (next_it == packet_history_.end()) { + next_it = packet_history_.begin(); + } - // Update |start_seq_no| to the new oldest item. - if (next_it != packet_history_.end()) { - start_seqno_ = next_it->first; - } else { - start_seqno_.reset(); + // Update |start_seq_no| to the new oldest item. + if (next_it != packet_history_.end()) { + start_seqno_ = next_it->first; + } else { + start_seqno_.reset(); + } } auto size_iterator = packet_size_.find(rtp_packet->size()); diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index 5e6463ba30..cf87ddf205 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -89,6 +89,9 @@ class RtpPacketHistory { std::unique_ptr GetBestFittingPacket( size_t packet_size) const; + // Cull packets that have been acknowledged as received by the remote end. + void CullAcknowledgedPackets(rtc::ArrayView sequence_numbers); + private: struct StoredPacket { StoredPacket(); @@ -133,6 +136,7 @@ class RtpPacketHistory { // Map from rtp sequence numbers to stored packet. std::map packet_history_ RTC_GUARDED_BY(lock_); + // Map from packet size to sequence number. std::map packet_size_ RTC_GUARDED_BY(lock_); // The earliest packet in the history. This might not be the lowest sequence diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index afa158c0ce..c10fbb623f 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -82,9 +82,9 @@ TEST_F(RtpPacketHistoryTest, StartSeqResetAfterReset) { EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); // Add a new packet. - hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, - absl::nullopt); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)), + kAllowRetransmission, absl::nullopt); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); // Advance time past where packet expires. fake_clock_.AdvanceTimeMilliseconds( @@ -95,7 +95,7 @@ TEST_F(RtpPacketHistoryTest, StartSeqResetAfterReset) { hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 2)), kAllowRetransmission, absl::nullopt); EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2))); } @@ -282,7 +282,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestSentPacketWhenAtMaxSize) { // Add packets until the buffer is full. for (size_t i = 0; i < kMaxNumPackets; ++i) { - std::unique_ptr packet = CreateRtpPacket(kStartSeqNum + i); + std::unique_ptr packet = + CreateRtpPacket(To16u(kStartSeqNum + i)); // Immediate mark packet as sent. hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); @@ -300,7 +301,7 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestSentPacketWhenAtMaxSize) { // Oldest packet should be gone, but packet after than one still present. EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) { @@ -312,7 +313,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) { // Add packets until the buffer is full. for (size_t i = 0; i < kMaxNumPackets; ++i) { - std::unique_ptr packet = CreateRtpPacket(kStartSeqNum + i); + std::unique_ptr packet = + CreateRtpPacket(To16u(kStartSeqNum + i)); // Don't mark packets as sent, preventing them from being removed. hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, absl::nullopt); } @@ -328,7 +330,7 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) { // Oldest packet should be gone, but packet after than one still present. EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) { @@ -361,7 +363,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) { hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + kMaxNumPackets + 1)), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2))); } @@ -376,8 +378,8 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPackets) { 1); // Add a new packet to trigger culling. - hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, - fake_clock_.TimeInMilliseconds()); + hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)), + kAllowRetransmission, fake_clock_.TimeInMilliseconds()); // First packet should still be there. EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum)); @@ -387,7 +389,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPackets) { kAllowRetransmission, fake_clock_.TimeInMilliseconds()); // First packet should no be gone, but next one still there. EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) { @@ -405,8 +407,8 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) { fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1); // Add a new packet to trigger culling. - hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, - fake_clock_.TimeInMilliseconds()); + hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)), + kAllowRetransmission, fake_clock_.TimeInMilliseconds()); // First packet should still be there. EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum)); @@ -416,7 +418,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) { kAllowRetransmission, fake_clock_.TimeInMilliseconds()); // First packet should no be gone, but next one still there. EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); - EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1)); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } TEST_F(RtpPacketHistoryTest, RemovesOldWithCulling) { @@ -436,8 +438,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldWithCulling) { // Advance to where packet can be culled, even if buffer is not full. fake_clock_.AdvanceTimeMilliseconds(1); - hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, - fake_clock_.TimeInMilliseconds()); + hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)), + kAllowRetransmission, fake_clock_.TimeInMilliseconds()); EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); } @@ -462,8 +464,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldWithCullingHighRtt) { // Advance to where packet can be culled, even if buffer is not full. fake_clock_.AdvanceTimeMilliseconds(1); - hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, - fake_clock_.TimeInMilliseconds()); + hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)), + kAllowRetransmission, fake_clock_.TimeInMilliseconds()); EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum)); } @@ -478,7 +480,7 @@ TEST_F(RtpPacketHistoryTest, GetBestFittingPacket) { const size_t target_packet_size = packet->size(); hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); - packet = CreateRtpPacket(kStartSeqNum + 1); + packet = CreateRtpPacket(To16u(kStartSeqNum + 1)); packet->SetPayloadSize(kTargetSize - 1); hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); @@ -510,7 +512,7 @@ TEST_F(RtpPacketHistoryTest, RtpPacketHistory::kPacketCullingDelayFactor * RtpPacketHistory::kMinPacketDurationMs); - packet = CreateRtpPacket(kStartSeqNum + 1); + packet = CreateRtpPacket(To16u(kStartSeqNum + 1)); packet->SetPayloadSize(100); hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); @@ -518,7 +520,7 @@ TEST_F(RtpPacketHistoryTest, auto best_packet = hist_.GetBestFittingPacket(target_packet_size + 2); ASSERT_THAT(best_packet, ::testing::NotNull()); - EXPECT_EQ(best_packet->SequenceNumber(), kStartSeqNum + 1); + EXPECT_EQ(best_packet->SequenceNumber(), To16u(kStartSeqNum + 1)); } TEST_F(RtpPacketHistoryTest, GetBestFittingPacketReturnLastPacketWhenSameSize) { @@ -530,14 +532,14 @@ TEST_F(RtpPacketHistoryTest, GetBestFittingPacketReturnLastPacketWhenSameSize) { packet->SetPayloadSize(kTargetSize); hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); - packet = CreateRtpPacket(kStartSeqNum + 1); + packet = CreateRtpPacket(To16u(kStartSeqNum + 1)); packet->SetPayloadSize(kTargetSize); hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); auto best_packet = hist_.GetBestFittingPacket(123); ASSERT_THAT(best_packet, ::testing::NotNull()); - EXPECT_EQ(best_packet->SequenceNumber(), kStartSeqNum + 1); + EXPECT_EQ(best_packet->SequenceNumber(), To16u(kStartSeqNum + 1)); } TEST_F(RtpPacketHistoryTest, @@ -551,7 +553,7 @@ TEST_F(RtpPacketHistoryTest, hist_.PutRtpPacket(std::move(small_packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); - auto large_packet = CreateRtpPacket(kStartSeqNum + 1); + auto large_packet = CreateRtpPacket(To16u(kStartSeqNum + 1)); large_packet->SetPayloadSize(kTargetSize * 2); hist_.PutRtpPacket(std::move(large_packet), kAllowRetransmission, fake_clock_.TimeInMilliseconds()); @@ -563,7 +565,7 @@ TEST_F(RtpPacketHistoryTest, ASSERT_THAT(hist_.GetBestFittingPacket(kTargetSize * 2), ::testing::NotNull()); EXPECT_EQ(hist_.GetBestFittingPacket(kTargetSize * 2)->SequenceNumber(), - kStartSeqNum + 1); + To16u(kStartSeqNum + 1)); } TEST_F(RtpPacketHistoryTest, @@ -578,4 +580,60 @@ TEST_F(RtpPacketHistoryTest, ::testing::NotNull()); } +TEST_F(RtpPacketHistoryTest, CullWithAcks) { + const int64_t kPacketLifetime = RtpPacketHistory::kMinPacketDurationMs * + RtpPacketHistory::kPacketCullingDelayFactor; + + const int64_t start_time = fake_clock_.TimeInMilliseconds(); + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 10); + + // Insert three packets 33ms apart, immediately mark them as sent. + std::unique_ptr packet = CreateRtpPacket(kStartSeqNum); + packet->SetPayloadSize(50); + hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + hist_.GetPacketAndSetSendTime(kStartSeqNum); + fake_clock_.AdvanceTimeMilliseconds(33); + packet = CreateRtpPacket(To16u(kStartSeqNum + 1)); + packet->SetPayloadSize(50); + hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + hist_.GetPacketAndSetSendTime(To16u(kStartSeqNum + 1)); + fake_clock_.AdvanceTimeMilliseconds(33); + packet = CreateRtpPacket(To16u(kStartSeqNum + 2)); + packet->SetPayloadSize(50); + hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + hist_.GetPacketAndSetSendTime(To16u(kStartSeqNum + 2)); + + EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum).has_value()); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value()); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); + + // Remove middle one using ack, check that only that one is gone. + std::vector acked_sequence_numbers = {To16u(kStartSeqNum + 1)}; + hist_.CullAcknowledgedPackets(acked_sequence_numbers); + + EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum).has_value()); + EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value()); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); + + // Advance time to where second packet would have expired, verify first packet + // is removed. + int64_t second_packet_expiry_time = start_time + kPacketLifetime + 33 + 1; + fake_clock_.AdvanceTimeMilliseconds(second_packet_expiry_time - + fake_clock_.TimeInMilliseconds()); + hist_.SetRtt(1); // Trigger culling of old packets. + EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value()); + EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value()); + EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); + + // Advance to where last packet expires, verify all gone. + fake_clock_.AdvanceTimeMilliseconds(33); + hist_.SetRtt(1); // Trigger culling of old packets. + EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value()); + EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value()); + EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc index 2e198fc999..bc672c6d07 100644 --- a/modules/rtp_rtcp/source/rtp_sender.cc +++ b/modules/rtp_rtcp/source/rtp_sender.cc @@ -149,6 +149,9 @@ RTPSender::RTPSender( populate_network2_timestamp_(populate_network2_timestamp), send_side_bwe_with_overhead_( field_trials.Lookup("WebRTC-SendSideBwe-WithOverhead") + .find("Enabled") == 0), + legacy_packet_history_storage_mode_( + field_trials.Lookup("WebRTC-UseRtpPacketHistoryLegacyStorageMode") .find("Enabled") == 0) { // This random initialization is not intended to be cryptographic strong. timestamp_offset_ = random_.Rand(); @@ -159,9 +162,13 @@ RTPSender::RTPSender( // Store FlexFEC packets in the packet history data structure, so they can // be found when paced. if (flexfec_ssrc_) { + RtpPacketHistory::StorageMode storage_mode = + legacy_packet_history_storage_mode_ + ? RtpPacketHistory::StorageMode::kStore + : RtpPacketHistory::StorageMode::kStoreAndCull; + flexfec_packet_history_.SetStorePacketsStatus( - RtpPacketHistory::StorageMode::kStore, - kMinFlexfecPacketsToStoreForPacing); + storage_mode, kMinFlexfecPacketsToStoreForPacing); } } @@ -423,9 +430,14 @@ size_t RTPSender::SendPadData(size_t bytes, } void RTPSender::SetStorePacketsStatus(bool enable, uint16_t number_to_store) { - RtpPacketHistory::StorageMode mode = - enable ? RtpPacketHistory::StorageMode::kStore - : RtpPacketHistory::StorageMode::kDisabled; + RtpPacketHistory::StorageMode mode; + if (enable) { + mode = legacy_packet_history_storage_mode_ + ? RtpPacketHistory::StorageMode::kStore + : RtpPacketHistory::StorageMode::kStoreAndCull; + } else { + mode = RtpPacketHistory::StorageMode::kDisabled; + } packet_history_.SetStorePacketsStatus(mode, number_to_store); } diff --git a/modules/rtp_rtcp/source/rtp_sender.h b/modules/rtp_rtcp/source/rtp_sender.h index 4d7e72b7b7..eb0dbc40fc 100644 --- a/modules/rtp_rtcp/source/rtp_sender.h +++ b/modules/rtp_rtcp/source/rtp_sender.h @@ -292,6 +292,7 @@ class RTPSender { const bool populate_network2_timestamp_; const bool send_side_bwe_with_overhead_; + const bool legacy_packet_history_storage_mode_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RTPSender); };