dcsctp: Rename message_id to mid

MID is a RFC8260 property on an I-DATA chunk, replacing the SSN property
on the DATA chunk in non-interleaved message. The MID stands for
"Message Identifier", and it was frequently named "message_id" in the
source code, but sometimes "mid". To be consistent and using the same
terminology as is most common in the RFC, use "mid" everywhere.

This was triggered by the need to introduce yet another "message
identifier" - but for now, this is just a refacotring CL.

Bug: None
Change-Id: I9cca898d9f3a2f162d6f2e4508ec1b4bc8d7308f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/322500
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40876}
This commit is contained in:
Victor Boivie 2023-10-04 14:25:26 +02:00 committed by WebRTC LUCI CQ
parent e338b3f4a0
commit ee0270b67c
23 changed files with 97 additions and 110 deletions

View File

@ -48,7 +48,7 @@ class AnyDataChunk : public Chunk {
StreamID stream_id() const { return data_.stream_id; }
SSN ssn() const { return data_.ssn; }
MID message_id() const { return data_.message_id; }
MID mid() const { return data_.mid; }
FSN fsn() const { return data_.fsn; }
PPID ppid() const { return data_.ppid; }
rtc::ArrayView<const uint8_t> payload() const { return data_.payload; }
@ -59,7 +59,7 @@ class AnyDataChunk : public Chunk {
AnyDataChunk(TSN tsn,
StreamID stream_id,
SSN ssn,
MID message_id,
MID mid,
FSN fsn,
PPID ppid,
std::vector<uint8_t> payload,
@ -67,7 +67,7 @@ class AnyDataChunk : public Chunk {
: tsn_(tsn),
data_(stream_id,
ssn,
message_id,
mid,
fsn,
ppid,
std::move(payload),

View File

@ -24,12 +24,9 @@ class AnyForwardTsnChunk : public Chunk {
public:
struct SkippedStream {
SkippedStream(StreamID stream_id, SSN ssn)
: stream_id(stream_id), ssn(ssn), unordered(false), message_id(0) {}
SkippedStream(IsUnordered unordered, StreamID stream_id, MID message_id)
: stream_id(stream_id),
ssn(0),
unordered(unordered),
message_id(message_id) {}
: stream_id(stream_id), ssn(ssn), unordered(false), mid(0) {}
SkippedStream(IsUnordered unordered, StreamID stream_id, MID mid)
: stream_id(stream_id), ssn(0), unordered(unordered), mid(mid) {}
StreamID stream_id;
@ -38,11 +35,11 @@ class AnyForwardTsnChunk : public Chunk {
// Set for I-FORWARD_TSN
IsUnordered unordered;
MID message_id;
MID mid;
bool operator==(const SkippedStream& other) const {
return stream_id == other.stream_id && ssn == other.ssn &&
unordered == other.unordered && message_id == other.message_id;
unordered == other.unordered && mid == other.mid;
}
};

View File

@ -54,7 +54,7 @@ absl::optional<IDataChunk> IDataChunk::Parse(
uint8_t flags = reader->Load8<1>();
TSN tsn(reader->Load32<4>());
StreamID stream_identifier(reader->Load16<8>());
MID message_id(reader->Load32<12>());
MID mid(reader->Load32<12>());
uint32_t ppid_or_fsn = reader->Load32<16>();
Options options;
@ -65,7 +65,7 @@ absl::optional<IDataChunk> IDataChunk::Parse(
options.immediate_ack =
ImmediateAckFlag((flags & (1 << kFlagsBitImmediateAck)) != 0);
return IDataChunk(tsn, stream_identifier, message_id,
return IDataChunk(tsn, stream_identifier, mid,
PPID(options.is_beginning ? ppid_or_fsn : 0),
FSN(options.is_beginning ? 0 : ppid_or_fsn),
std::vector<uint8_t>(reader->variable_data().begin(),
@ -83,7 +83,7 @@ void IDataChunk::SerializeTo(std::vector<uint8_t>& out) const {
(*options().immediate_ack ? (1 << kFlagsBitImmediateAck) : 0));
writer.Store32<4>(*tsn());
writer.Store16<8>(*stream_id());
writer.Store32<12>(*message_id());
writer.Store32<12>(*mid());
writer.Store32<16>(options().is_beginning ? *ppid() : *fsn());
writer.CopyToVariableData(payload());
}
@ -97,7 +97,7 @@ std::string IDataChunk::ToString() const {
: *options().is_end ? "last"
: "middle")
<< ", tsn=" << *tsn() << ", stream_id=" << *stream_id()
<< ", message_id=" << *message_id();
<< ", mid=" << *mid();
if (*options().is_beginning) {
sb << ", ppid=" << *ppid();

View File

@ -42,7 +42,7 @@ class IDataChunk : public AnyDataChunk, public TLVTrait<IDataChunkConfig> {
static constexpr size_t kHeaderSize = IDataChunkConfig::kHeaderSize;
IDataChunk(TSN tsn,
StreamID stream_id,
MID message_id,
MID mid,
PPID ppid,
FSN fsn,
std::vector<uint8_t> payload,
@ -50,7 +50,7 @@ class IDataChunk : public AnyDataChunk, public TLVTrait<IDataChunkConfig> {
: AnyDataChunk(tsn,
stream_id,
SSN(0),
message_id,
mid,
fsn,
ppid,
std::move(payload),

View File

@ -44,7 +44,7 @@ TEST(IDataChunkTest, AtBeginningFromCapture) {
ASSERT_HAS_VALUE_AND_ASSIGN(IDataChunk chunk, IDataChunk::Parse(data));
EXPECT_EQ(*chunk.tsn(), 2487901653);
EXPECT_EQ(*chunk.stream_id(), 1);
EXPECT_EQ(*chunk.message_id(), 0u);
EXPECT_EQ(*chunk.mid(), 0u);
EXPECT_EQ(*chunk.ppid(), 53u);
EXPECT_EQ(*chunk.fsn(), 0u); // Not provided (so set to zero)
}
@ -62,13 +62,13 @@ TEST(IDataChunkTest, AtBeginningSerializeAndDeserialize) {
IDataChunk::Parse(serialized));
EXPECT_EQ(*deserialized.tsn(), 123u);
EXPECT_EQ(*deserialized.stream_id(), 456u);
EXPECT_EQ(*deserialized.message_id(), 789u);
EXPECT_EQ(*deserialized.mid(), 789u);
EXPECT_EQ(*deserialized.ppid(), 53u);
EXPECT_EQ(*deserialized.fsn(), 0u);
EXPECT_EQ(deserialized.ToString(),
"I-DATA, type=ordered::first, tsn=123, stream_id=456, "
"message_id=789, ppid=53, length=1");
"mid=789, ppid=53, length=1");
}
TEST(IDataChunkTest, InMiddleFromCapture) {
@ -93,7 +93,7 @@ TEST(IDataChunkTest, InMiddleFromCapture) {
ASSERT_HAS_VALUE_AND_ASSIGN(IDataChunk chunk, IDataChunk::Parse(data));
EXPECT_EQ(*chunk.tsn(), 2487901706);
EXPECT_EQ(*chunk.stream_id(), 3u);
EXPECT_EQ(*chunk.message_id(), 1u);
EXPECT_EQ(*chunk.mid(), 1u);
EXPECT_EQ(*chunk.ppid(), 0u); // Not provided (so set to zero)
EXPECT_EQ(*chunk.fsn(), 8u);
}
@ -109,14 +109,14 @@ TEST(IDataChunkTest, InMiddleSerializeAndDeserialize) {
IDataChunk::Parse(serialized));
EXPECT_EQ(*deserialized.tsn(), 123u);
EXPECT_EQ(*deserialized.stream_id(), 456u);
EXPECT_EQ(*deserialized.message_id(), 789u);
EXPECT_EQ(*deserialized.mid(), 789u);
EXPECT_EQ(*deserialized.ppid(), 0u);
EXPECT_EQ(*deserialized.fsn(), 101112u);
EXPECT_THAT(deserialized.payload(), ElementsAre(1, 2, 3));
EXPECT_EQ(deserialized.ToString(),
"I-DATA, type=ordered::middle, tsn=123, stream_id=456, "
"message_id=789, fsn=101112, length=3");
"mid=789, fsn=101112, length=3");
}
} // namespace

View File

@ -68,8 +68,8 @@ absl::optional<IForwardTsnChunk> IForwardTsnChunk::Parse(
StreamID stream_id(sub_reader.Load16<0>());
IsUnordered unordered(sub_reader.Load8<3>() & 0x01);
MID message_id(sub_reader.Load32<4>());
skipped_streams.emplace_back(unordered, stream_id, message_id);
MID mid(sub_reader.Load32<4>());
skipped_streams.emplace_back(unordered, stream_id, mid);
offset += kSkippedStreamBufferSize;
}
RTC_DCHECK(offset == reader->variable_data_size());
@ -89,7 +89,7 @@ void IForwardTsnChunk::SerializeTo(std::vector<uint8_t>& out) const {
sub_writer.Store16<0>(*skipped[i].stream_id);
sub_writer.Store8<3>(skipped[i].unordered ? 1 : 0);
sub_writer.Store32<4>(*skipped[i].message_id);
sub_writer.Store32<4>(*skipped[i].mid);
offset += kSkippedStreamBufferSize;
}
RTC_DCHECK(offset == variable_size);

View File

@ -42,7 +42,7 @@ struct Data {
Data(StreamID stream_id,
SSN ssn,
MID message_id,
MID mid,
FSN fsn,
PPID ppid,
std::vector<uint8_t> payload,
@ -51,7 +51,7 @@ struct Data {
IsUnordered is_unordered)
: stream_id(stream_id),
ssn(ssn),
message_id(message_id),
mid(mid),
fsn(fsn),
ppid(ppid),
payload(std::move(payload)),
@ -65,8 +65,8 @@ struct Data {
// Creates a copy of this `Data` object.
Data Clone() const {
return Data(stream_id, ssn, message_id, fsn, ppid, payload, is_beginning,
is_end, is_unordered);
return Data(stream_id, ssn, mid, fsn, ppid, payload, is_beginning, is_end,
is_unordered);
}
// The size of this data, which translates to the size of its payload.
@ -82,7 +82,7 @@ struct Data {
// Message Identifier (MID) per stream and ordered/unordered. Defined by
// RFC8260, and used together with options.is_unordered and stream_id to
// uniquely identify a message. Used only in I-DATA chunks (not DATA).
MID message_id;
MID mid;
// Fragment Sequence Number (FSN) per stream and ordered/unordered, as above.
FSN fsn;

View File

@ -106,8 +106,8 @@ size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
return payload_size;
}
size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
size_t InterleavedReassemblyStreams::Stream::EraseTo(MID mid) {
UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(mid);
size_t removed_bytes = 0;
auto it = chunks_by_mid_.begin();
@ -135,7 +135,7 @@ int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
int queued_bytes = data.size();
UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid);
FSN fsn = data.fsn;
auto [unused, inserted] =
chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
@ -208,7 +208,7 @@ size_t InterleavedReassemblyStreams::HandleForwardTsn(
for (const auto& skipped : skipped_streams) {
removed_bytes +=
GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
.EraseTo(skipped.message_id);
.EraseTo(skipped.mid);
}
return removed_bytes;
}

View File

@ -67,7 +67,7 @@ class InterleavedReassemblyStreams : public ReassemblyStreams {
parent_(*parent),
next_mid_(mid_unwrapper_.Unwrap(next_mid)) {}
int Add(UnwrappedTSN tsn, Data data);
size_t EraseTo(MID message_id);
size_t EraseTo(MID mid);
void Reset() {
mid_unwrapper_.Reset();
next_mid_ = mid_unwrapper_.Unwrap(MID(0));

View File

@ -70,8 +70,8 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
void ReassemblyQueue::Add(TSN tsn, Data data) {
RTC_DCHECK(IsConsistent());
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
<< ", stream=" << *data.stream_id << ":"
<< *data.message_id << ":" << *data.fsn << ", type="
<< ", stream=" << *data.stream_id << ":" << *data.mid
<< ":" << *data.fsn << ", type="
<< (data.is_beginning && data.is_end ? "complete"
: data.is_beginning ? "first"
: data.is_end ? "last"

View File

@ -281,9 +281,9 @@ TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) {
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
DataGeneratorOptions opts;
opts.message_id = MID(0);
opts.mid = MID(0);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
opts.message_id = MID(1);
opts.mid = MID(1);
reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
@ -295,12 +295,12 @@ TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kStreamResetDeferred));
opts.message_id = MID(3);
opts.mid = MID(3);
opts.ppid = PPID(3);
reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm.MaybeResetStreamsDeferred(TSN(11));
opts.message_id = MID(2);
opts.mid = MID(2);
opts.ppid = PPID(2);
reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm.MaybeResetStreamsDeferred(TSN(15));

View File

@ -290,10 +290,10 @@ TEST_F(StreamResetHandlerTest, ResetStreamsNotDeferred) {
TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
DataGeneratorOptions opts;
opts.message_id = MID(0);
opts.mid = MID(0);
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
opts.message_id = MID(1);
opts.mid = MID(1);
reasm_->Add(AddTo(kPeerInitialTsn, 1),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
@ -314,25 +314,25 @@ TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
EXPECT_THAT(responses, SizeIs(1));
EXPECT_EQ(responses[0].result(), ResponseResult::kInProgress);
opts.message_id = MID(1);
opts.mid = MID(1);
opts.ppid = PPID(5);
reasm_->Add(AddTo(kPeerInitialTsn, 5),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(0);
opts.mid = MID(0);
opts.ppid = PPID(4);
reasm_->Add(AddTo(kPeerInitialTsn, 4),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(3);
opts.mid = MID(3);
opts.ppid = PPID(3);
reasm_->Add(AddTo(kPeerInitialTsn, 3),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(2);
opts.mid = MID(2);
opts.ppid = PPID(2);
reasm_->Add(AddTo(kPeerInitialTsn, 2),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
@ -765,7 +765,7 @@ TEST_F(StreamResetHandlerTest, PerformCloseAfterOneFirstFailing) {
// Let the socket receive the TSN.
DataGeneratorOptions opts;
opts.message_id = MID(0);
opts.mid = MID(0);
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(kPeerInitialTsn);
data_tracker_->Observe(kPeerInitialTsn);

View File

@ -32,13 +32,13 @@ Data DataGenerator::Ordered(std::vector<uint8_t> payload,
} else {
fsn_ = FSN(*fsn_ + 1);
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(static_cast<uint16_t>(*message_id)),
message_id, fsn_, opts.ppid, std::move(payload), is_beginning,
is_end, IsUnordered(false));
MID mid = opts.mid.value_or(mid_);
Data ret = Data(opts.stream_id, SSN(static_cast<uint16_t>(*mid)), mid, fsn_,
opts.ppid, std::move(payload), is_beginning, is_end,
IsUnordered(false));
if (is_end) {
message_id_ = MID(*message_id + 1);
mid_ = MID(*mid + 1);
}
return ret;
}
@ -54,11 +54,11 @@ Data DataGenerator::Unordered(std::vector<uint8_t> payload,
} else {
fsn_ = FSN(*fsn_ + 1);
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(0), message_id, fsn_, kPpid,
std::move(payload), is_beginning, is_end, IsUnordered(true));
MID mid = opts.mid.value_or(mid_);
Data ret = Data(opts.stream_id, SSN(0), mid, fsn_, kPpid, std::move(payload),
is_beginning, is_end, IsUnordered(true));
if (is_end) {
message_id_ = MID(*message_id + 1);
mid_ = MID(*mid + 1);
}
return ret;
}

View File

@ -23,15 +23,14 @@ namespace dcsctp {
struct DataGeneratorOptions {
StreamID stream_id = StreamID(1);
absl::optional<MID> message_id = absl::nullopt;
absl::optional<MID> mid = absl::nullopt;
PPID ppid = PPID(53);
};
// Generates Data with correct sequence numbers, and used only in unit tests.
class DataGenerator {
public:
explicit DataGenerator(MID start_message_id = MID(0))
: message_id_(start_message_id) {}
explicit DataGenerator(MID start_mid = MID(0)) : mid_(start_mid) {}
// Generates ordered "data" with the provided `payload` and flags, which can
// contain "B" for setting the "is_beginning" flag, and/or "E" for setting the
@ -48,10 +47,10 @@ class DataGenerator {
DataGeneratorOptions opts = {});
// Resets the Message ID identifier - simulating a "stream reset".
void ResetStream() { message_id_ = MID(0); }
void ResetStream() { mid_ = MID(0); }
private:
MID message_id_;
MID mid_;
FSN fsn_ = FSN(0);
};
} // namespace dcsctp

View File

@ -34,7 +34,7 @@ class MockSendQueue : public SendQueue {
(override));
MOCK_METHOD(bool,
Discard,
(IsUnordered unordered, StreamID stream_id, MID message_id),
(IsUnordered unordered, StreamID stream_id, MID mid),
(override));
MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override));
MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override));

View File

@ -261,7 +261,7 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn,
void OutstandingData::AbandonAllFor(const Item& item) {
// Erase all remaining chunks from the producer, if any.
if (discard_from_send_queue_(item.data().is_unordered, item.data().stream_id,
item.data().message_id)) {
item.data().mid)) {
// There were remaining chunks to be produced for this message. Since the
// receiver may have already received all chunks (up till now) for this
// message, we can't just FORWARD-TSN to the last fragment in this
@ -272,10 +272,10 @@ void OutstandingData::AbandonAllFor(const Item& item) {
// TSN in the sent FORWARD-TSN.
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn,
item.data().message_id, item.data().fsn, item.data().ppid,
std::vector<uint8_t>(), Data::IsBeginning(false),
Data::IsEnd(true), item.data().is_unordered);
Data message_end(item.data().stream_id, item.data().ssn, item.data().mid,
item.data().fsn, item.data().ppid, std::vector<uint8_t>(),
Data::IsBeginning(false), Data::IsEnd(true),
item.data().is_unordered);
Item& added_item =
outstanding_data_
.emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
@ -295,7 +295,7 @@ void OutstandingData::AbandonAllFor(const Item& item) {
if (!other.is_abandoned() &&
other.data().stream_id == item.data().stream_id &&
other.data().is_unordered == item.data().is_unordered &&
other.data().message_id == item.data().message_id) {
other.data().mid == item.data().mid) {
RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap()
<< " as abandoned";
if (other.should_be_retransmitted()) {
@ -378,7 +378,7 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
// Already abandoned.
} else if (item.is_nacked() && item.has_expired(now)) {
RTC_DLOG(LS_VERBOSE) << "Marking nacked chunk " << *tsn.Wrap()
<< " and message " << *item.data().message_id
<< " and message " << *item.data().mid
<< " as expired";
AbandonAllFor(item);
} else {
@ -419,7 +419,7 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
// queue.
RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk "
<< *it->first.Wrap() << " and message "
<< *it->second.data().message_id << " as expired";
<< *it->second.data().mid << " as expired";
AbandonAllFor(it->second);
RTC_DCHECK(IsConsistent());
return absl::nullopt;
@ -522,15 +522,15 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::pair<IsUnordered, StreamID> stream_id =
std::make_pair(item.data().is_unordered, item.data().stream_id);
if (item.data().message_id > skipped_per_stream[stream_id]) {
skipped_per_stream[stream_id] = item.data().message_id;
if (item.data().mid > skipped_per_stream[stream_id]) {
skipped_per_stream[stream_id] = item.data().mid;
}
}
std::vector<IForwardTsnChunk::SkippedStream> skipped_streams;
skipped_streams.reserve(skipped_per_stream.size());
for (const auto& [stream, message_id] : skipped_per_stream) {
skipped_streams.emplace_back(stream.first, stream.second, message_id);
for (const auto& [stream, mid] : skipped_per_stream) {
skipped_streams.emplace_back(stream.first, stream.second, mid);
}
return IForwardTsnChunk(new_cumulative_ack.Wrap(),

View File

@ -86,8 +86,8 @@ RetransmissionQueue::RetransmissionQueue(
data_chunk_header_size_,
tsn_unwrapper_.Unwrap(my_initial_tsn),
tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)),
[this](IsUnordered unordered, StreamID stream_id, MID message_id) {
return send_queue_.Discard(unordered, stream_id, message_id);
[this](IsUnordered unordered, StreamID stream_id, MID mid) {
return send_queue_.Discard(unordered, stream_id, mid);
}) {}
bool RetransmissionQueue::IsConsistent() const {

View File

@ -1067,7 +1067,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
RetransmissionQueue queue = CreateQueue();
DataGeneratorOptions options;
options.stream_id = StreamID(17);
options.message_id = MID(42);
options.mid = MID(42);
TimeMs test_start = now_;
EXPECT_CALL(producer_, Produce)
.WillOnce([&](TimeMs, size_t) {

View File

@ -143,7 +143,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
DcSctpMessage& message = item.message;
// Allocate Message ID and SSN when the first fragment is sent.
if (!item.message_id.has_value()) {
if (!item.mid.has_value()) {
// Oops, this entire message has already expired. Try the next one.
if (item.attributes.expires_at <= now) {
HandleMessageExpired(item);
@ -153,7 +153,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
MID& mid =
item.attributes.unordered ? next_unordered_mid_ : next_ordered_mid_;
item.message_id = mid;
item.mid = mid;
mid = MID(*mid + 1);
}
if (!item.attributes.unordered && !item.ssn.has_value()) {
@ -184,10 +184,9 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
buffered_amount_.Decrease(payload.size());
parent_.total_buffered_amount_.Decrease(payload.size());
SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
item.message_id.value(), fsn, ppid,
std::move(payload), is_beginning, is_end,
item.attributes.unordered));
SendQueue::DataToSend chunk(Data(
stream_id, item.ssn.value_or(SSN(0)), item.mid.value(), fsn, ppid,
std::move(payload), is_beginning, is_end, item.attributes.unordered));
chunk.max_retransmissions = item.attributes.max_retransmissions;
chunk.expires_at = item.attributes.expires_at;
chunk.lifecycle_id =
@ -231,13 +230,12 @@ void RRSendQueue::OutgoingStream::HandleMessageExpired(
}
}
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
MID message_id) {
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, MID mid) {
bool result = false;
if (!items_.empty()) {
Item& item = items_.front();
if (item.attributes.unordered == unordered && item.message_id.has_value() &&
*item.message_id == message_id) {
if (item.attributes.unordered == unordered && item.mid.has_value() &&
*item.mid == mid) {
HandleMessageExpired(item);
items_.pop_front();
@ -329,7 +327,7 @@ void RRSendQueue::OutgoingStream::Reset() {
item.remaining_size);
item.remaining_offset = 0;
item.remaining_size = item.message.payload().size();
item.message_id = absl::nullopt;
item.mid = absl::nullopt;
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
if (old_pause_state == PauseState::kPaused ||
@ -344,7 +342,7 @@ bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
if (items_.empty()) {
return false;
}
return items_.front().message_id.has_value();
return items_.front().mid.has_value();
}
void RRSendQueue::Add(TimeMs now,
@ -386,11 +384,8 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
return scheduler_.Produce(now, max_size);
}
bool RRSendQueue::Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) {
bool has_discarded =
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
bool RRSendQueue::Discard(IsUnordered unordered, StreamID stream_id, MID mid) {
bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(unordered, mid);
RTC_DCHECK(IsConsistent());
return has_discarded;

View File

@ -76,9 +76,7 @@ class RRSendQueue : public SendQueue {
// Implementation of `SendQueue`.
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
bool Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) override;
bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) override;
void PrepareResetStream(StreamID streams) override;
bool HasStreamsReadyToBeReset() const override;
std::vector<StreamID> GetStreamsReadyToBeReset() override;
@ -163,7 +161,7 @@ class RRSendQueue : public SendQueue {
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
// Discards a partially sent message, see `SendQueue::Discard`.
bool Discard(IsUnordered unordered, MID message_id);
bool Discard(IsUnordered unordered, MID mid);
// Pauses this stream, which is used before resetting it.
void Pause();
@ -232,7 +230,7 @@ class RRSendQueue : public SendQueue {
size_t remaining_size;
// If set, an allocated Message ID and SSN. Will be allocated when the
// first fragment is sent.
absl::optional<MID> message_id = absl::nullopt;
absl::optional<MID> mid = absl::nullopt;
absl::optional<SSN> ssn = absl::nullopt;
// The current Fragment Sequence Number, incremented for each fragment.
FSN current_fsn = FSN(0);

View File

@ -228,7 +228,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) {
EXPECT_FALSE(chunk_one->data.is_end);
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.message_id);
chunk_one->data.mid);
absl::optional<SendQueue::DataToSend> chunk_two =
buf_.Produce(kNow, kOneFragmentPacketSize);
@ -245,7 +245,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) {
// Calling it again shouldn't cause issues.
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.message_id);
chunk_one->data.mid);
ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
}
@ -860,7 +860,7 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
/*maybe_delivered=*/false));
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.message_id);
chunk_one->data.mid);
}
} // namespace
} // namespace dcsctp

View File

@ -54,7 +54,7 @@ class SendQueue {
virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0;
// Discards a partially sent message identified by the parameters `unordered`,
// `stream_id` and `message_id`. The `message_id` comes from the returned
// `stream_id` and `mid`. The `mid` comes from the returned
// information when having called `Produce`. A partially sent message means
// that it has had at least one fragment of it returned when `Produce` was
// called prior to calling this method).
@ -67,9 +67,7 @@ class SendQueue {
//
// This function returns true if this message had unsent fragments still in
// the queue that were discarded, and false if there were no such fragments.
virtual bool Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) = 0;
virtual bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) = 0;
// Prepares the stream to be reset. This is used to close a WebRTC data
// channel and will be signaled to the other side.

View File

@ -29,8 +29,8 @@ MATCHER_P(HasDataWithMid, mid, "") {
return false;
}
if (arg->data.message_id != mid) {
*result_listener << "the produced data had mid " << *arg->data.message_id
if (arg->data.mid != mid) {
*result_listener << "the produced data had mid " << *arg->data.mid
<< " and not the expected " << *mid;
return false;
}