Add a fastpath for message reassembly that avoids map insertion

Change-Id: I50204915f4857f22e091fb9d88b4111e40d64227
Bug: webrtc:15724
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331340
Commit-Queue: Daniel Collins <dpcollins@google.com>
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41379}
This commit is contained in:
Daniel Collins 2023-12-13 09:39:27 -05:00 committed by WebRTC LUCI CQ
parent 336fb4faf4
commit 042e57deea
2 changed files with 50 additions and 13 deletions

View File

@ -86,6 +86,11 @@ TraditionalReassemblyStreams::TraditionalReassemblyStreams(
int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn, int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
Data data) { Data data) {
if (data.is_beginning && data.is_end) {
// Fastpath for already assembled chunks.
AssembleMessage(tsn, std::move(data));
return 0;
}
int queued_bytes = data.size(); int queued_bytes = data.size();
auto [it, inserted] = chunks_.emplace(tsn, std::move(data)); auto [it, inserted] = chunks_.emplace(tsn, std::move(data));
if (!inserted) { if (!inserted) {
@ -124,12 +129,7 @@ size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
if (count == 1) { if (count == 1) {
// Fast path - zero-copy // Fast path - zero-copy
const Data& data = start->second; return AssembleMessage(start->first, std::move(start->second));
size_t payload_size = start->second.size();
UnwrappedTSN tsns[1] = {start->first};
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
} }
// Slow path - will need to concatenate the payload. // Slow path - will need to concatenate the payload.
@ -155,6 +155,17 @@ size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
return payload_size; return payload_size;
} }
size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
UnwrappedTSN tsn,
Data data) {
// Fast path - zero-copy
size_t payload_size = data.size();
UnwrappedTSN tsns[1] = {tsn};
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
}
size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo( size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo(
UnwrappedTSN tsn) { UnwrappedTSN tsn) {
auto end_iter = chunks_.upper_bound(tsn); auto end_iter = chunks_.upper_bound(tsn);
@ -202,20 +213,40 @@ size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessages() {
return assembled_bytes; return assembled_bytes;
} }
size_t
TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessagesFastpath(
UnwrappedSSN ssn,
UnwrappedTSN tsn,
Data data) {
RTC_DCHECK(ssn == next_ssn_);
size_t assembled_bytes = 0;
if (data.is_beginning && data.is_end) {
assembled_bytes += AssembleMessage(tsn, std::move(data));
next_ssn_.Increment();
} else {
size_t queued_bytes = data.size();
auto [iter, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data));
if (!inserted) {
// Not actually assembled, but deduplicated meaning queued size doesn't
// include this message.
return queued_bytes;
}
}
return assembled_bytes + TryToAssembleMessages();
}
int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn, int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn,
Data data) { Data data) {
int queued_bytes = data.size(); int queued_bytes = data.size();
UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn); UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn);
auto [unused, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data)); if (ssn == next_ssn_) {
return queued_bytes -
TryToAssembleMessagesFastpath(ssn, tsn, std::move(data));
}
auto [iter, inserted] = chunks_by_ssn_[ssn].emplace(tsn, std::move(data));
if (!inserted) { if (!inserted) {
return 0; return 0;
} }
if (ssn == next_ssn_) {
queued_bytes -= TryToAssembleMessages();
}
return queued_bytes; return queued_bytes;
} }

View File

@ -55,6 +55,7 @@ class TraditionalReassemblyStreams : public ReassemblyStreams {
: parent_(*parent) {} : parent_(*parent) {}
size_t AssembleMessage(ChunkMap::iterator start, ChunkMap::iterator end); size_t AssembleMessage(ChunkMap::iterator start, ChunkMap::iterator end);
size_t AssembleMessage(UnwrappedTSN tsn, Data data);
TraditionalReassemblyStreams& parent_; TraditionalReassemblyStreams& parent_;
}; };
@ -101,6 +102,11 @@ class TraditionalReassemblyStreams : public ReassemblyStreams {
// Returns the number of bytes assembled if a message was assembled. // Returns the number of bytes assembled if a message was assembled.
size_t TryToAssembleMessage(); size_t TryToAssembleMessage();
size_t TryToAssembleMessages(); size_t TryToAssembleMessages();
// Same as above but when inserting the first complete message avoid
// insertion into the map.
size_t TryToAssembleMessagesFastpath(UnwrappedSSN ssn,
UnwrappedTSN tsn,
Data data);
// This must be an ordered container to be able to iterate in SSN order. // This must be an ordered container to be able to iterate in SSN order.
std::map<UnwrappedSSN, ChunkMap> chunks_by_ssn_; std::map<UnwrappedSSN, ChunkMap> chunks_by_ssn_;
UnwrappedSSN::Unwrapper ssn_unwrapper_; UnwrappedSSN::Unwrapper ssn_unwrapper_;