dcsctp: introduce handover API types and implement it for streams

Bug: webrtc:13154
Change-Id: Ifa250175af79b7adc87dbc2750054adc94b90bb7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/231842
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Sergey Sukhanov <sergeysu@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#34991}
This commit is contained in:
Sergey Sukhanov 2021-09-14 13:59:55 +02:00 committed by WebRTC LUCI CQ
parent d0321c5e5a
commit ad6b7a733a
7 changed files with 251 additions and 5 deletions

View File

@ -27,6 +27,7 @@ rtc_source_set("types") {
rtc_source_set("socket") { rtc_source_set("socket") {
deps = [ deps = [
":strong_alias",
":types", ":types",
"../../../api:array_view", "../../../api:array_view",
"../../../rtc_base", "../../../rtc_base",
@ -34,6 +35,7 @@ rtc_source_set("socket") {
"../../../rtc_base:rtc_base_approved", "../../../rtc_base:rtc_base_approved",
] ]
sources = [ sources = [
"dcsctp_handover_state.h",
"dcsctp_socket.h", "dcsctp_socket.h",
"packet_observer.h", "packet_observer.h",
"timeout.h", "timeout.h",

View File

@ -0,0 +1,92 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_
#define NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_
#include <cstdint>
#include <string>
#include <vector>
#include "net/dcsctp/public/strong_alias.h"
namespace dcsctp {
// Stores state snapshot of a dcSCTP socket. The snapshot can be used to
// recreate the socket - possibly in another process. This state should be
// treaded as opaque - the calling client should not inspect or alter it except
// for serialization. Serialization is not provided by dcSCTP. If needed it has
// to be implemented in the calling client.
struct DcSctpSocketHandoverState {
struct OrderedStream {
uint32_t id = 0;
uint32_t next_ssn = 0;
};
struct UnorderedStream {
uint32_t id = 0;
};
struct Receive {
std::vector<OrderedStream> ordered_streams;
std::vector<UnorderedStream> unordered_streams;
};
Receive rx;
};
// A list of possible reasons for a socket to be not ready for handover.
enum class HandoverUnreadinessReason : uint32_t {
kWrongConnectionState = 1,
kSendQueueNotEmpty = 2,
kDataTrackerNotIdle = 4,
kDataTrackerTsnBlocksPending = 8,
kReassemblyQueueNotEmpty = 16,
kReassemblyQueueDeliveredTSNsGap = 32,
kStreamResetDeferred = 64,
kOrderedStreamHasUnassembledChunks = 128,
kUnorderedStreamHasUnassembledChunks = 256,
kRetransmissionQueueOutstandingData = 512,
kRetransmissionQueueFastRecovery = 1024,
kRetransmissionQueueNotEmpty = 2048,
kPendingStreamReset = 4096,
kPendingStreamResetRequest = 8192,
kMax = kPendingStreamResetRequest,
};
// Return value of `DcSctpSocketInterface::GetHandoverReadiness`. Set of
// `HandoverUnreadinessReason` bits. When no bit is set, the socket is in the
// state in which a snapshot of the state can be made by
// `GetHandoverStateAndClose()`.
class HandoverReadinessStatus
: public StrongAlias<class HandoverReadinessStatusTag, uint32_t> {
public:
// Constructs an empty `HandoverReadinessStatus` which represents ready state.
constexpr HandoverReadinessStatus()
: StrongAlias<class HandoverReadinessStatusTag, uint32_t>(0) {}
// Constructs status object that contains a single reason for not being
// handover ready.
constexpr explicit HandoverReadinessStatus(HandoverUnreadinessReason reason)
: StrongAlias<class HandoverReadinessStatusTag, uint32_t>(
static_cast<uint32_t>(reason)) {}
// Convenience methods
constexpr bool IsReady() const { return value() == 0; }
constexpr bool Contains(HandoverUnreadinessReason reason) const {
return value() & static_cast<uint32_t>(reason);
}
HandoverReadinessStatus& Add(HandoverUnreadinessReason reason) {
return Add(HandoverReadinessStatus(reason));
}
HandoverReadinessStatus& Add(HandoverReadinessStatus status) {
value() |= status.value();
return *this;
}
};
} // namespace dcsctp
#endif // NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_

View File

@ -36,6 +36,7 @@ rtc_source_set("reassembly_streams") {
"../common:sequence_numbers", "../common:sequence_numbers",
"../packet:chunk", "../packet:chunk",
"../packet:data", "../packet:data",
"../public:socket",
"../public:types", "../public:types",
] ]
sources = [ "reassembly_streams.h" ] sources = [ "reassembly_streams.h" ]

View File

@ -21,6 +21,7 @@
#include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h" #include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_message.h"
namespace dcsctp { namespace dcsctp {
@ -77,6 +78,9 @@ class ReassemblyStreams {
// either a few streams, or all streams (when the list is empty) to be // either a few streams, or all streams (when the list is empty) to be
// reset - to have their next SSN or Message ID to be zero. // reset - to have their next SSN or Message ID to be zero.
virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0; virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0;
virtual HandoverReadinessStatus GetHandoverReadiness() const = 0;
virtual void AddHandoverState(DcSctpSocketHandoverState& state) = 0;
}; };
} // namespace dcsctp } // namespace dcsctp

View File

@ -78,6 +78,30 @@ absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindEnd(
} }
} // namespace } // namespace
TraditionalReassemblyStreams::TraditionalReassemblyStreams(
absl::string_view log_prefix,
OnAssembledMessage on_assembled_message,
const DcSctpSocketHandoverState* handover_state)
: log_prefix_(log_prefix),
on_assembled_message_(std::move(on_assembled_message)) {
if (handover_state) {
for (const DcSctpSocketHandoverState::OrderedStream& state_stream :
handover_state->rx.ordered_streams) {
ordered_streams_.emplace(
std::piecewise_construct,
std::forward_as_tuple(StreamID(state_stream.id)),
std::forward_as_tuple(this, SSN(state_stream.next_ssn)));
}
for (const DcSctpSocketHandoverState::UnorderedStream& state_stream :
handover_state->rx.unordered_streams) {
unordered_streams_.emplace(
std::piecewise_construct,
std::forward_as_tuple(StreamID(state_stream.id)),
std::forward_as_tuple(this));
}
}
}
int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn, int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
Data data) { Data data) {
int queued_bytes = data.size(); int queued_bytes = data.size();
@ -286,4 +310,39 @@ void TraditionalReassemblyStreams::ResetStreams(
} }
} }
} }
HandoverReadinessStatus TraditionalReassemblyStreams::GetHandoverReadiness()
const {
HandoverReadinessStatus status;
for (const auto& entry : ordered_streams_) {
if (entry.second.has_unassembled_chunks()) {
status.Add(HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
break;
}
}
for (const auto& entry : unordered_streams_) {
if (entry.second.has_unassembled_chunks()) {
status.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks);
break;
}
}
return status;
}
void TraditionalReassemblyStreams::AddHandoverState(
DcSctpSocketHandoverState& state) {
for (const auto& entry : ordered_streams_) {
DcSctpSocketHandoverState::OrderedStream state_stream;
state_stream.id = entry.first.value();
state_stream.next_ssn = entry.second.next_ssn().value();
state.rx.ordered_streams.push_back(std::move(state_stream));
}
for (const auto& entry : unordered_streams_) {
DcSctpSocketHandoverState::UnorderedStream state_stream;
state_stream.id = entry.first.value();
state.rx.unordered_streams.push_back(std::move(state_stream));
}
}
} // namespace dcsctp } // namespace dcsctp

