From cb70aa7e0534f3c84175b6e903dd90c26f67fe89 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Sat, 3 Apr 2021 20:33:43 +0200 Subject: [PATCH] dcsctp: Add Reassembly Queue The Reassembly Queue receives fragmented messages (DATA or I-DATA chunks) and - with help of stream reassemblers - will reassemble these fragments into messages, which will be delivered to the client. It also handle partial reliability (FORWARD-TSN) and stream resetting. To avoid a DoS attack vector, where a sender can send fragments in a way that the reassembly queue will never succeed to reassemble a message and use all available memory, the ReassemblyQueue has a maximum size. Bug: webrtc:12614 Change-Id: Ibb084fecd240d4c414e096579244f8f5ee46914e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214043 Commit-Queue: Victor Boivie Reviewed-by: Tommi Cr-Commit-Position: refs/heads/master@{#33678} --- net/dcsctp/rx/BUILD.gn | 16 ++ net/dcsctp/rx/reassembly_queue.cc | 245 ++++++++++++++++++++ net/dcsctp/rx/reassembly_queue.h | 163 ++++++++++++++ net/dcsctp/rx/reassembly_queue_test.cc | 298 +++++++++++++++++++++++++ 4 files changed, 722 insertions(+) create mode 100644 net/dcsctp/rx/reassembly_queue.cc create mode 100644 net/dcsctp/rx/reassembly_queue.h create mode 100644 net/dcsctp/rx/reassembly_queue_test.cc diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn index 6c26e116b8..75312b96da 100644 --- a/net/dcsctp/rx/BUILD.gn +++ b/net/dcsctp/rx/BUILD.gn @@ -40,12 +40,27 @@ rtc_library("traditional_reassembly_streams") { ] } +rtc_library("reassembly_queue") { + deps = [ + ":traditional_reassembly_streams", + "../../../api:array_view", + "../../../rtc_base", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + ] + sources = [ + "reassembly_queue.cc", + "reassembly_queue.h", + ] +} + if (rtc_include_tests) { rtc_library("dcsctp_rx_unittests") { testonly = true deps = [ ":data_tracker", + ":reassembly_queue", ":traditional_reassembly_streams", "../../../api:array_view", "../../../rtc_base:checks", @@ -56,6 +71,7 @@ if (rtc_include_tests) { ] sources = [ "data_tracker_test.cc", + "reassembly_queue_test.cc", "traditional_reassembly_streams_test.cc", ] } diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc new file mode 100644 index 0000000000..581b9fcc49 --- /dev/null +++ b/net/dcsctp/rx/reassembly_queue.cc @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/reassembly_queue.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" +#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/rx/reassembly_streams.h" +#include "net/dcsctp/rx/traditional_reassembly_streams.h" +#include "rtc_base/logging.h" + +namespace dcsctp { +ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, + TSN peer_initial_tsn, + size_t max_size_bytes) + : log_prefix_(std::string(log_prefix) + "reasm: "), + max_size_bytes_(max_size_bytes), + watermark_bytes_(max_size_bytes * kHighWatermarkLimit), + last_assembled_tsn_watermark_( + tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))), + streams_(std::make_unique( + log_prefix_, + [this](rtc::ArrayView tsns, + DcSctpMessage message) { + AddReassembledMessage(tsns, std::move(message)); + })) {} + +void ReassemblyQueue::Add(TSN tsn, Data data) { + RTC_DCHECK(IsConsistent()); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn + << ", stream=" << *data.stream_id << ":" + << *data.message_id << ":" << *data.fsn << ", type=" + << (data.is_beginning && data.is_end + ? "complete" + : data.is_beginning + ? "first" + : data.is_end ? "last" : "middle"); + + UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn); + + if (unwrapped_tsn <= last_assembled_tsn_watermark_ || + delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Chunk has already been delivered - skipping"; + return; + } + + // If a stream reset has been received with a "sender's last assigned tsn" in + // the future, the socket is in "deferred reset processing" mode and must + // buffer chunks until it's exited. + if (deferred_reset_streams_.has_value() && + unwrapped_tsn > + tsn_unwrapper_.Unwrap( + deferred_reset_streams_->req.sender_last_assigned_tsn())) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ << "Deferring chunk with tsn=" << *tsn + << " until cum_ack_tsn=" + << *deferred_reset_streams_->req.sender_last_assigned_tsn(); + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "In this mode, any data arriving with a TSN larger than the + // Sender's Last Assigned TSN for the affected stream(s) MUST be queued + // locally and held until the cumulative acknowledgment point reaches the + // Sender's Last Assigned TSN." + queued_bytes_ += data.size(); + deferred_reset_streams_->deferred_chunks.emplace_back( + std::make_pair(tsn, std::move(data))); + } else { + queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data)); + } + + // https://tools.ietf.org/html/rfc4960#section-6.9 + // "Note: If the data receiver runs out of buffer space while still + // waiting for more fragments to complete the reassembly of the message, it + // should dispatch part of its inbound message through a partial delivery + // API (see Section 10), freeing some of its receive buffer space so that + // the rest of the message may be received." + + // TODO(boivie): Support EOR flag and partial delivery? + RTC_DCHECK(IsConsistent()); +} + +ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams( + const OutgoingSSNResetRequestParameter& req, + TSN cum_tsn_ack) { + RTC_DCHECK(IsConsistent()); + if (deferred_reset_streams_.has_value()) { + // In deferred mode already. + return ReconfigurationResponseParameter::Result::kInProgress; + } else if (req.request_sequence_number() <= + last_completed_reset_req_seq_nbr_) { + // Already performed at some time previously. + return ReconfigurationResponseParameter::Result::kSuccessPerformed; + } + + UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn()); + UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack); + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "If the Sender's Last Assigned TSN is greater than the + // cumulative acknowledgment point, then the endpoint MUST enter "deferred + // reset processing"." + if (sla_tsn > unwrapped_cum_tsn_ack) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "Entering deferred reset processing mode until cum_tsn_ack=" + << *req.sender_last_assigned_tsn(); + deferred_reset_streams_ = absl::make_optional(req); + return ReconfigurationResponseParameter::Result::kInProgress; + } + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "... streams MUST be reset to 0 as the next expected SSN." + streams_->ResetStreams(req.stream_ids()); + last_completed_reset_req_seq_nbr_ = req.request_sequence_number(); + RTC_DCHECK(IsConsistent()); + return ReconfigurationResponseParameter::Result::kSuccessPerformed; +} + +bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) { + RTC_DCHECK(IsConsistent()); + if (deferred_reset_streams_.has_value()) { + UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn); + UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap( + deferred_reset_streams_->req.sender_last_assigned_tsn()); + if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Leaving deferred reset processing with tsn=" + << *cum_ack_tsn << ", feeding back " + << deferred_reset_streams_->deferred_chunks.size() + << " chunks"; + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "... streams MUST be reset to 0 as the next expected SSN." + streams_->ResetStreams(deferred_reset_streams_->req.stream_ids()); + std::vector> deferred_chunks = + std::move(deferred_reset_streams_->deferred_chunks); + // The response will not be sent now, but as a reply to the retried + // request, which will come as "in progress" has been sent prior. + last_completed_reset_req_seq_nbr_ = + deferred_reset_streams_->req.request_sequence_number(); + deferred_reset_streams_ = absl::nullopt; + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "Any queued TSNs (queued at step E2) MUST now be released and processed + // normally." + for (auto& p : deferred_chunks) { + const TSN& tsn = p.first; + Data& data = p.second; + queued_bytes_ -= data.size(); + Add(tsn, std::move(data)); + } + + RTC_DCHECK(IsConsistent()); + return true; + } else { + RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn=" + << *cum_ack_tsn; + } + } + + return false; +} + +std::vector ReassemblyQueue::FlushMessages() { + std::vector ret; + reassembled_messages_.swap(ret); + return ret; +} + +void ReassemblyQueue::AddReassembledMessage( + rtc::ArrayView tsns, + DcSctpMessage message) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=[" + << StrJoin(tsns, ",", + [](rtc::StringBuilder& sb, UnwrappedTSN tsn) { + sb << *tsn.Wrap(); + }) + << "], message; stream_id=" << *message.stream_id() + << ", ppid=" << *message.ppid() + << ", payload=" << message.payload().size() << " bytes"; + + for (const UnwrappedTSN tsn : tsns) { + // Update watermark, or insert into delivered_tsns_ + if (tsn == last_assembled_tsn_watermark_.next_value()) { + last_assembled_tsn_watermark_.Increment(); + } else { + delivered_tsns_.insert(tsn); + } + } + + // With new TSNs in delivered_tsns, gaps might be filled. + while (!delivered_tsns_.empty() && + *delivered_tsns_.begin() == + last_assembled_tsn_watermark_.next_value()) { + last_assembled_tsn_watermark_.Increment(); + delivered_tsns_.erase(delivered_tsns_.begin()); + } + + reassembled_messages_.emplace_back(std::move(message)); +} + +void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { + RTC_DCHECK(IsConsistent()); + UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn()); + + last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); + delivered_tsns_.erase(delivered_tsns_.begin(), + delivered_tsns_.upper_bound(tsn)); + + queued_bytes_ -= + streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams()); + RTC_DCHECK(IsConsistent()); +} + +bool ReassemblyQueue::IsConsistent() const { + // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively + // enforced in this class. This comparison will still trigger if queued_bytes_ + // became "negative". + return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_); +} + +} // namespace dcsctp diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h new file mode 100644 index 0000000000..b752e53acc --- /dev/null +++ b/net/dcsctp/rx/reassembly_queue.h @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_ +#define NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_ + +#include + +#include +#include +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" +#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/rx/reassembly_streams.h" + +namespace dcsctp { + +// Contains the received DATA chunks that haven't yet been reassembled, and +// reassembles chunks when possible. +// +// The actual assembly is handled by an implementation of the +// `ReassemblyStreams` interface. +// +// Except for reassembling fragmented messages, this class will also handle two +// less common operations; To handle the receiver-side of partial reliability +// (limited number of retransmissions or limited message lifetime) as well as +// stream resetting, which is used when a sender wishes to close a data channel. +// +// Partial reliability is handled when a FORWARD-TSN or I-FORWARD-TSN chunk is +// received, and it will simply delete any chunks matching the parameters in +// that chunk. This is mainly implemented in ReassemblyStreams. +// +// Resetting streams is handled when a RECONFIG chunks is received, with an +// "Outgoing SSN Reset Request" parameter. That parameter will contain a list of +// streams to reset, and a `sender_last_assigned_tsn`. If this TSN is not yet +// seen, the stream cannot be directly reset, and this class will respond that +// the reset is "deferred". But if this TSN provided is known, the stream can be +// immediately be reset. +// +// The ReassemblyQueue has a maximum size, as it would otherwise be an DoS +// attack vector where a peer could consume all memory of the other peer by +// sending a lot of ordered chunks, but carefully withholding an early one. It +// also has a watermark limit, which the caller can query is the number of bytes +// is above that limit. This is used by the caller to be selective in what to +// add to the reassembly queue, so that it's not exhausted. The caller is +// expected to call `is_full` prior to adding data to the queue and to act +// accordingly if the queue is full. +class ReassemblyQueue { + public: + // When the queue is filled over this fraction (of its maximum size), the + // socket should restrict incoming data to avoid filling up the queue. + static constexpr float kHighWatermarkLimit = 0.9; + + ReassemblyQueue(absl::string_view log_prefix, + TSN peer_initial_tsn, + size_t max_size_bytes); + + // Adds a data chunk to the queue, with a `tsn` and other parameters in + // `data`. + void Add(TSN tsn, Data data); + + // Indicates if the reassembly queue has any reassembled messages that can be + // retrieved by calling `FlushMessages`. + bool HasMessages() const { return !reassembled_messages_.empty(); } + + // Returns any reassembled messages. + std::vector FlushMessages(); + + // Handle a ForwardTSN chunk, when the sender has indicated that the received + // (this class) should forget about some chunks. This is used to implement + // partial reliability. + void Handle(const AnyForwardTsnChunk& forward_tsn); + + // Given the reset stream request and the current cum_tsn_ack, might either + // reset the streams directly (returns kSuccessPerformed), or at a later time, + // by entering the "deferred reset processing" mode (returns kInProgress). + ReconfigurationResponseParameter::Result ResetStreams( + const OutgoingSSNResetRequestParameter& req, + TSN cum_tsn_ack); + + // Given the current (updated) cum_tsn_ack, might leave "defererred reset + // processing" mode and reset streams. Returns true if so. + bool MaybeResetStreamsDeferred(TSN cum_ack_tsn); + + // The number of payload bytes that have been queued. Note that the actual + // memory usage is higher due to additional overhead of tracking received + // data. + size_t queued_bytes() const { return queued_bytes_; } + + // The remaining bytes until the queue is full. + size_t remaining_bytes() const { return max_size_bytes_ - queued_bytes_; } + + // Indicates if the queue is full. Data should not be added to the queue when + // it's full. + bool is_full() const { return queued_bytes_ >= max_size_bytes_; } + + // Indicates if the queue is above the watermark limit, which is a certain + // percentage of its size. + bool is_above_watermark() const { return queued_bytes_ >= watermark_bytes_; } + + // Returns the watermark limit, in bytes. + size_t watermark_bytes() const { return watermark_bytes_; } + + private: + bool IsConsistent() const; + void AddReassembledMessage(rtc::ArrayView tsns, + DcSctpMessage message); + + struct DeferredResetStreams { + explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req) + : req(std::move(req)) {} + OutgoingSSNResetRequestParameter req; + std::vector> deferred_chunks; + }; + + const std::string log_prefix_; + const size_t max_size_bytes_; + const size_t watermark_bytes_; + UnwrappedTSN::Unwrapper tsn_unwrapper_; + + // Whenever a message has been assembled, either increase + // `last_assembled_tsn_watermark_` or - if there are gaps - add the message's + // TSNs into delivered_tsns_ so that messages are not re-delivered on + // duplicate chunks. + UnwrappedTSN last_assembled_tsn_watermark_; + std::set delivered_tsns_; + // Messages that have been reassembled, and will be returned by + // `FlushMessages`. + std::vector reassembled_messages_; + + // If present, "deferred reset processing" mode is active. + absl::optional deferred_reset_streams_; + + // Contains the last request sequence number of the + // OutgoingSSNResetRequestParameter that was performed. + ReconfigRequestSN last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(0); + + // The number of "payload bytes" that are in this queue, in total. + size_t queued_bytes_ = 0; + + // The actual implementation of ReassemblyStreams. + std::unique_ptr streams_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_ diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc new file mode 100644 index 0000000000..e38372c7d1 --- /dev/null +++ b/net/dcsctp/rx/reassembly_queue_test.cc @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/reassembly_queue.h" + +#include + +#include +#include +#include +#include +#include + +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/testing/data_generator.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::ElementsAre; + +// The default maximum size of the Reassembly Queue. +static constexpr size_t kBufferSize = 10000; + +static constexpr StreamID kStreamID(1); +static constexpr SSN kSSN(0); +static constexpr MID kMID(0); +static constexpr FSN kFSN(0); +static constexpr PPID kPPID(53); + +static constexpr std::array kShortPayload = {1, 2, 3, 4}; +static constexpr std::array kMessage2Payload = {5, 6, 7, 8}; +static constexpr std::array kLongPayload = { + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; + +MATCHER_P3(SctpMessageIs, stream_id, ppid, expected_payload, "") { + if (arg.stream_id() != stream_id) { + *result_listener << "the stream_id is " << *arg.stream_id(); + return false; + } + + if (arg.ppid() != ppid) { + *result_listener << "the ppid is " << *arg.ppid(); + return false; + } + + if (std::vector(arg.payload().begin(), arg.payload().end()) != + std::vector(expected_payload.begin(), expected_payload.end())) { + *result_listener << "the payload is wrong"; + return false; + } + return true; +} + +class ReassemblyQueueTest : public testing::Test { + protected: + ReassemblyQueueTest() {} + DataGenerator gen_; +}; + +TEST_F(ReassemblyQueueTest, EmptyQueue) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + EXPECT_FALSE(reasm.HasMessages()); + EXPECT_EQ(reasm.queued_bytes(), 0u); +} + +TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessage) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); + EXPECT_EQ(reasm.queued_bytes(), 0u); +} + +TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) { + std::vector tsns = {10, 11, 12, 13}; + rtc::ArrayView payload(kLongPayload); + do { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + + for (size_t i = 0; i < tsns.size(); i++) { + auto span = payload.subview((tsns[i] - 10) * 4, 4); + Data::IsBeginning is_beginning(tsns[i] == 10); + Data::IsEnd is_end(tsns[i] == 13); + + reasm.Add(TSN(tsns[i]), + Data(kStreamID, kSSN, kMID, kFSN, kPPID, + std::vector(span.begin(), span.end()), + is_beginning, is_end, IsUnordered(false))); + if (i < 3) { + EXPECT_FALSE(reasm.HasMessages()); + } else { + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload))); + EXPECT_EQ(reasm.queued_bytes(), 0u); + } + } + } while (std::next_permutation(std::begin(tsns), std::end(tsns))); +} + +TEST_F(ReassemblyQueueTest, SingleOrderedChunkMessage) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); +} + +TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) { + std::vector tsns = {10, 11, 12, 13}; + rtc::ArrayView payload(kLongPayload); + do { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + for (size_t i = 0; i < tsns.size(); i++) { + auto span = payload.subview((tsns[i] - 10) * 4, 4); + Data::IsBeginning is_beginning(true); + Data::IsEnd is_end(true); + + SSN ssn(static_cast(tsns[i] - 10)); + reasm.Add(TSN(tsns[i]), + Data(kStreamID, ssn, kMID, kFSN, kPPID, + std::vector(span.begin(), span.end()), + is_beginning, is_end, IsUnordered(false))); + } + EXPECT_THAT( + reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, payload.subview(0, 4)), + SctpMessageIs(kStreamID, kPPID, payload.subview(4, 4)), + SctpMessageIs(kStreamID, kPPID, payload.subview(8, 4)), + SctpMessageIs(kStreamID, kPPID, payload.subview(12, 4)))); + EXPECT_EQ(reasm.queued_bytes(), 0u); + } while (std::next_permutation(std::begin(tsns), std::end(tsns))); +} + +TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Ordered({1}, "B")); + reasm.Add(TSN(12), gen_.Ordered({3})); + reasm.Add(TSN(13), gen_.Ordered({4})); + reasm.Add(TSN(14), gen_.Ordered({5})); + reasm.Add(TSN(15), gen_.Ordered({6})); + reasm.Add(TSN(16), gen_.Ordered({7})); + reasm.Add(TSN(17), gen_.Ordered({8})); + EXPECT_EQ(reasm.queued_bytes(), 7u); + + // lost and retransmitted + reasm.Add(TSN(11), gen_.Ordered({2})); + reasm.Add(TSN(18), gen_.Ordered({9})); + reasm.Add(TSN(19), gen_.Ordered({10})); + EXPECT_EQ(reasm.queued_bytes(), 10u); + EXPECT_FALSE(reasm.HasMessages()); + + reasm.Add(TSN(20), gen_.Ordered({11, 12, 13, 14, 15, 16}, "E")); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload))); + EXPECT_EQ(reasm.queued_bytes(), 0u); +} + +TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Unordered({1}, "B")); + reasm.Add(TSN(12), gen_.Unordered({3})); + reasm.Add(TSN(13), gen_.Unordered({4}, "E")); + + reasm.Add(TSN(14), gen_.Unordered({5}, "B")); + reasm.Add(TSN(15), gen_.Unordered({6})); + reasm.Add(TSN(17), gen_.Unordered({8}, "E")); + EXPECT_EQ(reasm.queued_bytes(), 6u); + + EXPECT_FALSE(reasm.HasMessages()); + + reasm.Handle(ForwardTsnChunk(TSN(13), {})); + EXPECT_EQ(reasm.queued_bytes(), 3u); + + // The lost chunk comes, but too late. + reasm.Add(TSN(11), gen_.Unordered({2})); + EXPECT_FALSE(reasm.HasMessages()); + EXPECT_EQ(reasm.queued_bytes(), 3u); + + // The second lost chunk comes, message is assembled. + reasm.Add(TSN(16), gen_.Unordered({7})); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_EQ(reasm.queued_bytes(), 0u); +} + +TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Ordered({1}, "B")); + reasm.Add(TSN(12), gen_.Ordered({3})); + reasm.Add(TSN(13), gen_.Ordered({4}, "E")); + + reasm.Add(TSN(14), gen_.Ordered({5}, "B")); + reasm.Add(TSN(15), gen_.Ordered({6})); + reasm.Add(TSN(16), gen_.Ordered({7})); + reasm.Add(TSN(17), gen_.Ordered({8}, "E")); + EXPECT_EQ(reasm.queued_bytes(), 7u); + + EXPECT_FALSE(reasm.HasMessages()); + + reasm.Handle(ForwardTsnChunk( + TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)})); + EXPECT_EQ(reasm.queued_bytes(), 0u); + + // The lost chunk comes, but too late. + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload))); +} + +TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Ordered({1}, "B")); + reasm.Add(TSN(12), gen_.Ordered({3})); + reasm.Add(TSN(13), gen_.Ordered({4}, "E")); + + reasm.Add(TSN(15), gen_.Ordered({5}, "B")); + reasm.Add(TSN(16), gen_.Ordered({6})); + reasm.Add(TSN(17), gen_.Ordered({7})); + reasm.Add(TSN(18), gen_.Ordered({8}, "E")); + EXPECT_EQ(reasm.queued_bytes(), 7u); + + EXPECT_FALSE(reasm.HasMessages()); + + reasm.Handle(ForwardTsnChunk( + TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)})); + EXPECT_EQ(reasm.queued_bytes(), 0u); + + // The lost chunk comes, but too late. + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload))); +} + +TEST_F(ReassemblyQueueTest, ShouldntDeliverMessagesBeforeInitialTsn) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(5), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_FALSE(reasm.HasMessages()); +} + +TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessages) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); + reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_FALSE(reasm.HasMessages()); +} + +TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessagesReallyUnordered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B")); + EXPECT_EQ(reasm.queued_bytes(), 4u); + + EXPECT_FALSE(reasm.HasMessages()); + + reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 4u); + EXPECT_TRUE(reasm.HasMessages()); + + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); + reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 4u); + EXPECT_FALSE(reasm.HasMessages()); +} + +TEST_F(ReassemblyQueueTest, ShouldntDeliverBeforeForwardedTsn) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); + reasm.Handle(ForwardTsnChunk(TSN(12), {})); + + reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE")); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_FALSE(reasm.HasMessages()); +} + +} // namespace +} // namespace dcsctp