From 0f4f055ca6e0cab863a82c9d7113cfd5d585631d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Wed, 8 May 2019 10:15:05 -0700 Subject: [PATCH] Don't remove or retransmit packets in the pacer queue. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main purpose right now of this CL is to avoid the situation where multiple retransmissions are queued for sending (normally after network glitch with increased pacer queue length), and some of those fail sending because the can't be retrieved from the packet history due to too short time since last sent. Bug: webrtc:8975, webrtc:10607 Change-Id: I9f6369d83f0b8208e5f57b2dc2fd3f2db7c6fea1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/135164 Commit-Queue: Erik Språng Reviewed-by: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#27884} --- modules/rtp_rtcp/source/rtp_packet_history.cc | 29 ++++++- modules/rtp_rtcp/source/rtp_packet_history.h | 9 +++ .../source/rtp_packet_history_unittest.cc | 75 +++++++++++++++++++ modules/rtp_rtcp/source/rtp_sender.cc | 10 ++- .../rtp_rtcp/source/rtp_sender_unittest.cc | 19 +++-- 5 files changed, 130 insertions(+), 12 deletions(-) diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index b2dacc270d..65c8c607b9 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -118,6 +118,10 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, stored_packet.send_time_ms = send_time_ms; stored_packet.storage_type = type; stored_packet.times_retransmitted = 0; + // No send time indicates packet is not sent immediately, but instead will + // be put in the pacer queue and later retrieved via + // GetPacketAndSetSendTime(). + stored_packet.pending_transmission = !send_time_ms.has_value(); if (!start_seqno_) { start_seqno_ = rtp_seq_no; @@ -150,14 +154,17 @@ std::unique_ptr RtpPacketHistory::GetPacketAndSetSendTime( ++packet.times_retransmitted; } - // Update send-time and return copy of packet instance. + // Update send-time and mark as no long in pacer queue. packet.send_time_ms = now_ms; + packet.pending_transmission = false; if (packet.storage_type == StorageType::kDontRetransmit) { // Non retransmittable packet, so call must come from paced sender. // Remove from history and return actual packet instance. return RemovePacket(rtp_it); } + + // Return copy of packet instance since it may need to be retransmitted. return absl::make_unique(*packet.packet); } @@ -249,6 +256,21 @@ void RtpPacketHistory::CullAcknowledgedPackets( } } +bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) { + rtc::CritScope cs(&lock_); + if (mode_ == StorageMode::kDisabled) { + return false; + } + + auto rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { + return false; + } + + rtp_it->second.pending_transmission = true; + return true; +} + void RtpPacketHistory::Reset() { packet_history_.clear(); packet_size_.clear(); @@ -270,8 +292,8 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { } const StoredPacket& stored_packet = stored_packet_it->second; - if (!stored_packet.send_time_ms) { - // Don't remove packets that have not been sent. + if (stored_packet_it->second.pending_transmission) { + // Don't remove packets in the pacer queue, pending tranmission. return; } @@ -341,6 +363,7 @@ RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState( state.ssrc = stored_packet.packet->Ssrc(); state.packet_size = stored_packet.packet->size(); state.times_retransmitted = stored_packet.times_retransmitted; + state.pending_transmission = stored_packet.pending_transmission; return state; } diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index cf87ddf205..811d97e4cd 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -47,6 +47,7 @@ class RtpPacketHistory { size_t packet_size = 0; // Number of times RE-transmitted, ie not including the first transmission. size_t times_retransmitted = 0; + bool pending_transmission = false; }; // Maximum number of packets we ever allow in the history. @@ -92,6 +93,11 @@ class RtpPacketHistory { // Cull packets that have been acknowledged as received by the remote end. void CullAcknowledgedPackets(rtc::ArrayView sequence_numbers); + // Mark packet as queued for transmission. This will prevent premature + // removal or duplicate retransmissions in the pacer queue. + // Returns true if status was set, false if packet was not found. + bool SetPendingTransmission(uint16_t sequence_number); + private: struct StoredPacket { StoredPacket(); @@ -111,6 +117,9 @@ class RtpPacketHistory { // The actual packet. std::unique_ptr packet; + + // True if the packet is currently in the pacer queue pending transmission. + bool pending_transmission = false; }; using StoredPacketIterator = std::map::iterator; diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index c10fbb623f..5a251d9275 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -636,4 +636,79 @@ TEST_F(RtpPacketHistoryTest, CullWithAcks) { EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); } +TEST_F(RtpPacketHistoryTest, SetsPendingTransmissionState) { + const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2; + hist_.SetRtt(kRttMs); + + // Set size to remove old packets as soon as possible. + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + + // Add a packet, without send time, indicating it's in pacer queue. + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + /* send_time_ms = */ absl::nullopt); + + // Packet is pending transmission. + absl::optional packet_state = + hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent, state should be back to non-pending. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_FALSE(packet_state->pending_transmission); + + // Time for a retransmission. + fake_clock_.AdvanceTimeMilliseconds(kRttMs); + EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum)); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + // Too early for retransmission. + ASSERT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value()); + + // Retransmission allowed again, it's not in a pending state. + fake_clock_.AdvanceTimeMilliseconds(kRttMs); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_FALSE(packet_state->pending_transmission); +} + +TEST_F(RtpPacketHistoryTest, DontRemovePendingTransmissions) { + const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2; + const int64_t kPacketTimeoutMs = + kRttMs * RtpPacketHistory::kMinPacketDurationRtt; + + // Set size to remove old packets as soon as possible. + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + hist_.SetRtt(kRttMs); + + // Add a sent packet. + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + + // Advance clock to just before packet timeout. + fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1); + // Mark as enqueued in pacer. + EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum)); + + // Advance clock to where packet would have timed out. It should still + // be there and pending. + fake_clock_.AdvanceTimeMilliseconds(1); + absl::optional packet_state = + hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent. Now it can be removed. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + hist_.SetRtt(kRttMs); // Force culling of old packets. + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_FALSE(packet_state.has_value()); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc index 55095d7511..b60114bc6b 100644 --- a/modules/rtp_rtcp/source/rtp_sender.cc +++ b/modules/rtp_rtcp/source/rtp_sender.cc @@ -451,8 +451,8 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) { // don't retransmit too often. absl::optional stored_packet = packet_history_.GetPacketState(packet_id); - if (!stored_packet) { - // Packet not found. + if (!stored_packet || stored_packet->pending_transmission) { + // Packet not found or already queued for retransmission, ignore. return 0; } @@ -468,6 +468,12 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) { } if (paced_sender_) { + // Mark packet as being in pacer queue again, to prevent duplicates. + if (!packet_history_.SetPendingTransmission(packet_id)) { + // Packet has already been removed from history, return early. + return 0; + } + // Convert from TickTime to Clock since capture_time_ms is based on // TickTime. int64_t corrected_capture_tims_ms = diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc index ba841c2d95..47f2856337 100644 --- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -797,23 +797,28 @@ TEST_P(RtpSenderTest, TrafficSmoothingRetransmits) { EXPECT_TRUE(rtp_sender_->SendToNetwork(std::move(packet), kAllowRetransmission, RtpPacketSender::kNormalPriority)); + // Immediately process send bucket and send packet. + rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false, + PacedPacketInfo()); + EXPECT_EQ(1, transport_.packets_sent()); - EXPECT_EQ(0, transport_.packets_sent()); - - EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority, - kSsrc, kSeqNum, _, _, _)); - + // Retransmit packet. const int kStoredTimeInMs = 100; fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs); + EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority, + kSsrc, kSeqNum, _, _, _)); + EXPECT_CALL(mock_rtc_event_log_, + LogProxy(SameRtcEventTypeAs(RtcEvent::Type::RtpPacketOutgoing))); + EXPECT_EQ(static_cast(packet_size), rtp_sender_->ReSendPacket(kSeqNum)); - EXPECT_EQ(0, transport_.packets_sent()); + EXPECT_EQ(1, transport_.packets_sent()); rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false, PacedPacketInfo()); // Process send bucket. Packet should now be sent. - EXPECT_EQ(1, transport_.packets_sent()); + EXPECT_EQ(2, transport_.packets_sent()); EXPECT_EQ(packet_size, transport_.last_sent_packet().size()); webrtc::RTPHeader rtp_header;