dcsctp: Stay within stream while producing from it

The way that the "next stream" was picked when round-robin cycling was
flawed. When a message was produced in its entirety, the "next stream"
would be put at a stream identifier value that was just larger than what
was previously used. And then, for each fragment that was to be created,
it would try to resolve the nearest stream (above or equal to that
number) that had messages to send - always starting from that stream id
that didn't necessarily point to the stream for which fragments were
actually produced.

For example, if the previous stream ID for which a message was fully
produced on was 5, then the next_stream_id would be set to 6, and then
when producing next fragment, it might have produced something from
stream_id=1, because that was the only stream with messages in it. It
wouldn't update next_stream_id at this time; it would still be 6.

After a single fragment had been produced from that stream, a message
was queued on stream_id=6. The next time a fragment was to be produced,
it would not continue one stream_id=1, but instead pick the new stream,
which would suddenly produce a new fragment (with B flag set) while the
previous message (from stream_id=1) wasn't finished yet.

The fix is simple; Just ensure that we continue iterating from where we
ever produce a fragment from.

Bug: webrtc:12832
Change-Id: Icc761c572ed200db607a7609dab1ac6a8aeb2f04
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220938
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34190}
This commit is contained in:
Victor Boivie 2021-06-01 17:06:35 +02:00 committed by WebRTC LUCI CQ
parent f865444877
commit 803fdc4106
2 changed files with 39 additions and 0 deletions

View File

@ -292,6 +292,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
size_t max_size) {
auto start_it = streams_.lower_bound(next_stream_id_);
for (auto it = start_it; it != streams_.end(); ++it) {
next_stream_id_ = it->first;
absl::optional<DataToSend> ret = Produce(it, now, max_size);
if (ret.has_value()) {
return ret;
@ -299,6 +300,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
}
for (auto it = streams_.begin(); it != start_it; ++it) {
next_stream_id_ = it->first;
absl::optional<DataToSend> ret = Produce(it, now, max_size);
if (ret.has_value()) {
return ret;

View File

@ -666,5 +666,42 @@ TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) {
absl::optional<SendQueue::DataToSend> chunk_two =
buf_.Produce(kNow, kOneFragmentPacketSize);
}
TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector<uint8_t>(1)));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
EXPECT_THAT(chunk1.data.payload, SizeIs(1));
// Next, it should pick a different stream.
buf_.Add(kNow,
DcSctpMessage(StreamID(1), kPPID,
std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
// It should still stay on the Stream1 now, even if might be tempted to switch
// to this stream, as it's the stream following 5.
buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
// After stream id 1 is complete, it's time to do stream 6.
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk4.data.stream_id, StreamID(6));
EXPECT_THAT(chunk4.data.payload, SizeIs(1));
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
}
} // namespace
} // namespace dcsctp