diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 6a6a8cd193..600e8a362e 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -85,13 +85,9 @@ struct DcSctpOptions { // buffer is fully utilized. size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; - // Send queue total size limit. It will not be possible to queue more data if - // the queue size is larger than this number. - size_t total_send_queue_limit = 2'000'000; - - // Per stream send queue size limit. Similar to `total_send_queue_limit`, but - // limiting the size of individual streams. - size_t per_stream_send_queue_limit = 2'000'000; + // Maximum send buffer size. It will not be possible to queue more data than + // this before sending it. + size_t max_send_buffer_size = 2'000'000; // A threshold that, when the amount of data in the send buffer goes below // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index c92ec37b1c..0667e6f899 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -215,6 +215,7 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_(log_prefix_, &callbacks_, + options_.max_send_buffer_size, options_.mtu, options_.default_stream_priority, options_.total_buffered_amount_low_threshold) {} @@ -543,9 +544,7 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message, "Unable to send message as the socket is shutting down"); return SendStatus::kErrorShuttingDown; } - if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit || - send_queue_.buffered_amount(message.stream_id()) >= - options_.per_stream_send_queue_limit) { + if (send_queue_.IsFull()) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index d64774b5d3..dfe8ba60fe 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -1694,37 +1694,6 @@ TEST_P(DcSctpSocketParametrizedTest, MaybeHandoverSocketAndSendMessage(a, std::move(z)); } -TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) { - DcSctpOptions options = {.total_send_queue_limit = 4000, - .per_stream_send_queue_limit = 1000}; - SocketUnderTest a("A", options); - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kSuccess); - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kSuccess); - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(1), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kErrorResourceExhaustion); - // The per-stream limit for SID=1 is reached, but not SID=2. - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kSuccess); - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kSuccess); - EXPECT_EQ(a.socket.Send( - DcSctpMessage(StreamID(2), PPID(53), std::vector(600)), - kSendOptions), - SendStatus::kErrorResourceExhaustion); -} - TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { SocketUnderTest a("A"); auto z = std::make_unique("Z"); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 2193880acf..3e682fdca6 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -35,11 +35,13 @@ using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, + size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold) : log_prefix_(log_prefix), callbacks_(*callbacks), + buffer_size_(buffer_size), default_priority_(default_priority), scheduler_(log_prefix_, mtu), total_buffered_amount_( @@ -377,6 +379,10 @@ void RRSendQueue::Add(Timestamp now, RTC_DCHECK(IsConsistent()); } +bool RRSendQueue::IsFull() const { + return total_buffered_amount() >= buffer_size_; +} + bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 1a370a225c..b6c359dc1e 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -56,6 +56,7 @@ class RRSendQueue : public SendQueue { public: RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, + size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold); @@ -270,6 +271,7 @@ class RRSendQueue : public SendQueue { const absl::string_view log_prefix_; DcSctpSocketCallbacks& callbacks_; + const size_t buffer_size_; const StreamPriority default_priority_; OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 9beba950b6..632cd8fc19 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -35,6 +35,7 @@ using ::webrtc::Timestamp; constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); +constexpr size_t kMaxQueueSize = 1000; constexpr StreamPriority kDefaultPriority(10); constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; @@ -46,7 +47,7 @@ class RRSendQueueTest : public testing::Test { RRSendQueueTest() : buf_("log: ", &callbacks_, - + kMaxQueueSize, kMtu, kDefaultPriority, kBufferedAmountLowThreshold) {} @@ -59,12 +60,14 @@ class RRSendQueueTest : public testing::Test { TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(buf_.IsFull()); } TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); + EXPECT_FALSE(buf_.IsFull()); absl::optional chunk_opt = buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); @@ -121,30 +124,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector payload(600); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); + EXPECT_FALSE(buf_.IsFull()); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); + EXPECT_FALSE(buf_.IsFull()); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + EXPECT_TRUE(buf_.IsFull()); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + EXPECT_TRUE(buf_.IsFull()); absl::optional chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + EXPECT_TRUE(buf_.IsFull()); absl::optional chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); + EXPECT_FALSE(buf_.IsFull()); EXPECT_FALSE(buf_.IsEmpty()); absl::optional chunk_three = buf_.Produce(kNow, 1000); @@ -152,7 +155,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); + EXPECT_FALSE(buf_.IsFull()); EXPECT_TRUE(buf_.IsEmpty()); } @@ -810,7 +813,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) { DcSctpSocketHandoverState state; buf_.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));