When a single stream is reset, and an outgoing SSN reset request is sent and later acked by the peer sending a reconfiguration response with status=Performed, the sender should unpause the paused stream and reset the SSNs of that (ordered) stream. But only the single stream that was paused, and not all streams. In this scenario, dcSCTP would - when the peer acked the SSN reset request - reset the SSN of all streams. This was found by orphis@webrtc.org using a data channel test application. The peer, if it's a usrsctp client, will ABORT with PROTOCOL_VIOLATION as it has already seen that SSN on that stream but with a different TSN. This bug was introduced when implementing the Round Robin scheduler in https://webrtc-review.googlesource.com/c/src/+/219682. The FCFS scheduler prior to this change was implemented correctly. Bug: webrtc:12952 Change-Id: I3ea144a1df303145f69a5b03aada7f448c8c8163 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225266 Reviewed-by: Florent Castelli <orphis@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34436}
437 lines
14 KiB
C++
437 lines
14 KiB
C++
/*
|
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
#include "net/dcsctp/tx/rr_send_queue.h"
|
|
|
|
#include <cstdint>
|
|
#include <deque>
|
|
#include <map>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "absl/algorithm/container.h"
|
|
#include "absl/types/optional.h"
|
|
#include "api/array_view.h"
|
|
#include "net/dcsctp/packet/data.h"
|
|
#include "net/dcsctp/public/dcsctp_message.h"
|
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "net/dcsctp/tx/send_queue.h"
|
|
#include "rtc_base/logging.h"
|
|
|
|
namespace dcsctp {
|
|
|
|
bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
|
|
while (!items_.empty()) {
|
|
RRSendQueue::OutgoingStream::Item& item = items_.front();
|
|
if (item.message_id.has_value()) {
|
|
// Already partially sent messages can always continue to be sent.
|
|
return true;
|
|
}
|
|
|
|
// Message has expired. Remove it and inspect the next one.
|
|
if (item.expires_at.has_value() && *item.expires_at <= now) {
|
|
buffered_amount_.Decrease(item.remaining_size);
|
|
total_buffered_amount_.Decrease(item.remaining_size);
|
|
items_.pop_front();
|
|
RTC_DCHECK(IsConsistent());
|
|
continue;
|
|
}
|
|
|
|
if (is_paused_) {
|
|
// The stream has paused (and there is no partially sent message).
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool RRSendQueue::IsConsistent() const {
|
|
size_t total_buffered_amount = 0;
|
|
for (const auto& stream_entry : streams_) {
|
|
total_buffered_amount += stream_entry.second.buffered_amount().value();
|
|
}
|
|
|
|
if (previous_message_has_ended_) {
|
|
auto it = streams_.find(current_stream_id_);
|
|
if (it != streams_.end() && it->second.has_partially_sent_message()) {
|
|
RTC_DLOG(LS_ERROR)
|
|
<< "Previous message has ended, but still partial message in stream";
|
|
return false;
|
|
}
|
|
} else {
|
|
auto it = streams_.find(current_stream_id_);
|
|
if (it == streams_.end() || !it->second.has_partially_sent_message()) {
|
|
RTC_DLOG(LS_ERROR)
|
|
<< "Previous message has NOT ended, but there is no partial message";
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return total_buffered_amount == total_buffered_amount_.value();
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::IsConsistent() const {
|
|
size_t bytes = 0;
|
|
for (const auto& item : items_) {
|
|
bytes += item.remaining_size;
|
|
}
|
|
return bytes == buffered_amount_.value();
|
|
}
|
|
|
|
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
|
|
RTC_DCHECK(bytes <= value_);
|
|
size_t old_value = value_;
|
|
value_ -= bytes;
|
|
|
|
if (old_value > low_threshold_ && value_ <= low_threshold_) {
|
|
on_threshold_reached_();
|
|
}
|
|
}
|
|
|
|
void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
|
|
// Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
|
|
if (low_threshold_ < value_ && low_threshold >= value_) {
|
|
on_threshold_reached_();
|
|
}
|
|
low_threshold_ = low_threshold;
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
|
|
absl::optional<TimeMs> expires_at,
|
|
const SendOptions& send_options) {
|
|
buffered_amount_.Increase(message.payload().size());
|
|
total_buffered_amount_.Increase(message.payload().size());
|
|
items_.emplace_back(std::move(message), expires_at, send_options);
|
|
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
|
TimeMs now,
|
|
size_t max_size) {
|
|
RTC_DCHECK(!items_.empty());
|
|
|
|
Item* item = &items_.front();
|
|
DcSctpMessage& message = item->message;
|
|
|
|
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
|
RTC_DCHECK(IsConsistent());
|
|
return absl::nullopt;
|
|
}
|
|
|
|
// Allocate Message ID and SSN when the first fragment is sent.
|
|
if (!item->message_id.has_value()) {
|
|
MID& mid =
|
|
item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
|
|
item->message_id = mid;
|
|
mid = MID(*mid + 1);
|
|
}
|
|
if (!item->send_options.unordered && !item->ssn.has_value()) {
|
|
item->ssn = next_ssn_;
|
|
next_ssn_ = SSN(*next_ssn_ + 1);
|
|
}
|
|
|
|
// Grab the next `max_size` fragment from this message and calculate flags.
|
|
rtc::ArrayView<const uint8_t> chunk_payload =
|
|
item->message.payload().subview(item->remaining_offset, max_size);
|
|
rtc::ArrayView<const uint8_t> message_payload = message.payload();
|
|
Data::IsBeginning is_beginning(chunk_payload.data() ==
|
|
message_payload.data());
|
|
Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
|
|
(message_payload.data() + message_payload.size()));
|
|
|
|
StreamID stream_id = message.stream_id();
|
|
PPID ppid = message.ppid();
|
|
|
|
// Zero-copy the payload if the message fits in a single chunk.
|
|
std::vector<uint8_t> payload =
|
|
is_beginning && is_end
|
|
? std::move(message).ReleasePayload()
|
|
: std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
|
|
|
|
FSN fsn(item->current_fsn);
|
|
item->current_fsn = FSN(*item->current_fsn + 1);
|
|
buffered_amount_.Decrease(payload.size());
|
|
total_buffered_amount_.Decrease(payload.size());
|
|
|
|
SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
|
|
item->message_id.value(), fsn, ppid,
|
|
std::move(payload), is_beginning, is_end,
|
|
item->send_options.unordered));
|
|
chunk.max_retransmissions = item->send_options.max_retransmissions;
|
|
chunk.expires_at = item->expires_at;
|
|
|
|
if (is_end) {
|
|
// The entire message has been sent, and its last data copied to `chunk`, so
|
|
// it can safely be discarded.
|
|
items_.pop_front();
|
|
} else {
|
|
item->remaining_offset += chunk_payload.size();
|
|
item->remaining_size -= chunk_payload.size();
|
|
RTC_DCHECK(item->remaining_offset + item->remaining_size ==
|
|
item->message.payload().size());
|
|
RTC_DCHECK(item->remaining_size > 0);
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
return chunk;
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
|
|
MID message_id) {
|
|
bool result = false;
|
|
if (!items_.empty()) {
|
|
Item& item = items_.front();
|
|
if (item.send_options.unordered == unordered &&
|
|
item.message_id.has_value() && *item.message_id == message_id) {
|
|
buffered_amount_.Decrease(item.remaining_size);
|
|
total_buffered_amount_.Decrease(item.remaining_size);
|
|
items_.pop_front();
|
|
// As the item still existed, it had unsent data.
|
|
result = true;
|
|
}
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
return result;
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Pause() {
|
|
is_paused_ = true;
|
|
|
|
// A stream is paused when it's about to be reset. In this implementation,
|
|
// it will throw away all non-partially send messages. This is subject to
|
|
// change. It will however not discard any partially sent messages - only
|
|
// whole messages. Partially delivered messages (at the time of receiving a
|
|
// Stream Reset command) will always deliver all the fragments before
|
|
// actually resetting the stream.
|
|
for (auto it = items_.begin(); it != items_.end();) {
|
|
if (it->remaining_offset == 0) {
|
|
buffered_amount_.Decrease(it->remaining_size);
|
|
total_buffered_amount_.Decrease(it->remaining_size);
|
|
it = items_.erase(it);
|
|
} else {
|
|
++it;
|
|
}
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::OutgoingStream::Reset() {
|
|
if (!items_.empty()) {
|
|
// If this message has been partially sent, reset it so that it will be
|
|
// re-sent.
|
|
auto& item = items_.front();
|
|
buffered_amount_.Increase(item.message.payload().size() -
|
|
item.remaining_size);
|
|
total_buffered_amount_.Increase(item.message.payload().size() -
|
|
item.remaining_size);
|
|
item.remaining_offset = 0;
|
|
item.remaining_size = item.message.payload().size();
|
|
item.message_id = absl::nullopt;
|
|
item.ssn = absl::nullopt;
|
|
item.current_fsn = FSN(0);
|
|
}
|
|
is_paused_ = false;
|
|
next_ordered_mid_ = MID(0);
|
|
next_unordered_mid_ = MID(0);
|
|
next_ssn_ = SSN(0);
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
|
|
if (items_.empty()) {
|
|
return false;
|
|
}
|
|
return items_.front().message_id.has_value();
|
|
}
|
|
|
|
void RRSendQueue::Add(TimeMs now,
|
|
DcSctpMessage message,
|
|
const SendOptions& send_options) {
|
|
RTC_DCHECK(!message.payload().empty());
|
|
// Any limited lifetime should start counting from now - when the message
|
|
// has been added to the queue.
|
|
absl::optional<TimeMs> expires_at = absl::nullopt;
|
|
if (send_options.lifetime.has_value()) {
|
|
// `expires_at` is the time when it expires. Which is slightly larger than
|
|
// the message's lifetime, as the message is alive during its entire
|
|
// lifetime (which may be zero).
|
|
expires_at = now + *send_options.lifetime + DurationMs(1);
|
|
}
|
|
GetOrCreateStreamInfo(message.stream_id())
|
|
.Add(std::move(message), expires_at, send_options);
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::IsFull() const {
|
|
return total_buffered_amount() >= buffer_size_;
|
|
}
|
|
|
|
bool RRSendQueue::IsEmpty() const {
|
|
return total_buffered_amount() == 0;
|
|
}
|
|
|
|
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
|
|
RRSendQueue::GetNextStream(TimeMs now) {
|
|
auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
|
|
|
|
for (auto it = start_it; it != streams_.end(); ++it) {
|
|
if (it->second.HasDataToSend(now)) {
|
|
current_stream_id_ = it->first;
|
|
return it;
|
|
}
|
|
}
|
|
|
|
for (auto it = streams_.begin(); it != start_it; ++it) {
|
|
if (it->second.HasDataToSend(now)) {
|
|
current_stream_id_ = it->first;
|
|
return it;
|
|
}
|
|
}
|
|
return streams_.end();
|
|
}
|
|
|
|
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
|
|
size_t max_size) {
|
|
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
|
|
|
|
if (previous_message_has_ended_) {
|
|
// Previous message has ended. Round-robin to a different stream, if there
|
|
// even is one with data to send.
|
|
stream_it = GetNextStream(now);
|
|
if (stream_it == streams_.end()) {
|
|
RTC_DLOG(LS_VERBOSE)
|
|
<< log_prefix_
|
|
<< "There is no stream with data; Can't produce any data.";
|
|
return absl::nullopt;
|
|
}
|
|
} else {
|
|
// The previous message has not ended; Continue from the current stream.
|
|
stream_it = streams_.find(current_stream_id_);
|
|
RTC_DCHECK(stream_it != streams_.end());
|
|
}
|
|
|
|
absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
|
|
if (data.has_value()) {
|
|
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
|
|
<< (data->data.is_unordered ? "unordered" : "ordered")
|
|
<< "::"
|
|
<< (*data->data.is_beginning && *data->data.is_end
|
|
? "complete"
|
|
: *data->data.is_beginning
|
|
? "first"
|
|
: *data->data.is_end ? "last" : "middle")
|
|
<< ", stream_id=" << *stream_it->first
|
|
<< ", ppid=" << *data->data.ppid
|
|
<< ", length=" << data->data.payload.size();
|
|
|
|
previous_message_has_ended_ = *data->data.is_end;
|
|
}
|
|
|
|
RTC_DCHECK(IsConsistent());
|
|
return data;
|
|
}
|
|
|
|
bool RRSendQueue::Discard(IsUnordered unordered,
|
|
StreamID stream_id,
|
|
MID message_id) {
|
|
bool has_discarded =
|
|
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
|
|
if (has_discarded) {
|
|
// Only partially sent messages are discarded, so if a message was
|
|
// discarded, then it was the currently sent message.
|
|
previous_message_has_ended_ = true;
|
|
}
|
|
|
|
return has_discarded;
|
|
}
|
|
|
|
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
|
|
for (StreamID stream_id : streams) {
|
|
GetOrCreateStreamInfo(stream_id).Pause();
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
bool RRSendQueue::CanResetStreams() const {
|
|
// Streams can be reset if those streams that are paused don't have any
|
|
// messages that are partially sent.
|
|
for (auto& stream : streams_) {
|
|
if (stream.second.is_paused() &&
|
|
stream.second.has_partially_sent_message()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void RRSendQueue::CommitResetStreams() {
|
|
for (auto& stream_entry : streams_) {
|
|
if (stream_entry.second.is_paused()) {
|
|
stream_entry.second.Reset();
|
|
}
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::RollbackResetStreams() {
|
|
for (auto& stream_entry : streams_) {
|
|
stream_entry.second.Resume();
|
|
}
|
|
RTC_DCHECK(IsConsistent());
|
|
}
|
|
|
|
void RRSendQueue::Reset() {
|
|
// Recalculate buffered amount, as partially sent messages may have been put
|
|
// fully back in the queue.
|
|
for (auto& stream_entry : streams_) {
|
|
OutgoingStream& stream = stream_entry.second;
|
|
stream.Reset();
|
|
}
|
|
previous_message_has_ended_ = true;
|
|
}
|
|
|
|
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
|
|
auto it = streams_.find(stream_id);
|
|
if (it == streams_.end()) {
|
|
return 0;
|
|
}
|
|
return it->second.buffered_amount().value();
|
|
}
|
|
|
|
size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
|
|
auto it = streams_.find(stream_id);
|
|
if (it == streams_.end()) {
|
|
return 0;
|
|
}
|
|
return it->second.buffered_amount().low_threshold();
|
|
}
|
|
|
|
void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
|
|
size_t bytes) {
|
|
GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
|
|
}
|
|
|
|
RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
|
|
StreamID stream_id) {
|
|
auto it = streams_.find(stream_id);
|
|
if (it != streams_.end()) {
|
|
return it->second;
|
|
}
|
|
|
|
return streams_
|
|
.emplace(stream_id,
|
|
OutgoingStream(
|
|
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
|
|
total_buffered_amount_))
|
|
.first->second;
|
|
}
|
|
} // namespace dcsctp
|