webrtc_m130/net/dcsctp/rx/reassembly_queue.cc
Victor Boivie b847a43488 dcsctp: Reset synchronously with incoming request
When a sender has requested a stream to be reset, and the last sender
assigned TSN hasn't been received yet, the receiver will enter deferred
reset mode, where it will store any data chunks received after that
given TSN, and replay those later, when the stream has been reset.

Before this CL, leaving deferred mode was done as soon as the sender's
last assigned TSN was received. That's actually not how the RFC
describes the process[1], but was done that way to properly handle some
sequences of RE-CONFIG and FORWARD-TSN. But after having read the RFCs
again, and realizing that whenever RFC6525 mentions "any data arriving",
this also applies to any FORWARD-TSN[2] - it's better to reset streams
synchronously with the incoming requests, and defer not just DATA past
the sender last assigned TSN, but also any FORWARD-TSN after that TSN.

This mostly simplifies the code and is mostly a refactoring, but most
importantly aligns it with how the resetting procedure is explained in
the RFC. It also fixes two bugs:

 * It defers FORWARD-TSN *as well as* DATA chunks with a TSN later
   than the sender's last assigned TSN - see test case. The old
   implementation tried to handle that by exiting the deferred reset
   processing as soon as it reached the sender's last assigned TSN, but
   it didn't manage to do that in all cases.
 * It only defers DATA chunks for streams that are to be reset, not
   all DATA chunks with a TSN > sender's last assigned TSN. This was
   missed in the old implementation, but as it's now implemented
   strictly according to the RFC, this was now done.

[1] https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2
[2] RFC6525 covers stream resetting, and RFC3758 covers FORWARD-TSN, but
    the combination of these is not covered in the RFCs.

Bug: webrtc:14600
Change-Id: Ief878b755291b9c923aa6fb4317b0f5c00231df4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/322623
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40889}
2023-10-09 09:47:57 +00:00

282 lines
11 KiB
C++

