diff --git a/net/dcsctp/packet/data.h b/net/dcsctp/packet/data.h index 23a5aa4616..f2d2e74904 100644 --- a/net/dcsctp/packet/data.h +++ b/net/dcsctp/packet/data.h @@ -64,7 +64,7 @@ struct Data { Data& operator=(Data&& other) = default; // Creates a copy of this `Data` object. - Data Clone() { + Data Clone() const { return Data(stream_id, ssn, message_id, fsn, ppid, payload, is_beginning, is_end, is_unordered); } diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index dd811eb0a1..47d665f139 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -52,6 +52,19 @@ rtc_library("retransmission_timeout") { ] } +rtc_library("retransmission_queue") { + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + ] + sources = [ + "retransmission_queue.cc", + "retransmission_queue.h", + ] +} + if (rtc_include_tests) { rtc_source_set("mock_send_queue") { testonly = true @@ -65,6 +78,7 @@ if (rtc_include_tests) { deps = [ ":fcfs_send_queue", ":retransmission_error_counter", + ":retransmission_queue", ":retransmission_timeout", "../../../api:array_view", "../../../rtc_base:checks", @@ -75,6 +89,7 @@ if (rtc_include_tests) { sources = [ "fcfs_send_queue_test.cc", "retransmission_error_counter_test.cc", + "retransmission_queue_test.cc", "retransmission_timeout_test.cc", ] } diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc new file mode 100644 index 0000000000..704e6ab16b --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -0,0 +1,798 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_queue.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/math.h" +#include "net/dcsctp/common/pair_hash.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/logging.h" +#include "rtc_base/strings/string_builder.h" + +namespace dcsctp { +namespace { + +// The number of times a packet must be NACKed before it's retransmitted. +// See https://tools.ietf.org/html/rfc4960#section-7.2.4 +constexpr size_t kNumberOfNacksForRetransmission = 3; +} // namespace + +RetransmissionQueue::RetransmissionQueue( + absl::string_view log_prefix, + TSN initial_tsn, + size_t a_rwnd, + SendQueue& send_queue, + std::function on_new_rtt, + std::function on_send_queue_empty, + std::function on_clear_retransmission_counter, + Timer& t3_rtx, + const DcSctpOptions& options, + bool supports_partial_reliability, + bool use_message_interleaving) + : options_(options), + partial_reliability_(supports_partial_reliability), + log_prefix_(std::string(log_prefix) + "tx: "), + data_chunk_header_size_(use_message_interleaving + ? IDataChunk::kHeaderSize + : DataChunk::kHeaderSize), + on_new_rtt_(std::move(on_new_rtt)), + on_send_queue_empty_(std::move(on_send_queue_empty)), + on_clear_retransmission_counter_( + std::move(on_clear_retransmission_counter)), + t3_rtx_(t3_rtx), + cwnd_(options_.cwnd_mtus_initial * options_.mtu), + rwnd_(a_rwnd), + // https://tools.ietf.org/html/rfc4960#section-7.2.1 + // "The initial value of ssthresh MAY be arbitrarily high (for + // example, implementations MAY use the size of the receiver advertised + // window)."" + ssthresh_(rwnd_), + next_tsn_(tsn_unwrapper_.Unwrap(initial_tsn)), + last_cumulative_tsn_ack_(tsn_unwrapper_.Unwrap(TSN(*initial_tsn - 1))), + send_queue_(send_queue) {} + +// Returns how large a chunk will be, serialized, carrying the data +size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const { + return RoundUpTo4(data_chunk_header_size_ + data.size()); +} + +void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, + AckInfo& ack_info) { + auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); + + for (auto it = outstanding_data_.begin(); it != first_unacked; ++it) { + ack_info.bytes_acked_by_cumulative_tsn_ack += it->second.data().size(); + ack_info.acked_tsns.push_back(it->first.Wrap()); + } + + outstanding_data_.erase(outstanding_data_.begin(), first_unacked); +} + +void RetransmissionQueue::AckGapBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info) { + // Mark all non-gaps as ACKED (but they can't be removed) as (from RFC) + // "SCTP considers the information carried in the Gap Ack Blocks in the + // SACK chunk as advisory.". Note that when NR-SACK is supported, this can be + // handled differently. + + for (auto& block : gap_ack_blocks) { + auto start = outstanding_data_.lower_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); + auto end = outstanding_data_.upper_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); + for (auto iter = start; iter != end; ++iter) { + if (iter->second.state() != State::kAcked) { + ack_info.bytes_acked_by_new_gap_ack_blocks += + iter->second.data().size(); + iter->second.SetState(State::kAcked); + ack_info.highest_tsn_acked = + std::max(ack_info.highest_tsn_acked, iter->first); + ack_info.acked_tsns.push_back(iter->first.Wrap()); + } + } + } +} + +void RetransmissionQueue::NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info) { + // Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED. + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "Mark the DATA chunk(s) with three miss indications for retransmission." + // "For each incoming SACK, miss indications are incremented only for + // missing TSNs prior to the highest TSN newly acknowledged in the SACK." + // + // What this means is that only when there is a increasing stream of data + // received and there are new packets seen (since last time), packets that are + // in-flight and between gaps should be nacked. This means that SCTP relies on + // the T3-RTX-timer to re-send packets otherwise. + UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked; + if (is_in_fast_recovery() && cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If an endpoint is in Fast Recovery and a SACK arrives that advances + // the Cumulative TSN Ack Point, the miss indications are incremented for + // all TSNs reported missing in the SACK." + max_tsn_to_nack = UnwrappedTSN::AddTo( + cumulative_tsn_ack, + gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end); + } + + UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack; + for (auto& block : gap_ack_blocks) { + UnwrappedTSN cur_block_first_acked = + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); + iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { + if (iter->first <= max_tsn_to_nack) { + iter->second.Nack(); + + if (iter->second.state() == State::kToBeRetransmitted) { + ack_info.has_packet_loss = true; + RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap() + << " marked for retransmission"; + } + } + } + prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + } + + // Note that packets are not NACKED which are above the highest gap-ack-block + // (or above the cumulative ack TSN if no gap-ack-blocks) as only packets + // up until the highest_tsn_acked (see above) should be considered when + // NACKing. +} + +void RetransmissionQueue::MaybeExitFastRecovery( + UnwrappedTSN cumulative_tsn_ack) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "When a SACK acknowledges all TSNs up to and including this [fast + // recovery] exit point, Fast Recovery is exited." + if (fast_recovery_exit_tsn_.has_value() && + cumulative_tsn_ack >= *fast_recovery_exit_tsn_) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "exit_point=" << *fast_recovery_exit_tsn_->Wrap() + << " reached - exiting fast recovery"; + fast_recovery_exit_tsn_ = absl::nullopt; + } +} + +void RetransmissionQueue::HandleIncreasedCumulativeTsnAck( + size_t outstanding_bytes, + size_t total_bytes_acked) { + // Allow some margin for classifying as fully utilized, due to e.g. that too + // small packets (less than kMinimumFragmentedPayload) are not sent + + // overhead. + bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_; + size_t old_cwnd = cwnd_; + if (phase() == CongestionAlgorithmPhase::kSlowStart) { + if (is_fully_utilized && !is_in_fast_recovery()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.1 + // "Only when these three conditions are met can the cwnd be + // increased; otherwise, the cwnd MUST not be increased. If these + // conditions are met, then cwnd MUST be increased by, at most, the + // lesser of 1) the total size of the previously outstanding DATA + // chunk(s) acknowledged, and 2) the destination's path MTU." + if (options_.slow_start_tcp_style) { + cwnd_ += std::min(total_bytes_acked, cwnd_); + } else { + cwnd_ += std::min(total_bytes_acked, options_.mtu); + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "SS increase cwnd=" << cwnd_ + << " (" << old_cwnd << ")"; + } + } else if (phase() == CongestionAlgorithmPhase::kCongestionAvoidance) { + // https://tools.ietf.org/html/rfc4960#section-7.2.2 + // "Whenever cwnd is greater than ssthresh, upon each SACK arrival + // that advances the Cumulative TSN Ack Point, increase + // partial_bytes_acked by the total number of bytes of all new chunks + // acknowledged in that SACK including chunks acknowledged by the new + // Cumulative TSN Ack and by Gap Ack Blocks." + size_t old_pba = partial_bytes_acked_; + partial_bytes_acked_ += total_bytes_acked; + + if (partial_bytes_acked_ >= cwnd_ && is_fully_utilized) { + // https://tools.ietf.org/html/rfc4960#section-7.2.2 + // "When partial_bytes_acked is equal to or greater than cwnd and + // before the arrival of the SACK the sender had cwnd or more bytes of + // data outstanding (i.e., before arrival of the SACK, flightsize was + // greater than or equal to cwnd), increase cwnd by MTU, and reset + // partial_bytes_acked to (partial_bytes_acked - cwnd)." + cwnd_ += options_.mtu; + partial_bytes_acked_ -= cwnd_; + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA increase cwnd=" << cwnd_ + << " (" << old_cwnd << ") ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" + << old_pba << ")"; + } else { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA unchanged cwnd=" << cwnd_ + << " (" << old_cwnd << ") ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" + << old_pba << ")"; + } + } +} + +void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { + if (!is_in_fast_recovery()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If not in Fast Recovery, adjust the ssthresh and cwnd of the + // destination address(es) to which the missing DATA chunks were last + // sent, according to the formula described in Section 7.2.3." + size_t old_cwnd = cwnd_; + size_t old_pba = partial_bytes_acked_; + ssthresh_ = std::max(cwnd_ / 2, options_.cwnd_mtus_min * options_.mtu); + cwnd_ = ssthresh_; + partial_bytes_acked_ = 0; + + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "packet loss detected (not fast recovery). cwnd=" + << cwnd_ << " (" << old_cwnd + << "), ssthresh=" << ssthresh_ + << ", pba=" << partial_bytes_acked_ << " (" << old_pba + << ")"; + + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If not in Fast Recovery, enter Fast Recovery and mark the highest + // outstanding TSN as the Fast Recovery exit point." + fast_recovery_exit_tsn_ = outstanding_data_.empty() + ? last_cumulative_tsn_ack_ + : outstanding_data_.rbegin()->first; + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "fast recovery initiated with exit_point=" + << *fast_recovery_exit_tsn_->Wrap(); + } else { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "While in Fast Recovery, the ssthresh and cwnd SHOULD NOT change for + // any destinations due to a subsequent Fast Recovery event (i.e., one + // SHOULD NOT reduce the cwnd further due to a subsequent Fast Retransmit)." + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "packet loss detected (fast recovery). No changes."; + } +} + +void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { + rwnd_ = outstanding_bytes_ >= a_rwnd ? 0 : a_rwnd - outstanding_bytes_; +} + +void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { + // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to + // be retransmitted. + if (outstanding_data_.empty()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever all outstanding data sent to an address have been + // acknowledged, turn off the T3-rtx timer of that address. + // Note: Already stopped in `StopT3RtxTimerOnIncreasedCumulativeTsnAck`." + } else { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever a SACK is received that acknowledges the DATA chunk + // with the earliest outstanding TSN for that address, restart the T3-rtx + // timer for that address with its current RTO (if there is still + // outstanding data on that address)." + // "Whenever a SACK is received missing a TSN that was previously + // acknowledged via a Gap Ack Block, start the T3-rtx for the destination + // address to which the DATA chunk was originally transmitted if it is not + // already running." + if (!t3_rtx_.is_running()) { + t3_rtx_.Start(); + } + } +} + +bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { + // https://tools.ietf.org/html/rfc4960#section-6.2.1 + // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point, + // then drop the SACK. Since Cumulative TSN Ack is monotonically increasing, + // a SACK whose Cumulative TSN Ack is less than the Cumulative TSN Ack Point + // indicates an out-of- order SACK." + // + // Note: Important not to drop SACKs with identical TSN to that previously + // received, as the gap ack blocks or dup tsn fields may have changed. + UnwrappedTSN cumulative_tsn_ack = + tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack()); + if (cumulative_tsn_ack < last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-6.2.1 + // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point, + // then drop the SACK. Since Cumulative TSN Ack is monotonically + // increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative + // TSN Ack Point indicates an out-of- order SACK." + return false; + } else if (outstanding_data_.empty() && + cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // No in-flight data and cum-tsn-ack above what was last ACKed - not valid. + return false; + } else if (!outstanding_data_.empty() && + cumulative_tsn_ack > outstanding_data_.rbegin()->first) { + // There is in-flight data, but the cum-tsn-ack is beyond that - not valid. + return false; + } + return true; +} + +bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { + if (!IsSackValid(sack)) { + return false; + } + + size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_rwnd = rwnd_; + UnwrappedTSN cumulative_tsn_ack = + tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); + + if (sack.gap_ack_blocks().empty()) { + UpdateRTT(now, cumulative_tsn_ack); + } + + AckInfo ack_info(cumulative_tsn_ack); + // Erase all items up to cumulative_tsn_ack. + RemoveAcked(cumulative_tsn_ack, ack_info); + + // ACK packets reported in the gap ack blocks + AckGapBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); + + // NACK and possibly mark for retransmit chunks that weren't acked. + NackBetweenAckBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); + + RecalculateOutstandingBytes(); + // Update of outstanding_data_ is now done. Congestion control remains. + UpdateReceiverWindow(sack.a_rwnd()); + + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK. Acked TSN: " + << StrJoin(ack_info.acked_tsns, ",", + [](rtc::StringBuilder& sb, TSN tsn) { + sb << *tsn; + }) + << ", cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " (" + << *last_cumulative_tsn_ack_.Wrap() + << "), outstanding_bytes=" << outstanding_bytes_ << " (" + << old_outstanding_bytes << "), rwnd=" << rwnd_ << " (" + << old_rwnd << ")"; + + MaybeExitFastRecovery(cumulative_tsn_ack); + + if (cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Whenever a SACK is received that acknowledges the DATA chunk + // with the earliest outstanding TSN for that address, restart the T3-rtx + // timer for that address with its current RTO (if there is still + // outstanding data on that address)." + // Note: It may be started again in a bit further down. + t3_rtx_.Stop(); + + HandleIncreasedCumulativeTsnAck( + old_outstanding_bytes, ack_info.bytes_acked_by_cumulative_tsn_ack + + ack_info.bytes_acked_by_new_gap_ack_blocks); + } + + if (ack_info.has_packet_loss) { + is_in_fast_retransmit_ = true; + HandlePacketLoss(ack_info.highest_tsn_acked); + } + + // https://tools.ietf.org/html/rfc4960#section-8.2 + // "When an outstanding TSN is acknowledged [...] the endpoint shall clear + // the error counter ..." + if (ack_info.bytes_acked_by_cumulative_tsn_ack > 0 || + ack_info.bytes_acked_by_new_gap_ack_blocks > 0) { + on_clear_retransmission_counter_(); + } + + last_cumulative_tsn_ack_ = cumulative_tsn_ack; + StartT3RtxTimerIfOutstandingData(); + return true; +} + +void RetransmissionQueue::UpdateRTT(TimeMs now, + UnwrappedTSN cumulative_tsn_ack) { + // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C, + // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin + // streams. + // Due to delayed acknowledgement, the SACK may be sent much later which + // increases the calculated RTT. + // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and + // use only those packets for measurement. + + auto it = outstanding_data_.find(cumulative_tsn_ack); + if (it != outstanding_data_.end()) { + if (!it->second.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + DurationMs rtt = now - it->second.time_sent(); + on_new_rtt_(rtt); + } + } +} + +void RetransmissionQueue::RecalculateOutstandingBytes() { + outstanding_bytes_ = absl::c_accumulate( + outstanding_data_, 0, + [&](size_t r, const std::pair& d) { + // Packets that have been ACKED or NACKED are not outstanding, as they + // are received. And packets that are marked for retransmission or + // abandoned are lost, and not outstanding. + return r + (d.second.state() == State::kInFlight + ? GetSerializedChunkSize(d.second.data()) + : 0); + }); +} + +void RetransmissionQueue::HandleT3RtxTimerExpiry() { + size_t old_cwnd = cwnd_; + size_t old_outstanding_bytes = outstanding_bytes_; + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "For the destination address for which the timer expires, adjust + // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." + ssthresh_ = std::max(cwnd_ / 2, 4 * options_.mtu); + cwnd_ = 1 * options_.mtu; + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "For the destination address for which the timer expires, set RTO + // <- RTO * 2 ("back off the timer"). The maximum value discussed in rule C7 + // above (RTO.max) may be used to provide an upper bound to this doubling + // operation." + + // Already done by the Timer implementation. + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Determine how many of the earliest (i.e., lowest TSN) outstanding + // DATA chunks for the address for which the T3-rtx has expired will fit into + // a single packet" + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Note: Any DATA chunks that were sent to the address for which the + // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be + // marked for retransmission and sent as soon as cwnd allows (normally, when a + // SACK arrives)." + int count = 0; + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + TxData& item = elem.second; + if (item.state() == State::kInFlight || item.state() == State::kNacked) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap() + << " will be retransmitted due to T3-RTX"; + item.SetState(State::kToBeRetransmitted); + ++count; + } + } + + // Marking some packets as retransmitted changes outstanding bytes. + RecalculateOutstandingBytes(); + + // https://tools.ietf.org/html/rfc4960#section-6.3.3 + // "Start the retransmission timer T3-rtx on the destination address + // to which the retransmission is sent, if rule R1 above indicates to do so." + + // Already done by the Timer implementation. + + RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ + << " (" << old_cwnd << "), ssthresh=" << ssthresh_ + << ", rtx-packets=" << count << ", outstanding_bytes " + << outstanding_bytes_ << " (" << old_outstanding_bytes + << ")"; +} + +std::vector> +RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) { + std::vector> result; + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + TxData& item = elem.second; + + size_t serialized_size = GetSerializedChunkSize(item.data()); + if (item.state() == State::kToBeRetransmitted && + serialized_size <= max_size) { + item.Retransmit(); + result.emplace_back(tsn.Wrap(), item.data().Clone()); + max_size -= serialized_size; + } + // No point in continuing if the packet is full. + if (max_size <= data_chunk_header_size_) { + break; + } + } + // As some chunks may have switched state, that needs to be reflected here. + if (!result.empty()) { + RecalculateOutstandingBytes(); + } + return result; +} + +std::vector> RetransmissionQueue::GetChunksToSend( + TimeMs now, + size_t bytes_remaining_in_packet) { + // Chunks are always padded to even divisible by four. + RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); + + std::vector> to_be_sent; + size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_rwnd = rwnd_; + if (is_in_fast_retransmit()) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "Determine how many of the earliest (i.e., lowest TSN) DATA chunks + // marked for retransmission will fit into a single packet ... Retransmit + // those K DATA chunks in a single packet. When a Fast Retransmit is being + // performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT + // delay retransmission for this single packet." + is_in_fast_retransmit_ = false; + to_be_sent = GetChunksToBeRetransmitted(bytes_remaining_in_packet); + size_t to_be_sent_bytes = absl::c_accumulate( + to_be_sent, 0, [&](size_t r, const std::pair& d) { + return r + GetSerializedChunkSize(d.second); + }); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast-retransmit: sending " + << to_be_sent.size() << " chunks, " << to_be_sent_bytes + << " bytes"; + } else { + // Normal sending. Calculate the bandwidth budget (how many bytes that is + // allowed to be sent), and fill that up first with chunks that are + // scheduled to be retransmitted. If there is still budget, send new chunks + // (which will have their TSN assigned here.) + size_t remaining_cwnd_bytes = + outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_; + size_t max_bytes = RoundDownTo4(std::min( + std::min(bytes_remaining_in_packet, rwnd()), remaining_cwnd_bytes)); + + to_be_sent = GetChunksToBeRetransmitted(max_bytes); + max_bytes -= absl::c_accumulate( + to_be_sent, 0, [&](size_t r, const std::pair& d) { + return r + GetSerializedChunkSize(d.second); + }); + + while (max_bytes > data_chunk_header_size_) { + RTC_DCHECK(IsDivisibleBy4(max_bytes)); + absl::optional chunk_opt = + send_queue_.Produce(now, max_bytes - data_chunk_header_size_); + if (!chunk_opt.has_value()) { + on_send_queue_empty_(); + break; + } + + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone()); + + // All chunks are always padded to be even divisible by 4. + size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); + max_bytes -= chunk_size; + outstanding_bytes_ += chunk_size; + rwnd_ -= chunk_size; + outstanding_data_.emplace( + tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data), + chunk_opt->max_retransmissions, now, + chunk_opt->expires_at)); + } + } + + if (!to_be_sent.empty()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.2 + // "Every time a DATA chunk is sent to any address (including a + // retransmission), if the T3-rtx timer of that address is not running, + // start it running so that it will expire after the RTO of that address." + if (!t3_rtx_.is_running()) { + t3_rtx_.Start(); + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Sending TSN " + << StrJoin(to_be_sent, ",", + [&](rtc::StringBuilder& sb, + const std::pair& c) { + sb << *c.first; + }) + << " - " + << absl::c_accumulate( + to_be_sent, 0, + [&](size_t r, const std::pair& d) { + return r + GetSerializedChunkSize(d.second); + }) + << " bytes. outstanding_bytes=" << outstanding_bytes_ + << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_ + << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; + } + return to_be_sent; +} + +std::vector> +RetransmissionQueue::GetChunkStatesForTesting() const { + std::vector> states; + states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); + for (const auto& elem : outstanding_data_) { + states.emplace_back(elem.first.Wrap(), elem.second.state()); + } + return states; +} + +bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { + if (!partial_reliability_) { + return false; + } + ExpireChunks(now); + if (!outstanding_data_.empty()) { + auto it = outstanding_data_.begin(); + return it->first == last_cumulative_tsn_ack_.next_value() && + it->second.state() == State::kAbandoned; + } + return false; +} + +void RetransmissionQueue::TxData::Nack() { + ++nack_count_; + if (nack_count_ >= kNumberOfNacksForRetransmission) { + state_ = State::kToBeRetransmitted; + } else { + state_ = State::kNacked; + } +} + +void RetransmissionQueue::TxData::Retransmit() { + state_ = State::kInFlight; + nack_count_ = 0; + ++num_retransmissions_; +} + +bool RetransmissionQueue::TxData::has_expired(TimeMs now) const { + if (state_ != State::kAcked && state_ != State::kAbandoned) { + if (max_retransmissions_.has_value() && + num_retransmissions_ >= *max_retransmissions_) { + return true; + } else if (expires_at_.has_value() && *expires_at_ <= now) { + return true; + } + } + return false; +} + +void RetransmissionQueue::ExpireChunks(TimeMs now) { + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted + // can be expired easily. There is always a risk that a message is expired + // that was already received by the peer, but for which there haven't been + // a SACK received. But that's acceptable, and handled. + if (item.has_expired(now)) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() + << " and message " << *item.data().message_id + << " as expired"; + ExpireAllFor(item); + } + } +} + +void RetransmissionQueue::ExpireAllFor( + const RetransmissionQueue::TxData& item) { + // Erase all remaining chunks from the producer, if any. + send_queue_.Discard(item.data().is_unordered, item.data().stream_id, + item.data().message_id); + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + TxData& other = elem.second; + + if (other.state() != State::kAbandoned && + other.data().stream_id == item.data().stream_id && + other.data().is_unordered == item.data().is_unordered && + other.data().message_id == item.data().message_id) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() + << " as abandoned"; + other.SetState(State::kAbandoned); + } + } +} + +ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const { + std::unordered_map + skipped_per_ordered_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || + item.state() != State::kAbandoned) { + break; + } + new_cumulative_ack = tsn; + if (!item.data().is_unordered && + item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) { + skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn; + } + } + + std::vector skipped_streams; + skipped_streams.reserve(skipped_per_ordered_stream.size()); + for (const auto& elem : skipped_per_ordered_stream) { + skipped_streams.emplace_back(elem.first, elem.second); + } + return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams)); +} + +IForwardTsnChunk RetransmissionQueue::CreateIForwardTsn() const { + std::unordered_map, MID, UnorderedStreamHash> + skipped_per_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const TxData& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || + item.state() != State::kAbandoned) { + break; + } + new_cumulative_ack = tsn; + std::pair stream_id = + std::make_pair(item.data().is_unordered, item.data().stream_id); + + if (item.data().message_id > skipped_per_stream[stream_id]) { + skipped_per_stream[stream_id] = item.data().message_id; + } + } + + std::vector skipped_streams; + skipped_streams.reserve(skipped_per_stream.size()); + for (const auto& elem : skipped_per_stream) { + const std::pair& stream = elem.first; + MID message_id = elem.second; + skipped_streams.emplace_back(stream.first, stream.second, message_id); + } + + return IForwardTsnChunk(new_cumulative_ack.Wrap(), + std::move(skipped_streams)); +} + +void RetransmissionQueue::PrepareResetStreams( + rtc::ArrayView streams) { + // TODO(boivie): These calls are now only affecting the send queue. The + // packet buffer can also change behavior - for example draining the chunk + // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request" + // can be sent quickly, with a known `sender_last_assigned_tsn`. + send_queue_.PrepareResetStreams(streams); +} +bool RetransmissionQueue::CanResetStreams() const { + return send_queue_.CanResetStreams(); +} +void RetransmissionQueue::CommitResetStreams() { + send_queue_.CommitResetStreams(); +} +void RetransmissionQueue::RollbackResetStreams() { + send_queue_.RollbackResetStreams(); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h new file mode 100644 index 0000000000..c2599a438d --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue.h @@ -0,0 +1,345 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ +#define NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ + +#include +#include +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/retransmission_timeout.h" +#include "net/dcsctp/tx/send_queue.h" + +namespace dcsctp { + +// The RetransmissionQueue manages all DATA/I-DATA chunks that are in-flight and +// schedules them to be retransmitted if necessary. Chunks are retransmitted +// when they have been lost for a number of consecutive SACKs, or when the +// retransmission timer, `t3_rtx` expires. +// +// As congestion control is tightly connected with the state of transmitted +// packets, that's also managed here to limit the amount of data that is +// in-flight (sent, but not yet acknowledged). +class RetransmissionQueue { + public: + static constexpr size_t kMinimumFragmentedPayload = 10; + // State for DATA chunks (message fragments) in the queue. + enum class State { + // The chunk has been sent but not received yet (from the sender's point of + // view, as no SACK has been received yet that reference this chunk). + kInFlight, + // A SACK has been received which explicitly marked this chunk as missing - + // it's now NACKED and may be retransmitted if NACKED enough times. + kNacked, + // A chunk that will be retransmitted when possible. + kToBeRetransmitted, + // A SACK has been received which explicitly marked this chunk as received. + kAcked, + // A chunk whose message has expired or has been retransmitted too many + // times (RFC3758). It will not be retransmitted anymore. + kAbandoned, + }; + + // Creates a RetransmissionQueue which will send data using `initial_tsn` as + // the first TSN to use for sent fragments. It will poll data from + // `send_queue` and call `on_send_queue_empty` when it is empty. When + // SACKs are received, it will estimate the RTT, and call `on_new_rtt`. When + // an outstanding chunk has been ACKed, it will call + // `on_clear_retransmission_counter` and will also use `t3_rtx`, which is the + // SCTP retransmission timer to manage retransmissions. + RetransmissionQueue(absl::string_view log_prefix, + TSN initial_tsn, + size_t a_rwnd, + SendQueue& send_queue, + std::function on_new_rtt, + std::function on_send_queue_empty, + std::function on_clear_retransmission_counter, + Timer& t3_rtx, + const DcSctpOptions& options, + bool supports_partial_reliability = true, + bool use_message_interleaving = false); + + // Handles a received SACK. Returns true if the `sack` was processed and + // false if it was discarded due to received out-of-order and not relevant. + bool HandleSack(TimeMs now, const SackChunk& sack); + + // Handles an expired retransmission timer. + void HandleT3RtxTimerExpiry(); + + // Returns a list of chunks to send that would fit in one SCTP packet with + // `bytes_remaining_in_packet` bytes available. This may be further limited by + // the congestion control windows. Note that `ShouldSendForwardTSN` must be + // called prior to this method, to abandon expired chunks, as this method will + // not expire any chunks. + std::vector> GetChunksToSend( + TimeMs now, + size_t bytes_remaining_in_packet); + + // Returns the internal state of all queued chunks. This is only used in + // unit-tests. + std::vector> GetChunkStatesForTesting() const; + + // Returns the next TSN that will be allocated for sent DATA chunks. + TSN next_tsn() const { return next_tsn_.Wrap(); } + + // Returns the size of the congestion window, in bytes. This is the number of + // bytes that may be in-flight. + size_t cwnd() const { return cwnd_; } + + // Overrides the current congestion window size. + void set_cwnd(size_t cwnd) { cwnd_ = cwnd; } + + // Returns the current receiver window size. + size_t rwnd() const { return rwnd_; } + + // Returns the number of bytes of packets that are in-flight. + size_t outstanding_bytes() const { return outstanding_bytes_; } + + // Given the current time `now`, it will evaluate if there are chunks that + // have expired and that need to be discarded. It returns true if a + // FORWARD-TSN should be sent. + bool ShouldSendForwardTsn(TimeMs now); + + // Creates a FORWARD-TSN chunk. + ForwardTsnChunk CreateForwardTsn() const; + + // Creates an I-FORWARD-TSN chunk. + IForwardTsnChunk CreateIForwardTsn() const; + + // See the SendQueue for a longer description of these methods related + // to stream resetting. + void PrepareResetStreams(rtc::ArrayView streams); + bool CanResetStreams() const; + void CommitResetStreams(); + void RollbackResetStreams(); + + private: + enum class CongestionAlgorithmPhase { + kSlowStart, + kCongestionAvoidance, + }; + + // A fragmented message's DATA chunk while in the retransmission queue, and + // its associated metadata. + class TxData { + public: + explicit TxData(Data data, + absl::optional max_retransmissions, + TimeMs time_sent, + absl::optional expires_at) + : max_retransmissions_(max_retransmissions), + time_sent_(time_sent), + expires_at_(expires_at), + data_(std::move(data)) {} + + TimeMs time_sent() const { return time_sent_; } + + State state() const { return state_; } + void SetState(State state) { state_ = state; } + + const Data& data() const { return data_; } + + // Nacks an item. If it has been nacked enough times, it will be marked for + // retransmission. + void Nack(); + void Retransmit(); + + bool has_been_retransmitted() { return num_retransmissions_ > 0; } + + // Given the current time, and the current state of this DATA chunk, it will + // indicate if it has expired (SCTP Partial Reliability Extension). + bool has_expired(TimeMs now) const; + + private: + State state_ = State::kInFlight; + // The number of times the DATA chunk has been nacked (by having received a + // SACK which doesn't include it). Will be cleared on retransmissions. + size_t nack_count_ = 0; + // The number of times the DATA chunk has been retransmitted. + size_t num_retransmissions_ = 0; + // If the message was sent with a maximum number of retransmissions, this is + // set to that number. The value zero (0) means that it will never be + // retransmitted. + const absl::optional max_retransmissions_; + // When the packet was sent, and placed in this queue. + const TimeMs time_sent_; + // If the message was sent with an expiration time, this is set. + const absl::optional expires_at_; + // The actual data to send/retransmit. + Data data_; + }; + + // Contains variables scoped to a processing of an incoming SACK. + struct AckInfo { + explicit AckInfo(UnwrappedTSN cumulative_tsn_ack) + : highest_tsn_acked(cumulative_tsn_ack) {} + + // All TSNs that have been acked (for the first time) in this SACK. + std::vector acked_tsns; + + // Bytes acked by increasing cumulative_tsn_ack in this SACK + size_t bytes_acked_by_cumulative_tsn_ack = 0; + + // Bytes acked by gap blocks in this SACK. + size_t bytes_acked_by_new_gap_ack_blocks = 0; + + // Indicates if this SACK indicates that packet loss has occurred. Just + // because a packet is missing in the SACK doesn't necessarily mean that + // there is packet loss as that packet might be in-flight and received + // out-of-order. But when it has been reported missing consecutive times, it + // will eventually be considered "lost" and this will be set. + bool has_packet_loss = false; + + // Highest TSN Newly Acknowledged, an SCTP variable. + UnwrappedTSN highest_tsn_acked; + }; + + // Returns how large a chunk will be, serialized, carrying the data + size_t GetSerializedChunkSize(const Data& data) const; + + // Indicates if the congestion control algorithm is in "fast recovery". + bool is_in_fast_recovery() const { + return fast_recovery_exit_tsn_.has_value(); + } + + // Indicates if the congestion control algorithm is in "fast retransmit". + bool is_in_fast_retransmit() const { return is_in_fast_retransmit_; } + + // Indicates if the provided SACK is valid given what has previously been + // received. If it returns false, the SACK is most likely a duplicate of + // something already seen, so this returning false doesn't necessarily mean + // that the SACK is illegal. + bool IsSackValid(const SackChunk& sack) const; + + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items + // in the retransmission queue up until this value and will update `ack_info` + // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`. + void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info); + + // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK + // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`. + void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info); + + // Mark chunks reported as "missing", as "nacked" or "to be retransmitted" + // depending how many times this has happened. Only packets up until + // `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are + // nacked/retransmitted. The method will set `ack_info.has_packet_loss`. + void NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info); + + // When a SACK chunk is received, this method will be called which _may_ call + // into the `RetransmissionTimeout` to update the RTO. + void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); + + // If the congestion control is in "fast recovery mode", this may be exited + // now. + void MaybeExitFastRecovery(UnwrappedTSN cumulative_tsn_ack); + + // If chunks have been ACKed, stop the retransmission timer. + void StopT3RtxTimerOnIncreasedCumulativeTsnAck( + UnwrappedTSN cumulative_tsn_ack); + + // Update the congestion control algorithm given as the cumulative ack TSN + // value has increased, as reported in an incoming SACK chunk. + void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes, + size_t total_bytes_acked); + // Update the congestion control algorithm, given as packet loss has been + // detected, as reported in an incoming SACK chunk. + void HandlePacketLoss(UnwrappedTSN highest_tsn_acked); + // Recalculate the number of in-flight payload bytes. + void RecalculateOutstandingBytes(); + // Update the view of the receiver window size. + void UpdateReceiverWindow(uint32_t a_rwnd); + // Given `max_size` of space left in a packet, which chunks can be added to + // it? + std::vector> GetChunksToBeRetransmitted(size_t max_size); + // If there is data sent and not ACKED, ensure that the retransmission timer + // is running. + void StartT3RtxTimerIfOutstandingData(); + + // Given the current time `now_ms`, expire chunks that have a limited + // lifetime. + void ExpireChunks(TimeMs now); + // Given that a message fragment, `item` has expired, expire all other + // fragments that share the same message - even never-before-sent fragments + // that are still in the SendQueue. + void ExpireAllFor(const RetransmissionQueue::TxData& item); + + // Returns the current congestion control algorithm phase. + CongestionAlgorithmPhase phase() const { + return (cwnd_ <= ssthresh_) + ? CongestionAlgorithmPhase::kSlowStart + : CongestionAlgorithmPhase::kCongestionAvoidance; + } + + const DcSctpOptions options_; + // If the peer supports RFC3758 - SCTP Partial Reliability Extension. + const bool partial_reliability_; + const std::string log_prefix_; + // The size of the data chunk (DATA/I-DATA) header that is used. + const size_t data_chunk_header_size_; + // Called when a new RTT measurement has been done + const std::function on_new_rtt_; + // Called when the send queue is empty. + const std::function on_send_queue_empty_; + // Called when a SACK has been seen that cleared the retransmission counter. + const std::function on_clear_retransmission_counter_; + // The retransmission counter. + Timer& t3_rtx_; + // Unwraps TSNs + UnwrappedTSN::Unwrapper tsn_unwrapper_; + + // Congestion Window. Number of bytes that may be in-flight (sent, not acked). + size_t cwnd_; + // Receive Window. Number of bytes available in the receiver's RX buffer. + size_t rwnd_; + // Slow Start Threshold. See RFC4960. + size_t ssthresh_; + // Partial Bytes Acked. See RFC4960. + size_t partial_bytes_acked_ = 0; + // If set, fast recovery is enabled until this TSN has been cumulative + // acked. + absl::optional fast_recovery_exit_tsn_ = absl::nullopt; + // Indicates if the congestion algorithm is in fast retransmit. + bool is_in_fast_retransmit_ = false; + + // Next TSN to used. + UnwrappedTSN next_tsn_; + // The last cumulative TSN ack number + UnwrappedTSN last_cumulative_tsn_ack_; + // The send queue. + SendQueue& send_queue_; + // All the outstanding data chunks that are in-flight and that have not been + // cumulative acked. Note that it also contains chunks that have been acked in + // gap ack blocks. + std::map outstanding_data_; + // The sum of the message bytes of the send_queue_ + size_t outstanding_bytes_ = 0; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_ diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc new file mode 100644 index 0000000000..f36d91eb7b --- /dev/null +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -0,0 +1,804 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/retransmission_queue.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/testing/data_generator.h" +#include "net/dcsctp/timer/fake_timeout.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/mock_send_queue.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::MockFunction; +using State = ::dcsctp::RetransmissionQueue::State; +using ::testing::_; +using ::testing::ElementsAre; +using ::testing::IsEmpty; +using ::testing::NiceMock; +using ::testing::Pair; +using ::testing::SizeIs; +using ::testing::UnorderedElementsAre; + +constexpr uint32_t kArwnd = 100000; +constexpr uint32_t kMaxMtu = 1191; + +class RetransmissionQueueTest : public testing::Test { + protected: + RetransmissionQueueTest() + : gen_(MID(42)), + timeout_manager_([this]() { return now_; }), + timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }), + timer_(timer_manager_.CreateTimer( + "test/t3_rtx", + []() { return absl::nullopt; }, + TimerOptions(DurationMs(0)))) {} + + std::function CreateChunk() { + return [this](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }; + } + + std::vector GetSentPacketTSNs(RetransmissionQueue& queue) { + std::vector tsns; + for (const auto& elem : queue.GetChunksToSend(now_, 10000)) { + tsns.push_back(elem.first); + } + return tsns; + } + + RetransmissionQueue CreateQueue(bool supports_partial_reliability = true, + bool use_message_interleaving = false) { + DcSctpOptions options; + options.mtu = kMaxMtu; + return RetransmissionQueue( + "", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(), + on_outgoing_message_buffer_empty_.AsStdFunction(), + on_clear_retransmission_counter_.AsStdFunction(), *timer_, options, + supports_partial_reliability, use_message_interleaving); + } + + DataGenerator gen_; + TimeMs now_ = TimeMs(0); + FakeTimeoutManager timeout_manager_; + TimerManager timer_manager_; + NiceMock> on_rtt_; + NiceMock> on_outgoing_message_buffer_empty_; + NiceMock> on_clear_retransmission_counter_; + NiceMock producer_; + std::unique_ptr timer_; +}; + +TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, SendOneChunk) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); + + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12))); + + queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 5)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kNacked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kNacked), // + Pair(TSN(17), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Send more chunks, but leave some as gaps to force retransmission after + // three NACKs. + + // Send 18 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); + + // Ack 12, 14-15, 17-18 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 6)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kNacked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kNacked), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked))); + + // Send 19 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); + + // Ack 12, 14-15, 17-19 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 7)}, + {})); + + // Send 20 + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); + + // Ack 12, 14-15, 17-20 + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, + {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 8)}, + {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kToBeRetransmitted), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kToBeRetransmitted), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kAcked), // + Pair(TSN(20), State::kAcked))); + + // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be + // resent right now. The send queue will not even be queried. + EXPECT_CALL(producer_, Produce).Times(0); + + EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13), TSN(16))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kAcked), // + Pair(TSN(18), State::kAcked), // + Pair(TSN(19), State::kAcked), // + Pair(TSN(20), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + std::vector> chunks_to_rtx = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { + RetransmissionQueue queue = + CreateQueue(/*supports_partial_reliability=*/false); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); +} // namespace dcsctp + +TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); + + std::vector> chunks_to_rtx = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_rtx, testing::IsEmpty()); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); +} + +TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + dts.max_retransmissions = 3; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(0); + + // Retransmission 1 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 2 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 3 + queue.HandleT3RtxTimerExpiry(); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); + + // Retransmission 4 - not allowed. + queue.HandleT3RtxTimerExpiry(); + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned))); +} + +TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { + RetransmissionQueue queue = CreateQueue(); + static constexpr size_t kCwnd = 1200; + queue.set_cwnd(kCwnd); + EXPECT_EQ(queue.cwnd(), kCwnd); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + std::vector payload(1000); + EXPECT_CALL(producer_, Produce) + .WillOnce([this, payload](TimeMs, size_t) { + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1500); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); + + // Will force chunks to be retransmitted + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted))); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + + std::vector> chunks_to_rtx = + queue.GetChunksToSend(now_, 1500); + EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight))); + EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); +} + +TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + + // Chunk 10 is acked, but the remaining are lost + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned))); + + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); + EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); + EXPECT_THAT(forward_tsn.skipped_streams(), + UnorderedElementsAre( + ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); +} + +TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(1); + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(2); + SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(3); + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + DataGeneratorOptions opts; + opts.stream_id = StreamID(4); + SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts)); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _), Pair(TSN(13), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight))); + + // Chunk 13 is acked, but the remaining are lost + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {})); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kNacked), // + Pair(TSN(12), State::kNacked), // + Pair(TSN(13), State::kAcked))); + + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted), // + Pair(TSN(13), State::kAcked))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .Times(1); + EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42))) + .Times(1); + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42))) + .Times(1); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAcked))); + + IForwardTsnChunk forward_tsn = queue.CreateIForwardTsn(); + EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); + EXPECT_THAT( + forward_tsn.skipped_streams(), + UnorderedElementsAre(IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(1), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(true), StreamID(2), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(3), MID(42)))); +} + +TEST_F(RetransmissionQueueTest, MeasureRTT) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + + now_ = now_ + DurationMs(123); + + EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1); + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); +} + +TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) { + RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); + + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); +} + +TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { + RetransmissionQueue queue = CreateQueue(); + + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {}))); + EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {}))); + EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {}))); +} + +TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Ack 9, 20-25. This is an invalid SACK, but should still be handled. + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kInFlight), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) { + RetransmissionQueue queue = CreateQueue(); + + // Nothing produced - nothing in retransmission queue + + // Ack 9, 12-13 + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {})); + + // Gap ack blocks are just ignore. + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked))); +} + +TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillOnce(CreateChunk()) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_THAT(GetSentPacketTSNs(queue), + testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), + TSN(15), TSN(16), TSN(17))); + + // Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be + // adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the + // cum-tsn-ack should not move, as the gap-ack-blocks are just advisory. + queue.HandleSack( + now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {})); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight))); +} + +TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { + RetransmissionQueue queue = CreateQueue(); + + // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the + // magic numbers in this test. + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t size) { + EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); + + std::vector payload(183); + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }) + .WillOnce([this](TimeMs, size_t size) { + EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); + + std::vector payload(957); + return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + }); + + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1188 - 12); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); +} + +} // namespace +} // namespace dcsctp