diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc index 7183323642..36ade9230a 100644 --- a/net/dcsctp/rx/reassembly_queue.cc +++ b/net/dcsctp/rx/reassembly_queue.cc @@ -221,14 +221,21 @@ void ReassemblyQueue::AddReassembledMessage( } // With new TSNs in delivered_tsns, gaps might be filled. + MaybeMoveLastAssembledWatermarkFurther(); + + reassembled_messages_.emplace_back(std::move(message)); +} + +void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { + // `delivered_tsns_` contain TSNS when there is a gap between ranges of + // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to + // that list, because if so, it can be moved. while (!delivered_tsns_.empty() && *delivered_tsns_.begin() == last_assembled_tsn_watermark_.next_value()) { last_assembled_tsn_watermark_.Increment(); delivered_tsns_.erase(delivered_tsns_.begin()); } - - reassembled_messages_.emplace_back(std::move(message)); } void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { @@ -239,12 +246,21 @@ void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { delivered_tsns_.erase(delivered_tsns_.begin(), delivered_tsns_.upper_bound(tsn)); + MaybeMoveLastAssembledWatermarkFurther(); + queued_bytes_ -= streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams()); RTC_DCHECK(IsConsistent()); } bool ReassemblyQueue::IsConsistent() const { + // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be + // adjacent. + if (!delivered_tsns_.empty() && + last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) { + return false; + } + // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively // enforced in this class. This comparison will still trigger if queued_bytes_ // became "negative". diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h index 2fa11e3f75..9cc0c61eb6 100644 --- a/net/dcsctp/rx/reassembly_queue.h +++ b/net/dcsctp/rx/reassembly_queue.h @@ -128,6 +128,7 @@ class ReassemblyQueue { bool IsConsistent() const; void AddReassembledMessage(rtc::ArrayView tsns, DcSctpMessage message); + void MaybeMoveLastAssembledWatermarkFurther(); struct DeferredResetStreams { explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req)