webrtc_m130/net/dcsctp/tx/stream_scheduler.h
Victor Boivie 4fbf555989 dcsctp: Make use of log_prefix consistent
The log_prefix frequently used in dcSCTP is intended to be used
to separate logs from different sockets within the same log output,
typically in unit tests. Every log entry always has the file and
line, so it's not important to add more information to the log prefix
that indicates _where_ it's logged. So those have been removed.

Also, since log_prefix is a string (typically 32 bytes) and it's
never changing during the lifetime of the socket, pass and store it
as a absl::string_view to save memory.

Bug: None
Change-Id: I10466710ca6c2badfcd3adc5630426a90ca74204
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274704
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39571}
2023-03-15 22:15:05 +00:00

225 lines
8.6 KiB
C++

/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#include <algorithm>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/packet/chunk/idata_chunk.h"
#include "net/dcsctp/packet/sctp_packet.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/containers/flat_set.h"
#include "rtc_base/strong_alias.h"
namespace dcsctp {
// A parameterized stream scheduler. Currently, it implements the round robin
// scheduling algorithm using virtual finish time. It is to be used as a part of
// a send queue and will track all active streams (streams that have any data
// that can be sent).
//
// The stream scheduler works with the concept of associating active streams
// with a "virtual finish time", which is the time when a stream is allowed to
// produce data. Streams are ordered by their virtual finish time, and the
// "current virtual time" will advance to the next following virtual finish time
// whenever a chunk is to be produced.
//
// When message interleaving is enabled, the WFQ - Weighted Fair Queueing -
// scheduling algorithm will be used. And when it's not, round-robin scheduling
// will be used instead.
//
// In the round robin scheduling algorithm, a stream's virtual finish time will
// just increment by one (1) after having produced a chunk, which results in a
// round-robin scheduling.
//
// In WFQ scheduling algorithm, a stream's virtual finish time will be defined
// as the number of bytes in the next fragment to be sent, multiplied by the
// inverse of the stream's priority, meaning that a high priority - or a smaller
// fragment - results in a closer virtual finish time, compared to a stream with
// either a lower priority or a larger fragment to be sent.
class StreamScheduler {
private:
class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> {
public:
constexpr explicit VirtualTime(const UnderlyingType& v)
: webrtc::StrongAlias<class VirtualTimeTag, double>(v) {}
static constexpr VirtualTime Zero() { return VirtualTime(0); }
};
class InverseWeight
: public webrtc::StrongAlias<class InverseWeightTag, double> {
public:
constexpr explicit InverseWeight(StreamPriority priority)
: webrtc::StrongAlias<class InverseWeightTag, double>(
1.0 / std::max(static_cast<double>(*priority), 0.000001)) {}
};
public:
class StreamProducer {
public:
virtual ~StreamProducer() = default;
// Produces a fragment of data to send. The current wall time is specified
// as `now` and should be used to skip chunks with expired limited lifetime.
// The parameter `max_size` specifies the maximum amount of actual payload
// that may be returned. If these constraints prevents the stream from
// sending some data, `absl::nullopt` should be returned.
virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
size_t max_size) = 0;
// Returns the number of payload bytes that is scheduled to be sent in the
// next enqueued message, or zero if there are no enqueued messages or if
// the stream has been actively paused.
virtual size_t bytes_to_send_in_next_message() const = 0;
};
class Stream {
public:
StreamID stream_id() const { return stream_id_; }
StreamPriority priority() const { return priority_; }
void SetPriority(StreamPriority priority);
// Will activate the stream _if_ it has any data to send. That is, if the
// callback to `bytes_to_send_in_next_message` returns non-zero. If the
// callback returns zero, the stream will not be made active.
void MaybeMakeActive();
// Will remove the stream from the list of active streams, and will not try
// to produce data from it. To make it active again, call `MaybeMakeActive`.
void MakeInactive();
// Make the scheduler move to another message, or another stream. This is
// used to abort the scheduler from continuing producing fragments for the
// current message in case it's deleted.
void ForceReschedule() { parent_.ForceReschedule(); }
private:
friend class StreamScheduler;
Stream(StreamScheduler* parent,
StreamProducer* producer,
StreamID stream_id,
StreamPriority priority)
: parent_(*parent),
producer_(*producer),
stream_id_(stream_id),
priority_(priority),
inverse_weight_(priority) {}
// Produces a message from this stream. This will only be called on streams
// that have data.
absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
void MakeActive(size_t bytes_to_send_next);
void ForceMarkInactive();
VirtualTime current_time() const { return current_virtual_time_; }
VirtualTime next_finish_time() const { return next_finish_time_; }
size_t bytes_to_send_in_next_message() const {
return producer_.bytes_to_send_in_next_message();
}
VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const;
StreamScheduler& parent_;
StreamProducer& producer_;
const StreamID stream_id_;
StreamPriority priority_;
InverseWeight inverse_weight_;
// This outgoing stream's "current" virtual_time.
VirtualTime current_virtual_time_ = VirtualTime::Zero();
VirtualTime next_finish_time_ = VirtualTime::Zero();
};
// The `mtu` parameter represents the maximum SCTP packet size, which should
// be the same as `DcSctpOptions::mtu`.
StreamScheduler(absl::string_view log_prefix, size_t mtu)
: log_prefix_(log_prefix),
max_payload_bytes_(mtu - SctpPacket::kHeaderSize -
IDataChunk::kHeaderSize) {}
std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
StreamID stream_id,
StreamPriority priority) {
return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
}
void EnableMessageInterleaving(bool enabled) {
enable_message_interleaving_ = enabled;
}
// Makes the scheduler stop producing message from the current stream and
// re-evaluates which stream to produce from.
void ForceReschedule() { currently_sending_a_message_ = false; }
// Produces a fragment of data to send. The current wall time is specified as
// `now` and will be used to skip chunks with expired limited lifetime. The
// parameter `max_size` specifies the maximum amount of actual payload that
// may be returned. If no data can be produced, `absl::nullopt` is returned.
absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
std::set<StreamID> ActiveStreamsForTesting() const;
private:
struct ActiveStreamComparator {
// Ordered by virtual finish time (primary), stream-id (secondary).
bool operator()(Stream* a, Stream* b) const {
VirtualTime a_vft = a->next_finish_time();
VirtualTime b_vft = b->next_finish_time();
if (a_vft == b_vft) {
return a->stream_id() < b->stream_id();
}
return a_vft < b_vft;
}
};
bool IsConsistent() const;
const absl::string_view log_prefix_;
const size_t max_payload_bytes_;
// The current virtual time, as defined in the WFQ algorithm.
VirtualTime virtual_time_ = VirtualTime::Zero();
// The current stream to send chunks from.
Stream* current_stream_ = nullptr;
bool enable_message_interleaving_ = false;
// Indicates if the streams is currently sending a message, and should then
// - if message interleaving is not enabled - continue sending from this
// stream until that message has been sent in full.
bool currently_sending_a_message_ = false;
// The currently active streams, ordered by virtual finish time.
webrtc::flat_set<Stream*, ActiveStreamComparator> active_streams_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_