This reverts commit 17a02a31d7d2897b75ad69fdac5d10e7475a5865. Reason for revert: Breaks downstream test Original change's description: > dcsctp: Add public API for setting priorities > > This is the first part of supporting stream priorities, and adds the API > and very basic support for setting and retrieving the stream priority. > > This commit doesn't in any way change the actual packet sending - the > specified priority values are stored, but not acted on. > > This is all that is client visible, so clients can start using the API > as written, and they would never notice that things are missing. > > Bug: webrtc:5696 > Change-Id: I24fce8cbb6f3cba187df99d1d3f45e73621c93c6 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261943 > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Commit-Queue: Victor Boivie <boivie@webrtc.org> > Cr-Commit-Position: refs/heads/main@{#37034} Bug: webrtc:5696 Change-Id: If172d9c9dbce7aae72152abbbae1ccc77340bbc1 No-Presubmit: true No-Tree-Checks: true No-Try: true Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264444 Owners-Override: Björn Terelius <terelius@webrtc.org> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com> Commit-Queue: Björn Terelius <terelius@webrtc.org> Auto-Submit: Björn Terelius <terelius@webrtc.org> Reviewed-by: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37039}
277 lines
10 KiB
C++
277 lines
10 KiB
C++
/*
|
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
|
|
#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
|
|
|
|
#include <cstdint>
|
|
#include <deque>
|
|
#include <map>
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "absl/algorithm/container.h"
|
|
#include "absl/strings/string_view.h"
|
|
#include "absl/types/optional.h"
|
|
#include "api/array_view.h"
|
|
#include "net/dcsctp/public/dcsctp_message.h"
|
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "net/dcsctp/tx/send_queue.h"
|
|
|
|
namespace dcsctp {
|
|
|
|
// The Round Robin SendQueue holds all messages that the client wants to send,
|
|
// but that haven't yet been split into chunks and fully sent on the wire.
|
|
//
|
|
// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
|
|
// it will cycle to send messages from different streams. It will send all
|
|
// fragments from one message before continuing with a different message on
|
|
// possibly a different stream, until support for message interleaving has been
|
|
// implemented.
|
|
//
|
|
// As messages can be (requested to be) sent before the connection is properly
|
|
// established, this send queue is always present - even for closed connections.
|
|
class RRSendQueue : public SendQueue {
|
|
public:
|
|
RRSendQueue(absl::string_view log_prefix,
|
|
size_t buffer_size,
|
|
std::function<void(StreamID)> on_buffered_amount_low,
|
|
size_t total_buffered_amount_low_threshold,
|
|
std::function<void()> on_total_buffered_amount_low);
|
|
|
|
// Indicates if the buffer is full. Note that it's up to the caller to ensure
|
|
// that the buffer is not full prior to adding new items to it.
|
|
bool IsFull() const;
|
|
// Indicates if the buffer is empty.
|
|
bool IsEmpty() const;
|
|
|
|
// Adds the message to be sent using the `send_options` provided. The current
|
|
// time should be in `now`. Note that it's the responsibility of the caller to
|
|
// ensure that the buffer is not full (by calling `IsFull`) before adding
|
|
// messages to it.
|
|
void Add(TimeMs now,
|
|
DcSctpMessage message,
|
|
const SendOptions& send_options = {});
|
|
|
|
// Implementation of `SendQueue`.
|
|
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
|
|
bool Discard(IsUnordered unordered,
|
|
StreamID stream_id,
|
|
MID message_id) override;
|
|
void PrepareResetStream(StreamID streams) override;
|
|
bool HasStreamsReadyToBeReset() const override;
|
|
std::vector<StreamID> GetStreamsReadyToBeReset() override;
|
|
void CommitResetStreams() override;
|
|
void RollbackResetStreams() override;
|
|
void Reset() override;
|
|
size_t buffered_amount(StreamID stream_id) const override;
|
|
size_t total_buffered_amount() const override {
|
|
return total_buffered_amount_.value();
|
|
}
|
|
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
|
|
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
|
|
|
|
HandoverReadinessStatus GetHandoverReadiness() const;
|
|
void AddHandoverState(DcSctpSocketHandoverState& state);
|
|
void RestoreFromState(const DcSctpSocketHandoverState& state);
|
|
|
|
private:
|
|
// Represents a value and a "low threshold" that when the value reaches or
|
|
// goes under the "low threshold", will trigger `on_threshold_reached`
|
|
// callback.
|
|
class ThresholdWatcher {
|
|
public:
|
|
explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
|
|
: on_threshold_reached_(std::move(on_threshold_reached)) {}
|
|
// Increases the value.
|
|
void Increase(size_t bytes) { value_ += bytes; }
|
|
// Decreases the value and triggers `on_threshold_reached` if it's at or
|
|
// below `low_threshold()`.
|
|
void Decrease(size_t bytes);
|
|
|
|
size_t value() const { return value_; }
|
|
size_t low_threshold() const { return low_threshold_; }
|
|
void SetLowThreshold(size_t low_threshold);
|
|
|
|
private:
|
|
const std::function<void()> on_threshold_reached_;
|
|
size_t value_ = 0;
|
|
size_t low_threshold_ = 0;
|
|
};
|
|
|
|
// Per-stream information.
|
|
class OutgoingStream {
|
|
public:
|
|
OutgoingStream(
|
|
StreamID stream_id,
|
|
std::function<void()> on_buffered_amount_low,
|
|
ThresholdWatcher& total_buffered_amount,
|
|
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
|
|
: stream_id_(stream_id),
|
|
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
|
|
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
|
|
next_ssn_(SSN(state ? state->next_ssn : 0)),
|
|
buffered_amount_(std::move(on_buffered_amount_low)),
|
|
total_buffered_amount_(total_buffered_amount) {}
|
|
|
|
StreamID stream_id() const { return stream_id_; }
|
|
|
|
// Enqueues a message to this stream.
|
|
void Add(DcSctpMessage message,
|
|
TimeMs expires_at,
|
|
const SendOptions& send_options);
|
|
|
|
// Produces a data chunk to send. This is only called on streams that have
|
|
// data available.
|
|
DataToSend Produce(TimeMs now, size_t max_size);
|
|
|
|
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
|
|
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
|
|
|
|
// Discards a partially sent message, see `SendQueue::Discard`.
|
|
bool Discard(IsUnordered unordered, MID message_id);
|
|
|
|
// Pauses this stream, which is used before resetting it.
|
|
void Pause();
|
|
|
|
// Resumes a paused stream.
|
|
void Resume();
|
|
|
|
bool IsReadyToBeReset() const {
|
|
return pause_state_ == PauseState::kPaused;
|
|
}
|
|
|
|
bool IsResetting() const { return pause_state_ == PauseState::kResetting; }
|
|
|
|
void SetAsResetting() {
|
|
RTC_DCHECK(pause_state_ == PauseState::kPaused);
|
|
pause_state_ = PauseState::kResetting;
|
|
}
|
|
|
|
// Resets this stream, meaning MIDs and SSNs are set to zero.
|
|
void Reset();
|
|
|
|
// Indicates if this stream has a partially sent message in it.
|
|
bool has_partially_sent_message() const;
|
|
|
|
// Indicates if the stream has data to send. It will also try to remove any
|
|
// expired non-partially sent message.
|
|
bool HasDataToSend(TimeMs now);
|
|
|
|
void AddHandoverState(
|
|
DcSctpSocketHandoverState::OutgoingStream& state) const;
|
|
|
|
private:
|
|
// Streams are paused before they can be reset. To reset a stream, the
|
|
// socket sends an outgoing stream reset command with the TSN of the last
|
|
// fragment of the last message, so that receivers and senders can agree on
|
|
// when it stopped. And if the send queue is in the middle of sending a
|
|
// message, and without fragments not yet sent and without TSNs allocated to
|
|
// them, it will keep sending data until that message has ended.
|
|
enum class PauseState {
|
|
// The stream is not paused, and not scheduled to be reset.
|
|
kNotPaused,
|
|
// The stream has requested to be reset/paused but is still producing
|
|
// fragments of a message that hasn't ended yet. When it does, it will
|
|
// transition to the `kPaused` state.
|
|
kPending,
|
|
// The stream is fully paused and can be reset.
|
|
kPaused,
|
|
// The stream has been added to an outgoing stream reset request and a
|
|
// response from the peer hasn't been received yet.
|
|
kResetting,
|
|
};
|
|
|
|
// An enqueued message and metadata.
|
|
struct Item {
|
|
explicit Item(DcSctpMessage msg,
|
|
TimeMs expires_at,
|
|
const SendOptions& send_options)
|
|
: message(std::move(msg)),
|
|
expires_at(expires_at),
|
|
send_options(send_options),
|
|
remaining_offset(0),
|
|
remaining_size(message.payload().size()) {}
|
|
DcSctpMessage message;
|
|
TimeMs expires_at;
|
|
SendOptions send_options;
|
|
// The remaining payload (offset and size) to be sent, when it has been
|
|
// fragmented.
|
|
size_t remaining_offset;
|
|
size_t remaining_size;
|
|
// If set, an allocated Message ID and SSN. Will be allocated when the
|
|
// first fragment is sent.
|
|
absl::optional<MID> message_id = absl::nullopt;
|
|
absl::optional<SSN> ssn = absl::nullopt;
|
|
// The current Fragment Sequence Number, incremented for each fragment.
|
|
FSN current_fsn = FSN(0);
|
|
};
|
|
|
|
bool IsConsistent() const;
|
|
|
|
const StreamID stream_id_;
|
|
PauseState pause_state_ = PauseState::kNotPaused;
|
|
// MIDs are different for unordered and ordered messages sent on a stream.
|
|
MID next_unordered_mid_;
|
|
MID next_ordered_mid_;
|
|
|
|
SSN next_ssn_;
|
|
// Enqueued messages, and metadata.
|
|
std::deque<Item> items_;
|
|
|
|
// The current amount of buffered data.
|
|
ThresholdWatcher buffered_amount_;
|
|
|
|
// Reference to the total buffered amount, which is updated directly by each
|
|
// stream.
|
|
ThresholdWatcher& total_buffered_amount_;
|
|
};
|
|
|
|
bool IsConsistent() const;
|
|
OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
|
|
absl::optional<DataToSend> Produce(
|
|
std::map<StreamID, OutgoingStream>::iterator it,
|
|
TimeMs now,
|
|
size_t max_size);
|
|
|
|
// Return the next stream, in round-robin fashion.
|
|
std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);
|
|
|
|
const std::string log_prefix_;
|
|
const size_t buffer_size_;
|
|
|
|
// Called when the buffered amount is below what has been set using
|
|
// `SetBufferedAmountLowThreshold`.
|
|
const std::function<void(StreamID)> on_buffered_amount_low_;
|
|
|
|
// Called when the total buffered amount is below what has been set using
|
|
// `SetTotalBufferedAmountLowThreshold`.
|
|
const std::function<void()> on_total_buffered_amount_low_;
|
|
|
|
// The total amount of buffer data, for all streams.
|
|
ThresholdWatcher total_buffered_amount_;
|
|
|
|
// Indicates if the previous fragment sent was the end of a message. For
|
|
// non-interleaved sending, this means that the next message may come from a
|
|
// different stream. If not true, the next fragment must be produced from the
|
|
// same stream as last time.
|
|
bool previous_message_has_ended_ = true;
|
|
|
|
// The current stream to send chunks from. Modified by `GetNextStream`.
|
|
StreamID current_stream_id_ = StreamID(0);
|
|
|
|
// All streams, and messages added to those.
|
|
std::map<StreamID, OutgoingStream> streams_;
|
|
};
|
|
} // namespace dcsctp
|
|
|
|
#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_
|