dcsctp: Add Reassembly Queue
The Reassembly Queue receives fragmented messages (DATA or I-DATA chunks) and - with help of stream reassemblers - will reassemble these fragments into messages, which will be delivered to the client. It also handle partial reliability (FORWARD-TSN) and stream resetting. To avoid a DoS attack vector, where a sender can send fragments in a way that the reassembly queue will never succeed to reassemble a message and use all available memory, the ReassemblyQueue has a maximum size. Bug: webrtc:12614 Change-Id: Ibb084fecd240d4c414e096579244f8f5ee46914e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214043 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Tommi <tommi@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33678}
This commit is contained in:
parent
8a13d2ca9f
commit
cb70aa7e05
@ -40,12 +40,27 @@ rtc_library("traditional_reassembly_streams") {
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rtc_library("reassembly_queue") {
|
||||||
|
deps = [
|
||||||
|
":traditional_reassembly_streams",
|
||||||
|
"../../../api:array_view",
|
||||||
|
"../../../rtc_base",
|
||||||
|
"../../../rtc_base:checks",
|
||||||
|
"../../../rtc_base:rtc_base_approved",
|
||||||
|
]
|
||||||
|
sources = [
|
||||||
|
"reassembly_queue.cc",
|
||||||
|
"reassembly_queue.h",
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
if (rtc_include_tests) {
|
if (rtc_include_tests) {
|
||||||
rtc_library("dcsctp_rx_unittests") {
|
rtc_library("dcsctp_rx_unittests") {
|
||||||
testonly = true
|
testonly = true
|
||||||
|
|
||||||
deps = [
|
deps = [
|
||||||
":data_tracker",
|
":data_tracker",
|
||||||
|
":reassembly_queue",
|
||||||
":traditional_reassembly_streams",
|
":traditional_reassembly_streams",
|
||||||
"../../../api:array_view",
|
"../../../api:array_view",
|
||||||
"../../../rtc_base:checks",
|
"../../../rtc_base:checks",
|
||||||
@ -56,6 +71,7 @@ if (rtc_include_tests) {
|
|||||||
]
|
]
|
||||||
sources = [
|
sources = [
|
||||||
"data_tracker_test.cc",
|
"data_tracker_test.cc",
|
||||||
|
"reassembly_queue_test.cc",
|
||||||
"traditional_reassembly_streams_test.cc",
|
"traditional_reassembly_streams_test.cc",
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
245
net/dcsctp/rx/reassembly_queue.cc
Normal file
245
net/dcsctp/rx/reassembly_queue.cc
Normal file
@ -0,0 +1,245 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/rx/reassembly_queue.h"
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <set>
|
||||||
|
#include <string>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "absl/strings/string_view.h"
|
||||||
|
#include "absl/types/optional.h"
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "net/dcsctp/common/sequence_numbers.h"
|
||||||
|
#include "net/dcsctp/common/str_join.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
|
||||||
|
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
|
||||||
|
#include "net/dcsctp/public/dcsctp_message.h"
|
||||||
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
|
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
|
||||||
|
#include "rtc_base/logging.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
|
||||||
|
TSN peer_initial_tsn,
|
||||||
|
size_t max_size_bytes)
|
||||||
|
: log_prefix_(std::string(log_prefix) + "reasm: "),
|
||||||
|
max_size_bytes_(max_size_bytes),
|
||||||
|
watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
|
||||||
|
last_assembled_tsn_watermark_(
|
||||||
|
tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
|
||||||
|
streams_(std::make_unique<TraditionalReassemblyStreams>(
|
||||||
|
log_prefix_,
|
||||||
|
[this](rtc::ArrayView<const UnwrappedTSN> tsns,
|
||||||
|
DcSctpMessage message) {
|
||||||
|
AddReassembledMessage(tsns, std::move(message));
|
||||||
|
})) {}
|
||||||
|
|
||||||
|
void ReassemblyQueue::Add(TSN tsn, Data data) {
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
|
||||||
|
<< ", stream=" << *data.stream_id << ":"
|
||||||
|
<< *data.message_id << ":" << *data.fsn << ", type="
|
||||||
|
<< (data.is_beginning && data.is_end
|
||||||
|
? "complete"
|
||||||
|
: data.is_beginning
|
||||||
|
? "first"
|
||||||
|
: data.is_end ? "last" : "middle");
|
||||||
|
|
||||||
|
UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
|
||||||
|
|
||||||
|
if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
|
||||||
|
delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||||
|
<< "Chunk has already been delivered - skipping";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a stream reset has been received with a "sender's last assigned tsn" in
|
||||||
|
// the future, the socket is in "deferred reset processing" mode and must
|
||||||
|
// buffer chunks until it's exited.
|
||||||
|
if (deferred_reset_streams_.has_value() &&
|
||||||
|
unwrapped_tsn >
|
||||||
|
tsn_unwrapper_.Unwrap(
|
||||||
|
deferred_reset_streams_->req.sender_last_assigned_tsn())) {
|
||||||
|
RTC_DLOG(LS_VERBOSE)
|
||||||
|
<< log_prefix_ << "Deferring chunk with tsn=" << *tsn
|
||||||
|
<< " until cum_ack_tsn="
|
||||||
|
<< *deferred_reset_streams_->req.sender_last_assigned_tsn();
|
||||||
|
// https://tools.ietf.org/html/rfc6525#section-5.2.2
|
||||||
|
// "In this mode, any data arriving with a TSN larger than the
|
||||||
|
// Sender's Last Assigned TSN for the affected stream(s) MUST be queued
|
||||||
|
// locally and held until the cumulative acknowledgment point reaches the
|
||||||
|
// Sender's Last Assigned TSN."
|
||||||
|
queued_bytes_ += data.size();
|
||||||
|
deferred_reset_streams_->deferred_chunks.emplace_back(
|
||||||
|
std::make_pair(tsn, std::move(data)));
|
||||||
|
} else {
|
||||||
|
queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://tools.ietf.org/html/rfc4960#section-6.9
|
||||||
|
// "Note: If the data receiver runs out of buffer space while still
|
||||||
|
// waiting for more fragments to complete the reassembly of the message, it
|
||||||
|
// should dispatch part of its inbound message through a partial delivery
|
||||||
|
// API (see Section 10), freeing some of its receive buffer space so that
|
||||||
|
// the rest of the message may be received."
|
||||||
|
|
||||||
|
// TODO(boivie): Support EOR flag and partial delivery?
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
}
|
||||||
|
|
||||||
|
ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
|
||||||
|
const OutgoingSSNResetRequestParameter& req,
|
||||||
|
TSN cum_tsn_ack) {
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
if (deferred_reset_streams_.has_value()) {
|
||||||
|
// In deferred mode already.
|
||||||
|
return ReconfigurationResponseParameter::Result::kInProgress;
|
||||||
|
} else if (req.request_sequence_number() <=
|
||||||
|
last_completed_reset_req_seq_nbr_) {
|
||||||
|
// Already performed at some time previously.
|
||||||
|
return ReconfigurationResponseParameter::Result::kSuccessPerformed;
|
||||||
|
}
|
||||||
|
|
||||||
|
UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
|
||||||
|
UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
|
||||||
|
|
||||||
|
// https://tools.ietf.org/html/rfc6525#section-5.2.2
|
||||||
|
// "If the Sender's Last Assigned TSN is greater than the
|
||||||
|
// cumulative acknowledgment point, then the endpoint MUST enter "deferred
|
||||||
|
// reset processing"."
|
||||||
|
if (sla_tsn > unwrapped_cum_tsn_ack) {
|
||||||
|
RTC_DLOG(LS_VERBOSE)
|
||||||
|
<< log_prefix_
|
||||||
|
<< "Entering deferred reset processing mode until cum_tsn_ack="
|
||||||
|
<< *req.sender_last_assigned_tsn();
|
||||||
|
deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
|
||||||
|
return ReconfigurationResponseParameter::Result::kInProgress;
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://tools.ietf.org/html/rfc6525#section-5.2.2
|
||||||
|
// "... streams MUST be reset to 0 as the next expected SSN."
|
||||||
|
streams_->ResetStreams(req.stream_ids());
|
||||||
|
last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
return ReconfigurationResponseParameter::Result::kSuccessPerformed;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
if (deferred_reset_streams_.has_value()) {
|
||||||
|
UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
|
||||||
|
UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
|
||||||
|
deferred_reset_streams_->req.sender_last_assigned_tsn());
|
||||||
|
if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << log_prefix_
|
||||||
|
<< "Leaving deferred reset processing with tsn="
|
||||||
|
<< *cum_ack_tsn << ", feeding back "
|
||||||
|
<< deferred_reset_streams_->deferred_chunks.size()
|
||||||
|
<< " chunks";
|
||||||
|
// https://tools.ietf.org/html/rfc6525#section-5.2.2
|
||||||
|
// "... streams MUST be reset to 0 as the next expected SSN."
|
||||||
|
streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
|
||||||
|
std::vector<std::pair<TSN, Data>> deferred_chunks =
|
||||||
|
std::move(deferred_reset_streams_->deferred_chunks);
|
||||||
|
// The response will not be sent now, but as a reply to the retried
|
||||||
|
// request, which will come as "in progress" has been sent prior.
|
||||||
|
last_completed_reset_req_seq_nbr_ =
|
||||||
|
deferred_reset_streams_->req.request_sequence_number();
|
||||||
|
deferred_reset_streams_ = absl::nullopt;
|
||||||
|
|
||||||
|
// https://tools.ietf.org/html/rfc6525#section-5.2.2
|
||||||
|
// "Any queued TSNs (queued at step E2) MUST now be released and processed
|
||||||
|
// normally."
|
||||||
|
for (auto& p : deferred_chunks) {
|
||||||
|
const TSN& tsn = p.first;
|
||||||
|
Data& data = p.second;
|
||||||
|
queued_bytes_ -= data.size();
|
||||||
|
Add(tsn, std::move(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
|
||||||
|
<< *cum_ack_tsn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
|
||||||
|
std::vector<DcSctpMessage> ret;
|
||||||
|
reassembled_messages_.swap(ret);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReassemblyQueue::AddReassembledMessage(
|
||||||
|
rtc::ArrayView<const UnwrappedTSN> tsns,
|
||||||
|
DcSctpMessage message) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
|
||||||
|
<< StrJoin(tsns, ",",
|
||||||
|
[](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
|
||||||
|
sb << *tsn.Wrap();
|
||||||
|
})
|
||||||
|
<< "], message; stream_id=" << *message.stream_id()
|
||||||
|
<< ", ppid=" << *message.ppid()
|
||||||
|
<< ", payload=" << message.payload().size() << " bytes";
|
||||||
|
|
||||||
|
for (const UnwrappedTSN tsn : tsns) {
|
||||||
|
// Update watermark, or insert into delivered_tsns_
|
||||||
|
if (tsn == last_assembled_tsn_watermark_.next_value()) {
|
||||||
|
last_assembled_tsn_watermark_.Increment();
|
||||||
|
} else {
|
||||||
|
delivered_tsns_.insert(tsn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// With new TSNs in delivered_tsns, gaps might be filled.
|
||||||
|
while (!delivered_tsns_.empty() &&
|
||||||
|
*delivered_tsns_.begin() ==
|
||||||
|
last_assembled_tsn_watermark_.next_value()) {
|
||||||
|
last_assembled_tsn_watermark_.Increment();
|
||||||
|
delivered_tsns_.erase(delivered_tsns_.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
reassembled_messages_.emplace_back(std::move(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
|
||||||
|
|
||||||
|
last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
|
||||||
|
delivered_tsns_.erase(delivered_tsns_.begin(),
|
||||||
|
delivered_tsns_.upper_bound(tsn));
|
||||||
|
|
||||||
|
queued_bytes_ -=
|
||||||
|
streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
|
||||||
|
RTC_DCHECK(IsConsistent());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReassemblyQueue::IsConsistent() const {
|
||||||
|
// Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
|
||||||
|
// enforced in this class. This comparison will still trigger if queued_bytes_
|
||||||
|
// became "negative".
|
||||||
|
return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace dcsctp
|
||||||
163
net/dcsctp/rx/reassembly_queue.h
Normal file
163
net/dcsctp/rx/reassembly_queue.h
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#ifndef NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
|
||||||
|
#define NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <set>
|
||||||
|
#include <string>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "absl/strings/string_view.h"
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "net/dcsctp/common/internal_types.h"
|
||||||
|
#include "net/dcsctp/common/sequence_numbers.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
|
||||||
|
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
|
||||||
|
#include "net/dcsctp/public/dcsctp_message.h"
|
||||||
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
|
||||||
|
// Contains the received DATA chunks that haven't yet been reassembled, and
|
||||||
|
// reassembles chunks when possible.
|
||||||
|
//
|
||||||
|
// The actual assembly is handled by an implementation of the
|
||||||
|
// `ReassemblyStreams` interface.
|
||||||
|
//
|
||||||
|
// Except for reassembling fragmented messages, this class will also handle two
|
||||||
|
// less common operations; To handle the receiver-side of partial reliability
|
||||||
|
// (limited number of retransmissions or limited message lifetime) as well as
|
||||||
|
// stream resetting, which is used when a sender wishes to close a data channel.
|
||||||
|
//
|
||||||
|
// Partial reliability is handled when a FORWARD-TSN or I-FORWARD-TSN chunk is
|
||||||
|
// received, and it will simply delete any chunks matching the parameters in
|
||||||
|
// that chunk. This is mainly implemented in ReassemblyStreams.
|
||||||
|
//
|
||||||
|
// Resetting streams is handled when a RECONFIG chunks is received, with an
|
||||||
|
// "Outgoing SSN Reset Request" parameter. That parameter will contain a list of
|
||||||
|
// streams to reset, and a `sender_last_assigned_tsn`. If this TSN is not yet
|
||||||
|
// seen, the stream cannot be directly reset, and this class will respond that
|
||||||
|
// the reset is "deferred". But if this TSN provided is known, the stream can be
|
||||||
|
// immediately be reset.
|
||||||
|
//
|
||||||
|
// The ReassemblyQueue has a maximum size, as it would otherwise be an DoS
|
||||||
|
// attack vector where a peer could consume all memory of the other peer by
|
||||||
|
// sending a lot of ordered chunks, but carefully withholding an early one. It
|
||||||
|
// also has a watermark limit, which the caller can query is the number of bytes
|
||||||
|
// is above that limit. This is used by the caller to be selective in what to
|
||||||
|
// add to the reassembly queue, so that it's not exhausted. The caller is
|
||||||
|
// expected to call `is_full` prior to adding data to the queue and to act
|
||||||
|
// accordingly if the queue is full.
|
||||||
|
class ReassemblyQueue {
|
||||||
|
public:
|
||||||
|
// When the queue is filled over this fraction (of its maximum size), the
|
||||||
|
// socket should restrict incoming data to avoid filling up the queue.
|
||||||
|
static constexpr float kHighWatermarkLimit = 0.9;
|
||||||
|
|
||||||
|
ReassemblyQueue(absl::string_view log_prefix,
|
||||||
|
TSN peer_initial_tsn,
|
||||||
|
size_t max_size_bytes);
|
||||||
|
|
||||||
|
// Adds a data chunk to the queue, with a `tsn` and other parameters in
|
||||||
|
// `data`.
|
||||||
|
void Add(TSN tsn, Data data);
|
||||||
|
|
||||||
|
// Indicates if the reassembly queue has any reassembled messages that can be
|
||||||
|
// retrieved by calling `FlushMessages`.
|
||||||
|
bool HasMessages() const { return !reassembled_messages_.empty(); }
|
||||||
|
|
||||||
|
// Returns any reassembled messages.
|
||||||
|
std::vector<DcSctpMessage> FlushMessages();
|
||||||
|
|
||||||
|
// Handle a ForwardTSN chunk, when the sender has indicated that the received
|
||||||
|
// (this class) should forget about some chunks. This is used to implement
|
||||||
|
// partial reliability.
|
||||||
|
void Handle(const AnyForwardTsnChunk& forward_tsn);
|
||||||
|
|
||||||
|
// Given the reset stream request and the current cum_tsn_ack, might either
|
||||||
|
// reset the streams directly (returns kSuccessPerformed), or at a later time,
|
||||||
|
// by entering the "deferred reset processing" mode (returns kInProgress).
|
||||||
|
ReconfigurationResponseParameter::Result ResetStreams(
|
||||||
|
const OutgoingSSNResetRequestParameter& req,
|
||||||
|
TSN cum_tsn_ack);
|
||||||
|
|
||||||
|
// Given the current (updated) cum_tsn_ack, might leave "defererred reset
|
||||||
|
// processing" mode and reset streams. Returns true if so.
|
||||||
|
bool MaybeResetStreamsDeferred(TSN cum_ack_tsn);
|
||||||
|
|
||||||
|
// The number of payload bytes that have been queued. Note that the actual
|
||||||
|
// memory usage is higher due to additional overhead of tracking received
|
||||||
|
// data.
|
||||||
|
size_t queued_bytes() const { return queued_bytes_; }
|
||||||
|
|
||||||
|
// The remaining bytes until the queue is full.
|
||||||
|
size_t remaining_bytes() const { return max_size_bytes_ - queued_bytes_; }
|
||||||
|
|
||||||
|
// Indicates if the queue is full. Data should not be added to the queue when
|
||||||
|
// it's full.
|
||||||
|
bool is_full() const { return queued_bytes_ >= max_size_bytes_; }
|
||||||
|
|
||||||
|
// Indicates if the queue is above the watermark limit, which is a certain
|
||||||
|
// percentage of its size.
|
||||||
|
bool is_above_watermark() const { return queued_bytes_ >= watermark_bytes_; }
|
||||||
|
|
||||||
|
// Returns the watermark limit, in bytes.
|
||||||
|
size_t watermark_bytes() const { return watermark_bytes_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool IsConsistent() const;
|
||||||
|
void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
|
||||||
|
DcSctpMessage message);
|
||||||
|
|
||||||
|
struct DeferredResetStreams {
|
||||||
|
explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req)
|
||||||
|
: req(std::move(req)) {}
|
||||||
|
OutgoingSSNResetRequestParameter req;
|
||||||
|
std::vector<std::pair<TSN, Data>> deferred_chunks;
|
||||||
|
};
|
||||||
|
|
||||||
|
const std::string log_prefix_;
|
||||||
|
const size_t max_size_bytes_;
|
||||||
|
const size_t watermark_bytes_;
|
||||||
|
UnwrappedTSN::Unwrapper tsn_unwrapper_;
|
||||||
|
|
||||||
|
// Whenever a message has been assembled, either increase
|
||||||
|
// `last_assembled_tsn_watermark_` or - if there are gaps - add the message's
|
||||||
|
// TSNs into delivered_tsns_ so that messages are not re-delivered on
|
||||||
|
// duplicate chunks.
|
||||||
|
UnwrappedTSN last_assembled_tsn_watermark_;
|
||||||
|
std::set<UnwrappedTSN> delivered_tsns_;
|
||||||
|
// Messages that have been reassembled, and will be returned by
|
||||||
|
// `FlushMessages`.
|
||||||
|
std::vector<DcSctpMessage> reassembled_messages_;
|
||||||
|
|
||||||
|
// If present, "deferred reset processing" mode is active.
|
||||||
|
absl::optional<DeferredResetStreams> deferred_reset_streams_;
|
||||||
|
|
||||||
|
// Contains the last request sequence number of the
|
||||||
|
// OutgoingSSNResetRequestParameter that was performed.
|
||||||
|
ReconfigRequestSN last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(0);
|
||||||
|
|
||||||
|
// The number of "payload bytes" that are in this queue, in total.
|
||||||
|
size_t queued_bytes_ = 0;
|
||||||
|
|
||||||
|
// The actual implementation of ReassemblyStreams.
|
||||||
|
std::unique_ptr<ReassemblyStreams> streams_;
|
||||||
|
};
|
||||||
|
} // namespace dcsctp
|
||||||
|
|
||||||
|
#endif // NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
|
||||||
298
net/dcsctp/rx/reassembly_queue_test.cc
Normal file
298
net/dcsctp/rx/reassembly_queue_test.cc
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/rx/reassembly_queue.h"
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <array>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <iterator>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/public/dcsctp_message.h"
|
||||||
|
#include "net/dcsctp/public/types.h"
|
||||||
|
#include "net/dcsctp/testing/data_generator.h"
|
||||||
|
#include "rtc_base/gunit.h"
|
||||||
|
#include "test/gmock.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
namespace {
|
||||||
|
using ::testing::ElementsAre;
|
||||||
|
|
||||||
|
// The default maximum size of the Reassembly Queue.
|
||||||
|
static constexpr size_t kBufferSize = 10000;
|
||||||
|
|
||||||
|
static constexpr StreamID kStreamID(1);
|
||||||
|
static constexpr SSN kSSN(0);
|
||||||
|
static constexpr MID kMID(0);
|
||||||
|
static constexpr FSN kFSN(0);
|
||||||
|
static constexpr PPID kPPID(53);
|
||||||
|
|
||||||
|
static constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4};
|
||||||
|
static constexpr std::array<uint8_t, 4> kMessage2Payload = {5, 6, 7, 8};
|
||||||
|
static constexpr std::array<uint8_t, 16> kLongPayload = {
|
||||||
|
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
|
||||||
|
|
||||||
|
MATCHER_P3(SctpMessageIs, stream_id, ppid, expected_payload, "") {
|
||||||
|
if (arg.stream_id() != stream_id) {
|
||||||
|
*result_listener << "the stream_id is " << *arg.stream_id();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (arg.ppid() != ppid) {
|
||||||
|
*result_listener << "the ppid is " << *arg.ppid();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (std::vector<uint8_t>(arg.payload().begin(), arg.payload().end()) !=
|
||||||
|
std::vector<uint8_t>(expected_payload.begin(), expected_payload.end())) {
|
||||||
|
*result_listener << "the payload is wrong";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ReassemblyQueueTest : public testing::Test {
|
||||||
|
protected:
|
||||||
|
ReassemblyQueueTest() {}
|
||||||
|
DataGenerator gen_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, EmptyQueue) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessage) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) {
|
||||||
|
std::vector<uint32_t> tsns = {10, 11, 12, 13};
|
||||||
|
rtc::ArrayView<const uint8_t> payload(kLongPayload);
|
||||||
|
do {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < tsns.size(); i++) {
|
||||||
|
auto span = payload.subview((tsns[i] - 10) * 4, 4);
|
||||||
|
Data::IsBeginning is_beginning(tsns[i] == 10);
|
||||||
|
Data::IsEnd is_end(tsns[i] == 13);
|
||||||
|
|
||||||
|
reasm.Add(TSN(tsns[i]),
|
||||||
|
Data(kStreamID, kSSN, kMID, kFSN, kPPID,
|
||||||
|
std::vector<uint8_t>(span.begin(), span.end()),
|
||||||
|
is_beginning, is_end, IsUnordered(false)));
|
||||||
|
if (i < 3) {
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
} else {
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (std::next_permutation(std::begin(tsns), std::end(tsns)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, SingleOrderedChunkMessage) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) {
|
||||||
|
std::vector<uint32_t> tsns = {10, 11, 12, 13};
|
||||||
|
rtc::ArrayView<const uint8_t> payload(kLongPayload);
|
||||||
|
do {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
for (size_t i = 0; i < tsns.size(); i++) {
|
||||||
|
auto span = payload.subview((tsns[i] - 10) * 4, 4);
|
||||||
|
Data::IsBeginning is_beginning(true);
|
||||||
|
Data::IsEnd is_end(true);
|
||||||
|
|
||||||
|
SSN ssn(static_cast<uint16_t>(tsns[i] - 10));
|
||||||
|
reasm.Add(TSN(tsns[i]),
|
||||||
|
Data(kStreamID, ssn, kMID, kFSN, kPPID,
|
||||||
|
std::vector<uint8_t>(span.begin(), span.end()),
|
||||||
|
is_beginning, is_end, IsUnordered(false)));
|
||||||
|
}
|
||||||
|
EXPECT_THAT(
|
||||||
|
reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, payload.subview(0, 4)),
|
||||||
|
SctpMessageIs(kStreamID, kPPID, payload.subview(4, 4)),
|
||||||
|
SctpMessageIs(kStreamID, kPPID, payload.subview(8, 4)),
|
||||||
|
SctpMessageIs(kStreamID, kPPID, payload.subview(12, 4))));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
} while (std::next_permutation(std::begin(tsns), std::end(tsns)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
|
||||||
|
reasm.Add(TSN(12), gen_.Ordered({3}));
|
||||||
|
reasm.Add(TSN(13), gen_.Ordered({4}));
|
||||||
|
reasm.Add(TSN(14), gen_.Ordered({5}));
|
||||||
|
reasm.Add(TSN(15), gen_.Ordered({6}));
|
||||||
|
reasm.Add(TSN(16), gen_.Ordered({7}));
|
||||||
|
reasm.Add(TSN(17), gen_.Ordered({8}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 7u);
|
||||||
|
|
||||||
|
// lost and retransmitted
|
||||||
|
reasm.Add(TSN(11), gen_.Ordered({2}));
|
||||||
|
reasm.Add(TSN(18), gen_.Ordered({9}));
|
||||||
|
reasm.Add(TSN(19), gen_.Ordered({10}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 10u);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
|
||||||
|
reasm.Add(TSN(20), gen_.Ordered({11, 12, 13, 14, 15, 16}, "E"));
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Unordered({1}, "B"));
|
||||||
|
reasm.Add(TSN(12), gen_.Unordered({3}));
|
||||||
|
reasm.Add(TSN(13), gen_.Unordered({4}, "E"));
|
||||||
|
|
||||||
|
reasm.Add(TSN(14), gen_.Unordered({5}, "B"));
|
||||||
|
reasm.Add(TSN(15), gen_.Unordered({6}));
|
||||||
|
reasm.Add(TSN(17), gen_.Unordered({8}, "E"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 6u);
|
||||||
|
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
|
||||||
|
reasm.Handle(ForwardTsnChunk(TSN(13), {}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 3u);
|
||||||
|
|
||||||
|
// The lost chunk comes, but too late.
|
||||||
|
reasm.Add(TSN(11), gen_.Unordered({2}));
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 3u);
|
||||||
|
|
||||||
|
// The second lost chunk comes, message is assembled.
|
||||||
|
reasm.Add(TSN(16), gen_.Unordered({7}));
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
|
||||||
|
reasm.Add(TSN(12), gen_.Ordered({3}));
|
||||||
|
reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
|
||||||
|
|
||||||
|
reasm.Add(TSN(14), gen_.Ordered({5}, "B"));
|
||||||
|
reasm.Add(TSN(15), gen_.Ordered({6}));
|
||||||
|
reasm.Add(TSN(16), gen_.Ordered({7}));
|
||||||
|
reasm.Add(TSN(17), gen_.Ordered({8}, "E"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 7u);
|
||||||
|
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
|
||||||
|
reasm.Handle(ForwardTsnChunk(
|
||||||
|
TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
|
||||||
|
// The lost chunk comes, but too late.
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
|
||||||
|
reasm.Add(TSN(12), gen_.Ordered({3}));
|
||||||
|
reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
|
||||||
|
|
||||||
|
reasm.Add(TSN(15), gen_.Ordered({5}, "B"));
|
||||||
|
reasm.Add(TSN(16), gen_.Ordered({6}));
|
||||||
|
reasm.Add(TSN(17), gen_.Ordered({7}));
|
||||||
|
reasm.Add(TSN(18), gen_.Ordered({8}, "E"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 7u);
|
||||||
|
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
|
||||||
|
reasm.Handle(ForwardTsnChunk(
|
||||||
|
TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
|
||||||
|
// The lost chunk comes, but too late.
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ShouldntDeliverMessagesBeforeInitialTsn) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(5), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessages) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
|
||||||
|
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessagesReallyUnordered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 4u);
|
||||||
|
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
|
||||||
|
reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 4u);
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
|
||||||
|
reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 4u);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, ShouldntDeliverBeforeForwardedTsn) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
|
||||||
|
reasm.Handle(ForwardTsnChunk(TSN(12), {}));
|
||||||
|
|
||||||
|
reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace dcsctp
|
||||||
Loading…
x
Reference in New Issue
Block a user