From 2a9bed3ee3773fe0f7328fb023a09575f952f801 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Tue, 11 May 2021 22:54:16 +0200 Subject: [PATCH] dcsctp: Add interleaved reassembly streams This is the receive-side part of supporting what is frequently called "ndata", but actually RFC8260 - "User Message Interleaving". This CL adds a new ReassemblyStreams implementation that can assemble I-DATA chunks and process I-FORWARD-TSN for partial reliability. Bug: webrtc:5696 Change-Id: I3cfbea62e7b6c02fbd3f51b43ba3fb7863cf0f88 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/218506 Commit-Queue: Victor Boivie Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#37128} --- net/dcsctp/rx/BUILD.gn | 25 ++ .../rx/interleaved_reassembly_streams.cc | 270 ++++++++++++++++++ .../rx/interleaved_reassembly_streams.h | 111 +++++++ .../rx/interleaved_reassembly_streams_test.cc | 154 ++++++++++ net/dcsctp/rx/reassembly_queue.cc | 20 +- net/dcsctp/rx/reassembly_queue.h | 1 + net/dcsctp/rx/reassembly_queue_test.cc | 102 ++++++- net/dcsctp/rx/reassembly_streams.cc | 55 ++++ .../socket/transmission_control_block.h | 1 + 9 files changed, 736 insertions(+), 3 deletions(-) create mode 100644 net/dcsctp/rx/interleaved_reassembly_streams.cc create mode 100644 net/dcsctp/rx/interleaved_reassembly_streams.h create mode 100644 net/dcsctp/rx/interleaved_reassembly_streams_test.cc create mode 100644 net/dcsctp/rx/reassembly_streams.cc diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn index d24f6d6fb3..8ef60dcd5f 100644 --- a/net/dcsctp/rx/BUILD.gn +++ b/net/dcsctp/rx/BUILD.gn @@ -44,6 +44,28 @@ rtc_source_set("reassembly_streams") { absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] } +rtc_library("interleaved_reassembly_streams") { + deps = [ + ":reassembly_streams", + "../../../api:array_view", + "../../../rtc_base", + "../../../rtc_base:checks", + "../../../rtc_base:logging", + "../common:sequence_numbers", + "../packet:chunk", + "../packet:data", + "../public:types", + ] + sources = [ + "interleaved_reassembly_streams.cc", + "interleaved_reassembly_streams.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} rtc_library("traditional_reassembly_streams") { deps = [ ":reassembly_streams", @@ -68,6 +90,7 @@ rtc_library("traditional_reassembly_streams") { rtc_library("reassembly_queue") { deps = [ + ":interleaved_reassembly_streams", ":reassembly_streams", ":traditional_reassembly_streams", "../../../api:array_view", @@ -98,6 +121,7 @@ if (rtc_include_tests) { deps = [ ":data_tracker", + ":interleaved_reassembly_streams", ":reassembly_queue", ":reassembly_streams", ":traditional_reassembly_streams", @@ -117,6 +141,7 @@ if (rtc_include_tests) { absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] sources = [ "data_tracker_test.cc", + "interleaved_reassembly_streams_test.cc", "reassembly_queue_test.cc", "traditional_reassembly_streams_test.cc", ] diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.cc b/net/dcsctp/rx/interleaved_reassembly_streams.cc new file mode 100644 index 0000000000..847058b7f8 --- /dev/null +++ b/net/dcsctp/rx/interleaved_reassembly_streams.cc @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/interleaved_reassembly_streams.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/types.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +InterleavedReassemblyStreams::InterleavedReassemblyStreams( + absl::string_view log_prefix, + OnAssembledMessage on_assembled_message, + const DcSctpSocketHandoverState* handover_state) + : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) { + if (handover_state) { + for (const DcSctpSocketHandoverState::OrderedStream& state : + handover_state->rx.ordered_streams) { + FullStreamId stream_id(IsUnordered(false), StreamID(state.id)); + streams_.emplace( + std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this, MID(state.next_ssn))); + } + for (const DcSctpSocketHandoverState::UnorderedStream& state : + handover_state->rx.unordered_streams) { + FullStreamId stream_id(IsUnordered(true), StreamID(state.id)); + streams_.emplace(std::piecewise_construct, + std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this)); + } + } +} + +size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage( + UnwrappedMID mid) { + std::map::const_iterator it = + chunks_by_mid_.find(mid); + if (it == chunks_by_mid_.end()) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << " - no chunks"; + return 0; + } + const ChunkMap& chunks = it->second; + if (!chunks.begin()->second.second.is_beginning || + !chunks.rbegin()->second.second.is_end) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << "- missing beginning or end"; + return 0; + } + int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first; + if (fsn_diff != (static_cast(chunks.size()) - 1)) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << "- not all chunks exist (have " + << chunks.size() << ", expect " << (fsn_diff + 1) + << ")"; + return 0; + } + + size_t removed_bytes = AssembleMessage(chunks); + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << " - succeeded and removed " + << removed_bytes; + + chunks_by_mid_.erase(mid); + return removed_bytes; +} + +size_t InterleavedReassemblyStreams::Stream::AssembleMessage( + const ChunkMap& tsn_chunks) { + size_t count = tsn_chunks.size(); + if (count == 1) { + // Fast path - zero-copy + const Data& data = tsn_chunks.begin()->second.second; + size_t payload_size = data.size(); + UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first}; + DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); + parent_.on_assembled_message_(tsns, std::move(message)); + return payload_size; + } + + // Slow path - will need to concatenate the payload. + std::vector tsns; + tsns.reserve(count); + + std::vector payload; + size_t payload_size = absl::c_accumulate( + tsn_chunks, 0, + [](size_t v, const auto& p) { return v + p.second.second.size(); }); + payload.reserve(payload_size); + + for (auto& item : tsn_chunks) { + const UnwrappedTSN tsn = item.second.first; + const Data& data = item.second.second; + tsns.push_back(tsn); + payload.insert(payload.end(), data.payload.begin(), data.payload.end()); + } + + const Data& data = tsn_chunks.begin()->second.second; + + DcSctpMessage message(data.stream_id, data.ppid, std::move(payload)); + parent_.on_assembled_message_(tsns, std::move(message)); + return payload_size; +} + +size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) { + UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id); + + size_t removed_bytes = 0; + auto it = chunks_by_mid_.begin(); + while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) { + removed_bytes += absl::c_accumulate( + it->second, 0, + [](size_t r2, const auto& q) { return r2 + q.second.second.size(); }); + it = chunks_by_mid_.erase(it); + } + + if (!stream_id_.unordered) { + // For ordered streams, erasing a message might suddenly unblock that queue + // and allow it to deliver any following received messages. + if (unwrapped_mid >= next_mid_) { + next_mid_ = unwrapped_mid.next_value(); + } + + removed_bytes += TryToAssembleMessages(); + } + + return removed_bytes; +} + +int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) { + RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered); + RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id); + int queued_bytes = data.size(); + UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id); + FSN fsn = data.fsn; + auto [unused, inserted] = + chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data))); + if (!inserted) { + return 0; + } + + if (stream_id_.unordered) { + queued_bytes -= TryToAssembleMessage(mid); + } else { + if (mid == next_mid_) { + queued_bytes -= TryToAssembleMessages(); + } + } + + return queued_bytes; +} + +size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() { + size_t removed_bytes = 0; + + for (;;) { + size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_); + if (removed_bytes_this_iter == 0) { + break; + } + + removed_bytes += removed_bytes_this_iter; + next_mid_.Increment(); + } + return removed_bytes; +} + +void InterleavedReassemblyStreams::Stream::AddHandoverState( + DcSctpSocketHandoverState& state) const { + if (stream_id_.unordered) { + DcSctpSocketHandoverState::UnorderedStream state_stream; + state_stream.id = stream_id_.stream_id.value(); + state.rx.unordered_streams.push_back(std::move(state_stream)); + } else { + DcSctpSocketHandoverState::OrderedStream state_stream; + state_stream.id = stream_id_.stream_id.value(); + state_stream.next_ssn = next_mid_.Wrap().value(); + state.rx.ordered_streams.push_back(std::move(state_stream)); + } +} + +InterleavedReassemblyStreams::Stream& +InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + it = + streams_ + .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this)) + .first; + } + return it->second; +} + +int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) { + return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id)) + .Add(tsn, std::move(data)); +} + +size_t InterleavedReassemblyStreams::HandleForwardTsn( + UnwrappedTSN new_cumulative_ack_tsn, + rtc::ArrayView skipped_streams) { + size_t removed_bytes = 0; + for (const auto& skipped : skipped_streams) { + removed_bytes += + GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id)) + .EraseTo(skipped.message_id); + } + return removed_bytes; +} + +void InterleavedReassemblyStreams::ResetStreams( + rtc::ArrayView stream_ids) { + if (stream_ids.empty()) { + for (auto& entry : streams_) { + entry.second.Reset(); + } + } else { + for (StreamID stream_id : stream_ids) { + GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset(); + GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset(); + } + } +} + +HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness() + const { + HandoverReadinessStatus status; + for (const auto& [stream_id, stream] : streams_) { + if (stream.has_unassembled_chunks()) { + status.Add( + stream_id.unordered + ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks + : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks); + break; + } + } + return status; +} + +void InterleavedReassemblyStreams::AddHandoverState( + DcSctpSocketHandoverState& state) { + for (const auto& [unused, stream] : streams_) { + stream.AddHandoverState(state); + } +} + +} // namespace dcsctp diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.h b/net/dcsctp/rx/interleaved_reassembly_streams.h new file mode 100644 index 0000000000..9d4bbc799d --- /dev/null +++ b/net/dcsctp/rx/interleaved_reassembly_streams.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_ +#define NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_ + +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/rx/reassembly_streams.h" + +namespace dcsctp { + +// Handles reassembly of incoming data when interleaved message sending is +// enabled on the association, i.e. when RFC8260 is in use. +class InterleavedReassemblyStreams : public ReassemblyStreams { + public: + InterleavedReassemblyStreams( + absl::string_view log_prefix, + OnAssembledMessage on_assembled_message, + const DcSctpSocketHandoverState* handover_state = nullptr); + + int Add(UnwrappedTSN tsn, Data data) override; + + size_t HandleForwardTsn( + UnwrappedTSN new_cumulative_ack_tsn, + rtc::ArrayView skipped_streams) + override; + + void ResetStreams(rtc::ArrayView stream_ids) override; + + HandoverReadinessStatus GetHandoverReadiness() const override; + void AddHandoverState(DcSctpSocketHandoverState& state) override; + + private: + struct FullStreamId { + const IsUnordered unordered; + const StreamID stream_id; + + FullStreamId(IsUnordered unordered, StreamID stream_id) + : unordered(unordered), stream_id(stream_id) {} + + friend bool operator<(FullStreamId a, FullStreamId b) { + return a.unordered < b.unordered || + (!(a.unordered < b.unordered) && (a.stream_id < b.stream_id)); + } + }; + + class Stream { + public: + Stream(FullStreamId stream_id, + InterleavedReassemblyStreams* parent, + MID next_mid = MID(0)) + : stream_id_(stream_id), + parent_(*parent), + next_mid_(mid_unwrapper_.Unwrap(next_mid)) {} + int Add(UnwrappedTSN tsn, Data data); + size_t EraseTo(MID message_id); + void Reset() { + mid_unwrapper_.Reset(); + next_mid_ = mid_unwrapper_.Unwrap(MID(0)); + } + bool has_unassembled_chunks() const { return !chunks_by_mid_.empty(); } + void AddHandoverState(DcSctpSocketHandoverState& state) const; + + private: + using ChunkMap = std::map>; + + // Try to assemble one message identified by `mid`. + // Returns the number of bytes assembled if a message was assembled. + size_t TryToAssembleMessage(UnwrappedMID mid); + size_t AssembleMessage(const ChunkMap& tsn_chunks); + // Try to assemble one or several messages in order from the stream. + // Returns the number of bytes assembled if one or more messages were + // assembled. + size_t TryToAssembleMessages(); + + const FullStreamId stream_id_; + InterleavedReassemblyStreams& parent_; + std::map chunks_by_mid_; + UnwrappedMID::Unwrapper mid_unwrapper_; + UnwrappedMID next_mid_; + }; + + Stream& GetOrCreateStream(const FullStreamId& stream_id); + + const std::string log_prefix_; + + // Callback for when a message has been assembled. + const OnAssembledMessage on_assembled_message_; + + // All unordered and ordered streams, managing not-yet-assembled data. + std::map streams_; +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_ diff --git a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc new file mode 100644 index 0000000000..df4024ed60 --- /dev/null +++ b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/interleaved_reassembly_streams.h" + +#include +#include +#include + +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/rx/reassembly_streams.h" +#include "net/dcsctp/testing/data_generator.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::MockFunction; +using ::testing::NiceMock; + +class InterleavedReassemblyStreamsTest : public testing::Test { + protected: + UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); } + + InterleavedReassemblyStreamsTest() {} + DataGenerator gen_; + UnwrappedTSN::Unwrapper tsn_; +}; + +TEST_F(InterleavedReassemblyStreamsTest, + AddUnorderedMessageReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1); + EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3); + EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2); + // Adding the end fragment should make it empty again. + EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6); +} + +TEST_F(InterleavedReassemblyStreamsTest, + AddSimpleOrderedMessageReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3); + EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2); + EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6); +} + +TEST_F(InterleavedReassemblyStreamsTest, + AddMoreComplexOrderedMessageReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + Data late = gen_.Ordered({2, 3, 4}); + EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2); + EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1); + + EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1); + EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2); + EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1); + EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8); +} + +TEST_F(InterleavedReassemblyStreamsTest, + DeleteUnorderedMessageReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1); + EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3); + EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2); + + IForwardTsnChunk::SkippedStream skipped[] = { + IForwardTsnChunk::SkippedStream(IsUnordered(true), StreamID(1), MID(0))}; + EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u); +} + +TEST_F(InterleavedReassemblyStreamsTest, + DeleteSimpleOrderedMessageReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3); + EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2); + + IForwardTsnChunk::SkippedStream skipped[] = { + IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))}; + EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u); +} + +TEST_F(InterleavedReassemblyStreamsTest, + DeleteManyOrderedMessagesReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + gen_.Ordered({2, 3, 4}); + EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2); + EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1); + + EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1); + EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2); + EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1); + + // Expire all three messages + IForwardTsnChunk::SkippedStream skipped[] = { + IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(2))}; + EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u); +} + +TEST_F(InterleavedReassemblyStreamsTest, + DeleteOrderedMessageDelivesTwoReturnsCorrectSize) { + NiceMock> on_assembled; + + InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + gen_.Ordered({2, 3, 4}); + EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2); + EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1); + + EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1); + EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2); + EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1); + + // The first ordered message expire, and the following two are delivered. + IForwardTsnChunk::SkippedStream skipped[] = { + IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))}; + EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc index cbf198b136..e0c47f731b 100644 --- a/net/dcsctp/rx/reassembly_queue.cc +++ b/net/dcsctp/rx/reassembly_queue.cc @@ -29,15 +29,32 @@ #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" #include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/rx/interleaved_reassembly_streams.h" #include "net/dcsctp/rx/reassembly_streams.h" #include "net/dcsctp/rx/traditional_reassembly_streams.h" #include "rtc_base/logging.h" namespace dcsctp { +namespace { +std::unique_ptr CreateStreams( + absl::string_view log_prefix, + ReassemblyStreams::OnAssembledMessage on_assembled_message, + bool use_message_interleaving, + const DcSctpSocketHandoverState* handover_state) { + if (use_message_interleaving) { + return std::make_unique( + log_prefix, std::move(on_assembled_message), handover_state); + } + return std::make_unique( + log_prefix, std::move(on_assembled_message), handover_state); +} +} // namespace + ReassemblyQueue::ReassemblyQueue( absl::string_view log_prefix, TSN peer_initial_tsn, size_t max_size_bytes, + bool use_message_interleaving, const DcSctpSocketHandoverState* handover_state) : log_prefix_(std::string(log_prefix) + "reasm: "), max_size_bytes_(max_size_bytes), @@ -50,12 +67,13 @@ ReassemblyQueue::ReassemblyQueue( ? ReconfigRequestSN( handover_state->rx.last_completed_deferred_reset_req_sn) : ReconfigRequestSN(0)), - streams_(std::make_unique( + streams_(CreateStreams( log_prefix_, [this](rtc::ArrayView tsns, DcSctpMessage message) { AddReassembledMessage(tsns, std::move(message)); }, + use_message_interleaving, handover_state)) {} void ReassemblyQueue::Add(TSN tsn, Data data) { diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h index 9cc0c61eb6..ab5dd5e1b4 100644 --- a/net/dcsctp/rx/reassembly_queue.h +++ b/net/dcsctp/rx/reassembly_queue.h @@ -72,6 +72,7 @@ class ReassemblyQueue { ReassemblyQueue(absl::string_view log_prefix, TSN peer_initial_tsn, size_t max_size_bytes, + bool use_message_interleaving = false, const DcSctpSocketHandoverState* handover_state = nullptr); // Adds a data chunk to the queue, with a `tsn` and other parameters in diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc index bc1b776837..cac469f89f 100644 --- a/net/dcsctp/rx/reassembly_queue_test.cc +++ b/net/dcsctp/rx/reassembly_queue_test.cc @@ -33,6 +33,7 @@ namespace dcsctp { namespace { using ::testing::ElementsAre; using ::testing::SizeIs; +using ::testing::UnorderedElementsAre; // The default maximum size of the Reassembly Queue. static constexpr size_t kBufferSize = 10000; @@ -45,6 +46,11 @@ static constexpr PPID kPPID(53); static constexpr std::array kShortPayload = {1, 2, 3, 4}; static constexpr std::array kMessage2Payload = {5, 6, 7, 8}; +static constexpr std::array kSixBytePayload = {1, 2, 3, 4, 5, 6}; +static constexpr std::array kMediumPayload1 = {1, 2, 3, 4, + 5, 6, 7, 8}; +static constexpr std::array kMediumPayload2 = {9, 10, 11, 12, + 13, 14, 15, 16}; static constexpr std::array kLongPayload = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; @@ -369,7 +375,8 @@ TEST_F(ReassemblyQueueTest, HandoverInInitialState) { DcSctpSocketHandoverState state; reasm1.AddHandoverState(state); g_handover_state_transformer_for_test(&state); - ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state); + ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, + /*use_message_interleaving=*/false, &state); reasm2.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE")); EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1)); @@ -384,7 +391,8 @@ TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) { DcSctpSocketHandoverState state; reasm1.AddHandoverState(state); g_handover_state_transformer_for_test(&state); - ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state); + ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, + /*use_message_interleaving=*/false, &state); reasm2.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE")); EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1)); @@ -405,5 +413,95 @@ TEST_F(ReassemblyQueueTest, HandleInconsistentForwardTSN) { // Don't assemble SSN=7, as that TSN is skipped. EXPECT_FALSE(reasm.HasMessages()); } + +TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, + /*use_message_interleaving=*/true); + reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID, + {1, 2, 3, 4}, Data::IsBeginning(true), + Data::IsEnd(true), IsUnordered(true))); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); +} + +TEST_F(ReassemblyQueueTest, TwoInterleavedChunks) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, + /*use_message_interleaving=*/true); + reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID, + {1, 2, 3, 4}, Data::IsBeginning(true), + Data::IsEnd(false), IsUnordered(true))); + reasm.Add(TSN(11), Data(StreamID(2), SSN(0), MID(0), FSN(0), kPPID, + {9, 10, 11, 12}, Data::IsBeginning(true), + Data::IsEnd(false), IsUnordered(true))); + EXPECT_EQ(reasm.queued_bytes(), 8u); + reasm.Add(TSN(12), Data(StreamID(1), SSN(0), MID(0), FSN(1), kPPID, + {5, 6, 7, 8}, Data::IsBeginning(false), + Data::IsEnd(true), IsUnordered(true))); + EXPECT_EQ(reasm.queued_bytes(), 4u); + reasm.Add(TSN(13), Data(StreamID(2), SSN(0), MID(0), FSN(1), kPPID, + {13, 14, 15, 16}, Data::IsBeginning(false), + Data::IsEnd(true), IsUnordered(true))); + EXPECT_EQ(reasm.queued_bytes(), 0u); + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(StreamID(1), kPPID, kMediumPayload1), + SctpMessageIs(StreamID(2), kPPID, kMediumPayload2))); +} + +TEST_F(ReassemblyQueueTest, UnorderedInterleavedMessagesAllPermutations) { + std::vector indexes = {0, 1, 2, 3, 4, 5}; + TSN tsns[] = {TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15)}; + StreamID stream_ids[] = {StreamID(1), StreamID(2), StreamID(1), + StreamID(1), StreamID(2), StreamID(2)}; + FSN fsns[] = {FSN(0), FSN(0), FSN(1), FSN(2), FSN(1), FSN(2)}; + rtc::ArrayView payload(kSixBytePayload); + do { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, + /*use_message_interleaving=*/true); + for (int i : indexes) { + auto span = payload.subview(*fsns[i] * 2, 2); + Data::IsBeginning is_beginning(fsns[i] == FSN(0)); + Data::IsEnd is_end(fsns[i] == FSN(2)); + reasm.Add(tsns[i], Data(stream_ids[i], SSN(0), MID(0), fsns[i], kPPID, + std::vector(span.begin(), span.end()), + is_beginning, is_end, IsUnordered(true))); + } + EXPECT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + UnorderedElementsAre( + SctpMessageIs(StreamID(1), kPPID, kSixBytePayload), + SctpMessageIs(StreamID(2), kPPID, kSixBytePayload))); + EXPECT_EQ(reasm.queued_bytes(), 0u); + } while (std::next_permutation(std::begin(indexes), std::end(indexes))); +} + +TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) { + ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, + /*use_message_interleaving=*/true); + reasm.Add(TSN(10), gen_.Ordered({1}, "B")); + gen_.Ordered({2}, ""); + reasm.Add(TSN(12), gen_.Ordered({3}, "")); + reasm.Add(TSN(13), gen_.Ordered({4}, "E")); + reasm.Add(TSN(15), gen_.Ordered({5}, "B")); + reasm.Add(TSN(16), gen_.Ordered({6}, "")); + reasm.Add(TSN(17), gen_.Ordered({7}, "")); + reasm.Add(TSN(18), gen_.Ordered({8}, "E")); + + ASSERT_FALSE(reasm.HasMessages()); + EXPECT_EQ(reasm.queued_bytes(), 7u); + + reasm.Handle( + IForwardTsnChunk(TSN(13), {IForwardTsnChunk::SkippedStream( + IsUnordered(false), kStreamID, MID(0))})); + EXPECT_EQ(reasm.queued_bytes(), 0u); + + // The lost chunk comes, but too late. + ASSERT_TRUE(reasm.HasMessages()); + EXPECT_THAT(reasm.FlushMessages(), + ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload))); +} + } // namespace } // namespace dcsctp diff --git a/net/dcsctp/rx/reassembly_streams.cc b/net/dcsctp/rx/reassembly_streams.cc new file mode 100644 index 0000000000..9fd52fb15d --- /dev/null +++ b/net/dcsctp/rx/reassembly_streams.cc @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/reassembly_streams.h" + +#include +#include +#include + +namespace dcsctp { + +ReassembledMessage AssembleMessage(std::map::iterator start, + std::map::iterator end) { + size_t count = std::distance(start, end); + + if (count == 1) { + // Fast path - zero-copy + Data& data = start->second; + + return ReassembledMessage{ + .tsns = {start->first}, + .message = DcSctpMessage(data.stream_id, data.ppid, + std::move(start->second.payload)), + }; + } + + // Slow path - will need to concatenate the payload. + std::vector tsns; + std::vector payload; + + size_t payload_size = std::accumulate( + start, end, 0, + [](size_t v, const auto& p) { return v + p.second.size(); }); + + tsns.reserve(count); + payload.reserve(payload_size); + for (auto it = start; it != end; ++it) { + Data& data = it->second; + tsns.push_back(it->first); + payload.insert(payload.end(), data.payload.begin(), data.payload.end()); + } + + return ReassembledMessage{ + .tsns = std::move(tsns), + .message = DcSctpMessage(start->second.stream_id, start->second.ppid, + std::move(payload)), + }; +} +} // namespace dcsctp diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h index 41a79eada5..038ad3683f 100644 --- a/net/dcsctp/socket/transmission_control_block.h +++ b/net/dcsctp/socket/transmission_control_block.h @@ -98,6 +98,7 @@ class TransmissionControlBlock : public Context { reassembly_queue_(log_prefix, peer_initial_tsn, options.max_receiver_window_buffer_size, + capabilities.message_interleaving, handover_state), retransmission_queue_( log_prefix,