From 791adafa09a1fdf6122e3f5b45c1e397bc6223a0 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 20 May 2021 17:21:24 +0200 Subject: [PATCH] dcsctp: Add OnBufferedAmountLow in Send Queue This adds the necessary properties and callback to the Send Queue to support the bufferedAmount & bufferedAmountLowThreshold properties and the bufferedamountlow event in RTCDataChannel. The public API changes and socket support comes in a follow-up CL. Bug: webrtc:12794 Change-Id: I12a16f44f775da3711f3aa52a68a0bf24f70d2f8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219690 Reviewed-by: Harald Alvestrand Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/master@{#34142} --- net/dcsctp/socket/dcsctp_socket.cc | 4 +- net/dcsctp/tx/mock_send_queue.h | 9 ++ net/dcsctp/tx/rr_send_queue.cc | 93 +++++++++++++--- net/dcsctp/tx/rr_send_queue.h | 47 +++++++- net/dcsctp/tx/rr_send_queue_test.cc | 161 +++++++++++++++++++++++++++- net/dcsctp/tx/send_queue.h | 11 ++ 6 files changed, 303 insertions(+), 22 deletions(-) diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 174288eeb3..b8d07c7600 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -167,7 +167,9 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, TimerOptions(options.t2_shutdown_timeout, TimerBackoffAlgorithm::kExponential, options.max_retransmissions))), - send_queue_(log_prefix_, options_.max_send_buffer_size) {} + send_queue_(log_prefix_, + options_.max_send_buffer_size, + [](StreamID stream_id) {}) {} std::string DcSctpSocket::log_prefix() const { return log_prefix_ + "[" + std::string(ToString(state_)) + "] "; diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index 8c19262c25..73f1bd0314 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -43,6 +43,15 @@ class MockSendQueue : public SendQueue { MOCK_METHOD(void, CommitResetStreams, (), (override)); MOCK_METHOD(void, RollbackResetStreams, (), (override)); MOCK_METHOD(void, Reset, (), (override)); + MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override)); + MOCK_METHOD(size_t, + buffered_amount_low_threshold, + (StreamID stream_id), + (const, override)); + MOCK_METHOD(void, + SetBufferedAmountLowThreshold, + (StreamID stream_id, size_t bytes), + (override)); }; } // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index c7303221b0..77bb3168b4 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -38,19 +38,51 @@ RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) { if (!item.message_id.has_value() && item.expires_at.has_value() && *item.expires_at <= now) { // TODO(boivie): This should be reported to the client. + buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); continue; } + RTC_DCHECK(IsConsistent()); return &item; } + RTC_DCHECK(IsConsistent()); return nullptr; } +bool RRSendQueue::OutgoingStream::IsConsistent() const { + size_t bytes = 0; + for (const auto& item : items_) { + bytes += item.remaining_size; + } + return bytes == buffered_amount_.value(); +} + +void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) { + RTC_DCHECK(bytes <= value_); + size_t old_value = value_; + value_ -= bytes; + + if (old_value > low_threshold_ && value_ <= low_threshold_) { + on_threshold_reached_(); + } +} + +void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) { + // Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted. + if (low_threshold_ < value_ && low_threshold >= value_) { + on_threshold_reached_(); + } + low_threshold_ = low_threshold; +} + void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, absl::optional expires_at, const SendOptions& send_options) { + buffered_amount_.Increase(message.payload().size()); items_.emplace_back(std::move(message), expires_at, send_options); + + RTC_DCHECK(IsConsistent()); } absl::optional RRSendQueue::OutgoingStream::Produce( @@ -58,18 +90,21 @@ absl::optional RRSendQueue::OutgoingStream::Produce( size_t max_size) { Item* item = GetFirstNonExpiredMessage(now); if (item == nullptr) { + RTC_DCHECK(IsConsistent()); return absl::nullopt; } // If a stream is paused, it will allow sending all partially sent messages // but will not start sending new fragments of completely unsent messages. if (is_paused_ && !item->message_id.has_value()) { + RTC_DCHECK(IsConsistent()); return absl::nullopt; } DcSctpMessage& message = item->message; if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) { + RTC_DCHECK(IsConsistent()); return absl::nullopt; } @@ -105,6 +140,7 @@ absl::optional RRSendQueue::OutgoingStream::Produce( FSN fsn(item->current_fsn); item->current_fsn = FSN(*item->current_fsn + 1); + buffered_amount_.Decrease(payload.size()); SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), item->message_id.value(), fsn, ppid, @@ -124,47 +160,45 @@ absl::optional RRSendQueue::OutgoingStream::Produce( item->message.payload().size()); RTC_DCHECK(item->remaining_size > 0); } + RTC_DCHECK(IsConsistent()); return chunk; } -size_t RRSendQueue::OutgoingStream::buffered_amount() const { - size_t bytes = 0; - for (const auto& item : items_) { - bytes += item.remaining_size; - } - return bytes; -} - bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, MID message_id) { + bool result = false; if (!items_.empty()) { Item& item = items_.front(); if (item.send_options.unordered == unordered && item.message_id.has_value() && *item.message_id == message_id) { + buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); // As the item still existed, it had unsent data. - return true; + result = true; } } - return false; + RTC_DCHECK(IsConsistent()); + return result; } void RRSendQueue::OutgoingStream::Pause() { is_paused_ = true; - // A stream is pause when it's about to be reset. In this implementation, + // A stream is paused when it's about to be reset. In this implementation, // it will throw away all non-partially send messages. This is subject to // change. It will however not discard any partially sent messages - only // whole messages. Partially delivered messages (at the time of receiving a - // Stream Reset command) will always deliver all the fragments before actually - // resetting the stream. + // Stream Reset command) will always deliver all the fragments before + // actually resetting the stream. for (auto it = items_.begin(); it != items_.end();) { if (it->remaining_offset == 0) { + buffered_amount_.Decrease(it->remaining_size); it = items_.erase(it); } else { ++it; } } + RTC_DCHECK(IsConsistent()); } void RRSendQueue::OutgoingStream::Reset() { @@ -172,6 +206,8 @@ void RRSendQueue::OutgoingStream::Reset() { // If this message has been partially sent, reset it so that it will be // re-sent. auto& item = items_.front(); + buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); item.remaining_offset = 0; item.remaining_size = item.message.payload().size(); item.message_id = absl::nullopt; @@ -182,6 +218,7 @@ void RRSendQueue::OutgoingStream::Reset() { next_ordered_mid_ = MID(0); next_unordered_mid_ = MID(0); next_ssn_ = SSN(0); + RTC_DCHECK(IsConsistent()); } bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { @@ -209,11 +246,11 @@ void RRSendQueue::Add(TimeMs now, } size_t RRSendQueue::total_bytes() const { - // TODO(boivie): Have the current size as a member variable, so that's it not + // TODO(boivie): Have the current size as a member variable, so that it's not // calculated for every operation. size_t bytes = 0; for (const auto& stream : streams_) { - bytes += stream.second.buffered_amount(); + bytes += stream.second.buffered_amount().value(); } return bytes; @@ -306,6 +343,27 @@ void RRSendQueue::Reset() { } } +size_t RRSendQueue::buffered_amount(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().value(); +} + +size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + return 0; + } + return it->second.buffered_amount().low_threshold(); +} + +void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id, + size_t bytes) { + GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes); +} + RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( StreamID stream_id) { auto it = streams_.find(stream_id); @@ -313,6 +371,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( return it->second; } - return streams_.emplace(stream_id, OutgoingStream()).first->second; + return streams_ + .emplace(stream_id, + [this, stream_id]() { on_buffered_amount_low_(stream_id); }) + .first->second; } } // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 2b9389f68f..d7fcc9542e 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -44,9 +44,12 @@ class RRSendQueue : public SendQueue { // How small a data chunk's payload may be, if having to fragment a message. static constexpr size_t kMinimumFragmentedPayload = 10; - RRSendQueue(absl::string_view log_prefix, size_t buffer_size) + RRSendQueue(absl::string_view log_prefix, + size_t buffer_size, + std::function on_buffered_amount_low) : log_prefix_(std::string(log_prefix) + "fcfs: "), - buffer_size_(buffer_size) {} + buffer_size_(buffer_size), + on_buffered_amount_low_(std::move(on_buffered_amount_low)) {} // Indicates if the buffer is full. Note that it's up to the caller to ensure // that the buffer is not full prior to adding new items to it. @@ -72,14 +75,43 @@ class RRSendQueue : public SendQueue { void CommitResetStreams() override; void RollbackResetStreams() override; void Reset() override; + size_t buffered_amount(StreamID stream_id) const override; + size_t buffered_amount_low_threshold(StreamID stream_id) const override; + void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; // The size of the buffer, in "payload bytes". size_t total_bytes() const; private: + // Represents a value and a "low threshold" that when the value reaches or + // goes under the "low threshold", will trigger `on_threshold_reached` + // callback. + class ThresholdWatcher { + public: + explicit ThresholdWatcher(std::function on_threshold_reached) + : on_threshold_reached_(std::move(on_threshold_reached)) {} + // Increases the value. + void Increase(size_t bytes) { value_ += bytes; } + // Decreases the value and triggers `on_threshold_reached` if it's at or + // below `low_threshold()`. + void Decrease(size_t bytes); + + size_t value() const { return value_; } + size_t low_threshold() const { return low_threshold_; } + void SetLowThreshold(size_t low_threshold); + + private: + const std::function on_threshold_reached_; + size_t value_ = 0; + size_t low_threshold_ = 0; + }; + // Per-stream information. class OutgoingStream { public: + explicit OutgoingStream(std::function on_buffered_amount_low) + : buffered_amount_(std::move(on_buffered_amount_low)) {} + // Enqueues a message to this stream. void Add(DcSctpMessage message, absl::optional expires_at, @@ -88,8 +120,8 @@ class RRSendQueue : public SendQueue { // Possibly produces a data chunk to send. absl::optional Produce(TimeMs now, size_t max_size); - // The amount of data enqueued on this stream. - size_t buffered_amount() const; + const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } + ThresholdWatcher& buffered_amount() { return buffered_amount_; } // Discards a partially sent message, see `SendQueue::Discard`. bool Discard(IsUnordered unordered, MID message_id); @@ -136,6 +168,7 @@ class RRSendQueue : public SendQueue { // Returns the first non-expired message, or nullptr if there isn't one. Item* GetFirstNonExpiredMessage(TimeMs now); + bool IsConsistent() const; // Streams are pause when they are about to be reset. bool is_paused_ = false; @@ -146,6 +179,9 @@ class RRSendQueue : public SendQueue { SSN next_ssn_ = SSN(0); // Enqueued messages, and metadata. std::deque items_; + + // The current amount of buffered data. + ThresholdWatcher buffered_amount_; }; OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); @@ -156,6 +192,9 @@ class RRSendQueue : public SendQueue { const std::string log_prefix_; const size_t buffer_size_; + // Called when the buffered amount is below what has been set using + // `SetBufferedAmountLowThreshold`. + const std::function on_buffered_amount_low_; // The next stream to send chunks from. StreamID next_stream_id_ = StreamID(0); diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 5e99bb4baf..3bc748caac 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -36,9 +36,12 @@ constexpr size_t kTwoFragmentPacketSize = 101; class RRSendQueueTest : public testing::Test { protected: - RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {} + RRSendQueueTest() + : buf_("log: ", kMaxQueueSize, on_buffered_amount_low_.AsStdFunction()) {} const DcSctpOptions options_; + testing::NiceMock> + on_buffered_amount_low_; RRSendQueue buf_; }; @@ -474,5 +477,161 @@ TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) { EXPECT_EQ(chunk8.data.stream_id, StreamID(4)); EXPECT_THAT(chunk8.data.payload, SizeIs(8)); } + +TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u); +} + +TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); +} + +TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) { + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(1)); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); + + // Should now trigger again, as buffer_amount went above the threshold. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(1)); +} + +TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) { + buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(10))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(10)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(20))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(20)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); +} + +TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + + std::vector payload(1000); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u); + + // Doesn't trigger when reducing even further. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); +} + +TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1000))); + + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, 400)); + EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk1.data.payload, SizeIs(400)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); + + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(200))); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + + // Will trigger again, as it went above the limit. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, 200)); + EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); + EXPECT_THAT(chunk2.data.payload, SizeIs(200)); + EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); +} + +TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) { + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + + buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(100))); + + // Modifying the threshold, still under buffered_amount, should not trigger. + buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 99); + + // When the threshold reaches buffered_amount, it will trigger. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 100); + + // But not when it's set low again. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); + + // But it will trigger when it overshoots. + EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1))); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 150); + + // But not when it's set low again. + EXPECT_CALL(on_buffered_amount_low_, Call).Times(0); + buf_.SetBufferedAmountLowThreshold(StreamID(1), 0); +} + } // namespace } // namespace dcsctp diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index f48c0c75a6..67bd294768 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -108,6 +108,17 @@ class SendQueue { // of data loss. However, data loss cannot be completely guaranteed when a // peer restarts. virtual void Reset() = 0; + + // Returns the amount of buffered data. This doesn't include packets that are + // e.g. inflight. + virtual size_t buffered_amount(StreamID stream_id) const = 0; + + // Returns the limit for the `OnBufferedAmountLow` event. Default value is 0. + virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0; + + // Sets a limit for the `OnBufferedAmountLow` event. + virtual void SetBufferedAmountLowThreshold(StreamID stream_id, + size_t bytes) = 0; }; } // namespace dcsctp