dcsctp: Refactor send queue (1/2)

Let the OutgoingStream reference the parent instead of passing
references to individual items it needs, as follow-up CLs will add even
more items.

No functional change - pure refactoring.

Bug: webrtc:5696
Change-Id: I914e590c0d90e898d7d230a16170cf4faff2338c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264142
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37398}
This commit is contained in:
Victor Boivie 2022-05-24 21:56:28 +02:00 committed by WebRTC LUCI CQ
parent 74680c0234
commit 8967672f6d
2 changed files with 17 additions and 20 deletions

View File

@ -123,7 +123,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
const SendOptions& send_options) { const SendOptions& send_options) {
bool was_active = bytes_to_send_in_next_message() > 0; bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size()); buffered_amount_.Increase(message.payload().size());
total_buffered_amount_.Increase(message.payload().size()); parent_.total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), expires_at, send_options); items_.emplace_back(std::move(message), expires_at, send_options);
if (!was_active) { if (!was_active) {
@ -148,7 +148,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
// Oops, this entire message has already expired. Try the next one. // Oops, this entire message has already expired. Try the next one.
if (item.expires_at <= now) { if (item.expires_at <= now) {
buffered_amount_.Decrease(item.remaining_size); buffered_amount_.Decrease(item.remaining_size);
total_buffered_amount_.Decrease(item.remaining_size); parent_.total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front(); items_.pop_front();
continue; continue;
} }
@ -184,7 +184,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
FSN fsn(item.current_fsn); FSN fsn(item.current_fsn);
item.current_fsn = FSN(*item.current_fsn + 1); item.current_fsn = FSN(*item.current_fsn + 1);
buffered_amount_.Decrease(payload.size()); buffered_amount_.Decrease(payload.size());
total_buffered_amount_.Decrease(payload.size()); parent_.total_buffered_amount_.Decrease(payload.size());
SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)), SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
item.message_id.value(), fsn, ppid, item.message_id.value(), fsn, ppid,
@ -232,7 +232,7 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
if (item.send_options.unordered == unordered && if (item.send_options.unordered == unordered &&
item.message_id.has_value() && *item.message_id == message_id) { item.message_id.has_value() && *item.message_id == message_id) {
buffered_amount_.Decrease(item.remaining_size); buffered_amount_.Decrease(item.remaining_size);
total_buffered_amount_.Decrease(item.remaining_size); parent_.total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front(); items_.pop_front();
// Only partially sent messages are discarded, so if a message was // Only partially sent messages are discarded, so if a message was
@ -278,7 +278,7 @@ void RRSendQueue::OutgoingStream::Pause() {
for (auto it = items_.begin(); it != items_.end();) { for (auto it = items_.begin(); it != items_.end();) {
if (it->remaining_offset == 0) { if (it->remaining_offset == 0) {
buffered_amount_.Decrease(it->remaining_size); buffered_amount_.Decrease(it->remaining_size);
total_buffered_amount_.Decrease(it->remaining_size); parent_.total_buffered_amount_.Decrease(it->remaining_size);
it = items_.erase(it); it = items_.erase(it);
} else { } else {
++it; ++it;
@ -320,7 +320,7 @@ void RRSendQueue::OutgoingStream::Reset() {
auto& item = items_.front(); auto& item = items_.front();
buffered_amount_.Increase(item.message.payload().size() - buffered_amount_.Increase(item.message.payload().size() -
item.remaining_size); item.remaining_size);
total_buffered_amount_.Increase(item.message.payload().size() - parent_.total_buffered_amount_.Increase(item.message.payload().size() -
item.remaining_size); item.remaining_size);
item.remaining_offset = 0; item.remaining_offset = 0;
item.remaining_size = item.message.payload().size(); item.remaining_size = item.message.payload().size();
@ -474,9 +474,8 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
return streams_ return streams_
.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple( std::forward_as_tuple(
&scheduler_, stream_id, default_priority_, this, &scheduler_, stream_id, default_priority_,
[this, stream_id]() { on_buffered_amount_low_(stream_id); }, [this, stream_id]() { on_buffered_amount_low_(stream_id); }))
total_buffered_amount_))
.first->second; .first->second;
} }
@ -520,9 +519,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
streams_.emplace( streams_.emplace(
std::piecewise_construct, std::forward_as_tuple(stream_id), std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple( std::forward_as_tuple(
&scheduler_, stream_id, StreamPriority(state_stream.priority), this, &scheduler_, stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { on_buffered_amount_low_(stream_id); }, [this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_, &state_stream)); &state_stream));
} }
} }
} // namespace dcsctp } // namespace dcsctp

View File

@ -120,18 +120,18 @@ class RRSendQueue : public SendQueue {
class OutgoingStream : public StreamScheduler::StreamProducer { class OutgoingStream : public StreamScheduler::StreamProducer {
public: public:
OutgoingStream( OutgoingStream(
RRSendQueue* parent,
StreamScheduler* scheduler, StreamScheduler* scheduler,
StreamID stream_id, StreamID stream_id,
StreamPriority priority, StreamPriority priority,
std::function<void()> on_buffered_amount_low, std::function<void()> on_buffered_amount_low,
ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
: scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)), : parent_(*parent),
scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)),
buffered_amount_(std::move(on_buffered_amount_low)), buffered_amount_(std::move(on_buffered_amount_low)) {}
total_buffered_amount_(total_buffered_amount) {}
StreamID stream_id() const { return scheduler_stream_->stream_id(); } StreamID stream_id() const { return scheduler_stream_->stream_id(); }
@ -230,6 +230,8 @@ class RRSendQueue : public SendQueue {
bool IsConsistent() const; bool IsConsistent() const;
RRSendQueue& parent_;
const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_; const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
PauseState pause_state_ = PauseState::kNotPaused; PauseState pause_state_ = PauseState::kNotPaused;
@ -243,10 +245,6 @@ class RRSendQueue : public SendQueue {
// The current amount of buffered data. // The current amount of buffered data.
ThresholdWatcher buffered_amount_; ThresholdWatcher buffered_amount_;
// Reference to the total buffered amount, which is updated directly by each
// stream.
ThresholdWatcher& total_buffered_amount_;
}; };
bool IsConsistent() const; bool IsConsistent() const;