diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index b2dacc270d..65c8c607b9 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -118,6 +118,10 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, stored_packet.send_time_ms = send_time_ms; stored_packet.storage_type = type; stored_packet.times_retransmitted = 0; + // No send time indicates packet is not sent immediately, but instead will + // be put in the pacer queue and later retrieved via + // GetPacketAndSetSendTime(). + stored_packet.pending_transmission = !send_time_ms.has_value(); if (!start_seqno_) { start_seqno_ = rtp_seq_no; @@ -150,14 +154,17 @@ std::unique_ptr RtpPacketHistory::GetPacketAndSetSendTime( ++packet.times_retransmitted; } - // Update send-time and return copy of packet instance. + // Update send-time and mark as no long in pacer queue. packet.send_time_ms = now_ms; + packet.pending_transmission = false; if (packet.storage_type == StorageType::kDontRetransmit) { // Non retransmittable packet, so call must come from paced sender. // Remove from history and return actual packet instance. return RemovePacket(rtp_it); } + + // Return copy of packet instance since it may need to be retransmitted. return absl::make_unique(*packet.packet); } @@ -249,6 +256,21 @@ void RtpPacketHistory::CullAcknowledgedPackets( } } +bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) { + rtc::CritScope cs(&lock_); + if (mode_ == StorageMode::kDisabled) { + return false; + } + + auto rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { + return false; + } + + rtp_it->second.pending_transmission = true; + return true; +} + void RtpPacketHistory::Reset() { packet_history_.clear(); packet_size_.clear(); @@ -270,8 +292,8 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { } const StoredPacket& stored_packet = stored_packet_it->second; - if (!stored_packet.send_time_ms) { - // Don't remove packets that have not been sent. + if (stored_packet_it->second.pending_transmission) { + // Don't remove packets in the pacer queue, pending tranmission. return; } @@ -341,6 +363,7 @@ RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState( state.ssrc = stored_packet.packet->Ssrc(); state.packet_size = stored_packet.packet->size(); state.times_retransmitted = stored_packet.times_retransmitted; + state.pending_transmission = stored_packet.pending_transmission; return state; } diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index cf87ddf205..811d97e4cd 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -47,6 +47,7 @@ class RtpPacketHistory { size_t packet_size = 0; // Number of times RE-transmitted, ie not including the first transmission. size_t times_retransmitted = 0; + bool pending_transmission = false; }; // Maximum number of packets we ever allow in the history. @@ -92,6 +93,11 @@ class RtpPacketHistory { // Cull packets that have been acknowledged as received by the remote end. void CullAcknowledgedPackets(rtc::ArrayView sequence_numbers); + // Mark packet as queued for transmission. This will prevent premature + // removal or duplicate retransmissions in the pacer queue. + // Returns true if status was set, false if packet was not found. + bool SetPendingTransmission(uint16_t sequence_number); + private: struct StoredPacket { StoredPacket(); @@ -111,6 +117,9 @@ class RtpPacketHistory { // The actual packet. std::unique_ptr packet; + + // True if the packet is currently in the pacer queue pending transmission. + bool pending_transmission = false; }; using StoredPacketIterator = std::map::iterator; diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index c10fbb623f..5a251d9275 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -636,4 +636,79 @@ TEST_F(RtpPacketHistoryTest, CullWithAcks) { EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value()); } +TEST_F(RtpPacketHistoryTest, SetsPendingTransmissionState) { + const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2; + hist_.SetRtt(kRttMs); + + // Set size to remove old packets as soon as possible. + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + + // Add a packet, without send time, indicating it's in pacer queue. + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + /* send_time_ms = */ absl::nullopt); + + // Packet is pending transmission. + absl::optional packet_state = + hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent, state should be back to non-pending. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_FALSE(packet_state->pending_transmission); + + // Time for a retransmission. + fake_clock_.AdvanceTimeMilliseconds(kRttMs); + EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum)); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + // Too early for retransmission. + ASSERT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value()); + + // Retransmission allowed again, it's not in a pending state. + fake_clock_.AdvanceTimeMilliseconds(kRttMs); + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_FALSE(packet_state->pending_transmission); +} + +TEST_F(RtpPacketHistoryTest, DontRemovePendingTransmissions) { + const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2; + const int64_t kPacketTimeoutMs = + kRttMs * RtpPacketHistory::kMinPacketDurationRtt; + + // Set size to remove old packets as soon as possible. + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + hist_.SetRtt(kRttMs); + + // Add a sent packet. + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + + // Advance clock to just before packet timeout. + fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1); + // Mark as enqueued in pacer. + EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum)); + + // Advance clock to where packet would have timed out. It should still + // be there and pending. + fake_clock_.AdvanceTimeMilliseconds(1); + absl::optional packet_state = + hist_.GetPacketState(kStartSeqNum); + ASSERT_TRUE(packet_state.has_value()); + EXPECT_TRUE(packet_state->pending_transmission); + + // Packet sent. Now it can be removed. + EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum)); + hist_.SetRtt(kRttMs); // Force culling of old packets. + packet_state = hist_.GetPacketState(kStartSeqNum); + ASSERT_FALSE(packet_state.has_value()); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc index 55095d7511..b60114bc6b 100644 --- a/modules/rtp_rtcp/source/rtp_sender.cc +++ b/modules/rtp_rtcp/source/rtp_sender.cc @@ -451,8 +451,8 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) { // don't retransmit too often. absl::optional stored_packet = packet_history_.GetPacketState(packet_id); - if (!stored_packet) { - // Packet not found. + if (!stored_packet || stored_packet->pending_transmission) { + // Packet not found or already queued for retransmission, ignore. return 0; } @@ -468,6 +468,12 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) { } if (paced_sender_) { + // Mark packet as being in pacer queue again, to prevent duplicates. + if (!packet_history_.SetPendingTransmission(packet_id)) { + // Packet has already been removed from history, return early. + return 0; + } + // Convert from TickTime to Clock since capture_time_ms is based on // TickTime. int64_t corrected_capture_tims_ms = diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc index ba841c2d95..47f2856337 100644 --- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -797,23 +797,28 @@ TEST_P(RtpSenderTest, TrafficSmoothingRetransmits) { EXPECT_TRUE(rtp_sender_->SendToNetwork(std::move(packet), kAllowRetransmission, RtpPacketSender::kNormalPriority)); + // Immediately process send bucket and send packet. + rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false, + PacedPacketInfo()); + EXPECT_EQ(1, transport_.packets_sent()); - EXPECT_EQ(0, transport_.packets_sent()); - - EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority, - kSsrc, kSeqNum, _, _, _)); - + // Retransmit packet. const int kStoredTimeInMs = 100; fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs); + EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority, + kSsrc, kSeqNum, _, _, _)); + EXPECT_CALL(mock_rtc_event_log_, + LogProxy(SameRtcEventTypeAs(RtcEvent::Type::RtpPacketOutgoing))); + EXPECT_EQ(static_cast(packet_size), rtp_sender_->ReSendPacket(kSeqNum)); - EXPECT_EQ(0, transport_.packets_sent()); + EXPECT_EQ(1, transport_.packets_sent()); rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false, PacedPacketInfo()); // Process send bucket. Packet should now be sent. - EXPECT_EQ(1, transport_.packets_sent()); + EXPECT_EQ(2, transport_.packets_sent()); EXPECT_EQ(packet_size, transport_.last_sent_packet().size()); webrtc::RTPHeader rtp_header;