From 625884e7caeefa1dc1d707365f087734585ec544 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Fri, 31 Jan 2025 14:54:41 +0100 Subject: [PATCH] dcsctp: Refactor rr_send_queue_test This test used a fixture to create the send queue, but that makes it hard to construct them with different parameters in some tests. This refactoring removes the test fixture and creates the queue in each test, which improves test readability instead, as there will be no need for remembering how it was created - that's given by each test now. Bug: webrtc:393540127 Change-Id: I8d158b6ff57fe9cb03b2762d736cf79afbbb8283 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/376100 Reviewed-by: Harald Alvestrand Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/main@{#43880} --- net/dcsctp/tx/rr_send_queue_test.cc | 745 +++++++++++++++------------- 1 file changed, 397 insertions(+), 348 deletions(-) diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 04f718ce46..3af511d841 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -41,69 +41,62 @@ constexpr size_t kOneFragmentPacketSize = 100; constexpr size_t kTwoFragmentPacketSize = 101; constexpr size_t kMtu = 1100; -class RRSendQueueTest : public testing::Test { - protected: - RRSendQueueTest() - : buf_("log: ", - &callbacks_, - - kMtu, - kDefaultPriority, - kBufferedAmountLowThreshold) {} - - testing::NiceMock callbacks_; - const DcSctpOptions options_; - RRSendQueue buf_; -}; - -TEST_F(RRSendQueueTest, EmptyBuffer) { - EXPECT_TRUE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); +TEST(RRSendQueueTest, EmptyBuffer) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_TRUE(q.IsEmpty()); + EXPECT_FALSE(q.Produce(kNow, kOneFragmentPacketSize).has_value()); } -TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); +TEST(RRSendQueueTest, AddAndGetSingleChunk) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); - EXPECT_FALSE(buf_.IsEmpty()); + EXPECT_FALSE(q.IsEmpty()); std::optional chunk_opt = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); EXPECT_TRUE(chunk_opt->data.is_beginning); EXPECT_TRUE(chunk_opt->data.is_end); } -TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) { +TEST(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(60); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); std::optional chunk_beg = - buf_.Produce(kNow, /*max_size=*/20); + q.Produce(kNow, /*max_size=*/20); ASSERT_TRUE(chunk_beg.has_value()); EXPECT_TRUE(chunk_beg->data.is_beginning); EXPECT_FALSE(chunk_beg->data.is_end); std::optional chunk_mid = - buf_.Produce(kNow, /*max_size=*/20); + q.Produce(kNow, /*max_size=*/20); ASSERT_TRUE(chunk_mid.has_value()); EXPECT_FALSE(chunk_mid->data.is_beginning); EXPECT_FALSE(chunk_mid->data.is_end); std::optional chunk_end = - buf_.Produce(kNow, /*max_size=*/20); + q.Produce(kNow, /*max_size=*/20); ASSERT_TRUE(chunk_end.has_value()); EXPECT_FALSE(chunk_end->data.is_beginning); EXPECT_TRUE(chunk_end->data.is_end); - EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(q.Produce(kNow, kOneFragmentPacketSize).has_value()); } -TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { +TEST(RRSendQueueTest, GetChunksFromTwoMessages) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(60); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); @@ -111,7 +104,7 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { EXPECT_TRUE(chunk_one->data.is_end); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); @@ -119,772 +112,828 @@ TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) { EXPECT_TRUE(chunk_two->data.is_end); } -TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { +TEST(RRSendQueueTest, BufferBecomesFullAndEmptied) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(600); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); - buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + EXPECT_LT(q.total_buffered_amount(), 1000u); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_LT(q.total_buffered_amount(), 1000u); + q.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); + EXPECT_GE(q.total_buffered_amount(), 1000u); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. - buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + q.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); + EXPECT_GE(q.total_buffered_amount(), 1000u); - std::optional chunk_one = buf_.Produce(kNow, 1000); + std::optional chunk_one = q.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_GE(buf_.total_buffered_amount(), 1000u); + EXPECT_GE(q.total_buffered_amount(), 1000u); - std::optional chunk_two = buf_.Produce(kNow, 1000); + std::optional chunk_two = q.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); - EXPECT_FALSE(buf_.IsEmpty()); + EXPECT_LT(q.total_buffered_amount(), 1000u); + EXPECT_FALSE(q.IsEmpty()); - std::optional chunk_three = buf_.Produce(kNow, 1000); + std::optional chunk_three = q.Produce(kNow, 1000); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_LT(buf_.total_buffered_amount(), 1000u); - EXPECT_TRUE(buf_.IsEmpty()); + EXPECT_LT(q.total_buffered_amount(), 1000u); + EXPECT_TRUE(q.IsEmpty()); } -TEST_F(RRSendQueueTest, DefaultsToOrderedSend) { +TEST(RRSendQueueTest, DefaultsToOrderedSend) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(20); // Default is ordered - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_unordered); // Explicitly unordered. SendOptions opts; opts.unordered = IsUnordered(true); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_TRUE(chunk_two->data.is_unordered); } -TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { +TEST(RRSendQueueTest, ProduceWithLifetimeExpiry) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(20); // Default is no expiry Timestamp now = kNow; - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); now += TimeDelta::Seconds(1000); - ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_TRUE(q.Produce(now, kOneFragmentPacketSize)); SendOptions expires_2_seconds; expires_2_seconds.lifetime = DurationMs(2000); // Add and consume within lifetime - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += TimeDelta::Millis(2000); - ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_TRUE(q.Produce(now, kOneFragmentPacketSize)); // Add and consume just outside lifetime - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += TimeDelta::Millis(2001); - ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_FALSE(q.Produce(now, kOneFragmentPacketSize)); // A long time after expiry - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); now += TimeDelta::Seconds(1000); - ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_FALSE(q.Produce(now, kOneFragmentPacketSize)); // Expire one message, but produce the second that is not expired. - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); SendOptions expires_4_seconds; expires_4_seconds.lifetime = DurationMs(4000); - buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); + q.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); now += TimeDelta::Millis(2001); - ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); - ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); + ASSERT_TRUE(q.Produce(now, kOneFragmentPacketSize)); + ASSERT_FALSE(q.Produce(now, kOneFragmentPacketSize)); } -TEST_F(RRSendQueueTest, DiscardPartialPackets) { +TEST(RRSendQueueTest, DiscardPartialPackets) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(120); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_end); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); + q.Discard(chunk_one->data.stream_id, chunk_one->message_id); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_FALSE(chunk_two->data.is_end); EXPECT_EQ(chunk_two->data.stream_id, StreamID(2)); std::optional chunk_three = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_TRUE(chunk_three->data.is_end); EXPECT_EQ(chunk_three->data.stream_id, StreamID(2)); - ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); + ASSERT_FALSE(q.Produce(kNow, kOneFragmentPacketSize)); // Calling it again shouldn't cause issues. - buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); - ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Discard(chunk_one->data.stream_id, chunk_one->message_id); + ASSERT_FALSE(q.Produce(kNow, kOneFragmentPacketSize)); } -TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); - buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); - EXPECT_EQ(buf_.total_buffered_amount(), 8u); +TEST(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); + q.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); + EXPECT_EQ(q.total_buffered_amount(), 8u); - buf_.PrepareResetStream(StreamID(1)); - EXPECT_EQ(buf_.total_buffered_amount(), 5u); + q.PrepareResetStream(StreamID(1)); + EXPECT_EQ(q.total_buffered_amount(), 5u); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), - UnorderedElementsAre(StreamID(1))); - buf_.CommitResetStreams(); - buf_.PrepareResetStream(StreamID(2)); - EXPECT_EQ(buf_.total_buffered_amount(), 0u); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(StreamID(1))); + q.CommitResetStreams(); + q.PrepareResetStream(StreamID(2)); + EXPECT_EQ(q.total_buffered_amount(), 0u); } -TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { +TEST(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(120); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - std::optional chunk_one = buf_.Produce(kNow, 50); + std::optional chunk_one = q.Produce(kNow, 50); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); + EXPECT_EQ(q.total_buffered_amount(), 2 * payload.size() - 50); - buf_.PrepareResetStream(StreamID(1)); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); + q.PrepareResetStream(StreamID(1)); + EXPECT_EQ(q.total_buffered_amount(), payload.size() - 50); } -TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { +TEST(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(50); - buf_.PrepareResetStream(StreamID(1)); - EXPECT_EQ(buf_.total_buffered_amount(), 0u); + q.PrepareResetStream(StreamID(1)); + EXPECT_EQ(q.total_buffered_amount(), 0u); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(q.total_buffered_amount(), payload.size()); - EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(q.Produce(kNow, kOneFragmentPacketSize).has_value()); - EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), - UnorderedElementsAre(StreamID(1))); + EXPECT_TRUE(q.HasStreamsReadyToBeReset()); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(StreamID(1))); - EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(q.Produce(kNow, kOneFragmentPacketSize).has_value()); - buf_.CommitResetStreams(); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + q.CommitResetStreams(); + EXPECT_EQ(q.total_buffered_amount(), payload.size()); - std::optional chunk_one = buf_.Produce(kNow, 50); + std::optional chunk_one = q.Produce(kNow, 50); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_buffered_amount(), 0u); + EXPECT_EQ(q.total_buffered_amount(), 0u); } -TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) { +TEST(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); constexpr size_t kPayloadSize = 100; constexpr size_t kFragmentSize = 50; std::vector payload(kPayloadSize); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); std::optional chunk_one = - buf_.Produce(kNow, kFragmentSize); + q.Produce(kNow, kFragmentSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize); + EXPECT_EQ(q.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize); // This will stop the second message from being sent. - buf_.PrepareResetStream(StreamID(1)); - EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize); + q.PrepareResetStream(StreamID(1)); + EXPECT_EQ(q.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize); // Should still produce fragments until end of message. std::optional chunk_two = - buf_.Produce(kNow, kFragmentSize); + q.Produce(kNow, kFragmentSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_buffered_amount(), 0ul); + EXPECT_EQ(q.total_buffered_amount(), 0ul); // But shouldn't produce any more messages as the stream is paused. - EXPECT_FALSE(buf_.Produce(kNow, kFragmentSize).has_value()); + EXPECT_FALSE(q.Produce(kNow, kFragmentSize).has_value()); } -TEST_F(RRSendQueueTest, CommittingResetsSSN) { +TEST(RRSendQueueTest, CommittingResetsSSN) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(50); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.ssn, SSN(0)); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - buf_.PrepareResetStream(StreamID(1)); + q.PrepareResetStream(StreamID(1)); // Buffered - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), - UnorderedElementsAre(StreamID(1))); - buf_.CommitResetStreams(); + EXPECT_TRUE(q.HasStreamsReadyToBeReset()); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(StreamID(1))); + q.CommitResetStreams(); std::optional chunk_three = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.ssn, SSN(0)); } -TEST_F(RRSendQueueTest, CommittingDoesNotResetMessageId) { +TEST(RRSendQueueTest, CommittingDoesNotResetMessageId) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(50); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.ssn, SSN(0)); EXPECT_EQ(chunk1.message_id, OutgoingMessageId(0)); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.ssn, SSN(1)); EXPECT_EQ(chunk2.message_id, OutgoingMessageId(1)); - buf_.PrepareResetStream(kStreamID); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), UnorderedElementsAre(kStreamID)); - buf_.CommitResetStreams(); + q.PrepareResetStream(kStreamID); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(kStreamID)); + q.CommitResetStreams(); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.ssn, SSN(0)); EXPECT_EQ(chunk3.message_id, OutgoingMessageId(2)); } -TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) { +TEST(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(50); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, StreamID(1)); EXPECT_EQ(chunk_one->data.ssn, SSN(0)); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ssn, SSN(0)); - buf_.PrepareResetStream(StreamID(3)); + q.PrepareResetStream(StreamID(3)); // Send two more messages - SID 3 will buffer, SID 1 will send. - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); - EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), - UnorderedElementsAre(StreamID(3))); + EXPECT_TRUE(q.HasStreamsReadyToBeReset()); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(StreamID(3))); - buf_.CommitResetStreams(); + q.CommitResetStreams(); std::optional chunk_three = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.stream_id, StreamID(1)); EXPECT_EQ(chunk_three->data.ssn, SSN(1)); std::optional chunk_four = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_four.has_value()); EXPECT_EQ(chunk_four->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_four->data.ssn, SSN(0)); } -TEST_F(RRSendQueueTest, RollBackResumesSSN) { +TEST(RRSendQueueTest, RollBackResumesSSN) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(50); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.ssn, SSN(0)); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - buf_.PrepareResetStream(StreamID(1)); + q.PrepareResetStream(StreamID(1)); // Buffered - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); - EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), - UnorderedElementsAre(StreamID(1))); - buf_.RollbackResetStreams(); + EXPECT_TRUE(q.HasStreamsReadyToBeReset()); + EXPECT_THAT(q.GetStreamsReadyToBeReset(), UnorderedElementsAre(StreamID(1))); + q.RollbackResetStreams(); std::optional chunk_three = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_three.has_value()); EXPECT_EQ(chunk_three->data.ssn, SSN(2)); } -TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) { +TEST(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(200); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); } -TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) { +TEST(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(kTwoFragmentPacketSize); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(2)); EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk4.data.stream_id, StreamID(2)); EXPECT_THAT(chunk4.data.payload, SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize)); } -TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) { - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(2))); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(3))); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(4))); - buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(5))); - buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(6))); - buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(7))); - buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(8))); +TEST(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(2))); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(3))); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(4))); + q.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(5))); + q.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(6))); + q.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(7))); + q.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector(8))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(1)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(2)); EXPECT_THAT(chunk2.data.payload, SizeIs(3)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(3)); EXPECT_THAT(chunk3.data.payload, SizeIs(5)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk4.data.stream_id, StreamID(4)); EXPECT_THAT(chunk4.data.payload, SizeIs(7)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk5.data.stream_id, StreamID(1)); EXPECT_THAT(chunk5.data.payload, SizeIs(2)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk6.data.stream_id, StreamID(2)); EXPECT_THAT(chunk6.data.payload, SizeIs(4)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk7.data.stream_id, StreamID(3)); EXPECT_THAT(chunk7.data.payload, SizeIs(6)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk8.data.stream_id, StreamID(4)); EXPECT_THAT(chunk8.data.payload, SizeIs(8)); } -TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) { - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u); +TEST(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); + q.SetBufferedAmountLowThreshold(StreamID(1), 0u); } -TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) { - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); +TEST(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 1u); - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(1)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 0u); } -TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) { - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); +TEST(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(1)); - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1))); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 1u); // Should now trigger again, as buffer_amount went above the threshold. - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(1)); } -TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) { - buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000); +TEST(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.SetBufferedAmountLowThreshold(StreamID(1), 1000); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(10))); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(10))); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 10u); - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(10)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 0u); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(20))); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(20))); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 20u); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(20)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 0u); } -TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) { - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); +TEST(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + q.SetBufferedAmountLowThreshold(StreamID(1), 700); std::vector payload(1000); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 900u); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 800u); - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 700u); // Doesn't trigger when reducing even further. - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 600u); } -TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) { - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); +TEST(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 700); + q.SetBufferedAmountLowThreshold(StreamID(1), 700); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1000))); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(1000))); - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, 400)); + q.Produce(kNow, 400)); EXPECT_EQ(chunk1.data.stream_id, StreamID(1)); EXPECT_THAT(chunk1.data.payload, SizeIs(400)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 600u); - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(200))); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(200))); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 800u); // Will trigger again, as it went above the limit. - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, 200)); + q.Produce(kNow, 200)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(200)); - EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u); + EXPECT_EQ(q.buffered_amount(StreamID(1)), 600u); } -TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) { - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); +TEST(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(100))); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(100))); // Modifying the threshold, still under buffered_amount, should not trigger. - buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 99); + q.SetBufferedAmountLowThreshold(StreamID(1), 50); + q.SetBufferedAmountLowThreshold(StreamID(1), 99); // When the threshold reaches buffered_amount, it will trigger. - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 100); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); + q.SetBufferedAmountLowThreshold(StreamID(1), 100); // But not when it's set low again. - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 50); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); + q.SetBufferedAmountLowThreshold(StreamID(1), 50); // But it will trigger when it overshoots. - EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1))); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 150); + EXPECT_CALL(cb, OnBufferedAmountLow(StreamID(1))); + q.SetBufferedAmountLowThreshold(StreamID(1), 150); // But not when it's set low again. - EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0); - buf_.SetBufferedAmountLowThreshold(StreamID(1), 0); + EXPECT_CALL(cb, OnBufferedAmountLow).Times(0); + q.SetBufferedAmountLowThreshold(StreamID(1), 0); } -TEST_F(RRSendQueueTest, - OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) { - EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0); +TEST(RRSendQueueTest, OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnTotalBufferedAmountLow).Times(0); std::vector payload(kBufferedAmountLowThreshold - 1); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(q.total_buffered_amount(), payload.size()); // Will not trigger if going above but never below. - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, - std::vector(kOneFragmentPacketSize))); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, + std::vector(kOneFragmentPacketSize))); } -TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) { - EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0); +TEST(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_CALL(cb, OnTotalBufferedAmountLow).Times(0); std::vector payload(kBufferedAmountLowThreshold); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(q.total_buffered_amount(), payload.size()); // Reaches it. - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector(1))); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector(1))); // Drain it a bit - will trigger. - EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(1); + EXPECT_CALL(cb, OnTotalBufferedAmountLow).Times(1); std::optional chunk_two = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); } -TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) { - buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector(1))); +TEST(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk1.data.stream_id, StreamID(5)); EXPECT_THAT(chunk1.data.payload, SizeIs(1)); // Next, it should pick a different stream. - buf_.Add(kNow, - DcSctpMessage(StreamID(1), kPPID, - std::vector(kOneFragmentPacketSize * 2))); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, + std::vector(kOneFragmentPacketSize * 2))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk2.data.stream_id, StreamID(1)); EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize)); // It should still stay on the Stream1 now, even if might be tempted to switch // to this stream, as it's the stream following 5. - buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector(1))); + q.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector(1))); ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk3.data.stream_id, StreamID(1)); EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize)); // After stream id 1 is complete, it's time to do stream 6. ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4, - buf_.Produce(kNow, kOneFragmentPacketSize)); + q.Produce(kNow, kOneFragmentPacketSize)); EXPECT_EQ(chunk4.data.stream_id, StreamID(6)); EXPECT_THAT(chunk4.data.payload, SizeIs(1)); - EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + EXPECT_FALSE(q.Produce(kNow, kOneFragmentPacketSize).has_value()); } -TEST_F(RRSendQueueTest, StreamsHaveInitialPriority) { - EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), kDefaultPriority); +TEST(RRSendQueueTest, StreamsHaveInitialPriority) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + EXPECT_EQ(q.GetStreamPriority(StreamID(1)), kDefaultPriority); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); - EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), kDefaultPriority); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); + EXPECT_EQ(q.GetStreamPriority(StreamID(2)), kDefaultPriority); } -TEST_F(RRSendQueueTest, CanChangeStreamPriority) { - buf_.SetStreamPriority(StreamID(1), StreamPriority(42)); - EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), StreamPriority(42)); +TEST(RRSendQueueTest, CanChangeStreamPriority) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.SetStreamPriority(StreamID(1), StreamPriority(42)); + EXPECT_EQ(q.GetStreamPriority(StreamID(1)), StreamPriority(42)); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); - buf_.SetStreamPriority(StreamID(2), StreamPriority(42)); - EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), StreamPriority(42)); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); + q.SetStreamPriority(StreamID(2), StreamPriority(42)); + EXPECT_EQ(q.GetStreamPriority(StreamID(2)), StreamPriority(42)); } -TEST_F(RRSendQueueTest, WillHandoverPriority) { - buf_.SetStreamPriority(StreamID(1), StreamPriority(42)); +TEST(RRSendQueueTest, WillHandoverPriority) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.SetStreamPriority(StreamID(1), StreamPriority(42)); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); - buf_.SetStreamPriority(StreamID(2), StreamPriority(42)); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(40))); + q.SetStreamPriority(StreamID(2), StreamPriority(42)); DcSctpSocketHandoverState state; - buf_.AddHandoverState(state); + q.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42)); EXPECT_EQ(q2.GetStreamPriority(StreamID(2)), StreamPriority(42)); } -TEST_F(RRSendQueueTest, WillSendMessagesByPrio) { - buf_.EnableMessageInterleaving(true); - buf_.SetStreamPriority(StreamID(1), StreamPriority(10)); - buf_.SetStreamPriority(StreamID(2), StreamPriority(20)); - buf_.SetStreamPriority(StreamID(3), StreamPriority(30)); +TEST(RRSendQueueTest, WillSendMessagesByPrio) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); + q.EnableMessageInterleaving(true); + q.SetStreamPriority(StreamID(1), StreamPriority(10)); + q.SetStreamPriority(StreamID(2), StreamPriority(20)); + q.SetStreamPriority(StreamID(3), StreamPriority(30)); - buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(40))); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(20))); - buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(10))); + q.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector(40))); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector(20))); + q.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector(10))); std::vector expected_streams = {3, 2, 2, 1, 1, 1, 1}; for (uint16_t stream_num : expected_streams) { ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk, - buf_.Produce(kNow, 10)); + q.Produce(kNow, 10)); EXPECT_EQ(chunk.data.stream_id, StreamID(stream_num)); } - EXPECT_FALSE(buf_.Produce(kNow, 1).has_value()); + EXPECT_FALSE(q.Produce(kNow, 1).has_value()); } -TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) { +TEST(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(kOneFragmentPacketSize); - buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload), - SendOptions{.lifetime = DurationMs(1000), - .lifecycle_id = LifecycleId(1)}); + q.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload), + SendOptions{.lifetime = DurationMs(1000), + .lifecycle_id = LifecycleId(1)}); - EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), - /*maybe_delivered=*/false)); - EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - EXPECT_FALSE( - buf_.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize) - .has_value()); + EXPECT_CALL(cb, OnLifecycleMessageExpired(LifecycleId(1), + /*maybe_delivered=*/false)); + EXPECT_CALL(cb, OnLifecycleEnd(LifecycleId(1))); + EXPECT_FALSE(q.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize) + .has_value()); } -TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) { +TEST(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(120); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), - SendOptions{.lifecycle_id = LifecycleId(1)}); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), - SendOptions{.lifecycle_id = LifecycleId(2)}); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), + SendOptions{.lifecycle_id = LifecycleId(1)}); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), + SendOptions{.lifecycle_id = LifecycleId(2)}); - std::optional chunk_one = buf_.Produce(kNow, 50); + std::optional chunk_one = q.Produce(kNow, 50); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); + EXPECT_EQ(q.total_buffered_amount(), 2 * payload.size() - 50); - EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2), - /*maybe_delivered=*/false)); - EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2))); - buf_.PrepareResetStream(StreamID(1)); - EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); + EXPECT_CALL(cb, OnLifecycleMessageExpired(LifecycleId(2), + /*maybe_delivered=*/false)); + EXPECT_CALL(cb, OnLifecycleEnd(LifecycleId(2))); + q.PrepareResetStream(StreamID(1)); + EXPECT_EQ(q.total_buffered_amount(), payload.size() - 50); } -TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) { +TEST(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) { + testing::NiceMock cb; + RRSendQueue q("", &cb, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); std::vector payload(kOneFragmentPacketSize + 20); - buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), - SendOptions{.lifecycle_id = LifecycleId(1)}); + q.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), + SendOptions{.lifecycle_id = LifecycleId(1)}); std::optional chunk_one = - buf_.Produce(kNow, kOneFragmentPacketSize); + q.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_end); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), - /*maybe_delivered=*/false)); - EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); + EXPECT_CALL(cb, OnLifecycleMessageExpired(LifecycleId(1), + /*maybe_delivered=*/false)); + EXPECT_CALL(cb, OnLifecycleEnd(LifecycleId(1))); + q.Discard(chunk_one->data.stream_id, chunk_one->message_id); } } // namespace } // namespace dcsctp