diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 174d19b77c..bee9d515b8 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -123,7 +123,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, const SendOptions& send_options) { bool was_active = bytes_to_send_in_next_message() > 0; buffered_amount_.Increase(message.payload().size()); - total_buffered_amount_.Increase(message.payload().size()); + parent_.total_buffered_amount_.Increase(message.payload().size()); items_.emplace_back(std::move(message), expires_at, send_options); if (!was_active) { @@ -148,7 +148,7 @@ absl::optional RRSendQueue::OutgoingStream::Produce( // Oops, this entire message has already expired. Try the next one. if (item.expires_at <= now) { buffered_amount_.Decrease(item.remaining_size); - total_buffered_amount_.Decrease(item.remaining_size); + parent_.total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); continue; } @@ -184,7 +184,7 @@ absl::optional RRSendQueue::OutgoingStream::Produce( FSN fsn(item.current_fsn); item.current_fsn = FSN(*item.current_fsn + 1); buffered_amount_.Decrease(payload.size()); - total_buffered_amount_.Decrease(payload.size()); + parent_.total_buffered_amount_.Decrease(payload.size()); SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)), item.message_id.value(), fsn, ppid, @@ -232,7 +232,7 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, if (item.send_options.unordered == unordered && item.message_id.has_value() && *item.message_id == message_id) { buffered_amount_.Decrease(item.remaining_size); - total_buffered_amount_.Decrease(item.remaining_size); + parent_.total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); // Only partially sent messages are discarded, so if a message was @@ -278,7 +278,7 @@ void RRSendQueue::OutgoingStream::Pause() { for (auto it = items_.begin(); it != items_.end();) { if (it->remaining_offset == 0) { buffered_amount_.Decrease(it->remaining_size); - total_buffered_amount_.Decrease(it->remaining_size); + parent_.total_buffered_amount_.Decrease(it->remaining_size); it = items_.erase(it); } else { ++it; @@ -320,8 +320,8 @@ void RRSendQueue::OutgoingStream::Reset() { auto& item = items_.front(); buffered_amount_.Increase(item.message.payload().size() - item.remaining_size); - total_buffered_amount_.Increase(item.message.payload().size() - - item.remaining_size); + parent_.total_buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); item.remaining_offset = 0; item.remaining_size = item.message.payload().size(); item.message_id = absl::nullopt; @@ -474,9 +474,8 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( return streams_ .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), std::forward_as_tuple( - &scheduler_, stream_id, default_priority_, - [this, stream_id]() { on_buffered_amount_low_(stream_id); }, - total_buffered_amount_)) + this, &scheduler_, stream_id, default_priority_, + [this, stream_id]() { on_buffered_amount_low_(stream_id); })) .first->second; } @@ -520,9 +519,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { streams_.emplace( std::piecewise_construct, std::forward_as_tuple(stream_id), std::forward_as_tuple( - &scheduler_, stream_id, StreamPriority(state_stream.priority), + this, &scheduler_, stream_id, StreamPriority(state_stream.priority), [this, stream_id]() { on_buffered_amount_low_(stream_id); }, - total_buffered_amount_, &state_stream)); + &state_stream)); } } } // namespace dcsctp diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 49c36feab5..8e6085f6b8 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -120,18 +120,18 @@ class RRSendQueue : public SendQueue { class OutgoingStream : public StreamScheduler::StreamProducer { public: OutgoingStream( + RRSendQueue* parent, StreamScheduler* scheduler, StreamID stream_id, StreamPriority priority, std::function on_buffered_amount_low, - ThresholdWatcher& total_buffered_amount, const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) - : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)), + : parent_(*parent), + scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)), next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)), - buffered_amount_(std::move(on_buffered_amount_low)), - total_buffered_amount_(total_buffered_amount) {} + buffered_amount_(std::move(on_buffered_amount_low)) {} StreamID stream_id() const { return scheduler_stream_->stream_id(); } @@ -230,6 +230,8 @@ class RRSendQueue : public SendQueue { bool IsConsistent() const; + RRSendQueue& parent_; + const std::unique_ptr scheduler_stream_; PauseState pause_state_ = PauseState::kNotPaused; @@ -243,10 +245,6 @@ class RRSendQueue : public SendQueue { // The current amount of buffered data. ThresholdWatcher buffered_amount_; - - // Reference to the total buffered amount, which is updated directly by each - // stream. - ThresholdWatcher& total_buffered_amount_; }; bool IsConsistent() const;