dcsctp: Add virtual time stream scheduler

This adds a stream scheduler using virtual finish time (as defined in
e.g. many Fair Queuing scheduler implementations), which indicates when
a stream's next sent packet is supposed to be sent.

In the initial version, this will be used to implement a round-robin
scheduler, by making a stream's virtual finish time - when it is
scheduled - one more than its current virtual time. Since all streams
advance by the same amount, the scheduler simply cycles through the
streams in round-robin order.

The stream scheduler component is tested in isolation, and follow-up
CLs will integrate it into the send queue.

Bug: webrtc:5696
Change-Id: Iaa2c204f9b9a00517f55355cb11cfd25bb415f9e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261946
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37157}
This commit is contained in:
Victor Boivie 2022-05-11 12:40:48 +02:00 committed by WebRTC LUCI CQ
parent da0ea97236
commit d70186367c
4 changed files with 765 additions and 0 deletions

View File

@ -42,6 +42,30 @@ rtc_library("rr_send_queue") {
]
}
# Stream scheduler based on virtual finish times (see stream_scheduler.h).
# Intended to be used by the send queue to pick which stream to produce
# DATA chunks from.
rtc_library("stream_scheduler") {
  deps = [
    ":send_queue",
    "../../../api:array_view",
    "../../../rtc_base:checks",
    "../../../rtc_base:logging",
    "../../../rtc_base:strong_alias",
    "../../../rtc_base/containers:flat_set",
    "../common:str_join",
    "../packet:data",
    "../public:socket",
    "../public:types",
  ]
  sources = [
    "stream_scheduler.cc",
    "stream_scheduler.h",
  ]
  absl_deps = [
    "//third_party/abseil-cpp/absl/algorithm:container",
    "//third_party/abseil-cpp/absl/strings",
    "//third_party/abseil-cpp/absl/types:optional",
  ]
}
rtc_library("retransmission_error_counter") {
deps = [
"../../../rtc_base:checks",
@ -145,6 +169,7 @@ if (rtc_include_tests) {
":retransmission_timeout",
":rr_send_queue",
":send_queue",
":stream_scheduler",
"../../../api:array_view",
"../../../api/task_queue:task_queue",
"../../../rtc_base:checks",
@ -168,6 +193,7 @@ if (rtc_include_tests) {
"retransmission_queue_test.cc",
"retransmission_timeout_test.cc",
"rr_send_queue_test.cc",
"stream_scheduler_test.cc",
]
}
}

View File

@ -0,0 +1,175 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/stream_scheduler.h"
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace dcsctp {
// Updates the stream's priority. The value is stored but not consulted by the
// visible round-robin scheduling logic; it is exposed again via `priority()`.
void StreamScheduler::Stream::set_priority(StreamPriority priority) {
  priority_ = priority;
}
// Produces the next DATA chunk to send, or absl::nullopt if no active stream
// can produce any data. If the previously produced chunk did not end its
// message, production continues from the same stream; otherwise the stream
// with the lowest virtual finish time is selected.
absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
    TimeMs now,
    size_t max_size) {
  // If no message is in flight, a (possibly different) stream is picked;
  // otherwise fragments keep coming from `current_stream_` until the current
  // message ends.
  bool rescheduling = !currently_sending_a_message_;

  RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling
                      << ", active="
                      << StrJoin(active_streams_, ", ",
                                 [&](rtc::StringBuilder& sb, const auto& p) {
                                   sb << *p->stream_id() << "@"
                                      << *p->next_finish_time();
                                 });

  RTC_DCHECK(rescheduling || current_stream_ != nullptr);

  absl::optional<SendQueue::DataToSend> data;
  while (!data.has_value() && !active_streams_.empty()) {
    if (rescheduling) {
      // Pick the stream with the lowest virtual finish time (the set's
      // ordering) and temporarily take it out of the active set; it is
      // re-added below (in MakeActive) if it still has data.
      auto it = active_streams_.begin();
      current_stream_ = *it;
      RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream "
                           << *current_stream_->stream_id();

      active_streams_.erase(it);
      current_stream_->ForceMarkInactive();
    } else {
      // Mid-message: continue from the same stream, which is expected to
      // still be in the active set.
      RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: "
                           << *current_stream_->stream_id();
      RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) {
        return p == current_stream_;
      }));
    }

    // NOTE(review): when !rescheduling, a stream that returns nullopt here
    // would loop forever — presumably mid-message streams always produce.
    // TODO: confirm with the StreamCallback contract.
    data = current_stream_->Produce(now, max_size);
  }

  if (!data.has_value()) {
    RTC_DLOG(LS_VERBOSE)
        << "There is no stream with data; Can't produce any data.";
    RTC_DCHECK(IsConsistent());

    return absl::nullopt;
  }

  RTC_DCHECK(data->data.stream_id == current_stream_->stream_id());

  RTC_DLOG(LS_VERBOSE) << "Producing DATA, type="
                       << (data->data.is_unordered ? "unordered" : "ordered")
                       << "::"
                       << (*data->data.is_beginning && *data->data.is_end
                               ? "complete"
                           : *data->data.is_beginning ? "first"
                           : *data->data.is_end       ? "last"
                                                      : "middle")
                       << ", stream_id=" << *current_stream_->stream_id()
                       << ", ppid=" << *data->data.ppid
                       << ", length=" << data->data.payload.size();

  // An unfinished message pins the scheduler to this stream for the next call.
  currently_sending_a_message_ = !*data->data.is_end;
  // Advance the scheduler's virtual time to the producing stream's time.
  virtual_time_ = current_stream_->current_time();

  // One side-effect of rescheduling is that the new stream will not be present
  // in `active_streams`.
  size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message();
  if (rescheduling && bytes_to_send_next > 0) {
    current_stream_->MakeActive();
  } else if (!rescheduling && bytes_to_send_next == 0) {
    current_stream_->MakeInactive();
  }

  RTC_DCHECK(IsConsistent());
  return data;
}
// Returns the virtual finish time this stream would get if (re)scheduled now,
// based on its current virtual time.
StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
    const {
  // Implement round-robin by letting the stream have its next virtual finish
  // time in the future. It doesn't matter how far into the future, just any
  // positive number so that any other stream that has the same virtual finish
  // time as this stream gets to produce their data before revisiting this
  // stream.
  return VirtualTime(*current_virtual_time_ + 1);
}
// Asks the stream's callback for a chunk. When data was actually produced,
// this stream's virtual time is advanced to its next finish time so that
// other streams get their turn before this one is revisited.
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
    TimeMs now,
    size_t max_size) {
  absl::optional<SendQueue::DataToSend> chunk =
      callback_.Produce(now, max_size);
  if (!chunk.has_value()) {
    // Nothing produced - the virtual time is left untouched.
    return absl::nullopt;
  }

  VirtualTime updated_time = GetNextFinishTime();
  RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_
                       << " -> " << *updated_time;
  current_virtual_time_ = updated_time;
  return chunk;
}
// Validates the scheduler's invariant that every stream in `active_streams_`
// carries a non-zero next-finish-time (zero means "inactive"). Used only in
// RTC_DCHECKs; logs the offending stream before returning false.
bool StreamScheduler::IsConsistent() const {
  for (Stream* stream : active_streams_) {
    if (stream->next_finish_time_ == VirtualTime::Zero()) {
      RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id()
                           << " is active, but has no next-finish-time";
      return false;
    }
  }
  return true;
}
// Activates the stream only when its callback reports pending payload bytes;
// a stream with nothing to send stays inactive.
void StreamScheduler::Stream::MaybeMakeActive() {
  RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")";
  RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
  if (bytes_to_send_in_next_message() > 0) {
    MakeActive();
  }
}
// Unconditionally inserts this stream into the scheduler's active set, giving
// it a fresh virtual finish time derived from the scheduler's current virtual
// time. The stream must not already be active.
void StreamScheduler::Stream::MakeActive() {
  // Sync this stream's clock with the scheduler's before computing the
  // finish time, so a re-activated stream doesn't jump the queue.
  current_virtual_time_ = parent_.virtual_time_;
  VirtualTime next_finish_time = GetNextFinishTime();
  RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id()
                       << " active, expiring at " << *next_finish_time;
  RTC_DCHECK_GT(*next_finish_time, 0);
  RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
  next_finish_time_ = next_finish_time;
  RTC_DCHECK(!absl::c_any_of(parent_.active_streams_,
                             [this](const auto* p) { return p == this; }));
  parent_.active_streams_.emplace(this);
}
// Marks the stream inactive by clearing its next-finish-time. Does NOT remove
// the stream from `parent_.active_streams_`; callers are responsible for
// keeping the active set in sync (see MakeInactive and Produce).
void StreamScheduler::Stream::ForceMarkInactive() {
  RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive";
  RTC_DCHECK(next_finish_time_ != VirtualTime::Zero());
  next_finish_time_ = VirtualTime::Zero();
}
// Deactivates the stream and removes it from the scheduler's active set.
void StreamScheduler::Stream::MakeInactive() {
  ForceMarkInactive();
  // Erase by linear scan: the set is ordered by next_finish_time_, which was
  // just reset above, so an ordered lookup would no longer find `this`.
  webrtc::EraseIf(parent_.active_streams_,
                  [&](const auto* s) { return s == this; });
}
} // namespace dcsctp

