From 803fdc41066d51d2b4683857fca00adfb309034f Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Tue, 1 Jun 2021 17:06:35 +0200 Subject: [PATCH] dcsctp: Stay within stream while producing from it The way that the "next stream" was picked when round-robin cycling was flawed. When a message was produced in its entirety, the "next stream" would be put at a stream identifier value that was just larger than what was previously used. And then, for each fragment that was to be created, it would try to resolve the nearest stream (above or equal to that number) that had messages to send - always starting from that stream id that didn't necessarily point to the stream for which fragments were actually produced. For example, if the previous stream ID for which a message was fully produced on was 5, then the next_stream_id would be set to 6, and then when producing next fragment, it might have produced something from stream_id=1, because that was the only stream with messages in it. It wouldn't update next_stream_id at this time; it would still be 6. After a single fragment had been produced from that stream, a message was queued on stream_id=6. The next time a fragment was to be produced, it would not continue one stream_id=1, but instead pick the new stream, which would suddenly produce a new fragment (with B flag set) while the previous message (from stream_id=1) wasn't finished yet. The fix is simple; Just ensure that we continue iterating from where we ever produce a fragment from. Bug: webrtc:12832 Change-Id: Icc761c572ed200db607a7609dab1ac6a8aeb2f04 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220938 Reviewed-by: Florent Castelli Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/master@{#34190} --- net/dcsctp/tx/rr_send_queue.cc | 2 ++ net/dcsctp/tx/rr_send_queue_test.cc | 37 +++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 027e5b8271..37c6974c3a 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -292,6 +292,7 @@ absl::optional RRSendQueue::Produce(TimeMs now, size_t max_size) { auto start_it = streams_.lower_bound(next_stream_id_); for (auto it = start_it; it != streams_.end(); ++it) { + next_stream_id_ = it->first; absl::optional ret = Produce(it, now, max_size); if (ret.has_value()) { return ret; @@ -299,6 +300,7 @@ absl::optional RRSendQueue::Produce(TimeMs now, } for (auto it = streams_.begin(); it != start_it; ++it) { + next_stream_id_ = it->first; absl::optional ret = Produce(it, now, max_size); if (ret.has_value()) { return ret; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index e4897b70cb..7a031098a0 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -666,5 +666,42 @@ TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) { absl::optional chunk_two = buf_.Produce(kNow, kOneFragmentPacketSize); } + +TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) { + buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(5)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + // Next, it should pick a different stream. + + buf_.Add(kNow, + DcSctpMessage(StreamID(1), kPPID, + std::vector(kOneFragmentPacketSize * 2))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); + + // It should still stay on the Stream1 now, even if might be tempted to switch + // to this stream, as it's the stream following 5. + buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + + // After stream id 1 is complete, it's time to do stream 6. + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk4.data.stream_id, StreamID(6)); + EXPECT_THAT(chunk4.data.payload, SizeIs(1)); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); +} } // namespace } // namespace dcsctp