From 042e57deea9d4c7ad857d7cebabd087b1725efb9 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 13 Dec 2023 09:39:27 -0500 Subject: [PATCH] Add a fastpath for message reassembly that avoids map insertion Change-Id: I50204915f4857f22e091fb9d88b4111e40d64227 Bug: webrtc:15724 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331340 Commit-Queue: Daniel Collins Reviewed-by: Victor Boivie Cr-Commit-Position: refs/heads/main@{#41379} --- .../rx/traditional_reassembly_streams.cc | 57 ++++++++++++++----- .../rx/traditional_reassembly_streams.h | 6 ++ 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc index dce6c90131..c94691f0db 100644 --- a/net/dcsctp/rx/traditional_reassembly_streams.cc +++ b/net/dcsctp/rx/traditional_reassembly_streams.cc @@ -86,6 +86,11 @@ TraditionalReassemblyStreams::TraditionalReassemblyStreams( int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn, Data data) { + if (data.is_beginning && data.is_end) { + // Fastpath for already assembled chunks. + AssembleMessage(tsn, std::move(data)); + return 0; + } int queued_bytes = data.size(); auto [it, inserted] = chunks_.emplace(tsn, std::move(data)); if (!inserted) { @@ -124,12 +129,7 @@ size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage( if (count == 1) { // Fast path - zero-copy - const Data& data = start->second; - size_t payload_size = start->second.size(); - UnwrappedTSN tsns[1] = {start->first}; - DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); - parent_.on_assembled_message_(tsns, std::move(message)); - return payload_size; + return AssembleMessage(start->first, std::move(start->second)); } // Slow path - will need to concatenate the payload. @@ -155,6 +155,17 @@ size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage( return payload_size; } +size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage( + UnwrappedTSN tsn, + Data data) { + // Fast path - zero-copy + size_t payload_size = data.size(); + UnwrappedTSN tsns[1] = {tsn}; + DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); + parent_.on_assembled_message_(tsns, std::move(message)); + return payload_size; +} + size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo( UnwrappedTSN tsn) { auto end_iter = chunks_.upper_bound(tsn); @@ -202,20 +213,40 @@ size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessages() { return assembled_bytes; } +size_t +TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessagesFastpath( + UnwrappedSSN ssn, + UnwrappedTSN tsn, + Data data) { + RTC_DCHECK(ssn == next_ssn_); + size_t assembled_bytes = 0; + if (data.is_beginning && data.is_end) { + assembled_bytes += AssembleMessage(tsn, std::move(data)); + next_ssn_.Increment(); + } else { + size_t queued_bytes = data.size(); + auto [iter, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data)); + if (!inserted) { + // Not actually assembled, but deduplicated meaning queued size doesn't + // include this message. + return queued_bytes; + } + } + return assembled_bytes + TryToAssembleMessages(); +} + int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn, Data data) { int queued_bytes = data.size(); - UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn); - auto [unused, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data)); + if (ssn == next_ssn_) { + return queued_bytes - + TryToAssembleMessagesFastpath(ssn, tsn, std::move(data)); + } + auto [iter, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data)); if (!inserted) { return 0; } - - if (ssn == next_ssn_) { - queued_bytes -= TryToAssembleMessages(); - } - return queued_bytes; } diff --git a/net/dcsctp/rx/traditional_reassembly_streams.h b/net/dcsctp/rx/traditional_reassembly_streams.h index d355c599ae..9214a9bc9a 100644 --- a/net/dcsctp/rx/traditional_reassembly_streams.h +++ b/net/dcsctp/rx/traditional_reassembly_streams.h @@ -55,6 +55,7 @@ class TraditionalReassemblyStreams : public ReassemblyStreams { : parent_(*parent) {} size_t AssembleMessage(ChunkMap::iterator start, ChunkMap::iterator end); + size_t AssembleMessage(UnwrappedTSN tsn, Data data); TraditionalReassemblyStreams& parent_; }; @@ -101,6 +102,11 @@ class TraditionalReassemblyStreams : public ReassemblyStreams { // Returns the number of bytes assembled if a message was assembled. size_t TryToAssembleMessage(); size_t TryToAssembleMessages(); + // Same as above but when inserting the first complete message avoid + // insertion into the map. + size_t TryToAssembleMessagesFastpath(UnwrappedSSN ssn, + UnwrappedTSN tsn, + Data data); // This must be an ordered container to be able to iterate in SSN order. std::map chunks_by_ssn_; UnwrappedSSN::Unwrapper ssn_unwrapper_;