View File

@ -0,0 +1,187 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/containers/flat_set.h"
#include "rtc_base/strong_alias.h"
namespace dcsctp {
// A parameterized stream scheduler. Currently, it implements the round robin
// scheduling algorithm using virtual finish time. It is to be used as a part
// of a send queue and will track all active streams (streams that have any
// data that can be sent).
//
// The stream scheduler works with the concept of associating active streams
// with a "virtual finish time", which is the time when a stream is allowed to
// produce data. Streams are ordered by their virtual finish time, and the
// "current virtual time" will advance to the next following virtual finish
// time whenever a chunk is to be produced. In the initial round-robin
// scheduling algorithm, a stream's virtual finish time will just increment by
// one (1) after having produced a chunk, which results in a round-robin
// scheduling.
class StreamScheduler {
 private:
  // Strong alias around `double` so virtual (scheduling) times can't be
  // accidentally mixed up with wall-clock times. Zero() means "not scheduled".
  class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> {
   public:
    constexpr explicit VirtualTime(const UnderlyingType& v)
        : webrtc::StrongAlias<class VirtualTimeTag, double>(v) {}

    static constexpr VirtualTime Zero() { return VirtualTime(0); }
  };

 public:
  // The interface between the scheduler and a stream; the scheduler pulls
  // data from streams through these callbacks.
  class StreamCallback {
   public:
    virtual ~StreamCallback() = default;

