Reland "Improve performance of RtpPacketHistory"

This is a reland of 9e380fd484db09c37323b90a19c5ce7965927975

Patchset 1 is the original CL. The follow-ups adds fix for a test failure
and test for that change.

Original change's description:
> Improve performance of RtpPacketHistory
>
> The data structures in RtpPacketHistory were chosen based on assumption
> of few packets with possible sparse segments due to missing acking.
> In practice high bitrate usages with full histories seem to be more of
> a problem.
> Due to that, change storage from an std::map to an std::deque and live
> with potential segments of nullptr. Also limit size of padding prio
> set so that doesn't become a bottleneck.
>
> Bug: webrtc:8975
> Change-Id: I3b6314fb3255937d25362ff2cd906efb7b1397f7
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145901
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29117}

Bug: webrtc:8975
Change-Id: I5038e5ad2eb79ce75710d2d8b0b3ac01dd41c013
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/152282
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29152}
This commit is contained in:
Erik Språng 2019-09-11 12:20:36 +02:00 committed by Commit Bot
parent 9a91161b9f
commit 2d7b2f5f72
3 changed files with 195 additions and 100 deletions

View File

@ -23,6 +23,7 @@
namespace webrtc {
constexpr size_t RtpPacketHistory::kMaxCapacity;
constexpr size_t RtpPacketHistory::kMaxPaddingtHistory;
constexpr int64_t RtpPacketHistory::kMinPacketDurationMs;
constexpr int RtpPacketHistory::kMinPacketDurationRtt;
constexpr int RtpPacketHistory::kPacketCullingDelayFactor;
@ -130,18 +131,36 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
// Store packet.
const uint16_t rtp_seq_no = packet->SequenceNumber();
auto packet_it = packet_history_.emplace(
rtp_seq_no,
StoredPacket(std::move(packet), send_time_ms, packets_inserted_++));
RTC_DCHECK(packet_it.second) << "Failed to insert packet in history.";
StoredPacket& stored_packet = packet_it.first->second;
if (!start_seqno_) {
start_seqno_ = rtp_seq_no;
int packet_index = GetPacketIndex(rtp_seq_no);
if (packet_index >= 0u &&
static_cast<size_t>(packet_index) < packet_history_.size() &&
packet_history_[packet_index].packet_ != nullptr) {
RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no;
// Remove previous packet to avoid inconsistent state.
RemovePacket(packet_index);
packet_index = GetPacketIndex(rtp_seq_no);
}
// Store the sequence number of the last send packet with this size.
auto prio_it = padding_priority_.insert(&stored_packet);
// Packet to be inserted ahead of first packet, expand front.
for (; packet_index < 0; ++packet_index) {
packet_history_.emplace_front(nullptr, absl::nullopt, 0);
}
// Packet to be inserted behind last packet, expand back.
while (static_cast<int>(packet_history_.size()) <= packet_index) {
packet_history_.emplace_back(nullptr, absl::nullopt, 0);
}
RTC_DCHECK_GE(packet_index, 0);
RTC_DCHECK_LT(packet_index, packet_history_.size());
RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr);
packet_history_[packet_index] =
StoredPacket(std::move(packet), send_time_ms, packets_inserted_++);
if (padding_priority_.size() >= kMaxPaddingtHistory - 1) {
padding_priority_.erase(std::prev(padding_priority_.end()));
}
auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
}
@ -152,27 +171,26 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndSetSendTime(
return nullptr;
}
StoredPacket* packet = GetStoredPacket(sequence_number);
if (packet == nullptr) {
return nullptr;
}
int64_t now_ms = clock_->TimeInMilliseconds();
StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
if (!VerifyRtt(*packet, now_ms)) {
return nullptr;
}
StoredPacket& packet = rtp_it->second;
if (!VerifyRtt(rtp_it->second, now_ms)) {
return nullptr;
}
if (packet.send_time_ms_) {
packet.IncrementTimesRetransmitted(&padding_priority_);
if (packet->send_time_ms_) {
packet->IncrementTimesRetransmitted(&padding_priority_);
}
// Update send-time and mark as no long in pacer queue.
packet.send_time_ms_ = now_ms;
packet.pending_transmission_ = false;
packet->send_time_ms_ = now_ms;
packet->pending_transmission_ = false;
// Return copy of packet instance since it may need to be retransmitted again.
return absl::make_unique<RtpPacketToSend>(*packet.packet_);
// Return copy of packet instance since it may need to be retransmitted.
return absl::make_unique<RtpPacketToSend>(*packet->packet_);
}
std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
@ -192,29 +210,26 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
return nullptr;
}
int64_t now_ms = clock_->TimeInMilliseconds();
StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
StoredPacket* packet = GetStoredPacket(sequence_number);
if (packet == nullptr) {
return nullptr;
}
StoredPacket& packet = rtp_it->second;
if (packet.pending_transmission_) {
if (packet->pending_transmission_) {
// Packet already in pacer queue, ignore this request.
return nullptr;
}
if (!VerifyRtt(rtp_it->second, now_ms)) {
if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) {
// Packet already resent within too short a time window, ignore.
return nullptr;
}
// Copy and/or encapsulate packet.
std::unique_ptr<RtpPacketToSend> encapsulated_packet =
encapsulate(*packet.packet_);
encapsulate(*packet->packet_);
if (encapsulated_packet) {
packet.pending_transmission_ = true;
packet->pending_transmission_ = true;
}
return encapsulated_packet;
@ -226,20 +241,18 @@ void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) {
return;
}
int64_t now_ms = clock_->TimeInMilliseconds();
StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
StoredPacket* packet = GetStoredPacket(sequence_number);
if (packet == nullptr) {
return;
}
StoredPacket& packet = rtp_it->second;
RTC_DCHECK(packet.send_time_ms_);
RTC_DCHECK(packet->send_time_ms_);
// Update send-time, mark as no longer in pacer queue, and increment
// transmission count.
packet.send_time_ms_ = now_ms;
packet.pending_transmission_ = false;
packet.IncrementTimesRetransmitted(&padding_priority_);
packet->send_time_ms_ = clock_->TimeInMilliseconds();
packet->pending_transmission_ = false;
packet->IncrementTimesRetransmitted(&padding_priority_);
}
absl::optional<RtpPacketHistory::PacketState> RtpPacketHistory::GetPacketState(
@ -249,16 +262,21 @@ absl::optional<RtpPacketHistory::PacketState> RtpPacketHistory::GetPacketState(
return absl::nullopt;
}
auto rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
int packet_index = GetPacketIndex(sequence_number);
if (packet_index < 0 ||
static_cast<size_t>(packet_index) >= packet_history_.size()) {
return absl::nullopt;
}
const StoredPacket& packet = packet_history_[packet_index];
if (packet.packet_ == nullptr) {
return absl::nullopt;
}
if (!VerifyRtt(rtp_it->second, clock_->TimeInMilliseconds())) {
if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) {
return absl::nullopt;
}
return StoredPacketToPacketState(rtp_it->second);
return StoredPacketToPacketState(packet);
}
bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet,
@ -317,15 +335,13 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket(
void RtpPacketHistory::CullAcknowledgedPackets(
rtc::ArrayView<const uint16_t> sequence_numbers) {
rtc::CritScope cs(&lock_);
if (mode_ == StorageMode::kDisabled) {
return;
}
for (uint16_t sequence_number : sequence_numbers) {
auto stored_packet_it = packet_history_.find(sequence_number);
if (stored_packet_it != packet_history_.end()) {
RemovePacket(stored_packet_it);
int packet_index = GetPacketIndex(sequence_number);
if (packet_index < 0 ||
static_cast<size_t>(packet_index) >= packet_history_.size()) {
continue;
}
RemovePacket(packet_index);
}
}
@ -335,12 +351,12 @@ bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) {
return false;
}
auto rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
StoredPacket* packet = GetStoredPacket(sequence_number);
if (packet == nullptr) {
return false;
}
rtp_it->second.pending_transmission_ = true;
packet->pending_transmission_ = true;
return true;
}
@ -352,25 +368,21 @@ void RtpPacketHistory::Clear() {
void RtpPacketHistory::Reset() {
packet_history_.clear();
padding_priority_.clear();
start_seqno_.reset();
}
void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
int64_t packet_duration_ms =
std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs);
while (!packet_history_.empty()) {
auto stored_packet_it = packet_history_.find(*start_seqno_);
RTC_DCHECK(stored_packet_it != packet_history_.end());
if (packet_history_.size() >= kMaxCapacity) {
// We have reached the absolute max capacity, remove one packet
// unconditionally.
RemovePacket(stored_packet_it);
RemovePacket(0);
continue;
}
const StoredPacket& stored_packet = stored_packet_it->second;
if (stored_packet_it->second.pending_transmission_) {
const StoredPacket& stored_packet = packet_history_.front();
if (stored_packet.pending_transmission_) {
// Don't remove packets in the pacer queue, pending tranmission.
return;
}
@ -386,7 +398,7 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
now_ms) {
// Too many packets in history, or this packet has timed out. Remove it
// and continue.
RemovePacket(stored_packet_it);
RemovePacket(0);
} else {
// No more packets can be removed right now.
return;
@ -395,46 +407,60 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
}
std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
StoredPacketIterator packet_it) {
int packet_index) {
// Move the packet out from the StoredPacket container.
std::unique_ptr<RtpPacketToSend> rtp_packet =
std::move(packet_it->second.packet_);
// Check if this is the oldest packet in the history, as this must be updated
// in order to cull old packets.
const bool is_first_packet = packet_it->first == start_seqno_;
std::move(packet_history_[packet_index].packet_);
// Erase from padding priority set, if eligible.
size_t num_erased = padding_priority_.erase(&packet_it->second);
RTC_DCHECK_EQ(num_erased, 1)
<< "Failed to remove one packet from prio set, got " << num_erased;
if (num_erased != 1) {
RTC_LOG(LS_ERROR) << "RtpPacketHistory in inconsistent state, resetting.";
Reset();
return nullptr;
}
padding_priority_.erase(&packet_history_[packet_index]);
// Erase the packet from the map, and capture iterator to the next one.
StoredPacketIterator next_it = packet_history_.erase(packet_it);
if (is_first_packet) {
// |next_it| now points to the next element, or to the end. If the end,
// check if we can wrap around.
if (next_it == packet_history_.end()) {
next_it = packet_history_.begin();
}
// Update |start_seq_no| to the new oldest item.
if (next_it != packet_history_.end()) {
start_seqno_ = next_it->first;
} else {
start_seqno_.reset();
if (packet_index == 0) {
while (!packet_history_.empty() &&
packet_history_.front().packet_ == nullptr) {
packet_history_.pop_front();
}
}
return rtp_packet;
}
int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
if (packet_history_.empty()) {
return 0;
}
RTC_DCHECK(packet_history_.front().packet_ != nullptr);
int first_seq = packet_history_.front().packet_->SequenceNumber();
if (first_seq == sequence_number) {
return 0;
}
int packet_index = sequence_number - first_seq;
constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1;
if (IsNewerSequenceNumber(sequence_number, first_seq)) {
if (sequence_number < first_seq) {
// Forward wrap.
packet_index += kSeqNumSpan;
}
} else if (sequence_number > first_seq) {
// Backwards wrap.
packet_index -= kSeqNumSpan;
}
return packet_index;
}
RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
uint16_t sequence_number) {
int index = GetPacketIndex(sequence_number);
if (index < 0 || static_cast<size_t>(index) >= packet_history_.size()) {
return nullptr;
}
return &packet_history_[index];
}
RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState(
const RtpPacketHistory::StoredPacket& stored_packet) {
RtpPacketHistory::PacketState state;

View File

@ -11,6 +11,7 @@
#ifndef MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_
#define MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_
#include <deque>
#include <map>
#include <memory>
#include <set>
@ -53,6 +54,8 @@ class RtpPacketHistory {
// Maximum number of packets we ever allow in the history.
static constexpr size_t kMaxCapacity = 9600;
// Maximum number of entries in prioritized queue of padding packets.
static constexpr size_t kMaxPaddingtHistory = 63;
// Don't remove packets within max(1000ms, 3x RTT).
static constexpr int64_t kMinPacketDurationMs = 1000;
static constexpr int kMinPacketDurationRtt = 3;
@ -171,8 +174,6 @@ class RtpPacketHistory {
bool operator()(StoredPacket* lhs, StoredPacket* rhs) const;
};
using StoredPacketIterator = std::map<uint16_t, StoredPacket>::iterator;
// Helper method used by GetPacketAndSetSendTime() and GetPacketState() to
// check if packet has too recently been sent.
bool VerifyRtt(const StoredPacket& packet, int64_t now_ms) const
@ -181,7 +182,11 @@ class RtpPacketHistory {
void CullOldPackets(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Removes the packet from the history, and context/mapping that has been
// stored. Returns the RTP packet instance contained within the StoredPacket.
std::unique_ptr<RtpPacketToSend> RemovePacket(StoredPacketIterator packet)
std::unique_ptr<RtpPacketToSend> RemovePacket(int packet_index)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
int GetPacketIndex(uint16_t sequence_number) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
StoredPacket* GetStoredPacket(uint16_t sequence_number)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
static PacketState StoredPacketToPacketState(
const StoredPacket& stored_packet);
@ -192,8 +197,13 @@ class RtpPacketHistory {
StorageMode mode_ RTC_GUARDED_BY(lock_);
int64_t rtt_ms_ RTC_GUARDED_BY(lock_);
// Map from rtp sequence numbers to stored packet.
std::map<uint16_t, StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
// Queue of stored packets, ordered by sequence number, with older packets in
// the front and new packets being added to the back. Note that there may be
// wrap-arounds so the back may have a lower sequence number.
// Packets may also be removed out-of-order, in which case there will be
// instances of StoredPacket with |packet_| set to nullptr. The first and last
// entry in the queue will however always be populated.
std::deque<StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
// Total number of packets with inserted.
uint64_t packets_inserted_ RTC_GUARDED_BY(lock_);
@ -201,10 +211,6 @@ class RtpPacketHistory {
// in GetPayloadPaddingPacket().
PacketPrioritySet padding_priority_ RTC_GUARDED_BY(lock_);
// The earliest packet in the history. This might not be the lowest sequence
// number, in case there is a wraparound.
absl::optional<uint16_t> start_seqno_ RTC_GUARDED_BY(lock_);
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtpPacketHistory);
};
} // namespace webrtc

View File

@ -291,6 +291,38 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) {
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
TEST_F(RtpPacketHistoryTest, RemovesLowestPrioPaddingWhenAtMaxCapacity) {
// Tests the absolute upper bound on number of packets in the prioritized
// set of potential padding packets.
const size_t kMaxNumPackets = RtpPacketHistory::kMaxPaddingtHistory;
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets * 2);
hist_.SetRtt(1);
// Add packets until the max is reached, and then yet another one.
for (size_t i = 0; i < kMaxNumPackets + 1; ++i) {
std::unique_ptr<RtpPacketToSend> packet =
CreateRtpPacket(To16u(kStartSeqNum + i));
// Don't mark packets as sent, preventing them from being removed.
hist_.PutRtpPacket(std::move(packet), fake_clock_.TimeInMilliseconds());
}
// Advance time to allow retransmission/padding.
fake_clock_.AdvanceTimeMilliseconds(1);
// The oldest packet will be least prioritized and has fallen out of the
// priority set.
for (size_t i = kMaxNumPackets - 1; i > 0; --i) {
auto packet = hist_.GetPayloadPaddingPacket();
ASSERT_TRUE(packet);
EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + i + 1));
}
// Wrap around to newest padding packet again.
auto packet = hist_.GetPayloadPaddingPacket();
ASSERT_TRUE(packet);
EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + kMaxNumPackets));
}
TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) {
const size_t kMaxNumPackets = 10;
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets);
@ -704,4 +736,35 @@ TEST_F(RtpPacketHistoryTest, PayloadPaddingWithEncapsulation) {
EXPECT_EQ(padding_packet->SequenceNumber(), kStartSeqNum + 1);
}
TEST_F(RtpPacketHistoryTest, OutOfOrderInsertRemoval) {
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 10);
// Insert packets, out of order, including both forwards and backwards
// sequence number wraps.
const int seq_offsets[] = {0, 1, -1, 2, -2, 3, -3};
const int64_t start_time_ms = fake_clock_.TimeInMilliseconds();
for (int offset : seq_offsets) {
uint16_t seq_no = To16u(kStartSeqNum + offset);
std::unique_ptr<RtpPacketToSend> packet = CreateRtpPacket(seq_no);
packet->SetPayloadSize(50);
hist_.PutRtpPacket(std::move(packet), fake_clock_.TimeInMilliseconds());
hist_.GetPacketAndSetSendTime(seq_no);
fake_clock_.AdvanceTimeMilliseconds(33);
}
// Check packet are there and remove them in the same out-of-order fashion.
int64_t expected_time_offset_ms = 0;
for (int offset : seq_offsets) {
uint16_t seq_no = To16u(kStartSeqNum + offset);
absl::optional<RtpPacketHistory::PacketState> packet_state =
hist_.GetPacketState(seq_no);
ASSERT_TRUE(packet_state.has_value());
EXPECT_EQ(packet_state->send_time_ms,
start_time_ms + expected_time_offset_ms);
std::vector<uint16_t> acked_sequence_numbers = {seq_no};
hist_.CullAcknowledgedPackets(acked_sequence_numbers);
expected_time_offset_ms += 33;
}
}
} // namespace webrtc