From bf15e567e83d4efa025a133937d25b3122062579 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Fri, 9 Jul 2021 20:24:51 +0200 Subject: [PATCH] dcsctp: Abandon chunks consistently The previous logic to abandon chunks when partial reliability was used was a bit too eager and trigger happy. * Chunks with limited retransmissions should only be abandoned when a chunk is really considered lost. It should follow the same rules as for retransmitting chunks - that it must be nacked three times or due to a T3-RTX expiration. Before this change, a single SACK not referencing it would be enough to abandon it. This resulted in a lot of unnecessary sent FORWARD-TSN and undelivered messages - especially if running with zero retransmissions. The logic to expire chunks by limited retransmissions will now only be applied when a chunk is actually nacked. * The second partial reliability trigger - expiration time - wasn't evaluated when producing a middle chunk of a larger message. A number of test cases were added and updated as chunks will now be abandoned immediately instead of first scheduled for retransmission and later abandoned. Bug: webrtc:12961 Change-Id: I0ae17b2672568bdbdc32073a99d4c24b09ff5fe9 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225548 Reviewed-by: Florent Castelli Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/master@{#34458} --- net/dcsctp/socket/dcsctp_socket_test.cc | 114 ++++++- net/dcsctp/tx/retransmission_queue.cc | 126 ++++---- net/dcsctp/tx/retransmission_queue.h | 36 ++- net/dcsctp/tx/retransmission_queue_test.cc | 349 ++++++++++++++++----- 4 files changed, 469 insertions(+), 156 deletions(-) diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index 4ab42491d8..7ca3d9b399 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -91,6 +91,33 @@ MATCHER_P(HasDataChunkWithStreamId, stream_id, "") { return true; } +MATCHER_P(HasDataChunkWithPPID, ppid, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != DataChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional dc = + DataChunk::Parse(packet->descriptors()[0].data); + if (!dc.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + if (dc->ppid() != ppid) { + *result_listener << "the ppid is " << *dc->ppid(); + return false; + } + + return true; +} + MATCHER_P(HasDataChunkWithSsn, ssn, "") { absl::optional packet = SctpPacket::Parse(arg); if (!packet.has_value()) { @@ -1049,7 +1076,14 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) { // Third DATA sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); - // Handle SACK + // Handle SACK for first DATA + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // Handle delayed SACK for third DATA + AdvanceTime(options_.delayed_ack_max_timeout); + RunTimers(); + + // Handle SACK for second DATA sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // Now the missing data chunk will be marked as nacked, but it might still be @@ -1065,11 +1099,7 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) { // FORWARD-TSN (third) sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); - // The receiver might have moved into delayed ack mode. - AdvanceTime(options_.rto_initial); - RunTimers(); - - // Handle SACK + // Which will trigger a SACK sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); absl::optional msg1 = cb_z_.ConsumeReceivedMessage(); @@ -1084,6 +1114,78 @@ TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) { EXPECT_FALSE(msg3.has_value()); } +TEST_F(DcSctpSocketTest, SendManyFragmentedMessagesWithLimitedRtx) { + ConnectSockets(); + + SendOptions send_options; + send_options.unordered = IsUnordered(true); + send_options.max_retransmissions = 0; + std::vector payload(options_.mtu * 2 - 100 /* margin */); + // Sending first message + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(51), payload), send_options); + // Sending second message + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(52), payload), send_options); + // Sending third message + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), send_options); + // Sending fourth message + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(54), payload), send_options); + + // First DATA, first fragment + std::vector packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51))); + sock_z_.ReceivePacket(std::move(packet)); + + // First DATA, second fragment (lost) + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51))); + + // Second DATA, first fragment + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52))); + sock_z_.ReceivePacket(std::move(packet)); + + // Second DATA, second fragment (lost) + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52))); + EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0))); + + // Third DATA, first fragment + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53))); + EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0))); + sock_z_.ReceivePacket(std::move(packet)); + + // Third DATA, second fragment (lost) + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53))); + EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0))); + + // Fourth DATA, first fragment + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54))); + EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0))); + sock_z_.ReceivePacket(std::move(packet)); + + // Fourth DATA, second fragment + packet = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54))); + EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0))); + sock_z_.ReceivePacket(std::move(packet)); + + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + // Let the RTX timer expire, and exchange FORWARD-TSN/SACKs + AdvanceTime(options_.rto_initial); + RunTimers(); + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + absl::optional msg1 = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->ppid(), PPID(54)); + + ASSERT_FALSE(cb_z_.ConsumeReceivedMessage().has_value()); +} + struct FakeChunkConfig : ChunkConfig { static constexpr int kType = 0x49; static constexpr size_t kHeaderSize = 4; diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index ef2f0e3172..51bb65a30c 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -188,16 +188,8 @@ void RetransmissionQueue::NackBetweenAckBlocks( for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { if (iter->first <= max_tsn_to_nack) { - if (iter->second.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data()); - } - - if (iter->second.Nack()) { - ack_info.has_packet_loss = true; - to_be_retransmitted_.insert(iter->first); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap() - << " marked for retransmission"; - } + ack_info.has_packet_loss = + NackItem(iter->first, iter->second, /*retransmit_now=*/false); } } prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); @@ -499,20 +491,11 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be // marked for retransmission and sent as soon as cwnd allows (normally, when a // SACK arrives)." - int count = 0; for (auto& elem : outstanding_data_) { UnwrappedTSN tsn = elem.first; TxData& item = elem.second; if (!item.is_acked()) { - if (item.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(item.data()); - } - if (item.Nack(/*retransmit_now=*/true)) { - to_be_retransmitted_.insert(tsn); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap() - << " will be retransmitted due to T3-RTX"; - ++count; - } + NackItem(tsn, item, /*retransmit_now=*/true); } } @@ -524,12 +507,33 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ << " (" << old_cwnd << "), ssthresh=" << ssthresh_ - << ", rtx-packets=" << count << ", outstanding_bytes " - << outstanding_bytes_ << " (" << old_outstanding_bytes - << ")"; + << ", outstanding_bytes " << outstanding_bytes_ << " (" + << old_outstanding_bytes << ")"; RTC_DCHECK(IsConsistent()); } +bool RetransmissionQueue::NackItem(UnwrappedTSN tsn, + TxData& item, + bool retransmit_now) { + if (item.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(item.data()); + } + + switch (item.Nack(retransmit_now)) { + case TxData::NackAction::kNothing: + return false; + case TxData::NackAction::kRetransmit: + to_be_retransmitted_.insert(tsn); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << *tsn.Wrap() + << " marked for retransmission"; + break; + case TxData::NackAction::kAbandon: + AbandonAllFor(item); + break; + } + return true; +} + std::vector> RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) { std::vector> result; @@ -615,17 +619,35 @@ std::vector> RetransmissionQueue::GetChunksToSend( UnwrappedTSN tsn = next_tsn_; next_tsn_.Increment(); - to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone()); // All chunks are always padded to be even divisible by 4. size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); max_bytes -= chunk_size; outstanding_bytes_ += chunk_size; rwnd_ -= chunk_size; - outstanding_data_.emplace( - tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data), - chunk_opt->max_retransmissions, now, - chunk_opt->expires_at)); + auto item_it = + outstanding_data_ + .emplace(tsn, + RetransmissionQueue::TxData( + chunk_opt->data.Clone(), + partial_reliability_ ? chunk_opt->max_retransmissions + : absl::nullopt, + now, + partial_reliability_ ? chunk_opt->expires_at + : absl::nullopt)) + .first; + + if (item_it->second.has_expired(now)) { + // No need to send it - it was expired when it was in the send + // queue. + RTC_DLOG(LS_VERBOSE) + << log_prefix_ << "Marking freshly produced chunk " + << *item_it->first.Wrap() << " and message " + << *item_it->second.data().message_id << " as expired"; + AbandonAllFor(item_it->second); + } else { + to_be_sent.emplace_back(tsn.Wrap(), std::move(chunk_opt->data)); + } } } @@ -684,7 +706,7 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { if (!partial_reliability_) { return false; } - ExpireChunks(now); + ExpireOutstandingChunks(now); if (!outstanding_data_.empty()) { auto it = outstanding_data_.begin(); return it->first == last_cumulative_tsn_ack_.next_value() && @@ -699,15 +721,22 @@ void RetransmissionQueue::TxData::Ack() { should_be_retransmitted_ = false; } -bool RetransmissionQueue::TxData::Nack(bool retransmit_now) { +RetransmissionQueue::TxData::NackAction RetransmissionQueue::TxData::Nack( + bool retransmit_now) { ack_state_ = AckState::kNacked; ++nack_count_; if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) && !is_abandoned_) { - should_be_retransmitted_ = true; - return true; + // Nacked enough times - it's considered lost. + if (!max_retransmissions_.has_value() || + num_retransmissions_ < max_retransmissions_) { + should_be_retransmitted_ = true; + return NackAction::kRetransmit; + } + Abandon(); + return NackAction::kAbandon; } - return false; + return NackAction::kNothing; } void RetransmissionQueue::TxData::Retransmit() { @@ -724,33 +753,24 @@ void RetransmissionQueue::TxData::Abandon() { } bool RetransmissionQueue::TxData::has_expired(TimeMs now) const { - if (ack_state_ != AckState::kAcked && !is_abandoned_) { - if (max_retransmissions_.has_value() && - num_retransmissions_ >= *max_retransmissions_) { - return true; - } else if (expires_at_.has_value() && *expires_at_ <= now) { - return true; - } - } - return false; + return expires_at_.has_value() && *expires_at_ <= now; } -void RetransmissionQueue::ExpireChunks(TimeMs now) { +void RetransmissionQueue::ExpireOutstandingChunks(TimeMs now) { for (const auto& elem : outstanding_data_) { UnwrappedTSN tsn = elem.first; const TxData& item = elem.second; - // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted - // can be expired easily. There is always a risk that a message is expired - // that was already received by the peer, but for which there haven't been - // a SACK received. But that's acceptable, and handled. + // Chunks that are nacked can be expired. Care should be taken not to expire + // unacked (in-flight) chunks as they might have been received, but the SACK + // is either delayed or in-flight and may be received later. if (item.is_abandoned()) { // Already abandoned. - } else if (item.has_expired(now)) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() - << " and message " << *item.data().message_id - << " as expired"; - ExpireAllFor(item); + } else if (item.is_nacked() && item.has_expired(now)) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking nacked chunk " + << *tsn.Wrap() << " and message " + << *item.data().message_id << " as expired"; + AbandonAllFor(item); } else { // A non-expired chunk. No need to iterate any further. break; @@ -758,7 +778,7 @@ void RetransmissionQueue::ExpireChunks(TimeMs now) { } } -void RetransmissionQueue::ExpireAllFor( +void RetransmissionQueue::AbandonAllFor( const RetransmissionQueue::TxData& item) { // Erase all remaining chunks from the producer, if any. if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id, diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index 7f5baf9fff..c5a6a04db8 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -143,6 +143,12 @@ class RetransmissionQueue { // its associated metadata. class TxData { public: + enum class NackAction { + kNothing, + kRetransmit, + kAbandon, + }; + explicit TxData(Data data, absl::optional max_retransmissions, TimeMs time_sent, @@ -160,9 +166,10 @@ class RetransmissionQueue { void Ack(); // Nacks an item. If it has been nacked enough times, or if `retransmit_now` - // is set, it might be marked for retransmission, which is indicated by the - // return value. - bool Nack(bool retransmit_now = false); + // is set, it might be marked for retransmission. If the item has reached + // its max retransmission value, it will instead be abandoned. The action + // performed is indicated as return value. + NackAction Nack(bool retransmit_now = false); // Prepares the item to be retransmitted. Sets it as outstanding and // clears all nack counters. @@ -173,6 +180,7 @@ class RetransmissionQueue { bool is_outstanding() const { return ack_state_ == AckState::kUnacked; } bool is_acked() const { return ack_state_ == AckState::kAcked; } + bool is_nacked() const { return ack_state_ == AckState::kNacked; } bool is_abandoned() const { return is_abandoned_; } // Indicates if this chunk should be retransmitted. @@ -264,6 +272,14 @@ class RetransmissionQueue { // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`. void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info); + // Helper method to nack an item and perform the correct operations given the + // action indicated when nacking an item (e.g. retransmitting or abandoning). + // The return value indicate if an action was performed, meaning that packet + // loss was detected and acted upon. + bool NackItem(UnwrappedTSN cumulative_tsn_ack, + TxData& item, + bool retransmit_now); + // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`. void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack, @@ -307,13 +323,13 @@ class RetransmissionQueue { // is running. void StartT3RtxTimerIfOutstandingData(); - // Given the current time `now_ms`, expire chunks that have a limited - // lifetime. - void ExpireChunks(TimeMs now); - // Given that a message fragment, `item` has expired, expire all other - // fragments that share the same message - even never-before-sent fragments - // that are still in the SendQueue. - void ExpireAllFor(const RetransmissionQueue::TxData& item); + // Given the current time `now_ms`, expire and abandon outstanding (sent at + // least once) chunks that have a limited lifetime. + void ExpireOutstandingChunks(TimeMs now); + // Given that a message fragment, `item` has been abandoned, abandon all other + // fragments that share the same message - both never-before-sent fragments + // that are still in the SendQueue and outstanding chunks. + void AbandonAllFor(const RetransmissionQueue::TxData& item); // Returns the current congestion control algorithm phase. CongestionAlgorithmPhase phase() const { diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc index e02b111b5a..4aa76d66e5 100644 --- a/net/dcsctp/tx/retransmission_queue_test.cc +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -378,14 +378,14 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { Pair(TSN(10), State::kInFlight))); // Will force chunks to be retransmitted + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // - Pair(TSN(10), State::kToBeRetransmitted))); - - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + Pair(TSN(10), State::kAbandoned))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); @@ -438,9 +438,9 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); // Retransmission 4 - not allowed. - queue.HandleT3RtxTimerExpiry(); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) .Times(1); + queue.HandleT3RtxTimerExpiry(); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); @@ -521,16 +521,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - queue.HandleT3RtxTimerExpiry(); - - EXPECT_THAT(queue.GetChunkStatesForTesting(), - ElementsAre(Pair(TSN(10), State::kAcked), // - Pair(TSN(11), State::kToBeRetransmitted), // - Pair(TSN(12), State::kToBeRetransmitted))); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) .WillOnce(Return(true)); - EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + queue.HandleT3RtxTimerExpiry(); // NOTE: The TSN=13 represents the end fragment. EXPECT_THAT(queue.GetChunkStatesForTesting(), @@ -539,6 +534,8 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { Pair(TSN(12), State::kAbandoned), // Pair(TSN(13), State::kAbandoned))); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13)); EXPECT_THAT(forward_tsn.skipped_streams(), @@ -579,23 +576,19 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - queue.HandleT3RtxTimerExpiry(); - - EXPECT_THAT(queue.GetChunkStatesForTesting(), - ElementsAre(Pair(TSN(10), State::kAcked), // - Pair(TSN(11), State::kToBeRetransmitted), // - Pair(TSN(12), State::kToBeRetransmitted))); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) .WillOnce(Return(false)); - EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); - // NOTE: No additional TSN representing the end fragment, as that's TSN=12. + queue.HandleT3RtxTimerExpiry(); + EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned))); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); EXPECT_THAT(forward_tsn.skipped_streams(), @@ -657,22 +650,14 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { Pair(TSN(12), State::kNacked), // Pair(TSN(13), State::kAcked))); - queue.HandleT3RtxTimerExpiry(); - - EXPECT_THAT(queue.GetChunkStatesForTesting(), - ElementsAre(Pair(TSN(9), State::kAcked), // - Pair(TSN(10), State::kToBeRetransmitted), // - Pair(TSN(11), State::kToBeRetransmitted), // - Pair(TSN(12), State::kToBeRetransmitted), // - Pair(TSN(13), State::kAcked))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42))) .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42))) .WillOnce(Return(true)); - EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // @@ -685,6 +670,8 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { Pair(TSN(15), State::kAbandoned), // Pair(TSN(16), State::kAbandoned))); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn(); EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12)); EXPECT_THAT( @@ -891,61 +878,6 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); } -TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) { - RetransmissionQueue queue = CreateQueue(); - EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); - dts.max_retransmissions = 0; - return dts; - }) - .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); - dts.max_retransmissions = 0; - return dts; - }) - .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); - dts.max_retransmissions = 0; - return dts; - }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); - - // Send and ack first chunk (TSN 10) - std::vector> chunks_to_send = - queue.GetChunksToSend(now_, 1000); - EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), - Pair(TSN(12), _))); - EXPECT_THAT(queue.GetChunkStatesForTesting(), - ElementsAre(Pair(TSN(9), State::kAcked), // - Pair(TSN(10), State::kInFlight), // - Pair(TSN(11), State::kInFlight), // - Pair(TSN(12), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); - - // Discard the message while it was outstanding. - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); - EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); - - EXPECT_THAT(queue.GetChunkStatesForTesting(), - ElementsAre(Pair(TSN(9), State::kAcked), // - Pair(TSN(10), State::kAbandoned), // - Pair(TSN(11), State::kAbandoned), // - Pair(TSN(12), State::kAbandoned))); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); - - // Now ACK those, one at a time. - queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u); - - queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u); - - queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); -} - TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) @@ -979,10 +911,10 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); // Mark the message as lost. - queue.HandleT3RtxTimerExpiry(); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) .Times(1); + queue.HandleT3RtxTimerExpiry(); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), @@ -1003,5 +935,248 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { EXPECT_EQ(queue.outstanding_bytes(), 0u); } +TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { + RetransmissionQueue queue = CreateQueue(); + DataGeneratorOptions options; + options.stream_id = StreamID(17); + options.message_id = MID(42); + TimeMs test_start = now_; + EXPECT_CALL(producer_, Produce) + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options)); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options)); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 24); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42))) + .WillOnce(Return(true)); + now_ += DurationMs(100); + + EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); + + EXPECT_THAT( + queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN + Pair(TSN(10), State::kAbandoned), // Produced + Pair(TSN(11), State::kAbandoned), // Produced and expired + Pair(TSN(12), State::kAbandoned))); // Placeholder end +} + +TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _), Pair(TSN(13), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kInFlight))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked))); + + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); +} + +TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { + // This is a fairly long test. + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 2; + return dts; + }) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, + ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _), + Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _), + Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _), + Pair(TSN(19), _))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kInFlight), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight), // + Pair(TSN(18), State::kInFlight), // + Pair(TSN(19), State::kInFlight))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + + // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it. + for (int tsn = 11; tsn <= 13; ++tsn) { + queue.HandleSack( + now_, + SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); + } + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kInFlight), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight), // + Pair(TSN(18), State::kInFlight), // + Pair(TSN(19), State::kInFlight))); + + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _))); + + // Ack TSN [14 to 16] - three more nacks - second and last retransmission. + for (int tsn = 14; tsn <= 16; ++tsn) { + queue.HandleSack( + now_, + SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); + } + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kAcked), // + Pair(TSN(17), State::kInFlight), // + Pair(TSN(18), State::kInFlight), // + Pair(TSN(19), State::kInFlight))); + + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _))); + + // Ack TSN [17 to 18] + for (int tsn = 17; tsn <= 18; ++tsn) { + queue.HandleSack( + now_, + SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); + } + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kAcked), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kInFlight))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions. + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {})); + + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kAcked), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kAcked))); + + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); +} // namespace + } // namespace } // namespace dcsctp