    // Produces a fragment of data to send. The current wall time is specified
    // as `now` and should be used to skip chunks with expired limited
    // lifetime. The parameter `max_size` specifies the maximum amount of
    // actual payload that may be returned. If these constraints prevents the
    // stream from sending some data, `absl::nullopt` should be returned.
    virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
                                                          size_t max_size) = 0;

    // Returns the number of payload bytes that is scheduled to be sent in the
    // next enqueued message, or zero if there are no enqueued messages or if
    // the stream has been actively paused.
    virtual size_t bytes_to_send_in_next_message() const = 0;
  };

  // Handle for a single stream registered with this scheduler. Created via
  // `CreateStream` and owned by the caller.
  class Stream {
   public:
    StreamID stream_id() const { return stream_id_; }

    StreamPriority priority() const { return priority_; }
    void set_priority(StreamPriority priority);

    // Will activate the stream _if_ it has any data to send. That is, if the
    // callback to `bytes_to_send_in_next_message` returns non-zero. If the
    // callback returns zero, the stream will not be made active.
    void MaybeMakeActive();

    // Will remove the stream from the list of active streams, and will not
    // try to produce data from it. To make it active again, call
    // `MaybeMakeActive`.
    void MakeInactive();

    // Make the scheduler move to another message, or another stream. This is
    // used to abort the scheduler from continuing producing fragments for the
    // current message in case it's deleted.
    void ForceReschedule() { parent_.ForceReschedule(); }

   private:
    friend class StreamScheduler;

    Stream(StreamScheduler* parent,
           StreamCallback* callback,
           StreamID stream_id,
           StreamPriority priority)
        : parent_(*parent),
          callback_(*callback),
          stream_id_(stream_id),
          priority_(priority) {}

    // Produces a message from this stream. This will only be called on
    // streams that have data.
    absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);

    // Adds the stream to the scheduler's active set unconditionally.
    void MakeActive();
    // Clears `next_finish_time_` only; callers keep the active set in sync.
    void ForceMarkInactive();

    VirtualTime current_time() const { return current_virtual_time_; }
    VirtualTime next_finish_time() const { return next_finish_time_; }

    size_t bytes_to_send_in_next_message() const {
      return callback_.bytes_to_send_in_next_message();
    }

    // Returns the next virtual finish time for this stream.
    VirtualTime GetNextFinishTime() const;

