If there is only little space left in a packet, and the remaining data for a partially sent message is much larger, it will not generate a small fragment for this message. This is to avoid fragmenting a message into too many packets, as that increases the risk of losing messages when partial reliability is enabled. And when a stream doesn't want to generate a too small fragment, the scheduler should _not_ switch streams. It should only switch streams when a message has been fully sent. Previously, it would switch stream when a stream doesn't want to produce a message, but as noted above, that could happen for other reasons. This required some refactoring, which also increased its robustness by now only doing explicit stream switching on fully produced messages. Bug: webrtc:12832 Change-Id: Icb213774fd0d26fba5640b00aac0407d393e4bfc Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220937 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34197}
433 lines
14 KiB
C++
433 lines
14 KiB
C++
/*
|
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
#include "net/dcsctp/tx/rr_send_queue.h"
|
|
|
|
#include <cstdint>
|
|
#include <deque>
|
|
#include <map>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "absl/algorithm/container.h"
|
|
#include "absl/types/optional.h"
|
|
#include "api/array_view.h"
|
|
#include "net/dcsctp/packet/data.h"
|
|
#include "net/dcsctp/public/dcsctp_message.h"
|
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "net/dcsctp/tx/send_queue.h"
|
|
#include "rtc_base/logging.h"
|
|
|
|
namespace dcsctp {
|
|
|
|
bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
|
|
while (!items_.empty()) {
|
|
RRSendQueue::OutgoingStream::Item& item = items_.front();
|
|
if (item.message_id.has_value()) {
|
|
// Already partially sent messages can always continue to be sent.
|
|
return true;
|
|
}
|
|
|
|
// Message has expired. Remove it and inspect the next one.
|
|
if (item.expires_at.has_value() && *item.expires_at <= now) {
|
|
buffered_amount_.Decrease(item.remaining_size);
|
|
total_buffered_amount_.Decrease(item.remaining_size);
|
|
items_.pop_front();
|
|
RTC_DCHECK(IsConsistent());
|
|
continue;
|
|
}
|
|
|
|
if (is_paused_) {
|
|
// The stream has paused (and there is no partially sent message).
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool RRSendQueue::IsConsistent() const {
|
|
size_t total_buffered_amount = 0;
|
|
for (const auto& stream_entry : streams_) {
|
|
total_buffered_amount += stream_entry.second.buffered_amount().value();
|
|
}
|
|
|
|
if (previous_message_has_ended_) {
|
|
auto it = streams_.find(current_stream_id_);
|
|
if (it != streams_.end() && it->second.has_partially_sent_message()) {
|
|
RTC_DLOG(LS_ERROR)
|
|
<< "Previous message has ended, but still partial message in stream";
|
|
return false;
|
|
}
|
|
} else {
|
|
auto it = streams_.find(current_stream_id_);
|
|
if (it == streams_.end() || !it->second.has_partially_sent_message()) {
|
|
RTC_DLOG(LS_ERROR)
|
|
<< "Previous message has NOT ended, but there is no partial message";
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return total_buffered_amount == total_buffered_amount_.value();
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::IsConsistent() const {
|
|
size_t bytes = 0;
|
|
for (const auto& item : items_) {
|
|
bytes += item.remaining_size;
|
|
}
|
|
return bytes == buffered_amount_.value();
|
|
}
|
|
|
|
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
|
|
RTC_DCHECK(bytes <= value_);
|
|
size_t old_value = value_;
|
|
value_ -= bytes;
|
|
|
|
if (old_value > low_threshold_ && value_ <= low_threshold_) {
|
|
on_threshold_reached_();
|
|
}
|
|
}
|
|
|
|
void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
|
|
// Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
|
|
if (low_threshold_ < value_ && low_threshold >= value_) {
|
|
on_threshold_reached_();
|
|
}
|
|
low_threshold_ = low_threshold;
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
|
|
absl::optional<TimeMs> expires_at,
|
|
const SendOptions& send_options) {
|
|
buffered_amount_.Increase(message.payload().size());
|
|
total_buffered_amount_.Increase(message.payload().size());
|
|
items_.emplace_back(std::move(message), expires_at, send_options);
|
|
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
|
TimeMs now,
|
|
size_t max_size) {
|
|
RTC_DCHECK(!items_.empty());
|
|
|
|
Item* item = &items_.front();
|
|
DcSctpMessage& message = item->message;
|
|
|
|
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
|
RTC_DCHECK(IsConsistent());
|
|
return absl::nullopt;
|
|
}
|
|
|
|
// Allocate Message ID and SSN when the first fragment is sent.
|
|
if (!item->message_id.has_value()) {
|
|
MID& mid =
|
|
item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
|
|
item->message_id = mid;
|
|
mid = MID(*mid + 1);
|
|
}
|
|
if (!item->send_options.unordered && !item->ssn.has_value()) {
|
|
item->ssn = next_ssn_;
|
|
next_ssn_ = SSN(*next_ssn_ + 1);
|
|
}
|
|
|
|
// Grab the next `max_size` fragment from this message and calculate flags.
|
|
rtc::ArrayView<const uint8_t> chunk_payload =
|
|
item->message.payload().subview(item->remaining_offset, max_size);
|
|
rtc::ArrayView<const uint8_t> message_payload = message.payload();
|
|
Data::IsBeginning is_beginning(chunk_payload.data() ==
|
|
message_payload.data());
|
|
Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
|
|
(message_payload.data() + message_payload.size()));
|
|
|
|
StreamID stream_id = message.stream_id();
|
|
PPID ppid = message.ppid();
|
|
|
|
// Zero-copy the payload if the message fits in a single chunk.
|
|
std::vector<uint8_t> payload =
|
|
is_beginning && is_end
|
|
? std::move(message).ReleasePayload()
|
|
: std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
|
|
|
|
FSN fsn(item->current_fsn);
|
|
item->current_fsn = FSN(*item->current_fsn + 1);
|
|
buffered_amount_.Decrease(payload.size());
|
|
total_buffered_amount_.Decrease(payload.size());
|
|
|
|
SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
|
|
item->message_id.value(), fsn, ppid,
|
|
std::move(payload), is_beginning, is_end,
|
|
item->send_options.unordered));
|
|
chunk.max_retransmissions = item->send_options.max_retransmissions;
|
|
chunk.expires_at = item->expires_at;
|
|
|
|
if (is_end) {
|
|
// The entire message has been sent, and its last data copied to `chunk`, so
|
|
// it can safely be discarded.
|
|
items_.pop_front();
|
|
} else {
|
|
item->remaining_offset += chunk_payload.size();
|
|
item->remaining_size -= chunk_payload.size();
|
|
RTC_DCHECK(item->remaining_offset + item->remaining_size ==
|
|
item->message.payload().size());
|
|
RTC_DCHECK(item->remaining_size > 0);
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
return chunk;
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
|
|
MID message_id) {
|
|
bool result = false;
|
|
if (!items_.empty()) {
|
|
Item& item = items_.front();
|
|
if (item.send_options.unordered == unordered &&
|
|
item.message_id.has_value() && *item.message_id == message_id) {
|
|
buffered_amount_.Decrease(item.remaining_size);
|
|
total_buffered_amount_.Decrease(item.remaining_size);
|
|
items_.pop_front();
|
|
// As the item still existed, it had unsent data.
|
|
result = true;
|
|
}
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
return result;
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Pause() {
|
|
is_paused_ = true;
|
|
|
|
// A stream is paused when it's about to be reset. In this implementation,
|
|
// it will throw away all non-partially send messages. This is subject to
|
|
// change. It will however not discard any partially sent messages - only
|
|
// whole messages. Partially delivered messages (at the time of receiving a
|
|
// Stream Reset command) will always deliver all the fragments before
|
|
// actually resetting the stream.
|
|
for (auto it = items_.begin(); it != items_.end();) {
|
|
if (it->remaining_offset == 0) {
|
|
buffered_amount_.Decrease(it->remaining_size);
|
|
total_buffered_amount_.Decrease(it->remaining_size);
|
|
it = items_.erase(it);
|
|
} else {
|
|
++it;
|
|
}
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Reset() {
|
|
if (!items_.empty()) {
|
|
// If this message has been partially sent, reset it so that it will be
|
|
// re-sent.
|
|
auto& item = items_.front();
|
|
buffered_amount_.Increase(item.message.payload().size() -
|
|
item.remaining_size);
|
|
total_buffered_amount_.Increase(item.message.payload().size() -
|
|
item.remaining_size);
|
|
item.remaining_offset = 0;
|
|
item.remaining_size = item.message.payload().size();
|
|
item.message_id = absl::nullopt;
|
|
item.ssn = absl::nullopt;
|
|
item.current_fsn = FSN(0);
|
|
}
|
|
is_paused_ = false;
|
|
next_ordered_mid_ = MID(0);
|
|
next_unordered_mid_ = MID(0);
|
|
next_ssn_ = SSN(0);
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
|
|
if (items_.empty()) {
|
|
return false;
|
|
}
|
|
return items_.front().message_id.has_value();
|
|
}
|
|
|
|
void RRSendQueue::Add(TimeMs now,
|
|
DcSctpMessage message,
|
|
const SendOptions& send_options) {
|
|
RTC_DCHECK(!message.payload().empty());
|
|
// Any limited lifetime should start counting from now - when the message
|
|
// has been added to the queue.
|
|
absl::optional<TimeMs> expires_at = absl::nullopt;
|
|
if (send_options.lifetime.has_value()) {
|
|
// `expires_at` is the time when it expires. Which is slightly larger than
|
|
// the message's lifetime, as the message is alive during its entire
|
|
// lifetime (which may be zero).
|
|
expires_at = now + *send_options.lifetime + DurationMs(1);
|
|
}
|
|
GetOrCreateStreamInfo(message.stream_id())
|
|
.Add(std::move(message), expires_at, send_options);
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::IsFull() const {
|
|
return total_buffered_amount() >= buffer_size_;
|
|
}
|
|
|
|
bool RRSendQueue::IsEmpty() const {
|
|
return total_buffered_amount() == 0;
|
|
}
|
|
|
|
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
|
|
RRSendQueue::GetNextStream(TimeMs now) {
|
|
auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
|
|
|
|
for (auto it = start_it; it != streams_.end(); ++it) {
|
|
if (it->second.HasDataToSend(now)) {
|
|
current_stream_id_ = it->first;
|
|
return it;
|
|
}
|
|
}
|
|
|
|
for (auto it = streams_.begin(); it != start_it; ++it) {
|
|
if (it->second.HasDataToSend(now)) {
|
|
current_stream_id_ = it->first;
|
|
return it;
|
|
}
|
|
}
|
|
return streams_.end();
|
|
}
|
|
|
|
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
|
|
size_t max_size) {
|
|
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
|
|
|
|
if (previous_message_has_ended_) {
|
|
// Previous message has ended. Round-robin to a different stream, if there
|
|
// even is one with data to send.
|
|
stream_it = GetNextStream(now);
|
|
if (stream_it == streams_.end()) {
|
|
RTC_DLOG(LS_VERBOSE)
|
|
<< log_prefix_
|
|
<< "There is no stream with data; Can't produce any data.";
|
|
return absl::nullopt;
|
|
}
|
|
} else {
|
|
// The previous message has not ended; Continue from the current stream.
|
|
stream_it = streams_.find(current_stream_id_);
|
|
RTC_DCHECK(stream_it != streams_.end());
|
|
}
|
|
|
|
absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
|
|
if (data.has_value()) {
|
|
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
|
|
<< (data->data.is_unordered ? "unordered" : "ordered")
|
|
<< "::"
|
|
<< (*data->data.is_beginning && *data->data.is_end
|
|
? "complete"
|
|
: *data->data.is_beginning
|
|
? "first"
|
|
: *data->data.is_end ? "last" : "middle")
|
|
<< ", stream_id=" << *stream_it->first
|
|
<< ", ppid=" << *data->data.ppid
|
|
<< ", length=" << data->data.payload.size();
|
|
|
|
previous_message_has_ended_ = *data->data.is_end;
|
|
}
|
|
|
|
RTC_DCHECK(IsConsistent());
|
|
return data;
|
|
}
|
|
|
|
bool RRSendQueue::Discard(IsUnordered unordered,
|
|
StreamID stream_id,
|
|
MID message_id) {
|
|
bool has_discarded =
|
|
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
|
|
if (has_discarded) {
|
|
// Only partially sent messages are discarded, so if a message was
|
|
// discarded, then it was the currently sent message.
|
|
previous_message_has_ended_ = true;
|
|
}
|
|
|
|
return has_discarded;
|
|
}
|
|
|
|
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
|
|
for (StreamID stream_id : streams) {
|
|
GetOrCreateStreamInfo(stream_id).Pause();
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::CanResetStreams() const {
|
|
// Streams can be reset if those streams that are paused don't have any
|
|
// messages that are partially sent.
|
|
for (auto& stream : streams_) {
|
|
if (stream.second.is_paused() &&
|
|
stream.second.has_partially_sent_message()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void RRSendQueue::CommitResetStreams() {
|
|
Reset();
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::RollbackResetStreams() {
|
|
for (auto& stream_entry : streams_) {
|
|
stream_entry.second.Resume();
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::Reset() {
|
|
// Recalculate buffered amount, as partially sent messages may have been put
|
|
// fully back in the queue.
|
|
for (auto& stream_entry : streams_) {
|
|
OutgoingStream& stream = stream_entry.second;
|
|
stream.Reset();
|
|
}
|
|
previous_message_has_ended_ = true;
|
|
}
|
|
|
|
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
|
|
auto it = streams_.find(stream_id);
|
|
if (it == streams_.end()) {
|
|
return 0;
|
|
}
|
|
return it->second.buffered_amount().value();
|
|
}
|
|
|
|
size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
|
|
auto it = streams_.find(stream_id);
|
|
if (it == streams_.end()) {
|
|
return 0;
|
|
}
|
|
return it->second.buffered_amount().low_threshold();
|
|
}
|
|
|
|
void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
|
|
size_t bytes) {
|
|
GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
|
|
}
|
|
|
|
RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
|
|
StreamID stream_id) {
|
|
auto it = streams_.find(stream_id);
|
|
if (it != streams_.end()) {
|
|
return it->second;
|
|
}
|
|
|
|
return streams_
|
|
.emplace(stream_id,
|
|
OutgoingStream(
|
|
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
|
|
total_buffered_amount_))
|
|
.first->second;
|
|
}
|
|
} // namespace dcsctp
|