dcsctp: Remove initial TSN from reassembly queue

With a previous refactoring, which made the data tracker responsible for
ensuring that the reassembly queue doesn't see any duplicate received
chunks, it no longer needs to know the initial peer's TSN. Removing.

Bug: None
Change-Id: I0e2aef1de0293f1860b46dee0089757c9c300aea
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/345701
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41997}
This commit is contained in:
Victor Boivie 2024-04-04 12:57:24 +02:00 committed by WebRTC LUCI CQ
parent 4f244d0808
commit de276cf049
5 changed files with 20 additions and 26 deletions

View File

@ -51,7 +51,6 @@ std::unique_ptr<ReassemblyStreams> CreateStreams(
} // namespace } // namespace
ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
TSN peer_initial_tsn,
size_t max_size_bytes, size_t max_size_bytes,
bool use_message_interleaving) bool use_message_interleaving)
: log_prefix_(log_prefix), : log_prefix_(log_prefix),

View File

@ -72,7 +72,6 @@ class ReassemblyQueue {
static constexpr float kHighWatermarkLimit = 0.9; static constexpr float kHighWatermarkLimit = 0.9;
ReassemblyQueue(absl::string_view log_prefix, ReassemblyQueue(absl::string_view log_prefix,
TSN peer_initial_tsn,
size_t max_size_bytes, size_t max_size_bytes,
bool use_message_interleaving = false); bool use_message_interleaving = false);

View File

@ -81,13 +81,13 @@ class ReassemblyQueueTest : public testing::Test {
}; };
TEST_F(ReassemblyQueueTest, EmptyQueue) { TEST_F(ReassemblyQueueTest, EmptyQueue) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
EXPECT_FALSE(reasm.HasMessages()); EXPECT_FALSE(reasm.HasMessages());
EXPECT_EQ(reasm.queued_bytes(), 0u); EXPECT_EQ(reasm.queued_bytes(), 0u);
} }
TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessage) { TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessage) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE")); reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
EXPECT_TRUE(reasm.HasMessages()); EXPECT_TRUE(reasm.HasMessages());
EXPECT_THAT(reasm.FlushMessages(), EXPECT_THAT(reasm.FlushMessages(),
@ -99,7 +99,7 @@ TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) {
std::vector<uint32_t> tsns = {10, 11, 12, 13}; std::vector<uint32_t> tsns = {10, 11, 12, 13};
rtc::ArrayView<const uint8_t> payload(kLongPayload); rtc::ArrayView<const uint8_t> payload(kLongPayload);
do { do {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
for (size_t i = 0; i < tsns.size(); i++) { for (size_t i = 0; i < tsns.size(); i++) {
auto span = payload.subview((tsns[i] - 10) * 4, 4); auto span = payload.subview((tsns[i] - 10) * 4, 4);
@ -123,7 +123,7 @@ TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) {
} }
TEST_F(ReassemblyQueueTest, SingleOrderedChunkMessage) { TEST_F(ReassemblyQueueTest, SingleOrderedChunkMessage) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE")); reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_EQ(reasm.queued_bytes(), 0u); EXPECT_EQ(reasm.queued_bytes(), 0u);
EXPECT_TRUE(reasm.HasMessages()); EXPECT_TRUE(reasm.HasMessages());
@ -135,7 +135,7 @@ TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) {
std::vector<uint32_t> tsns = {10, 11, 12, 13}; std::vector<uint32_t> tsns = {10, 11, 12, 13};
rtc::ArrayView<const uint8_t> payload(kLongPayload); rtc::ArrayView<const uint8_t> payload(kLongPayload);
do { do {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
for (size_t i = 0; i < tsns.size(); i++) { for (size_t i = 0; i < tsns.size(); i++) {
auto span = payload.subview((tsns[i] - 10) * 4, 4); auto span = payload.subview((tsns[i] - 10) * 4, 4);
Data::IsBeginning is_beginning(true); Data::IsBeginning is_beginning(true);
@ -158,7 +158,7 @@ TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) {
} }
TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) { TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1}, "B")); reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
reasm.Add(TSN(12), gen_.Ordered({3})); reasm.Add(TSN(12), gen_.Ordered({3}));
reasm.Add(TSN(13), gen_.Ordered({4})); reasm.Add(TSN(13), gen_.Ordered({4}));
@ -183,7 +183,7 @@ TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) {
} }
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) { TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Unordered({1}, "B")); reasm.Add(TSN(10), gen_.Unordered({1}, "B"));
reasm.Add(TSN(12), gen_.Unordered({3})); reasm.Add(TSN(12), gen_.Unordered({3}));
reasm.Add(TSN(13), gen_.Unordered({4}, "E")); reasm.Add(TSN(13), gen_.Unordered({4}, "E"));
@ -205,7 +205,7 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) {
} }
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) { TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1}, "B")); reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
reasm.Add(TSN(12), gen_.Ordered({3})); reasm.Add(TSN(12), gen_.Ordered({3}));
reasm.Add(TSN(13), gen_.Ordered({4}, "E")); reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
@ -229,7 +229,7 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) {
} }
TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) { TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1}, "B")); reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
reasm.Add(TSN(12), gen_.Ordered({3})); reasm.Add(TSN(12), gen_.Ordered({3}));
reasm.Add(TSN(13), gen_.Ordered({4}, "E")); reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
@ -253,7 +253,7 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
} }
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) { TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
EXPECT_THAT(reasm.FlushMessages(), SizeIs(2)); EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
@ -270,13 +270,13 @@ TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
} }
TEST_F(ReassemblyQueueTest, HandoverInInitialState) { TEST_F(ReassemblyQueueTest, HandoverInInitialState) {
ReassemblyQueue reasm1("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm1("log: ", kBufferSize);
EXPECT_EQ(reasm1.GetHandoverReadiness(), HandoverReadinessStatus()); EXPECT_EQ(reasm1.GetHandoverReadiness(), HandoverReadinessStatus());
DcSctpSocketHandoverState state; DcSctpSocketHandoverState state;
reasm1.AddHandoverState(state); reasm1.AddHandoverState(state);
g_handover_state_transformer_for_test(&state); g_handover_state_transformer_for_test(&state);
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, ReassemblyQueue reasm2("log: ", kBufferSize,
/*use_message_interleaving=*/false); /*use_message_interleaving=*/false);
reasm2.RestoreFromState(state); reasm2.RestoreFromState(state);
@ -285,7 +285,7 @@ TEST_F(ReassemblyQueueTest, HandoverInInitialState) {
} }
TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) { TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) {
ReassemblyQueue reasm1("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm1("log: ", kBufferSize);
reasm1.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE")); reasm1.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm1.FlushMessages(), SizeIs(1)); EXPECT_THAT(reasm1.FlushMessages(), SizeIs(1));
@ -293,7 +293,7 @@ TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) {
DcSctpSocketHandoverState state; DcSctpSocketHandoverState state;
reasm1.AddHandoverState(state); reasm1.AddHandoverState(state);
g_handover_state_transformer_for_test(&state); g_handover_state_transformer_for_test(&state);
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, ReassemblyQueue reasm2("log: ", kBufferSize,
/*use_message_interleaving=*/false); /*use_message_interleaving=*/false);
reasm2.RestoreFromState(state); reasm2.RestoreFromState(state);
@ -302,7 +302,7 @@ TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) {
} }
TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) { TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, ReassemblyQueue reasm("log: ", kBufferSize,
/*use_message_interleaving=*/true); /*use_message_interleaving=*/true);
reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID, reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
{1, 2, 3, 4}, Data::IsBeginning(true), {1, 2, 3, 4}, Data::IsBeginning(true),
@ -314,7 +314,7 @@ TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) {
} }
TEST_F(ReassemblyQueueTest, TwoInterleavedChunks) { TEST_F(ReassemblyQueueTest, TwoInterleavedChunks) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, ReassemblyQueue reasm("log: ", kBufferSize,
/*use_message_interleaving=*/true); /*use_message_interleaving=*/true);
reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID, reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
{1, 2, 3, 4}, Data::IsBeginning(true), {1, 2, 3, 4}, Data::IsBeginning(true),
@ -345,7 +345,7 @@ TEST_F(ReassemblyQueueTest, UnorderedInterleavedMessagesAllPermutations) {
FSN fsns[] = {FSN(0), FSN(0), FSN(1), FSN(2), FSN(1), FSN(2)}; FSN fsns[] = {FSN(0), FSN(0), FSN(1), FSN(2), FSN(1), FSN(2)};
rtc::ArrayView<const uint8_t> payload(kSixBytePayload); rtc::ArrayView<const uint8_t> payload(kSixBytePayload);
do { do {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, ReassemblyQueue reasm("log: ", kBufferSize,
/*use_message_interleaving=*/true); /*use_message_interleaving=*/true);
for (int i : indexes) { for (int i : indexes) {
auto span = payload.subview(*fsns[i] * 2, 2); auto span = payload.subview(*fsns[i] * 2, 2);
@ -365,7 +365,7 @@ TEST_F(ReassemblyQueueTest, UnorderedInterleavedMessagesAllPermutations) {
} }
TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) { TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize, ReassemblyQueue reasm("log: ", kBufferSize,
/*use_message_interleaving=*/true); /*use_message_interleaving=*/true);
reasm.Add(TSN(10), gen_.Ordered({1}, "B")); reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
gen_.Ordered({2}, ""); gen_.Ordered({2}, "");

View File

@ -107,9 +107,7 @@ class StreamResetHandlerTest : public testing::Test {
data_tracker_(std::make_unique<DataTracker>("log: ", data_tracker_(std::make_unique<DataTracker>("log: ",
delayed_ack_timer_.get(), delayed_ack_timer_.get(),
kPeerInitialTsn)), kPeerInitialTsn)),
reasm_(std::make_unique<ReassemblyQueue>("log: ", reasm_(std::make_unique<ReassemblyQueue>("log: ", kArwnd)),
kPeerInitialTsn,
kArwnd)),
retransmission_queue_(std::make_unique<RetransmissionQueue>( retransmission_queue_(std::make_unique<RetransmissionQueue>(
"", "",
&callbacks_, &callbacks_,
@ -201,8 +199,7 @@ class StreamResetHandlerTest : public testing::Test {
data_tracker_ = std::make_unique<DataTracker>( data_tracker_ = std::make_unique<DataTracker>(
"log: ", delayed_ack_timer_.get(), kPeerInitialTsn); "log: ", delayed_ack_timer_.get(), kPeerInitialTsn);
data_tracker_->RestoreFromState(state); data_tracker_->RestoreFromState(state);
reasm_ = reasm_ = std::make_unique<ReassemblyQueue>("log: ", kArwnd);
std::make_unique<ReassemblyQueue>("log: ", kPeerInitialTsn, kArwnd);
reasm_->RestoreFromState(state); reasm_->RestoreFromState(state);
retransmission_queue_ = std::make_unique<RetransmissionQueue>( retransmission_queue_ = std::make_unique<RetransmissionQueue>(
"", &callbacks_, kMyInitialTsn, kArwnd, producer_, [](TimeDelta rtt) {}, "", &callbacks_, kMyInitialTsn, kArwnd, producer_, [](TimeDelta rtt) {},

View File

@ -90,7 +90,6 @@ TransmissionControlBlock::TransmissionControlBlock(
tx_error_counter_(log_prefix, options), tx_error_counter_(log_prefix, options),
data_tracker_(log_prefix, delayed_ack_timer_.get(), peer_initial_tsn), data_tracker_(log_prefix, delayed_ack_timer_.get(), peer_initial_tsn),
reassembly_queue_(log_prefix, reassembly_queue_(log_prefix,
peer_initial_tsn,
options.max_receiver_window_buffer_size, options.max_receiver_window_buffer_size,
capabilities.message_interleaving), capabilities.message_interleaving),
retransmission_queue_( retransmission_queue_(