Don't remove or retransmit packets in the pacer queue.

The main purpose right now of this CL is to avoid the situation
where multiple retransmissions are queued for sending (normally after
network glitch with increased pacer queue length), and some of those
fail sending because the can't be retrieved from the packet history
due to too short time since last sent.

Bug: webrtc:8975, webrtc:10607
Change-Id: I9f6369d83f0b8208e5f57b2dc2fd3f2db7c6fea1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/135164
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27884}
This commit is contained in:
Erik Språng 2019-05-08 10:15:05 -07:00 committed by Commit Bot
parent daac58290e
commit 0f4f055ca6
5 changed files with 130 additions and 12 deletions

View File

@ -118,6 +118,10 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
stored_packet.send_time_ms = send_time_ms;
stored_packet.storage_type = type;
stored_packet.times_retransmitted = 0;
// No send time indicates packet is not sent immediately, but instead will
// be put in the pacer queue and later retrieved via
// GetPacketAndSetSendTime().
stored_packet.pending_transmission = !send_time_ms.has_value();
if (!start_seqno_) {
start_seqno_ = rtp_seq_no;
@ -150,14 +154,17 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndSetSendTime(
++packet.times_retransmitted;
}
// Update send-time and return copy of packet instance.
// Update send-time and mark as no long in pacer queue.
packet.send_time_ms = now_ms;
packet.pending_transmission = false;
if (packet.storage_type == StorageType::kDontRetransmit) {
// Non retransmittable packet, so call must come from paced sender.
// Remove from history and return actual packet instance.
return RemovePacket(rtp_it);
}
// Return copy of packet instance since it may need to be retransmitted.
return absl::make_unique<RtpPacketToSend>(*packet.packet);
}
@ -249,6 +256,21 @@ void RtpPacketHistory::CullAcknowledgedPackets(
}
}
bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) {
rtc::CritScope cs(&lock_);
if (mode_ == StorageMode::kDisabled) {
return false;
}
auto rtp_it = packet_history_.find(sequence_number);
if (rtp_it == packet_history_.end()) {
return false;
}
rtp_it->second.pending_transmission = true;
return true;
}
void RtpPacketHistory::Reset() {
packet_history_.clear();
packet_size_.clear();
@ -270,8 +292,8 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
}
const StoredPacket& stored_packet = stored_packet_it->second;
if (!stored_packet.send_time_ms) {
// Don't remove packets that have not been sent.
if (stored_packet_it->second.pending_transmission) {
// Don't remove packets in the pacer queue, pending tranmission.
return;
}
@ -341,6 +363,7 @@ RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState(
state.ssrc = stored_packet.packet->Ssrc();
state.packet_size = stored_packet.packet->size();
state.times_retransmitted = stored_packet.times_retransmitted;
state.pending_transmission = stored_packet.pending_transmission;
return state;
}

View File

@ -47,6 +47,7 @@ class RtpPacketHistory {
size_t packet_size = 0;
// Number of times RE-transmitted, ie not including the first transmission.
size_t times_retransmitted = 0;
bool pending_transmission = false;
};
// Maximum number of packets we ever allow in the history.
@ -92,6 +93,11 @@ class RtpPacketHistory {
// Cull packets that have been acknowledged as received by the remote end.
void CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers);
// Mark packet as queued for transmission. This will prevent premature
// removal or duplicate retransmissions in the pacer queue.
// Returns true if status was set, false if packet was not found.
bool SetPendingTransmission(uint16_t sequence_number);
private:
struct StoredPacket {
StoredPacket();
@ -111,6 +117,9 @@ class RtpPacketHistory {
// The actual packet.
std::unique_ptr<RtpPacketToSend> packet;
// True if the packet is currently in the pacer queue pending transmission.
bool pending_transmission = false;
};
using StoredPacketIterator = std::map<uint16_t, StoredPacket>::iterator;

View File

@ -636,4 +636,79 @@ TEST_F(RtpPacketHistoryTest, CullWithAcks) {
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
}
TEST_F(RtpPacketHistoryTest, SetsPendingTransmissionState) {
const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2;
hist_.SetRtt(kRttMs);
// Set size to remove old packets as soon as possible.
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1);
// Add a packet, without send time, indicating it's in pacer queue.
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission,
/* send_time_ms = */ absl::nullopt);
// Packet is pending transmission.
absl::optional<RtpPacketHistory::PacketState> packet_state =
hist_.GetPacketState(kStartSeqNum);
ASSERT_TRUE(packet_state.has_value());
EXPECT_TRUE(packet_state->pending_transmission);
// Packet sent, state should be back to non-pending.
EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
packet_state = hist_.GetPacketState(kStartSeqNum);
ASSERT_TRUE(packet_state.has_value());
EXPECT_FALSE(packet_state->pending_transmission);
// Time for a retransmission.
fake_clock_.AdvanceTimeMilliseconds(kRttMs);
EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum));
packet_state = hist_.GetPacketState(kStartSeqNum);
ASSERT_TRUE(packet_state.has_value());
EXPECT_TRUE(packet_state->pending_transmission);
// Packet sent.
EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
// Too early for retransmission.
ASSERT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value());
// Retransmission allowed again, it's not in a pending state.
fake_clock_.AdvanceTimeMilliseconds(kRttMs);
packet_state = hist_.GetPacketState(kStartSeqNum);
ASSERT_TRUE(packet_state.has_value());
EXPECT_FALSE(packet_state->pending_transmission);
}
TEST_F(RtpPacketHistoryTest, DontRemovePendingTransmissions) {
const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2;
const int64_t kPacketTimeoutMs =
kRttMs * RtpPacketHistory::kMinPacketDurationRtt;
// Set size to remove old packets as soon as possible.
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1);
hist_.SetRtt(kRttMs);
// Add a sent packet.
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
// Advance clock to just before packet timeout.
fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1);
// Mark as enqueued in pacer.
EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum));
// Advance clock to where packet would have timed out. It should still
// be there and pending.
fake_clock_.AdvanceTimeMilliseconds(1);
absl::optional<RtpPacketHistory::PacketState> packet_state =
hist_.GetPacketState(kStartSeqNum);
ASSERT_TRUE(packet_state.has_value());
EXPECT_TRUE(packet_state->pending_transmission);
// Packet sent. Now it can be removed.
EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
hist_.SetRtt(kRttMs); // Force culling of old packets.
packet_state = hist_.GetPacketState(kStartSeqNum);
ASSERT_FALSE(packet_state.has_value());
}
} // namespace webrtc

