diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 3a2166b813..6127da4e77 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -42,34 +42,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); } -bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { +bool RRSendQueue::OutgoingStream::HasDataToSend() const { if (pause_state_ == PauseState::kPaused || pause_state_ == PauseState::kResetting) { // The stream has paused (and there is no partially sent message). return false; } - while (!items_.empty()) { - RRSendQueue::OutgoingStream::Item& item = items_.front(); - if (item.message_id.has_value()) { - // Already partially sent messages can always continue to be sent. This - // ensures e.g. that paused streams with partially sent messages get to - // send the partial message in full before resetting. - return true; - } - - // Message has expired. Remove it and inspect the next one. - if (item.expires_at <= now) { - buffered_amount_.Decrease(item.remaining_size); - total_buffered_amount_.Decrease(item.remaining_size); - items_.pop_front(); - RTC_DCHECK(IsConsistent()); - continue; - } - - return true; + if (items_.empty()) { + return false; } - return false; + + return true; } void RRSendQueue::OutgoingStream::AddHandoverState( @@ -141,83 +125,95 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, RTC_DCHECK(IsConsistent()); } -SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now, - size_t max_size) { - RTC_DCHECK(!items_.empty()); +absl::optional RRSendQueue::OutgoingStream::Produce( + TimeMs now, + size_t max_size) { RTC_DCHECK(pause_state_ != PauseState::kPaused && pause_state_ != PauseState::kResetting); - Item* item = &items_.front(); - DcSctpMessage& message = item->message; + while (!items_.empty()) { + Item& item = items_.front(); + DcSctpMessage& message = item.message; - // Allocate Message ID and SSN when the first fragment is sent. - if (!item->message_id.has_value()) { - MID& mid = - item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_; - item->message_id = mid; - mid = MID(*mid + 1); - } - if (!item->send_options.unordered && !item->ssn.has_value()) { - item->ssn = next_ssn_; - next_ssn_ = SSN(*next_ssn_ + 1); - } + // Allocate Message ID and SSN when the first fragment is sent. + if (!item.message_id.has_value()) { + // Oops, this entire message has already expired. Try the next one. + if (item.expires_at <= now) { + buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); + items_.pop_front(); + continue; + } - // Grab the next `max_size` fragment from this message and calculate flags. - rtc::ArrayView chunk_payload = - item->message.payload().subview(item->remaining_offset, max_size); - rtc::ArrayView message_payload = message.payload(); - Data::IsBeginning is_beginning(chunk_payload.data() == - message_payload.data()); - Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) == - (message_payload.data() + message_payload.size())); - - StreamID stream_id = message.stream_id(); - PPID ppid = message.ppid(); - - // Zero-copy the payload if the message fits in a single chunk. - std::vector payload = - is_beginning && is_end - ? std::move(message).ReleasePayload() - : std::vector(chunk_payload.begin(), chunk_payload.end()); - - FSN fsn(item->current_fsn); - item->current_fsn = FSN(*item->current_fsn + 1); - buffered_amount_.Decrease(payload.size()); - total_buffered_amount_.Decrease(payload.size()); - - SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), - item->message_id.value(), fsn, ppid, - std::move(payload), is_beginning, is_end, - item->send_options.unordered)); - if (item->send_options.max_retransmissions.has_value() && - *item->send_options.max_retransmissions >= - std::numeric_limits::min() && - *item->send_options.max_retransmissions <= - std::numeric_limits::max()) { - chunk.max_retransmissions = - MaxRetransmits(*item->send_options.max_retransmissions); - } - chunk.expires_at = item->expires_at; - - if (is_end) { - // The entire message has been sent, and its last data copied to `chunk`, so - // it can safely be discarded. - items_.pop_front(); - - if (pause_state_ == PauseState::kPending) { - RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id - << " is moving from pending to paused"; - pause_state_ = PauseState::kPaused; + MID& mid = + item.send_options.unordered ? next_unordered_mid_ : next_ordered_mid_; + item.message_id = mid; + mid = MID(*mid + 1); } - } else { - item->remaining_offset += chunk_payload.size(); - item->remaining_size -= chunk_payload.size(); - RTC_DCHECK(item->remaining_offset + item->remaining_size == - item->message.payload().size()); - RTC_DCHECK(item->remaining_size > 0); + if (!item.send_options.unordered && !item.ssn.has_value()) { + item.ssn = next_ssn_; + next_ssn_ = SSN(*next_ssn_ + 1); + } + + // Grab the next `max_size` fragment from this message and calculate flags. + rtc::ArrayView chunk_payload = + item.message.payload().subview(item.remaining_offset, max_size); + rtc::ArrayView message_payload = message.payload(); + Data::IsBeginning is_beginning(chunk_payload.data() == + message_payload.data()); + Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) == + (message_payload.data() + message_payload.size())); + + StreamID stream_id = message.stream_id(); + PPID ppid = message.ppid(); + + // Zero-copy the payload if the message fits in a single chunk. + std::vector payload = + is_beginning && is_end + ? std::move(message).ReleasePayload() + : std::vector(chunk_payload.begin(), chunk_payload.end()); + + FSN fsn(item.current_fsn); + item.current_fsn = FSN(*item.current_fsn + 1); + buffered_amount_.Decrease(payload.size()); + total_buffered_amount_.Decrease(payload.size()); + + SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)), + item.message_id.value(), fsn, ppid, + std::move(payload), is_beginning, is_end, + item.send_options.unordered)); + if (item.send_options.max_retransmissions.has_value() && + *item.send_options.max_retransmissions >= + std::numeric_limits::min() && + *item.send_options.max_retransmissions <= + std::numeric_limits::max()) { + chunk.max_retransmissions = + MaxRetransmits(*item.send_options.max_retransmissions); + } + chunk.expires_at = item.expires_at; + + if (is_end) { + // The entire message has been sent, and its last data copied to `chunk`, + // so it can safely be discarded. + items_.pop_front(); + + if (pause_state_ == PauseState::kPending) { + RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id + << " is moving from pending to paused"; + pause_state_ = PauseState::kPaused; + } + } else { + item.remaining_offset += chunk_payload.size(); + item.remaining_size -= chunk_payload.size(); + RTC_DCHECK(item.remaining_offset + item.remaining_size == + item.message.payload().size()); + RTC_DCHECK(item.remaining_size > 0); + } + RTC_DCHECK(IsConsistent()); + return chunk; } RTC_DCHECK(IsConsistent()); - return chunk; + return absl::nullopt; } bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, @@ -355,18 +351,18 @@ bool RRSendQueue::IsEmpty() const { } std::map::iterator -RRSendQueue::GetNextStream(TimeMs now) { +RRSendQueue::GetNextStream() { auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1)); for (auto it = start_it; it != streams_.end(); ++it) { - if (it->second.HasDataToSend(now)) { + if (it->second.HasDataToSend()) { current_stream_id_ = it->first; return it; } } for (auto it = streams_.begin(); it != start_it; ++it) { - if (it->second.HasDataToSend(now)) { + if (it->second.HasDataToSend()) { current_stream_id_ = it->first; return it; } @@ -378,39 +374,43 @@ absl::optional RRSendQueue::Produce(TimeMs now, size_t max_size) { std::map::iterator stream_it; - if (previous_message_has_ended_) { - // Previous message has ended. Round-robin to a different stream, if there - // even is one with data to send. - stream_it = GetNextStream(now); - if (stream_it == streams_.end()) { - RTC_DLOG(LS_VERBOSE) - << log_prefix_ - << "There is no stream with data; Can't produce any data."; - return absl::nullopt; + for (;;) { + if (previous_message_has_ended_) { + // Previous message has ended. Round-robin to a different stream, if there + // even is one with data to send. + stream_it = GetNextStream(); + if (stream_it == streams_.end()) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "There is no stream with data; Can't produce any data."; + return absl::nullopt; + } + } else { + // The previous message has not ended; Continue from the current stream. + stream_it = streams_.find(current_stream_id_); + RTC_DCHECK(stream_it != streams_.end()); } - } else { - // The previous message has not ended; Continue from the current stream. - stream_it = streams_.find(current_stream_id_); - RTC_DCHECK(stream_it != streams_.end()); + + absl::optional data = stream_it->second.Produce(now, max_size); + if (!data.has_value()) { + continue; + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning ? "first" + : *data->data.is_end ? "last" + : "middle") + << ", stream_id=" << *stream_it->first + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + previous_message_has_ended_ = *data->data.is_end; + RTC_DCHECK(IsConsistent()); + return data; } - - DataToSend data = stream_it->second.Produce(now, max_size); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type=" - << (data.data.is_unordered ? "unordered" : "ordered") - << "::" - << (*data.data.is_beginning && *data.data.is_end - ? "complete" - : *data.data.is_beginning - ? "first" - : *data.data.is_end ? "last" : "middle") - << ", stream_id=" << *stream_it->first - << ", ppid=" << *data.data.ppid - << ", length=" << data.data.payload.size(); - - previous_message_has_ended_ = *data.data.is_end; - - RTC_DCHECK(IsConsistent()); - return data; } bool RRSendQueue::Discard(IsUnordered unordered, diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 7ddb426ec9..59f0d91aed 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -134,9 +134,9 @@ class RRSendQueue : public SendQueue { TimeMs expires_at, const SendOptions& send_options); - // Produces a data chunk to send. This is only called on streams that have - // data available. - DataToSend Produce(TimeMs now, size_t max_size); + // Produces a data chunk to send, or `absl::nullopt` if nothing could be + // produced, e.g. if all messages have expired. + absl::optional Produce(TimeMs now, size_t max_size); const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } ThresholdWatcher& buffered_amount() { return buffered_amount_; } @@ -167,9 +167,9 @@ class RRSendQueue : public SendQueue { // Indicates if this stream has a partially sent message in it. bool has_partially_sent_message() const; - // Indicates if the stream has data to send. It will also try to remove any - // expired non-partially sent message. - bool HasDataToSend(TimeMs now); + // Indicates if the stream possibly has data to send. Note that it may + // return `true` for streams that have enqueued, but expired, messages. + bool HasDataToSend() const; void set_priority(StreamPriority priority) { priority_ = priority; } StreamPriority priority() const { return priority_; } @@ -252,7 +252,7 @@ class RRSendQueue : public SendQueue { size_t max_size); // Return the next stream, in round-robin fashion. - std::map::iterator GetNextStream(TimeMs now); + std::map::iterator GetNextStream(); const std::string log_prefix_; const size_t buffer_size_;