From 2fc097ea830bf2fa303659a6230013a5756399ac Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 14 Mar 2024 22:18:25 +0100 Subject: [PATCH] Reapply "dcsctp: Add per-stream-limit, refactor limits." Keeping the old setting for the total queue size limit, which avoids breaking a downstream. This reverts commit 47ce449afaf9ba38785437fdd338630cad24a77b and relands commit 4c990e2e56157175324e651f95f3d8c6a0e5c030. Bug: chromium:40072842 Change-Id: I1e7d14b5d0026232d1fc9277172b6947b8be3490 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343120 Commit-Queue: Victor Boivie Reviewed-by: Florent Castelli Cr-Commit-Position: refs/heads/main@{#41907} --- net/dcsctp/public/dcsctp_options.h | 8 +++++-- net/dcsctp/socket/dcsctp_socket.cc | 5 ++-- net/dcsctp/socket/dcsctp_socket_test.cc | 31 +++++++++++++++++++++++++ net/dcsctp/tx/rr_send_queue.cc | 6 ----- net/dcsctp/tx/rr_send_queue.h | 2 -- net/dcsctp/tx/rr_send_queue_test.cc | 21 +++++++---------- 6 files changed, 49 insertions(+), 24 deletions(-) diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 600e8a362e..221a856686 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -85,10 +85,14 @@ struct DcSctpOptions { // buffer is fully utilized. size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; - // Maximum send buffer size. It will not be possible to queue more data than - // this before sending it. + // Send queue total size limit. It will not be possible to queue more data if + // the queue size is larger than this number. size_t max_send_buffer_size = 2'000'000; + // Per stream send queue size limit. Similar to `max_send_buffer_size`, but + // limiting the size of individual streams. + size_t per_stream_send_queue_limit = 2'000'000; + // A threshold that, when the amount of data in the send buffer goes below // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. size_t total_buffered_amount_low_threshold = 1'800'000; diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 0667e6f899..d197a38386 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -215,7 +215,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_(log_prefix_, &callbacks_, - options_.max_send_buffer_size, options_.mtu, options_.default_stream_priority, options_.total_buffered_amount_low_threshold) {} @@ -544,7 +543,9 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message, "Unable to send message as the socket is shutting down"); return SendStatus::kErrorShuttingDown; } - if (send_queue_.IsFull()) { + if (send_queue_.total_buffered_amount() >= options_.max_send_buffer_size || + send_queue_.buffered_amount(message.stream_id()) >= + options_.per_stream_send_queue_limit) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index dfe8ba60fe..2d392d62f1 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -1694,6 +1694,37 @@ TEST_P(DcSctpSocketParametrizedTest, MaybeHandoverSocketAndSendMessage(a, std::move(z)); } +TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) { + DcSctpOptions options = {.max_send_buffer_size = 4000, + .per_stream_send_queue_limit = 1000}; + SocketUnderTest a("A", options); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); + // The per-stream limit for SID=1 is reached, but not SID=2. + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); +} + TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { SocketUnderTest a("A"); auto z = std::make_unique("Z"); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 3e682fdca6..2193880acf 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -35,13 +35,11 @@ using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold) : log_prefix_(log_prefix), callbacks_(*callbacks), - buffer_size_(buffer_size), default_priority_(default_priority), scheduler_(log_prefix_, mtu), total_buffered_amount_( @@ -379,10 +377,6 @@ void RRSendQueue::Add(Timestamp now, RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::IsFull() const { - return total_buffered_amount() >= buffer_size_; -} - bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index b6c359dc1e..1a370a225c 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -56,7 +56,6 @@ class RRSendQueue : public SendQueue { public: RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold); @@ -271,7 +270,6 @@ class RRSendQueue : public SendQueue { const absl::string_view log_prefix_; DcSctpSocketCallbacks& callbacks_; - const size_t buffer_size_; const StreamPriority default_priority_; OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 632cd8fc19..9beba950b6 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -35,7 +35,6 @@ using ::webrtc::Timestamp; constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); -constexpr size_t kMaxQueueSize = 1000; constexpr StreamPriority kDefaultPriority(10); constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; @@ -47,7 +46,7 @@ class RRSendQueueTest : public testing::Test { RRSendQueueTest() : buf_("log: ", &callbacks_, - kMaxQueueSize, + kMtu, kDefaultPriority, kBufferedAmountLowThreshold) {} @@ -60,14 +59,12 @@ class RRSendQueueTest : public testing::Test { TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); - EXPECT_FALSE(buf_.IsFull()); } TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.IsFull()); absl::optional chunk_opt = buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); @@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector payload(600); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_FALSE(buf_.IsEmpty()); absl::optional chunk_three = buf_.Produce(kNow, 1000); @@ -155,7 +152,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_TRUE(buf_.IsEmpty()); } @@ -813,7 +810,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) { DcSctpSocketHandoverState state; buf_.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));