    StreamScheduler& parent_;
    StreamCallback& callback_;
    const StreamID stream_id_;
    StreamPriority priority_;
    // This outgoing stream's "current" virtual_time.
    VirtualTime current_virtual_time_ = VirtualTime::Zero();
    // Zero while inactive, otherwise the time at which this stream is next
    // scheduled to produce.
    VirtualTime next_finish_time_ = VirtualTime::Zero();
  };

  // Creates a stream handle bound to `callback`. The returned object is owned
  // by the caller and is used to control the stream's scheduling state.
  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
                                       StreamID stream_id,
                                       StreamPriority priority) {
    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
  }

  // Makes the scheduler stop producing message from the current stream and
  // re-evaluates which stream to produce from.
  void ForceReschedule() { currently_sending_a_message_ = false; }

  // Produces a fragment of data to send. The current wall time is specified
  // as `now` and will be used to skip chunks with expired limited lifetime.
  // The parameter `max_size` specifies the maximum amount of actual payload
  // that may be returned. If no data can be produced, `absl::nullopt` is
  // returned.
  absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);

  rtc::ArrayView<Stream* const> ActiveStreamsForTesting() const {
    // Dereferencing cbegin() of an empty set is undefined behavior, so return
    // an explicitly empty view in that case.
    return rtc::MakeArrayView(
        active_streams_.empty() ? nullptr : &*active_streams_.cbegin(),
        active_streams_.size());
  }

 private:
  struct ActiveStreamComparator {
    // Ordered by virtual finish time (primary), stream-id (secondary).
    bool operator()(Stream* a, Stream* b) const {
      VirtualTime a_vft = a->next_finish_time();
      VirtualTime b_vft = b->next_finish_time();
      if (a_vft == b_vft) {
        return a->stream_id() < b->stream_id();
      }
      return a_vft < b_vft;
    }
  };

  // Validates class invariants (every active stream has a non-zero virtual
  // finish time). Only used in RTC_DCHECKs.
  bool IsConsistent() const;

  // The current virtual time, as defined in the WFQ algorithm.
  VirtualTime virtual_time_ = VirtualTime::Zero();
  // The current stream to send chunks from.
  Stream* current_stream_ = nullptr;
  // Indicates if the streams is currently sending a message, and should then
  // continue sending from this stream until that message has been sent in
  // full.
  bool currently_sending_a_message_ = false;
  // The currently active streams, ordered by virtual finish time.
  webrtc::flat_set<Stream*, ActiveStreamComparator> active_streams_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_

View File

@ -0,0 +1,377 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/stream_scheduler.h"
#include <vector>
#include "net/dcsctp/public/types.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::Return;
using ::testing::StrictMock;
constexpr size_t kMtu = 1000;
constexpr size_t kPayloadSize = 4;
// gMock matcher: succeeds when `arg` (an absl::optional<SendQueue::DataToSend>)
// holds data whose message identifier equals `mid`.
MATCHER_P(HasDataWithMid, mid, "") {
  if (!arg.has_value()) {
    *result_listener << "There was no produced data";
    return false;
  }

  if (arg->data.message_id != mid) {
    *result_listener << "the produced data had mid " << *arg->data.message_id
                     << " and not the expected " << *mid;
    return false;
  }

  return true;
}
// Returns a callable (usable as a gMock action for `Produce`) that yields one
// self-contained chunk (both "beginning" and "end" flags set) for stream
// `sid`, message `mid`, with `payload_size` payload bytes.
std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
  return [sid, mid, payload_size](TimeMs now, size_t max_size) {
    return SendQueue::DataToSend(Data(
        sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
        Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
  };
}
// Drives the scheduler `packets_to_generate` times and tallies how many
// chunks each stream produced. Calls that yield no data are not counted.
std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
                                           size_t packets_to_generate) {
  std::map<StreamID, size_t> counts_by_stream;
  for (size_t iteration = 0; iteration < packets_to_generate; ++iteration) {
    absl::optional<SendQueue::DataToSend> chunk =
        scheduler.Produce(TimeMs(0), kMtu);
    if (chunk.has_value()) {
      ++counts_by_stream[chunk->data.stream_id];
    }
  }
  return counts_by_stream;
}
// gMock implementation of the scheduler's per-stream callback interface.
class MockStreamCallback : public StreamScheduler::StreamCallback {
 public:
  MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
              Produce,
              (TimeMs, size_t),
              (override));
  MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
};
// Test helper owning a mocked stream that always reports `packet_size` bytes
// pending and produces single-fragment messages with MID(0). The stream is
// made active already in the constructor.
class TestStream {
 public:
  TestStream(StreamScheduler& scheduler,
             StreamID stream_id,
             StreamPriority priority,
             size_t packet_size = kPayloadSize) {
    EXPECT_CALL(callback_, Produce)
        .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
        .WillRepeatedly(Return(packet_size));
    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
    stream_->MaybeMakeActive();
  }

  StreamScheduler::Stream& stream() { return *stream_; }

