diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 4f6fb709bb..9191ca46b1 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -67,8 +67,36 @@ rtc_library("retransmission_timeout") { ] } +rtc_library("outstanding_data") { + deps = [ + ":retransmission_timeout", + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../common:math", + "../common:sequence_numbers", + "../common:str_join", + "../packet:chunk", + "../packet:data", + "../public:socket", + "../public:types", + "../timer", + ] + sources = [ + "outstanding_data.cc", + "outstanding_data.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} + rtc_library("retransmission_queue") { deps = [ + ":outstanding_data", ":retransmission_timeout", ":send_queue", "../../../api:array_view", @@ -111,6 +139,7 @@ if (rtc_include_tests) { deps = [ ":mock_send_queue", + ":outstanding_data", ":retransmission_error_counter", ":retransmission_queue", ":retransmission_timeout", @@ -123,6 +152,7 @@ if (rtc_include_tests) { "../../../test:test_support", "../common:handover_testing", "../common:math", + "../common:sequence_numbers", "../packet:chunk", "../packet:data", "../public:socket", @@ -133,6 +163,7 @@ if (rtc_include_tests) { ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] sources = [ + "outstanding_data_test.cc", "retransmission_error_counter_test.cc", "retransmission_queue_test.cc", "retransmission_timeout_test.cc", diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc new file mode 100644 index 0000000000..1f3a24f029 --- /dev/null +++ b/net/dcsctp/tx/outstanding_data.cc @@ -0,0 +1,493 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/outstanding_data.h" + +#include +#include +#include +#include + +#include "net/dcsctp/common/math.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +// The number of times a packet must be NACKed before it's retransmitted. +// See https://tools.ietf.org/html/rfc4960#section-7.2.4 +constexpr size_t kNumberOfNacksForRetransmission = 3; + +// Returns how large a chunk will be, serialized, carrying the data +size_t OutstandingData::GetSerializedChunkSize(const Data& data) const { + return RoundUpTo4(data_chunk_header_size_ + data.size()); +} + +void OutstandingData::Item::Ack() { + ack_state_ = AckState::kAcked; + should_be_retransmitted_ = false; +} + +OutstandingData::Item::NackAction OutstandingData::Item::Nack( + bool retransmit_now) { + ack_state_ = AckState::kNacked; + ++nack_count_; + if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) && + !is_abandoned_) { + // Nacked enough times - it's considered lost. + if (!max_retransmissions_.has_value() || + num_retransmissions_ < max_retransmissions_) { + should_be_retransmitted_ = true; + return NackAction::kRetransmit; + } + Abandon(); + return NackAction::kAbandon; + } + return NackAction::kNothing; +} + +void OutstandingData::Item::Retransmit() { + ack_state_ = AckState::kUnacked; + should_be_retransmitted_ = false; + + nack_count_ = 0; + ++num_retransmissions_; +} + +void OutstandingData::Item::Abandon() { + is_abandoned_ = true; + should_be_retransmitted_ = false; +} + +bool OutstandingData::Item::has_expired(TimeMs now) const { + return expires_at_.has_value() && *expires_at_ <= now; +} + +bool OutstandingData::IsConsistent() const { + size_t actual_outstanding_bytes = 0; + size_t actual_outstanding_items = 0; + + std::set actual_to_be_retransmitted; + for (const auto& elem : outstanding_data_) { + if (elem.second.is_outstanding()) { + actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data()); + ++actual_outstanding_items; + } + + if (elem.second.should_be_retransmitted()) { + actual_to_be_retransmitted.insert(elem.first); + } + } + + if (outstanding_data_.empty() && + next_tsn_ != last_cumulative_tsn_ack_.next_value()) { + return false; + } + + return actual_outstanding_bytes == outstanding_bytes_ && + actual_outstanding_items == outstanding_items_ && + actual_to_be_retransmitted == to_be_retransmitted_; +} + +void OutstandingData::AckChunk(AckInfo& ack_info, + std::map::iterator iter) { + if (!iter->second.is_acked()) { + size_t serialized_size = GetSerializedChunkSize(iter->second.data()); + ack_info.bytes_acked += serialized_size; + if (iter->second.is_outstanding()) { + outstanding_bytes_ -= serialized_size; + --outstanding_items_; + } + if (iter->second.should_be_retransmitted()) { + to_be_retransmitted_.erase(iter->first); + } + iter->second.Ack(); + ack_info.highest_tsn_acked = + std::max(ack_info.highest_tsn_acked, iter->first); + } +} + +OutstandingData::AckInfo OutstandingData::HandleSack( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + bool is_in_fast_retransmit) { + OutstandingData::AckInfo ack_info(cumulative_tsn_ack); + // Erase all items up to cumulative_tsn_ack. + RemoveAcked(cumulative_tsn_ack, ack_info); + + // ACK packets reported in the gap ack blocks + AckGapBlocks(cumulative_tsn_ack, gap_ack_blocks, ack_info); + + // NACK and possibly mark for retransmit chunks that weren't acked. + NackBetweenAckBlocks(cumulative_tsn_ack, gap_ack_blocks, + is_in_fast_retransmit, ack_info); + + RTC_DCHECK(IsConsistent()); + return ack_info; +} + +void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, + AckInfo& ack_info) { + auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); + + for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { + AckChunk(ack_info, iter); + } + + outstanding_data_.erase(outstanding_data_.begin(), first_unacked); + last_cumulative_tsn_ack_ = cumulative_tsn_ack; +} + +void OutstandingData::AckGapBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info) { + // Mark all non-gaps as ACKED (but they can't be removed) as (from RFC) + // "SCTP considers the information carried in the Gap Ack Blocks in the + // SACK chunk as advisory.". Note that when NR-SACK is supported, this can be + // handled differently. + + for (auto& block : gap_ack_blocks) { + auto start = outstanding_data_.lower_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); + auto end = outstanding_data_.upper_bound( + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); + for (auto iter = start; iter != end; ++iter) { + AckChunk(ack_info, iter); + } + } +} + +void OutstandingData::NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + bool is_in_fast_recovery, + OutstandingData::AckInfo& ack_info) { + // Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED. + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "Mark the DATA chunk(s) with three miss indications for retransmission." + // "For each incoming SACK, miss indications are incremented only for + // missing TSNs prior to the highest TSN newly acknowledged in the SACK." + // + // What this means is that only when there is a increasing stream of data + // received and there are new packets seen (since last time), packets that are + // in-flight and between gaps should be nacked. This means that SCTP relies on + // the T3-RTX-timer to re-send packets otherwise. + UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked; + if (is_in_fast_recovery && cumulative_tsn_ack > last_cumulative_tsn_ack_) { + // https://tools.ietf.org/html/rfc4960#section-7.2.4 + // "If an endpoint is in Fast Recovery and a SACK arrives that advances + // the Cumulative TSN Ack Point, the miss indications are incremented for + // all TSNs reported missing in the SACK." + max_tsn_to_nack = UnwrappedTSN::AddTo( + cumulative_tsn_ack, + gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end); + } + + UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack; + for (auto& block : gap_ack_blocks) { + UnwrappedTSN cur_block_first_acked = + UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); + iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { + if (iter->first <= max_tsn_to_nack) { + ack_info.has_packet_loss = + NackItem(iter->first, iter->second, /*retransmit_now=*/false); + } + } + prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + } + + // Note that packets are not NACKED which are above the highest gap-ack-block + // (or above the cumulative ack TSN if no gap-ack-blocks) as only packets + // up until the highest_tsn_acked (see above) should be considered when + // NACKing. +} + +bool OutstandingData::NackItem(UnwrappedTSN tsn, + Item& item, + bool retransmit_now) { + if (item.is_outstanding()) { + outstanding_bytes_ -= GetSerializedChunkSize(item.data()); + --outstanding_items_; + } + + switch (item.Nack(retransmit_now)) { + case Item::NackAction::kNothing: + return false; + case Item::NackAction::kRetransmit: + to_be_retransmitted_.insert(tsn); + RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " marked for retransmission"; + break; + case Item::NackAction::kAbandon: + AbandonAllFor(item); + break; + } + return true; +} + +void OutstandingData::AbandonAllFor(const Item& item) { + // Erase all remaining chunks from the producer, if any. + if (discard_from_send_queue_(item.data().is_unordered, item.data().stream_id, + item.data().message_id)) { + // There were remaining chunks to be produced for this message. Since the + // receiver may have already received all chunks (up till now) for this + // message, we can't just FORWARD-TSN to the last fragment in this + // (abandoned) message and start sending a new message, as the receiver will + // then see a new message before the end of the previous one was seen (or + // skipped over). So create a new fragment, representing the end, that the + // received will never see as it is abandoned immediately and used as cum + // TSN in the sent FORWARD-TSN. + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + Data message_end(item.data().stream_id, item.data().ssn, + item.data().message_id, item.data().fsn, item.data().ppid, + std::vector(), Data::IsBeginning(false), + Data::IsEnd(true), item.data().is_unordered); + Item& added_item = + outstanding_data_ + .emplace(tsn, Item(std::move(message_end), absl::nullopt, TimeMs(0), + absl::nullopt)) + .first->second; + // The added chunk shouldn't be included in `outstanding_bytes`, so set it + // as acked. + added_item.Ack(); + RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn=" + << *tsn.Wrap(); + } + + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + Item& other = elem.second; + + if (!other.is_abandoned() && + other.data().stream_id == item.data().stream_id && + other.data().is_unordered == item.data().is_unordered && + other.data().message_id == item.data().message_id) { + RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap() + << " as abandoned"; + if (other.should_be_retransmitted()) { + to_be_retransmitted_.erase(tsn); + } + other.Abandon(); + } + } +} + +std::vector> OutstandingData::GetChunksToBeRetransmitted( + size_t max_size) { + std::vector> result; + + for (auto it = to_be_retransmitted_.begin(); + it != to_be_retransmitted_.end();) { + UnwrappedTSN tsn = *it; + auto elem = outstanding_data_.find(tsn); + RTC_DCHECK(elem != outstanding_data_.end()); + Item& item = elem->second; + RTC_DCHECK(item.should_be_retransmitted()); + RTC_DCHECK(!item.is_outstanding()); + RTC_DCHECK(!item.is_abandoned()); + RTC_DCHECK(!item.is_acked()); + + size_t serialized_size = GetSerializedChunkSize(item.data()); + if (serialized_size <= max_size) { + item.Retransmit(); + result.emplace_back(tsn.Wrap(), item.data().Clone()); + max_size -= serialized_size; + outstanding_bytes_ += serialized_size; + ++outstanding_items_; + it = to_be_retransmitted_.erase(it); + } else { + ++it; + } + // No point in continuing if the packet is full. + if (max_size <= data_chunk_header_size_) { + break; + } + } + + RTC_DCHECK(IsConsistent()); + return result; +} + +void OutstandingData::ExpireOutstandingChunks(TimeMs now) { + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const Item& item = elem.second; + + // Chunks that are nacked can be expired. Care should be taken not to expire + // unacked (in-flight) chunks as they might have been received, but the SACK + // is either delayed or in-flight and may be received later. + if (item.is_abandoned()) { + // Already abandoned. + } else if (item.is_nacked() && item.has_expired(now)) { + RTC_DLOG(LS_VERBOSE) << "Marking nacked chunk " << *tsn.Wrap() + << " and message " << *item.data().message_id + << " as expired"; + AbandonAllFor(item); + } else { + // A non-expired chunk. No need to iterate any further. + break; + } + } + RTC_DCHECK(IsConsistent()); +} + +UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { + return outstanding_data_.empty() ? last_cumulative_tsn_ack_ + : outstanding_data_.rbegin()->first; +} + +absl::optional OutstandingData::Insert( + const Data& data, + absl::optional max_retransmissions, + TimeMs time_sent, + absl::optional expires_at) { + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + + // All chunks are always padded to be even divisible by 4. + size_t chunk_size = GetSerializedChunkSize(data); + outstanding_bytes_ += chunk_size; + ++outstanding_items_; + auto it = outstanding_data_ + .emplace(tsn, Item(data.Clone(), max_retransmissions, time_sent, + expires_at)) + .first; + + if (it->second.has_expired(time_sent)) { + // No need to send it - it was expired when it was in the send + // queue. + RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " + << *it->first.Wrap() << " and message " + << *it->second.data().message_id << " as expired"; + AbandonAllFor(it->second); + RTC_DCHECK(IsConsistent()); + return absl::nullopt; + } + + RTC_DCHECK(IsConsistent()); + return tsn; +} + +void OutstandingData::NackAll() { + for (auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + Item& item = elem.second; + if (!item.is_acked()) { + NackItem(tsn, item, /*retransmit_now=*/true); + } + } + RTC_DCHECK(IsConsistent()); +} + +absl::optional OutstandingData::MeasureRTT(TimeMs now, + UnwrappedTSN tsn) const { + auto it = outstanding_data_.find(tsn); + if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + return now - it->second.time_sent(); + } + return absl::nullopt; +} + +std::vector> +OutstandingData::GetChunkStatesForTesting() const { + std::vector> states; + states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); + for (const auto& elem : outstanding_data_) { + State state; + if (elem.second.is_abandoned()) { + state = State::kAbandoned; + } else if (elem.second.should_be_retransmitted()) { + state = State::kToBeRetransmitted; + } else if (elem.second.is_acked()) { + state = State::kAcked; + } else if (elem.second.is_outstanding()) { + state = State::kInFlight; + } else { + state = State::kNacked; + } + + states.emplace_back(elem.first.Wrap(), state); + } + return states; +} + +bool OutstandingData::ShouldSendForwardTsn() const { + if (!outstanding_data_.empty()) { + auto it = outstanding_data_.begin(); + return it->first == last_cumulative_tsn_ack_.next_value() && + it->second.is_abandoned(); + } + return false; +} + +ForwardTsnChunk OutstandingData::CreateForwardTsn() const { + std::map skipped_per_ordered_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const Item& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + break; + } + new_cumulative_ack = tsn; + if (!item.data().is_unordered && + item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) { + skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn; + } + } + + std::vector skipped_streams; + skipped_streams.reserve(skipped_per_ordered_stream.size()); + for (const auto& elem : skipped_per_ordered_stream) { + skipped_streams.emplace_back(elem.first, elem.second); + } + return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams)); +} + +IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { + std::map, MID> skipped_per_stream; + UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; + + for (const auto& elem : outstanding_data_) { + UnwrappedTSN tsn = elem.first; + const Item& item = elem.second; + + if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + break; + } + new_cumulative_ack = tsn; + std::pair stream_id = + std::make_pair(item.data().is_unordered, item.data().stream_id); + + if (item.data().message_id > skipped_per_stream[stream_id]) { + skipped_per_stream[stream_id] = item.data().message_id; + } + } + + std::vector skipped_streams; + skipped_streams.reserve(skipped_per_stream.size()); + for (const auto& elem : skipped_per_stream) { + const std::pair& stream = elem.first; + MID message_id = elem.second; + skipped_streams.emplace_back(stream.first, stream.second, message_id); + } + + return IForwardTsnChunk(new_cumulative_ack.Wrap(), + std::move(skipped_streams)); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h new file mode 100644 index 0000000000..b9394f8995 --- /dev/null +++ b/net/dcsctp/tx/outstanding_data.h @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_ +#define NET_DCSCTP_TX_OUTSTANDING_DATA_H_ + +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/data.h" + +namespace dcsctp { + +// This class keeps track of outstanding data chunks (sent, not yet acked) and +// handles acking, nacking, rescheduling and abandoning. +class OutstandingData { + public: + // State for DATA chunks (message fragments) in the queue - used in tests. + enum class State { + // The chunk has been sent but not received yet (from the sender's point of + // view, as no SACK has been received yet that reference this chunk). + kInFlight, + // A SACK has been received which explicitly marked this chunk as missing - + // it's now NACKED and may be retransmitted if NACKED enough times. + kNacked, + // A chunk that will be retransmitted when possible. + kToBeRetransmitted, + // A SACK has been received which explicitly marked this chunk as received. + kAcked, + // A chunk whose message has expired or has been retransmitted too many + // times (RFC3758). It will not be retransmitted anymore. + kAbandoned, + }; + + // Contains variables scoped to a processing of an incoming SACK. + struct AckInfo { + explicit AckInfo(UnwrappedTSN cumulative_tsn_ack) + : highest_tsn_acked(cumulative_tsn_ack) {} + + // Bytes acked by increasing cumulative_tsn_ack and gap_ack_blocks. + size_t bytes_acked = 0; + + // Indicates if this SACK indicates that packet loss has occurred. Just + // because a packet is missing in the SACK doesn't necessarily mean that + // there is packet loss as that packet might be in-flight and received + // out-of-order. But when it has been reported missing consecutive times, it + // will eventually be considered "lost" and this will be set. + bool has_packet_loss = false; + + // Highest TSN Newly Acknowledged, an SCTP variable. + UnwrappedTSN highest_tsn_acked; + }; + + OutstandingData( + size_t data_chunk_header_size, + UnwrappedTSN next_tsn, + UnwrappedTSN last_cumulative_tsn_ack, + std::function discard_from_send_queue) + : data_chunk_header_size_(data_chunk_header_size), + next_tsn_(next_tsn), + last_cumulative_tsn_ack_(last_cumulative_tsn_ack), + discard_from_send_queue_(std::move(discard_from_send_queue)) {} + + AckInfo HandleSack( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + bool is_in_fast_retransmit); + + // Given `max_size` of space left in a packet, which chunks can be added to + // it? + std::vector> GetChunksToBeRetransmitted(size_t max_size); + + size_t outstanding_bytes() const { return outstanding_bytes_; } + + // Returns the number of DATA chunks that are in-flight. + size_t outstanding_items() const { return outstanding_items_; } + + // Given the current time `now_ms`, expire and abandon outstanding (sent at + // least once) chunks that have a limited lifetime. + void ExpireOutstandingChunks(TimeMs now); + + bool empty() const { return outstanding_data_.empty(); } + + bool has_data_to_be_retransmitted() const { + return !to_be_retransmitted_.empty(); + } + + UnwrappedTSN last_cumulative_tsn_ack() const { + return last_cumulative_tsn_ack_; + } + + UnwrappedTSN next_tsn() const { return next_tsn_; } + + UnwrappedTSN highest_outstanding_tsn() const; + + // Schedules `data` to be sent, with the provided partial reliability + // parameters. Returns the TSN if the item was actually added and scheduled to + // be sent, and absl::nullopt if it shouldn't be sent. + absl::optional Insert( + const Data& data, + absl::optional max_retransmissions, + TimeMs time_sent, + absl::optional expires_at); + + // Nacks all outstanding data. + void NackAll(); + + // Creates a FORWARD-TSN chunk. + ForwardTsnChunk CreateForwardTsn() const; + + // Creates an I-FORWARD-TSN chunk. + IForwardTsnChunk CreateIForwardTsn() const; + + // Given the current time and a TSN, it returns the measured RTT between when + // the chunk was sent and now. It takes into acccount Karn's algorithm, so if + // the chunk has ever been retransmitted, it will return absl::nullopt. + absl::optional MeasureRTT(TimeMs now, UnwrappedTSN tsn) const; + + // Returns the internal state of all queued chunks. This is only used in + // unit-tests. + std::vector> GetChunkStatesForTesting() const; + + // Returns true if the next chunk that is not acked by the peer has been + // abandoned, which means that a FORWARD-TSN should be sent. + bool ShouldSendForwardTsn() const; + + private: + // A fragmented message's DATA chunk while in the retransmission queue, and + // its associated metadata. + class Item { + public: + enum class NackAction { + kNothing, + kRetransmit, + kAbandon, + }; + + explicit Item(Data data, + absl::optional max_retransmissions, + TimeMs time_sent, + absl::optional expires_at) + : max_retransmissions_(max_retransmissions), + time_sent_(time_sent), + expires_at_(expires_at), + data_(std::move(data)) {} + + TimeMs time_sent() const { return time_sent_; } + + const Data& data() const { return data_; } + + // Acks an item. + void Ack(); + + // Nacks an item. If it has been nacked enough times, or if `retransmit_now` + // is set, it might be marked for retransmission. If the item has reached + // its max retransmission value, it will instead be abandoned. The action + // performed is indicated as return value. + NackAction Nack(bool retransmit_now = false); + + // Prepares the item to be retransmitted. Sets it as outstanding and + // clears all nack counters. + void Retransmit(); + + // Marks this item as abandoned. + void Abandon(); + + bool is_outstanding() const { return ack_state_ == AckState::kUnacked; } + bool is_acked() const { return ack_state_ == AckState::kAcked; } + bool is_nacked() const { return ack_state_ == AckState::kNacked; } + bool is_abandoned() const { return is_abandoned_; } + + // Indicates if this chunk should be retransmitted. + bool should_be_retransmitted() const { return should_be_retransmitted_; } + // Indicates if this chunk has ever been retransmitted. + bool has_been_retransmitted() const { return num_retransmissions_ > 0; } + + // Given the current time, and the current state of this DATA chunk, it will + // indicate if it has expired (SCTP Partial Reliability Extension). + bool has_expired(TimeMs now) const; + + private: + enum class AckState { + kUnacked, + kAcked, + kNacked, + }; + // Indicates the presence of this chunk, if it's in flight (Unacked), has + // been received (Acked) or is lost (Nacked). + AckState ack_state_ = AckState::kUnacked; + // Indicates if this chunk has been abandoned, which is a terminal state. + bool is_abandoned_ = false; + // Indicates if this chunk should be retransmitted. + bool should_be_retransmitted_ = false; + + // The number of times the DATA chunk has been nacked (by having received a + // SACK which doesn't include it). Will be cleared on retransmissions. + size_t nack_count_ = 0; + // The number of times the DATA chunk has been retransmitted. + size_t num_retransmissions_ = 0; + // If the message was sent with a maximum number of retransmissions, this is + // set to that number. The value zero (0) means that it will never be + // retransmitted. + const absl::optional max_retransmissions_; + // When the packet was sent, and placed in this queue. + const TimeMs time_sent_; + // If the message was sent with an expiration time, this is set. At this + // exact millisecond, the item is considered expired. + const absl::optional expires_at_; + // The actual data to send/retransmit. + Data data_; + }; + + // Returns how large a chunk will be, serialized, carrying the data + size_t GetSerializedChunkSize(const Data& data) const; + + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items + // in the retransmission queue up until this value and will update `ack_info` + // by setting `bytes_acked_by_cumulative_tsn_ack`. + void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info); + + // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK + // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`. + void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + AckInfo& ack_info); + + // Mark chunks reported as "missing", as "nacked" or "to be retransmitted" + // depending how many times this has happened. Only packets up until + // `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are + // nacked/retransmitted. The method will set `ack_info.has_packet_loss`. + void NackBetweenAckBlocks( + UnwrappedTSN cumulative_tsn_ack, + rtc::ArrayView gap_ack_blocks, + bool is_in_fast_recovery, + OutstandingData::AckInfo& ack_info); + + // Acks the chunk referenced by `iter` and updates state in `ack_info` and the + // object's state. + void AckChunk(AckInfo& ack_info, std::map::iterator iter); + + // Helper method to nack an item and perform the correct operations given the + // action indicated when nacking an item (e.g. retransmitting or abandoning). + // The return value indicate if an action was performed, meaning that packet + // loss was detected and acted upon. + bool NackItem(UnwrappedTSN tsn, Item& item, bool retransmit_now); + + // Given that a message fragment, `item` has been abandoned, abandon all other + // fragments that share the same message - both never-before-sent fragments + // that are still in the SendQueue and outstanding chunks. + void AbandonAllFor(const OutstandingData::Item& item); + + bool IsConsistent() const; + + // The size of the data chunk (DATA/I-DATA) header that is used. + const size_t data_chunk_header_size_; + // Next TSN to used. + UnwrappedTSN next_tsn_; + // The last cumulative TSN ack number. + UnwrappedTSN last_cumulative_tsn_ack_; + // Callback when to discard items from the send queue. + std::function discard_from_send_queue_; + + std::map outstanding_data_; + // The number of bytes that are in-flight (sent but not yet acked or nacked). + size_t outstanding_bytes_ = 0; + // The number of DATA chunks that are in-flight (sent but not yet acked or + // nacked). + size_t outstanding_items_ = 0; + // Data chunks that are to be retransmitted. + std::set to_be_retransmitted_; +}; +} // namespace dcsctp +#endif // NET_DCSCTP_TX_OUTSTANDING_DATA_H_ diff --git a/net/dcsctp/tx/outstanding_data_test.cc b/net/dcsctp/tx/outstanding_data_test.cc new file mode 100644 index 0000000000..c9117842fe --- /dev/null +++ b/net/dcsctp/tx/outstanding_data_test.cc @@ -0,0 +1,370 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/outstanding_data.h" + +#include + +#include "absl/types/optional.h" +#include "net/dcsctp/common/math.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/testing/data_generator.h" +#include "net/dcsctp/testing/testing_macros.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::MockFunction; +using State = ::dcsctp::OutstandingData::State; +using ::testing::_; +using ::testing::ElementsAre; +using ::testing::Pair; +using ::testing::Return; +using ::testing::StrictMock; + +constexpr TimeMs kNow(42); + +class OutstandingDataTest : public testing::Test { + protected: + OutstandingDataTest() + : gen_(MID(42)), + buf_(DataChunk::kHeaderSize, + unwrapper_.Unwrap(TSN(10)), + unwrapper_.Unwrap(TSN(9)), + on_discard_.AsStdFunction()) {} + + UnwrappedTSN::Unwrapper unwrapper_; + DataGenerator gen_; + StrictMock> on_discard_; + OutstandingData buf_; +}; + +TEST_F(OutstandingDataTest, HasInitialState) { + EXPECT_TRUE(buf_.empty()); + EXPECT_EQ(buf_.outstanding_bytes(), 0u); + EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(10)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(9)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked))); + EXPECT_FALSE(buf_.ShouldSendForwardTsn()); +} + +TEST_F(OutstandingDataTest, InsertChunk) { + ASSERT_HAS_VALUE_AND_ASSIGN( + UnwrappedTSN tsn, + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt)); + + EXPECT_EQ(tsn.Wrap(), TSN(10)); + + EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(10)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(OutstandingDataTest, AcksSingleChunk) { + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt); + OutstandingData::AckInfo ack = + buf_.HandleSack(unwrapper_.Unwrap(TSN(10)), {}, false); + + EXPECT_EQ(ack.bytes_acked, DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10)); + EXPECT_FALSE(ack.has_packet_loss); + + EXPECT_EQ(buf_.outstanding_bytes(), 0u); + EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(10)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked))); +} + +TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) { + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt); + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false); + + EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(10)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), + Pair(TSN(10), State::kInFlight))); +} + +TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) { + buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt); + + std::vector gab = {SackChunk::GapAckBlock(2, 2)}; + OutstandingData::AckInfo ack = + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab, false); + EXPECT_EQ(ack.bytes_acked, DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11)); + EXPECT_FALSE(ack.has_packet_loss); + + EXPECT_EQ(buf_.outstanding_bytes(), 0u); + EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(12)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(11)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kAcked))); +} + +TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) { + buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt); + + std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kNacked), // + Pair(TSN(11), State::kAcked))); +} + +TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) { + buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt); + + std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + std::vector gab2 = {SackChunk::GapAckBlock(2, 3)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; + OutstandingData::AckInfo ack = + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false); + EXPECT_EQ(ack.bytes_acked, DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(13)); + EXPECT_TRUE(ack.has_packet_loss); + + EXPECT_TRUE(buf_.has_data_to_be_retransmitted()); + + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kToBeRetransmitted), // + Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kAcked))); + + EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), + ElementsAre(Pair(TSN(10), _))); +} + +TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) { + static constexpr uint16_t kMaxRetransmissions = 0; + buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow, absl::nullopt); + + std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + std::vector gab2 = {SackChunk::GapAckBlock(2, 3)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; + OutstandingData::AckInfo ack = + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false); + EXPECT_EQ(ack.bytes_acked, DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(13)); + EXPECT_TRUE(ack.has_packet_loss); + + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(14)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAbandoned))); +} + +TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { + static constexpr uint16_t kMaxRetransmissions = 0; + buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + + std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + std::vector gab2 = {SackChunk::GapAckBlock(2, 3)}; + EXPECT_FALSE( + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + + EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(true)); + std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; + OutstandingData::AckInfo ack = + buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false); + EXPECT_EQ(ack.bytes_acked, DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(13)); + EXPECT_TRUE(ack.has_packet_loss); + + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(15)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAbandoned), // + Pair(TSN(14), State::kAbandoned))); +} + +TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { + static constexpr TimeMs kExpiresAt = kNow + DurationMs(1); + EXPECT_TRUE( + buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, kExpiresAt) + .has_value()); + EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, + kNow + DurationMs(0), kExpiresAt) + .has_value()); + + EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + EXPECT_FALSE(buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, + kNow + DurationMs(1), kExpiresAt) + .has_value()); + + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); + EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(13)); + EXPECT_EQ(buf_.highest_outstanding_tsn().Wrap(), TSN(12)); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), + Pair(TSN(12), State::kAbandoned))); +} + +TEST_F(OutstandingDataTest, CanGenerateForwardTsn) { + static constexpr uint16_t kMaxRetransmissions = 0; + buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow, absl::nullopt); + + EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + buf_.NackAll(); + + EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kAbandoned), // + Pair(TSN(11), State::kAbandoned), + Pair(TSN(12), State::kAbandoned))); + + EXPECT_TRUE(buf_.ShouldSendForwardTsn()); + ForwardTsnChunk chunk = buf_.CreateForwardTsn(); + EXPECT_EQ(chunk.new_cumulative_tsn(), TSN(12)); +} + +TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { + buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt); + + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + testing::ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight), // + Pair(TSN(13), State::kInFlight), // + Pair(TSN(14), State::kInFlight), // + Pair(TSN(15), State::kInFlight), // + Pair(TSN(16), State::kInFlight), // + Pair(TSN(17), State::kInFlight))); + + std::vector gab = {SackChunk::GapAckBlock(2, 3), + SackChunk::GapAckBlock(5, 5)}; + buf_.HandleSack(unwrapper_.Unwrap(TSN(12)), gab, false); + + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(12), State::kAcked), // + Pair(TSN(13), State::kNacked), // + Pair(TSN(14), State::kAcked), // + Pair(TSN(15), State::kAcked), // + Pair(TSN(16), State::kNacked), // + Pair(TSN(17), State::kAcked))); +} + +TEST_F(OutstandingDataTest, MeasureRTT) { + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow + DurationMs(1), + absl::nullopt); + buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow + DurationMs(2), + absl::nullopt); + + static constexpr DurationMs kDuration(123); + ASSERT_HAS_VALUE_AND_ASSIGN( + DurationMs duration, + buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11)))); + + EXPECT_EQ(duration, kDuration - DurationMs(1)); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 8e25ec80a2..85399f24a7 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -36,6 +36,7 @@ #include "net/dcsctp/public/dcsctp_options.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/outstanding_data.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" @@ -44,10 +45,6 @@ namespace dcsctp { namespace { -// The number of times a packet must be NACKed before it's retransmitted. -// See https://tools.ietf.org/html/rfc4960#section-7.2.4 -constexpr size_t kNumberOfNacksForRetransmission = 3; - // Allow sending only slightly less than an MTU, to account for headers. constexpr float kMinBytesRequiredToSendFactor = 0.9; } // namespace @@ -85,37 +82,21 @@ RetransmissionQueue::RetransmissionQueue( ssthresh_(handover_state ? handover_state->tx.ssthresh : rwnd_), partial_bytes_acked_( handover_state ? handover_state->tx.partial_bytes_acked : 0), - next_tsn_(tsn_unwrapper_.Unwrap( - handover_state ? TSN(handover_state->tx.next_tsn) : my_initial_tsn)), - last_cumulative_tsn_ack_(tsn_unwrapper_.Unwrap( - handover_state ? TSN(handover_state->tx.next_tsn - 1) - : TSN(*my_initial_tsn - 1))), - send_queue_(send_queue) {} + send_queue_(send_queue), + outstanding_data_( + data_chunk_header_size_, + tsn_unwrapper_.Unwrap(handover_state + ? TSN(handover_state->tx.next_tsn) + : my_initial_tsn), + tsn_unwrapper_.Unwrap(handover_state + ? TSN(handover_state->tx.next_tsn - 1) + : TSN(*my_initial_tsn - 1)), + [this](IsUnordered unordered, StreamID stream_id, MID message_id) { + return send_queue_.Discard(unordered, stream_id, message_id); + }) {} bool RetransmissionQueue::IsConsistent() const { - size_t actual_outstanding_bytes = 0; - size_t actual_outstanding_items = 0; - - std::set actual_to_be_retransmitted; - for (const auto& elem : outstanding_data_) { - if (elem.second.is_outstanding()) { - actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data()); - ++actual_outstanding_items; - } - - if (elem.second.should_be_retransmitted()) { - actual_to_be_retransmitted.insert(elem.first); - } - } - - if (outstanding_data_.empty() && - next_tsn_ != last_cumulative_tsn_ack_.next_value()) { - return false; - } - - return actual_outstanding_bytes == outstanding_bytes_ && - actual_outstanding_items == outstanding_items_ && - actual_to_be_retransmitted == to_be_retransmitted_; + return true; } // Returns how large a chunk will be, serialized, carrying the data @@ -123,101 +104,6 @@ size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const { return RoundUpTo4(data_chunk_header_size_ + data.size()); } -void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, - AckInfo& ack_info) { - auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); - - for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { - AckChunk(ack_info, iter); - } - - outstanding_data_.erase(outstanding_data_.begin(), first_unacked); -} - -void RetransmissionQueue::AckGapBlocks( - UnwrappedTSN cumulative_tsn_ack, - rtc::ArrayView gap_ack_blocks, - AckInfo& ack_info) { - // Mark all non-gaps as ACKED (but they can't be removed) as (from RFC) - // "SCTP considers the information carried in the Gap Ack Blocks in the - // SACK chunk as advisory.". Note that when NR-SACK is supported, this can be - // handled differently. - - for (auto& block : gap_ack_blocks) { - auto start = outstanding_data_.lower_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); - auto end = outstanding_data_.upper_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); - for (auto iter = start; iter != end; ++iter) { - AckChunk(ack_info, iter); - } - } -} - -void RetransmissionQueue::AckChunk( - AckInfo& ack_info, - std::map::iterator iter) { - if (!iter->second.is_acked()) { - size_t serialized_size = GetSerializedChunkSize(iter->second.data()); - ack_info.bytes_acked += serialized_size; - if (iter->second.is_outstanding()) { - outstanding_bytes_ -= serialized_size; - --outstanding_items_; - } - if (iter->second.should_be_retransmitted()) { - to_be_retransmitted_.erase(iter->first); - } - iter->second.Ack(); - ack_info.highest_tsn_acked = - std::max(ack_info.highest_tsn_acked, iter->first); - } -} - -void RetransmissionQueue::NackBetweenAckBlocks( - UnwrappedTSN cumulative_tsn_ack, - rtc::ArrayView gap_ack_blocks, - AckInfo& ack_info) { - // Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED. - // https://tools.ietf.org/html/rfc4960#section-7.2.4 - // "Mark the DATA chunk(s) with three miss indications for retransmission." - // "For each incoming SACK, miss indications are incremented only for - // missing TSNs prior to the highest TSN newly acknowledged in the SACK." - // - // What this means is that only when there is a increasing stream of data - // received and there are new packets seen (since last time), packets that are - // in-flight and between gaps should be nacked. This means that SCTP relies on - // the T3-RTX-timer to re-send packets otherwise. - UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked; - if (is_in_fast_recovery() && cumulative_tsn_ack > last_cumulative_tsn_ack_) { - // https://tools.ietf.org/html/rfc4960#section-7.2.4 - // "If an endpoint is in Fast Recovery and a SACK arrives that advances - // the Cumulative TSN Ack Point, the miss indications are incremented for - // all TSNs reported missing in the SACK." - max_tsn_to_nack = UnwrappedTSN::AddTo( - cumulative_tsn_ack, - gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end); - } - - UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack; - for (auto& block : gap_ack_blocks) { - UnwrappedTSN cur_block_first_acked = - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); - for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); - iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { - if (iter->first <= max_tsn_to_nack) { - ack_info.has_packet_loss = - NackItem(iter->first, iter->second, /*retransmit_now=*/false); - } - } - prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); - } - - // Note that packets are not NACKED which are above the highest gap-ack-block - // (or above the cumulative ack TSN if no gap-ack-blocks) as only packets - // up until the highest_tsn_acked (see above) should be considered when - // NACKing. -} - void RetransmissionQueue::MaybeExitFastRecovery( UnwrappedTSN cumulative_tsn_ack) { // https://tools.ietf.org/html/rfc4960#section-7.2.4 @@ -308,9 +194,7 @@ void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { // https://tools.ietf.org/html/rfc4960#section-7.2.4 // "If not in Fast Recovery, enter Fast Recovery and mark the highest // outstanding TSN as the Fast Recovery exit point." - fast_recovery_exit_tsn_ = outstanding_data_.empty() - ? last_cumulative_tsn_ack_ - : outstanding_data_.rbegin()->first; + fast_recovery_exit_tsn_ = outstanding_data_.highest_outstanding_tsn(); RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast recovery initiated with exit_point=" << *fast_recovery_exit_tsn_->Wrap(); @@ -325,7 +209,9 @@ void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { } void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { - rwnd_ = outstanding_bytes_ >= a_rwnd ? 0 : a_rwnd - outstanding_bytes_; + rwnd_ = outstanding_data_.outstanding_bytes() >= a_rwnd + ? 0 + : a_rwnd - outstanding_data_.outstanding_bytes(); } void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { @@ -363,20 +249,14 @@ bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { // received, as the gap ack blocks or dup tsn fields may have changed. UnwrappedTSN cumulative_tsn_ack = tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack()); - if (cumulative_tsn_ack < last_cumulative_tsn_ack_) { + if (cumulative_tsn_ack < outstanding_data_.last_cumulative_tsn_ack()) { // https://tools.ietf.org/html/rfc4960#section-6.2.1 // "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point, // then drop the SACK. Since Cumulative TSN Ack is monotonically // increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative // TSN Ack Point indicates an out-of- order SACK." return false; - } else if (outstanding_data_.empty() && - cumulative_tsn_ack > last_cumulative_tsn_ack_) { - // No in-flight data and cum-tsn-ack above what was last ACKed - not valid. - return false; - } else if (!outstanding_data_.empty() && - cumulative_tsn_ack > outstanding_data_.rbegin()->first) { - // There is in-flight data, but the cum-tsn-ack is beyond that - not valid. + } else if (cumulative_tsn_ack > outstanding_data_.highest_outstanding_tsn()) { return false; } return true; @@ -387,7 +267,9 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { return false; } - size_t old_outstanding_bytes = outstanding_bytes_; + UnwrappedTSN old_last_cumulative_tsn_ack = + outstanding_data_.last_cumulative_tsn_ack(); + size_t old_outstanding_bytes = outstanding_data_.outstanding_bytes(); size_t old_rwnd = rwnd_; UnwrappedTSN cumulative_tsn_ack = tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); @@ -396,29 +278,23 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { UpdateRTT(now, cumulative_tsn_ack); } - AckInfo ack_info(cumulative_tsn_ack); - // Erase all items up to cumulative_tsn_ack. - RemoveAcked(cumulative_tsn_ack, ack_info); - - // ACK packets reported in the gap ack blocks - AckGapBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); - - // NACK and possibly mark for retransmit chunks that weren't acked. - NackBetweenAckBlocks(cumulative_tsn_ack, sack.gap_ack_blocks(), ack_info); + OutstandingData::AckInfo ack_info = outstanding_data_.HandleSack( + cumulative_tsn_ack, sack.gap_ack_blocks(), is_in_fast_retransmit_); // Update of outstanding_data_ is now done. Congestion control remains. UpdateReceiverWindow(sack.a_rwnd()); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK. cum_tsn_ack=" + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " (" - << *last_cumulative_tsn_ack_.Wrap() - << "), outstanding_bytes=" << outstanding_bytes_ << " (" + << *old_last_cumulative_tsn_ack.Wrap() + << "), outstanding_bytes=" + << outstanding_data_.outstanding_bytes() << " (" << old_outstanding_bytes << "), rwnd=" << rwnd_ << " (" << old_rwnd << ")"; MaybeExitFastRecovery(cumulative_tsn_ack); - if (cumulative_tsn_ack > last_cumulative_tsn_ack_) { + if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) { // https://tools.ietf.org/html/rfc4960#section-6.3.2 // "Whenever a SACK is received that acknowledges the DATA chunk // with the earliest outstanding TSN for that address, restart the T3-rtx @@ -443,7 +319,6 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { on_clear_retransmission_counter_(); } - last_cumulative_tsn_ack_ = cumulative_tsn_ack; StartT3RtxTimerIfOutstandingData(); RTC_DCHECK(IsConsistent()); return true; @@ -459,23 +334,17 @@ void RetransmissionQueue::UpdateRTT(TimeMs now, // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and // use only those packets for measurement. - auto it = outstanding_data_.find(cumulative_tsn_ack); - if (it != outstanding_data_.end()) { - if (!it->second.has_been_retransmitted()) { - // https://tools.ietf.org/html/rfc4960#section-6.3.1 - // "Karn's algorithm: RTT measurements MUST NOT be made using - // packets that were retransmitted (and thus for which it is ambiguous - // whether the reply was for the first instance of the chunk or for a - // later instance)" - DurationMs rtt = now - it->second.time_sent(); - on_new_rtt_(rtt); - } + absl::optional rtt = + outstanding_data_.MeasureRTT(now, cumulative_tsn_ack); + + if (rtt.has_value()) { + on_new_rtt_(*rtt); } } void RetransmissionQueue::HandleT3RtxTimerExpiry() { size_t old_cwnd = cwnd_; - size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_outstanding_bytes = outstanding_bytes(); // https://tools.ietf.org/html/rfc4960#section-6.3.3 // "For the destination address for which the timer expires, adjust // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." @@ -502,13 +371,7 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be // marked for retransmission and sent as soon as cwnd allows (normally, when a // SACK arrives)." - for (auto& elem : outstanding_data_) { - UnwrappedTSN tsn = elem.first; - TxData& item = elem.second; - if (!item.is_acked()) { - NackItem(tsn, item, /*retransmit_now=*/true); - } - } + outstanding_data_.NackAll(); // https://tools.ietf.org/html/rfc4960#section-6.3.3 // "Start the retransmission timer T3-rtx on the destination address @@ -518,69 +381,11 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ << " (" << old_cwnd << "), ssthresh=" << ssthresh_ - << ", outstanding_bytes " << outstanding_bytes_ << " (" + << ", outstanding_bytes " << outstanding_bytes() << " (" << old_outstanding_bytes << ")"; RTC_DCHECK(IsConsistent()); } -bool RetransmissionQueue::NackItem(UnwrappedTSN tsn, - TxData& item, - bool retransmit_now) { - if (item.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(item.data()); - --outstanding_items_; - } - - switch (item.Nack(retransmit_now)) { - case TxData::NackAction::kNothing: - return false; - case TxData::NackAction::kRetransmit: - to_be_retransmitted_.insert(tsn); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << *tsn.Wrap() - << " marked for retransmission"; - break; - case TxData::NackAction::kAbandon: - AbandonAllFor(item); - break; - } - return true; -} - -std::vector> -RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) { - std::vector> result; - - for (auto it = to_be_retransmitted_.begin(); - it != to_be_retransmitted_.end();) { - UnwrappedTSN tsn = *it; - auto elem = outstanding_data_.find(tsn); - RTC_DCHECK(elem != outstanding_data_.end()); - TxData& item = elem->second; - RTC_DCHECK(item.should_be_retransmitted()); - RTC_DCHECK(!item.is_outstanding()); - RTC_DCHECK(!item.is_abandoned()); - RTC_DCHECK(!item.is_acked()); - - size_t serialized_size = GetSerializedChunkSize(item.data()); - if (serialized_size <= max_size) { - item.Retransmit(); - result.emplace_back(tsn.Wrap(), item.data().Clone()); - max_size -= serialized_size; - outstanding_bytes_ += serialized_size; - ++outstanding_items_; - it = to_be_retransmitted_.erase(it); - } else { - ++it; - } - // No point in continuing if the packet is full. - if (max_size <= data_chunk_header_size_) { - break; - } - } - - return result; -} - std::vector> RetransmissionQueue::GetChunksToSend( TimeMs now, size_t bytes_remaining_in_packet) { @@ -588,9 +393,9 @@ std::vector> RetransmissionQueue::GetChunksToSend( RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); std::vector> to_be_sent; - size_t old_outstanding_bytes = outstanding_bytes_; + size_t old_outstanding_bytes = outstanding_bytes(); size_t old_rwnd = rwnd_; - if (is_in_fast_retransmit()) { + if (is_in_fast_retransmit_) { // https://tools.ietf.org/html/rfc4960#section-7.2.4 // "Determine how many of the earliest (i.e., lowest TSN) DATA chunks // marked for retransmission will fit into a single packet ... Retransmit @@ -598,7 +403,8 @@ std::vector> RetransmissionQueue::GetChunksToSend( // performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT // delay retransmission for this single packet." is_in_fast_retransmit_ = false; - to_be_sent = GetChunksToBeRetransmitted(bytes_remaining_in_packet); + to_be_sent = + outstanding_data_.GetChunksToBeRetransmitted(bytes_remaining_in_packet); size_t to_be_sent_bytes = absl::c_accumulate( to_be_sent, 0, [&](size_t r, const std::pair& d) { return r + GetSerializedChunkSize(d.second); @@ -614,7 +420,7 @@ std::vector> RetransmissionQueue::GetChunksToSend( size_t max_bytes = RoundDownTo4(std::min(max_bytes_to_send(), bytes_remaining_in_packet)); - to_be_sent = GetChunksToBeRetransmitted(max_bytes); + to_be_sent = outstanding_data_.GetChunksToBeRetransmitted(max_bytes); max_bytes -= absl::c_accumulate( to_be_sent, 0, [&](size_t r, const std::pair& d) { return r + GetSerializedChunkSize(d.second); @@ -628,37 +434,17 @@ std::vector> RetransmissionQueue::GetChunksToSend( break; } - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); - - // All chunks are always padded to be even divisible by 4. size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); max_bytes -= chunk_size; - outstanding_bytes_ += chunk_size; - ++outstanding_items_; rwnd_ -= chunk_size; - auto item_it = - outstanding_data_ - .emplace(tsn, - RetransmissionQueue::TxData( - chunk_opt->data.Clone(), - partial_reliability_ ? chunk_opt->max_retransmissions - : absl::nullopt, - now, - partial_reliability_ ? chunk_opt->expires_at - : absl::nullopt)) - .first; - if (item_it->second.has_expired(now)) { - // No need to send it - it was expired when it was in the send - // queue. - RTC_DLOG(LS_VERBOSE) - << log_prefix_ << "Marking freshly produced chunk " - << *item_it->first.Wrap() << " and message " - << *item_it->second.data().message_id << " as expired"; - AbandonAllFor(item_it->second); - } else { - to_be_sent.emplace_back(tsn.Wrap(), std::move(chunk_opt->data)); + absl::optional tsn = outstanding_data_.Insert( + chunk_opt->data, + partial_reliability_ ? chunk_opt->max_retransmissions : absl::nullopt, + now, partial_reliability_ ? chunk_opt->expires_at : absl::nullopt); + + if (tsn.has_value()) { + to_be_sent.emplace_back(tsn->Wrap(), std::move(chunk_opt->data)); } } } @@ -683,7 +469,7 @@ std::vector> RetransmissionQueue::GetChunksToSend( [&](size_t r, const std::pair& d) { return r + GetSerializedChunkSize(d.second); }) - << " bytes. outstanding_bytes=" << outstanding_bytes_ + << " bytes. outstanding_bytes=" << outstanding_bytes() << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_ << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; } @@ -691,29 +477,6 @@ std::vector> RetransmissionQueue::GetChunksToSend( return to_be_sent; } -std::vector> -RetransmissionQueue::GetChunkStatesForTesting() const { - std::vector> states; - states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); - for (const auto& elem : outstanding_data_) { - State state; - if (elem.second.is_abandoned()) { - state = State::kAbandoned; - } else if (elem.second.should_be_retransmitted()) { - state = State::kToBeRetransmitted; - } else if (elem.second.is_acked()) { - state = State::kAcked; - } else if (elem.second.is_outstanding()) { - state = State::kInFlight; - } else { - state = State::kNacked; - } - - states.emplace_back(elem.first.Wrap(), state); - } - return states; -} - bool RetransmissionQueue::can_send_data() const { return cwnd_ < options_.avoid_fragmentation_cwnd_mtus * options_.mtu || max_bytes_to_send() >= min_bytes_required_to_send_; @@ -723,132 +486,16 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { if (!partial_reliability_) { return false; } - ExpireOutstandingChunks(now); - if (!outstanding_data_.empty()) { - auto it = outstanding_data_.begin(); - return it->first == last_cumulative_tsn_ack_.next_value() && - it->second.is_abandoned(); - } + outstanding_data_.ExpireOutstandingChunks(now); + bool ret = outstanding_data_.ShouldSendForwardTsn(); RTC_DCHECK(IsConsistent()); - return false; -} - -void RetransmissionQueue::TxData::Ack() { - ack_state_ = AckState::kAcked; - should_be_retransmitted_ = false; -} - -RetransmissionQueue::TxData::NackAction RetransmissionQueue::TxData::Nack( - bool retransmit_now) { - ack_state_ = AckState::kNacked; - ++nack_count_; - if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) && - !is_abandoned_) { - // Nacked enough times - it's considered lost. - if (!max_retransmissions_.has_value() || - num_retransmissions_ < max_retransmissions_) { - should_be_retransmitted_ = true; - return NackAction::kRetransmit; - } - Abandon(); - return NackAction::kAbandon; - } - return NackAction::kNothing; -} - -void RetransmissionQueue::TxData::Retransmit() { - ack_state_ = AckState::kUnacked; - should_be_retransmitted_ = false; - - nack_count_ = 0; - ++num_retransmissions_; -} - -void RetransmissionQueue::TxData::Abandon() { - is_abandoned_ = true; - should_be_retransmitted_ = false; -} - -bool RetransmissionQueue::TxData::has_expired(TimeMs now) const { - return expires_at_.has_value() && *expires_at_ <= now; -} - -void RetransmissionQueue::ExpireOutstandingChunks(TimeMs now) { - for (const auto& elem : outstanding_data_) { - UnwrappedTSN tsn = elem.first; - const TxData& item = elem.second; - - // Chunks that are nacked can be expired. Care should be taken not to expire - // unacked (in-flight) chunks as they might have been received, but the SACK - // is either delayed or in-flight and may be received later. - if (item.is_abandoned()) { - // Already abandoned. - } else if (item.is_nacked() && item.has_expired(now)) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking nacked chunk " - << *tsn.Wrap() << " and message " - << *item.data().message_id << " as expired"; - AbandonAllFor(item); - } else { - // A non-expired chunk. No need to iterate any further. - break; - } - } -} - -void RetransmissionQueue::AbandonAllFor( - const RetransmissionQueue::TxData& item) { - // Erase all remaining chunks from the producer, if any. - if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id, - item.data().message_id)) { - // There were remaining chunks to be produced for this message. Since the - // receiver may have already received all chunks (up till now) for this - // message, we can't just FORWARD-TSN to the last fragment in this - // (abandoned) message and start sending a new message, as the receiver will - // then see a new message before the end of the previous one was seen (or - // skipped over). So create a new fragment, representing the end, that the - // received will never see as it is abandoned immediately and used as cum - // TSN in the sent FORWARD-TSN. - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); - Data message_end(item.data().stream_id, item.data().ssn, - item.data().message_id, item.data().fsn, item.data().ppid, - std::vector(), Data::IsBeginning(false), - Data::IsEnd(true), item.data().is_unordered); - TxData& added_item = - outstanding_data_ - .emplace(tsn, RetransmissionQueue::TxData(std::move(message_end), - absl::nullopt, TimeMs(0), - absl::nullopt)) - .first->second; - // The added chunk shouldn't be included in `outstanding_bytes`, so set it - // as acked. - added_item.Ack(); - RTC_DLOG(LS_VERBOSE) << log_prefix_ - << "Adding unsent end placeholder for message at tsn=" - << *tsn.Wrap(); - } - for (auto& elem : outstanding_data_) { - UnwrappedTSN tsn = elem.first; - TxData& other = elem.second; - - if (!other.is_abandoned() && - other.data().stream_id == item.data().stream_id && - other.data().is_unordered == item.data().is_unordered && - other.data().message_id == item.data().message_id) { - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap() - << " as abandoned"; - if (other.should_be_retransmitted()) { - to_be_retransmitted_.erase(tsn); - } - other.Abandon(); - } - } + return ret; } size_t RetransmissionQueue::max_bytes_to_send() const { - size_t left = outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_; + size_t left = outstanding_bytes() >= cwnd_ ? 0 : cwnd_ - outstanding_bytes(); - if (outstanding_bytes_ == 0) { + if (outstanding_bytes() == 0) { // https://datatracker.ietf.org/doc/html/rfc4960#section-6.1 // ... However, regardless of the value of rwnd (including if it is 0), the // data sender can always have one DATA chunk in flight to the receiver if @@ -859,64 +506,6 @@ size_t RetransmissionQueue::max_bytes_to_send() const { return std::min(rwnd(), left); } -ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const { - std::map skipped_per_ordered_stream; - UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - - for (const auto& elem : outstanding_data_) { - UnwrappedTSN tsn = elem.first; - const TxData& item = elem.second; - - if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { - break; - } - new_cumulative_ack = tsn; - if (!item.data().is_unordered && - item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) { - skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn; - } - } - - std::vector skipped_streams; - skipped_streams.reserve(skipped_per_ordered_stream.size()); - for (const auto& elem : skipped_per_ordered_stream) { - skipped_streams.emplace_back(elem.first, elem.second); - } - return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams)); -} - -IForwardTsnChunk RetransmissionQueue::CreateIForwardTsn() const { - std::map, MID> skipped_per_stream; - UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - - for (const auto& elem : outstanding_data_) { - UnwrappedTSN tsn = elem.first; - const TxData& item = elem.second; - - if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { - break; - } - new_cumulative_ack = tsn; - std::pair stream_id = - std::make_pair(item.data().is_unordered, item.data().stream_id); - - if (item.data().message_id > skipped_per_stream[stream_id]) { - skipped_per_stream[stream_id] = item.data().message_id; - } - } - - std::vector skipped_streams; - skipped_streams.reserve(skipped_per_stream.size()); - for (const auto& elem : skipped_per_stream) { - const std::pair& stream = elem.first; - MID message_id = elem.second; - skipped_streams.emplace_back(stream.first, stream.second, message_id); - } - - return IForwardTsnChunk(new_cumulative_ack.Wrap(), - std::move(skipped_streams)); -} - void RetransmissionQueue::PrepareResetStreams( rtc::ArrayView streams) { // TODO(boivie): These calls are now only affecting the send queue. The @@ -943,7 +532,7 @@ HandoverReadinessStatus RetransmissionQueue::GetHandoverReadiness() const { if (fast_recovery_exit_tsn_.has_value()) { status.Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery); } - if (!to_be_retransmitted_.empty()) { + if (outstanding_data_.has_data_to_be_retransmitted()) { status.Add(HandoverUnreadinessReason::kRetransmissionQueueNotEmpty); } return status; diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index 90b6430643..08f11db744 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -29,6 +29,7 @@ #include "net/dcsctp/public/dcsctp_handover_state.h" #include "net/dcsctp/public/dcsctp_options.h" #include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/outstanding_data.h" #include "net/dcsctp/tx/retransmission_timeout.h" #include "net/dcsctp/tx/send_queue.h" @@ -45,23 +46,7 @@ namespace dcsctp { class RetransmissionQueue { public: static constexpr size_t kMinimumFragmentedPayload = 10; - // State for DATA chunks (message fragments) in the queue - used in tests. - enum class State { - // The chunk has been sent but not received yet (from the sender's point of - // view, as no SACK has been received yet that reference this chunk). - kInFlight, - // A SACK has been received which explicitly marked this chunk as missing - - // it's now NACKED and may be retransmitted if NACKED enough times. - kNacked, - // A chunk that will be retransmitted when possible. - kToBeRetransmitted, - // A SACK has been received which explicitly marked this chunk as received. - kAcked, - // A chunk whose message has expired or has been retransmitted too many - // times (RFC3758). It will not be retransmitted anymore. - kAbandoned, - }; - + using State = OutstandingData::State; // Creates a RetransmissionQueue which will send data using `my_initial_tsn` // (or a value from `DcSctpSocketHandoverState` if given) as the first TSN // to use for sent fragments. It will poll data from `send_queue`. When SACKs @@ -100,10 +85,13 @@ class RetransmissionQueue { // Returns the internal state of all queued chunks. This is only used in // unit-tests. - std::vector> GetChunkStatesForTesting() const; + std::vector> GetChunkStatesForTesting() + const { + return outstanding_data_.GetChunkStatesForTesting(); + } // Returns the next TSN that will be allocated for sent DATA chunks. - TSN next_tsn() const { return next_tsn_.Wrap(); } + TSN next_tsn() const { return outstanding_data_.next_tsn().Wrap(); } // Returns the size of the congestion window, in bytes. This is the number of // bytes that may be in-flight. @@ -116,10 +104,14 @@ class RetransmissionQueue { size_t rwnd() const { return rwnd_; } // Returns the number of bytes of packets that are in-flight. - size_t outstanding_bytes() const { return outstanding_bytes_; } + size_t outstanding_bytes() const { + return outstanding_data_.outstanding_bytes(); + } // Returns the number of DATA chunks that are in-flight. - size_t outstanding_items() const { return outstanding_items_; } + size_t outstanding_items() const { + return outstanding_data_.outstanding_items(); + } // Indicates if the congestion control algorithm allows data to be sent. bool can_send_data() const; @@ -130,10 +122,14 @@ class RetransmissionQueue { bool ShouldSendForwardTsn(TimeMs now); // Creates a FORWARD-TSN chunk. - ForwardTsnChunk CreateForwardTsn() const; + ForwardTsnChunk CreateForwardTsn() const { + return outstanding_data_.CreateForwardTsn(); + } // Creates an I-FORWARD-TSN chunk. - IForwardTsnChunk CreateIForwardTsn() const; + IForwardTsnChunk CreateIForwardTsn() const { + return outstanding_data_.CreateIForwardTsn(); + } // See the SendQueue for a longer description of these methods related // to stream resetting. @@ -152,109 +148,6 @@ class RetransmissionQueue { kCongestionAvoidance, }; - // A fragmented message's DATA chunk while in the retransmission queue, and - // its associated metadata. - class TxData { - public: - enum class NackAction { - kNothing, - kRetransmit, - kAbandon, - }; - - explicit TxData(Data data, - absl::optional max_retransmissions, - TimeMs time_sent, - absl::optional expires_at) - : max_retransmissions_(max_retransmissions), - time_sent_(time_sent), - expires_at_(expires_at), - data_(std::move(data)) {} - - TimeMs time_sent() const { return time_sent_; } - - const Data& data() const { return data_; } - - // Acks an item. - void Ack(); - - // Nacks an item. If it has been nacked enough times, or if `retransmit_now` - // is set, it might be marked for retransmission. If the item has reached - // its max retransmission value, it will instead be abandoned. The action - // performed is indicated as return value. - NackAction Nack(bool retransmit_now = false); - - // Prepares the item to be retransmitted. Sets it as outstanding and - // clears all nack counters. - void Retransmit(); - - // Marks this item as abandoned. - void Abandon(); - - bool is_outstanding() const { return ack_state_ == AckState::kUnacked; } - bool is_acked() const { return ack_state_ == AckState::kAcked; } - bool is_nacked() const { return ack_state_ == AckState::kNacked; } - bool is_abandoned() const { return is_abandoned_; } - - // Indicates if this chunk should be retransmitted. - bool should_be_retransmitted() const { return should_be_retransmitted_; } - // Indicates if this chunk has ever been retransmitted. - bool has_been_retransmitted() const { return num_retransmissions_ > 0; } - - // Given the current time, and the current state of this DATA chunk, it will - // indicate if it has expired (SCTP Partial Reliability Extension). - bool has_expired(TimeMs now) const; - - private: - enum class AckState { - kUnacked, - kAcked, - kNacked, - }; - // Indicates the presence of this chunk, if it's in flight (Unacked), has - // been received (Acked) or is lost (Nacked). - AckState ack_state_ = AckState::kUnacked; - // Indicates if this chunk has been abandoned, which is a terminal state. - bool is_abandoned_ = false; - // Indicates if this chunk should be retransmitted. - bool should_be_retransmitted_ = false; - - // The number of times the DATA chunk has been nacked (by having received a - // SACK which doesn't include it). Will be cleared on retransmissions. - size_t nack_count_ = 0; - // The number of times the DATA chunk has been retransmitted. - size_t num_retransmissions_ = 0; - // If the message was sent with a maximum number of retransmissions, this is - // set to that number. The value zero (0) means that it will never be - // retransmitted. - const absl::optional max_retransmissions_; - // When the packet was sent, and placed in this queue. - const TimeMs time_sent_; - // If the message was sent with an expiration time, this is set. - const absl::optional expires_at_; - // The actual data to send/retransmit. - Data data_; - }; - - // Contains variables scoped to a processing of an incoming SACK. - struct AckInfo { - explicit AckInfo(UnwrappedTSN cumulative_tsn_ack) - : highest_tsn_acked(cumulative_tsn_ack) {} - - // Bytes acked by increasing cumulative_tsn_ack and gap_ack_blocks. - size_t bytes_acked = 0; - - // Indicates if this SACK indicates that packet loss has occurred. Just - // because a packet is missing in the SACK doesn't necessarily mean that - // there is packet loss as that packet might be in-flight and received - // out-of-order. But when it has been reported missing consecutive times, it - // will eventually be considered "lost" and this will be set. - bool has_packet_loss = false; - - // Highest TSN Newly Acknowledged, an SCTP variable. - UnwrappedTSN highest_tsn_acked; - }; - bool IsConsistent() const; // Returns how large a chunk will be, serialized, carrying the data @@ -265,46 +158,12 @@ class RetransmissionQueue { return fast_recovery_exit_tsn_.has_value(); } - // Indicates if the congestion control algorithm is in "fast retransmit". - bool is_in_fast_retransmit() const { return is_in_fast_retransmit_; } - // Indicates if the provided SACK is valid given what has previously been // received. If it returns false, the SACK is most likely a duplicate of // something already seen, so this returning false doesn't necessarily mean // that the SACK is illegal. bool IsSackValid(const SackChunk& sack) const; - // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items - // in the retransmission queue up until this value and will update `ack_info` - // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`. - void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info); - - // Helper method to nack an item and perform the correct operations given the - // action indicated when nacking an item (e.g. retransmitting or abandoning). - // The return value indicate if an action was performed, meaning that packet - // loss was detected and acted upon. - bool NackItem(UnwrappedTSN tsn, TxData& item, bool retransmit_now); - - // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK - // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`. - void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack, - rtc::ArrayView gap_ack_blocks, - AckInfo& ack_info); - - // Acks the chunk referenced by `iter` and updates state in `ack_info` and the - // object's state. - void AckChunk(AckInfo& ack_info, - std::map::iterator iter); - - // Mark chunks reported as "missing", as "nacked" or "to be retransmitted" - // depending how many times this has happened. Only packets up until - // `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are - // nacked/retransmitted. The method will set `ack_info.has_packet_loss`. - void NackBetweenAckBlocks( - UnwrappedTSN cumulative_tsn_ack, - rtc::ArrayView gap_ack_blocks, - AckInfo& ack_info); - // When a SACK chunk is received, this method will be called which _may_ call // into the `RetransmissionTimeout` to update the RTO. void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); @@ -326,21 +185,10 @@ class RetransmissionQueue { void HandlePacketLoss(UnwrappedTSN highest_tsn_acked); // Update the view of the receiver window size. void UpdateReceiverWindow(uint32_t a_rwnd); - // Given `max_size` of space left in a packet, which chunks can be added to - // it? - std::vector> GetChunksToBeRetransmitted(size_t max_size); // If there is data sent and not ACKED, ensure that the retransmission timer // is running. void StartT3RtxTimerIfOutstandingData(); - // Given the current time `now_ms`, expire and abandon outstanding (sent at - // least once) chunks that have a limited lifetime. - void ExpireOutstandingChunks(TimeMs now); - // Given that a message fragment, `item` has been abandoned, abandon all other - // fragments that share the same message - both never-before-sent fragments - // that are still in the SendQueue and outstanding chunks. - void AbandonAllFor(const RetransmissionQueue::TxData& item); - // Returns the current congestion control algorithm phase. CongestionAlgorithmPhase phase() const { return (cwnd_ <= ssthresh_) @@ -384,23 +232,12 @@ class RetransmissionQueue { // Indicates if the congestion algorithm is in fast retransmit. bool is_in_fast_retransmit_ = false; - // Next TSN to used. - UnwrappedTSN next_tsn_; - // The last cumulative TSN ack number - UnwrappedTSN last_cumulative_tsn_ack_; // The send queue. SendQueue& send_queue_; // All the outstanding data chunks that are in-flight and that have not been // cumulative acked. Note that it also contains chunks that have been acked in // gap ack blocks. - std::map outstanding_data_; - // Data chunks that are to be retransmitted. - std::set to_be_retransmitted_; - // The number of bytes that are in-flight (sent but not yet acked or nacked). - size_t outstanding_bytes_ = 0; - // The number of DATA chunks that are in-flight (sent but not yet acked or - // nacked). - size_t outstanding_items_ = 0; + OutstandingData outstanding_data_; }; } // namespace dcsctp