diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index a94c103cb3..c2fb12eac8 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -42,6 +42,30 @@ rtc_library("rr_send_queue") { ] } +rtc_library("stream_scheduler") { + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:logging", + "../../../rtc_base:strong_alias", + "../../../rtc_base/containers:flat_set", + "../common:str_join", + "../packet:data", + "../public:socket", + "../public:types", + ] + sources = [ + "stream_scheduler.cc", + "stream_scheduler.h", + ] + absl_deps = [ + "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/strings", + "//third_party/abseil-cpp/absl/types:optional", + ] +} + rtc_library("retransmission_error_counter") { deps = [ "../../../rtc_base:checks", @@ -145,6 +169,7 @@ if (rtc_include_tests) { ":retransmission_timeout", ":rr_send_queue", ":send_queue", + ":stream_scheduler", "../../../api:array_view", "../../../api/task_queue:task_queue", "../../../rtc_base:checks", @@ -168,6 +193,7 @@ if (rtc_include_tests) { "retransmission_queue_test.cc", "retransmission_timeout_test.cc", "rr_send_queue_test.cc", + "stream_scheduler_test.cc", ] } } diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc new file mode 100644 index 0000000000..22457bfa8e --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +void StreamScheduler::Stream::set_priority(StreamPriority priority) { + priority_ = priority; +} + +absl::optional StreamScheduler::Produce( + TimeMs now, + size_t max_size) { + bool rescheduling = !currently_sending_a_message_; + + RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling + << ", active=" + << StrJoin(active_streams_, ", ", + [&](rtc::StringBuilder& sb, const auto& p) { + sb << *p->stream_id() << "@" + << *p->next_finish_time(); + }); + + RTC_DCHECK(rescheduling || current_stream_ != nullptr); + + absl::optional data; + while (!data.has_value() && !active_streams_.empty()) { + if (rescheduling) { + auto it = active_streams_.begin(); + current_stream_ = *it; + RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream " + << *current_stream_->stream_id(); + + active_streams_.erase(it); + current_stream_->ForceMarkInactive(); + } else { + RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: " + << *current_stream_->stream_id(); + RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) { + return p == current_stream_; + })); + } + + data = current_stream_->Produce(now, max_size); + } + + if (!data.has_value()) { + RTC_DLOG(LS_VERBOSE) + << "There is no stream with data; Can't produce any data."; + RTC_DCHECK(IsConsistent()); + + return absl::nullopt; + } + + RTC_DCHECK(data->data.stream_id == current_stream_->stream_id()); + + RTC_DLOG(LS_VERBOSE) << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning ? "first" + : *data->data.is_end ? "last" + : "middle") + << ", stream_id=" << *current_stream_->stream_id() + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + currently_sending_a_message_ = !*data->data.is_end; + virtual_time_ = current_stream_->current_time(); + + // One side-effect of rescheduling is that the new stream will not be present + // in `active_streams`. + size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message(); + if (rescheduling && bytes_to_send_next > 0) { + current_stream_->MakeActive(); + } else if (!rescheduling && bytes_to_send_next == 0) { + current_stream_->MakeInactive(); + } + + RTC_DCHECK(IsConsistent()); + return data; +} + +StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime() + const { + // Implement round-robin by letting the stream have its next virtual finish + // time in the future. It doesn't matter how far into the future, just any + // positive number so that any other stream that has the same virtual finish + // time as this stream gets to produce their data before revisiting this + // stream. + return VirtualTime(*current_virtual_time_ + 1); +} + +absl::optional StreamScheduler::Stream::Produce( + TimeMs now, + size_t max_size) { + absl::optional data = callback_.Produce(now, max_size); + + if (data.has_value()) { + VirtualTime new_current = GetNextFinishTime(); + RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_ + << " -> " << *new_current; + current_virtual_time_ = new_current; + } + + return data; +} + +bool StreamScheduler::IsConsistent() const { + for (Stream* stream : active_streams_) { + if (stream->next_finish_time_ == VirtualTime::Zero()) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id() + << " is active, but has no next-finish-time"; + return false; + } + } + return true; +} + +void StreamScheduler::Stream::MaybeMakeActive() { + RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")"; + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + if (bytes_to_send_in_next_message() == 0) { + return; + } + + MakeActive(); +} + +void StreamScheduler::Stream::MakeActive() { + current_virtual_time_ = parent_.virtual_time_; + VirtualTime next_finish_time = GetNextFinishTime(); + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() + << " active, expiring at " << *next_finish_time; + RTC_DCHECK_GT(*next_finish_time, 0); + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + next_finish_time_ = next_finish_time; + RTC_DCHECK(!absl::c_any_of(parent_.active_streams_, + [this](const auto* p) { return p == this; })); + parent_.active_streams_.emplace(this); +} + +void StreamScheduler::Stream::ForceMarkInactive() { + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive"; + RTC_DCHECK(next_finish_time_ != VirtualTime::Zero()); + next_finish_time_ = VirtualTime::Zero(); +} + +void StreamScheduler::Stream::MakeInactive() { + ForceMarkInactive(); + webrtc::EraseIf(parent_.active_streams_, + [&](const auto* s) { return s == this; }); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h new file mode 100644 index 0000000000..9feeb61434 --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler.h @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ +#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/containers/flat_set.h" +#include "rtc_base/strong_alias.h" + +namespace dcsctp { + +// A parameterized stream scheduler. Currently, it implements the round robin +// scheduling algorithm using virtual finish time. It is to be used as a part of +// a send queue and will track all active streams (streams that have any data +// that can be sent). +// +// The stream scheduler works with the concept of associating active streams +// with a "virtual finish time", which is the time when a stream is allowed to +// produce data. Streams are ordered by their virtual finish time, and the +// "current virtual time" will advance to the next following virtual finish time +// whenever a chunk is to be produced. In the initial round-robin scheduling +// algorithm, a stream's virtual finish time will just increment by one (1) +// after having produced a chunk, which results in a round-robin scheduling. +class StreamScheduler { + private: + class VirtualTime : public webrtc::StrongAlias { + public: + constexpr explicit VirtualTime(const UnderlyingType& v) + : webrtc::StrongAlias(v) {} + + static constexpr VirtualTime Zero() { return VirtualTime(0); } + }; + + public: + class StreamCallback { + public: + virtual ~StreamCallback() = default; + + // Produces a fragment of data to send. The current wall time is specified + // as `now` and should be used to skip chunks with expired limited lifetime. + // The parameter `max_size` specifies the maximum amount of actual payload + // that may be returned. If these constraints prevents the stream from + // sending some data, `absl::nullopt` should be returned. + virtual absl::optional Produce(TimeMs now, + size_t max_size) = 0; + + // Returns the number of payload bytes that is scheduled to be sent in the + // next enqueued message, or zero if there are no enqueued messages or if + // the stream has been actively paused. + virtual size_t bytes_to_send_in_next_message() const = 0; + }; + + class Stream { + public: + StreamID stream_id() const { return stream_id_; } + + StreamPriority priority() const { return priority_; } + void set_priority(StreamPriority priority); + + // Will activate the stream _if_ it has any data to send. That is, if the + // callback to `bytes_to_send_in_next_message` returns non-zero. If the + // callback returns zero, the stream will not be made active. + void MaybeMakeActive(); + + // Will remove the stream from the list of active streams, and will not try + // to produce data from it. To make it active again, call `MaybeMakeActive`. + void MakeInactive(); + + // Make the scheduler move to another message, or another stream. This is + // used to abort the scheduler from continuing producing fragments for the + // current message in case it's deleted. + void ForceReschedule() { parent_.ForceReschedule(); } + + private: + friend class StreamScheduler; + + Stream(StreamScheduler* parent, + StreamCallback* callback, + StreamID stream_id, + StreamPriority priority) + : parent_(*parent), + callback_(*callback), + stream_id_(stream_id), + priority_(priority) {} + + // Produces a message from this stream. This will only be called on streams + // that have data. + absl::optional Produce(TimeMs now, size_t max_size); + + void MakeActive(); + void ForceMarkInactive(); + + VirtualTime current_time() const { return current_virtual_time_; } + VirtualTime next_finish_time() const { return next_finish_time_; } + size_t bytes_to_send_in_next_message() const { + return callback_.bytes_to_send_in_next_message(); + } + + // Returns the next virtual finish time for this stream. + VirtualTime GetNextFinishTime() const; + + StreamScheduler& parent_; + StreamCallback& callback_; + const StreamID stream_id_; + StreamPriority priority_; + // This outgoing stream's "current" virtual_time. + VirtualTime current_virtual_time_ = VirtualTime::Zero(); + VirtualTime next_finish_time_ = VirtualTime::Zero(); + }; + + std::unique_ptr CreateStream(StreamCallback* callback, + StreamID stream_id, + StreamPriority priority) { + return absl::WrapUnique(new Stream(this, callback, stream_id, priority)); + } + + // Makes the scheduler stop producing message from the current stream and + // re-evaluates which stream to produce from. + void ForceReschedule() { currently_sending_a_message_ = false; } + + // Produces a fragment of data to send. The current wall time is specified as + // `now` and will be used to skip chunks with expired limited lifetime. The + // parameter `max_size` specifies the maximum amount of actual payload that + // may be returned. If no data can be produced, `absl::nullopt` is returned. + absl::optional Produce(TimeMs now, size_t max_size); + + rtc::ArrayView ActiveStreamsForTesting() const { + return rtc::MakeArrayView(&*active_streams_.cbegin(), + active_streams_.size()); + } + + private: + struct ActiveStreamComparator { + // Ordered by virtual finish time (primary), stream-id (secondary). + bool operator()(Stream* a, Stream* b) const { + VirtualTime a_vft = a->next_finish_time(); + VirtualTime b_vft = b->next_finish_time(); + if (a_vft == b_vft) { + return a->stream_id() < b->stream_id(); + } + return a_vft < b_vft; + } + }; + + bool IsConsistent() const; + + // The current virtual time, as defined in the WFQ algorithm. + VirtualTime virtual_time_ = VirtualTime::Zero(); + + // The current stream to send chunks from. + Stream* current_stream_ = nullptr; + + // Indicates if the streams is currently sending a message, and should then + // continue sending from this stream until that message has been sent in full. + bool currently_sending_a_message_ = false; + + // The currently active streams, ordered by virtual finish time. + webrtc::flat_set active_streams_; +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_ diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc new file mode 100644 index 0000000000..cd15837c7e --- /dev/null +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -0,0 +1,377 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include + +#include "net/dcsctp/public/types.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::Return; +using ::testing::StrictMock; + +constexpr size_t kMtu = 1000; +constexpr size_t kPayloadSize = 4; + +MATCHER_P(HasDataWithMid, mid, "") { + if (!arg.has_value()) { + *result_listener << "There was no produced data"; + return false; + } + + if (arg->data.message_id != mid) { + *result_listener << "the produced data had mid " << *arg->data.message_id + << " and not the expected " << *mid; + return false; + } + + return true; +} + +std::function(TimeMs, size_t)> +CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) { + return [sid, mid, payload_size](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(Data( + sid, SSN(0), mid, FSN(0), PPID(42), std::vector(payload_size), + Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true))); + }; +} + +std::map GetPacketCounts(StreamScheduler& scheduler, + size_t packets_to_generate) { + std::map packet_counts; + for (size_t i = 0; i < packets_to_generate; ++i) { + absl::optional data = + scheduler.Produce(TimeMs(0), kMtu); + if (data.has_value()) { + ++packet_counts[data->data.stream_id]; + } + } + return packet_counts; +} + +class MockStreamCallback : public StreamScheduler::StreamCallback { + public: + MOCK_METHOD(absl::optional, + Produce, + (TimeMs, size_t), + (override)); + MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); +}; + +class TestStream { + public: + TestStream(StreamScheduler& scheduler, + StreamID stream_id, + StreamPriority priority, + size_t packet_size = kPayloadSize) { + EXPECT_CALL(callback_, Produce) + .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size)); + EXPECT_CALL(callback_, bytes_to_send_in_next_message) + .WillRepeatedly(Return(packet_size)); + stream_ = scheduler.CreateStream(&callback_, stream_id, priority); + stream_->MaybeMakeActive(); + } + + StreamScheduler::Stream& stream() { return *stream_; } + + private: + StrictMock callback_; + std::unique_ptr stream_; +}; + +// A scheduler without active streams doesn't produce data. +TEST(StreamSchedulerTest, HasNoActiveStreams) { + StreamScheduler scheduler; + + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Stream properties can be set and retrieved +TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { + StreamScheduler scheduler; + + StrictMock callback; + auto stream = + scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + + EXPECT_EQ(stream->stream_id(), StreamID(1)); + EXPECT_EQ(stream->priority(), StreamPriority(2)); + + stream->set_priority(StreamPriority(0)); + EXPECT_EQ(stream->priority(), StreamPriority(0)); +} + +// A scheduler with a single stream produced packets from it. +TEST(StreamSchedulerTest, CanProduceFromSingleStream) { + StreamScheduler scheduler; + + StrictMock callback; + EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); + EXPECT_CALL(callback, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(0)); + auto stream = + scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2)); + stream->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet. +TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet, but keeps producing from the +// same stream when a packet contains of multiple fragments. +TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(true), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(false), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector(4), Data::IsBeginning(false), + Data::IsEnd(true), IsUnordered(true))); + }) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Deactivates a stream before it has finished producing all packets. +TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming. + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + // ... but the stream is made inactive before it can be produced. + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Resumes a paused stream - makes a stream active after inactivating it. +TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { + StreamScheduler scheduler; + + StrictMock callback1; + // Callbacks are setup so that they hint that there is a MID(2) coming... + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active again + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Iterates between streams, where one is suddenly paused and later resumed. +TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { + StreamScheduler scheduler; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + stream1->MakeInactive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Verifies that packet counts are evenly distributed in round robin scheduling. +TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { + StreamScheduler scheduler; + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); +} + +// Verifies that packet counts are evenly distributed among active streams, +// where a stream is suddenly made inactive, two are added, and then the paused +// stream is resumed. +TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { + StreamScheduler scheduler; + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); + + stream2.stream().MakeInactive(); + + TestStream stream3(scheduler, StreamID(3), StreamPriority(1)); + TestStream stream4(scheduler, StreamID(4), StreamPriority(1)); + + std::map counts2 = GetPacketCounts(scheduler, 15); + EXPECT_EQ(counts2[StreamID(1)], 5U); + EXPECT_EQ(counts2[StreamID(2)], 0U); + EXPECT_EQ(counts2[StreamID(3)], 5U); + EXPECT_EQ(counts2[StreamID(4)], 5U); + + stream2.stream().MaybeMakeActive(); + + std::map counts3 = GetPacketCounts(scheduler, 20); + EXPECT_EQ(counts3[StreamID(1)], 5U); + EXPECT_EQ(counts3[StreamID(2)], 5U); + EXPECT_EQ(counts3[StreamID(3)], 5U); + EXPECT_EQ(counts3[StreamID(4)], 5U); +} + +} // namespace +} // namespace dcsctp