From 4c990e2e56157175324e651f95f3d8c6a0e5c030 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Sun, 10 Mar 2024 21:40:31 +0100 Subject: [PATCH] dcsctp: Add per-stream-limit, refactor limits. The limits have been moved out from the Send Queue as they were enforced outside the queue anyway (in the socket). That was a preparation for adding even more limits; There is now also a per-stream limit, allowing individual streams to have one (global) limit, and the entire socket to have another limit. These limits are very small in the default options. In Chrome, the limit is 16MB per stream, so expect the defaults to be updated when the additional buffering outside dcSCTP is removed. Bug: chromium:41221056 Change-Id: I9f835be05d349cbfce3e9235d34b5ea0e2fe87d1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342481 Reviewed-by: Florent Castelli Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/main@{#41895} --- net/dcsctp/public/dcsctp_options.h | 10 +++++--- net/dcsctp/socket/dcsctp_socket.cc | 5 ++-- net/dcsctp/socket/dcsctp_socket_test.cc | 31 +++++++++++++++++++++++++ net/dcsctp/tx/rr_send_queue.cc | 6 ----- net/dcsctp/tx/rr_send_queue.h | 2 -- net/dcsctp/tx/rr_send_queue_test.cc | 21 +++++++---------- 6 files changed, 50 insertions(+), 25 deletions(-) diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 600e8a362e..6a6a8cd193 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -85,9 +85,13 @@ struct DcSctpOptions { // buffer is fully utilized. size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; - // Maximum send buffer size. It will not be possible to queue more data than - // this before sending it. - size_t max_send_buffer_size = 2'000'000; + // Send queue total size limit. It will not be possible to queue more data if + // the queue size is larger than this number. + size_t total_send_queue_limit = 2'000'000; + + // Per stream send queue size limit. Similar to `total_send_queue_limit`, but + // limiting the size of individual streams. + size_t per_stream_send_queue_limit = 2'000'000; // A threshold that, when the amount of data in the send buffer goes below // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 0667e6f899..c92ec37b1c 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -215,7 +215,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_(log_prefix_, &callbacks_, - options_.max_send_buffer_size, options_.mtu, options_.default_stream_priority, options_.total_buffered_amount_low_threshold) {} @@ -544,7 +543,9 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message, "Unable to send message as the socket is shutting down"); return SendStatus::kErrorShuttingDown; } - if (send_queue_.IsFull()) { + if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit || + send_queue_.buffered_amount(message.stream_id()) >= + options_.per_stream_send_queue_limit) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index dfe8ba60fe..d64774b5d3 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -1694,6 +1694,37 @@ TEST_P(DcSctpSocketParametrizedTest, MaybeHandoverSocketAndSendMessage(a, std::move(z)); } +TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) { + DcSctpOptions options = {.total_send_queue_limit = 4000, + .per_stream_send_queue_limit = 1000}; + SocketUnderTest a("A", options); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); + // The per-stream limit for SID=1 is reached, but not SID=2. + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); +} + TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { SocketUnderTest a("A"); auto z = std::make_unique("Z"); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 3e682fdca6..2193880acf 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -35,13 +35,11 @@ using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold) : log_prefix_(log_prefix), callbacks_(*callbacks), - buffer_size_(buffer_size), default_priority_(default_priority), scheduler_(log_prefix_, mtu), total_buffered_amount_( @@ -379,10 +377,6 @@ void RRSendQueue::Add(Timestamp now, RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::IsFull() const { - return total_buffered_amount() >= buffer_size_; -} - bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index b6c359dc1e..1a370a225c 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -56,7 +56,6 @@ class RRSendQueue : public SendQueue { public: RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold); @@ -271,7 +270,6 @@ class RRSendQueue : public SendQueue { const absl::string_view log_prefix_; DcSctpSocketCallbacks& callbacks_; - const size_t buffer_size_; const StreamPriority default_priority_; OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 632cd8fc19..9beba950b6 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -35,7 +35,6 @@ using ::webrtc::Timestamp; constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); -constexpr size_t kMaxQueueSize = 1000; constexpr StreamPriority kDefaultPriority(10); constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; @@ -47,7 +46,7 @@ class RRSendQueueTest : public testing::Test { RRSendQueueTest() : buf_("log: ", &callbacks_, - kMaxQueueSize, + kMtu, kDefaultPriority, kBufferedAmountLowThreshold) {} @@ -60,14 +59,12 @@ class RRSendQueueTest : public testing::Test { TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); - EXPECT_FALSE(buf_.IsFull()); } TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.IsFull()); absl::optional chunk_opt = buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); @@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector payload(600); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_FALSE(buf_.IsEmpty()); absl::optional chunk_three = buf_.Produce(kNow, 1000); @@ -155,7 +152,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_TRUE(buf_.IsEmpty()); } @@ -813,7 +810,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) { DcSctpSocketHandoverState state; buf_.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));