dcsctp: Add Retransmission Queue
The Retransmission Queue contain all message fragments (DATA chunks) that have once been sent, but not yet ACKed by the receiver. It will process incoming SACK chunks, which informs it which chunks that the receiver has seen (ACKed) and which that are lost (NACKed), and will retransmit chunks when it's time. If a message has been sent with partial reliability, e.g. to have a limited number of retransmissions or a limited lifetime, the Retransmission Queue may discard a partially sent and expired message and will instruct the receiver that "don't expect this message - it's expired" by sending a FORWARD-TSN chunk. This currently also includes the congestion control algorithm as it's tightly coupled with the state of the retransmission queue. This is a fairly complicated piece of logic which decides how much data that can be in-flight, depending on the available bandwidth. This is not done by any bandwidth estimation, but similar to TCP, where data is sent until it's lost, and then "we dial down a knob" and take it more carefully from here on. Future refactoring will try to separate the logic regarding fragment retransmission and the congestion control algorithm. Bug: webrtc:12614 Change-Id: I8678250abb766e567c3450634686919936ea077b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214046 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33833}
This commit is contained in:
parent
27e50ccf4c
commit
03e912abaf
@ -64,7 +64,7 @@ struct Data {
|
||||
Data& operator=(Data&& other) = default;
|
||||
|
||||
// Creates a copy of this `Data` object.
|
||||
Data Clone() {
|
||||
Data Clone() const {
|
||||
return Data(stream_id, ssn, message_id, fsn, ppid, payload, is_beginning,
|
||||
is_end, is_unordered);
|
||||
}
|
||||
|
||||
@ -52,6 +52,19 @@ rtc_library("retransmission_timeout") {
|
||||
]
|
||||
}
|
||||
|
||||
rtc_library("retransmission_queue") {
|
||||
deps = [
|
||||
":send_queue",
|
||||
"../../../api:array_view",
|
||||
"../../../rtc_base:checks",
|
||||
"../../../rtc_base:rtc_base_approved",
|
||||
]
|
||||
sources = [
|
||||
"retransmission_queue.cc",
|
||||
"retransmission_queue.h",
|
||||
]
|
||||
}
|
||||
|
||||
if (rtc_include_tests) {
|
||||
rtc_source_set("mock_send_queue") {
|
||||
testonly = true
|
||||
@ -65,6 +78,7 @@ if (rtc_include_tests) {
|
||||
deps = [
|
||||
":fcfs_send_queue",
|
||||
":retransmission_error_counter",
|
||||
":retransmission_queue",
|
||||
":retransmission_timeout",
|
||||
"../../../api:array_view",
|
||||
"../../../rtc_base:checks",
|
||||
@ -75,6 +89,7 @@ if (rtc_include_tests) {
|
||||
sources = [
|
||||
"fcfs_send_queue_test.cc",
|
||||
"retransmission_error_counter_test.cc",
|
||||
"retransmission_queue_test.cc",
|
||||
"retransmission_timeout_test.cc",
|
||||
]
|
||||
}
|
||||
|
||||
798
net/dcsctp/tx/retransmission_queue.cc
Normal file
798
net/dcsctp/tx/retransmission_queue.cc
Normal file
@ -0,0 +1,798 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#include "net/dcsctp/tx/retransmission_queue.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/algorithm/container.h"
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/common/math.h"
|
||||
#include "net/dcsctp/common/pair_hash.h"
|
||||
#include "net/dcsctp/common/sequence_numbers.h"
|
||||
#include "net/dcsctp/common/str_join.h"
|
||||
#include "net/dcsctp/packet/chunk/data_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||
#include "net/dcsctp/packet/chunk/idata_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/sack_chunk.h"
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/dcsctp_options.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
#include "net/dcsctp/timer/timer.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/strings/string_builder.h"
|
||||
|
||||
namespace dcsctp {
|
||||
namespace {
|
||||
|
||||
// The number of times a packet must be NACKed before it's retransmitted.
|
||||
// See https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
constexpr size_t kNumberOfNacksForRetransmission = 3;
|
||||
} // namespace
|
||||
|
||||
RetransmissionQueue::RetransmissionQueue(
|
||||
absl::string_view log_prefix,
|
||||
TSN initial_tsn,
|
||||
size_t a_rwnd,
|
||||
SendQueue& send_queue,
|
||||
std::function<void(DurationMs rtt)> on_new_rtt,
|
||||
std::function<void()> on_send_queue_empty,
|
||||
std::function<void()> on_clear_retransmission_counter,
|
||||
Timer& t3_rtx,
|
||||
const DcSctpOptions& options,
|
||||
bool supports_partial_reliability,
|
||||
bool use_message_interleaving)
|
||||
: options_(options),
|
||||
partial_reliability_(supports_partial_reliability),
|
||||
log_prefix_(std::string(log_prefix) + "tx: "),
|
||||
data_chunk_header_size_(use_message_interleaving
|
||||
? IDataChunk::kHeaderSize
|
||||
: DataChunk::kHeaderSize),
|
||||
on_new_rtt_(std::move(on_new_rtt)),
|
||||
on_send_queue_empty_(std::move(on_send_queue_empty)),
|
||||
on_clear_retransmission_counter_(
|
||||
std::move(on_clear_retransmission_counter)),
|
||||
t3_rtx_(t3_rtx),
|
||||
cwnd_(options_.cwnd_mtus_initial * options_.mtu),
|
||||
rwnd_(a_rwnd),
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.1
|
||||
// "The initial value of ssthresh MAY be arbitrarily high (for
|
||||
// example, implementations MAY use the size of the receiver advertised
|
||||
// window).""
|
||||
ssthresh_(rwnd_),
|
||||
next_tsn_(tsn_unwrapper_.Unwrap(initial_tsn)),
|
||||
last_cumulative_tsn_ack_(tsn_unwrapper_.Unwrap(TSN(*initial_tsn - 1))),
|
||||
send_queue_(send_queue) {}
|
||||
|
||||
// Returns how large a chunk will be, serialized, carrying the data
|
||||
size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const {
|
||||
return RoundUpTo4(data_chunk_header_size_ + data.size());
|
||||
}
|
||||
|
||||
void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
|
||||
AckInfo& ack_info) {
|
||||
auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack);
|
||||
|
||||
for (auto it = outstanding_data_.begin(); it != first_unacked; ++it) {
|
||||
ack_info.bytes_acked_by_cumulative_tsn_ack += it->second.data().size();
|
||||
ack_info.acked_tsns.push_back(it->first.Wrap());
|
||||
}
|
||||
|
||||
outstanding_data_.erase(outstanding_data_.begin(), first_unacked);
|
||||
}
|
||||
|
||||
void RetransmissionQueue::AckGapBlocks(
|
||||
UnwrappedTSN cumulative_tsn_ack,
|
||||
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
|
||||
AckInfo& ack_info) {
|
||||
// Mark all non-gaps as ACKED (but they can't be removed) as (from RFC)
|
||||
// "SCTP considers the information carried in the Gap Ack Blocks in the
|
||||
// SACK chunk as advisory.". Note that when NR-SACK is supported, this can be
|
||||
// handled differently.
|
||||
|
||||
for (auto& block : gap_ack_blocks) {
|
||||
auto start = outstanding_data_.lower_bound(
|
||||
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start));
|
||||
auto end = outstanding_data_.upper_bound(
|
||||
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end));
|
||||
for (auto iter = start; iter != end; ++iter) {
|
||||
if (iter->second.state() != State::kAcked) {
|
||||
ack_info.bytes_acked_by_new_gap_ack_blocks +=
|
||||
iter->second.data().size();
|
||||
iter->second.SetState(State::kAcked);
|
||||
ack_info.highest_tsn_acked =
|
||||
std::max(ack_info.highest_tsn_acked, iter->first);
|
||||
ack_info.acked_tsns.push_back(iter->first.Wrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::NackBetweenAckBlocks(
|
||||
UnwrappedTSN cumulative_tsn_ack,
|
||||
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
|
||||
AckInfo& ack_info) {
|
||||
// Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED.
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "Mark the DATA chunk(s) with three miss indications for retransmission."
|
||||
// "For each incoming SACK, miss indications are incremented only for
|
||||
// missing TSNs prior to the highest TSN newly acknowledged in the SACK."
|
||||
//
|
||||
// What this means is that only when there is a increasing stream of data
|
||||
// received and there are new packets seen (since last time), packets that are
|
||||
// in-flight and between gaps should be nacked. This means that SCTP relies on
|
||||
// the T3-RTX-timer to re-send packets otherwise.
|
||||
UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked;
|
||||
if (is_in_fast_recovery() && cumulative_tsn_ack > last_cumulative_tsn_ack_) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "If an endpoint is in Fast Recovery and a SACK arrives that advances
|
||||
// the Cumulative TSN Ack Point, the miss indications are incremented for
|
||||
// all TSNs reported missing in the SACK."
|
||||
max_tsn_to_nack = UnwrappedTSN::AddTo(
|
||||
cumulative_tsn_ack,
|
||||
gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end);
|
||||
}
|
||||
|
||||
UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack;
|
||||
for (auto& block : gap_ack_blocks) {
|
||||
UnwrappedTSN cur_block_first_acked =
|
||||
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
|
||||
for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
|
||||
iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
|
||||
if (iter->first <= max_tsn_to_nack) {
|
||||
iter->second.Nack();
|
||||
|
||||
if (iter->second.state() == State::kToBeRetransmitted) {
|
||||
ack_info.has_packet_loss = true;
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
|
||||
<< " marked for retransmission";
|
||||
}
|
||||
}
|
||||
}
|
||||
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
|
||||
}
|
||||
|
||||
// Note that packets are not NACKED which are above the highest gap-ack-block
|
||||
// (or above the cumulative ack TSN if no gap-ack-blocks) as only packets
|
||||
// up until the highest_tsn_acked (see above) should be considered when
|
||||
// NACKing.
|
||||
}
|
||||
|
||||
void RetransmissionQueue::MaybeExitFastRecovery(
|
||||
UnwrappedTSN cumulative_tsn_ack) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "When a SACK acknowledges all TSNs up to and including this [fast
|
||||
// recovery] exit point, Fast Recovery is exited."
|
||||
if (fast_recovery_exit_tsn_.has_value() &&
|
||||
cumulative_tsn_ack >= *fast_recovery_exit_tsn_) {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||
<< "exit_point=" << *fast_recovery_exit_tsn_->Wrap()
|
||||
<< " reached - exiting fast recovery";
|
||||
fast_recovery_exit_tsn_ = absl::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::HandleIncreasedCumulativeTsnAck(
|
||||
size_t outstanding_bytes,
|
||||
size_t total_bytes_acked) {
|
||||
// Allow some margin for classifying as fully utilized, due to e.g. that too
|
||||
// small packets (less than kMinimumFragmentedPayload) are not sent +
|
||||
// overhead.
|
||||
bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_;
|
||||
size_t old_cwnd = cwnd_;
|
||||
if (phase() == CongestionAlgorithmPhase::kSlowStart) {
|
||||
if (is_fully_utilized && !is_in_fast_recovery()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.1
|
||||
// "Only when these three conditions are met can the cwnd be
|
||||
// increased; otherwise, the cwnd MUST not be increased. If these
|
||||
// conditions are met, then cwnd MUST be increased by, at most, the
|
||||
// lesser of 1) the total size of the previously outstanding DATA
|
||||
// chunk(s) acknowledged, and 2) the destination's path MTU."
|
||||
if (options_.slow_start_tcp_style) {
|
||||
cwnd_ += std::min(total_bytes_acked, cwnd_);
|
||||
} else {
|
||||
cwnd_ += std::min(total_bytes_acked, options_.mtu);
|
||||
}
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "SS increase cwnd=" << cwnd_
|
||||
<< " (" << old_cwnd << ")";
|
||||
}
|
||||
} else if (phase() == CongestionAlgorithmPhase::kCongestionAvoidance) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.2
|
||||
// "Whenever cwnd is greater than ssthresh, upon each SACK arrival
|
||||
// that advances the Cumulative TSN Ack Point, increase
|
||||
// partial_bytes_acked by the total number of bytes of all new chunks
|
||||
// acknowledged in that SACK including chunks acknowledged by the new
|
||||
// Cumulative TSN Ack and by Gap Ack Blocks."
|
||||
size_t old_pba = partial_bytes_acked_;
|
||||
partial_bytes_acked_ += total_bytes_acked;
|
||||
|
||||
if (partial_bytes_acked_ >= cwnd_ && is_fully_utilized) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.2
|
||||
// "When partial_bytes_acked is equal to or greater than cwnd and
|
||||
// before the arrival of the SACK the sender had cwnd or more bytes of
|
||||
// data outstanding (i.e., before arrival of the SACK, flightsize was
|
||||
// greater than or equal to cwnd), increase cwnd by MTU, and reset
|
||||
// partial_bytes_acked to (partial_bytes_acked - cwnd)."
|
||||
cwnd_ += options_.mtu;
|
||||
partial_bytes_acked_ -= cwnd_;
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA increase cwnd=" << cwnd_
|
||||
<< " (" << old_cwnd << ") ssthresh=" << ssthresh_
|
||||
<< ", pba=" << partial_bytes_acked_ << " ("
|
||||
<< old_pba << ")";
|
||||
} else {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA unchanged cwnd=" << cwnd_
|
||||
<< " (" << old_cwnd << ") ssthresh=" << ssthresh_
|
||||
<< ", pba=" << partial_bytes_acked_ << " ("
|
||||
<< old_pba << ")";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) {
|
||||
if (!is_in_fast_recovery()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "If not in Fast Recovery, adjust the ssthresh and cwnd of the
|
||||
// destination address(es) to which the missing DATA chunks were last
|
||||
// sent, according to the formula described in Section 7.2.3."
|
||||
size_t old_cwnd = cwnd_;
|
||||
size_t old_pba = partial_bytes_acked_;
|
||||
ssthresh_ = std::max(cwnd_ / 2, options_.cwnd_mtus_min * options_.mtu);
|
||||
cwnd_ = ssthresh_;
|
||||
partial_bytes_acked_ = 0;
|
||||
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||
<< "packet loss detected (not fast recovery). cwnd="
|
||||
<< cwnd_ << " (" << old_cwnd
|
||||
<< "), ssthresh=" << ssthresh_
|
||||
<< ", pba=" << partial_bytes_acked_ << " (" << old_pba
|
||||
<< ")";
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "If not in Fast Recovery, enter Fast Recovery and mark the highest
|
||||
// outstanding TSN as the Fast Recovery exit point."
|
||||
fast_recovery_exit_tsn_ = outstanding_data_.empty()
|
||||
? last_cumulative_tsn_ack_
|
||||
: outstanding_data_.rbegin()->first;
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||
<< "fast recovery initiated with exit_point="
|
||||
<< *fast_recovery_exit_tsn_->Wrap();
|
||||
} else {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "While in Fast Recovery, the ssthresh and cwnd SHOULD NOT change for
|
||||
// any destinations due to a subsequent Fast Recovery event (i.e., one
|
||||
// SHOULD NOT reduce the cwnd further due to a subsequent Fast Retransmit)."
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||
<< "packet loss detected (fast recovery). No changes.";
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) {
|
||||
rwnd_ = outstanding_bytes_ >= a_rwnd ? 0 : a_rwnd - outstanding_bytes_;
|
||||
}
|
||||
|
||||
void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() {
|
||||
// Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to
|
||||
// be retransmitted.
|
||||
if (outstanding_data_.empty()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.2
|
||||
// "Whenever all outstanding data sent to an address have been
|
||||
// acknowledged, turn off the T3-rtx timer of that address.
|
||||
// Note: Already stopped in `StopT3RtxTimerOnIncreasedCumulativeTsnAck`."
|
||||
} else {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.2
|
||||
// "Whenever a SACK is received that acknowledges the DATA chunk
|
||||
// with the earliest outstanding TSN for that address, restart the T3-rtx
|
||||
// timer for that address with its current RTO (if there is still
|
||||
// outstanding data on that address)."
|
||||
// "Whenever a SACK is received missing a TSN that was previously
|
||||
// acknowledged via a Gap Ack Block, start the T3-rtx for the destination
|
||||
// address to which the DATA chunk was originally transmitted if it is not
|
||||
// already running."
|
||||
if (!t3_rtx_.is_running()) {
|
||||
t3_rtx_.Start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.2.1
|
||||
// "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
|
||||
// then drop the SACK. Since Cumulative TSN Ack is monotonically increasing,
|
||||
// a SACK whose Cumulative TSN Ack is less than the Cumulative TSN Ack Point
|
||||
// indicates an out-of- order SACK."
|
||||
//
|
||||
// Note: Important not to drop SACKs with identical TSN to that previously
|
||||
// received, as the gap ack blocks or dup tsn fields may have changed.
|
||||
UnwrappedTSN cumulative_tsn_ack =
|
||||
tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack());
|
||||
if (cumulative_tsn_ack < last_cumulative_tsn_ack_) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.2.1
|
||||
// "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
|
||||
// then drop the SACK. Since Cumulative TSN Ack is monotonically
|
||||
// increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative
|
||||
// TSN Ack Point indicates an out-of- order SACK."
|
||||
return false;
|
||||
} else if (outstanding_data_.empty() &&
|
||||
cumulative_tsn_ack > last_cumulative_tsn_ack_) {
|
||||
// No in-flight data and cum-tsn-ack above what was last ACKed - not valid.
|
||||
return false;
|
||||
} else if (!outstanding_data_.empty() &&
|
||||
cumulative_tsn_ack > outstanding_data_.rbegin()->first) {
|
||||
// There is in-flight data, but the cum-tsn-ack is beyond that - not valid.
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
|
||||
if (!IsSackValid(sack)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t old_outstanding_bytes = outstanding_bytes_;
|
||||
size_t old_rwnd = rwnd_;
|
||||
UnwrappedTSN cumulative_tsn_ack =
|
||||
tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack());
|
||||
|
||||
if (sack.gap_ack_blocks().empty()) {
|
||||
UpdateRTT(now, cumulative_tsn_ack);
|
||||
}
|
||||
|
||||
AckInfo ack_info(cumulative_tsn_ack);
|
||||
// Erase all items up to cumulative_tsn_ack.
|
||||
RemoveAcked(cumulative_tsn_ack, ack_info);
|
||||
|
||||
// ACK packets reported in the gap ack blocks
|
||||
AckGapBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info);
|
||||
|
||||
// NACK and possibly mark for retransmit chunks that weren't acked.
|
||||
NackBetweenAckBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info);
|
||||
|
||||
RecalculateOutstandingBytes();
|
||||
// Update of outstanding_data_ is now done. Congestion control remains.
|
||||
UpdateReceiverWindow(sack.a_rwnd());
|
||||
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK. Acked TSN: "
|
||||
<< StrJoin(ack_info.acked_tsns, ",",
|
||||
[](rtc::StringBuilder& sb, TSN tsn) {
|
||||
sb << *tsn;
|
||||
})
|
||||
<< ", cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " ("
|
||||
<< *last_cumulative_tsn_ack_.Wrap()
|
||||
<< "), outstanding_bytes=" << outstanding_bytes_ << " ("
|
||||
<< old_outstanding_bytes << "), rwnd=" << rwnd_ << " ("
|
||||
<< old_rwnd << ")";
|
||||
|
||||
MaybeExitFastRecovery(cumulative_tsn_ack);
|
||||
|
||||
if (cumulative_tsn_ack > last_cumulative_tsn_ack_) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.2
|
||||
// "Whenever a SACK is received that acknowledges the DATA chunk
|
||||
// with the earliest outstanding TSN for that address, restart the T3-rtx
|
||||
// timer for that address with its current RTO (if there is still
|
||||
// outstanding data on that address)."
|
||||
// Note: It may be started again in a bit further down.
|
||||
t3_rtx_.Stop();
|
||||
|
||||
HandleIncreasedCumulativeTsnAck(
|
||||
old_outstanding_bytes, ack_info.bytes_acked_by_cumulative_tsn_ack +
|
||||
ack_info.bytes_acked_by_new_gap_ack_blocks);
|
||||
}
|
||||
|
||||
if (ack_info.has_packet_loss) {
|
||||
is_in_fast_retransmit_ = true;
|
||||
HandlePacketLoss(ack_info.highest_tsn_acked);
|
||||
}
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-8.2
|
||||
// "When an outstanding TSN is acknowledged [...] the endpoint shall clear
|
||||
// the error counter ..."
|
||||
if (ack_info.bytes_acked_by_cumulative_tsn_ack > 0 ||
|
||||
ack_info.bytes_acked_by_new_gap_ack_blocks > 0) {
|
||||
on_clear_retransmission_counter_();
|
||||
}
|
||||
|
||||
last_cumulative_tsn_ack_ = cumulative_tsn_ack;
|
||||
StartT3RtxTimerIfOutstandingData();
|
||||
return true;
|
||||
}
|
||||
|
||||
void RetransmissionQueue::UpdateRTT(TimeMs now,
|
||||
UnwrappedTSN cumulative_tsn_ack) {
|
||||
// RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C,
|
||||
// Halvorsen P (2006) Considerations of SCTP retransmission delays for thin
|
||||
// streams.
|
||||
// Due to delayed acknowledgement, the SACK may be sent much later which
|
||||
// increases the calculated RTT.
|
||||
// TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and
|
||||
// use only those packets for measurement.
|
||||
|
||||
auto it = outstanding_data_.find(cumulative_tsn_ack);
|
||||
if (it != outstanding_data_.end()) {
|
||||
if (!it->second.has_been_retransmitted()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.1
|
||||
// "Karn's algorithm: RTT measurements MUST NOT be made using
|
||||
// packets that were retransmitted (and thus for which it is ambiguous
|
||||
// whether the reply was for the first instance of the chunk or for a
|
||||
// later instance)"
|
||||
DurationMs rtt = now - it->second.time_sent();
|
||||
on_new_rtt_(rtt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::RecalculateOutstandingBytes() {
|
||||
outstanding_bytes_ = absl::c_accumulate(
|
||||
outstanding_data_, 0,
|
||||
[&](size_t r, const std::pair<const UnwrappedTSN, TxData>& d) {
|
||||
// Packets that have been ACKED or NACKED are not outstanding, as they
|
||||
// are received. And packets that are marked for retransmission or
|
||||
// abandoned are lost, and not outstanding.
|
||||
return r + (d.second.state() == State::kInFlight
|
||||
? GetSerializedChunkSize(d.second.data())
|
||||
: 0);
|
||||
});
|
||||
}
|
||||
|
||||
void RetransmissionQueue::HandleT3RtxTimerExpiry() {
|
||||
size_t old_cwnd = cwnd_;
|
||||
size_t old_outstanding_bytes = outstanding_bytes_;
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.3
|
||||
// "For the destination address for which the timer expires, adjust
|
||||
// its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU."
|
||||
ssthresh_ = std::max(cwnd_ / 2, 4 * options_.mtu);
|
||||
cwnd_ = 1 * options_.mtu;
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.3
|
||||
// "For the destination address for which the timer expires, set RTO
|
||||
// <- RTO * 2 ("back off the timer"). The maximum value discussed in rule C7
|
||||
// above (RTO.max) may be used to provide an upper bound to this doubling
|
||||
// operation."
|
||||
|
||||
// Already done by the Timer implementation.
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.3
|
||||
// "Determine how many of the earliest (i.e., lowest TSN) outstanding
|
||||
// DATA chunks for the address for which the T3-rtx has expired will fit into
|
||||
// a single packet"
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.3
|
||||
// "Note: Any DATA chunks that were sent to the address for which the
|
||||
// T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
|
||||
// marked for retransmission and sent as soon as cwnd allows (normally, when a
|
||||
// SACK arrives)."
|
||||
int count = 0;
|
||||
for (auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
TxData& item = elem.second;
|
||||
if (item.state() == State::kInFlight || item.state() == State::kNacked) {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
|
||||
<< " will be retransmitted due to T3-RTX";
|
||||
item.SetState(State::kToBeRetransmitted);
|
||||
++count;
|
||||
}
|
||||
}
|
||||
|
||||
// Marking some packets as retransmitted changes outstanding bytes.
|
||||
RecalculateOutstandingBytes();
|
||||
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.3
|
||||
// "Start the retransmission timer T3-rtx on the destination address
|
||||
// to which the retransmission is sent, if rule R1 above indicates to do so."
|
||||
|
||||
// Already done by the Timer implementation.
|
||||
|
||||
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
|
||||
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_
|
||||
<< ", rtx-packets=" << count << ", outstanding_bytes "
|
||||
<< outstanding_bytes_ << " (" << old_outstanding_bytes
|
||||
<< ")";
|
||||
}
|
||||
|
||||
std::vector<std::pair<TSN, Data>>
|
||||
RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
|
||||
std::vector<std::pair<TSN, Data>> result;
|
||||
for (auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
TxData& item = elem.second;
|
||||
|
||||
size_t serialized_size = GetSerializedChunkSize(item.data());
|
||||
if (item.state() == State::kToBeRetransmitted &&
|
||||
serialized_size <= max_size) {
|
||||
item.Retransmit();
|
||||
result.emplace_back(tsn.Wrap(), item.data().Clone());
|
||||
max_size -= serialized_size;
|
||||
}
|
||||
// No point in continuing if the packet is full.
|
||||
if (max_size <= data_chunk_header_size_) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// As some chunks may have switched state, that needs to be reflected here.
|
||||
if (!result.empty()) {
|
||||
RecalculateOutstandingBytes();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
|
||||
TimeMs now,
|
||||
size_t bytes_remaining_in_packet) {
|
||||
// Chunks are always padded to even divisible by four.
|
||||
RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet));
|
||||
|
||||
std::vector<std::pair<TSN, Data>> to_be_sent;
|
||||
size_t old_outstanding_bytes = outstanding_bytes_;
|
||||
size_t old_rwnd = rwnd_;
|
||||
if (is_in_fast_retransmit()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-7.2.4
|
||||
// "Determine how many of the earliest (i.e., lowest TSN) DATA chunks
|
||||
// marked for retransmission will fit into a single packet ... Retransmit
|
||||
// those K DATA chunks in a single packet. When a Fast Retransmit is being
|
||||
// performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT
|
||||
// delay retransmission for this single packet."
|
||||
is_in_fast_retransmit_ = false;
|
||||
to_be_sent = GetChunksToBeRetransmitted(bytes_remaining_in_packet);
|
||||
size_t to_be_sent_bytes = absl::c_accumulate(
|
||||
to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
|
||||
return r + GetSerializedChunkSize(d.second);
|
||||
});
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast-retransmit: sending "
|
||||
<< to_be_sent.size() << " chunks, " << to_be_sent_bytes
|
||||
<< " bytes";
|
||||
} else {
|
||||
// Normal sending. Calculate the bandwidth budget (how many bytes that is
|
||||
// allowed to be sent), and fill that up first with chunks that are
|
||||
// scheduled to be retransmitted. If there is still budget, send new chunks
|
||||
// (which will have their TSN assigned here.)
|
||||
size_t remaining_cwnd_bytes =
|
||||
outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_;
|
||||
size_t max_bytes = RoundDownTo4(std::min(
|
||||
std::min(bytes_remaining_in_packet, rwnd()), remaining_cwnd_bytes));
|
||||
|
||||
to_be_sent = GetChunksToBeRetransmitted(max_bytes);
|
||||
max_bytes -= absl::c_accumulate(
|
||||
to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
|
||||
return r + GetSerializedChunkSize(d.second);
|
||||
});
|
||||
|
||||
while (max_bytes > data_chunk_header_size_) {
|
||||
RTC_DCHECK(IsDivisibleBy4(max_bytes));
|
||||
absl::optional<SendQueue::DataToSend> chunk_opt =
|
||||
send_queue_.Produce(now, max_bytes - data_chunk_header_size_);
|
||||
if (!chunk_opt.has_value()) {
|
||||
on_send_queue_empty_();
|
||||
break;
|
||||
}
|
||||
|
||||
UnwrappedTSN tsn = next_tsn_;
|
||||
next_tsn_.Increment();
|
||||
to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone());
|
||||
|
||||
// All chunks are always padded to be even divisible by 4.
|
||||
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
|
||||
max_bytes -= chunk_size;
|
||||
outstanding_bytes_ += chunk_size;
|
||||
rwnd_ -= chunk_size;
|
||||
outstanding_data_.emplace(
|
||||
tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data),
|
||||
chunk_opt->max_retransmissions, now,
|
||||
chunk_opt->expires_at));
|
||||
}
|
||||
}
|
||||
|
||||
if (!to_be_sent.empty()) {
|
||||
// https://tools.ietf.org/html/rfc4960#section-6.3.2
|
||||
// "Every time a DATA chunk is sent to any address (including a
|
||||
// retransmission), if the T3-rtx timer of that address is not running,
|
||||
// start it running so that it will expire after the RTO of that address."
|
||||
if (!t3_rtx_.is_running()) {
|
||||
t3_rtx_.Start();
|
||||
}
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Sending TSN "
|
||||
<< StrJoin(to_be_sent, ",",
|
||||
[&](rtc::StringBuilder& sb,
|
||||
const std::pair<TSN, Data>& c) {
|
||||
sb << *c.first;
|
||||
})
|
||||
<< " - "
|
||||
<< absl::c_accumulate(
|
||||
to_be_sent, 0,
|
||||
[&](size_t r, const std::pair<TSN, Data>& d) {
|
||||
return r + GetSerializedChunkSize(d.second);
|
||||
})
|
||||
<< " bytes. outstanding_bytes=" << outstanding_bytes_
|
||||
<< " (" << old_outstanding_bytes << "), cwnd=" << cwnd_
|
||||
<< ", rwnd=" << rwnd_ << " (" << old_rwnd << ")";
|
||||
}
|
||||
return to_be_sent;
|
||||
}
|
||||
|
||||
std::vector<std::pair<TSN, RetransmissionQueue::State>>
|
||||
RetransmissionQueue::GetChunkStatesForTesting() const {
|
||||
std::vector<std::pair<TSN, RetransmissionQueue::State>> states;
|
||||
states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked);
|
||||
for (const auto& elem : outstanding_data_) {
|
||||
states.emplace_back(elem.first.Wrap(), elem.second.state());
|
||||
}
|
||||
return states;
|
||||
}
|
||||
|
||||
bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) {
|
||||
if (!partial_reliability_) {
|
||||
return false;
|
||||
}
|
||||
ExpireChunks(now);
|
||||
if (!outstanding_data_.empty()) {
|
||||
auto it = outstanding_data_.begin();
|
||||
return it->first == last_cumulative_tsn_ack_.next_value() &&
|
||||
it->second.state() == State::kAbandoned;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void RetransmissionQueue::TxData::Nack() {
|
||||
++nack_count_;
|
||||
if (nack_count_ >= kNumberOfNacksForRetransmission) {
|
||||
state_ = State::kToBeRetransmitted;
|
||||
} else {
|
||||
state_ = State::kNacked;
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::TxData::Retransmit() {
|
||||
state_ = State::kInFlight;
|
||||
nack_count_ = 0;
|
||||
++num_retransmissions_;
|
||||
}
|
||||
|
||||
bool RetransmissionQueue::TxData::has_expired(TimeMs now) const {
|
||||
if (state_ != State::kAcked && state_ != State::kAbandoned) {
|
||||
if (max_retransmissions_.has_value() &&
|
||||
num_retransmissions_ >= *max_retransmissions_) {
|
||||
return true;
|
||||
} else if (expires_at_.has_value() && *expires_at_ <= now) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void RetransmissionQueue::ExpireChunks(TimeMs now) {
|
||||
for (const auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
const TxData& item = elem.second;
|
||||
|
||||
// Chunks that are in-flight (possibly lost?), nacked or to be retransmitted
|
||||
// can be expired easily. There is always a risk that a message is expired
|
||||
// that was already received by the peer, but for which there haven't been
|
||||
// a SACK received. But that's acceptable, and handled.
|
||||
if (item.has_expired(now)) {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
|
||||
<< " and message " << *item.data().message_id
|
||||
<< " as expired";
|
||||
ExpireAllFor(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RetransmissionQueue::ExpireAllFor(
|
||||
const RetransmissionQueue::TxData& item) {
|
||||
// Erase all remaining chunks from the producer, if any.
|
||||
send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
|
||||
item.data().message_id);
|
||||
for (auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
TxData& other = elem.second;
|
||||
|
||||
if (other.state() != State::kAbandoned &&
|
||||
other.data().stream_id == item.data().stream_id &&
|
||||
other.data().is_unordered == item.data().is_unordered &&
|
||||
other.data().message_id == item.data().message_id) {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
|
||||
<< " as abandoned";
|
||||
other.SetState(State::kAbandoned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const {
|
||||
std::unordered_map<StreamID, SSN, StreamID::Hasher>
|
||||
skipped_per_ordered_stream;
|
||||
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
|
||||
|
||||
for (const auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
const TxData& item = elem.second;
|
||||
|
||||
if ((tsn != new_cumulative_ack.next_value()) ||
|
||||
item.state() != State::kAbandoned) {
|
||||
break;
|
||||
}
|
||||
new_cumulative_ack = tsn;
|
||||
if (!item.data().is_unordered &&
|
||||
item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) {
|
||||
skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<ForwardTsnChunk::SkippedStream> skipped_streams;
|
||||
skipped_streams.reserve(skipped_per_ordered_stream.size());
|
||||
for (const auto& elem : skipped_per_ordered_stream) {
|
||||
skipped_streams.emplace_back(elem.first, elem.second);
|
||||
}
|
||||
return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams));
|
||||
}
|
||||
|
||||
IForwardTsnChunk RetransmissionQueue::CreateIForwardTsn() const {
|
||||
std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
|
||||
skipped_per_stream;
|
||||
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
|
||||
|
||||
for (const auto& elem : outstanding_data_) {
|
||||
UnwrappedTSN tsn = elem.first;
|
||||
const TxData& item = elem.second;
|
||||
|
||||
if ((tsn != new_cumulative_ack.next_value()) ||
|
||||
item.state() != State::kAbandoned) {
|
||||
break;
|
||||
}
|
||||
new_cumulative_ack = tsn;
|
||||
std::pair<IsUnordered, StreamID> stream_id =
|
||||
std::make_pair(item.data().is_unordered, item.data().stream_id);
|
||||
|
||||
if (item.data().message_id > skipped_per_stream[stream_id]) {
|
||||
skipped_per_stream[stream_id] = item.data().message_id;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<IForwardTsnChunk::SkippedStream> skipped_streams;
|
||||
skipped_streams.reserve(skipped_per_stream.size());
|
||||
for (const auto& elem : skipped_per_stream) {
|
||||
const std::pair<IsUnordered, StreamID>& stream = elem.first;
|
||||
MID message_id = elem.second;
|
||||
skipped_streams.emplace_back(stream.first, stream.second, message_id);
|
||||
}
|
||||
|
||||
return IForwardTsnChunk(new_cumulative_ack.Wrap(),
|
||||
std::move(skipped_streams));
|
||||
}
|
||||
|
||||
void RetransmissionQueue::PrepareResetStreams(
|
||||
rtc::ArrayView<const StreamID> streams) {
|
||||
// TODO(boivie): These calls are now only affecting the send queue. The
|
||||
// packet buffer can also change behavior - for example draining the chunk
|
||||
// producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request"
|
||||
// can be sent quickly, with a known `sender_last_assigned_tsn`.
|
||||
send_queue_.PrepareResetStreams(streams);
|
||||
}
|
||||
bool RetransmissionQueue::CanResetStreams() const {
|
||||
return send_queue_.CanResetStreams();
|
||||
}
|
||||
void RetransmissionQueue::CommitResetStreams() {
|
||||
send_queue_.CommitResetStreams();
|
||||
}
|
||||
void RetransmissionQueue::RollbackResetStreams() {
|
||||
send_queue_.RollbackResetStreams();
|
||||
}
|
||||
|
||||
} // namespace dcsctp
|
||||
345
net/dcsctp/tx/retransmission_queue.h
Normal file
345
net/dcsctp/tx/retransmission_queue.h
Normal file
@ -0,0 +1,345 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#ifndef NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
|
||||
#define NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
|
||||
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/common/sequence_numbers.h"
|
||||
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/sack_chunk.h"
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/dcsctp_options.h"
|
||||
#include "net/dcsctp/timer/timer.h"
|
||||
#include "net/dcsctp/tx/retransmission_timeout.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
|
||||
namespace dcsctp {
|
||||
|
||||
// The RetransmissionQueue manages all DATA/I-DATA chunks that are in-flight and
|
||||
// schedules them to be retransmitted if necessary. Chunks are retransmitted
|
||||
// when they have been lost for a number of consecutive SACKs, or when the
|
||||
// retransmission timer, `t3_rtx` expires.
|
||||
//
|
||||
// As congestion control is tightly connected with the state of transmitted
|
||||
// packets, that's also managed here to limit the amount of data that is
|
||||
// in-flight (sent, but not yet acknowledged).
|
||||
class RetransmissionQueue {
|
||||
public:
|
||||
static constexpr size_t kMinimumFragmentedPayload = 10;
|
||||
// State for DATA chunks (message fragments) in the queue.
|
||||
enum class State {
|
||||
// The chunk has been sent but not received yet (from the sender's point of
|
||||
// view, as no SACK has been received yet that reference this chunk).
|
||||
kInFlight,
|
||||
// A SACK has been received which explicitly marked this chunk as missing -
|
||||
// it's now NACKED and may be retransmitted if NACKED enough times.
|
||||
kNacked,
|
||||
// A chunk that will be retransmitted when possible.
|
||||
kToBeRetransmitted,
|
||||
// A SACK has been received which explicitly marked this chunk as received.
|
||||
kAcked,
|
||||
// A chunk whose message has expired or has been retransmitted too many
|
||||
// times (RFC3758). It will not be retransmitted anymore.
|
||||
kAbandoned,
|
||||
};
|
||||
|
||||
// Creates a RetransmissionQueue which will send data using `initial_tsn` as
|
||||
// the first TSN to use for sent fragments. It will poll data from
|
||||
// `send_queue` and call `on_send_queue_empty` when it is empty. When
|
||||
// SACKs are received, it will estimate the RTT, and call `on_new_rtt`. When
|
||||
// an outstanding chunk has been ACKed, it will call
|
||||
// `on_clear_retransmission_counter` and will also use `t3_rtx`, which is the
|
||||
// SCTP retransmission timer to manage retransmissions.
|
||||
RetransmissionQueue(absl::string_view log_prefix,
|
||||
TSN initial_tsn,
|
||||
size_t a_rwnd,
|
||||
SendQueue& send_queue,
|
||||
std::function<void(DurationMs rtt)> on_new_rtt,
|
||||
std::function<void()> on_send_queue_empty,
|
||||
std::function<void()> on_clear_retransmission_counter,
|
||||
Timer& t3_rtx,
|
||||
const DcSctpOptions& options,
|
||||
bool supports_partial_reliability = true,
|
||||
bool use_message_interleaving = false);
|
||||
|
||||
// Handles a received SACK. Returns true if the `sack` was processed and
|
||||
// false if it was discarded due to received out-of-order and not relevant.
|
||||
bool HandleSack(TimeMs now, const SackChunk& sack);
|
||||
|
||||
// Handles an expired retransmission timer.
|
||||
void HandleT3RtxTimerExpiry();
|
||||
|
||||
// Returns a list of chunks to send that would fit in one SCTP packet with
|
||||
// `bytes_remaining_in_packet` bytes available. This may be further limited by
|
||||
// the congestion control windows. Note that `ShouldSendForwardTSN` must be
|
||||
// called prior to this method, to abandon expired chunks, as this method will
|
||||
// not expire any chunks.
|
||||
std::vector<std::pair<TSN, Data>> GetChunksToSend(
|
||||
TimeMs now,
|
||||
size_t bytes_remaining_in_packet);
|
||||
|
||||
// Returns the internal state of all queued chunks. This is only used in
|
||||
// unit-tests.
|
||||
std::vector<std::pair<TSN, State>> GetChunkStatesForTesting() const;
|
||||
|
||||
// Returns the next TSN that will be allocated for sent DATA chunks.
|
||||
TSN next_tsn() const { return next_tsn_.Wrap(); }
|
||||
|
||||
// Returns the size of the congestion window, in bytes. This is the number of
|
||||
// bytes that may be in-flight.
|
||||
size_t cwnd() const { return cwnd_; }
|
||||
|
||||
// Overrides the current congestion window size.
|
||||
void set_cwnd(size_t cwnd) { cwnd_ = cwnd; }
|
||||
|
||||
// Returns the current receiver window size.
|
||||
size_t rwnd() const { return rwnd_; }
|
||||
|
||||
// Returns the number of bytes of packets that are in-flight.
|
||||
size_t outstanding_bytes() const { return outstanding_bytes_; }
|
||||
|
||||
// Given the current time `now`, it will evaluate if there are chunks that
|
||||
// have expired and that need to be discarded. It returns true if a
|
||||
// FORWARD-TSN should be sent.
|
||||
bool ShouldSendForwardTsn(TimeMs now);
|
||||
|
||||
// Creates a FORWARD-TSN chunk.
|
||||
ForwardTsnChunk CreateForwardTsn() const;
|
||||
|
||||
// Creates an I-FORWARD-TSN chunk.
|
||||
IForwardTsnChunk CreateIForwardTsn() const;
|
||||
|
||||
// See the SendQueue for a longer description of these methods related
|
||||
// to stream resetting.
|
||||
void PrepareResetStreams(rtc::ArrayView<const StreamID> streams);
|
||||
bool CanResetStreams() const;
|
||||
void CommitResetStreams();
|
||||
void RollbackResetStreams();
|
||||
|
||||
private:
|
||||
enum class CongestionAlgorithmPhase {
|
||||
kSlowStart,
|
||||
kCongestionAvoidance,
|
||||
};
|
||||
|
||||
// A fragmented message's DATA chunk while in the retransmission queue, and
|
||||
// its associated metadata.
|
||||
class TxData {
|
||||
public:
|
||||
explicit TxData(Data data,
|
||||
absl::optional<size_t> max_retransmissions,
|
||||
TimeMs time_sent,
|
||||
absl::optional<TimeMs> expires_at)
|
||||
: max_retransmissions_(max_retransmissions),
|
||||
time_sent_(time_sent),
|
||||
expires_at_(expires_at),
|
||||
data_(std::move(data)) {}
|
||||
|
||||
TimeMs time_sent() const { return time_sent_; }
|
||||
|
||||
State state() const { return state_; }
|
||||
void SetState(State state) { state_ = state; }
|
||||
|
||||
const Data& data() const { return data_; }
|
||||
|
||||
// Nacks an item. If it has been nacked enough times, it will be marked for
|
||||
// retransmission.
|
||||
void Nack();
|
||||
void Retransmit();
|
||||
|
||||
bool has_been_retransmitted() { return num_retransmissions_ > 0; }
|
||||
|
||||
// Given the current time, and the current state of this DATA chunk, it will
|
||||
// indicate if it has expired (SCTP Partial Reliability Extension).
|
||||
bool has_expired(TimeMs now) const;
|
||||
|
||||
private:
|
||||
State state_ = State::kInFlight;
|
||||
// The number of times the DATA chunk has been nacked (by having received a
|
||||
// SACK which doesn't include it). Will be cleared on retransmissions.
|
||||
size_t nack_count_ = 0;
|
||||
// The number of times the DATA chunk has been retransmitted.
|
||||
size_t num_retransmissions_ = 0;
|
||||
// If the message was sent with a maximum number of retransmissions, this is
|
||||
// set to that number. The value zero (0) means that it will never be
|
||||
// retransmitted.
|
||||
const absl::optional<size_t> max_retransmissions_;
|
||||
// When the packet was sent, and placed in this queue.
|
||||
const TimeMs time_sent_;
|
||||
// If the message was sent with an expiration time, this is set.
|
||||
const absl::optional<TimeMs> expires_at_;
|
||||
// The actual data to send/retransmit.
|
||||
Data data_;
|
||||
};
|
||||
|
||||
// Contains variables scoped to a processing of an incoming SACK.
|
||||
struct AckInfo {
|
||||
explicit AckInfo(UnwrappedTSN cumulative_tsn_ack)
|
||||
: highest_tsn_acked(cumulative_tsn_ack) {}
|
||||
|
||||
// All TSNs that have been acked (for the first time) in this SACK.
|
||||
std::vector<TSN> acked_tsns;
|
||||
|
||||
// Bytes acked by increasing cumulative_tsn_ack in this SACK
|
||||
size_t bytes_acked_by_cumulative_tsn_ack = 0;
|
||||
|
||||
// Bytes acked by gap blocks in this SACK.
|
||||
size_t bytes_acked_by_new_gap_ack_blocks = 0;
|
||||
|
||||
// Indicates if this SACK indicates that packet loss has occurred. Just
|
||||
// because a packet is missing in the SACK doesn't necessarily mean that
|
||||
// there is packet loss as that packet might be in-flight and received
|
||||
// out-of-order. But when it has been reported missing consecutive times, it
|
||||
// will eventually be considered "lost" and this will be set.
|
||||
bool has_packet_loss = false;
|
||||
|
||||
// Highest TSN Newly Acknowledged, an SCTP variable.
|
||||
UnwrappedTSN highest_tsn_acked;
|
||||
};
|
||||
|
||||
// Returns how large a chunk will be, serialized, carrying the data
|
||||
size_t GetSerializedChunkSize(const Data& data) const;
|
||||
|
||||
// Indicates if the congestion control algorithm is in "fast recovery".
|
||||
bool is_in_fast_recovery() const {
|
||||
return fast_recovery_exit_tsn_.has_value();
|
||||
}
|
||||
|
||||
// Indicates if the congestion control algorithm is in "fast retransmit".
|
||||
bool is_in_fast_retransmit() const { return is_in_fast_retransmit_; }
|
||||
|
||||
// Indicates if the provided SACK is valid given what has previously been
|
||||
// received. If it returns false, the SACK is most likely a duplicate of
|
||||
// something already seen, so this returning false doesn't necessarily mean
|
||||
// that the SACK is illegal.
|
||||
bool IsSackValid(const SackChunk& sack) const;
|
||||
|
||||
// Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items
|
||||
// in the retransmission queue up until this value and will update `ack_info`
|
||||
// by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`.
|
||||
void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
|
||||
|
||||
// Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
|
||||
// as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
|
||||
void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
|
||||
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
|
||||
AckInfo& ack_info);
|
||||
|
||||
// Mark chunks reported as "missing", as "nacked" or "to be retransmitted"
|
||||
// depending how many times this has happened. Only packets up until
|
||||
// `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are
|
||||
// nacked/retransmitted. The method will set `ack_info.has_packet_loss`.
|
||||
void NackBetweenAckBlocks(
|
||||
UnwrappedTSN cumulative_tsn_ack,
|
||||
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
|
||||
AckInfo& ack_info);
|
||||
|
||||
// When a SACK chunk is received, this method will be called which _may_ call
|
||||
// into the `RetransmissionTimeout` to update the RTO.
|
||||
void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack);
|
||||
|
||||
// If the congestion control is in "fast recovery mode", this may be exited
|
||||
// now.
|
||||
void MaybeExitFastRecovery(UnwrappedTSN cumulative_tsn_ack);
|
||||
|
||||
// If chunks have been ACKed, stop the retransmission timer.
|
||||
void StopT3RtxTimerOnIncreasedCumulativeTsnAck(
|
||||
UnwrappedTSN cumulative_tsn_ack);
|
||||
|
||||
// Update the congestion control algorithm given as the cumulative ack TSN
|
||||
// value has increased, as reported in an incoming SACK chunk.
|
||||
void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes,
|
||||
size_t total_bytes_acked);
|
||||
// Update the congestion control algorithm, given as packet loss has been
|
||||
// detected, as reported in an incoming SACK chunk.
|
||||
void HandlePacketLoss(UnwrappedTSN highest_tsn_acked);
|
||||
// Recalculate the number of in-flight payload bytes.
|
||||
void RecalculateOutstandingBytes();
|
||||
// Update the view of the receiver window size.
|
||||
void UpdateReceiverWindow(uint32_t a_rwnd);
|
||||
// Given `max_size` of space left in a packet, which chunks can be added to
|
||||
// it?
|
||||
std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size);
|
||||
// If there is data sent and not ACKED, ensure that the retransmission timer
|
||||
// is running.
|
||||
void StartT3RtxTimerIfOutstandingData();
|
||||
|
||||
// Given the current time `now_ms`, expire chunks that have a limited
|
||||
// lifetime.
|
||||
void ExpireChunks(TimeMs now);
|
||||
// Given that a message fragment, `item` has expired, expire all other
|
||||
// fragments that share the same message - even never-before-sent fragments
|
||||
// that are still in the SendQueue.
|
||||
void ExpireAllFor(const RetransmissionQueue::TxData& item);
|
||||
|
||||
// Returns the current congestion control algorithm phase.
|
||||
CongestionAlgorithmPhase phase() const {
|
||||
return (cwnd_ <= ssthresh_)
|
||||
? CongestionAlgorithmPhase::kSlowStart
|
||||
: CongestionAlgorithmPhase::kCongestionAvoidance;
|
||||
}
|
||||
|
||||
const DcSctpOptions options_;
|
||||
// If the peer supports RFC3758 - SCTP Partial Reliability Extension.
|
||||
const bool partial_reliability_;
|
||||
const std::string log_prefix_;
|
||||
// The size of the data chunk (DATA/I-DATA) header that is used.
|
||||
const size_t data_chunk_header_size_;
|
||||
// Called when a new RTT measurement has been done
|
||||
const std::function<void(DurationMs rtt)> on_new_rtt_;
|
||||
// Called when the send queue is empty.
|
||||
const std::function<void()> on_send_queue_empty_;
|
||||
// Called when a SACK has been seen that cleared the retransmission counter.
|
||||
const std::function<void()> on_clear_retransmission_counter_;
|
||||
// The retransmission counter.
|
||||
Timer& t3_rtx_;
|
||||
// Unwraps TSNs
|
||||
UnwrappedTSN::Unwrapper tsn_unwrapper_;
|
||||
|
||||
// Congestion Window. Number of bytes that may be in-flight (sent, not acked).
|
||||
size_t cwnd_;
|
||||
// Receive Window. Number of bytes available in the receiver's RX buffer.
|
||||
size_t rwnd_;
|
||||
// Slow Start Threshold. See RFC4960.
|
||||
size_t ssthresh_;
|
||||
// Partial Bytes Acked. See RFC4960.
|
||||
size_t partial_bytes_acked_ = 0;
|
||||
// If set, fast recovery is enabled until this TSN has been cumulative
|
||||
// acked.
|
||||
absl::optional<UnwrappedTSN> fast_recovery_exit_tsn_ = absl::nullopt;
|
||||
// Indicates if the congestion algorithm is in fast retransmit.
|
||||
bool is_in_fast_retransmit_ = false;
|
||||
|
||||
// Next TSN to used.
|
||||
UnwrappedTSN next_tsn_;
|
||||
// The last cumulative TSN ack number
|
||||
UnwrappedTSN last_cumulative_tsn_ack_;
|
||||
// The send queue.
|
||||
SendQueue& send_queue_;
|
||||
// All the outstanding data chunks that are in-flight and that have not been
|
||||
// cumulative acked. Note that it also contains chunks that have been acked in
|
||||
// gap ack blocks.
|
||||
std::map<UnwrappedTSN, TxData> outstanding_data_;
|
||||
// The sum of the message bytes of the send_queue_
|
||||
size_t outstanding_bytes_ = 0;
|
||||
};
|
||||
} // namespace dcsctp
|
||||
|
||||
#endif // NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
|
||||
804
net/dcsctp/tx/retransmission_queue_test.cc
Normal file
804
net/dcsctp/tx/retransmission_queue_test.cc
Normal file
@ -0,0 +1,804 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#include "net/dcsctp/tx/retransmission_queue.h"
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/packet/chunk/data_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
|
||||
#include "net/dcsctp/packet/chunk/sack_chunk.h"
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/dcsctp_options.h"
|
||||
#include "net/dcsctp/testing/data_generator.h"
|
||||
#include "net/dcsctp/timer/fake_timeout.h"
|
||||
#include "net/dcsctp/timer/timer.h"
|
||||
#include "net/dcsctp/tx/mock_send_queue.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
#include "rtc_base/gunit.h"
|
||||
#include "test/gmock.h"
|
||||
|
||||
namespace dcsctp {
|
||||
namespace {
|
||||
using ::testing::MockFunction;
|
||||
using State = ::dcsctp::RetransmissionQueue::State;
|
||||
using ::testing::_;
|
||||
using ::testing::ElementsAre;
|
||||
using ::testing::IsEmpty;
|
||||
using ::testing::NiceMock;
|
||||
using ::testing::Pair;
|
||||
using ::testing::SizeIs;
|
||||
using ::testing::UnorderedElementsAre;
|
||||
|
||||
constexpr uint32_t kArwnd = 100000;
|
||||
constexpr uint32_t kMaxMtu = 1191;
|
||||
|
||||
class RetransmissionQueueTest : public testing::Test {
|
||||
protected:
|
||||
RetransmissionQueueTest()
|
||||
: gen_(MID(42)),
|
||||
timeout_manager_([this]() { return now_; }),
|
||||
timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }),
|
||||
timer_(timer_manager_.CreateTimer(
|
||||
"test/t3_rtx",
|
||||
[]() { return absl::nullopt; },
|
||||
TimerOptions(DurationMs(0)))) {}
|
||||
|
||||
std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk() {
|
||||
return [this](TimeMs now, size_t max_size) {
|
||||
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
};
|
||||
}
|
||||
|
||||
std::vector<TSN> GetSentPacketTSNs(RetransmissionQueue& queue) {
|
||||
std::vector<TSN> tsns;
|
||||
for (const auto& elem : queue.GetChunksToSend(now_, 10000)) {
|
||||
tsns.push_back(elem.first);
|
||||
}
|
||||
return tsns;
|
||||
}
|
||||
|
||||
RetransmissionQueue CreateQueue(bool supports_partial_reliability = true,
|
||||
bool use_message_interleaving = false) {
|
||||
DcSctpOptions options;
|
||||
options.mtu = kMaxMtu;
|
||||
return RetransmissionQueue(
|
||||
"", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
|
||||
on_outgoing_message_buffer_empty_.AsStdFunction(),
|
||||
on_clear_retransmission_counter_.AsStdFunction(), *timer_, options,
|
||||
supports_partial_reliability, use_message_interleaving);
|
||||
}
|
||||
|
||||
DataGenerator gen_;
|
||||
TimeMs now_ = TimeMs(0);
|
||||
FakeTimeoutManager timeout_manager_;
|
||||
TimerManager timer_manager_;
|
||||
NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_;
|
||||
NiceMock<MockFunction<void()>> on_outgoing_message_buffer_empty_;
|
||||
NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
|
||||
NiceMock<MockSendQueue> producer_;
|
||||
std::unique_ptr<Timer> timer_;
|
||||
};
|
||||
|
||||
TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, SendOneChunk) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
|
||||
|
||||
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(10), State::kAcked)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
|
||||
|
||||
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(11), State::kAcked), //
|
||||
Pair(TSN(12), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
|
||||
TSN(15), TSN(16), TSN(17)));
|
||||
|
||||
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
|
||||
{SackChunk::GapAckBlock(2, 3),
|
||||
SackChunk::GapAckBlock(5, 5)},
|
||||
{}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(12), State::kAcked), //
|
||||
Pair(TSN(13), State::kNacked), //
|
||||
Pair(TSN(14), State::kAcked), //
|
||||
Pair(TSN(15), State::kAcked), //
|
||||
Pair(TSN(16), State::kNacked), //
|
||||
Pair(TSN(17), State::kAcked)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
|
||||
TSN(15), TSN(16), TSN(17)));
|
||||
|
||||
// Send more chunks, but leave some as gaps to force retransmission after
|
||||
// three NACKs.
|
||||
|
||||
// Send 18
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));
|
||||
|
||||
// Ack 12, 14-15, 17-18
|
||||
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
|
||||
{SackChunk::GapAckBlock(2, 3),
|
||||
SackChunk::GapAckBlock(5, 6)},
|
||||
{}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(12), State::kAcked), //
|
||||
Pair(TSN(13), State::kNacked), //
|
||||
Pair(TSN(14), State::kAcked), //
|
||||
Pair(TSN(15), State::kAcked), //
|
||||
Pair(TSN(16), State::kNacked), //
|
||||
Pair(TSN(17), State::kAcked), //
|
||||
Pair(TSN(18), State::kAcked)));
|
||||
|
||||
// Send 19
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));
|
||||
|
||||
// Ack 12, 14-15, 17-19
|
||||
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
|
||||
{SackChunk::GapAckBlock(2, 3),
|
||||
SackChunk::GapAckBlock(5, 7)},
|
||||
{}));
|
||||
|
||||
// Send 20
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));
|
||||
|
||||
// Ack 12, 14-15, 17-20
|
||||
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
|
||||
{SackChunk::GapAckBlock(2, 3),
|
||||
SackChunk::GapAckBlock(5, 8)},
|
||||
{}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(12), State::kAcked), //
|
||||
Pair(TSN(13), State::kToBeRetransmitted), //
|
||||
Pair(TSN(14), State::kAcked), //
|
||||
Pair(TSN(15), State::kAcked), //
|
||||
Pair(TSN(16), State::kToBeRetransmitted), //
|
||||
Pair(TSN(17), State::kAcked), //
|
||||
Pair(TSN(18), State::kAcked), //
|
||||
Pair(TSN(19), State::kAcked), //
|
||||
Pair(TSN(20), State::kAcked)));
|
||||
|
||||
// This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
|
||||
// resent right now. The send queue will not even be queried.
|
||||
EXPECT_CALL(producer_, Produce).Times(0);
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13), TSN(16)));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(12), State::kAcked), //
|
||||
Pair(TSN(13), State::kInFlight), //
|
||||
Pair(TSN(14), State::kAcked), //
|
||||
Pair(TSN(15), State::kAcked), //
|
||||
Pair(TSN(16), State::kInFlight), //
|
||||
Pair(TSN(17), State::kAcked), //
|
||||
Pair(TSN(18), State::kAcked), //
|
||||
Pair(TSN(19), State::kAcked), //
|
||||
Pair(TSN(20), State::kAcked)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight), //
|
||||
Pair(TSN(11), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
|
||||
// Will force chunks to be retransmitted
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted)));
|
||||
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted)));
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
|
||||
RetransmissionQueue queue =
|
||||
CreateQueue(/*supports_partial_reliability=*/false);
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
|
||||
// Will force chunks to be retransmitted
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted)));
|
||||
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(0);
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
} // namespace dcsctp
|
||||
|
||||
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
|
||||
// Will force chunks to be retransmitted
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted)));
|
||||
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(1);
|
||||
|
||||
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kAbandoned)));
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_rtx, testing::IsEmpty());
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kAbandoned)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||
dts.max_retransmissions = 3;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(0);
|
||||
|
||||
// Retransmission 1
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
|
||||
|
||||
// Retransmission 2
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
|
||||
|
||||
// Retransmission 3
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
|
||||
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
|
||||
|
||||
// Retransmission 4 - not allowed.
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(1);
|
||||
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
|
||||
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kAbandoned)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
static constexpr size_t kCwnd = 1200;
|
||||
queue.set_cwnd(kCwnd);
|
||||
EXPECT_EQ(queue.cwnd(), kCwnd);
|
||||
EXPECT_EQ(queue.outstanding_bytes(), 0u);
|
||||
|
||||
std::vector<uint8_t> payload(1000);
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this, payload](TimeMs, size_t) {
|
||||
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1500);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
|
||||
|
||||
// Will force chunks to be retransmitted
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted)));
|
||||
EXPECT_EQ(queue.outstanding_bytes(), 0u);
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
|
||||
queue.GetChunksToSend(now_, 1500);
|
||||
EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight)));
|
||||
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
// Send and ack first chunk (TSN 10)
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
|
||||
Pair(TSN(12), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight), //
|
||||
Pair(TSN(11), State::kInFlight), //
|
||||
Pair(TSN(12), State::kInFlight)));
|
||||
|
||||
// Chunk 10 is acked, but the remaining are lost
|
||||
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(10), State::kAcked), //
|
||||
Pair(TSN(11), State::kToBeRetransmitted), //
|
||||
Pair(TSN(12), State::kToBeRetransmitted)));
|
||||
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(1);
|
||||
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(10), State::kAcked), //
|
||||
Pair(TSN(11), State::kAbandoned), //
|
||||
Pair(TSN(12), State::kAbandoned)));
|
||||
|
||||
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
|
||||
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
|
||||
EXPECT_THAT(forward_tsn.skipped_streams(),
|
||||
UnorderedElementsAre(
|
||||
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
|
||||
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
DataGeneratorOptions opts;
|
||||
opts.stream_id = StreamID(1);
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
DataGeneratorOptions opts;
|
||||
opts.stream_id = StreamID(2);
|
||||
SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
DataGeneratorOptions opts;
|
||||
opts.stream_id = StreamID(3);
|
||||
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
DataGeneratorOptions opts;
|
||||
opts.stream_id = StreamID(4);
|
||||
SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
|
||||
Pair(TSN(12), _), Pair(TSN(13), _)));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight), //
|
||||
Pair(TSN(11), State::kInFlight), //
|
||||
Pair(TSN(12), State::kInFlight), //
|
||||
Pair(TSN(13), State::kInFlight)));
|
||||
|
||||
// Chunk 13 is acked, but the remaining are lost
|
||||
queue.HandleSack(
|
||||
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {}));
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kNacked), //
|
||||
Pair(TSN(11), State::kNacked), //
|
||||
Pair(TSN(12), State::kNacked), //
|
||||
Pair(TSN(13), State::kAcked)));
|
||||
|
||||
queue.HandleT3RtxTimerExpiry();
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kToBeRetransmitted), //
|
||||
Pair(TSN(11), State::kToBeRetransmitted), //
|
||||
Pair(TSN(12), State::kToBeRetransmitted), //
|
||||
Pair(TSN(13), State::kAcked)));
|
||||
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
|
||||
.Times(1);
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
|
||||
.Times(1);
|
||||
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
|
||||
.Times(1);
|
||||
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kAbandoned), //
|
||||
Pair(TSN(11), State::kAbandoned), //
|
||||
Pair(TSN(12), State::kAbandoned), //
|
||||
Pair(TSN(13), State::kAcked)));
|
||||
|
||||
IForwardTsnChunk forward_tsn = queue.CreateIForwardTsn();
|
||||
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
|
||||
EXPECT_THAT(
|
||||
forward_tsn.skipped_streams(),
|
||||
UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
|
||||
IsUnordered(false), StreamID(1), MID(42)),
|
||||
IForwardTsnChunk::SkippedStream(
|
||||
IsUnordered(true), StreamID(2), MID(42)),
|
||||
IForwardTsnChunk::SkippedStream(
|
||||
IsUnordered(false), StreamID(3), MID(42))));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, MeasureRTT) {
|
||||
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t) {
|
||||
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
|
||||
dts.max_retransmissions = 0;
|
||||
return dts;
|
||||
})
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1000);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
|
||||
|
||||
now_ = now_ + DurationMs(123);
|
||||
|
||||
EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1);
|
||||
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) {
|
||||
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
|
||||
|
||||
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
|
||||
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
|
||||
TSN(15), TSN(16), TSN(17)));
|
||||
|
||||
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {})));
|
||||
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {})));
|
||||
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {})));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
|
||||
TSN(15), TSN(16), TSN(17)));
|
||||
|
||||
// Ack 9, 20-25. This is an invalid SACK, but should still be handled.
|
||||
queue.HandleSack(
|
||||
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kInFlight), //
|
||||
Pair(TSN(11), State::kInFlight), //
|
||||
Pair(TSN(12), State::kInFlight), //
|
||||
Pair(TSN(13), State::kInFlight), //
|
||||
Pair(TSN(14), State::kInFlight), //
|
||||
Pair(TSN(15), State::kInFlight), //
|
||||
Pair(TSN(16), State::kInFlight), //
|
||||
Pair(TSN(17), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
|
||||
// Nothing produced - nothing in retransmission queue
|
||||
|
||||
// Ack 9, 12-13
|
||||
queue.HandleSack(
|
||||
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {}));
|
||||
|
||||
// Gap ack blocks are just ignore.
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillOnce(CreateChunk())
|
||||
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
|
||||
|
||||
EXPECT_THAT(GetSentPacketTSNs(queue),
|
||||
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
|
||||
TSN(15), TSN(16), TSN(17)));
|
||||
|
||||
// Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be
|
||||
// adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the
|
||||
// cum-tsn-ack should not move, as the gap-ack-blocks are just advisory.
|
||||
queue.HandleSack(
|
||||
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {}));
|
||||
|
||||
EXPECT_THAT(queue.GetChunkStatesForTesting(),
|
||||
ElementsAre(Pair(TSN(9), State::kAcked), //
|
||||
Pair(TSN(10), State::kAcked), //
|
||||
Pair(TSN(11), State::kAcked), //
|
||||
Pair(TSN(12), State::kAcked), //
|
||||
Pair(TSN(13), State::kAcked), //
|
||||
Pair(TSN(14), State::kAcked), //
|
||||
Pair(TSN(15), State::kInFlight), //
|
||||
Pair(TSN(16), State::kInFlight), //
|
||||
Pair(TSN(17), State::kInFlight)));
|
||||
}
|
||||
|
||||
TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
|
||||
RetransmissionQueue queue = CreateQueue();
|
||||
|
||||
// See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the
|
||||
// magic numbers in this test.
|
||||
EXPECT_CALL(producer_, Produce)
|
||||
.WillOnce([this](TimeMs, size_t size) {
|
||||
EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);
|
||||
|
||||
std::vector<uint8_t> payload(183);
|
||||
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
|
||||
})
|
||||
.WillOnce([this](TimeMs, size_t size) {
|
||||
EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);
|
||||
|
||||
std::vector<uint8_t> payload(957);
|
||||
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
|
||||
});
|
||||
|
||||
std::vector<std::pair<TSN, Data>> chunks_to_send =
|
||||
queue.GetChunksToSend(now_, 1188 - 12);
|
||||
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
Loading…
x
Reference in New Issue
Block a user