Revert "dcsctp: Add per-stream-limit, refactor limits."
This reverts commit 4c990e2e56157175324e651f95f3d8c6a0e5c030. Reason for revert: Breaks downstream build. Original change's description: > dcsctp: Add per-stream-limit, refactor limits. > > The limits have been moved out from the Send Queue as they were enforced > outside the queue anyway (in the socket). That was a preparation for > adding even more limits; There is now also a per-stream limit, allowing > individual streams to have one (global) limit, and the entire socket to > have another limit. > > These limits are very small in the default options. In Chrome, the limit > is 16MB per stream, so expect the defaults to be updated when the > additional buffering outside dcSCTP is removed. > > Bug: chromium:41221056 > Change-Id: I9f835be05d349cbfce3e9235d34b5ea0e2fe87d1 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342481 > Reviewed-by: Florent Castelli <orphis@webrtc.org> > Commit-Queue: Victor Boivie <boivie@webrtc.org> > Cr-Commit-Position: refs/heads/main@{#41895} Bug: chromium:41221056 Change-Id: Icd57fbfca87d6b512cfc7f7682ae709000c2bcad Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343080 Commit-Queue: Björn Terelius <terelius@webrtc.org> Reviewed-by: Victor Boivie <boivie@webrtc.org> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com> Cr-Commit-Position: refs/heads/main@{#41901}
This commit is contained in:
parent
29f3a0a728
commit
47ce449afa
@ -85,13 +85,9 @@ struct DcSctpOptions {
|
||||
// buffer is fully utilized.
|
||||
size_t max_receiver_window_buffer_size = 5 * 1024 * 1024;
|
||||
|
||||
// Send queue total size limit. It will not be possible to queue more data if
|
||||
// the queue size is larger than this number.
|
||||
size_t total_send_queue_limit = 2'000'000;
|
||||
|
||||
// Per stream send queue size limit. Similar to `total_send_queue_limit`, but
|
||||
// limiting the size of individual streams.
|
||||
size_t per_stream_send_queue_limit = 2'000'000;
|
||||
// Maximum send buffer size. It will not be possible to queue more data than
|
||||
// this before sending it.
|
||||
size_t max_send_buffer_size = 2'000'000;
|
||||
|
||||
// A threshold that, when the amount of data in the send buffer goes below
|
||||
// this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.
|
||||
|
||||
@ -215,6 +215,7 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
|
||||
absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
|
||||
send_queue_(log_prefix_,
|
||||
&callbacks_,
|
||||
options_.max_send_buffer_size,
|
||||
options_.mtu,
|
||||
options_.default_stream_priority,
|
||||
options_.total_buffered_amount_low_threshold) {}
|
||||
@ -543,9 +544,7 @@ SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message,
|
||||
"Unable to send message as the socket is shutting down");
|
||||
return SendStatus::kErrorShuttingDown;
|
||||
}
|
||||
if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit ||
|
||||
send_queue_.buffered_amount(message.stream_id()) >=
|
||||
options_.per_stream_send_queue_limit) {
|
||||
if (send_queue_.IsFull()) {
|
||||
if (lifecycle_id.IsSet()) {
|
||||
callbacks_.OnLifecycleEnd(lifecycle_id);
|
||||
}
|
||||
|
||||
@ -1694,37 +1694,6 @@ TEST_P(DcSctpSocketParametrizedTest,
|
||||
MaybeHandoverSocketAndSendMessage(a, std::move(z));
|
||||
}
|
||||
|
||||
TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) {
|
||||
DcSctpOptions options = {.total_send_queue_limit = 4000,
|
||||
.per_stream_send_queue_limit = 1000};
|
||||
SocketUnderTest a("A", options);
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kSuccess);
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kSuccess);
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kErrorResourceExhaustion);
|
||||
// The per-stream limit for SID=1 is reached, but not SID=2.
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kSuccess);
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kSuccess);
|
||||
EXPECT_EQ(a.socket.Send(
|
||||
DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
|
||||
kSendOptions),
|
||||
SendStatus::kErrorResourceExhaustion);
|
||||
}
|
||||
|
||||
TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) {
|
||||
SocketUnderTest a("A");
|
||||
auto z = std::make_unique<SocketUnderTest>("Z");
|
||||
|
||||
@ -35,11 +35,13 @@ using ::webrtc::Timestamp;
|
||||
|
||||
RRSendQueue::RRSendQueue(absl::string_view log_prefix,
|
||||
DcSctpSocketCallbacks* callbacks,
|
||||
size_t buffer_size,
|
||||
size_t mtu,
|
||||
StreamPriority default_priority,
|
||||
size_t total_buffered_amount_low_threshold)
|
||||
: log_prefix_(log_prefix),
|
||||
callbacks_(*callbacks),
|
||||
buffer_size_(buffer_size),
|
||||
default_priority_(default_priority),
|
||||
scheduler_(log_prefix_, mtu),
|
||||
total_buffered_amount_(
|
||||
@ -377,6 +379,10 @@ void RRSendQueue::Add(Timestamp now,
|
||||
RTC_DCHECK(IsConsistent());
|
||||
}
|
||||
|
||||
bool RRSendQueue::IsFull() const {
|
||||
return total_buffered_amount() >= buffer_size_;
|
||||
}
|
||||
|
||||
bool RRSendQueue::IsEmpty() const {
|
||||
return total_buffered_amount() == 0;
|
||||
}
|
||||
|
||||
@ -56,6 +56,7 @@ class RRSendQueue : public SendQueue {
|
||||
public:
|
||||
RRSendQueue(absl::string_view log_prefix,
|
||||
DcSctpSocketCallbacks* callbacks,
|
||||
size_t buffer_size,
|
||||
size_t mtu,
|
||||
StreamPriority default_priority,
|
||||
size_t total_buffered_amount_low_threshold);
|
||||
@ -270,6 +271,7 @@ class RRSendQueue : public SendQueue {
|
||||
|
||||
const absl::string_view log_prefix_;
|
||||
DcSctpSocketCallbacks& callbacks_;
|
||||
const size_t buffer_size_;
|
||||
const StreamPriority default_priority_;
|
||||
OutgoingMessageId current_message_id = OutgoingMessageId(0);
|
||||
StreamScheduler scheduler_;
|
||||
|
||||
@ -35,6 +35,7 @@ using ::webrtc::Timestamp;
|
||||
constexpr Timestamp kNow = Timestamp::Zero();
|
||||
constexpr StreamID kStreamID(1);
|
||||
constexpr PPID kPPID(53);
|
||||
constexpr size_t kMaxQueueSize = 1000;
|
||||
constexpr StreamPriority kDefaultPriority(10);
|
||||
constexpr size_t kBufferedAmountLowThreshold = 500;
|
||||
constexpr size_t kOneFragmentPacketSize = 100;
|
||||
@ -46,7 +47,7 @@ class RRSendQueueTest : public testing::Test {
|
||||
RRSendQueueTest()
|
||||
: buf_("log: ",
|
||||
&callbacks_,
|
||||
|
||||
kMaxQueueSize,
|
||||
kMtu,
|
||||
kDefaultPriority,
|
||||
kBufferedAmountLowThreshold) {}
|
||||
@ -59,12 +60,14 @@ class RRSendQueueTest : public testing::Test {
|
||||
TEST_F(RRSendQueueTest, EmptyBuffer) {
|
||||
EXPECT_TRUE(buf_.IsEmpty());
|
||||
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
|
||||
|
||||
EXPECT_FALSE(buf_.IsEmpty());
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
absl::optional<SendQueue::DataToSend> chunk_opt =
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize);
|
||||
ASSERT_TRUE(chunk_opt.has_value());
|
||||
@ -121,30 +124,30 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
|
||||
|
||||
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||
std::vector<uint8_t> payload(600);
|
||||
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
|
||||
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
// However, it's still possible to add messages. It's a soft limit, and it
|
||||
// might be necessary to forcefully add messages due to e.g. external
|
||||
// fragmentation.
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
|
||||
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||
|
||||
EXPECT_GE(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
||||
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
||||
|
||||
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
EXPECT_FALSE(buf_.IsEmpty());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
|
||||
@ -152,7 +155,7 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
|
||||
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
|
||||
|
||||
EXPECT_LT(buf_.total_buffered_amount(), 1000u);
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
EXPECT_TRUE(buf_.IsEmpty());
|
||||
}
|
||||
|
||||
@ -810,7 +813,7 @@ TEST_F(RRSendQueueTest, WillHandoverPriority) {
|
||||
DcSctpSocketHandoverState state;
|
||||
buf_.AddHandoverState(state);
|
||||
|
||||
RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority,
|
||||
RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
|
||||
kBufferedAmountLowThreshold);
|
||||
q2.RestoreFromState(state);
|
||||
EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user