dcsctp: support socket handover in ReassemblyQueue

Bug: webrtc:13154
Change-Id: I816e51dcd923ba6440480de5d5df9012e4af9e5a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/231958
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Sergey Sukhanov <sergeysu@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35002}
This commit is contained in:
Sergey Sukhanov 2021-09-15 11:59:17 +02:00 committed by WebRTC LUCI CQ
parent 74281bed53
commit 9c1657cba8
5 changed files with 138 additions and 11 deletions

View File

@ -34,6 +34,8 @@ struct DcSctpSocketHandoverState {
struct Receive {
bool seen_packet = false;
uint32_t last_cumulative_acked_tsn = 0;
uint32_t last_assembled_tsn = 0;
uint32_t last_completed_deferred_reset_req_sn = 0;
std::vector<OrderedStream> ordered_streams;
std::vector<UnorderedStream> unordered_streams;
};
@ -46,7 +48,7 @@ enum class HandoverUnreadinessReason : uint32_t {
kSendQueueNotEmpty = 2,
kPendingStreamResetRequest = 4,
kDataTrackerTsnBlocksPending = 8,
kReassemblyQueueNotEmpty = 16,
kPendingStreamReset = 16,
kReassemblyQueueDeliveredTSNsGap = 32,
kStreamResetDeferred = 64,
kOrderedStreamHasUnassembledChunks = 128,
@ -54,8 +56,7 @@ enum class HandoverUnreadinessReason : uint32_t {
kRetransmissionQueueOutstandingData = 512,
kRetransmissionQueueFastRecovery = 1024,
kRetransmissionQueueNotEmpty = 2048,
kPendingStreamReset = 4096,
kMax = kPendingStreamReset,
kMax = kRetransmissionQueueNotEmpty,
};
// Return value of `DcSctpSocketInterface::GetHandoverReadiness`. Set of

View File

@ -81,6 +81,7 @@ rtc_library("reassembly_queue") {
"../packet:chunk",
"../packet:data",
"../packet:parameter",
"../public:socket",
"../public:types",
]
sources = [

View File

@ -34,20 +34,29 @@
#include "rtc_base/logging.h"
namespace dcsctp {
ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
ReassemblyQueue::ReassemblyQueue(
absl::string_view log_prefix,
TSN peer_initial_tsn,
size_t max_size_bytes)
size_t max_size_bytes,
const DcSctpSocketHandoverState* handover_state)
: log_prefix_(std::string(log_prefix) + "reasm: "),
max_size_bytes_(max_size_bytes),
watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
last_assembled_tsn_watermark_(
tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
last_assembled_tsn_watermark_(tsn_unwrapper_.Unwrap(
handover_state ? TSN(handover_state->rx.last_assembled_tsn)
: TSN(*peer_initial_tsn - 1))),
last_completed_reset_req_seq_nbr_(
handover_state
? ReconfigRequestSN(
handover_state->rx.last_completed_deferred_reset_req_sn)
: ReconfigRequestSN(0)),
streams_(std::make_unique<TraditionalReassemblyStreams>(
log_prefix_,
[this](rtc::ArrayView<const UnwrappedTSN> tsns,
DcSctpMessage message) {
AddReassembledMessage(tsns, std::move(message));
})) {}
},
handover_state)) {}
void ReassemblyQueue::Add(TSN tsn, Data data) {
RTC_DCHECK(IsConsistent());
@ -242,4 +251,22 @@ bool ReassemblyQueue::IsConsistent() const {
return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
}
HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status = streams_->GetHandoverReadiness();
if (!delivered_tsns_.empty()) {
status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
}
if (deferred_reset_streams_.has_value()) {
status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
}
return status;
}
void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
state.rx.last_completed_deferred_reset_req_sn =
last_completed_reset_req_seq_nbr_.value();
streams_->AddHandoverState(state);
}
} // namespace dcsctp

View File

@ -27,6 +27,7 @@
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/rx/reassembly_streams.h"
@ -70,7 +71,8 @@ class ReassemblyQueue {
ReassemblyQueue(absl::string_view log_prefix,
TSN peer_initial_tsn,
size_t max_size_bytes);
size_t max_size_bytes,
const DcSctpSocketHandoverState* handover_state = nullptr);
// Adds a data chunk to the queue, with a `tsn` and other parameters in
// `data`.
@ -118,6 +120,10 @@ class ReassemblyQueue {
// Returns the watermark limit, in bytes.
size_t watermark_bytes() const { return watermark_bytes_; }
HandoverReadinessStatus GetHandoverReadiness() const;
void AddHandoverState(DcSctpSocketHandoverState& state);
private:
bool IsConsistent() const;
void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
@ -150,7 +156,7 @@ class ReassemblyQueue {
// Contains the last request sequence number of the
// OutgoingSSNResetRequestParameter that was performed.
ReconfigRequestSN last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(0);
ReconfigRequestSN last_completed_reset_req_seq_nbr_;
// The number of "payload bytes" that are in this queue, in total.
size_t queued_bytes_ = 0;

View File

@ -31,6 +31,7 @@
namespace dcsctp {
namespace {
using ::testing::ElementsAre;
using ::testing::SizeIs;
// The default maximum size of the Reassembly Queue.
static constexpr size_t kBufferSize = 10000;
@ -294,5 +295,96 @@ TEST_F(ReassemblyQueueTest, ShouldntDeliverBeforeForwardedTsn) {
EXPECT_FALSE(reasm.HasMessages());
}
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
EXPECT_FALSE(reasm.HasMessages());
reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
EXPECT_TRUE(reasm.HasMessages());
EXPECT_EQ(
reasm.GetHandoverReadiness(),
HandoverReadinessStatus()
.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
EXPECT_THAT(reasm.FlushMessages(),
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
EXPECT_EQ(
reasm.GetHandoverReadiness(),
HandoverReadinessStatus()
.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
reasm.Handle(ForwardTsnChunk(TSN(13), {}));
EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
}
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
DataGeneratorOptions opts;
opts.message_id = MID(0);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
opts.message_id = MID(1);
reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
reasm.ResetStreams(
OutgoingSSNResetRequestParameter(
ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(13), {StreamID(1)}),
TSN(11));
EXPECT_EQ(reasm.GetHandoverReadiness(),
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kStreamResetDeferred));
opts.message_id = MID(3);
opts.ppid = PPID(3);
reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm.MaybeResetStreamsDeferred(TSN(11));
opts.message_id = MID(2);
opts.ppid = PPID(2);
reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm.MaybeResetStreamsDeferred(TSN(15));
EXPECT_EQ(reasm.GetHandoverReadiness(),
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap));
EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
EXPECT_EQ(reasm.GetHandoverReadiness(),
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap));
reasm.Handle(ForwardTsnChunk(TSN(15), {}));
EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
}
TEST_F(ReassemblyQueueTest, HandoverInInitialState) {
ReassemblyQueue reasm1("log: ", TSN(10), kBufferSize);
EXPECT_EQ(reasm1.GetHandoverReadiness(), HandoverReadinessStatus());
DcSctpSocketHandoverState state;
reasm1.AddHandoverState(state);
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
reasm2.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
}
TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) {
ReassemblyQueue reasm1("log: ", TSN(10), kBufferSize);
reasm1.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm1.FlushMessages(), SizeIs(1));
EXPECT_EQ(reasm1.GetHandoverReadiness(), HandoverReadinessStatus());
DcSctpSocketHandoverState state;
reasm1.AddHandoverState(state);
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
reasm2.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
}
} // namespace
} // namespace dcsctp