dcsctp: Add per-stream-limit, refactor limits.

The limits have been moved out from the Send Queue as they were enforced
outside the queue anyway (in the socket). That was a preparation for
adding even more limits; There is now also a per-stream limit, allowing
individual streams to have one (global) limit, and the entire socket to
have another limit.

These limits are very small in the default options. In Chrome, the limit
is 16MB per stream, so expect the defaults to be updated when the
additional buffering outside dcSCTP is removed.

Bug: chromium:41221056
Change-Id: I9f835be05d349cbfce3e9235d34b5ea0e2fe87d1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342481
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41895}
This commit is contained in:
Victor Boivie 2024-03-10 21:40:31 +01:00 committed by WebRTC LUCI CQ
parent 0fa90887c5
commit 4c990e2e56
6 changed files with 50 additions and 25 deletions

View File

@ -85,9 +85,13 @@ struct DcSctpOptions {
// buffer is fully utilized. // buffer is fully utilized.
size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; size_t max_receiver_window_buffer_size = 5 * 1024 * 1024;
// Maximum send buffer size. It will not be possible to queue more data than // Send queue total size limit. It will not be possible to queue more data if
// this before sending it. // the queue size is larger than this number.
size_t max_send_buffer_size = 2'000'000; size_t total_send_queue_limit = 2'000'000;
// Per stream send queue size limit. Similar to `total_send_queue_limit`, but
// limiting the size of individual streams.
size_t per_stream_send_queue_limit = 2'000'000;
// A threshold that, when the amount of data in the send buffer goes below // A threshold that, when the amount of data in the send buffer goes below
// this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.

View File

@ -215,7 +215,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
absl::bind_front(&DcSctpSocket::OnSentPacket, this)), absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
send_queue_(log_prefix_, send_queue_(log_prefix_,
&callbacks_, &callbacks_,
options_.max_send_buffer_size,
options_.mtu, options_.mtu,
options_.default_stream_priority, options_.default_stream_priority,
options_.total_buffered_amount_low_threshold) {} options_.total_buffered_amount_low_threshold) {}
@ -544,7 +543,9 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message,
"Unable to send message as the socket is shutting down"); "Unable to send message as the socket is shutting down");
return SendStatus::kErrorShuttingDown; return SendStatus::kErrorShuttingDown;
} }
if (send_queue_.IsFull()) { if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit ||
send_queue_.buffered_amount(message.stream_id()) >=
options_.per_stream_send_queue_limit) {
if (lifecycle_id.IsSet()) { if (lifecycle_id.IsSet()) {
callbacks_.OnLifecycleEnd(lifecycle_id); callbacks_.OnLifecycleEnd(lifecycle_id);
} }

View File

@ -1694,6 +1694,37 @@ TEST_P(DcSctpSocketParametrizedTest,
MaybeHandoverSocketAndSendMessage(a, std::move(z)); MaybeHandoverSocketAndSendMessage(a, std::move(z));
} }
TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) {
DcSctpOptions options = {.total_send_queue_limit = 4000,
.per_stream_send_queue_limit = 1000};
SocketUnderTest a("A", options);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kErrorResourceExhaustion);
// The per-stream limit for SID=1 is reached, but not SID=2.
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kErrorResourceExhaustion);
}
TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) {
SocketUnderTest a("A"); SocketUnderTest a("A");
auto z = std::make_unique<SocketUnderTest>("Z"); auto z = std::make_unique<SocketUnderTest>("Z");

View File

@ -35,13 +35,11 @@ using ::webrtc::Timestamp;
RRSendQueue::RRSendQueue(absl::string_view log_prefix, RRSendQueue::RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks, DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu, size_t mtu,
StreamPriority default_priority, StreamPriority default_priority,
size_t total_buffered_amount_low_threshold) size_t total_buffered_amount_low_threshold)
: log_prefix_(log_prefix), : log_prefix_(log_prefix),
callbacks_(*callbacks), callbacks_(*callbacks),
buffer_size_(buffer_size),
default_priority_(default_priority), default_priority_(default_priority),
scheduler_(log_prefix_, mtu), scheduler_(log_prefix_, mtu),
total_buffered_amount_( total_buffered_amount_(
@ -379,10 +377,6 @@ void RRSendQueue::Add(Timestamp now,
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
} }
bool RRSendQueue::IsFull() const {
return total_buffered_amount() >= buffer_size_;
}
bool RRSendQueue::IsEmpty() const { bool RRSendQueue::IsEmpty() const {
return total_buffered_amount() == 0; return total_buffered_amount() == 0;
} }

View File

@ -56,7 +56,6 @@ class RRSendQueue : public SendQueue {
public: public:
RRSendQueue(absl::string_view log_prefix, RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks, DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu, size_t mtu,
StreamPriority default_priority, StreamPriority default_priority,
size_t total_buffered_amount_low_threshold); size_t total_buffered_amount_low_threshold);
@ -271,7 +270,6 @@ class RRSendQueue : public SendQueue {
const absl::string_view log_prefix_; const absl::string_view log_prefix_;
DcSctpSocketCallbacks& callbacks_; DcSctpSocketCallbacks& callbacks_;
const size_t buffer_size_;
const StreamPriority default_priority_; const StreamPriority default_priority_;
OutgoingMessageId current_message_id = OutgoingMessageId(0); OutgoingMessageId current_message_id = OutgoingMessageId(0);
StreamScheduler scheduler_; StreamScheduler scheduler_;

View File

@ -35,7 +35,6 @@ using ::webrtc::Timestamp;
constexpr Timestamp kNow = Timestamp::Zero(); constexpr Timestamp kNow = Timestamp::Zero();
constexpr StreamID kStreamID(1); constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53); constexpr PPID kPPID(53);
constexpr size_t kMaxQueueSize = 1000;
constexpr StreamPriority kDefaultPriority(10); constexpr StreamPriority kDefaultPriority(10);
constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kBufferedAmountLowThreshold = 500;
constexpr size_t kOneFragmentPacketSize = 100; constexpr size_t kOneFragmentPacketSize = 100;
@ -47,7 +46,7 @@ class RRSendQueueTest : public testing::Test {
RRSendQueueTest() RRSendQueueTest()
: buf_("log: ", : buf_("log: ",
&callbacks_, &callbacks_,
kMaxQueueSize,
kMtu, kMtu,
kDefaultPriority, kDefaultPriority,
kBufferedAmountLowThreshold) {} kBufferedAmountLowThreshold) {}
@ -60,14 +59,12 @@ class RRSendQueueTest : public testing::Test {
TEST_F(RRSendQueueTest, EmptyBuffer) { TEST_F(RRSendQueueTest, EmptyBuffer) {
EXPECT_TRUE(buf_.IsEmpty()); EXPECT_TRUE(buf_.IsEmpty());
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
EXPECT_FALSE(buf_.IsFull());
} }
TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
EXPECT_FALSE(buf_.IsEmpty()); EXPECT_FALSE(buf_.IsEmpty());
EXPECT_FALSE(buf_.IsFull());
absl::optional<SendQueue::DataToSend> chunk_opt = absl::optional<SendQueue::DataToSend> chunk_opt =
buf_.Produce(kNow, kOneFragmentPacketSize); buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_opt.has_value()); ASSERT_TRUE(chunk_opt.has_value());
@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
std::vector<uint8_t> payload(600); std::vector<uint8_t> payload(600);
EXPECT_FALSE(buf_.IsFull()); EXPECT_LT(buf_.total_buffered_amount(), 1000u);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
EXPECT_FALSE(buf_.IsFull()); EXPECT_LT(buf_.total_buffered_amount(), 1000u);
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
EXPECT_TRUE(buf_.IsFull()); EXPECT_GE(buf_.total_buffered_amount(), 1000u);
// However, it's still possible to add messages. It's a soft limit, and it // However, it's still possible to add messages. It's a soft limit, and it
// might be necessary to forcefully add messages due to e.g. external // might be necessary to forcefully add messages due to e.g. external
// fragmentation. // fragmentation.
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
EXPECT_TRUE(buf_.IsFull()); EXPECT_GE(buf_.total_buffered_amount(), 1000u);
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000); absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_one.has_value()); ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID); EXPECT_EQ(chunk_one->data.ppid, kPPID);
EXPECT_TRUE(buf_.IsFull()); EXPECT_GE(buf_.total_buffered_amount(), 1000u);
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000); absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_two.has_value()); ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ppid, PPID(54)); EXPECT_EQ(chunk_two->data.ppid, PPID(54));
EXPECT_FALSE(buf_.IsFull()); EXPECT_LT(buf_.total_buffered_amount(), 1000u);
EXPECT_FALSE(buf_.IsEmpty()); EXPECT_FALSE(buf_.IsEmpty());
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000); absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
@ -155,7 +152,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
EXPECT_EQ(chunk_three->data.ppid, PPID(55)); EXPECT_EQ(chunk_three->data.ppid, PPID(55));
EXPECT_FALSE(buf_.IsFull()); EXPECT_LT(buf_.total_buffered_amount(), 1000u);
EXPECT_TRUE(buf_.IsEmpty()); EXPECT_TRUE(buf_.IsEmpty());
} }
@ -813,7 +810,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) {
DcSctpSocketHandoverState state; DcSctpSocketHandoverState state;
buf_.AddHandoverState(state); buf_.AddHandoverState(state);
RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority,
kBufferedAmountLowThreshold); kBufferedAmountLowThreshold);
q2.RestoreFromState(state); q2.RestoreFromState(state);
EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42)); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));