View File

@ -29,9 +29,10 @@ namespace dcsctp {
// RFC4960 is to be followed. // RFC4960 is to be followed.
class TraditionalReassemblyStreams : public ReassemblyStreams { class TraditionalReassemblyStreams : public ReassemblyStreams {
public: public:
TraditionalReassemblyStreams(absl::string_view log_prefix, TraditionalReassemblyStreams(
OnAssembledMessage on_assembled_message) absl::string_view log_prefix,
: log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {} OnAssembledMessage on_assembled_message,
const DcSctpSocketHandoverState* handover_state = nullptr);
int Add(UnwrappedTSN tsn, Data data) override; int Add(UnwrappedTSN tsn, Data data) override;
@ -42,6 +43,9 @@ class TraditionalReassemblyStreams : public ReassemblyStreams {
void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override; void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
HandoverReadinessStatus GetHandoverReadiness() const override;
void AddHandoverState(DcSctpSocketHandoverState& state) override;
private: private:
using ChunkMap = std::map<UnwrappedTSN, Data>; using ChunkMap = std::map<UnwrappedTSN, Data>;
@ -65,6 +69,7 @@ class TraditionalReassemblyStreams : public ReassemblyStreams {
int Add(UnwrappedTSN tsn, Data data); int Add(UnwrappedTSN tsn, Data data);
// Returns the number of bytes removed from the queue. // Returns the number of bytes removed from the queue.
size_t EraseTo(UnwrappedTSN tsn); size_t EraseTo(UnwrappedTSN tsn);
bool has_unassembled_chunks() const { return !chunks_.empty(); }
private: private:
// Given an iterator to any chunk within the map, try to assemble a message // Given an iterator to any chunk within the map, try to assemble a message
@ -81,14 +86,17 @@ class TraditionalReassemblyStreams : public ReassemblyStreams {
// messages when possible. // messages when possible.
class OrderedStream : StreamBase { class OrderedStream : StreamBase {
public: public:
explicit OrderedStream(TraditionalReassemblyStreams* parent) explicit OrderedStream(TraditionalReassemblyStreams* parent,
: StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {} SSN next_ssn = SSN(0))
: StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(next_ssn)) {}
int Add(UnwrappedTSN tsn, Data data); int Add(UnwrappedTSN tsn, Data data);
size_t EraseTo(SSN ssn); size_t EraseTo(SSN ssn);
void Reset() { void Reset() {
ssn_unwrapper_.Reset(); ssn_unwrapper_.Reset();
next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0)); next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0));
} }
SSN next_ssn() const { return next_ssn_.Wrap(); }
bool has_unassembled_chunks() const { return !chunks_by_ssn_.empty(); }
private: private:
// Try to assemble one or several messages in order from the stream. // Try to assemble one or several messages in order from the stream.

View File

@ -148,5 +148,85 @@ TEST_F(TraditionalReassemblyStreamsTest,
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u); EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
} }
TEST_F(TraditionalReassemblyStreamsTest, NoStreamsCanBeHandedOver) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
DcSctpSocketHandoverState state;
streams1.AddHandoverState(state);
TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
&state);
EXPECT_EQ(streams2.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
EXPECT_EQ(streams2.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
EXPECT_EQ(streams2.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
EXPECT_EQ(streams2.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
}
TEST_F(TraditionalReassemblyStreamsTest,
OrderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
EXPECT_EQ(streams1.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
EXPECT_EQ(streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
EXPECT_EQ(streams1.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
EXPECT_EQ(streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
EXPECT_EQ(streams1.Add(tsn(3), gen_.Ordered({5, 6})), 2);
EXPECT_EQ(streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
ForwardTsnChunk::SkippedStream skipped[] = {
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), skipped), 6u);
EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
DcSctpSocketHandoverState state;
streams1.AddHandoverState(state);
TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
&state);
EXPECT_EQ(streams2.Add(tsn(4), gen_.Ordered({7})), 1);
}
TEST_F(TraditionalReassemblyStreamsTest,
UnorderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
EXPECT_EQ(streams1.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
EXPECT_EQ(
streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
EXPECT_EQ(streams1.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
EXPECT_EQ(
streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
EXPECT_EQ(streams1.Add(tsn(3), gen_.Unordered({5, 6})), 2);
EXPECT_EQ(
streams1.GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), {}), 6u);
EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
DcSctpSocketHandoverState state;
streams1.AddHandoverState(state);
TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
&state);
EXPECT_EQ(streams2.Add(tsn(4), gen_.Unordered({7})), 1);
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp