From d70186367c2936cc3fcdbfb55102e24087b920d3 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Wed, 11 May 2022 12:40:48 +0200 Subject: [PATCH] dcsctp: Add virtual time stream scheduler This adds a stream scheduler using virtual finish time (as defined in e.g. many Fair Queuing scheduler implementations), which indicates when a stream's next sent packet is supposed to be sent. In the initial version, this will be used to implement a round robin scheduler, by emulating that a stream's virtual finish time - when scheduled - is the "one more" than all existing virtual finish times. That will make the scheduler simply iterate between the streams in round robin order. The stream scheduler component is tested in isolation, and follow-up CLs will integrate it into the send queue. Bug: webrtc:5696 Change-Id: Iaa2c204f9b9a00517f55355cb11cfd25bb415f9e Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261946 Commit-Queue: Victor Boivie Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#37157} --- net/dcsctp/tx/BUILD.gn | 26 ++ net/dcsctp/tx/stream_scheduler.cc | 175 ++++++++++++ net/dcsctp/tx/stream_scheduler.h | 187 ++++++++++++ net/dcsctp/tx/stream_scheduler_test.cc | 377 +++++++++++++++++++++++++ 4 files changed, 765 insertions(+) create mode 100644 net/dcsctp/tx/stream_scheduler.cc create mode 100644 net/dcsctp/tx/stream_scheduler.h create mode 100644 net/dcsctp/tx/stream_scheduler_test.cc diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index a94c103cb3..c2fb12eac8 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -42,6 +42,30 @@ rtc_library("rr_send_queue") { ] } +rtc_library("stream_scheduler") { + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:logging", + "../../../rtc_base:strong_alias", + "../../../rtc_base/containers:flat_set", + "../common:str_join", + "../packet:data", + "../public:socket", + "../public:types", + ] + sources = [ + "stream_scheduler.cc", + "stream_scheduler.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} + rtc_library("retransmission_error_counter") { deps = [ "../../../rtc_base:checks", @@ -145,6 +169,7 @@ if (rtc_include_tests) { ":retransmission_timeout", ":rr_send_queue", ":send_queue", + ":stream_scheduler", "../../../api:array_view", "../../../api/task_queue:task_queue", "../../../rtc_base:checks", @@ -168,6 +193,7 @@ if (rtc_include_tests) { "retransmission_queue_test.cc", "retransmission_timeout_test.cc", "rr_send_queue_test.cc", + "stream_scheduler_test.cc", ] } } diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc new file mode 100644 index 0000000000..22457bfa8e --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +void StreamScheduler::Stream::set_priority(StreamPriority priority) { + priority_ = priority; +} + +absl::optional StreamScheduler::Produce( + TimeMs now, + size_t max_size) { + bool rescheduling = !currently_sending_a_message_; + + RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling + << ", active=" + << StrJoin(active_streams_, ", ", + [&](rtc::StringBuilder& sb, const auto& p) { + sb << *p->stream_id() << "@" + << *p->next_finish_time(); + }); + + RTC_DCHECK(rescheduling || current_stream_ != nullptr); + + absl::optional data; + while (!data.has_value() && !active_streams_.empty()) { + if (rescheduling) { + auto it = active_streams_.begin(); + current_stream_ = *it; + RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream " + << *current_stream_->stream_id(); + + active_streams_.erase(it); + current_stream_->ForceMarkInactive(); + } else { + RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: " + << *current_stream_->stream_id(); + RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) { + return p == current_stream_; + })); + } + + data = current_stream_->Produce(now, max_size); + } + + if (!data.has_value()) { + RTC_DLOG(LS_VERBOSE) + << "There is no stream with data; Can't produce any data."; + RTC_DCHECK(IsConsistent()); + + return absl::nullopt; + } + + RTC_DCHECK(data->data.stream_id == current_stream_->stream_id()); + + RTC_DLOG(LS_VERBOSE) << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning ? "first" + : *data->data.is_end ? "last" + : "middle") + << ", stream_id=" << *current_stream_->stream_id() + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + currently_sending_a_message_ = !*data->data.is_end; + virtual_time_ = current_stream_->current_time(); + + // One side-effect of rescheduling is that the new stream will not be present + // in `active_streams`. + size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message(); + if (rescheduling && bytes_to_send_next > 0) { + current_stream_->MakeActive(); + } else if (!rescheduling && bytes_to_send_next == 0) { + current_stream_->MakeInactive(); + } + + RTC_DCHECK(IsConsistent()); + return data; +} + +StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime() + const { + // Implement round-robin by letting the stream have its next virtual finish + // time in the future. It doesn't matter how far into the future, just any + // positive number so that any other stream that has the same virtual finish + // time as this stream gets to produce their data before revisiting this + // stream. + return VirtualTime(*current_virtual_time_ + 1); +} + +absl::optional StreamScheduler::Stream::Produce( + TimeMs now, + size_t max_size) { + absl::optional data = callback_.Produce(now, max_size); + + if (data.has_value()) { + VirtualTime new_current = GetNextFinishTime(); + RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_ + << " -> " << *new_current; + current_virtual_time_ = new_current; + } + + return data; +} + +bool StreamScheduler::IsConsistent() const { + for (Stream* stream : active_streams_) { + if (stream->next_finish_time_ == VirtualTime::Zero()) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id() + << " is active, but has no next-finish-time"; + return false; + } + } + return true; +} + +void StreamScheduler::Stream::MaybeMakeActive() { + RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")"; + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + if (bytes_to_send_in_next_message() == 0) { + return; + } + + MakeActive(); +} + +void StreamScheduler::Stream::MakeActive() { + current_virtual_time_ = parent_.virtual_time_; + VirtualTime next_finish_time = GetNextFinishTime(); + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() + << " active, expiring at " << *next_finish_time; + RTC_DCHECK_GT(*next_finish_time, 0); + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + next_finish_time_ = next_finish_time; + RTC_DCHECK(!absl::c_any_of(parent_.active_streams_, + [this](const auto* p) { return p == this; })); + parent_.active_streams_.emplace(this); +} + +void StreamScheduler::Stream::ForceMarkInactive() { + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive"; + RTC_DCHECK(next_finish_time_ != VirtualTime::Zero()); + next_finish_time_ = VirtualTime::Zero(); +} + +void StreamScheduler::Stream::MakeInactive() { + ForceMarkInactive(); + webrtc::EraseIf(parent_.active_streams_, + [&](const auto* s) { return s == this; }); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h new file mode 100644 index 0000000000..9feeb61434 --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler.h @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ +#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/containers/flat_set.h" +#include "rtc_base/strong_alias.h" + +namespace dcsctp { + +// A parameterized stream scheduler. Currently, it implements the round robin +// scheduling algorithm using virtual finish time. It is to be used as a part of +// a send queue and will track all active streams (streams that have any data +// that can be sent). +// +// The stream scheduler works with the concept of associating active streams +// with a "virtual finish time", which is the time when a stream is allowed to +// produce data. Streams are ordered by their virtual finish time, and the +// "current virtual time" will advance to the next following virtual finish time +// whenever a chunk is to be produced. In the initial round-robin scheduling +// algorithm, a stream's virtual finish time will just increment by one (1) +// after having produced a chunk, which results in a round-robin scheduling. +class StreamScheduler { + private: + class VirtualTime : public webrtc::StrongAlias { + public: + constexpr explicit VirtualTime(const UnderlyingType& v) + : webrtc::StrongAlias(v) {} + + static constexpr VirtualTime Zero() { return VirtualTime(0); } + }; + + public: + class StreamCallback { + public: + virtual ~StreamCallback() = default; + + // Produces a fragment of data to send. The current wall time is specified + // as `now` and should be used to skip chunks with expired limited lifetime. + // The parameter `max_size` specifies the maximum amount of actual payload + // that may be returned. If these constraints prevents the stream from + // sending some data, `absl::nullopt` should be returned. + virtual absl::optional Produce(TimeMs now, + size_t max_size) = 0; + + // Returns the number of payload bytes that is scheduled to be sent in the + // next enqueued message, or zero if there are no enqueued messages or if + // the stream has been actively paused. + virtual size_t bytes_to_send_in_next_message() const = 0; + }; + + class Stream { + public: + StreamID stream_id() const { return stream_id_; } + + StreamPriority priority() const { return priority_; } + void set_priority(StreamPriority priority); + + // Will activate the stream _if_ it has any data to send. That is, if the + // callback to `bytes_to_send_in_next_message` returns non-zero. If the + // callback returns zero, the stream will not be made active. + void MaybeMakeActive(); + + // Will remove the stream from the list of active streams, and will not try + // to produce data from it. To make it active again, call `MaybeMakeActive`. + void MakeInactive(); + + // Make the scheduler move to another message, or another stream. This is + // used to abort the scheduler from continuing producing fragments for the + // current message in case it's deleted. + void ForceReschedule() { parent_.ForceReschedule(); } + + private: + friend class StreamScheduler; + + Stream(StreamScheduler* parent, + StreamCallback* callback, + StreamID stream_id, + StreamPriority priority) + : parent_(*parent), + callback_(*callback), + stream_id_(stream_id), + priority_(priority) {} + + // Produces a message from this stream. This will only be called on streams + // that have data. + absl::optional Produce(TimeMs now, size_t max_size); + + void MakeActive(); + void ForceMarkInactive(); + + VirtualTime current_time() const { return current_virtual_time_; } + VirtualTime next_finish_time() const { return next_finish_time_; } + size_t bytes_to_send_in_next_message() const { + return callback_.bytes_to_send_in_next_message(); + } + + // Returns the next virtual finish time for this stream. + VirtualTime GetNextFinishTime() const; + + StreamScheduler& parent_; + StreamCallback& callback_; + const StreamID stream_id_; + StreamPriority priority_; + // This outgoing stream's "current" virtual_time. + VirtualTime current_virtual_time_ = VirtualTime::Zero(); + VirtualTime next_finish_time_ = VirtualTime::Zero(); + }; + + std::unique_ptr CreateStream(StreamCallback* callback, + StreamID stream_id, + StreamPriority priority) { + return absl::WrapUnique(new Stream(this, callback, stream_id, priority)); + } + + // Makes the scheduler stop producing message from the current stream and + // re-evaluates which stream to produce from. + void ForceReschedule() { currently_sending_a_message_ = false; } + + // Produces a fragment of data to send. The current wall time is specified as + // `now` and will be used to skip chunks with expired limited lifetime. The + // parameter `max_size` specifies the maximum amount of actual payload that + // may be returned. If no data can be produced, `absl::nullopt` is returned. + absl::optional Produce(TimeMs now, size_t max_size); + + rtc::ArrayView ActiveStreamsForTesting() const { + return rtc::MakeArrayView(&*active_streams_.cbegin(), + active_streams_.size()); + } + + private: + struct ActiveStreamComparator { + // Ordered by virtual finish time (primary), stream-id (secondary). + bool operator()(Stream* a, Stream* b) const { + VirtualTime a_vft = a->next_finish_time(); + VirtualTime b_vft = b->next_finish_time(); + if (a_vft == b_vft) { + return a->stream_id() < b->stream_id(); + } + return a_vft < b_vft; + } + }; + + bool IsConsistent() const; + + // The current virtual time, as defined in the WFQ algorithm. + VirtualTime virtual_time_ = VirtualTime::Zero(); + + // The current stream to send chunks from. + Stream* current_stream_ = nullptr; + + // Indicates if the streams is currently sending a message, and should then + // continue sending from this stream until that message has been sent in full. + bool currently_sending_a_message_ = false; + + // The currently active streams, ordered by virtual finish time. + webrtc::flat_set active_streams_; +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_ diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc new file mode 100644 index 0000000000..cd15837c7e --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -0,0 +1,377 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include + +#include "net/dcsctp/public/types.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::Return; +using ::testing::StrictMock; + +constexpr size_t kMtu = 1000; +constexpr size_t kPayloadSize = 4; + +MATCHER_P(HasDataWithMid, mid, "") { + if (!arg.has_value()) { + *result_listener << "There was no produced data"; + return false; + } + + if (arg->data.message_id != mid) { + *result_listener << "the produced data had mid " << *arg->data.message_id + << " and not the expected " << *mid; + return false; + } + + return true; +} + +std::function(TimeMs, size_t)> +CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) { + return [sid, mid, payload_size](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(Data( + sid, SSN(0), mid, FSN(0), PPID(42), std::vector(payload_size), + Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true))); + }; +} + +std::map GetPacketCounts(StreamScheduler& scheduler, + size_t packets_to_generate) { + std::map packet_counts; + for (size_t i = 0; i < packets_to_generate; ++i) { + absl::optional data = + scheduler.Produce(TimeMs(0), kMtu); + if (data.has_value()) { + ++packet_counts[data->data.stream_id]; + } + } + return packet_counts; +} + +class MockStreamCallback : public StreamScheduler::StreamCallback { + public: + MOCK_METHOD(absl::optional, + Produce, + (TimeMs, size_t), + (override)); + MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); +}; + +class TestStream { + public: + TestStream(StreamScheduler& scheduler, + StreamID stream_id, + StreamPriority priority, + size_t packet_size = kPayloadSize) { + EXPECT_CALL(callback_, Produce) + .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size)); + EXPECT_CALL(callback_, bytes_to_send_in_next_message) + .WillRepeatedly(Return(packet_size)); + stream_ = scheduler.CreateStream(&callback_, stream_id, priority); + stream_->MaybeMakeActive(); + } + + StreamScheduler::Stream& stream() { return *stream_; } + + private: + StrictMock callback_; + std::unique_ptr stream_; +}; + +// A scheduler without active streams doesn't produce data. +TEST(StreamSchedulerTest, HasNoActiveStreams) { + StreamScheduler scheduler; + + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Stream properties can be set and retrieved +TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { + StreamScheduler scheduler; + + StrictMock callback; + auto stream = + scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + + EXPECT_EQ(stream->stream_id(), StreamID(1)); + EXPECT_EQ(stream->priority(), StreamPriority(2)); + + stream->set_priority(StreamPriority(0)); + EXPECT_EQ(stream->priority(), StreamPriority(0)); +} + +// A scheduler with a single stream produced packets from it. +TEST(StreamSchedulerTest, CanProduceFromSingleStream) { + StreamScheduler scheduler; + + StrictMock callback; + EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); + EXPECT_CALL(callback, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(0)); + auto stream = + scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + stream->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet. +TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet, but keeps producing from the +// same stream when a packet contains of multiple fragments. +TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(true), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(false), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(false), + Data::IsEnd(true), IsUnordered(true))); + }) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Deactivates a stream before it has finished producing all packets. +TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming. + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + // ... but the stream is made inactive before it can be produced. + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Resumes a paused stream - makes a stream active after inactivating it. +TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { + StreamScheduler scheduler; + + StrictMock callback1; + // Callbacks are setup so that they hint that there is a MID(2) coming... + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active again + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Iterates between streams, where one is suddenly paused and later resumed. +TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + stream1->MakeInactive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Verifies that packet counts are evenly distributed in round robin scheduling. +TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { + StreamScheduler scheduler; + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); +} + +// Verifies that packet counts are evenly distributed among active streams, +// where a stream is suddenly made inactive, two are added, and then the paused +// stream is resumed. +TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { + StreamScheduler scheduler; + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); + + stream2.stream().MakeInactive(); + + TestStream stream3(scheduler, StreamID(3), StreamPriority(1)); + TestStream stream4(scheduler, StreamID(4), StreamPriority(1)); + + std::map counts2 = GetPacketCounts(scheduler, 15); + EXPECT_EQ(counts2[StreamID(1)], 5U); + EXPECT_EQ(counts2[StreamID(2)], 0U); + EXPECT_EQ(counts2[StreamID(3)], 5U); + EXPECT_EQ(counts2[StreamID(4)], 5U); + + stream2.stream().MaybeMakeActive(); + + std::map counts3 = GetPacketCounts(scheduler, 20); + EXPECT_EQ(counts3[StreamID(1)], 5U); + EXPECT_EQ(counts3[StreamID(2)], 5U); + EXPECT_EQ(counts3[StreamID(3)], 5U); + EXPECT_EQ(counts3[StreamID(4)], 5U); +} + +} // namespace +} // namespace dcsctp