Revert "dcsctp: Use rtc::CopyOnWriteBuffer"

This reverts commit 2db59a6584eca54245794a0e657ca9ded9e6707f.

Reason for revert: Causes msan-issue in crc32c, reading uninitialized
memory.

Bug: webrtc:12943, chromium:1275559
Change-Id: I05f1012d896aeaca86c4562e0df15fa7ea326d60
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/239560
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35461}
This commit is contained in:
Victor Boivie 2021-12-01 18:57:22 +00:00 committed by WebRTC LUCI CQ
parent 4ad09762da
commit 4b7024b572
19 changed files with 86 additions and 132 deletions

View File

@ -29,7 +29,6 @@
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/socket/dcsctp_socket.h"
#include "net/dcsctp/socket/state_cookie.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/logging.h"
namespace dcsctp {
@ -168,9 +167,9 @@ void MakeDataChunk(FuzzState& state, SctpPacket::Builder& b) {
options.is_unordered = IsUnordered(state.GetByte() != 0);
options.is_beginning = Data::IsBeginning(state.GetByte() != 0);
options.is_end = Data::IsEnd(state.GetByte() != 0);
rtc::CopyOnWriteBuffer payload(10);
b.Add(DataChunk(state.GetNextTSN(), StreamID(state.GetByte()),
SSN(state.GetByte()), PPID(53), payload, options));
SSN(state.GetByte()), PPID(53), std::vector<uint8_t>(10),
options));
}
void MakeInitChunk(FuzzState& state, SctpPacket::Builder& b) {
@ -285,7 +284,7 @@ void MakeIDataChunk(FuzzState& state, SctpPacket::Builder& b) {
options.is_end = Data::IsEnd(state.GetByte() != 0);
b.Add(IDataChunk(state.GetNextTSN(), StreamID(state.GetByte()),
state.GetNextMID(), PPID(53), FSN(0),
rtc::CopyOnWriteBuffer(10), options));
std::vector<uint8_t>(10), options));
}
void MakeIForwardTsnChunk(FuzzState& state, SctpPacket::Builder& b) {

View File

@ -20,8 +20,6 @@
#include "net/dcsctp/packet/bounded_byte_reader.h"
#include "net/dcsctp/packet/bounded_byte_writer.h"
#include "net/dcsctp/packet/chunk/data_common.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
namespace dcsctp {
@ -66,7 +64,9 @@ absl::optional<DataChunk> DataChunk::Parse(rtc::ArrayView<const uint8_t> data) {
ImmediateAckFlag((flags & (1 << kFlagsBitImmediateAck)) != 0);
return DataChunk(tsn, stream_identifier, ssn, ppid,
rtc::CopyOnWriteBuffer(reader->variable_data()), options);
std::vector<uint8_t>(reader->variable_data().begin(),
reader->variable_data().end()),
options);
}
void DataChunk::SerializeTo(std::vector<uint8_t>& out) const {

View File

@ -23,7 +23,6 @@
#include "net/dcsctp/packet/chunk/data_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/tlv_trait.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@ -46,7 +45,7 @@ class DataChunk : public AnyDataChunk, public TLVTrait<DataChunkConfig> {
StreamID stream_id,
SSN ssn,
PPID ppid,
rtc::CopyOnWriteBuffer payload,
std::vector<uint8_t> payload,
const Options& options)
: AnyDataChunk(tsn,
stream_id,

View File

@ -15,7 +15,6 @@
#include "api/array_view.h"
#include "net/dcsctp/testing/testing_macros.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
@ -52,9 +51,8 @@ TEST(DataChunkTest, FromCapture) {
}
TEST(DataChunkTest, SerializeAndDeserialize) {
rtc::CopyOnWriteBuffer payload({1, 2, 3, 4, 5});
DataChunk chunk(TSN(123), StreamID(456), SSN(789), PPID(9090),
std::move(payload),
/*payload=*/{1, 2, 3, 4, 5},
/*options=*/{});
std::vector<uint8_t> serialized;

View File

@ -17,7 +17,6 @@
#include "api/array_view.h"
#include "net/dcsctp/packet/chunk/chunk.h"
#include "net/dcsctp/packet/data.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@ -63,7 +62,7 @@ class AnyDataChunk : public Chunk {
MID message_id,
FSN fsn,
PPID ppid,
rtc::CopyOnWriteBuffer payload,
std::vector<uint8_t> payload,
const Options& options)
: tsn_(tsn),
data_(stream_id,

View File

@ -68,7 +68,9 @@ absl::optional<IDataChunk> IDataChunk::Parse(
return IDataChunk(tsn, stream_identifier, message_id,
PPID(options.is_beginning ? ppid_or_fsn : 0),
FSN(options.is_beginning ? 0 : ppid_or_fsn),
rtc::CopyOnWriteBuffer(reader->variable_data()), options);
std::vector<uint8_t>(reader->variable_data().begin(),
reader->variable_data().end()),
options);
}
void IDataChunk::SerializeTo(std::vector<uint8_t>& out) const {

View File

@ -23,7 +23,6 @@
#include "net/dcsctp/packet/chunk/data_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/tlv_trait.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@ -46,7 +45,7 @@ class IDataChunk : public AnyDataChunk, public TLVTrait<IDataChunkConfig> {
MID message_id,
PPID ppid,
FSN fsn,
rtc::CopyOnWriteBuffer payload,
std::vector<uint8_t> payload,
const Options& options)
: AnyDataChunk(tsn,
stream_id,

View File

@ -52,8 +52,8 @@ TEST(IDataChunkTest, AtBeginningFromCapture) {
TEST(IDataChunkTest, AtBeginningSerializeAndDeserialize) {
IDataChunk::Options options;
options.is_beginning = Data::IsBeginning(true);
IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(53), FSN(0),
rtc::CopyOnWriteBuffer({1, 2, 4}), options);
IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(53), FSN(0), {1},
options);
std::vector<uint8_t> serialized;
chunk.SerializeTo(serialized);
@ -68,7 +68,7 @@ TEST(IDataChunkTest, AtBeginningSerializeAndDeserialize) {
EXPECT_EQ(deserialized.ToString(),
"I-DATA, type=ordered::first, tsn=123, stream_id=456, "
"message_id=789, ppid=53, length=3");
"message_id=789, ppid=53, length=1");
}
TEST(IDataChunkTest, InMiddleFromCapture) {
@ -100,7 +100,7 @@ TEST(IDataChunkTest, InMiddleFromCapture) {
TEST(IDataChunkTest, InMiddleSerializeAndDeserialize) {
IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(0), FSN(101112),
rtc::CopyOnWriteBuffer({1, 2, 3}), /*options=*/{});
{1, 2, 3}, /*options=*/{});
std::vector<uint8_t> serialized;
chunk.SerializeTo(serialized);

View File

@ -16,7 +16,6 @@
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/public/types.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@ -46,7 +45,7 @@ struct Data {
MID message_id,
FSN fsn,
PPID ppid,
rtc::CopyOnWriteBuffer payload,
std::vector<uint8_t> payload,
IsBeginning is_beginning,
IsEnd is_end,
IsUnordered is_unordered)
@ -91,7 +90,7 @@ struct Data {
PPID ppid;
// The actual data payload.
rtc::CopyOnWriteBuffer payload;
std::vector<uint8_t> payload;
// If this data represents the first, last or a middle chunk.
IsBeginning is_beginning;

View File

@ -242,10 +242,10 @@ TEST(SctpPacketTest, SerializeAndDeserializeThreeChunks) {
{SackChunk::GapAckBlock(2, 3)},
/*duplicate_tsns=*/{TSN(1), TSN(2), TSN(3)}));
b.Add(DataChunk(TSN(123), StreamID(456), SSN(789), PPID(9090),
/*payload=*/rtc::CopyOnWriteBuffer({1, 2, 3, 4, 5}),
/*payload=*/{1, 2, 3, 4, 5},
/*options=*/{}));
b.Add(DataChunk(TSN(124), StreamID(654), SSN(987), PPID(909),
/*payload=*/rtc::CopyOnWriteBuffer({5, 4, 3, 3, 1}),
/*payload=*/{5, 4, 3, 3, 1},
/*options=*/{}));
std::vector<uint8_t> serialized = b.Build();
@ -319,7 +319,7 @@ TEST(SctpPacketTest, ReturnsCorrectSpaceAvailableToStayWithinMTU) {
// Add a smaller packet first.
DataChunk::Options data_options;
rtc::CopyOnWriteBuffer payload1(183);
std::vector<uint8_t> payload1(183);
builder.Add(
DataChunk(TSN(1), StreamID(1), SSN(0), PPID(53), payload1, data_options));
@ -328,7 +328,7 @@ TEST(SctpPacketTest, ReturnsCorrectSpaceAvailableToStayWithinMTU) {
kMaxPacketSize - kSctpHeaderSize - chunk1_size);
EXPECT_EQ(builder.bytes_remaining(), 976u); // Hand-calculated.
rtc::CopyOnWriteBuffer payload2(957);
std::vector<uint8_t> payload2(957);
builder.Add(
DataChunk(TSN(1), StreamID(1), SSN(0), PPID(53), payload2, data_options));

View File

@ -16,7 +16,6 @@
#include "api/array_view.h"
#include "net/dcsctp/public/types.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@ -25,14 +24,7 @@ namespace dcsctp {
// identifier (`ppid`).
class DcSctpMessage {
public:
// For best performance, please use the other constructor for zero-copy.
DcSctpMessage(StreamID stream_id, PPID ppid, std::vector<uint8_t> payload)
: stream_id_(stream_id), ppid_(ppid), payload_(payload) {}
explicit DcSctpMessage(StreamID stream_id,
PPID ppid,
rtc::CopyOnWriteBuffer payload,
bool)
: stream_id_(stream_id), ppid_(ppid), payload_(std::move(payload)) {}
DcSctpMessage(DcSctpMessage&& other) = default;
@ -48,25 +40,14 @@ class DcSctpMessage {
// The payload of the message.
rtc::ArrayView<const uint8_t> payload() const { return payload_; }
const rtc::CopyOnWriteBuffer& buffer_payload() const { return payload_; }
// When destructing the message, extracts the payload.
// Deprecated method - please use `ReleaseBufferPayload`.
ABSL_DEPRECATED("Use ReleaseBufferPayload instead")
std::vector<uint8_t> ReleasePayload() && {
return std::vector<uint8_t>(payload_.cdata(),
payload_.cdata() + payload_.size());
}
// When destructing the message, extracts the payload.
rtc::CopyOnWriteBuffer ReleaseBufferPayload() && {
return std::move(payload_);
}
std::vector<uint8_t> ReleasePayload() && { return std::move(payload_); }
private:
StreamID stream_id_;
PPID ppid_;
rtc::CopyOnWriteBuffer payload_;
std::vector<uint8_t> payload_;
};
} // namespace dcsctp

View File

@ -99,9 +99,10 @@ TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) {
Data::IsBeginning is_beginning(tsns[i] == 10);
Data::IsEnd is_end(tsns[i] == 13);
reasm.Add(TSN(tsns[i]), Data(kStreamID, kSSN, kMID, kFSN, kPPID,
rtc::CopyOnWriteBuffer(span), is_beginning,
is_end, IsUnordered(false)));
reasm.Add(TSN(tsns[i]),
Data(kStreamID, kSSN, kMID, kFSN, kPPID,
std::vector<uint8_t>(span.begin(), span.end()),
is_beginning, is_end, IsUnordered(false)));
if (i < 3) {
EXPECT_FALSE(reasm.HasMessages());
} else {
@ -134,9 +135,10 @@ TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) {
Data::IsEnd is_end(true);
SSN ssn(static_cast<uint16_t>(tsns[i] - 10));
reasm.Add(TSN(tsns[i]), Data(kStreamID, ssn, kMID, kFSN, kPPID,
rtc::CopyOnWriteBuffer(span), is_beginning,
is_end, IsUnordered(false)));
reasm.Add(TSN(tsns[i]),
Data(kStreamID, ssn, kMID, kFSN, kPPID,
std::vector<uint8_t>(span.begin(), span.end()),
is_beginning, is_end, IsUnordered(false)));
}
EXPECT_THAT(
reasm.FlushMessages(),

View File

@ -145,32 +145,29 @@ size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
const Data& data = start->second;
size_t payload_size = start->second.size();
UnwrappedTSN tsns[1] = {start->first};
DcSctpMessage message(data.stream_id, data.ppid, data.payload, false);
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
}
// Slow path - will need to concatenate the payload.
std::vector<UnwrappedTSN> tsns;
std::vector<uint8_t> payload;
size_t payload_size = std::accumulate(
start, end, 0,
[](size_t v, const auto& p) { return v + p.second.size(); });
tsns.reserve(count);
rtc::CopyOnWriteBuffer payload(payload_size);
size_t offset = 0;
payload.reserve(payload_size);
for (auto it = start; it != end; ++it) {
const Data& data = it->second;
tsns.push_back(it->first);
memcpy(reinterpret_cast<void*>(payload.MutableData() + offset),
data.payload.cdata(), data.payload.size());
offset += data.payload.size();
payload.insert(payload.end(), data.payload.begin(), data.payload.end());
}
DcSctpMessage message(start->second.stream_id, start->second.ppid,
std::move(payload), false);
std::move(payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;

View File

@ -977,7 +977,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) {
AnyDataChunk::ImmediateAckFlag immediate_ack = chunk.options().immediate_ack;
Data data = std::move(chunk).extract();
if (data.payload.size() == 0) {
if (data.payload.empty()) {
// Empty DATA chunks are illegal.
packet_sender_.Send(tcb_->PacketBuilder().Add(
ErrorChunk(Parameters::Builder().Add(NoUserDataCause(tsn)).Build())));

View File

@ -1396,10 +1396,8 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
opts.is_beginning = Data::IsBeginning(true);
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(
tsn, StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kWatermarkLimit + 1)),
opts))
.Add(DataChunk(tsn, StreamID(1), SSN(0), PPID(53),
std::vector<uint8_t>(kWatermarkLimit + 1), opts))
.Build());
// First DATA will always trigger a SACK. It's not interesting.
@ -1407,12 +1405,11 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
AllOf(HasSackWithCumAckTsn(tsn), HasSackWithNoGapAckBlocks()));
// This DATA should be accepted - it's advancing cum ack tsn.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 1), StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
/*options=*/{}))
.Build());
sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 1), StreamID(1), SSN(0),
PPID(53), std::vector<uint8_t>(1),
/*options=*/{}))
.Build());
// The receiver might have moved into delayed ack mode.
cb_z2.AdvanceTime(options.rto_initial);
@ -1423,12 +1420,11 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks()));
// This DATA will not be accepted - it's not advancing cum ack tsn.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
/*options=*/{}))
.Build());
sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0),
PPID(53), std::vector<uint8_t>(1),
/*options=*/{}))
.Build());
// Sack will be sent in IMMEDIATE mode when this is happening.
EXPECT_THAT(
@ -1436,12 +1432,11 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks()));
// This DATA will not be accepted either.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 4), StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
/*options=*/{}))
.Build());
sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(AddTo(tsn, 4), StreamID(1), SSN(0),
PPID(53), std::vector<uint8_t>(1),
/*options=*/{}))
.Build());
// Sack will be sent in IMMEDIATE mode when this is happening.
EXPECT_THAT(
@ -1451,10 +1446,9 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
// This DATA should be accepted, and it fills the reassembly queue.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(
AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kRemainingSize)),
/*options=*/{}))
.Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53),
std::vector<uint8_t>(kRemainingSize),
/*options=*/{}))
.Build());
// The receiver might have moved into delayed ack mode.
@ -1471,10 +1465,9 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) {
// This DATA will make the connection close. It's too full now.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
.Add(DataChunk(
AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kSmallMessageSize)),
/*options=*/{}))
.Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
std::vector<uint8_t>(kSmallMessageSize),
/*options=*/{}))
.Build());
}

