diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index 65c8c607b9..0930b826be 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -44,17 +44,54 @@ RtpPacketHistory::PacketState::PacketState() = default; RtpPacketHistory::PacketState::PacketState(const PacketState&) = default; RtpPacketHistory::PacketState::~PacketState() = default; -RtpPacketHistory::StoredPacket::StoredPacket() = default; +RtpPacketHistory::StoredPacket::StoredPacket( + std::unique_ptr packet, + StorageType storage_type, + absl::optional send_time_ms, + uint64_t insert_order) + : send_time_ms_(send_time_ms), + packet_(std::move(packet)), + // No send time indicates packet is not sent immediately, but instead will + // be put in the pacer queue and later retrieved via + // GetPacketAndSetSendTime(). + pending_transmission_(!send_time_ms.has_value()), + storage_type_(storage_type), + insert_order_(insert_order), + times_retransmitted_(0) {} + RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default; RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=( RtpPacketHistory::StoredPacket&&) = default; RtpPacketHistory::StoredPacket::~StoredPacket() = default; +void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted( + PacketPrioritySet* priority_set) { + // Check if this StoredPacket is in the priority set. If so, we need to remove + // it before updating |times_retransmitted_| since that is used in sorting, + // and then add it back. + const bool in_priority_set = priority_set->erase(this) > 0; + ++times_retransmitted_; + if (in_priority_set) { + priority_set->insert(this); + } +} + +bool RtpPacketHistory::MoreUseful::operator()(StoredPacket* lhs, + StoredPacket* rhs) const { + // Prefer to send packets we haven't already sent as padding. + if (lhs->times_retransmitted() != rhs->times_retransmitted()) { + return lhs->times_retransmitted() < rhs->times_retransmitted(); + } + // All else being equal, prefer newer packets. + return lhs->insert_order() > rhs->insert_order(); +} + RtpPacketHistory::RtpPacketHistory(Clock* clock) : clock_(clock), number_to_store_(0), mode_(StorageMode::kDisabled), - rtt_ms_(-1) {} + rtt_ms_(-1), + retransmittable_packets_inserted_(0) {} RtpPacketHistory::~RtpPacketHistory() {} @@ -99,36 +136,35 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, // Store packet. const uint16_t rtp_seq_no = packet->SequenceNumber(); - StoredPacket& stored_packet = packet_history_[rtp_seq_no]; - RTC_DCHECK(stored_packet.packet == nullptr); - if (stored_packet.packet) { + auto it = packet_history_.emplace( + rtp_seq_no, StoredPacket(std::move(packet), type, send_time_ms, + type != StorageType::kDontRetransmit + ? retransmittable_packets_inserted_++ + : 0)); + RTC_DCHECK(it.second); + StoredPacket& stored_packet = it.first->second; + if (stored_packet.packet_) { // It is an error if this happen. But it can happen if the sequence numbers // for some reason restart without that the history has been reset. - auto size_iterator = packet_size_.find(stored_packet.packet->size()); + auto size_iterator = packet_size_.find(stored_packet.packet_->size()); if (size_iterator != packet_size_.end() && - size_iterator->second == stored_packet.packet->SequenceNumber()) { + size_iterator->second == stored_packet.packet_->SequenceNumber()) { packet_size_.erase(size_iterator); } } - stored_packet.packet = std::move(packet); - if (stored_packet.packet->capture_time_ms() <= 0) { - stored_packet.packet->set_capture_time_ms(now_ms); + if (stored_packet.packet_->capture_time_ms() <= 0) { + stored_packet.packet_->set_capture_time_ms(now_ms); } - stored_packet.send_time_ms = send_time_ms; - stored_packet.storage_type = type; - stored_packet.times_retransmitted = 0; - // No send time indicates packet is not sent immediately, but instead will - // be put in the pacer queue and later retrieved via - // GetPacketAndSetSendTime(). - stored_packet.pending_transmission = !send_time_ms.has_value(); if (!start_seqno_) { start_seqno_ = rtp_seq_no; } + // Store the sequence number of the last send packet with this size. if (type != StorageType::kDontRetransmit) { - packet_size_[stored_packet.packet->size()] = rtp_seq_no; + packet_size_[stored_packet.packet_->size()] = rtp_seq_no; + padding_priority_.insert(&stored_packet); } } @@ -150,22 +186,23 @@ std::unique_ptr RtpPacketHistory::GetPacketAndSetSendTime( return nullptr; } - if (packet.send_time_ms) { - ++packet.times_retransmitted; + if (packet.storage_type() != StorageType::kDontRetransmit && + packet.send_time_ms_) { + packet.IncrementTimesRetransmitted(&padding_priority_); } // Update send-time and mark as no long in pacer queue. - packet.send_time_ms = now_ms; - packet.pending_transmission = false; + packet.send_time_ms_ = now_ms; + packet.pending_transmission_ = false; - if (packet.storage_type == StorageType::kDontRetransmit) { + if (packet.storage_type() == StorageType::kDontRetransmit) { // Non retransmittable packet, so call must come from paced sender. // Remove from history and return actual packet instance. return RemovePacket(rtp_it); } // Return copy of packet instance since it may need to be retransmitted. - return absl::make_unique(*packet.packet); + return absl::make_unique(*packet.packet_); } absl::optional RtpPacketHistory::GetPacketState( @@ -189,10 +226,10 @@ absl::optional RtpPacketHistory::GetPacketState( bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet, int64_t now_ms) const { - if (packet.send_time_ms) { + if (packet.send_time_ms_) { // Send-time already set, this check must be for a retransmission. - if (packet.times_retransmitted > 0 && - now_ms < *packet.send_time_ms + rtt_ms_) { + if (packet.times_retransmitted() > 0 && + now_ms < *packet.send_time_ms_ + rtt_ms_) { // This packet has already been retransmitted once, and the time since // that even is lower than on RTT. Ignore request as this packet is // likely already in the network pipe. @@ -205,7 +242,6 @@ bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet, std::unique_ptr RtpPacketHistory::GetBestFittingPacket( size_t packet_length) const { - // TODO(sprang): Make this smarter, taking retransmit count etc into account. rtc::CritScope cs(&lock_); if (packet_length < kMinPacketRequestBytes || packet_size_.empty()) { return nullptr; @@ -233,16 +269,41 @@ std::unique_ptr RtpPacketHistory::GetBestFittingPacket( RTC_DCHECK(false); return nullptr; } - if (!history_it->second.packet) { + if (!history_it->second.packet_) { RTC_LOG(LS_ERROR) << "Packet pointer is null in history for seq_no" << seq_no; RTC_DCHECK(false); return nullptr; } - RtpPacketToSend* best_packet = history_it->second.packet.get(); + RtpPacketToSend* best_packet = history_it->second.packet_.get(); return absl::make_unique(*best_packet); } +std::unique_ptr RtpPacketHistory::GetPayloadPaddingPacket() { + rtc::CritScope cs(&lock_); + RTC_DCHECK(mode_ != StorageMode::kDisabled); + if (padding_priority_.empty()) { + return nullptr; + } + + auto best_packet_it = padding_priority_.begin(); + StoredPacket* best_packet = *best_packet_it; + if (best_packet->pending_transmission_) { + // Because PacedSender releases it's lock when it calls + // TimeToSendPadding() there is the potential for a race where a new + // packet ends up here instead of the regular transmit path. In such a + // case, just return empty and it will be picked up on the next + // Process() call. + return nullptr; + } + + best_packet->send_time_ms_ = clock_->TimeInMilliseconds(); + best_packet->IncrementTimesRetransmitted(&padding_priority_); + + // Return a copy of the packet. + return absl::make_unique(*best_packet->packet_); +} + void RtpPacketHistory::CullAcknowledgedPackets( rtc::ArrayView sequence_numbers) { rtc::CritScope cs(&lock_); @@ -267,13 +328,14 @@ bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) { return false; } - rtp_it->second.pending_transmission = true; + rtp_it->second.pending_transmission_ = true; return true; } void RtpPacketHistory::Reset() { packet_history_.clear(); packet_size_.clear(); + padding_priority_.clear(); start_seqno_.reset(); } @@ -292,19 +354,19 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { } const StoredPacket& stored_packet = stored_packet_it->second; - if (stored_packet_it->second.pending_transmission) { + if (stored_packet_it->second.pending_transmission_) { // Don't remove packets in the pacer queue, pending tranmission. return; } - if (*stored_packet.send_time_ms + packet_duration_ms > now_ms) { + if (*stored_packet.send_time_ms_ + packet_duration_ms > now_ms) { // Don't cull packets too early to avoid failed retransmission requests. return; } if (packet_history_.size() >= number_to_store_ || (mode_ == StorageMode::kStoreAndCull && - *stored_packet.send_time_ms + + *stored_packet.send_time_ms_ + (packet_duration_ms * kPacketCullingDelayFactor) <= now_ms)) { // Too many packets in history, or this packet has timed out. Remove it @@ -321,12 +383,17 @@ std::unique_ptr RtpPacketHistory::RemovePacket( StoredPacketIterator packet_it) { // Move the packet out from the StoredPacket container. std::unique_ptr rtp_packet = - std::move(packet_it->second.packet); + std::move(packet_it->second.packet_); // Check if this is the oldest packet in the history, as this must be updated // in order to cull old packets. const bool is_first_packet = packet_it->first == start_seqno_; + // Erase from padding priority set, if eligible. + if (packet_it->second.storage_type() != StorageType::kDontRetransmit) { + RTC_CHECK_EQ(padding_priority_.erase(&packet_it->second), 1); + } + // Erase the packet from the map, and capture iterator to the next one. StoredPacketIterator next_it = packet_history_.erase(packet_it); @@ -357,13 +424,13 @@ std::unique_ptr RtpPacketHistory::RemovePacket( RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState( const RtpPacketHistory::StoredPacket& stored_packet) { RtpPacketHistory::PacketState state; - state.rtp_sequence_number = stored_packet.packet->SequenceNumber(); - state.send_time_ms = stored_packet.send_time_ms; - state.capture_time_ms = stored_packet.packet->capture_time_ms(); - state.ssrc = stored_packet.packet->Ssrc(); - state.packet_size = stored_packet.packet->size(); - state.times_retransmitted = stored_packet.times_retransmitted; - state.pending_transmission = stored_packet.pending_transmission; + state.rtp_sequence_number = stored_packet.packet_->SequenceNumber(); + state.send_time_ms = stored_packet.send_time_ms_; + state.capture_time_ms = stored_packet.packet_->capture_time_ms(); + state.ssrc = stored_packet.packet_->Ssrc(); + state.packet_size = stored_packet.packet_->size(); + state.times_retransmitted = stored_packet.times_retransmitted(); + state.pending_transmission = stored_packet.pending_transmission_; return state; } diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index 811d97e4cd..0246e8c056 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -13,6 +13,7 @@ #include #include +#include #include #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -90,6 +91,12 @@ class RtpPacketHistory { std::unique_ptr GetBestFittingPacket( size_t packet_size) const; + // Get the packet (if any) from the history, that is deemed most likely to + // the remote side. This is calculated from heuristics such as packet age + // and times retransmitted. Updated the send time of the packet, so is not + // a const method. + std::unique_ptr GetPayloadPaddingPacket(); + // Cull packets that have been acknowledged as received by the remote end. void CullAcknowledgedPackets(rtc::ArrayView sequence_numbers); @@ -99,27 +106,48 @@ class RtpPacketHistory { bool SetPendingTransmission(uint16_t sequence_number); private: - struct StoredPacket { - StoredPacket(); + struct MoreUseful; + class StoredPacket; + using PacketPrioritySet = std::set; + + class StoredPacket { + public: + StoredPacket(std::unique_ptr packet, + StorageType storage_type, + absl::optional send_time_ms, + uint64_t insert_order); StoredPacket(StoredPacket&&); StoredPacket& operator=(StoredPacket&&); ~StoredPacket(); + StorageType storage_type() const { return storage_type_; } + uint64_t insert_order() const { return insert_order_; } + size_t times_retransmitted() const { return times_retransmitted_; } + void IncrementTimesRetransmitted(PacketPrioritySet* priority_set); + // The time of last transmission, including retransmissions. - absl::optional send_time_ms; - - // Number of times RE-transmitted, ie excluding the first transmission. - size_t times_retransmitted = 0; - - // Storing a packet with |storage_type| = kDontRetransmit indicates this is - // only used as temporary storage until sent by the pacer sender. - StorageType storage_type = kDontRetransmit; + absl::optional send_time_ms_; // The actual packet. - std::unique_ptr packet; + std::unique_ptr packet_; // True if the packet is currently in the pacer queue pending transmission. - bool pending_transmission = false; + bool pending_transmission_; + + private: + // Storing a packet with |storage_type| = kDontRetransmit indicates this is + // only used as temporary storage until sent by the pacer sender. + StorageType storage_type_; + + // Unique number per StoredPacket, incremented by one for each added + // packet. Used to sort on insert order. + uint64_t insert_order_; + + // Number of times RE-transmitted, ie excluding the first transmission. + size_t times_retransmitted_; + }; + struct MoreUseful { + bool operator()(StoredPacket* lhs, StoredPacket* rhs) const; }; using StoredPacketIterator = std::map::iterator; @@ -148,6 +176,12 @@ class RtpPacketHistory { // Map from packet size to sequence number. std::map packet_size_ RTC_GUARDED_BY(lock_); + // Total number of packets with StorageType::kAllowsRetransmission inserted. + uint64_t retransmittable_packets_inserted_ RTC_GUARDED_BY(lock_); + // Retransmittable objects from |packet_history_| ordered by + // "most likely to be useful", used in GetPayloadPaddingPacket(). + PacketPrioritySet padding_priority_ RTC_GUARDED_BY(lock_); + // The earliest packet in the history. This might not be the lowest sequence // number, in case there is a wraparound. absl::optional start_seqno_ RTC_GUARDED_BY(lock_); diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index 5a251d9275..5074ac6f3c 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -711,4 +711,59 @@ TEST_F(RtpPacketHistoryTest, DontRemovePendingTransmissions) { ASSERT_FALSE(packet_state.has_value()); } +TEST_F(RtpPacketHistoryTest, PrioritizedPayloadPadding) { + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + + // Add two sent packets, one millisecond apart. + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + fake_clock_.AdvanceTimeMilliseconds(1); + + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + fake_clock_.AdvanceTimeMilliseconds(1); + + // Latest packet given equal retransmission count. + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), + kStartSeqNum + 1); + + // Older packet has lower retransmission count. + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), kStartSeqNum); + + // Equal retransmission count again, use newest packet. + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), + kStartSeqNum + 1); + + // Older packet has lower retransmission count. + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), kStartSeqNum); + + // Remove newest packet. + hist_.CullAcknowledgedPackets(std::vector{kStartSeqNum + 1}); + + // Only older packet left. + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), kStartSeqNum); + + hist_.CullAcknowledgedPackets(std::vector{kStartSeqNum}); + + EXPECT_EQ(hist_.GetPayloadPaddingPacket(), nullptr); +} + +TEST_F(RtpPacketHistoryTest, NoPendingPacketAsPadding) { + hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1); + + hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission, + fake_clock_.TimeInMilliseconds()); + fake_clock_.AdvanceTimeMilliseconds(1); + + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), kStartSeqNum); + + // If packet is pending retransmission, don't try to use it as padding. + hist_.SetPendingTransmission(kStartSeqNum); + EXPECT_EQ(nullptr, hist_.GetPayloadPaddingPacket()); + + // Market it as no longer pending, should be usable as padding again. + hist_.GetPacketAndSetSendTime(kStartSeqNum); + EXPECT_EQ(hist_.GetPayloadPaddingPacket()->SequenceNumber(), kStartSeqNum); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc index 044148db14..3c80cae0f1 100644 --- a/modules/rtp_rtcp/source/rtp_sender.cc +++ b/modules/rtp_rtcp/source/rtp_sender.cc @@ -151,6 +151,9 @@ RTPSender::RTPSender( .find("Enabled") == 0), legacy_packet_history_storage_mode_( field_trials.Lookup("WebRTC-UseRtpPacketHistoryLegacyStorageMode") + .find("Enabled") == 0), + payload_padding_prefer_useful_packets_( + field_trials.Lookup("WebRTC-PayloadPadding-UseMostUsefulPacket") .find("Enabled") == 0) { // This random initialization is not intended to be cryptographic strong. timestamp_offset_ = random_.Rand(); @@ -288,8 +291,13 @@ size_t RTPSender::TrySendRedundantPayloads(size_t bytes_to_send, int bytes_left = static_cast(bytes_to_send); while (bytes_left > 0) { - std::unique_ptr packet = - packet_history_.GetBestFittingPacket(bytes_left); + std::unique_ptr packet; + if (payload_padding_prefer_useful_packets_) { + packet = packet_history_.GetPayloadPaddingPacket(); + } else { + packet = packet_history_.GetBestFittingPacket(bytes_left); + } + if (!packet) break; size_t payload_size = packet->payload_size(); diff --git a/modules/rtp_rtcp/source/rtp_sender.h b/modules/rtp_rtcp/source/rtp_sender.h index 8c6d01266e..ac5cf467ad 100644 --- a/modules/rtp_rtcp/source/rtp_sender.h +++ b/modules/rtp_rtcp/source/rtp_sender.h @@ -301,6 +301,12 @@ class RTPSender : public AcknowledgedPacketsObserver { const bool send_side_bwe_with_overhead_; const bool legacy_packet_history_storage_mode_; + // Set by field trial "WebRTC-PayloadPadding-UseMostUsefulPacket". If set + // to "Enabled" this field will be true and + // packet_history_.GetPayloadPaddingPacket() will be called instead of + // packet_history_.GetBestFittingPacket() in TrySendRedundantPayloads(). + const bool payload_padding_prefer_useful_packets_; + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RTPSender); };