 private:
  StrictMock<MockStreamCallback> callback_;
  std::unique_ptr<StreamScheduler::Stream> stream_;
};
// A scheduler without active streams doesn't produce data.
TEST(StreamSchedulerTest, HasNoActiveStreams) {
  StreamScheduler scheduler;

  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Stream properties can be set and retrieved
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback;
  auto stream =
      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));

  EXPECT_EQ(stream->stream_id(), StreamID(1));
  EXPECT_EQ(stream->priority(), StreamPriority(2));

  stream->set_priority(StreamPriority(0));
  EXPECT_EQ(stream->priority(), StreamPriority(0));
}
// A scheduler with a single stream produced packets from it.
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback;
  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
  // First value activates the stream; the second (zero) deactivates it after
  // the only message has been produced.
  EXPECT_CALL(callback, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(0));

  auto stream =
      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
  stream->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Switches between two streams after every packet.
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  // Non-zero values keep the stream active after each produced message; the
  // final zero deactivates it.
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  // Expect strict alternation between the two streams.
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Switches between two streams after every packet, but keeps producing from the
// same stream when a packet contains of multiple fragments.
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  // MID(101) is produced as three fragments (first / middle / last), so the
  // scheduler must stay on stream 1 until the "end" fragment is produced.
  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce([](...) {
        return SendQueue::DataToSend(
            Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
                 std::vector<uint8_t>(4), Data::IsBeginning(true),
                 Data::IsEnd(false), IsUnordered(true)));
      })
      .WillOnce([](...) {
        return SendQueue::DataToSend(
            Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
                 std::vector<uint8_t>(4), Data::IsBeginning(false),
                 Data::IsEnd(false), IsUnordered(true)));
      })
      .WillOnce([](...) {
        return SendQueue::DataToSend(
            Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
                 std::vector<uint8_t>(4), Data::IsBeginning(false),
                 Data::IsEnd(true), IsUnordered(true)));
      })
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  // Round-robin pauses while the three MID(101) fragments are produced.
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Deactivates a stream before it has finished producing all packets.
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.

  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));

  // ... but the stream is made inactive before it can be produced.
  stream1->MakeInactive();
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Resumes a paused stream - makes a stream active after inactivating it.
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  // Callbacks are setup so that they hint that there is a MID(2) coming...
  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active again
      .WillOnce(Return(0));

  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));

  // Pause the stream, then resume it and verify the remaining message comes.
  stream1->MakeInactive();
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
  stream1->MaybeMakeActive();
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Iterates between streams, where one is suddenly paused and later resumed.
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
  StreamScheduler scheduler;

  StrictMock<MockStreamCallback> callback1;
  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream1 =
      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  StrictMock<MockStreamCallback> callback2;
  EXPECT_CALL(callback2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));

  auto stream2 =
      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  // Alternation, then only stream 2 while stream 1 is paused, then stream 1's
  // remaining messages after it's resumed.
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
  stream1->MakeInactive();
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
  stream1->MaybeMakeActive();
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Verifies that packet counts are evenly distributed in round robin scheduling.
TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
  StreamScheduler scheduler;
  TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
  TestStream stream2(scheduler, StreamID(2), StreamPriority(1));

  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
  EXPECT_EQ(packet_counts[StreamID(1)], 5U);
  EXPECT_EQ(packet_counts[StreamID(2)], 5U);
}
// Verifies that packet counts are evenly distributed among active streams,
// where a stream is suddenly made inactive, two are added, and then the paused
// stream is resumed.
TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
  StreamScheduler scheduler;
  TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
  TestStream stream2(scheduler, StreamID(2), StreamPriority(1));

  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
  EXPECT_EQ(packet_counts[StreamID(1)], 5U);
  EXPECT_EQ(packet_counts[StreamID(2)], 5U);

  // Pause stream 2 and add two fresh streams; stream 2 gets nothing.
  stream2.stream().MakeInactive();
  TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
  TestStream stream4(scheduler, StreamID(4), StreamPriority(1));

  std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
  EXPECT_EQ(counts2[StreamID(1)], 5U);
  EXPECT_EQ(counts2[StreamID(2)], 0U);
  EXPECT_EQ(counts2[StreamID(3)], 5U);
  EXPECT_EQ(counts2[StreamID(4)], 5U);

  // Resume stream 2; all four streams now share evenly again.
  stream2.stream().MaybeMakeActive();

  std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
  EXPECT_EQ(counts3[StreamID(1)], 5U);
  EXPECT_EQ(counts3[StreamID(2)], 5U);
  EXPECT_EQ(counts3[StreamID(3)], 5U);
  EXPECT_EQ(counts3[StreamID(4)], 5U);
}
} // namespace
} // namespace dcsctp