dcsctp: Remove deprecated delivery checks

With https://webrtc-review.googlesource.com/c/src/+/321603, the
responsibility to not ingest duplicate received chunks was moved from
the reassembly queue to the data tracker. But in that CL, we couldn't
remove updating the internal variables in the reassembly queue, because
those were included in the handover state. Now that time has passed,
we can remove this code altogether as nothing was ever reading from
these variables - only writing to them.

Bug: webrtc:14600
Change-Id: Icf958c75f74974be6cad7cd827cf49b3ab2f5412
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/329300
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41291}
This commit is contained in:
Victor Boivie 2023-11-30 13:06:03 +01:00 committed by WebRTC LUCI CQ
parent 24510d43dc
commit b506d68f2a
4 changed files with 14 additions and 80 deletions

View File

@ -786,5 +786,16 @@ TEST_F(DataTrackerTest, DoesNotAcceptGapsWithDuplicateData) {
EXPECT_FALSE(tracker_->Observe(TSN(12))); EXPECT_FALSE(tracker_->Observe(TSN(12)));
} }
TEST_F(DataTrackerTest, NotReadyForHandoverWhenHavingTsnGaps) {
tracker_->Observe(TSN(10));
tracker_->Observe(TSN(12));
EXPECT_EQ(tracker_->GetHandoverReadiness(),
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kDataTrackerTsnBlocksPending));
tracker_->Observe(TSN(11));
EXPECT_EQ(tracker_->GetHandoverReadiness(), HandoverReadinessStatus());
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp

View File

@ -57,8 +57,6 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
: log_prefix_(log_prefix), : log_prefix_(log_prefix),
max_size_bytes_(max_size_bytes), max_size_bytes_(max_size_bytes),
watermark_bytes_(max_size_bytes * kHighWatermarkLimit), watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
last_assembled_tsn_watermark_(
tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)), last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
streams_(CreateStreams( streams_(CreateStreams(
log_prefix_, log_prefix_,
@ -180,33 +178,9 @@ void ReassemblyQueue::AddReassembledMessage(
<< ", ppid=" << *message.ppid() << ", ppid=" << *message.ppid()
<< ", payload=" << message.payload().size() << " bytes"; << ", payload=" << message.payload().size() << " bytes";
for (const UnwrappedTSN tsn : tsns) {
if (tsn == last_assembled_tsn_watermark_.next_value()) {
// Update watermark, or insert into delivered_tsns_
last_assembled_tsn_watermark_.Increment();
} else {
delivered_tsns_.insert(tsn);
}
}
// With new TSNs in delivered_tsns, gaps might be filled.
MaybeMoveLastAssembledWatermarkFurther();
reassembled_messages_.emplace_back(std::move(message)); reassembled_messages_.emplace_back(std::move(message));
} }
void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
// `delivered_tsns_` contain TSNS when there is a gap between ranges of
// assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
// that list, because if so, it can be moved.
while (!delivered_tsns_.empty() &&
*delivered_tsns_.begin() ==
last_assembled_tsn_watermark_.next_value()) {
last_assembled_tsn_watermark_.Increment();
delivered_tsns_.erase(delivered_tsns_.begin());
}
}
void ReassemblyQueue::HandleForwardTsn( void ReassemblyQueue::HandleForwardTsn(
TSN new_cumulative_tsn, TSN new_cumulative_tsn,
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) { rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
@ -228,33 +202,19 @@ void ReassemblyQueue::HandleForwardTsn(
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
<< " - performing."; << " - performing.";
last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
delivered_tsns_.erase(delivered_tsns_.begin(),
delivered_tsns_.upper_bound(tsn));
MaybeMoveLastAssembledWatermarkFurther();
queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams); queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
} }
bool ReassemblyQueue::IsConsistent() const { bool ReassemblyQueue::IsConsistent() const {
// `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
// adjacent.
if (!delivered_tsns_.empty() &&
last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
return false;
}
// Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
// enforced in this class. This comparison will still trigger if queued_bytes_ // enforced in this class. But in case it wraps around (becomes negative, but
// became "negative". // as it's unsigned, that would wrap to very big), this would trigger.
return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_); return (queued_bytes_ <= 2 * max_size_bytes_);
} }
HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const { HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status = streams_->GetHandoverReadiness(); HandoverReadinessStatus status = streams_->GetHandoverReadiness();
if (!delivered_tsns_.empty()) {
status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
}
if (deferred_reset_streams_.has_value()) { if (deferred_reset_streams_.has_value()) {
status.Add(HandoverUnreadinessReason::kStreamResetDeferred); status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
} }
@ -262,7 +222,6 @@ HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
} }
void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) { void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
state.rx.last_completed_deferred_reset_req_sn = state.rx.last_completed_deferred_reset_req_sn =
last_completed_reset_req_seq_nbr_.value(); last_completed_reset_req_seq_nbr_.value();
streams_->AddHandoverState(state); streams_->AddHandoverState(state);
@ -272,8 +231,6 @@ void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
// Validate that the component is in pristine state. // Validate that the component is in pristine state.
RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0)); RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
last_assembled_tsn_watermark_ =
tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
last_completed_reset_req_seq_nbr_ = last_completed_reset_req_seq_nbr_ =
ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn); ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
streams_->RestoreFromState(state); streams_->RestoreFromState(state);

View File

@ -142,19 +142,12 @@ class ReassemblyQueue {
bool IsConsistent() const; bool IsConsistent() const;
void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns, void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
DcSctpMessage message); DcSctpMessage message);
void MaybeMoveLastAssembledWatermarkFurther();
const absl::string_view log_prefix_; const absl::string_view log_prefix_;
const size_t max_size_bytes_; const size_t max_size_bytes_;
const size_t watermark_bytes_; const size_t watermark_bytes_;
UnwrappedTSN::Unwrapper tsn_unwrapper_; UnwrappedTSN::Unwrapper tsn_unwrapper_;
// Whenever a message has been assembled, either increase
// `last_assembled_tsn_watermark_` or - if there are gaps - add the message's
// TSNs into delivered_tsns_ so that messages are not re-delivered on
// duplicate chunks.
UnwrappedTSN last_assembled_tsn_watermark_;
std::set<UnwrappedTSN> delivered_tsns_;
// Messages that have been reassembled, and will be returned by // Messages that have been reassembled, and will be returned by
// `FlushMessages`. // `FlushMessages`.
std::vector<DcSctpMessage> reassembled_messages_; std::vector<DcSctpMessage> reassembled_messages_;

View File

@ -252,33 +252,6 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload))); ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
} }
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
EXPECT_FALSE(reasm.HasMessages());
reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
EXPECT_TRUE(reasm.HasMessages());
EXPECT_EQ(
reasm.GetHandoverReadiness(),
HandoverReadinessStatus()
.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
EXPECT_THAT(reasm.FlushMessages(),
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
EXPECT_EQ(
reasm.GetHandoverReadiness(),
HandoverReadinessStatus()
.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>());
EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
}
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) { TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));