From 2440d34075cd9ddc087a7e2e64a32ae295165996 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 20 May 2021 13:47:32 +0200 Subject: [PATCH] dcsctp: Rename FCFSSendQueue to RRSendQueue The current send queue implements SCTP_SS_FCFS as defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.1, but that has always been known to be a temporary solution. The end goal is to implement a Weighted Fair Queueing Scheduler (SCTP_SS_WFQ), but that's likely to take some time. Meanwhile, a round robin scheduler (SCTP_SS_RR) will be used to avoid some issues with the current scheduler, such as a single data channel completely blocking all others if it sends a lot of messages. In this first commit, the code has simply been renamed and is still implementing first-come-first-served. That will be fixed in follow-up CLS. Bug: webrtc:12793 Change-Id: Idc03b1594551bfe1ddbe1710872814b9fdf60cc9 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219684 Commit-Queue: Victor Boivie Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#34090} --- net/dcsctp/socket/BUILD.gn | 2 +- net/dcsctp/socket/dcsctp_socket.h | 4 +- net/dcsctp/tx/BUILD.gn | 10 ++--- .../{fcfs_send_queue.cc => rr_send_queue.cc} | 41 +++++++++--------- .../tx/{fcfs_send_queue.h => rr_send_queue.h} | 29 +++++++------ ...nd_queue_test.cc => rr_send_queue_test.cc} | 42 +++++++++---------- 6 files changed, 63 insertions(+), 65 deletions(-) rename net/dcsctp/tx/{fcfs_send_queue.cc => rr_send_queue.cc} (88%) rename net/dcsctp/tx/{fcfs_send_queue.h => rr_send_queue.h} (81%) rename net/dcsctp/tx/{fcfs_send_queue_test.cc => rr_send_queue_test.cc} (90%) diff --git a/net/dcsctp/socket/BUILD.gn b/net/dcsctp/socket/BUILD.gn index 2fb05abdc9..58abd7ac31 100644 --- a/net/dcsctp/socket/BUILD.gn +++ b/net/dcsctp/socket/BUILD.gn @@ -133,10 +133,10 @@ rtc_library("dcsctp_socket") { "../rx:data_tracker", "../rx:reassembly_queue", "../timer", - "../tx:fcfs_send_queue", "../tx:retransmission_error_counter", "../tx:retransmission_queue", "../tx:retransmission_timeout", + "../tx:rr_send_queue", "../tx:send_queue", ] sources = [ diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h index 24c0437b41..96f00d1893 100644 --- a/net/dcsctp/socket/dcsctp_socket.h +++ b/net/dcsctp/socket/dcsctp_socket.h @@ -49,10 +49,10 @@ #include "net/dcsctp/socket/state_cookie.h" #include "net/dcsctp/socket/transmission_control_block.h" #include "net/dcsctp/timer/timer.h" -#include "net/dcsctp/tx/fcfs_send_queue.h" #include "net/dcsctp/tx/retransmission_error_counter.h" #include "net/dcsctp/tx/retransmission_queue.h" #include "net/dcsctp/tx/retransmission_timeout.h" +#include "net/dcsctp/tx/rr_send_queue.h" namespace dcsctp { @@ -257,7 +257,7 @@ class DcSctpSocket : public DcSctpSocketInterface { // The actual SendQueue implementation. As data can be sent on a socket before // the connection is established, this component is not in the TCB. - FCFSSendQueue send_queue_; + RRSendQueue send_queue_; // Only valid when state == State::kCookieEchoed // A cached Cookie Echo Chunk, to be re-sent on timer expiry. diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 924a194f85..641c8a6519 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -20,7 +20,7 @@ rtc_source_set("send_queue") { absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } -rtc_library("fcfs_send_queue") { +rtc_library("rr_send_queue") { deps = [ ":send_queue", "../../../api:array_view", @@ -32,8 +32,8 @@ rtc_library("fcfs_send_queue") { "../public:types", ] sources = [ - "fcfs_send_queue.cc", - "fcfs_send_queue.h", + "rr_send_queue.cc", + "rr_send_queue.h", ] absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", @@ -111,11 +111,11 @@ if (rtc_include_tests) { testonly = true deps = [ - ":fcfs_send_queue", ":mock_send_queue", ":retransmission_error_counter", ":retransmission_queue", ":retransmission_timeout", + ":rr_send_queue", ":send_queue", "../../../api:array_view", "../../../rtc_base:checks", @@ -131,10 +131,10 @@ if (rtc_include_tests) { ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] sources = [ - "fcfs_send_queue_test.cc", "retransmission_error_counter_test.cc", "retransmission_queue_test.cc", "retransmission_timeout_test.cc", + "rr_send_queue_test.cc", ] } } diff --git a/net/dcsctp/tx/fcfs_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc similarity index 88% rename from net/dcsctp/tx/fcfs_send_queue.cc rename to net/dcsctp/tx/rr_send_queue.cc index f2dc5e40f8..f2d22c8576 100644 --- a/net/dcsctp/tx/fcfs_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -7,7 +7,7 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ -#include "net/dcsctp/tx/fcfs_send_queue.h" +#include "net/dcsctp/tx/rr_send_queue.h" #include #include @@ -26,9 +26,9 @@ #include "rtc_base/logging.h" namespace dcsctp { -void FCFSSendQueue::Add(TimeMs now, - DcSctpMessage message, - const SendOptions& send_options) { +void RRSendQueue::Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options) { RTC_DCHECK(!message.payload().empty()); std::deque& queue = IsPaused(message.stream_id()) ? paused_items_ : items_; @@ -44,7 +44,7 @@ void FCFSSendQueue::Add(TimeMs now, queue.emplace_back(std::move(message), expires_at, send_options); } -size_t FCFSSendQueue::total_bytes() const { +size_t RRSendQueue::total_bytes() const { // TODO(boivie): Have the current size as a member variable, so that's it not // calculated for every operation. return absl::c_accumulate(items_, 0, @@ -57,17 +57,17 @@ size_t FCFSSendQueue::total_bytes() const { }); } -bool FCFSSendQueue::IsFull() const { +bool RRSendQueue::IsFull() const { return total_bytes() >= buffer_size_; } -bool FCFSSendQueue::IsEmpty() const { +bool RRSendQueue::IsEmpty() const { return items_.empty(); } -FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) { +RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) { while (!items_.empty()) { - FCFSSendQueue::Item& item = items_.front(); + RRSendQueue::Item& item = items_.front(); // An entire item can be discarded iff: // 1) It hasn't been partially sent (has been allocated a message_id). // 2) It has a non-negative expiry time. @@ -87,8 +87,8 @@ FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) { return nullptr; } -absl::optional FCFSSendQueue::Produce(TimeMs now, - size_t max_size) { +absl::optional RRSendQueue::Produce(TimeMs now, + size_t max_size) { Item* item = GetFirstNonExpiredMessage(now); if (item == nullptr) { return absl::nullopt; @@ -163,9 +163,9 @@ absl::optional FCFSSendQueue::Produce(TimeMs now, return chunk; } -void FCFSSendQueue::Discard(IsUnordered unordered, - StreamID stream_id, - MID message_id) { +void RRSendQueue::Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) { // As this method will only discard partially sent messages, and as the queue // is a FIFO queue, the only partially sent message would be the topmost // message. @@ -179,8 +179,7 @@ void FCFSSendQueue::Discard(IsUnordered unordered, } } -void FCFSSendQueue::PrepareResetStreams( - rtc::ArrayView streams) { +void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { for (StreamID stream_id : streams) { paused_streams_.insert(stream_id); } @@ -197,7 +196,7 @@ void FCFSSendQueue::PrepareResetStreams( } } -bool FCFSSendQueue::CanResetStreams() const { +bool RRSendQueue::CanResetStreams() const { for (auto& item : items_) { if (IsPaused(item.message.stream_id())) { return false; @@ -206,7 +205,7 @@ bool FCFSSendQueue::CanResetStreams() const { return true; } -void FCFSSendQueue::CommitResetStreams() { +void RRSendQueue::CommitResetStreams() { for (StreamID stream_id : paused_streams_) { ssn_by_stream_id_[stream_id] = SSN(0); // https://tools.ietf.org/html/rfc8260#section-2.3.2 @@ -219,7 +218,7 @@ void FCFSSendQueue::CommitResetStreams() { RollbackResetStreams(); } -void FCFSSendQueue::RollbackResetStreams() { +void RRSendQueue::RollbackResetStreams() { while (!paused_items_.empty()) { items_.push_back(std::move(paused_items_.front())); paused_items_.pop_front(); @@ -227,7 +226,7 @@ void FCFSSendQueue::RollbackResetStreams() { paused_streams_.clear(); } -void FCFSSendQueue::Reset() { +void RRSendQueue::Reset() { if (!items_.empty()) { // If this message has been partially sent, reset it so that it will be // re-sent. @@ -243,7 +242,7 @@ void FCFSSendQueue::Reset() { ssn_by_stream_id_.clear(); } -bool FCFSSendQueue::IsPaused(StreamID stream_id) const { +bool RRSendQueue::IsPaused(StreamID stream_id) const { return paused_streams_.find(stream_id) != paused_streams_.end(); } diff --git a/net/dcsctp/tx/fcfs_send_queue.h b/net/dcsctp/tx/rr_send_queue.h similarity index 81% rename from net/dcsctp/tx/fcfs_send_queue.h rename to net/dcsctp/tx/rr_send_queue.h index 63e7eab49a..c43dc91881 100644 --- a/net/dcsctp/tx/fcfs_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -7,8 +7,8 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ -#ifndef NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ -#define NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ +#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_ #include #include @@ -29,24 +29,23 @@ namespace dcsctp { -// The FCFSSendQueue (First-Come, First-Served Send Queue) holds all messages -// that the client wants to send, but that haven't yet been split into chunks -// and sent on the wire. +// The Round Robin SendQueue holds all messages that the client wants to send, +// but that haven't yet been split into chunks and fully sent on the wire. // -// First-Come, First Served means that it passes the data in the exact same -// order as they were delivered by the calling application, and is defined in -// https://tools.ietf.org/html/rfc8260#section-3.1. It's a FIFO queue, but that -// term isn't used in this RFC. +// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2, +// it will cycle to send messages from different streams. It will send all +// fragments from one message before continuing with a different message on +// possibly a different stream, until support for message interleaving has been +// implemented. // -// As messages can be (requested to be) sent before -// the connection is properly established, this send queue is always present - -// even for closed connections. -class FCFSSendQueue : public SendQueue { +// As messages can be (requested to be) sent before the connection is properly +// established, this send queue is always present - even for closed connections. +class RRSendQueue : public SendQueue { public: // How small a data chunk's payload may be, if having to fragment a message. static constexpr size_t kMinimumFragmentedPayload = 10; - FCFSSendQueue(absl::string_view log_prefix, size_t buffer_size) + RRSendQueue(absl::string_view log_prefix, size_t buffer_size) : log_prefix_(std::string(log_prefix) + "fcfs: "), buffer_size_(buffer_size) {} @@ -120,4 +119,4 @@ class FCFSSendQueue : public SendQueue { }; } // namespace dcsctp -#endif // NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ +#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_ diff --git a/net/dcsctp/tx/fcfs_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc similarity index 90% rename from net/dcsctp/tx/fcfs_send_queue_test.cc rename to net/dcsctp/tx/rr_send_queue_test.cc index a67a0a1a9c..0f6fd2bd05 100644 --- a/net/dcsctp/tx/fcfs_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -7,7 +7,7 @@ * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ -#include "net/dcsctp/tx/fcfs_send_queue.h" +#include "net/dcsctp/tx/rr_send_queue.h" #include #include @@ -29,21 +29,21 @@ constexpr TimeMs kNow = TimeMs(0); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); -class FCFSSendQueueTest : public testing::Test { +class RRSendQueueTest : public testing::Test { protected: - FCFSSendQueueTest() : buf_("log: ", 100) {} + RRSendQueueTest() : buf_("log: ", 100) {} const DcSctpOptions options_; - FCFSSendQueue buf_; + RRSendQueue buf_; }; -TEST_F(FCFSSendQueueTest, EmptyBuffer) { +TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); EXPECT_FALSE(buf_.IsFull()); } -TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) { +TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); @@ -54,7 +54,7 @@ TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) { EXPECT_TRUE(chunk_opt->data.is_end); } -TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) { +TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) { std::vector payload(60); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); @@ -79,7 +79,7 @@ TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) { EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); } -TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) { +TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { std::vector payload(60); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); @@ -99,7 +99,7 @@ TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) { EXPECT_TRUE(chunk_two->data.is_end); } -TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) { +TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector payload(60); EXPECT_FALSE(buf_.IsFull()); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); @@ -136,20 +136,20 @@ TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) { EXPECT_TRUE(buf_.IsEmpty()); } -TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) { - std::vector payload(FCFSSendQueue::kMinimumFragmentedPayload + 1); +TEST_F(RRSendQueueTest, WillNotSendTooSmallPacket) { + std::vector payload(RRSendQueue::kMinimumFragmentedPayload + 1); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); // Wouldn't fit enough payload (wouldn't want to fragment) EXPECT_FALSE( buf_.Produce(kNow, - /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1) + /*max_size=*/RRSendQueue::kMinimumFragmentedPayload - 1) .has_value()); // Minimum fragment absl::optional chunk_one = buf_.Produce(kNow, - /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload); + /*max_size=*/RRSendQueue::kMinimumFragmentedPayload); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); @@ -165,7 +165,7 @@ TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) { EXPECT_TRUE(buf_.IsEmpty()); } -TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) { +TEST_F(RRSendQueueTest, DefaultsToOrderedSend) { std::vector payload(20); // Default is ordered @@ -185,7 +185,7 @@ TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) { EXPECT_TRUE(chunk_two->data.is_unordered); } -TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) { +TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { std::vector payload(20); // Default is no expiry @@ -225,7 +225,7 @@ TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) { ASSERT_FALSE(buf_.Produce(now, 100)); } -TEST_F(FCFSSendQueueTest, DiscardPartialPackets) { +TEST_F(RRSendQueueTest, DiscardPartialPackets) { std::vector payload(120); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); @@ -255,7 +255,7 @@ TEST_F(FCFSSendQueueTest, DiscardPartialPackets) { ASSERT_FALSE(buf_.Produce(kNow, 100)); } -TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) { +TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); EXPECT_EQ(buf_.total_bytes(), 8u); @@ -267,7 +267,7 @@ TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) { EXPECT_EQ(buf_.total_bytes(), 0u); } -TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) { +TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { std::vector payload(120); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); @@ -283,7 +283,7 @@ TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) { EXPECT_EQ(buf_.total_bytes(), payload.size() - 50); } -TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { +TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { std::vector payload(50); buf_.PrepareResetStreams(std::vector({StreamID(1)})); @@ -302,7 +302,7 @@ TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { EXPECT_EQ(buf_.total_bytes(), 0u); } -TEST_F(FCFSSendQueueTest, CommittingResetsSSN) { +TEST_F(RRSendQueueTest, CommittingResetsSSN) { std::vector payload(50); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); @@ -330,7 +330,7 @@ TEST_F(FCFSSendQueueTest, CommittingResetsSSN) { EXPECT_EQ(chunk_three->data.ssn, SSN(0)); } -TEST_F(FCFSSendQueueTest, RollBackResumesSSN) { +TEST_F(RRSendQueueTest, RollBackResumesSSN) { std::vector payload(50); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));