Reapply "dcsctp: Add per-stream-limit, refactor limits."

Keeping the old setting for the total queue size
limit, which avoids breaking a downstream.

This reverts commit 47ce449afaf9ba38785437fdd338630cad24a77b
and relands commit 4c990e2e56157175324e651f95f3d8c6a0e5c030.

Bug: chromium:40072842
Change-Id: I1e7d14b5d0026232d1fc9277172b6947b8be3490
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343120
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41907}
This commit is contained in:
Victor Boivie 2024-03-14 22:18:25 +01:00 committed by WebRTC LUCI CQ
parent 1ee24a650c
commit 2fc097ea83
6 changed files with 49 additions and 24 deletions

View File

@ -85,10 +85,14 @@ struct DcSctpOptions {
// buffer is fully utilized.
size_t max_receiver_window_buffer_size = 5 * 1024 * 1024;
// Maximum send buffer size. It will not be possible to queue more data than
// this before sending it.
// Send queue total size limit. It will not be possible to queue more data if
// the queue size is larger than this number.
size_t max_send_buffer_size = 2'000'000;
// Per stream send queue size limit. Similar to `max_send_buffer_size`, but
// limiting the size of individual streams.
size_t per_stream_send_queue_limit = 2'000'000;
// A threshold that, when the amount of data in the send buffer goes below
// this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.
size_t total_buffered_amount_low_threshold = 1'800'000;

View File

@ -215,7 +215,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
send_queue_(log_prefix_,
&callbacks_,
options_.max_send_buffer_size,
options_.mtu,
options_.default_stream_priority,
options_.total_buffered_amount_low_threshold) {}
@ -544,7 +543,9 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message,
"Unable to send message as the socket is shutting down");
return SendStatus::kErrorShuttingDown;
}
if (send_queue_.IsFull()) {
if (send_queue_.total_buffered_amount() >= options_.max_send_buffer_size ||
send_queue_.buffered_amount(message.stream_id()) >=
options_.per_stream_send_queue_limit) {
if (lifecycle_id.IsSet()) {
callbacks_.OnLifecycleEnd(lifecycle_id);
}

View File

@ -1694,6 +1694,37 @@ TEST_P(DcSctpSocketParametrizedTest,
MaybeHandoverSocketAndSendMessage(a, std::move(z));
}
TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) {
DcSctpOptions options = {.max_send_buffer_size = 4000,
.per_stream_send_queue_limit = 1000};
SocketUnderTest a("A", options);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kErrorResourceExhaustion);
// The per-stream limit for SID=1 is reached, but not SID=2.
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kSuccess);
EXPECT_EQ(a.socket.Send(
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
kSendOptions),
SendStatus::kErrorResourceExhaustion);
}
TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) {
SocketUnderTest a("A");
auto z = std::make_unique<SocketUnderTest>("Z");

View File

@ -35,13 +35,11 @@ using ::webrtc::Timestamp;
RRSendQueue::RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu,
StreamPriority default_priority,
size_t total_buffered_amount_low_threshold)
: log_prefix_(log_prefix),
callbacks_(*callbacks),
buffer_size_(buffer_size),
default_priority_(default_priority),
scheduler_(log_prefix_, mtu),
total_buffered_amount_(
@ -379,10 +377,6 @@ void RRSendQueue::Add(Timestamp now,
RTC_DCHECK(IsConsistent());
}
bool RRSendQueue::IsFull() const {
return total_buffered_amount() >= buffer_size_;
}
bool RRSendQueue::IsEmpty() const {
return total_buffered_amount() == 0;
}

View File

@ -56,7 +56,6 @@ class RRSendQueue : public SendQueue {
public:
RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu,
StreamPriority default_priority,
size_t total_buffered_amount_low_threshold);
@ -271,7 +270,6 @@ class RRSendQueue : public SendQueue {
const absl::string_view log_prefix_;
DcSctpSocketCallbacks& callbacks_;
const size_t buffer_size_;
const StreamPriority default_priority_;
OutgoingMessageId current_message_id = OutgoingMessageId(0);
StreamScheduler scheduler_;

View File

@ -35,7 +35,6 @@ using ::webrtc::Timestamp;
constexpr Timestamp kNow = Timestamp::Zero();
constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53);
constexpr size_t kMaxQueueSize = 1000;
constexpr StreamPriority kDefaultPriority(10);
constexpr size_t kBufferedAmountLowThreshold = 500;
constexpr size_t kOneFragmentPacketSize = 100;
@ -47,7 +46,7 @@ class RRSendQueueTest : public testing::Test {
RRSendQueueTest()
: buf_("log: ",
&callbacks_,
kMaxQueueSize,
kMtu,
kDefaultPriority,
kBufferedAmountLowThreshold) {}
@ -60,14 +59,12 @@ class RRSendQueueTest : public testing::Test {
TEST_F(RRSendQueueTest, EmptyBuffer) {
EXPECT_TRUE(buf_.IsEmpty());
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
EXPECT_FALSE(buf_.IsFull());
}
TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
EXPECT_FALSE(buf_.IsEmpty());
EXPECT_FALSE(buf_.IsFull());
absl::optional<SendQueue::DataToSend> chunk_opt =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_opt.has_value());
@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
std::vector<uint8_t> payload(600);
EXPECT_FALSE(buf_.IsFull());
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
EXPECT_FALSE(buf_.IsFull());
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
EXPECT_TRUE(buf_.IsFull());
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
// However, it's still possible to add messages. It's a soft limit, and it
// might be necessary to forcefully add messages due to e.g. external
// fragmentation.
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
EXPECT_TRUE(buf_.IsFull());
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID);
EXPECT_TRUE(buf_.IsFull());
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
EXPECT_FALSE(buf_.IsFull());
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
EXPECT_FALSE(buf_.IsEmpty());
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
@ -155,7 +152,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
EXPECT_FALSE(buf_.IsFull());
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
EXPECT_TRUE(buf_.IsEmpty());
}
@ -813,7 +810,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) {
DcSctpSocketHandoverState state;
buf_.AddHandoverState(state);
RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority,
kBufferedAmountLowThreshold);
q2.RestoreFromState(state);
EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));