dcsctp: Abandon chunks consistently

The previous logic to abandon chunks when partial reliability was used
was a bit too eager and trigger happy.

 * Chunks with limited retransmissions should only be abandoned when a
   chunk is really considered lost. It should follow the same rules as
   for retransmitting chunks - that it must be nacked three times or
   due to a T3-RTX expiration. Before this change, a single SACK not
   referencing it would be enough to abandon it. This resulted in a lot
   of unnecessary sent FORWARD-TSN and undelivered messages - especially
   if running with zero retransmissions.

   The logic to expire chunks by limited retransmissions will now only
   be applied when a chunk is actually nacked.

 * The second partial reliability trigger - expiration time - wasn't
   evaluated when producing a middle chunk of a larger message.

A number of test cases were added and updated as chunks will now be
abandoned immediately instead of first scheduled for retransmission and
later abandoned.

Bug: webrtc:12961
Change-Id: I0ae17b2672568bdbdc32073a99d4c24b09ff5fe9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225548
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34458}
This commit is contained in:
Victor Boivie 2021-07-09 20:24:51 +02:00 committed by WebRTC LUCI CQ
parent 4e18a1fff3
commit bf15e567e8
4 changed files with 469 additions and 156 deletions

View File

@ -91,6 +91,33 @@ MATCHER_P(HasDataChunkWithStreamId, stream_id, "") {
return true;
}
MATCHER_P(HasDataChunkWithPPID, ppid, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
*result_listener << "data didn't parse as an SctpPacket";
return false;
}
if (packet->descriptors()[0].type != DataChunk::kType) {
*result_listener << "the first chunk in the packet is not a data chunk";
return false;
}
absl::optional<DataChunk> dc =
DataChunk::Parse(packet->descriptors()[0].data);
if (!dc.has_value()) {
*result_listener << "The first chunk didn't parse as a data chunk";
return false;
}
if (dc->ppid() != ppid) {
*result_listener << "the ppid is " << *dc->ppid();
return false;
}
return true;
}
MATCHER_P(HasDataChunkWithSsn, ssn, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
@ -1049,7 +1076,14 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) {
// Third DATA
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
// Handle SACK
// Handle SACK for first DATA
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Handle delayed SACK for third DATA
AdvanceTime(options_.delayed_ack_max_timeout);
RunTimers();
// Handle SACK for second DATA
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Now the missing data chunk will be marked as nacked, but it might still be
@ -1065,11 +1099,7 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) {
// FORWARD-TSN (third)
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
// The receiver might have moved into delayed ack mode.
AdvanceTime(options_.rto_initial);
RunTimers();
// Handle SACK
// Which will trigger a SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
@ -1084,6 +1114,78 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) {
EXPECT_FALSE(msg3.has_value());
}
TEST_F(DcSctpSocketTest, SendManyFragmentedMessagesWithLimitedRtx) {
ConnectSockets();
SendOptions send_options;
send_options.unordered = IsUnordered(true);
send_options.max_retransmissions = 0;
std::vector<uint8_t> payload(options_.mtu * 2 - 100 /* margin */);
// Sending first message
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(51), payload), send_options);
// Sending second message
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(52), payload), send_options);
// Sending third message
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), send_options);
// Sending fourth message
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(54), payload), send_options);
// First DATA, first fragment
std::vector<uint8_t> packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
sock_z_.ReceivePacket(std::move(packet));
// First DATA, second fragment (lost)
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
// Second DATA, first fragment
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
sock_z_.ReceivePacket(std::move(packet));
// Second DATA, second fragment (lost)
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
// Third DATA, first fragment
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(std::move(packet));
// Third DATA, second fragment (lost)
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
// Fourth DATA, first fragment
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(std::move(packet));
// Fourth DATA, second fragment
packet = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(std::move(packet));
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
// Let the RTX timer expire, and exchange FORWARD-TSN/SACKs
AdvanceTime(options_.rto_initial);
RunTimers();
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg1.has_value());
EXPECT_EQ(msg1->ppid(), PPID(54));
ASSERT_FALSE(cb_z_.ConsumeReceivedMessage().has_value());
}
struct FakeChunkConfig : ChunkConfig {
static constexpr int kType = 0x49;
static constexpr size_t kHeaderSize = 4;

View File

@ -188,16 +188,8 @@ void RetransmissionQueue::NackBetweenAckBlocks(
for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
if (iter->first <= max_tsn_to_nack) {
if (iter->second.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
}
if (iter->second.Nack()) {
ack_info.has_packet_loss = true;
to_be_retransmitted_.insert(iter->first);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
<< " marked for retransmission";
}
ack_info.has_packet_loss =
NackItem(iter->first, iter->second, /*retransmit_now=*/false);
}
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
@ -499,20 +491,11 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
// T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
// marked for retransmission and sent as soon as cwnd allows (normally, when a
// SACK arrives)."
int count = 0;
for (auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
TxData& item = elem.second;
if (!item.is_acked()) {
if (item.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(item.data());
}
if (item.Nack(/*retransmit_now=*/true)) {
to_be_retransmitted_.insert(tsn);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
<< " will be retransmitted due to T3-RTX";
++count;
}
NackItem(tsn, item, /*retransmit_now=*/true);
}
}
@ -524,12 +507,33 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_
<< ", rtx-packets=" << count << ", outstanding_bytes "
<< outstanding_bytes_ << " (" << old_outstanding_bytes
<< ")";
<< ", outstanding_bytes " << outstanding_bytes_ << " ("
<< old_outstanding_bytes << ")";
RTC_DCHECK(IsConsistent());
}
bool RetransmissionQueue::NackItem(UnwrappedTSN tsn,
TxData& item,
bool retransmit_now) {
if (item.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(item.data());
}
switch (item.Nack(retransmit_now)) {
case TxData::NackAction::kNothing:
return false;
case TxData::NackAction::kRetransmit:
to_be_retransmitted_.insert(tsn);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << *tsn.Wrap()
<< " marked for retransmission";
break;
case TxData::NackAction::kAbandon:
AbandonAllFor(item);
break;
}
return true;
}
std::vector<std::pair<TSN, Data>>
RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
std::vector<std::pair<TSN, Data>> result;
@ -615,17 +619,35 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();
to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone());
// All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
max_bytes -= chunk_size;
outstanding_bytes_ += chunk_size;
rwnd_ -= chunk_size;
outstanding_data_.emplace(
tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data),
chunk_opt->max_retransmissions, now,
chunk_opt->expires_at));
auto item_it =
outstanding_data_
.emplace(tsn,
RetransmissionQueue::TxData(
chunk_opt->data.Clone(),
partial_reliability_ ? chunk_opt->max_retransmissions
: absl::nullopt,
now,
partial_reliability_ ? chunk_opt->expires_at
: absl::nullopt))
.first;
if (item_it->second.has_expired(now)) {
// No need to send it - it was expired when it was in the send
// queue.
RTC_DLOG(LS_VERBOSE)
<< log_prefix_ << "Marking freshly produced chunk "
<< *item_it->first.Wrap() << " and message "
<< *item_it->second.data().message_id << " as expired";
AbandonAllFor(item_it->second);
} else {
to_be_sent.emplace_back(tsn.Wrap(), std::move(chunk_opt->data));
}
}
}
@ -684,7 +706,7 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) {
if (!partial_reliability_) {
return false;
}
ExpireChunks(now);
ExpireOutstandingChunks(now);
if (!outstanding_data_.empty()) {
auto it = outstanding_data_.begin();
return it->first == last_cumulative_tsn_ack_.next_value() &&
@ -699,15 +721,22 @@ void RetransmissionQueue::TxData::Ack() {
should_be_retransmitted_ = false;
}
bool RetransmissionQueue::TxData::Nack(bool retransmit_now) {
RetransmissionQueue::TxData::NackAction RetransmissionQueue::TxData::Nack(
bool retransmit_now) {
ack_state_ = AckState::kNacked;
++nack_count_;
if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) &&
!is_abandoned_) {
should_be_retransmitted_ = true;
return true;
// Nacked enough times - it's considered lost.
if (!max_retransmissions_.has_value() ||
num_retransmissions_ < max_retransmissions_) {
should_be_retransmitted_ = true;
return NackAction::kRetransmit;
}
Abandon();
return NackAction::kAbandon;
}
return false;
return NackAction::kNothing;
}
void RetransmissionQueue::TxData::Retransmit() {
@ -724,33 +753,24 @@ void RetransmissionQueue::TxData::Abandon() {
}
bool RetransmissionQueue::TxData::has_expired(TimeMs now) const {
if (ack_state_ != AckState::kAcked && !is_abandoned_) {
if (max_retransmissions_.has_value() &&
num_retransmissions_ >= *max_retransmissions_) {
return true;
} else if (expires_at_.has_value() && *expires_at_ <= now) {
return true;
}
}
return false;
return expires_at_.has_value() && *expires_at_ <= now;
}
void RetransmissionQueue::ExpireChunks(TimeMs now) {
void RetransmissionQueue::ExpireOutstandingChunks(TimeMs now) {
for (const auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
const TxData& item = elem.second;
// Chunks that are in-flight (possibly lost?), nacked or to be retransmitted
// can be expired easily. There is always a risk that a message is expired
// that was already received by the peer, but for which there haven't been
// a SACK received. But that's acceptable, and handled.
// Chunks that are nacked can be expired. Care should be taken not to expire
// unacked (in-flight) chunks as they might have been received, but the SACK
// is either delayed or in-flight and may be received later.
if (item.is_abandoned()) {
// Already abandoned.
} else if (item.has_expired(now)) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
<< " and message " << *item.data().message_id
<< " as expired";
ExpireAllFor(item);
} else if (item.is_nacked() && item.has_expired(now)) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking nacked chunk "
<< *tsn.Wrap() << " and message "
<< *item.data().message_id << " as expired";
AbandonAllFor(item);
} else {
// A non-expired chunk. No need to iterate any further.
break;
@ -758,7 +778,7 @@ void RetransmissionQueue::ExpireChunks(TimeMs now) {
}
}
void RetransmissionQueue::ExpireAllFor(
void RetransmissionQueue::AbandonAllFor(
const RetransmissionQueue::TxData& item) {
// Erase all remaining chunks from the producer, if any.
if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id,

View File

@ -143,6 +143,12 @@ class RetransmissionQueue {
// its associated metadata.
class TxData {
public:
enum class NackAction {
kNothing,
kRetransmit,
kAbandon,
};
explicit TxData(Data data,
absl::optional<size_t> max_retransmissions,
TimeMs time_sent,
@ -160,9 +166,10 @@ class RetransmissionQueue {
void Ack();
// Nacks an item. If it has been nacked enough times, or if `retransmit_now`
// is set, it might be marked for retransmission, which is indicated by the
// return value.
bool Nack(bool retransmit_now = false);
// is set, it might be marked for retransmission. If the item has reached
// its max retransmission value, it will instead be abandoned. The action
// performed is indicated as return value.
NackAction Nack(bool retransmit_now = false);
// Prepares the item to be retransmitted. Sets it as outstanding and
// clears all nack counters.
@ -173,6 +180,7 @@ class RetransmissionQueue {
bool is_outstanding() const { return ack_state_ == AckState::kUnacked; }
bool is_acked() const { return ack_state_ == AckState::kAcked; }
bool is_nacked() const { return ack_state_ == AckState::kNacked; }
bool is_abandoned() const { return is_abandoned_; }
// Indicates if this chunk should be retransmitted.
@ -264,6 +272,14 @@ class RetransmissionQueue {
// by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`.
void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
// Helper method to nack an item and perform the correct operations given the
// action indicated when nacking an item (e.g. retransmitting or abandoning).
// The return value indicate if an action was performed, meaning that packet
// loss was detected and acted upon.
bool NackItem(UnwrappedTSN cumulative_tsn_ack,
TxData& item,
bool retransmit_now);
// Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
// as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
@ -307,13 +323,13 @@ class RetransmissionQueue {
// is running.
void StartT3RtxTimerIfOutstandingData();
// Given the current time `now_ms`, expire chunks that have a limited
// lifetime.
void ExpireChunks(TimeMs now);
// Given that a message fragment, `item` has expired, expire all other
// fragments that share the same message - even never-before-sent fragments
// that are still in the SendQueue.
void ExpireAllFor(const RetransmissionQueue::TxData& item);
// Given the current time `now_ms`, expire and abandon outstanding (sent at
// least once) chunks that have a limited lifetime.
void ExpireOutstandingChunks(TimeMs now);
// Given that a message fragment, `item` has been abandoned, abandon all other
// fragments that share the same message - both never-before-sent fragments
// that are still in the SendQueue and outstanding chunks.
void AbandonAllFor(const RetransmissionQueue::TxData& item);
// Returns the current congestion control algorithm phase.
CongestionAlgorithmPhase phase() const {

View File

@ -378,14 +378,14 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
Pair(TSN(10), State::kAbandoned)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
@ -438,9 +438,9 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 4 - not allowed.
queue.HandleT3RtxTimerExpiry();
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
queue.HandleT3RtxTimerExpiry();
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
@ -521,16 +521,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
queue.HandleT3RtxTimerExpiry();
// NOTE: The TSN=13 represents the end fragment.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
@ -539,6 +534,8 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAbandoned)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
EXPECT_THAT(forward_tsn.skipped_streams(),
@ -579,23 +576,19 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
// NOTE: No additional TSN representing the end fragment, as that's TSN=12.
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(forward_tsn.skipped_streams(),
@ -657,22 +650,14 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
Pair(TSN(12), State::kNacked), //
Pair(TSN(13), State::kAcked)));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted), //
Pair(TSN(13), State::kAcked)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
@ -685,6 +670,8 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(
@ -891,61 +878,6 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
}
TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Discard the message while it was outstanding.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
}
TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
@ -979,10 +911,10 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Mark the message as lost.
queue.HandleT3RtxTimerExpiry();
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
queue.HandleT3RtxTimerExpiry();
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
@ -1003,5 +935,248 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
EXPECT_EQ(queue.outstanding_bytes(), 0u);
}
TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
RetransmissionQueue queue = CreateQueue();
DataGeneratorOptions options;
options.stream_id = StreamID(17);
options.message_id = MID(42);
TimeMs test_start = now_;
EXPECT_CALL(producer_, Produce)
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 24);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42)))
.WillOnce(Return(true));
now_ += DurationMs(100);
EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
EXPECT_THAT(
queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN
Pair(TSN(10), State::kAbandoned), // Produced
Pair(TSN(11), State::kAbandoned), // Produced and expired
Pair(TSN(12), State::kAbandoned))); // Placeholder end
}
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _), Pair(TSN(13), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight)));
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kNacked), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight)));
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kNacked), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kInFlight)));
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
}
TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
// This is a fairly long test.
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 2;
return dts;
})
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send,
ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _),
Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _),
Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _),
Pair(TSN(19), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight), //
Pair(TSN(14), State::kInFlight), //
Pair(TSN(15), State::kInFlight), //
Pair(TSN(16), State::kInFlight), //
Pair(TSN(17), State::kInFlight), //
Pair(TSN(18), State::kInFlight), //
Pair(TSN(19), State::kInFlight)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
// Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it.
for (int tsn = 11; tsn <= 13; ++tsn) {
queue.HandleSack(
now_,
SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
}
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kInFlight), //
Pair(TSN(15), State::kInFlight), //
Pair(TSN(16), State::kInFlight), //
Pair(TSN(17), State::kInFlight), //
Pair(TSN(18), State::kInFlight), //
Pair(TSN(19), State::kInFlight)));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
// Ack TSN [14 to 16] - three more nacks - second and last retransmission.
for (int tsn = 14; tsn <= 16; ++tsn) {
queue.HandleSack(
now_,
SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
}
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kAcked), //
Pair(TSN(17), State::kInFlight), //
Pair(TSN(18), State::kInFlight), //
Pair(TSN(19), State::kInFlight)));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
// Ack TSN [17 to 18]
for (int tsn = 17; tsn <= 18; ++tsn) {
queue.HandleSack(
now_,
SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
}
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kNacked), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kAcked), //
Pair(TSN(17), State::kAcked), //
Pair(TSN(18), State::kAcked), //
Pair(TSN(19), State::kInFlight)));
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
// Ack TSN 19 - three more nacks for TSN 10, no more retransmissions.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {}));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kAcked), //
Pair(TSN(17), State::kAcked), //
Pair(TSN(18), State::kAcked), //
Pair(TSN(19), State::kAcked)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
} // namespace
} // namespace
} // namespace dcsctp