When the client asks for a message to be sent, it's put in the SendQueue, which is available even when the socket is not yet connected. When the socket is connected, those messages will be sent on the wire, possibly fragmented if a message is too large to fit inside a single packet. When a message has been fully sent, it's removed from the send queue (but it will remain in the RetransmissionQueue, which is added in a follow-up change, until the message has been ACKed). The Send Queue is a FIFO queue in this iteration, and in SCTP, that's called a "First Come, First Served" queue, or FCFS. In follow-up work, the queue and the actual scheduling algorithm, which decides which message is sent when there are messages in multiple streams, will likely be decoupled. But in this iteration, they're in the same class. Bug: webrtc:12614 Change-Id: Iec1183e625499a21e402e4f2a5ebcf989bc5c3ec Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214044 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33798}
362 lines
12 KiB
C++
362 lines
12 KiB
C++
/*
 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
#include "net/dcsctp/tx/fcfs_send_queue.h"
|
|
|
|
#include <cstdint>
|
|
#include <type_traits>
|
|
#include <vector>
|
|
|
|
#include "net/dcsctp/packet/data.h"
|
|
#include "net/dcsctp/public/dcsctp_message.h"
|
|
#include "net/dcsctp/public/dcsctp_options.h"
|
|
#include "net/dcsctp/public/dcsctp_socket.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "net/dcsctp/tx/send_queue.h"
|
|
#include "rtc_base/gunit.h"
|
|
#include "test/gmock.h"
|
|
|
|
namespace dcsctp {
|
|
namespace {
|
|
|
|
constexpr TimeMs kNow = TimeMs(0);
|
|
constexpr StreamID kStreamID(1);
|
|
constexpr PPID kPPID(53);
|
|
|
|
class FCFSSendQueueTest : public testing::Test {
|
|
protected:
|
|
FCFSSendQueueTest() : buf_("log: ", 100) {}
|
|
|
|
const DcSctpOptions options_;
|
|
FCFSSendQueue buf_;
|
|
};
|
|
|
|
// A freshly constructed queue reports empty, is not full, and has nothing to
// produce.
TEST_F(FCFSSendQueueTest, EmptyBuffer) {
  EXPECT_TRUE(buf_.IsEmpty());
  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
  EXPECT_FALSE(buf_.IsFull());
}
|
|
// A 5-byte message produced with a max size of 100 comes out as one chunk that
// is flagged as both the beginning and the end of the message.
TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) {
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));

  EXPECT_FALSE(buf_.IsEmpty());
  EXPECT_FALSE(buf_.IsFull());
  absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100);
  // Hard failure: the dereferences below require a value.
  ASSERT_TRUE(chunk_opt.has_value());
  EXPECT_TRUE(chunk_opt->data.is_beginning);
  EXPECT_TRUE(chunk_opt->data.is_end);
}
|
|
// A 60-byte message produced 20 bytes at a time yields three fragments:
// a beginning, a middle and an end. After that nothing more is produced.
TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) {
  std::vector<uint8_t> payload(60);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  // First fragment: beginning only.
  absl::optional<SendQueue::DataToSend> chunk_beg =
      buf_.Produce(kNow, /*max_size=*/20);
  ASSERT_TRUE(chunk_beg.has_value());
  EXPECT_TRUE(chunk_beg->data.is_beginning);
  EXPECT_FALSE(chunk_beg->data.is_end);

  // Second fragment: neither beginning nor end.
  absl::optional<SendQueue::DataToSend> chunk_mid =
      buf_.Produce(kNow, /*max_size=*/20);
  ASSERT_TRUE(chunk_mid.has_value());
  EXPECT_FALSE(chunk_mid->data.is_beginning);
  EXPECT_FALSE(chunk_mid->data.is_end);

  // Third fragment: end only.
  absl::optional<SendQueue::DataToSend> chunk_end =
      buf_.Produce(kNow, /*max_size=*/20);
  ASSERT_TRUE(chunk_end.has_value());
  EXPECT_FALSE(chunk_end->data.is_beginning);
  EXPECT_TRUE(chunk_end->data.is_end);

  // The whole message has been consumed.
  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
}
|
|
// Two 60-byte messages on different streams are produced in the order they
// were added, each as a single complete chunk carrying its own stream id and
// PPID.
TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) {
  std::vector<uint8_t> payload(60);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));

  // The message added first comes out first.
  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  EXPECT_EQ(chunk_one->data.ppid, kPPID);
  EXPECT_TRUE(chunk_one->data.is_beginning);
  EXPECT_TRUE(chunk_one->data.is_end);

  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
  EXPECT_EQ(chunk_two->data.ppid, PPID(54));
  EXPECT_TRUE(chunk_two->data.is_beginning);
  EXPECT_TRUE(chunk_two->data.is_end);
}
|
|
// Tracks IsFull()/IsEmpty() while three 60-byte messages are added and then
// drained one at a time. The queue reports full with 120 or more bytes queued
// and not full with 60 or fewer.
TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) {
  std::vector<uint8_t> payload(60);
  EXPECT_FALSE(buf_.IsFull());
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  EXPECT_FALSE(buf_.IsFull());
  buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
  EXPECT_TRUE(buf_.IsFull());
  // However, it's still possible to add messages. It's a soft limit, and it
  // might be necessary to forcefully add messages due to e.g. external
  // fragmentation.
  buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
  EXPECT_TRUE(buf_.IsFull());

  // 180 bytes queued -> 120 after the first message is produced: still full.
  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  EXPECT_EQ(chunk_one->data.ppid, kPPID);

  EXPECT_TRUE(buf_.IsFull());

  // 60 bytes left after the second message: no longer full, but not empty.
  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
  EXPECT_EQ(chunk_two->data.ppid, PPID(54));

  EXPECT_FALSE(buf_.IsFull());
  EXPECT_FALSE(buf_.IsEmpty());

  // The last message drains the queue completely.
  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_three.has_value());
  EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
  EXPECT_EQ(chunk_three->data.ppid, PPID(55));

  EXPECT_FALSE(buf_.IsFull());
  EXPECT_TRUE(buf_.IsEmpty());
}
|
|
// With a payload one byte larger than kMinimumFragmentedPayload, the queue
// declines to produce a first fragment smaller than that minimum, but does
// hand out the final one-byte remainder.
TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) {
  std::vector<uint8_t> payload(FCFSSendQueue::kMinimumFragmentedPayload + 1);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  // Wouldn't fit enough payload (wouldn't want to fragment)
  EXPECT_FALSE(
      buf_.Produce(kNow,
                   /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1)
          .has_value());

  // Minimum fragment
  absl::optional<SendQueue::DataToSend> chunk_one =
      buf_.Produce(kNow,
                   /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  EXPECT_EQ(chunk_one->data.ppid, kPPID);

  // There is only one byte remaining - it can be fetched as it doesn't require
  // additional fragmentation.
  absl::optional<SendQueue::DataToSend> chunk_two =
      buf_.Produce(kNow, /*max_size=*/1);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_EQ(chunk_two->data.stream_id, kStreamID);
  EXPECT_EQ(chunk_two->data.ppid, kPPID);

  EXPECT_TRUE(buf_.IsEmpty());
}
|
|
// A message added without SendOptions is produced with is_unordered == false.
// Setting SendOptions::unordered to IsUnordered(true) makes the produced chunk
// unordered.
TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) {
  std::vector<uint8_t> payload(20);

  // Default is ordered
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  absl::optional<SendQueue::DataToSend> chunk_one =
      buf_.Produce(kNow, /*max_size=*/100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_FALSE(chunk_one->data.is_unordered);

  // Explicitly unordered.
  SendOptions opts;
  opts.unordered = IsUnordered(true);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
  absl::optional<SendQueue::DataToSend> chunk_two =
      buf_.Produce(kNow, /*max_size=*/100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_TRUE(chunk_two->data.is_unordered);
}
|
|
// Exercises SendOptions::lifetime. A message without a lifetime is still
// produced after a long delay. A message with a 2000 ms lifetime is produced
// 1999 ms after being added, but not 2000 ms (or much later) after being
// added.
TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) {
  std::vector<uint8_t> payload(20);

  // Default is no expiry
  TimeMs now = kNow;
  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
  now = now + DurationMs(1000000);
  ASSERT_TRUE(buf_.Produce(now, 100));

  SendOptions expires_2_seconds;
  expires_2_seconds.lifetime = DurationMs(2000);

  // Add and consume within lifetime
  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
  now = now + DurationMs(1999);
  ASSERT_TRUE(buf_.Produce(now, 100));

  // Add and consume just outside lifetime
  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
  now = now + DurationMs(2000);
  ASSERT_FALSE(buf_.Produce(now, 100));

  // A long time after expiry
  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
  now = now + DurationMs(1000000);
  ASSERT_FALSE(buf_.Produce(now, 100));

  // Expire one message, but produce the second that is not expired.
  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);

  SendOptions expires_4_seconds;
  expires_4_seconds.lifetime = DurationMs(4000);

  buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
  now = now + DurationMs(2000);

  // Exactly one of the two queued messages is still produced.
  ASSERT_TRUE(buf_.Produce(now, 100));
  ASSERT_FALSE(buf_.Produce(now, 100));
}
|
|
// After the first fragment of a 120-byte message has been produced, Discard()
// is called for that message. The next chunks produced then belong to the
// second message (stream 2), and a repeated Discard() call is harmless.
TEST_F(FCFSSendQueueTest, DiscardPartialPackets) {
  std::vector<uint8_t> payload(120);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));

  // 120 bytes don't fit in 100, so the first message is only partially sent.
  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_FALSE(chunk_one->data.is_end);
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
               chunk_one->data.message_id);

  // Production continues with the second message, in two fragments.
  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_FALSE(chunk_two->data.is_end);
  EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));

  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_three.has_value());
  EXPECT_TRUE(chunk_three->data.is_end);
  EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
  ASSERT_FALSE(buf_.Produce(kNow, 100));

  // Calling it again shouldn't cause issues.
  buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
               chunk_one->data.message_id);
  ASSERT_FALSE(buf_.Produce(kNow, 100));
}
|
|
// PrepareResetStreams() drops the queued bytes of the named stream:
// total_bytes() goes from 8 (3 + 5) to 5 after resetting stream 1, and to 0
// after resetting stream 2.
TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) {
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
  EXPECT_EQ(buf_.total_bytes(), 8u);

  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
  EXPECT_EQ(buf_.total_bytes(), 5u);
  // The first reset is committed before the second one is prepared.
  buf_.CommitResetStreams();
  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)}));
  EXPECT_EQ(buf_.total_bytes(), 0u);
}
|
|
// With two 120-byte messages on the same stream and 50 bytes of the first one
// already produced, PrepareResetStreams() reduces total_bytes() from 190 to
// 70. That is consistent with the unsent remainder of the partially produced
// message being kept while the untouched message is dropped.
TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) {
  std::vector<uint8_t> payload(120);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_bytes(), 2 * payload.size() - 50);

  StreamID stream_ids[] = {StreamID(1)};
  buf_.PrepareResetStreams(stream_ids);
  EXPECT_EQ(buf_.total_bytes(), payload.size() - 50);
}
|
|
// A message added to a stream after PrepareResetStreams() is counted in
// total_bytes() but is not produced until CommitResetStreams() has been
// called.
TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
  std::vector<uint8_t> payload(50);

  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
  EXPECT_EQ(buf_.total_bytes(), 0u);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  EXPECT_EQ(buf_.total_bytes(), payload.size());

  // Held back while the reset is pending.
  EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
  buf_.CommitResetStreams();
  EXPECT_EQ(buf_.total_bytes(), payload.size());

  // Available again once the reset has been committed.
  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_bytes(), 0u);
}
|
|
// Two messages on the same stream are produced with SSN 0 and 1. After
// PrepareResetStreams() followed by CommitResetStreams(), the next message on
// that stream is produced with SSN 0 again.
TEST_F(FCFSSendQueueTest, CommittingResetsSSN) {
  std::vector<uint8_t> payload(50);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.ssn, SSN(0));

  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_EQ(chunk_two->data.ssn, SSN(1));

  StreamID stream_ids[] = {StreamID(1)};
  buf_.PrepareResetStreams(stream_ids);

  // Buffered
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  EXPECT_TRUE(buf_.CanResetStreams());
  buf_.CommitResetStreams();

  // Sequence numbering restarts after the committed reset.
  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_three.has_value());
  EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
|
|
// Two messages on the same stream are produced with SSN 0 and 1. After
// PrepareResetStreams() followed by RollbackResetStreams(), the next message
// on that stream continues the sequence with SSN 2.
TEST_F(FCFSSendQueueTest, RollBackResumesSSN) {
  std::vector<uint8_t> payload(50);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_one.has_value());
  EXPECT_EQ(chunk_one->data.ssn, SSN(0));

  absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_two.has_value());
  EXPECT_EQ(chunk_two->data.ssn, SSN(1));

  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));

  // Buffered
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));

  EXPECT_TRUE(buf_.CanResetStreams());
  buf_.RollbackResetStreams();

  // Sequence numbering carries on where it left off after the rollback.
  absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
  ASSERT_TRUE(chunk_three.has_value());
  EXPECT_EQ(chunk_three->data.ssn, SSN(2));
}
|
|
} // namespace
|
|
} // namespace dcsctp
|