View File

@ -34,8 +34,8 @@ Data DataGenerator::Ordered(std::vector<uint8_t> payload,
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(static_cast<uint16_t>(*message_id)),
message_id, fsn_, opts.ppid, rtc::CopyOnWriteBuffer(payload),
is_beginning, is_end, IsUnordered(false));
message_id, fsn_, opts.ppid, std::move(payload), is_beginning,
is_end, IsUnordered(false));
if (is_end) {
message_id_ = MID(*message_id + 1);
@ -56,8 +56,7 @@ Data DataGenerator::Unordered(std::vector<uint8_t> payload,
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(0), message_id, fsn_, kPpid,
rtc::CopyOnWriteBuffer(payload), is_beginning, is_end,
IsUnordered(true));
std::move(payload), is_beginning, is_end, IsUnordered(true));
if (is_end) {
message_id_ = MID(*message_id + 1);
}

View File

@ -248,7 +248,7 @@ void OutstandingData::AbandonAllFor(const Item& item) {
next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn,
item.data().message_id, item.data().fsn, item.data().ppid,
rtc::CopyOnWriteBuffer(), Data::IsBeginning(false),
std::vector<uint8_t>(), Data::IsBeginning(false),
Data::IsEnd(true), item.data().is_unordered);
Item& added_item =
outstanding_data_
@ -357,10 +357,10 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
size_t chunk_size = GetSerializedChunkSize(data);
outstanding_bytes_ += chunk_size;
++outstanding_items_;
auto it =
outstanding_data_
.emplace(tsn, Item(data, max_retransmissions, time_sent, expires_at))
.first;
auto it = outstanding_data_
.emplace(tsn, Item(data.Clone(), max_retransmissions, time_sent,
expires_at))
.first;
if (it->second.has_expired(time_sent)) {
// No need to send it - it was expired when it was in the send

View File

@ -147,14 +147,14 @@ class OutstandingData {
kAbandon,
};
explicit Item(const Data& data,
explicit Item(Data data,
MaxRetransmits max_retransmissions,
TimeMs time_sent,
TimeMs expires_at)
: max_retransmissions_(max_retransmissions),
time_sent_(time_sent),
expires_at_(expires_at),
data_(data.Clone()) {}
data_(std::move(data)) {}
TimeMs time_sent() const { return time_sent_; }

View File

@ -9,7 +9,6 @@
*/
#include "net/dcsctp/tx/rr_send_queue.h"
#include <algorithm>
#include <cstdint>
#include <deque>
#include <limits>
@ -25,7 +24,6 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/logging.h"
namespace dcsctp {
@ -98,14 +96,7 @@ bool RRSendQueue::IsConsistent() const {
}
}
if (total_buffered_amount != total_buffered_amount_.value()) {
RTC_DLOG(LS_ERROR) << "Actual total_buffered_amount="
<< total_buffered_amount
<< " != expected total_buffered_amount="
<< total_buffered_amount_.value();
return false;
}
return true;
return total_buffered_amount == total_buffered_amount_.value();
}
bool RRSendQueue::OutgoingStream::IsConsistent() const {
@ -113,13 +104,7 @@ bool RRSendQueue::OutgoingStream::IsConsistent() const {
for (const auto& item : items_) {
bytes += item.remaining_size;
}
if (bytes != buffered_amount_.value()) {
RTC_DLOG(LS_ERROR) << "Actual buffered amount=" << bytes
<< " != expected buffered_amount_="
<< buffered_amount_.value();
return false;
}
return true;
return bytes == buffered_amount_.value();
}
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
@ -176,20 +161,22 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
}
// Grab the next `max_size` fragment from this message and calculate flags.
size_t actual_size = std::min(max_size, item->remaining_size);
RTC_DCHECK(actual_size > 0);
Data::IsBeginning is_beginning(item->remaining_offset == 0);
Data::IsEnd is_end(actual_size == item->remaining_size);
rtc::ArrayView<const uint8_t> chunk_payload =
item->message.payload().subview(item->remaining_offset, max_size);
rtc::ArrayView<const uint8_t> message_payload = message.payload();
Data::IsBeginning is_beginning(chunk_payload.data() ==
message_payload.data());
Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
(message_payload.data() + message_payload.size()));
StreamID stream_id = message.stream_id();
PPID ppid = message.ppid();
// Zero-copy the payload if the message fits in a single chunk.
rtc::CopyOnWriteBuffer payload =
std::vector<uint8_t> payload =
is_beginning && is_end
? std::move(message).ReleaseBufferPayload()
: message.buffer_payload().Slice(item->remaining_offset, actual_size);
RTC_DCHECK_EQ(payload.size(), actual_size);
? std::move(message).ReleasePayload()
: std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
FSN fsn(item->current_fsn);
item->current_fsn = FSN(*item->current_fsn + 1);
@ -215,8 +202,8 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
// it can safely be discarded.
items_.pop_front();
} else {
item->remaining_offset += actual_size;
item->remaining_size -= actual_size;
item->remaining_offset += chunk_payload.size();
item->remaining_size -= chunk_payload.size();
RTC_DCHECK(item->remaining_offset + item->remaining_size ==
item->message.payload().size());
RTC_DCHECK(item->remaining_size > 0);