diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 600e8a362e..6a6a8cd193 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -85,9 +85,13 @@ struct DcSctpOptions { // buffer is fully utilized. size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; - // Maximum send buffer size. It will not be possible to queue more data than - // this before sending it. - size_t max_send_buffer_size = 2'000'000; + // Send queue total size limit. It will not be possible to queue more data if + // the queue size is larger than this number. + size_t total_send_queue_limit = 2'000'000; + + // Per stream send queue size limit. Similar to `total_send_queue_limit`, but + // limiting the size of individual streams. + size_t per_stream_send_queue_limit = 2'000'000; // A threshold that, when the amount of data in the send buffer goes below // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 0667e6f899..c92ec37b1c 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -215,7 +215,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_(log_prefix_, &callbacks_, - options_.max_send_buffer_size, options_.mtu, options_.default_stream_priority, options_.total_buffered_amount_low_threshold) {} @@ -544,7 +543,9 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message, "Unable to send message as the socket is shutting down"); return SendStatus::kErrorShuttingDown; } - if (send_queue_.IsFull()) { + if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit || + send_queue_.buffered_amount(message.stream_id()) >= + options_.per_stream_send_queue_limit) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index dfe8ba60fe..d64774b5d3 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -1694,6 +1694,37 @@ TEST_P(DcSctpSocketParametrizedTest, MaybeHandoverSocketAndSendMessage(a, std::move(z)); } +TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) { + DcSctpOptions options = {.total_send_queue_limit = 4000, + .per_stream_send_queue_limit = 1000}; + SocketUnderTest a("A", options); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); + // The per-stream limit for SID=1 is reached, but not SID=2. + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); +} + TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { SocketUnderTest a("A"); auto z = std::make_unique("Z"); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 3e682fdca6..2193880acf 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -35,13 +35,11 @@ using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold) : log_prefix_(log_prefix), callbacks_(*callbacks), - buffer_size_(buffer_size), default_priority_(default_priority), scheduler_(log_prefix_, mtu), total_buffered_amount_( @@ -379,10 +377,6 @@ void RRSendQueue::Add(Timestamp now, RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::IsFull() const { - return total_buffered_amount() >= buffer_size_; -} - bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index b6c359dc1e..1a370a225c 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -56,7 +56,6 @@ class RRSendQueue : public SendQueue { public: RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold); @@ -271,7 +270,6 @@ class RRSendQueue : public SendQueue { const absl::string_view log_prefix_; DcSctpSocketCallbacks& callbacks_; - const size_t buffer_size_; const StreamPriority default_priority_; OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 632cd8fc19..9beba950b6 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -35,7 +35,6 @@ using ::webrtc::Timestamp; constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); -constexpr size_t kMaxQueueSize = 1000; constexpr StreamPriority kDefaultPriority(10); constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; @@ -47,7 +46,7 @@ class RRSendQueueTest : public testing::Test { RRSendQueueTest() : buf_("log: ", &callbacks_, - kMaxQueueSize, + kMtu, kDefaultPriority, kBufferedAmountLowThreshold) {} @@ -60,14 +59,12 @@ class RRSendQueueTest : public testing::Test { TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); - EXPECT_FALSE(buf_.IsFull()); } TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.IsFull()); absl::optional chunk_opt = buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); @@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector payload(600); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_FALSE(buf_.IsEmpty()); absl::optional chunk_three = buf_.Produce(kNow, 1000); @@ -155,7 +152,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_TRUE(buf_.IsEmpty()); } @@ -813,7 +810,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) { DcSctpSocketHandoverState state; buf_.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));