/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/rx/reassembly_queue.h"
#include <stddef.h>
#include <algorithm>
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"
namespace dcsctp {
namespace {
// Constructs the `ReassemblyStreams` implementation used by the queue.
//
// Returns an `InterleavedReassemblyStreams` when `use_message_interleaving` is
// true and a `TraditionalReassemblyStreams` otherwise. Both receive
// `log_prefix` and take ownership of `on_assembled_message`.
std::unique_ptr<ReassemblyStreams> CreateStreams(
    absl::string_view log_prefix,
    ReassemblyStreams::OnAssembledMessage on_assembled_message,
    bool use_message_interleaving) {
  std::unique_ptr<ReassemblyStreams> streams;
  if (!use_message_interleaving) {
    streams = std::make_unique<TraditionalReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  } else {
    streams = std::make_unique<InterleavedReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  }
  return streams;
}
} // namespace
// Creates a reassembly queue.
//
// `log_prefix` is prepended to every log line emitted by this object.
// `peer_initial_tsn` seeds the assembled-TSN watermark, `max_size_bytes` is
// the size limit from which the high watermark is derived, and
// `use_message_interleaving` selects the underlying streams implementation
// (see `CreateStreams`).
ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
                                 TSN peer_initial_tsn,
                                 size_t max_size_bytes,
                                 bool use_message_interleaving)
    : log_prefix_(log_prefix),
      max_size_bytes_(max_size_bytes),
      // The high watermark is `max_size_bytes` scaled by kHighWatermarkLimit.
      watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
      // Nothing has been assembled yet, so the watermark starts at the TSN
      // immediately preceding the peer's initial TSN.
      last_assembled_tsn_watermark_(
          tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
      last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
      // Every message completed by the streams is routed back into this
      // queue through AddReassembledMessage().
      streams_(CreateStreams(
          log_prefix_,
          [this](rtc::ArrayView<const UnwrappedTSN> assembled_tsns,
                 DcSctpMessage assembled_message) {
            AddReassembledMessage(assembled_tsns,
                                  std::move(assembled_message));
          },
          use_message_interleaving)) {}
// Accepts a received chunk (`data`, carried in `tsn`).
//
// The chunk is normally handed straight to the reassembly streams. If a
// deferred stream reset is pending, the chunk's TSN is past the sender's last
// assigned TSN, and its stream is one of the streams to be reset, the chunk is
// instead parked as a deferred action and replayed once deferred reset
// processing is left. In both cases the chunk is accounted for in
// `queued_bytes_`.
void ReassemblyQueue::Add(TSN tsn, Data data) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
                       << ", stream=" << *data.stream_id << ":" << *data.mid
                       << ":" << *data.fsn << ", type="
                       << (data.is_beginning && data.is_end ? "complete"
                           : data.is_beginning              ? "first"
                           : data.is_end                    ? "last"
                                                            : "middle");

  UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);

  // A stream reset with a "sender's last assigned tsn" in the future puts the
  // queue in "deferred reset processing" mode. While in that mode, chunks past
  // that TSN on the affected streams have to be buffered.
  const bool must_defer =
      deferred_reset_streams_.has_value() &&
      unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn &&
      deferred_reset_streams_->streams.contains(data.stream_id);

  if (!must_defer) {
    queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
  } else {
    RTC_DLOG(LS_VERBOSE)
        << log_prefix_ << "Deferring chunk with tsn=" << *tsn
        << ", sid=" << *data.stream_id << " until tsn="
        << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap();
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    //   "In this mode, any data arriving with a TSN larger than the
    //   Sender's Last Assigned TSN for the affected stream(s) MUST be queued
    //   locally and held until the cumulative acknowledgment point reaches the
    //   Sender's Last Assigned TSN."
    //
    // The payload is counted while it is parked. The replayed action removes
    // that count again right before re-adding the chunk, so it is not counted
    // twice. The size must be read before `data` is moved into the closure.
    queued_bytes_ += data.size();
    deferred_reset_streams_->deferred_actions.push_back(
        [this, tsn, parked = std::move(data)]() mutable {
          queued_bytes_ -= parked.size();
          Add(tsn, std::move(parked));
        });
  }

  // https://tools.ietf.org/html/rfc4960#section-6.9
  //   "Note: If the data receiver runs out of buffer space while still
  //   waiting for more fragments to complete the reassembly of the message, it
  //   should dispatch part of its inbound message through a partial delivery
  //   API (see Section 10), freeing some of its receive buffer space so that
  //   the rest of the message may be received."
  // TODO(boivie): Support EOR flag and partial delivery?
  RTC_DCHECK(IsConsistent());
}
// Resets the given streams and, if the queue is in deferred reset processing,
// leaves that mode and replays every action that was parked while in it.
void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset(
    rtc::ArrayView<const StreamID> stream_ids) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: ["
                       << StrJoin(stream_ids, ",",
                                  [](rtc::StringBuilder& sb, StreamID sid) {
                                    sb << *sid;
                                  })
                       << "]";

  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  //   "... streams MUST be reset to 0 as the next expected SSN."
  streams_->ResetStreams(stream_ids);

  if (deferred_reset_streams_.has_value()) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Leaving deferred reset processing, feeding back "
                         << deferred_reset_streams_->deferred_actions.size()
                         << " actions";
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    //   "Any queued TSNs (queued at step E2) MUST now be released and processed
    //   normally."
    //
    // Take the parked actions out and clear the deferred state *before*
    // running them. The actions call back into Add()/HandleForwardTsn(), which
    // would defer again if the state were still set.
    auto pending_actions =
        std::move(deferred_reset_streams_->deferred_actions);
    deferred_reset_streams_.reset();
    for (auto& replay : pending_actions) {
      replay();
    }
  }

  RTC_DCHECK(IsConsistent());
}
// Enters deferred reset processing for `streams`, with
// `sender_last_assigned_tsn` as the TSN beyond which chunks and FORWARD-TSNs
// are held back. If the queue is already in that mode the call leaves the
// existing state untouched.
void ReassemblyQueue::EnterDeferredReset(
    TSN sender_last_assigned_tsn,
    rtc::ArrayView<const StreamID> streams) {
  const bool already_deferring = deferred_reset_streams_.has_value();
  if (!already_deferring) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Entering deferred reset; sender_last_assigned_tsn="
                         << *sender_last_assigned_tsn;
    deferred_reset_streams_.emplace(
        tsn_unwrapper_.Unwrap(sender_last_assigned_tsn),
        webrtc::flat_set<StreamID>(streams.begin(), streams.end()));
  }
  RTC_DCHECK(IsConsistent());
}
// Returns all messages reassembled so far and leaves the internal list empty.
std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
  return std::exchange(reassembled_messages_, {});
}
// Records a fully reassembled `message` that was built from the chunks with
// the given `tsns`. Updates the assembled-TSN bookkeeping and appends the
// message to the list returned by FlushMessages().
void ReassemblyQueue::AddReassembledMessage(
    rtc::ArrayView<const UnwrappedTSN> tsns,
    DcSctpMessage message) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
                       << StrJoin(tsns, ",",
                                  [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
                                    sb << *tsn.Wrap();
                                  })
                       << "], message; stream_id=" << *message.stream_id()
                       << ", ppid=" << *message.ppid()
                       << ", payload=" << message.payload().size() << " bytes";

  for (const UnwrappedTSN tsn : tsns) {
    // A TSN directly following the watermark advances it. Any other TSN is
    // remembered in `delivered_tsns_` until the gap before it is closed.
    const bool extends_watermark =
        (tsn == last_assembled_tsn_watermark_.next_value());
    if (extends_watermark) {
      last_assembled_tsn_watermark_.Increment();
    } else {
      delivered_tsns_.insert(tsn);
    }
  }

  // The TSNs handled above may have closed a gap, making entries in
  // `delivered_tsns_` adjacent to the watermark.
  MaybeMoveLastAssembledWatermarkFurther();

  reassembled_messages_.push_back(std::move(message));
}
// Advances `last_assembled_tsn_watermark_` over every entry of
// `delivered_tsns_` that directly follows it, removing those entries.
//
// `delivered_tsns_` holds TSNs that lie beyond a gap after the watermark. The
// watermark must never be adjacent to the first of them, because in that case
// it can simply be moved forward.
void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
  for (auto it = delivered_tsns_.begin();
       it != delivered_tsns_.end() &&
       *it == last_assembled_tsn_watermark_.next_value();
       it = delivered_tsns_.erase(it)) {
    last_assembled_tsn_watermark_.Increment();
  }
}
// Handles a FORWARD-TSN that moves the cumulative TSN to `new_cumulative_tsn`
// and skips the given `skipped_streams`.
//
// While in deferred reset processing, a FORWARD-TSN past the sender's last
// assigned TSN is not applied. It is parked (with an owned copy of
// `skipped_streams`) and replayed when deferred reset processing is left.
// Otherwise the watermark and `delivered_tsns_` are brought up to the new TSN
// and the streams are told to drop the skipped data.
void ReassemblyQueue::HandleForwardTsn(
    TSN new_cumulative_tsn,
    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
  UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn);

  const bool must_defer =
      deferred_reset_streams_.has_value() &&
      unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn;

  if (must_defer) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to "
                         << *unwrapped_tsn.Wrap() << "- deferring.";
    // `skipped_streams` is only a view, so the parked action needs its own
    // copy of the elements.
    std::vector<AnyForwardTsnChunk::SkippedStream> owned_streams(
        skipped_streams.begin(), skipped_streams.end());
    deferred_reset_streams_->deferred_actions.emplace_back(
        [this, new_cumulative_tsn, streams = std::move(owned_streams)] {
          HandleForwardTsn(new_cumulative_tsn, streams);
        });
  } else {
    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to "
                         << *unwrapped_tsn.Wrap() << " - performing.";
    // The watermark only ever moves forward.
    if (last_assembled_tsn_watermark_ < unwrapped_tsn) {
      last_assembled_tsn_watermark_ = unwrapped_tsn;
    }
    // Everything up to and including the new TSN is covered by the watermark.
    delivered_tsns_.erase(delivered_tsns_.begin(),
                          delivered_tsns_.upper_bound(unwrapped_tsn));
    MaybeMoveLastAssembledWatermarkFurther();
    queued_bytes_ -=
        streams_->HandleForwardTsn(unwrapped_tsn, skipped_streams);
  }

  RTC_DCHECK(IsConsistent());
}
// Internal sanity check, used from RTC_DCHECK. Returns true when the TSN
// bookkeeping and the byte accounting are in a valid state.
bool ReassemblyQueue::IsConsistent() const {
  // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or
  // be adjacent.
  const bool watermark_reaches_delivered =
      !delivered_tsns_.empty() &&
      last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin();

  // `queued_bytes_` is allowed to exceed `max_size_bytes_`, since the limit is
  // not actively enforced in this class. The upper bound still trips if
  // `queued_bytes_` has become "negative".
  const bool queued_bytes_in_range =
      queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_;

  return !watermark_reaches_delivered && queued_bytes_in_range;
}
// Reports the handover readiness of this queue. Starts from the status
// reported by the underlying streams and adds a reason when there are
// delivered TSNs beyond a gap, and another when a stream reset is deferred.
HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
  HandoverReadinessStatus readiness = streams_->GetHandoverReadiness();

  const bool has_delivered_gap = !delivered_tsns_.empty();
  if (has_delivered_gap) {
    readiness.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
  }

  const bool reset_is_deferred = deferred_reset_streams_.has_value();
  if (reset_is_deferred) {
    readiness.Add(HandoverUnreadinessReason::kStreamResetDeferred);
  }

  return readiness;
}
// Writes this queue's state into the receive part of `state`: the wrapped
// assembled-TSN watermark and the last completed reset request sequence
// number. The underlying streams then add their own state.
void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
  auto& rx_state = state.rx;
  rx_state.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
  rx_state.last_completed_deferred_reset_req_sn =
      last_completed_reset_req_seq_nbr_.value();

  streams_->AddHandoverState(state);
}
// Restores the assembled-TSN watermark and the last completed reset request
// sequence number from the receive part of `state`, then lets the underlying
// streams restore their own state.
void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
  // Restoring is only valid on a pristine component.
  RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));

  const auto& rx_state = state.rx;
  last_assembled_tsn_watermark_ =
      tsn_unwrapper_.Unwrap(TSN(rx_state.last_assembled_tsn));
  last_completed_reset_req_seq_nbr_ =
      ReconfigRequestSN(rx_state.last_completed_deferred_reset_req_sn);

  streams_->RestoreFromState(state);
}
} // namespace dcsctp