View File

@ -451,8 +451,8 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) {
// don't retransmit too often.
absl::optional<RtpPacketHistory::PacketState> stored_packet =
packet_history_.GetPacketState(packet_id);
if (!stored_packet) {
// Packet not found.
if (!stored_packet || stored_packet->pending_transmission) {
// Packet not found or already queued for retransmission, ignore.
return 0;
}
@ -468,6 +468,12 @@ int32_t RTPSender::ReSendPacket(uint16_t packet_id) {
}
if (paced_sender_) {
// Mark packet as being in pacer queue again, to prevent duplicates.
if (!packet_history_.SetPendingTransmission(packet_id)) {
// Packet has already been removed from history, return early.
return 0;
}
// Convert from TickTime to Clock since capture_time_ms is based on
// TickTime.
int64_t corrected_capture_tims_ms =

View File

@ -797,23 +797,28 @@ TEST_P(RtpSenderTest, TrafficSmoothingRetransmits) {
EXPECT_TRUE(rtp_sender_->SendToNetwork(std::move(packet),
kAllowRetransmission,
RtpPacketSender::kNormalPriority));
// Immediately process send bucket and send packet.
rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false,
PacedPacketInfo());
EXPECT_EQ(1, transport_.packets_sent());
EXPECT_EQ(0, transport_.packets_sent());
EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority,
kSsrc, kSeqNum, _, _, _));
// Retransmit packet.
const int kStoredTimeInMs = 100;
fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs);
EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority,
kSsrc, kSeqNum, _, _, _));
EXPECT_CALL(mock_rtc_event_log_,
LogProxy(SameRtcEventTypeAs(RtcEvent::Type::RtpPacketOutgoing)));
EXPECT_EQ(static_cast<int>(packet_size), rtp_sender_->ReSendPacket(kSeqNum));
EXPECT_EQ(0, transport_.packets_sent());
EXPECT_EQ(1, transport_.packets_sent());
rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false,
PacedPacketInfo());
// Process send bucket. Packet should now be sent.
EXPECT_EQ(1, transport_.packets_sent());
EXPECT_EQ(2, transport_.packets_sent());
EXPECT_EQ(packet_size, transport_.last_sent_packet().size());
webrtc::RTPHeader rtp_header;