dcsctp: Rename FCFSSendQueue to RRSendQueue

The current send queue implements SCTP_SS_FCFS as defined in
https://datatracker.ietf.org/doc/html/rfc8260#section-3.1, but that has
always been known to be a temporary solution. The end goal is to
implement a Weighted Fair Queueing Scheduler (SCTP_SS_WFQ), but that's
likely to take some time.

Meanwhile, a round robin scheduler (SCTP_SS_RR) will be used to avoid
some issues with the current scheduler, such as a single data channel
completely blocking all others if it sends a lot of messages.

In this first commit, the code has simply been renamed and is still
implementing first-come-first-served. That will be fixed in follow-up
CLS.

Bug: webrtc:12793
Change-Id: Idc03b1594551bfe1ddbe1710872814b9fdf60cc9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219684
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34090}
This commit is contained in:
Victor Boivie 2021-05-20 13:47:32 +02:00 committed by WebRTC LUCI CQ
parent 913c3af879
commit 2440d34075
6 changed files with 63 additions and 65 deletions

View File

@ -133,10 +133,10 @@ rtc_library("dcsctp_socket") {
"../rx:data_tracker",
"../rx:reassembly_queue",
"../timer",
"../tx:fcfs_send_queue",
"../tx:retransmission_error_counter",
"../tx:retransmission_queue",
"../tx:retransmission_timeout",
"../tx:rr_send_queue",
"../tx:send_queue",
]
sources = [

View File

@ -49,10 +49,10 @@
#include "net/dcsctp/socket/state_cookie.h"
#include "net/dcsctp/socket/transmission_control_block.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/fcfs_send_queue.h"
#include "net/dcsctp/tx/retransmission_error_counter.h"
#include "net/dcsctp/tx/retransmission_queue.h"
#include "net/dcsctp/tx/retransmission_timeout.h"
#include "net/dcsctp/tx/rr_send_queue.h"
namespace dcsctp {
@ -257,7 +257,7 @@ class DcSctpSocket : public DcSctpSocketInterface {
// The actual SendQueue implementation. As data can be sent on a socket before
// the connection is established, this component is not in the TCB.
FCFSSendQueue send_queue_;
RRSendQueue send_queue_;
// Only valid when state == State::kCookieEchoed
// A cached Cookie Echo Chunk, to be re-sent on timer expiry.

View File

@ -20,7 +20,7 @@ rtc_source_set("send_queue") {
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("fcfs_send_queue") {
rtc_library("rr_send_queue") {
deps = [
":send_queue",
"../../../api:array_view",
@ -32,8 +32,8 @@ rtc_library("fcfs_send_queue") {
"../public:types",
]
sources = [
"fcfs_send_queue.cc",
"fcfs_send_queue.h",
"rr_send_queue.cc",
"rr_send_queue.h",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
@ -111,11 +111,11 @@ if (rtc_include_tests) {
testonly = true
deps = [
":fcfs_send_queue",
":mock_send_queue",
":retransmission_error_counter",
":retransmission_queue",
":retransmission_timeout",
":rr_send_queue",
":send_queue",
"../../../api:array_view",
"../../../rtc_base:checks",
@ -131,10 +131,10 @@ if (rtc_include_tests) {
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
sources = [
"fcfs_send_queue_test.cc",
"retransmission_error_counter_test.cc",
"retransmission_queue_test.cc",
"retransmission_timeout_test.cc",
"rr_send_queue_test.cc",
]
}
}

View File

@ -7,7 +7,7 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/fcfs_send_queue.h"
#include "net/dcsctp/tx/rr_send_queue.h"
#include <cstdint>
#include <deque>
@ -26,9 +26,9 @@
#include "rtc_base/logging.h"
namespace dcsctp {
void FCFSSendQueue::Add(TimeMs now,
DcSctpMessage message,
const SendOptions& send_options) {
void RRSendQueue::Add(TimeMs now,
DcSctpMessage message,
const SendOptions& send_options) {
RTC_DCHECK(!message.payload().empty());
std::deque<Item>& queue =
IsPaused(message.stream_id()) ? paused_items_ : items_;
@ -44,7 +44,7 @@ void FCFSSendQueue::Add(TimeMs now,
queue.emplace_back(std::move(message), expires_at, send_options);
}
size_t FCFSSendQueue::total_bytes() const {
size_t RRSendQueue::total_bytes() const {
// TODO(boivie): Have the current size as a member variable, so that's it not
// calculated for every operation.
return absl::c_accumulate(items_, 0,
@ -57,17 +57,17 @@ size_t FCFSSendQueue::total_bytes() const {
});
}
bool FCFSSendQueue::IsFull() const {
bool RRSendQueue::IsFull() const {
return total_bytes() >= buffer_size_;
}
bool FCFSSendQueue::IsEmpty() const {
bool RRSendQueue::IsEmpty() const {
return items_.empty();
}
FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
while (!items_.empty()) {
FCFSSendQueue::Item& item = items_.front();
RRSendQueue::Item& item = items_.front();
// An entire item can be discarded iff:
// 1) It hasn't been partially sent (has been allocated a message_id).
// 2) It has a non-negative expiry time.
@ -87,8 +87,8 @@ FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
return nullptr;
}
absl::optional<SendQueue::DataToSend> FCFSSendQueue::Produce(TimeMs now,
size_t max_size) {
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
size_t max_size) {
Item* item = GetFirstNonExpiredMessage(now);
if (item == nullptr) {
return absl::nullopt;
@ -163,9 +163,9 @@ absl::optional<SendQueue::DataToSend> FCFSSendQueue::Produce(TimeMs now,
return chunk;
}
void FCFSSendQueue::Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) {
void RRSendQueue::Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) {
// As this method will only discard partially sent messages, and as the queue
// is a FIFO queue, the only partially sent message would be the topmost
// message.
@ -179,8 +179,7 @@ void FCFSSendQueue::Discard(IsUnordered unordered,
}
}
void FCFSSendQueue::PrepareResetStreams(
rtc::ArrayView<const StreamID> streams) {
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
for (StreamID stream_id : streams) {
paused_streams_.insert(stream_id);
}
@ -197,7 +196,7 @@ void FCFSSendQueue::PrepareResetStreams(
}
}
bool FCFSSendQueue::CanResetStreams() const {
bool RRSendQueue::CanResetStreams() const {
for (auto& item : items_) {
if (IsPaused(item.message.stream_id())) {
return false;
@ -206,7 +205,7 @@ bool FCFSSendQueue::CanResetStreams() const {
return true;
}
void FCFSSendQueue::CommitResetStreams() {
void RRSendQueue::CommitResetStreams() {
for (StreamID stream_id : paused_streams_) {
ssn_by_stream_id_[stream_id] = SSN(0);
// https://tools.ietf.org/html/rfc8260#section-2.3.2
@ -219,7 +218,7 @@ void FCFSSendQueue::CommitResetStreams() {
RollbackResetStreams();
}
void FCFSSendQueue::RollbackResetStreams() {
void RRSendQueue::RollbackResetStreams() {
while (!paused_items_.empty()) {
items_.push_back(std::move(paused_items_.front()));
paused_items_.pop_front();
@ -227,7 +226,7 @@ void FCFSSendQueue::RollbackResetStreams() {
paused_streams_.clear();
}
void FCFSSendQueue::Reset() {
void RRSendQueue::Reset() {
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@ -243,7 +242,7 @@ void FCFSSendQueue::Reset() {
ssn_by_stream_id_.clear();
}
bool FCFSSendQueue::IsPaused(StreamID stream_id) const {
bool RRSendQueue::IsPaused(StreamID stream_id) const {
return paused_streams_.find(stream_id) != paused_streams_.end();
}

View File

@ -7,8 +7,8 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
#define NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#include <cstdint>
#include <deque>
@ -29,24 +29,23 @@
namespace dcsctp {
// The FCFSSendQueue (First-Come, First-Served Send Queue) holds all messages
// that the client wants to send, but that haven't yet been split into chunks
// and sent on the wire.
// The Round Robin SendQueue holds all messages that the client wants to send,
// but that haven't yet been split into chunks and fully sent on the wire.
//
// First-Come, First Served means that it passes the data in the exact same
// order as they were delivered by the calling application, and is defined in
// https://tools.ietf.org/html/rfc8260#section-3.1. It's a FIFO queue, but that
// term isn't used in this RFC.
// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
// it will cycle to send messages from different streams. It will send all
// fragments from one message before continuing with a different message on
// possibly a different stream, until support for message interleaving has been
// implemented.
//
// As messages can be (requested to be) sent before
// the connection is properly established, this send queue is always present -
// even for closed connections.
class FCFSSendQueue : public SendQueue {
// As messages can be (requested to be) sent before the connection is properly
// established, this send queue is always present - even for closed connections.
class RRSendQueue : public SendQueue {
public:
// How small a data chunk's payload may be, if having to fragment a message.
static constexpr size_t kMinimumFragmentedPayload = 10;
FCFSSendQueue(absl::string_view log_prefix, size_t buffer_size)
RRSendQueue(absl::string_view log_prefix, size_t buffer_size)
: log_prefix_(std::string(log_prefix) + "fcfs: "),
buffer_size_(buffer_size) {}
@ -120,4 +119,4 @@ class FCFSSendQueue : public SendQueue {
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_

View File

@ -7,7 +7,7 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/fcfs_send_queue.h"
#include "net/dcsctp/tx/rr_send_queue.h"
#include <cstdint>
#include <type_traits>
@ -29,21 +29,21 @@ constexpr TimeMs kNow = TimeMs(0);
constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53);
class FCFSSendQueueTest : public testing::Test {
class RRSendQueueTest : public testing::Test {
protected:
FCFSSendQueueTest() : buf_("log: ", 100) {}
RRSendQueueTest() : buf_("log: ", 100) {}
const DcSctpOptions options_;
FCFSSendQueue buf_;
RRSendQueue buf_;
};
TEST_F(FCFSSendQueueTest, EmptyBuffer) {
TEST_F(RRSendQueueTest, EmptyBuffer) {
EXPECT_TRUE(buf_.IsEmpty());
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
EXPECT_FALSE(buf_.IsFull());
}
TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) {
TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
EXPECT_FALSE(buf_.IsEmpty());
@ -54,7 +54,7 @@ TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) {
EXPECT_TRUE(chunk_opt->data.is_end);
}
TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) {
TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
std::vector<uint8_t> payload(60);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@ -79,7 +79,7 @@ TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) {
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
}
TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) {
TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
std::vector<uint8_t> payload(60);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
@ -99,7 +99,7 @@ TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) {
EXPECT_TRUE(chunk_two->data.is_end);
}
TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) {
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
std::vector<uint8_t> payload(60);
EXPECT_FALSE(buf_.IsFull());
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@ -136,20 +136,20 @@ TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) {
EXPECT_TRUE(buf_.IsEmpty());
}
TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) {
std::vector<uint8_t> payload(FCFSSendQueue::kMinimumFragmentedPayload + 1);
TEST_F(RRSendQueueTest, WillNotSendTooSmallPacket) {
std::vector<uint8_t> payload(RRSendQueue::kMinimumFragmentedPayload + 1);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
// Wouldn't fit enough payload (wouldn't want to fragment)
EXPECT_FALSE(
buf_.Produce(kNow,
/*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1)
/*max_size=*/RRSendQueue::kMinimumFragmentedPayload - 1)
.has_value());
// Minimum fragment
absl::optional<SendQueue::DataToSend> chunk_one =
buf_.Produce(kNow,
/*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload);
/*max_size=*/RRSendQueue::kMinimumFragmentedPayload);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID);
@ -165,7 +165,7 @@ TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) {
EXPECT_TRUE(buf_.IsEmpty());
}
TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) {
TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
std::vector<uint8_t> payload(20);
// Default is ordered
@ -185,7 +185,7 @@ TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) {
EXPECT_TRUE(chunk_two->data.is_unordered);
}
TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) {
TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
std::vector<uint8_t> payload(20);
// Default is no expiry
@ -225,7 +225,7 @@ TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) {
ASSERT_FALSE(buf_.Produce(now, 100));
}
TEST_F(FCFSSendQueueTest, DiscardPartialPackets) {
TEST_F(RRSendQueueTest, DiscardPartialPackets) {
std::vector<uint8_t> payload(120);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@ -255,7 +255,7 @@ TEST_F(FCFSSendQueueTest, DiscardPartialPackets) {
ASSERT_FALSE(buf_.Produce(kNow, 100));
}
TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) {
TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
EXPECT_EQ(buf_.total_bytes(), 8u);
@ -267,7 +267,7 @@ TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) {
EXPECT_EQ(buf_.total_bytes(), 0u);
}
TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) {
TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
std::vector<uint8_t> payload(120);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@ -283,7 +283,7 @@ TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) {
EXPECT_EQ(buf_.total_bytes(), payload.size() - 50);
}
TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
std::vector<uint8_t> payload(50);
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
@ -302,7 +302,7 @@ TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
EXPECT_EQ(buf_.total_bytes(), 0u);
}
TEST_F(FCFSSendQueueTest, CommittingResetsSSN) {
TEST_F(RRSendQueueTest, CommittingResetsSSN) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@ -330,7 +330,7 @@ TEST_F(FCFSSendQueueTest, CommittingResetsSSN) {
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
TEST_F(FCFSSendQueueTest, RollBackResumesSSN) {
TEST_F(RRSendQueueTest, RollBackResumesSSN) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));