diff --git a/net/dcsctp/BUILD.gn b/net/dcsctp/BUILD.gn index 9f7f541695..af7082b999 100644 --- a/net/dcsctp/BUILD.gn +++ b/net/dcsctp/BUILD.gn @@ -18,6 +18,7 @@ if (rtc_include_tests) { "public:dcsctp_public_unittests", "rx:dcsctp_rx_unittests", "timer:dcsctp_timer_unittests", + "tx:dcsctp_tx_unittests", ] } } diff --git a/net/dcsctp/common/internal_types.h b/net/dcsctp/common/internal_types.h index 4551fd17d3..b99e3779dd 100644 --- a/net/dcsctp/common/internal_types.h +++ b/net/dcsctp/common/internal_types.h @@ -13,6 +13,7 @@ #include #include "net/dcsctp/public/strong_alias.h" +#include "net/dcsctp/public/types.h" namespace dcsctp { @@ -34,5 +35,13 @@ using ReconfigRequestSN = StrongAlias; // Verification Tag, used for packet validation. using VerificationTag = StrongAlias; +// Hasher for separated ordered/unordered stream identifiers. +struct UnorderedStreamHash { + size_t operator()(const std::pair& p) const { + return std::hash{}(*p.first) ^ + (std::hash{}(*p.second) << 1); + } +}; + } // namespace dcsctp #endif // NET_DCSCTP_COMMON_INTERNAL_TYPES_H_ diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn new file mode 100644 index 0000000000..0f06e477ed --- /dev/null +++ b/net/dcsctp/tx/BUILD.gn @@ -0,0 +1,53 @@ +# Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../../webrtc.gni") + +rtc_source_set("send_queue") { + deps = [ + "../common:internal_types", + "../packet:chunk", + "../public:types", + ] + sources = [ "send_queue.h" ] +} + +rtc_library("fcfs_send_queue") { + deps = [ + ":send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + ] + sources = [ + "fcfs_send_queue.cc", + "fcfs_send_queue.h", + ] +} + +if (rtc_include_tests) { + rtc_source_set("mock_send_queue") { + testonly = true + deps = [ ":send_queue" ] + sources = [ "mock_send_queue.h" ] + } + + rtc_library("dcsctp_tx_unittests") { + testonly = true + + deps = [ + ":fcfs_send_queue", + "../../../api:array_view", + "../../../rtc_base:checks", + "../../../rtc_base:gunit_helpers", + "../../../rtc_base:rtc_base_approved", + "../../../test:test_support", + ] + sources = [ "fcfs_send_queue_test.cc" ] + } +} diff --git a/net/dcsctp/tx/fcfs_send_queue.cc b/net/dcsctp/tx/fcfs_send_queue.cc new file mode 100644 index 0000000000..eae90e09f9 --- /dev/null +++ b/net/dcsctp/tx/fcfs_send_queue.cc @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/fcfs_send_queue.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/logging.h" + +namespace dcsctp { +void FCFSSendQueue::Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options) { + RTC_DCHECK(!message.payload().empty()); + std::deque& queue = + IsPaused(message.stream_id()) ? paused_items_ : items_; + // Any limited lifetime should start counting from now - when the message + // has been added to the queue. + absl::optional expires_at = absl::nullopt; + if (send_options.lifetime.has_value()) { + expires_at = now + *send_options.lifetime; + } + queue.emplace_back(std::move(message), expires_at, send_options); +} + +size_t FCFSSendQueue::total_bytes() const { + // TODO(boivie): Have the current size as a member variable, so that's it not + // calculated for every operation. + return absl::c_accumulate(items_, 0, + [](size_t size, const Item& item) { + return size + item.remaining_size; + }) + + absl::c_accumulate(paused_items_, 0, + [](size_t size, const Item& item) { + return size + item.remaining_size; + }); +} + +bool FCFSSendQueue::IsFull() const { + return total_bytes() >= buffer_size_; +} + +bool FCFSSendQueue::IsEmpty() const { + return items_.empty(); +} + +FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) { + while (!items_.empty()) { + FCFSSendQueue::Item& item = items_.front(); + // An entire item can be discarded iff: + // 1) It hasn't been partially sent (has been allocated a message_id). + // 2) It has a non-negative expiry time. + // 3) And that expiry time has passed. + if (!item.message_id.has_value() && item.expires_at.has_value() && + *item.expires_at <= now) { + // TODO(boivie): This should be reported to the client. + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "Message is expired before even partially sent - discarding"; + items_.pop_front(); + continue; + } + + return &item; + } + return nullptr; +} + +absl::optional FCFSSendQueue::Produce(TimeMs now, + size_t max_size) { + Item* item = GetFirstNonExpiredMessage(now); + if (item == nullptr) { + return absl::nullopt; + } + + DcSctpMessage& message = item->message; + + // Don't make too small fragments as that can result in increased risk of + // failure to assemble a message if a small fragment is missing. + if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment " + << item->remaining_size << " bytes into buffer of " + << max_size << " bytes"; + return absl::nullopt; + } + + // Allocate Message ID and SSN when the first fragment is sent. + if (!item->message_id.has_value()) { + MID& mid = + mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}]; + item->message_id = mid; + mid = MID(*mid + 1); + } + if (!item->send_options.unordered && !item->ssn.has_value()) { + SSN& ssn = ssn_by_stream_id_[message.stream_id()]; + item->ssn = ssn; + ssn = SSN(*ssn + 1); + } + + // Grab the next `max_size` fragment from this message and calculate flags. + rtc::ArrayView chunk_payload = + item->message.payload().subview(item->remaining_offset, max_size); + rtc::ArrayView message_payload = message.payload(); + Data::IsBeginning is_beginning(chunk_payload.data() == + message_payload.data()); + Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) == + (message_payload.data() + message_payload.size())); + + StreamID stream_id = message.stream_id(); + PPID ppid = message.ppid(); + + // Zero-copy the payload if the message fits in a single chunk. + std::vector payload = + is_beginning && is_end + ? std::move(message).ReleasePayload() + : std::vector(chunk_payload.begin(), chunk_payload.end()); + + FSN fsn(item->current_fsn); + item->current_fsn = FSN(*item->current_fsn + 1); + + SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), + item->message_id.value(), fsn, ppid, + std::move(payload), is_beginning, is_end, + item->send_options.unordered)); + chunk.max_retransmissions = item->send_options.max_retransmissions; + chunk.expires_at = item->expires_at; + + if (is_end) { + // The entire message has been sent, and its last data copied to `chunk`, so + // it can safely be discarded. + items_.pop_front(); + } else { + item->remaining_offset += chunk_payload.size(); + item->remaining_size -= chunk_payload.size(); + RTC_DCHECK(item->remaining_offset + item->remaining_size == + item->message.payload().size()); + RTC_DCHECK(item->remaining_size > 0); + } + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of " + << chunk.data.size() << " bytes (max: " << max_size + << ")"; + return chunk; +} + +void FCFSSendQueue::Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) { + // As this method will only discard partially sent messages, and as the queue + // is a FIFO queue, the only partially sent message would be the topmost + // message. + if (!items_.empty()) { + Item& item = items_.front(); + if (item.send_options.unordered == unordered && + item.message.stream_id() == stream_id && item.message_id.has_value() && + *item.message_id == message_id) { + items_.pop_front(); + } + } +} + +void FCFSSendQueue::PrepareResetStreams( + rtc::ArrayView streams) { + for (StreamID stream_id : streams) { + paused_streams_.insert(stream_id); + } + + // Will not discard partially sent messages - only whole messages. Partially + // delivered messages (at the time of receiving a Stream Reset command) will + // always deliver all the fragments before actually resetting the stream. + for (auto it = items_.begin(); it != items_.end();) { + if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) { + it = items_.erase(it); + } else { + ++it; + } + } +} + +bool FCFSSendQueue::CanResetStreams() const { + for (auto& item : items_) { + if (IsPaused(item.message.stream_id())) { + return false; + } + } + return true; +} + +void FCFSSendQueue::CommitResetStreams() { + for (StreamID stream_id : paused_streams_) { + ssn_by_stream_id_[stream_id] = SSN(0); + // https://tools.ietf.org/html/rfc8260#section-2.3.2 + // "When an association resets the SSN using the SCTP extension defined + // in [RFC6525], the two counters (one for the ordered messages, one for + // the unordered messages) used for the MIDs MUST be reset to 0." + mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0); + mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0); + } + RollbackResetStreams(); +} + +void FCFSSendQueue::RollbackResetStreams() { + while (!paused_items_.empty()) { + items_.push_back(std::move(paused_items_.front())); + paused_items_.pop_front(); + } + paused_streams_.clear(); +} + +void FCFSSendQueue::Reset() { + if (!items_.empty()) { + // If this message has been partially sent, reset it so that it will be + // re-sent. + auto& item = items_.front(); + item.remaining_offset = 0; + item.remaining_size = item.message.payload().size(); + item.message_id = absl::nullopt; + item.ssn = absl::nullopt; + item.current_fsn = FSN(0); + } + RollbackResetStreams(); + mid_by_stream_id_.clear(); + ssn_by_stream_id_.clear(); +} + +bool FCFSSendQueue::IsPaused(StreamID stream_id) const { + return paused_streams_.find(stream_id) != paused_streams_.end(); +} + +} // namespace dcsctp diff --git a/net/dcsctp/tx/fcfs_send_queue.h b/net/dcsctp/tx/fcfs_send_queue.h new file mode 100644 index 0000000000..63e7eab49a --- /dev/null +++ b/net/dcsctp/tx/fcfs_send_queue.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ + +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/pair_hash.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" + +namespace dcsctp { + +// The FCFSSendQueue (First-Come, First-Served Send Queue) holds all messages +// that the client wants to send, but that haven't yet been split into chunks +// and sent on the wire. +// +// First-Come, First Served means that it passes the data in the exact same +// order as they were delivered by the calling application, and is defined in +// https://tools.ietf.org/html/rfc8260#section-3.1. It's a FIFO queue, but that +// term isn't used in this RFC. +// +// As messages can be (requested to be) sent before +// the connection is properly established, this send queue is always present - +// even for closed connections. +class FCFSSendQueue : public SendQueue { + public: + // How small a data chunk's payload may be, if having to fragment a message. + static constexpr size_t kMinimumFragmentedPayload = 10; + + FCFSSendQueue(absl::string_view log_prefix, size_t buffer_size) + : log_prefix_(std::string(log_prefix) + "fcfs: "), + buffer_size_(buffer_size) {} + + // Indicates if the buffer is full. Note that it's up to the caller to ensure + // that the buffer is not full prior to adding new items to it. + bool IsFull() const; + // Indicates if the buffer is empty. + bool IsEmpty() const; + + // Adds the message to be sent using the `send_options` provided. The current + // time should be in `now`. Note that it's the responsibility of the caller to + // ensure that the buffer is not full (by calling `IsFull`) before adding + // messages to it. + void Add(TimeMs now, + DcSctpMessage message, + const SendOptions& send_options = {}); + + // Implementation of `SendQueue`. + absl::optional Produce(TimeMs now, size_t max_size) override; + void Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) override; + void PrepareResetStreams(rtc::ArrayView streams) override; + bool CanResetStreams() const override; + void CommitResetStreams() override; + void RollbackResetStreams() override; + void Reset() override; + + // The size of the buffer, in "payload bytes". + size_t total_bytes() const; + + private: + // An enqueued message and metadata. + struct Item { + explicit Item(DcSctpMessage msg, + absl::optional expires_at, + const SendOptions& send_options) + : message(std::move(msg)), + expires_at(expires_at), + send_options(send_options), + remaining_offset(0), + remaining_size(message.payload().size()) {} + DcSctpMessage message; + absl::optional expires_at; + SendOptions send_options; + // The remaining payload (offset and size) to be sent, when it has been + // fragmented. + size_t remaining_offset; + size_t remaining_size; + // If set, an allocated Message ID and SSN. Will be allocated when the first + // fragment is sent. + absl::optional message_id = absl::nullopt; + absl::optional ssn = absl::nullopt; + // The current Fragment Sequence Number, incremented for each fragment. + FSN current_fsn = FSN(0); + }; + + Item* GetFirstNonExpiredMessage(TimeMs now); + bool IsPaused(StreamID stream_id) const; + + const std::string log_prefix_; + const size_t buffer_size_; + std::deque items_; + + std::unordered_set paused_streams_; + std::deque paused_items_; + + std::unordered_map, MID, UnorderedStreamHash> + mid_by_stream_id_; + std::unordered_map ssn_by_stream_id_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_ diff --git a/net/dcsctp/tx/fcfs_send_queue_test.cc b/net/dcsctp/tx/fcfs_send_queue_test.cc new file mode 100644 index 0000000000..ec28b41b25 --- /dev/null +++ b/net/dcsctp/tx/fcfs_send_queue_test.cc @@ -0,0 +1,361 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/fcfs_send_queue.h" + +#include +#include +#include + +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { + +constexpr TimeMs kNow = TimeMs(0); +constexpr StreamID kStreamID(1); +constexpr PPID kPPID(53); + +class FCFSSendQueueTest : public testing::Test { + protected: + FCFSSendQueueTest() : buf_("log: ", 100) {} + + const DcSctpOptions options_; + FCFSSendQueue buf_; +}; + +TEST_F(FCFSSendQueueTest, EmptyBuffer) { + EXPECT_TRUE(buf_.IsEmpty()); + EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); + EXPECT_FALSE(buf_.IsFull()); +} + +TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) { + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); + + EXPECT_FALSE(buf_.IsEmpty()); + EXPECT_FALSE(buf_.IsFull()); + absl::optional chunk_opt = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_opt.has_value()); + EXPECT_TRUE(chunk_opt->data.is_beginning); + EXPECT_TRUE(chunk_opt->data.is_end); +} + +TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) { + std::vector payload(60); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional chunk_beg = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_beg.has_value()); + EXPECT_TRUE(chunk_beg->data.is_beginning); + EXPECT_FALSE(chunk_beg->data.is_end); + + absl::optional chunk_mid = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_mid.has_value()); + EXPECT_FALSE(chunk_mid->data.is_beginning); + EXPECT_FALSE(chunk_mid->data.is_end); + + absl::optional chunk_end = + buf_.Produce(kNow, /*max_size=*/20); + ASSERT_TRUE(chunk_end.has_value()); + EXPECT_FALSE(chunk_end->data.is_beginning); + EXPECT_TRUE(chunk_end->data.is_end); + + EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); +} + +TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) { + std::vector payload(60); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + + absl::optional chunk_one = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + EXPECT_TRUE(chunk_one->data.is_beginning); + EXPECT_TRUE(chunk_one->data.is_end); + + absl::optional chunk_two = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); + EXPECT_EQ(chunk_two->data.ppid, PPID(54)); + EXPECT_TRUE(chunk_two->data.is_beginning); + EXPECT_TRUE(chunk_two->data.is_end); +} + +TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) { + std::vector payload(60); + EXPECT_FALSE(buf_.IsFull()); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_FALSE(buf_.IsFull()); + buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + EXPECT_TRUE(buf_.IsFull()); + // However, it's still possible to add messages. It's a soft limit, and it + // might be necessary to forcefully add messages due to e.g. external + // fragmentation. + buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); + EXPECT_TRUE(buf_.IsFull()); + + absl::optional chunk_one = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + + EXPECT_TRUE(buf_.IsFull()); + + absl::optional chunk_two = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); + EXPECT_EQ(chunk_two->data.ppid, PPID(54)); + + EXPECT_FALSE(buf_.IsFull()); + EXPECT_FALSE(buf_.IsEmpty()); + + absl::optional chunk_three = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); + EXPECT_EQ(chunk_three->data.ppid, PPID(55)); + + EXPECT_FALSE(buf_.IsFull()); + EXPECT_TRUE(buf_.IsEmpty()); +} + +TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) { + std::vector payload(FCFSSendQueue::kMinimumFragmentedPayload + 1); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + // Wouldn't fit enough payload (wouldn't want to fragment) + EXPECT_FALSE( + buf_.Produce(kNow, + /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1) + .has_value()); + + // Minimum fragment + absl::optional chunk_one = + buf_.Produce(kNow, + /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(chunk_one->data.ppid, kPPID); + + // There is only one byte remaining - it can be fetched as it doesn't require + // additional fragmentation. + absl::optional chunk_two = + buf_.Produce(kNow, /*max_size=*/1); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.stream_id, kStreamID); + EXPECT_EQ(chunk_two->data.ppid, kPPID); + + EXPECT_TRUE(buf_.IsEmpty()); +} + +TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) { + std::vector payload(20); + + // Default is ordered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + absl::optional chunk_one = + buf_.Produce(kNow, /*max_size=*/100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_FALSE(chunk_one->data.is_unordered); + + // Explicitly unordered. + SendOptions opts; + opts.unordered = IsUnordered(true); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); + absl::optional chunk_two = + buf_.Produce(kNow, /*max_size=*/100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_TRUE(chunk_two->data.is_unordered); +} + +TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) { + std::vector payload(20); + + // Default is no expiry + TimeMs now = kNow; + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); + now = now + DurationMs(1000000); + ASSERT_TRUE(buf_.Produce(now, 100)); + + SendOptions expires_2_seconds; + expires_2_seconds.lifetime = DurationMs(2000); + + // Add and consume within lifetime + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now = now + DurationMs(1999); + ASSERT_TRUE(buf_.Produce(now, 100)); + + // Add and consume just outside lifetime + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now = now + DurationMs(2000); + ASSERT_FALSE(buf_.Produce(now, 100)); + + // A long time after expiry + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + now = now + DurationMs(1000000); + ASSERT_FALSE(buf_.Produce(now, 100)); + + // Expire one message, but produce the second that is not expired. + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + + SendOptions expires_4_seconds; + expires_4_seconds.lifetime = DurationMs(4000); + + buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); + now = now + DurationMs(2000); + + ASSERT_TRUE(buf_.Produce(now, 100)); + ASSERT_FALSE(buf_.Produce(now, 100)); +} + +TEST_F(FCFSSendQueueTest, DiscardPartialPackets) { + std::vector payload(120); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); + + absl::optional chunk_one = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_FALSE(chunk_one->data.is_end); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, + chunk_one->data.message_id); + + absl::optional chunk_two = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_FALSE(chunk_two->data.is_end); + EXPECT_EQ(chunk_two->data.stream_id, StreamID(2)); + + absl::optional chunk_three = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_TRUE(chunk_three->data.is_end); + EXPECT_EQ(chunk_three->data.stream_id, StreamID(2)); + ASSERT_FALSE(buf_.Produce(kNow, 100)); + + // Calling it again shouldn't cause issues. + buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, + chunk_one->data.message_id); + ASSERT_FALSE(buf_.Produce(kNow, 100)); +} + +TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) { + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); + buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); + EXPECT_EQ(buf_.total_bytes(), 8u); + + buf_.PrepareResetStreams(std::vector({StreamID(1)})); + EXPECT_EQ(buf_.total_bytes(), 5u); + buf_.CommitResetStreams(); + buf_.PrepareResetStreams(std::vector({StreamID(2)})); + EXPECT_EQ(buf_.total_bytes(), 0u); +} + +TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) { + std::vector payload(120); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional chunk_one = buf_.Produce(kNow, 50); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(buf_.total_bytes(), 2 * payload.size() - 50); + + StreamID stream_ids[] = {StreamID(1)}; + buf_.PrepareResetStreams(stream_ids); + EXPECT_EQ(buf_.total_bytes(), payload.size() - 50); +} + +TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { + std::vector payload(50); + + buf_.PrepareResetStreams(std::vector({StreamID(1)})); + EXPECT_EQ(buf_.total_bytes(), 0u); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_bytes(), payload.size()); + + EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); + buf_.CommitResetStreams(); + EXPECT_EQ(buf_.total_bytes(), payload.size()); + + absl::optional chunk_one = buf_.Produce(kNow, 50); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.stream_id, kStreamID); + EXPECT_EQ(buf_.total_bytes(), 0u); +} + +TEST_F(FCFSSendQueueTest, CommittingResetsSSN) { + std::vector payload(50); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional chunk_one = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.ssn, SSN(0)); + + absl::optional chunk_two = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.ssn, SSN(1)); + + StreamID stream_ids[] = {StreamID(1)}; + buf_.PrepareResetStreams(stream_ids); + + // Buffered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + EXPECT_TRUE(buf_.CanResetStreams()); + buf_.CommitResetStreams(); + + absl::optional chunk_three = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.ssn, SSN(0)); +} + +TEST_F(FCFSSendQueueTest, RollBackResumesSSN) { + std::vector payload(50); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + absl::optional chunk_one = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_one.has_value()); + EXPECT_EQ(chunk_one->data.ssn, SSN(0)); + + absl::optional chunk_two = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_two.has_value()); + EXPECT_EQ(chunk_two->data.ssn, SSN(1)); + + buf_.PrepareResetStreams(std::vector({StreamID(1)})); + + // Buffered + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + + EXPECT_TRUE(buf_.CanResetStreams()); + buf_.RollbackResetStreams(); + + absl::optional chunk_three = buf_.Produce(kNow, 100); + ASSERT_TRUE(chunk_three.has_value()); + EXPECT_EQ(chunk_three->data.ssn, SSN(2)); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h new file mode 100644 index 0000000000..54f5fd275d --- /dev/null +++ b/net/dcsctp/tx/mock_send_queue.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ + +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/tx/send_queue.h" +#include "test/gmock.h" + +namespace dcsctp { + +class MockSendQueue : public SendQueue { + public: + MockSendQueue() { + ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) { + return absl::nullopt; + }); + } + + MOCK_METHOD(absl::optional, + Produce, + (TimeMs now, size_t max_size), + (override)); + MOCK_METHOD(void, + Discard, + (IsUnordered unordered, StreamID stream_id, MID message_id), + (override)); + MOCK_METHOD(void, + PrepareResetStreams, + (rtc::ArrayView streams), + (override)); + MOCK_METHOD(bool, CanResetStreams, (), (const, override)); + MOCK_METHOD(void, CommitResetStreams, (), (override)); + MOCK_METHOD(void, RollbackResetStreams, (), (override)); + MOCK_METHOD(void, Reset, (), (override)); +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h new file mode 100644 index 0000000000..bb5aab2df8 --- /dev/null +++ b/net/dcsctp/tx/send_queue.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_SEND_QUEUE_H_ +#define NET_DCSCTP_TX_SEND_QUEUE_H_ + +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/types.h" + +namespace dcsctp { + +class SendQueue { + public: + // Container for a data chunk that is produced by the SendQueue + struct DataToSend { + explicit DataToSend(Data data) : data(std::move(data)) {} + // The data to send, including all parameters. + Data data; + + // Partial reliability - RFC3758 + absl::optional max_retransmissions; + absl::optional expires_at; + }; + + virtual ~SendQueue() = default; + + // TODO(boivie): This interface is obviously missing an "Add" function, but + // that is postponed a bit until the story around how to model message + // prioritization, which is important for any advanced stream scheduler, is + // further clarified. + + // Produce a chunk to be sent. + // + // `max_size` refers to how many payload bytes that may be produced, not + // including any headers. + virtual absl::optional Produce(TimeMs now, size_t max_size) = 0; + + // Discards a partially sent message identified by the parameters `unordered`, + // `stream_id` and `message_id`. The `message_id` comes from the returned + // information when having called `Produce`. A partially sent message means + // that it has had at least one fragment of it returned when `Produce` was + // called prior to calling this method). + // + // This is used when a message has been found to be expired (by the partial + // reliability extension), and the retransmission queue will signal the + // receiver that any partially received message fragments should be skipped. + // This means that any remaining fragments in the Send Queue must be removed + // as well so that they are not sent. + virtual void Discard(IsUnordered unordered, + StreamID stream_id, + MID message_id) = 0; + + // Prepares the streams to be reset. This is used to close a WebRTC data + // channel and will be signaled to the other side. + // + // Concretely, it discards all whole (not partly sent) messages in the given + // streams and pauses those streams so that future added messages aren't + // produced until `ResumeStreams` is called. + // + // TODO(boivie): Investigate if it really should discard any message at all. + // RFC8831 only mentions that "[RFC6525] also guarantees that all the messages + // are delivered (or abandoned) before the stream is reset." + // + // This method can be called multiple times to add more streams to be + // reset, and paused while they are resetting. This is the first part of the + // two-phase commit protocol to reset streams, where the caller completes the + // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`. + virtual void PrepareResetStreams(rtc::ArrayView streams) = 0; + + // Returns true if all non-discarded messages during `PrepareResetStreams` + // (which are those that was partially sent before that method was called) + // have been sent. + virtual bool CanResetStreams() const = 0; + + // Called to commit to reset the streams provided to `PrepareResetStreams`. + // It will reset the stream sequence numbers (SSNs) and message identifiers + // (MIDs) and resume the paused streams. + virtual void CommitResetStreams() = 0; + + // Called to abort the resetting of streams provided to `PrepareResetStreams`. + // Will resume the paused streams without resetting the stream sequence + // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial + // messages that were discarded when calling `PrepareResetStreams` will not be + // recovered, to better match the intention from the sender to "close the + // channel". + virtual void RollbackResetStreams() = 0; + + // Resets all message identifier counters (MID, SSN) and makes all partially + // messages be ready to be re-sent in full. This is used when the peer has + // been detected to have restarted and is used to try to minimize the amount + // of data loss. However, data loss cannot be completely guaranteed when a + // peer restarts. + virtual void Reset() = 0; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_SEND_QUEUE_H_