From ad6b7a733a05f1a938c24cb6b619accb50e2966f Mon Sep 17 00:00:00 2001 From: Sergey Sukhanov Date: Tue, 14 Sep 2021 13:59:55 +0200 Subject: [PATCH] dcsctp: introduce handover API types and implement it for streams Bug: webrtc:13154 Change-Id: Ifa250175af79b7adc87dbc2750054adc94b90bb7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/231842 Reviewed-by: Victor Boivie Commit-Queue: Sergey Sukhanov Cr-Commit-Position: refs/heads/main@{#34991} --- net/dcsctp/public/BUILD.gn | 2 + net/dcsctp/public/dcsctp_handover_state.h | 92 +++++++++++++++++++ net/dcsctp/rx/BUILD.gn | 1 + net/dcsctp/rx/reassembly_streams.h | 4 + .../rx/traditional_reassembly_streams.cc | 59 ++++++++++++ .../rx/traditional_reassembly_streams.h | 18 +++- .../rx/traditional_reassembly_streams_test.cc | 80 ++++++++++++++++ 7 files changed, 251 insertions(+), 5 deletions(-) create mode 100644 net/dcsctp/public/dcsctp_handover_state.h diff --git a/net/dcsctp/public/BUILD.gn b/net/dcsctp/public/BUILD.gn index 23530a6b52..8fb521cfb2 100644 --- a/net/dcsctp/public/BUILD.gn +++ b/net/dcsctp/public/BUILD.gn @@ -27,6 +27,7 @@ rtc_source_set("types") { rtc_source_set("socket") { deps = [ + ":strong_alias", ":types", "../../../api:array_view", "../../../rtc_base", @@ -34,6 +35,7 @@ rtc_source_set("socket") { "../../../rtc_base:rtc_base_approved", ] sources = [ + "dcsctp_handover_state.h", "dcsctp_socket.h", "packet_observer.h", "timeout.h", diff --git a/net/dcsctp/public/dcsctp_handover_state.h b/net/dcsctp/public/dcsctp_handover_state.h new file mode 100644 index 0000000000..d1267cafd3 --- /dev/null +++ b/net/dcsctp/public/dcsctp_handover_state.h @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_ +#define NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_ + +#include +#include +#include + +#include "net/dcsctp/public/strong_alias.h" + +namespace dcsctp { + +// Stores state snapshot of a dcSCTP socket. The snapshot can be used to +// recreate the socket - possibly in another process. This state should be +// treaded as opaque - the calling client should not inspect or alter it except +// for serialization. Serialization is not provided by dcSCTP. If needed it has +// to be implemented in the calling client. +struct DcSctpSocketHandoverState { + struct OrderedStream { + uint32_t id = 0; + uint32_t next_ssn = 0; + }; + struct UnorderedStream { + uint32_t id = 0; + }; + struct Receive { + std::vector ordered_streams; + std::vector unordered_streams; + }; + Receive rx; +}; + +// A list of possible reasons for a socket to be not ready for handover. +enum class HandoverUnreadinessReason : uint32_t { + kWrongConnectionState = 1, + kSendQueueNotEmpty = 2, + kDataTrackerNotIdle = 4, + kDataTrackerTsnBlocksPending = 8, + kReassemblyQueueNotEmpty = 16, + kReassemblyQueueDeliveredTSNsGap = 32, + kStreamResetDeferred = 64, + kOrderedStreamHasUnassembledChunks = 128, + kUnorderedStreamHasUnassembledChunks = 256, + kRetransmissionQueueOutstandingData = 512, + kRetransmissionQueueFastRecovery = 1024, + kRetransmissionQueueNotEmpty = 2048, + kPendingStreamReset = 4096, + kPendingStreamResetRequest = 8192, + kMax = kPendingStreamResetRequest, +}; + +// Return value of `DcSctpSocketInterface::GetHandoverReadiness`. Set of +// `HandoverUnreadinessReason` bits. When no bit is set, the socket is in the +// state in which a snapshot of the state can be made by +// `GetHandoverStateAndClose()`. +class HandoverReadinessStatus + : public StrongAlias { + public: + // Constructs an empty `HandoverReadinessStatus` which represents ready state. + constexpr HandoverReadinessStatus() + : StrongAlias(0) {} + // Constructs status object that contains a single reason for not being + // handover ready. + constexpr explicit HandoverReadinessStatus(HandoverUnreadinessReason reason) + : StrongAlias( + static_cast(reason)) {} + + // Convenience methods + constexpr bool IsReady() const { return value() == 0; } + constexpr bool Contains(HandoverUnreadinessReason reason) const { + return value() & static_cast(reason); + } + HandoverReadinessStatus& Add(HandoverUnreadinessReason reason) { + return Add(HandoverReadinessStatus(reason)); + } + HandoverReadinessStatus& Add(HandoverReadinessStatus status) { + value() |= status.value(); + return *this; + } +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_ diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn index fb92513158..6c49648984 100644 --- a/net/dcsctp/rx/BUILD.gn +++ b/net/dcsctp/rx/BUILD.gn @@ -36,6 +36,7 @@ rtc_source_set("reassembly_streams") { "../common:sequence_numbers", "../packet:chunk", "../packet:data", + "../public:socket", "../public:types", ] sources = [ "reassembly_streams.h" ] diff --git a/net/dcsctp/rx/reassembly_streams.h b/net/dcsctp/rx/reassembly_streams.h index a8b42b5a2d..06f1a781ce 100644 --- a/net/dcsctp/rx/reassembly_streams.h +++ b/net/dcsctp/rx/reassembly_streams.h @@ -21,6 +21,7 @@ #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_handover_state.h" #include "net/dcsctp/public/dcsctp_message.h" namespace dcsctp { @@ -77,6 +78,9 @@ class ReassemblyStreams { // either a few streams, or all streams (when the list is empty) to be // reset - to have their next SSN or Message ID to be zero. virtual void ResetStreams(rtc::ArrayView stream_ids) = 0; + + virtual HandoverReadinessStatus GetHandoverReadiness() const = 0; + virtual void AddHandoverState(DcSctpSocketHandoverState& state) = 0; }; } // namespace dcsctp diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc index 4108d37236..c9af293056 100644 --- a/net/dcsctp/rx/traditional_reassembly_streams.cc +++ b/net/dcsctp/rx/traditional_reassembly_streams.cc @@ -78,6 +78,30 @@ absl::optional::iterator> FindEnd( } } // namespace +TraditionalReassemblyStreams::TraditionalReassemblyStreams( + absl::string_view log_prefix, + OnAssembledMessage on_assembled_message, + const DcSctpSocketHandoverState* handover_state) + : log_prefix_(log_prefix), + on_assembled_message_(std::move(on_assembled_message)) { + if (handover_state) { + for (const DcSctpSocketHandoverState::OrderedStream& state_stream : + handover_state->rx.ordered_streams) { + ordered_streams_.emplace( + std::piecewise_construct, + std::forward_as_tuple(StreamID(state_stream.id)), + std::forward_as_tuple(this, SSN(state_stream.next_ssn))); + } + for (const DcSctpSocketHandoverState::UnorderedStream& state_stream : + handover_state->rx.unordered_streams) { + unordered_streams_.emplace( + std::piecewise_construct, + std::forward_as_tuple(StreamID(state_stream.id)), + std::forward_as_tuple(this)); + } + } +} + int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn, Data data) { int queued_bytes = data.size(); @@ -286,4 +310,39 @@ void TraditionalReassemblyStreams::ResetStreams( } } } + +HandoverReadinessStatus TraditionalReassemblyStreams::GetHandoverReadiness() + const { + HandoverReadinessStatus status; + for (const auto& entry : ordered_streams_) { + if (entry.second.has_unassembled_chunks()) { + status.Add(HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks); + break; + } + } + for (const auto& entry : unordered_streams_) { + if (entry.second.has_unassembled_chunks()) { + status.Add( + HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks); + break; + } + } + return status; +} + +void TraditionalReassemblyStreams::AddHandoverState( + DcSctpSocketHandoverState& state) { + for (const auto& entry : ordered_streams_) { + DcSctpSocketHandoverState::OrderedStream state_stream; + state_stream.id = entry.first.value(); + state_stream.next_ssn = entry.second.next_ssn().value(); + state.rx.ordered_streams.push_back(std::move(state_stream)); + } + for (const auto& entry : unordered_streams_) { + DcSctpSocketHandoverState::UnorderedStream state_stream; + state_stream.id = entry.first.value(); + state.rx.unordered_streams.push_back(std::move(state_stream)); + } +} + } // namespace dcsctp diff --git a/net/dcsctp/rx/traditional_reassembly_streams.h b/net/dcsctp/rx/traditional_reassembly_streams.h index d7ae2dd1b3..0c724327e2 100644 --- a/net/dcsctp/rx/traditional_reassembly_streams.h +++ b/net/dcsctp/rx/traditional_reassembly_streams.h @@ -29,9 +29,10 @@ namespace dcsctp { // RFC4960 is to be followed. class TraditionalReassemblyStreams : public ReassemblyStreams { public: - TraditionalReassemblyStreams(absl::string_view log_prefix, - OnAssembledMessage on_assembled_message) - : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {} + TraditionalReassemblyStreams( + absl::string_view log_prefix, + OnAssembledMessage on_assembled_message, + const DcSctpSocketHandoverState* handover_state = nullptr); int Add(UnwrappedTSN tsn, Data data) override; @@ -42,6 +43,9 @@ class TraditionalReassemblyStreams : public ReassemblyStreams { void ResetStreams(rtc::ArrayView stream_ids) override; + HandoverReadinessStatus GetHandoverReadiness() const override; + void AddHandoverState(DcSctpSocketHandoverState& state) override; + private: using ChunkMap = std::map; @@ -65,6 +69,7 @@ class TraditionalReassemblyStreams : public ReassemblyStreams { int Add(UnwrappedTSN tsn, Data data); // Returns the number of bytes removed from the queue. size_t EraseTo(UnwrappedTSN tsn); + bool has_unassembled_chunks() const { return !chunks_.empty(); } private: // Given an iterator to any chunk within the map, try to assemble a message @@ -81,14 +86,17 @@ class TraditionalReassemblyStreams : public ReassemblyStreams { // messages when possible. class OrderedStream : StreamBase { public: - explicit OrderedStream(TraditionalReassemblyStreams* parent) - : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {} + explicit OrderedStream(TraditionalReassemblyStreams* parent, + SSN next_ssn = SSN(0)) + : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(next_ssn)) {} int Add(UnwrappedTSN tsn, Data data); size_t EraseTo(SSN ssn); void Reset() { ssn_unwrapper_.Reset(); next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0)); } + SSN next_ssn() const { return next_ssn_.Wrap(); } + bool has_unassembled_chunks() const { return !chunks_by_ssn_.empty(); } private: // Try to assemble one or several messages in order from the stream. diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc index 30d29a05dc..f58bfed4a4 100644 --- a/net/dcsctp/rx/traditional_reassembly_streams_test.cc +++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc @@ -148,5 +148,85 @@ TEST_F(TraditionalReassemblyStreamsTest, EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u); } +TEST_F(TraditionalReassemblyStreamsTest, NoStreamsCanBeHandedOver) { + NiceMock> on_assembled; + + TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction()); + EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady()); + + DcSctpSocketHandoverState state; + streams1.AddHandoverState(state); + TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(), + &state); + + EXPECT_EQ(streams2.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + EXPECT_EQ(streams2.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3); + EXPECT_EQ(streams2.Add(tsn(1), gen_.Unordered({1}, "B")), 1); + EXPECT_EQ(streams2.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3); +} + +TEST_F(TraditionalReassemblyStreamsTest, + OrderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) { + NiceMock> on_assembled; + + TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams1.Add(tsn(1), gen_.Ordered({1}, "B")), 1); + EXPECT_EQ(streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks)); + EXPECT_EQ(streams1.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3); + EXPECT_EQ(streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks)); + EXPECT_EQ(streams1.Add(tsn(3), gen_.Ordered({5, 6})), 2); + EXPECT_EQ(streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks)); + + ForwardTsnChunk::SkippedStream skipped[] = { + ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))}; + EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), skipped), 6u); + EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady()); + + DcSctpSocketHandoverState state; + streams1.AddHandoverState(state); + TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(), + &state); + EXPECT_EQ(streams2.Add(tsn(4), gen_.Ordered({7})), 1); +} + +TEST_F(TraditionalReassemblyStreamsTest, + UnorderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) { + NiceMock> on_assembled; + + TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction()); + + EXPECT_EQ(streams1.Add(tsn(1), gen_.Unordered({1}, "B")), 1); + EXPECT_EQ( + streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); + EXPECT_EQ(streams1.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3); + EXPECT_EQ( + streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); + EXPECT_EQ(streams1.Add(tsn(3), gen_.Unordered({5, 6})), 2); + EXPECT_EQ( + streams1.GetHandoverReadiness(), + HandoverReadinessStatus( + HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); + + EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), {}), 6u); + EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady()); + + DcSctpSocketHandoverState state; + streams1.AddHandoverState(state); + TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(), + &state); + EXPECT_EQ(streams2.Add(tsn(4), gen_.Unordered({7})), 1); +} + } // namespace } // namespace dcsctp