diff --git a/net/dcsctp/common/internal_types.h b/net/dcsctp/common/internal_types.h index 2354b92cc4..4f3b1935a2 100644 --- a/net/dcsctp/common/internal_types.h +++ b/net/dcsctp/common/internal_types.h @@ -40,5 +40,10 @@ using VerificationTag = webrtc::StrongAlias; // Tie Tag, used as a nonce when connecting. using TieTag = webrtc::StrongAlias; +// An ID for every outgoing message, to correlate outgoing data chunks with the +// message it was carved from. +using OutgoingMessageId = + webrtc::StrongAlias; + } // namespace dcsctp #endif // NET_DCSCTP_COMMON_INTERNAL_TYPES_H_ diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 5219317d97..5547ffa870 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -29,6 +29,7 @@ rtc_library("rr_send_queue") { "../../../rtc_base:checks", "../../../rtc_base:logging", "../../../rtc_base/containers:flat_map", + "../common:internal_types", "../common:str_join", "../packet:data", "../public:socket", @@ -105,6 +106,7 @@ rtc_library("outstanding_data") { "../../../rtc_base:checks", "../../../rtc_base:logging", "../../../rtc_base/containers:flat_set", + "../common:internal_types", "../common:math", "../common:sequence_numbers", "../common:str_join", @@ -184,6 +186,7 @@ if (rtc_include_tests) { "../../../rtc_base:gunit_helpers", "../../../test:test_support", "../common:handover_testing", + "../common:internal_types", "../common:math", "../common:sequence_numbers", "../packet:chunk", diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index d551885799..04921866ae 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -34,7 +34,7 @@ class MockSendQueue : public SendQueue { (override)); MOCK_METHOD(bool, Discard, - (IsUnordered unordered, StreamID stream_id, MID mid), + (StreamID stream_id, OutgoingMessageId message_id), (override)); MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override)); MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override)); diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc index 6d18625f6f..c2706bd0d2 100644 --- a/net/dcsctp/tx/outstanding_data.cc +++ b/net/dcsctp/tx/outstanding_data.cc @@ -63,6 +63,8 @@ void OutstandingData::Item::MarkAsRetransmitted() { } void OutstandingData::Item::Abandon() { + RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() || + max_retransmissions_ != MaxRetransmits::NoLimit()); lifecycle_ = Lifecycle::kAbandoned; } @@ -252,6 +254,7 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn, RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " marked for retransmission"; break; case Item::NackAction::kAbandon: + RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " Nacked, resulted in abandoning"; AbandonAllFor(item); break; } @@ -260,8 +263,7 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn, void OutstandingData::AbandonAllFor(const Item& item) { // Erase all remaining chunks from the producer, if any. - if (discard_from_send_queue_(item.data().is_unordered, item.data().stream_id, - item.data().mid)) { + if (discard_from_send_queue_(item.data().stream_id, item.message_id())) { // There were remaining chunks to be produced for this message. Since the // receiver may have already received all chunks (up till now) for this // message, we can't just FORWARD-TSN to the last fragment in this @@ -279,10 +281,10 @@ void OutstandingData::AbandonAllFor(const Item& item) { Item& added_item = outstanding_data_ .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(std::move(message_end), TimeMs(0), - MaxRetransmits::NoLimit(), - TimeMs::InfiniteFuture(), - LifecycleId::NotSet())) + std::forward_as_tuple( + item.message_id(), std::move(message_end), TimeMs(0), + MaxRetransmits(0), TimeMs::InfiniteFuture(), + LifecycleId::NotSet())) .first->second; // The added chunk shouldn't be included in `outstanding_bytes`, so set it // as acked. @@ -294,8 +296,7 @@ void OutstandingData::AbandonAllFor(const Item& item) { for (auto& [tsn, other] : outstanding_data_) { if (!other.is_abandoned() && other.data().stream_id == item.data().stream_id && - other.data().is_unordered == item.data().is_unordered && - other.data().mid == item.data().mid) { + other.message_id() == item.message_id()) { RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap() << " as abandoned"; if (other.should_be_retransmitted()) { @@ -395,6 +396,7 @@ UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { } absl::optional OutstandingData::Insert( + OutgoingMessageId message_id, const Data& data, TimeMs time_sent, MaxRetransmits max_retransmissions, @@ -409,9 +411,9 @@ absl::optional OutstandingData::Insert( ++outstanding_items_; auto it = outstanding_data_ .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(data.Clone(), time_sent, - max_retransmissions, expires_at, - lifecycle_id)) + std::forward_as_tuple(message_id, data.Clone(), + time_sent, max_retransmissions, + expires_at, lifecycle_id)) .first; if (it->second.has_expired(time_sent)) { diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h index 0bca1c6fe4..f8e939661d 100644 --- a/net/dcsctp/tx/outstanding_data.h +++ b/net/dcsctp/tx/outstanding_data.h @@ -16,6 +16,7 @@ #include #include "absl/types/optional.h" +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" @@ -75,7 +76,7 @@ class OutstandingData { size_t data_chunk_header_size, UnwrappedTSN next_tsn, UnwrappedTSN last_cumulative_tsn_ack, - std::function discard_from_send_queue) + std::function discard_from_send_queue) : data_chunk_header_size_(data_chunk_header_size), next_tsn_(next_tsn), last_cumulative_tsn_ack_(last_cumulative_tsn_ack), @@ -128,6 +129,7 @@ class OutstandingData { // parameters. Returns the TSN if the item was actually added and scheduled to // be sent, and absl::nullopt if it shouldn't be sent. absl::optional Insert( + OutgoingMessageId message_id, const Data& data, TimeMs time_sent, MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(), @@ -175,12 +177,14 @@ class OutstandingData { kAbandon, }; - Item(Data data, + Item(OutgoingMessageId message_id, + Data data, TimeMs time_sent, MaxRetransmits max_retransmissions, TimeMs expires_at, LifecycleId lifecycle_id) - : time_sent_(time_sent), + : message_id_(message_id), + time_sent_(time_sent), max_retransmissions_(max_retransmissions), expires_at_(expires_at), lifecycle_id_(lifecycle_id), @@ -189,6 +193,8 @@ class OutstandingData { Item(const Item&) = delete; Item& operator=(const Item&) = delete; + OutgoingMessageId message_id() const { return message_id_; } + TimeMs time_sent() const { return time_sent_; } const Data& data() const { return data_; } @@ -249,6 +255,8 @@ class OutstandingData { // NOTE: This data structure has been optimized for size, by ordering fields // to avoid unnecessary padding. + const OutgoingMessageId message_id_; + // When the packet was sent, and placed in this queue. const TimeMs time_sent_; // If the message was sent with a maximum number of retransmissions, this is @@ -338,7 +346,7 @@ class OutstandingData { // The last cumulative TSN ack number. UnwrappedTSN last_cumulative_tsn_ack_; // Callback when to discard items from the send queue. - std::function discard_from_send_queue_; + std::function discard_from_send_queue_; std::map outstanding_data_; // The number of bytes that are in-flight (sent but not yet acked or nacked). diff --git a/net/dcsctp/tx/outstanding_data_test.cc b/net/dcsctp/tx/outstanding_data_test.cc index 3bb82fd3e6..b8c2e593a1 100644 --- a/net/dcsctp/tx/outstanding_data_test.cc +++ b/net/dcsctp/tx/outstanding_data_test.cc @@ -12,10 +12,12 @@ #include #include "absl/types/optional.h" +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/testing/data_generator.h" #include "net/dcsctp/testing/testing_macros.h" @@ -37,6 +39,7 @@ using ::testing::StrictMock; using ::testing::UnorderedElementsAre; constexpr TimeMs kNow(42); +constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17); class OutstandingDataTest : public testing::Test { protected: @@ -49,7 +52,7 @@ class OutstandingDataTest : public testing::Test { UnwrappedTSN::Unwrapper unwrapper_; DataGenerator gen_; - StrictMock> on_discard_; + StrictMock> on_discard_; OutstandingData buf_; }; @@ -67,8 +70,8 @@ TEST_F(OutstandingDataTest, HasInitialState) { } TEST_F(OutstandingDataTest, InsertChunk) { - ASSERT_HAS_VALUE_AND_ASSIGN(UnwrappedTSN tsn, - buf_.Insert(gen_.Ordered({1}, "BE"), kNow)); + ASSERT_HAS_VALUE_AND_ASSIGN( + UnwrappedTSN tsn, buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow)); EXPECT_EQ(tsn.Wrap(), TSN(10)); @@ -84,7 +87,7 @@ TEST_F(OutstandingDataTest, InsertChunk) { } TEST_F(OutstandingDataTest, AcksSingleChunk) { - buf_.Insert(gen_.Ordered({1}, "BE"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); OutstandingData::AckInfo ack = buf_.HandleSack(unwrapper_.Unwrap(TSN(10)), {}, false); @@ -103,7 +106,7 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) { } TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) { - buf_.Insert(gen_.Ordered({1}, "BE"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false); EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); @@ -118,8 +121,8 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) { } TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow); - buf_.Insert(gen_.Ordered({1}, "E"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow); std::vector gab = {SackChunk::GapAckBlock(2, 2)}; OutstandingData::AckInfo ack = @@ -141,8 +144,8 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) { } TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow); - buf_.Insert(gen_.Ordered({1}, "E"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -164,10 +167,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) { } TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, "E"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -202,10 +205,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) { TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) { static constexpr MaxRetransmits kMaxRetransmissions(0); - buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -217,7 +220,7 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) { buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; OutstandingData::AckInfo ack = @@ -238,10 +241,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) { TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { static constexpr MaxRetransmits kMaxRetransmissions(0); - buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -253,7 +256,7 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(true)); std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; OutstandingData::AckInfo ack = @@ -275,17 +278,19 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { static constexpr TimeMs kExpiresAt = kNow + DurationMs(1); - EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, "B"), kNow, + EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); - EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, ""), kNow + DurationMs(0), - MaxRetransmits::NoLimit(), kExpiresAt) + EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""), + kNow + DurationMs(0), MaxRetransmits::NoLimit(), + kExpiresAt) .has_value()); - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); - EXPECT_FALSE(buf_.Insert(gen_.Ordered({1}, "E"), kNow + DurationMs(1), - MaxRetransmits::NoLimit(), kExpiresAt) + EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), + kNow + DurationMs(1), MaxRetransmits::NoLimit(), + kExpiresAt) .has_value()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); @@ -301,11 +306,11 @@ TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { TEST_F(OutstandingDataTest, CanGenerateForwardTsn) { static constexpr MaxRetransmits kMaxRetransmissions(0); - buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); - buf_.Insert(gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions); - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); buf_.NackAll(); @@ -322,14 +327,14 @@ TEST_F(OutstandingDataTest, CanGenerateForwardTsn) { } TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, ""), kNow); - buf_.Insert(gen_.Ordered({1}, "E"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow); EXPECT_THAT(buf_.GetChunkStatesForTesting(), testing::ElementsAre(Pair(TSN(9), State::kAcked), // @@ -356,9 +361,9 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { } TEST_F(OutstandingDataTest, MeasureRTT) { - buf_.Insert(gen_.Ordered({1}, "BE"), kNow); - buf_.Insert(gen_.Ordered({1}, "BE"), kNow + DurationMs(1)); - buf_.Insert(gen_.Ordered({1}, "BE"), kNow + DurationMs(2)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2)); static constexpr DurationMs kDuration(123); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -375,7 +380,8 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { static constexpr MaxRetransmits kOneRetransmission(1); for (int tsn = 10; tsn <= 20; ++tsn) { - buf_.Insert(gen_.Ordered({1}, tsn == 10 ? "B" + buf_.Insert(kMessageId, + gen_.Ordered({1}, tsn == 10 ? "B" : tsn == 20 ? "E" : ""), kNow, kOneRetransmission); @@ -434,7 +440,7 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab8, false).has_packet_loss); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); std::vector gab9 = {SackChunk::GapAckBlock(2, 10)}; @@ -445,54 +451,16 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); } -TEST_F(OutstandingDataTest, CanAbandonChunksMarkedForFastRetransmit) { - // This test is a bit convoluted, and can't really happen with a well behaving - // client, but this was found by fuzzers. This test will verify that a message - // that was both marked as "to be fast retransmitted" and "abandoned" at the - // same time doesn't cause any consistency issues. - - // Add chunks 10-14, but chunk 11 has zero retransmissions. When chunk 10 and - // 11 are NACKed three times, chunk 10 will be marked for retransmission, but - // chunk 11 will be abandoned, which also abandons chunk 10, as it's part of - // the same message. - buf_.Insert(gen_.Ordered({1}, "B"), kNow); // 10 - buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); // 11 - buf_.Insert(gen_.Ordered({1}, ""), kNow); // 12 - buf_.Insert(gen_.Ordered({1}, ""), kNow); // 13 - buf_.Insert(gen_.Ordered({1}, "E"), kNow); // 14 - - // ACK 9, 12 - std::vector gab1 = {SackChunk::GapAckBlock(3, 3)}; - EXPECT_FALSE( - buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss); - EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - - // ACK 9, 12, 13 - std::vector gab2 = {SackChunk::GapAckBlock(3, 4)}; - EXPECT_FALSE( - buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss); - EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) - .WillOnce(Return(false)); - - // ACK 9, 12, 13, 14 - std::vector gab3 = {SackChunk::GapAckBlock(3, 5)}; - OutstandingData::AckInfo ack = - buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false); - EXPECT_TRUE(ack.has_packet_loss); - EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); - EXPECT_THAT(buf_.GetChunksToBeFastRetransmitted(1000), IsEmpty()); - EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), IsEmpty()); -} - TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) { - buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(), - TimeMs::InfiniteFuture(), LifecycleId(42)); - buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(), - TimeMs::InfiniteFuture(), LifecycleId(43)); - buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(), - TimeMs::InfiniteFuture(), LifecycleId(44)); + buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow, + MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + LifecycleId(42)); + buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow, + MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + LifecycleId(43)); + buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow, + MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + LifecycleId(44)); OutstandingData::AckInfo ack1 = buf_.HandleSack(unwrapper_.Unwrap(TSN(11)), {}, false); @@ -507,10 +475,10 @@ TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) { } TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), TimeMs::InfiniteFuture(), LifecycleId(42)); std::vector gab1 = {SackChunk::GapAckBlock(2, 2)}; @@ -524,7 +492,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); std::vector gab3 = {SackChunk::GapAckBlock(2, 4)}; - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); OutstandingData::AckInfo ack1 = buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false); @@ -543,10 +511,10 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { } TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { - buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), + buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), TimeMs::InfiniteFuture(), LifecycleId(42)); EXPECT_THAT(buf_.GetChunkStatesForTesting(), @@ -569,7 +537,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { Pair(TSN(13), State::kAcked))); // T3-rtx triggered. - EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); buf_.NackAll(); @@ -609,24 +577,31 @@ TEST_F(OutstandingDataTest, GeneratesForwardTsnUntilNextStreamResetTsn) { constexpr DataGeneratorOptions kStream1 = {.stream_id = StreamID(1)}; constexpr DataGeneratorOptions kStream2 = {.stream_id = StreamID(2)}; constexpr DataGeneratorOptions kStream3 = {.stream_id = StreamID(3)}; + constexpr MaxRetransmits kNoRtx = MaxRetransmits(0); EXPECT_CALL(on_discard_, Call).WillRepeatedly(Return(false)); // TSN 10-12 - buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow); - buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow); - buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow, MaxRetransmits(0)); + buf_.Insert(OutgoingMessageId(0), gen_.Ordered({1}, "BE", kStream1), kNow, + kNoRtx); + buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE", kStream1), kNow, + kNoRtx); + buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE", kStream1), kNow, + kNoRtx); buf_.BeginResetStreams(); // TSN 13, 14 - buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0)); + buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE", kStream2), kNow, + kNoRtx); + buf_.Insert(OutgoingMessageId(4), gen_.Ordered({1}, "BE", kStream2), kNow, + kNoRtx); buf_.BeginResetStreams(); // TSN 15, 16 - buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow, MaxRetransmits(0)); - buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow); + buf_.Insert(OutgoingMessageId(5), gen_.Ordered({1}, "BE", kStream3), kNow, + kNoRtx); + buf_.Insert(OutgoingMessageId(6), gen_.Ordered({1}, "BE", kStream3), kNow); EXPECT_FALSE(buf_.ShouldSendForwardTsn()); diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index e2b28b1d81..2b9843f4a7 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -86,8 +86,8 @@ RetransmissionQueue::RetransmissionQueue( data_chunk_header_size_, tsn_unwrapper_.Unwrap(my_initial_tsn), tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)), - [this](IsUnordered unordered, StreamID stream_id, MID mid) { - return send_queue_.Discard(unordered, stream_id, mid); + [this](StreamID stream_id, OutgoingMessageId message_id) { + return send_queue_.Discard(stream_id, message_id); }) {} bool RetransmissionQueue::IsConsistent() const { @@ -491,7 +491,7 @@ std::vector> RetransmissionQueue::GetChunksToSend( rwnd_ -= chunk_size; absl::optional tsn = outstanding_data_.Insert( - chunk_opt->data, now, + chunk_opt->message_id, chunk_opt->data, now, partial_reliability_ ? chunk_opt->max_retransmissions : MaxRetransmits::NoLimit(), partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(), diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc index 88de0b54f0..d50494f084 100644 --- a/net/dcsctp/tx/retransmission_queue_test.cc +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -20,6 +20,7 @@ #include "api/array_view.h" #include "api/task_queue/task_queue_base.h" #include "net/dcsctp/common/handover_testing.h" +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" @@ -44,6 +45,7 @@ using ::testing::MockFunction; using State = ::dcsctp::RetransmissionQueue::State; using ::testing::_; using ::testing::ElementsAre; +using ::testing::Field; using ::testing::IsEmpty; using ::testing::NiceMock; using ::testing::Pair; @@ -53,6 +55,7 @@ using ::testing::UnorderedElementsAre; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; +constexpr OutgoingMessageId kMessageId = OutgoingMessageId(42); DcSctpOptions MakeOptions() { DcSctpOptions options; @@ -74,9 +77,11 @@ class RetransmissionQueueTest : public testing::Test { []() { return absl::nullopt; }, TimerOptions(options_.rto_initial))) {} - std::function CreateChunk() { - return [this](TimeMs now, size_t max_size) { - return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + std::function CreateChunk( + OutgoingMessageId message_id) { + return [this, message_id](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(message_id, + gen_.Ordered({1, 2, 3, 4}, "BE")); }; } @@ -140,7 +145,7 @@ TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) { TEST_F(RetransmissionQueueTest, SendOneChunk) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -153,7 +158,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) { TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -167,9 +172,9 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -185,14 +190,14 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -216,14 +221,14 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -235,7 +240,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 18 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); @@ -256,7 +261,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 19 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(9))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); @@ -268,7 +273,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 20 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(10))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); @@ -313,9 +318,9 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // TSN, it will also restart T3-RTX. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); static constexpr TimeMs kStartTime(100000); @@ -336,7 +341,7 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Send 13 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13))); @@ -347,7 +352,7 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Send 14 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14))); @@ -393,10 +398,12 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillOnce([this](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(1), + gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -414,7 +421,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -452,7 +460,7 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { CreateQueue(/*supports_partial_reliability=*/false); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -473,8 +481,7 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(0); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); } // namespace dcsctp @@ -482,7 +489,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -497,8 +504,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { Pair(TSN(10), State::kInFlight))); // Will force chunks to be retransmitted - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); @@ -524,7 +530,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(3); return dts; }) @@ -538,8 +544,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(0); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); // Retransmission 1 queue.HandleT3RtxTimerExpiry(); @@ -557,8 +562,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); // Retransmission 4 - not allowed. - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); @@ -579,7 +583,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { std::vector payload(1000); EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -615,17 +620,18 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + SendQueue::DataToSend dts(kMessageId, + gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -645,7 +651,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(true)); queue.HandleT3RtxTimerExpiry(); @@ -670,17 +676,18 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E")); + SendQueue::DataToSend dts(kMessageId, + gen_.Ordered({9, 10, 11, 12}, "E")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -700,7 +707,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleT3RtxTimerExpiry(); @@ -725,28 +732,32 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { .WillOnce([this](TimeMs, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(1); - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts)); + SendQueue::DataToSend dts(OutgoingMessageId(42), + gen_.Ordered({1, 2, 3, 4}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(2); - SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts)); + SendQueue::DataToSend dts(OutgoingMessageId(43), + gen_.Unordered({1, 2, 3, 4}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(3); - SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts)); + SendQueue::DataToSend dts(OutgoingMessageId(44), + gen_.Ordered({9, 10, 11, 12}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(4); - SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts)); + SendQueue::DataToSend dts(OutgoingMessageId(45), + gen_.Ordered({13, 14, 15, 16}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -773,11 +784,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { Pair(TSN(12), State::kNacked), // Pair(TSN(13), State::kAcked))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(42))) .WillOnce(Return(true)); - EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(2), OutgoingMessageId(43))) .WillOnce(Return(true)); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(3), OutgoingMessageId(44))) .WillOnce(Return(true)); queue.HandleT3RtxTimerExpiry(); @@ -840,7 +851,8 @@ TEST_F(RetransmissionQueueTest, MeasureRTT) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + SendQueue::DataToSend dts(OutgoingMessageId(0), + gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -868,14 +880,14 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -898,14 +910,14 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -945,14 +957,14 @@ TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) { TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), @@ -987,13 +999,15 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); std::vector payload(183); - return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "BE")); }) .WillOnce([this](TimeMs, size_t size) { EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); std::vector payload(957); - return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(1), + gen_.Ordered(payload, "BE")); }); std::vector> chunks_to_send = @@ -1005,17 +1019,18 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "")); + SendQueue::DataToSend dts(kMessageId, + gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) @@ -1035,8 +1050,7 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { EXPECT_EQ(queue.outstanding_items(), 3u); // Mark the message as lost. - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); @@ -1071,12 +1085,14 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { TimeMs test_start = now_; EXPECT_CALL(producer_, Produce) .WillOnce([&](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options)); + SendQueue::DataToSend dts(kMessageId, + gen_.Ordered({1, 2, 3, 4}, "B", options)); dts.expires_at = TimeMs(test_start + DurationMs(10)); return dts; }) .WillOnce([&](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options)); + SendQueue::DataToSend dts(kMessageId, + gen_.Ordered({5, 6, 7, 8}, "", options)); dts.expires_at = TimeMs(test_start + DurationMs(10)); return dts; }) @@ -1086,7 +1102,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { queue.GetChunksToSend(now_, 24); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId)) .WillOnce(Return(true)); now_ += DurationMs(100); @@ -1100,17 +1116,74 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { Pair(TSN(12), State::kAbandoned))); // Placeholder end } +TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { + RetransmissionQueue queue = CreateQueue(); + TimeMs test_start = now_; + EXPECT_CALL(producer_, Produce) + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts( + OutgoingMessageId(42), + gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts( + OutgoingMessageId(43), + gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + // Stream reset - MID reset to zero again. + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts( + OutgoingMessageId(44), + gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)})); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + .WillOnce([&](TimeMs, size_t) { + SendQueue::DataToSend dts( + OutgoingMessageId(44), + gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)})); + dts.expires_at = TimeMs(test_start + DurationMs(10)); + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44))) + .WillOnce(Return(true)); + + EXPECT_THAT(queue.GetChunksToSend(now_, 24), + ElementsAre(Pair(TSN(10), Field(&Data::mid, MID(0))))); + EXPECT_THAT(queue.GetChunksToSend(now_, 24), + ElementsAre(Pair(TSN(11), Field(&Data::mid, MID(1))))); + EXPECT_THAT(queue.GetChunksToSend(now_, 24), + ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0))))); + + now_ += DurationMs(100); + EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); + + EXPECT_THAT( + queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN + Pair(TSN(10), State::kInFlight), // OutgoingMessageId=42, BE + Pair(TSN(11), State::kInFlight), // OutgoingMessageId=43, BE + Pair(TSN(12), State::kAbandoned), // OutgoingMessageId=44, B + Pair(TSN(13), State::kAbandoned), // Produced and expired + Pair(TSN(14), State::kAbandoned))); // Placeholder end +} + TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1128,8 +1201,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(0); + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); @@ -1155,7 +1227,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); @@ -1175,19 +1247,19 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](TimeMs, size_t) { - SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE")); + SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(2); return dts; }) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) + .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1213,8 +1285,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { Pair(TSN(18), State::kInFlight), // Pair(TSN(19), State::kInFlight))); - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(0); + EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(8))).Times(0); // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it. for (int tsn = 11; tsn <= 13; ++tsn) { @@ -1284,7 +1355,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions. - EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {})); @@ -1316,7 +1387,8 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { std::vector payload(1000); EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "BE")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -1344,6 +1416,7 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { EXPECT_CALL(producer_, Produce) .WillOnce([chunk_size, this](TimeMs, size_t) { return SendQueue::DataToSend( + OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -1376,6 +1449,7 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { EXPECT_CALL(producer_, Produce) .WillOnce([chunk_size, this](TimeMs, size_t) { return SendQueue::DataToSend( + OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); @@ -1392,7 +1466,7 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); @@ -1408,14 +1482,14 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) + .WillOnce(CreateChunk(OutgoingMessageId(5))) + .WillOnce(CreateChunk(OutgoingMessageId(6))) + .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8)); EXPECT_EQ( @@ -1428,7 +1502,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 18 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); @@ -1440,7 +1514,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 19 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(9))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); @@ -1452,7 +1526,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 20 EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(10))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); @@ -1487,8 +1561,8 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { TEST_F(RetransmissionQueueTest, HandoverTest) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(0))) + .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2)); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); @@ -1497,9 +1571,9 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { CreateQueueByHandover(queue); EXPECT_CALL(producer_, Produce) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) - .WillOnce(CreateChunk()) + .WillOnce(CreateChunk(OutgoingMessageId(2))) + .WillOnce(CreateChunk(OutgoingMessageId(3))) + .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(*handedover_queue), testing::ElementsAre(TSN(12), TSN(13), TSN(14))); @@ -1519,19 +1593,24 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) { EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "B")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "B")); }) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "")); }) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "")); }) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "")); }) .WillOnce([this, payload](TimeMs, size_t) { - return SendQueue::DataToSend(gen_.Ordered(payload, "E")); + return SendQueue::DataToSend(OutgoingMessageId(0), + gen_.Ordered(payload, "E")); }) .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 4212d3e0de..facb432c59 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -20,6 +20,7 @@ #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" @@ -123,7 +124,10 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, bool was_active = bytes_to_send_in_next_message() > 0; buffered_amount_.Increase(message.payload().size()); parent_.total_buffered_amount_.Increase(message.payload().size()); - items_.emplace_back(std::move(message), std::move(attributes)); + OutgoingMessageId message_id = parent_.current_message_id; + parent_.current_message_id = + OutgoingMessageId(*parent_.current_message_id + 1); + items_.emplace_back(message_id, std::move(message), std::move(attributes)); if (!was_active) { scheduler_stream_->MaybeMakeActive(); @@ -184,9 +188,10 @@ absl::optional RRSendQueue::OutgoingStream::Produce( buffered_amount_.Decrease(payload.size()); parent_.total_buffered_amount_.Decrease(payload.size()); - SendQueue::DataToSend chunk(Data( - stream_id, item.ssn.value_or(SSN(0)), item.mid.value(), fsn, ppid, - std::move(payload), is_beginning, is_end, item.attributes.unordered)); + SendQueue::DataToSend chunk( + item.message_id, Data(stream_id, item.ssn.value_or(SSN(0)), *item.mid, + fsn, ppid, std::move(payload), is_beginning, + is_end, item.attributes.unordered)); chunk.max_retransmissions = item.attributes.max_retransmissions; chunk.expires_at = item.attributes.expires_at; chunk.lifecycle_id = @@ -230,12 +235,11 @@ void RRSendQueue::OutgoingStream::HandleMessageExpired( } } -bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, MID mid) { +bool RRSendQueue::OutgoingStream::Discard(OutgoingMessageId message_id) { bool result = false; if (!items_.empty()) { Item& item = items_.front(); - if (item.attributes.unordered == unordered && item.mid.has_value() && - *item.mid == mid) { + if (item.message_id == message_id) { HandleMessageExpired(item); items_.pop_front(); @@ -384,8 +388,8 @@ absl::optional RRSendQueue::Produce(TimeMs now, return scheduler_.Produce(now, max_size); } -bool RRSendQueue::Discard(IsUnordered unordered, StreamID stream_id, MID mid) { - bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(unordered, mid); +bool RRSendQueue::Discard(StreamID stream_id, OutgoingMessageId message_id) { + bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(message_id); RTC_DCHECK(IsConsistent()); return has_discarded; diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index e30fbe5105..bef5fe437d 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -22,6 +22,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" @@ -76,7 +77,7 @@ class RRSendQueue : public SendQueue { // Implementation of `SendQueue`. absl::optional Produce(TimeMs now, size_t max_size) override; - bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) override; + bool Discard(StreamID stream_id, OutgoingMessageId message_id) override; void PrepareResetStream(StreamID streams) override; bool HasStreamsReadyToBeReset() const override; std::vector GetStreamsReadyToBeReset() override; @@ -161,7 +162,7 @@ class RRSendQueue : public SendQueue { ThresholdWatcher& buffered_amount() { return buffered_amount_; } // Discards a partially sent message, see `SendQueue::Discard`. - bool Discard(IsUnordered unordered, MID mid); + bool Discard(OutgoingMessageId message_id); // Pauses this stream, which is used before resetting it. void Pause(); @@ -217,11 +218,15 @@ class RRSendQueue : public SendQueue { // An enqueued message and metadata. struct Item { - explicit Item(DcSctpMessage msg, MessageAttributes attributes) - : message(std::move(msg)), + explicit Item(OutgoingMessageId message_id, + DcSctpMessage msg, + MessageAttributes attributes) + : message_id(message_id), + message(std::move(msg)), attributes(std::move(attributes)), remaining_offset(0), remaining_size(message.payload().size()) {} + OutgoingMessageId message_id; DcSctpMessage message; MessageAttributes attributes; // The remaining payload (offset and size) to be sent, when it has been @@ -267,6 +272,7 @@ class RRSendQueue : public SendQueue { DcSctpSocketCallbacks& callbacks_; const size_t buffer_size_; const StreamPriority default_priority_; + OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_; // The total amount of buffer data, for all streams. diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 1684ffda49..9d6da7bdff 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -13,6 +13,7 @@ #include #include +#include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_options.h" @@ -227,8 +228,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) { ASSERT_TRUE(chunk_one.has_value()); EXPECT_FALSE(chunk_one->data.is_end); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, - chunk_one->data.mid); + buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); absl::optional chunk_two = buf_.Produce(kNow, kOneFragmentPacketSize); @@ -244,8 +244,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) { ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); // Calling it again shouldn't cause issues. - buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, - chunk_one->data.mid); + buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize)); } @@ -366,6 +365,32 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) { EXPECT_EQ(chunk_three->data.ssn, SSN(0)); } +TEST_F(RRSendQueueTest, CommittingDoesNotResetMessageId) { + std::vector payload(50); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk1.data.ssn, SSN(0)); + EXPECT_EQ(chunk1.message_id, OutgoingMessageId(0)); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk2.data.ssn, SSN(1)); + EXPECT_EQ(chunk2.message_id, OutgoingMessageId(1)); + + buf_.PrepareResetStream(kStreamID); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), UnorderedElementsAre(kStreamID)); + buf_.CommitResetStreams(); + + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3, + buf_.Produce(kNow, kOneFragmentPacketSize)); + EXPECT_EQ(chunk3.data.ssn, SSN(0)); + EXPECT_EQ(chunk3.message_id, OutgoingMessageId(2)); +} + TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) { std::vector payload(50); @@ -859,8 +884,7 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) { EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, - chunk_one->data.mid); + buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id); } } // namespace } // namespace dcsctp diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index c969917913..48eaefaf6a 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -27,7 +27,11 @@ class SendQueue { public: // Container for a data chunk that is produced by the SendQueue struct DataToSend { - explicit DataToSend(Data data) : data(std::move(data)) {} + DataToSend(OutgoingMessageId message_id, Data data) + : message_id(message_id), data(std::move(data)) {} + + OutgoingMessageId message_id; + // The data to send, including all parameters. Data data; @@ -53,8 +57,8 @@ class SendQueue { // including any headers. virtual absl::optional Produce(TimeMs now, size_t max_size) = 0; - // Discards a partially sent message identified by the parameters `unordered`, - // `stream_id` and `mid`. The `mid` comes from the returned + // Discards a partially sent message identified by the parameters + // `stream_id` and `message_id`. The `message_id` comes from the returned // information when having called `Produce`. A partially sent message means // that it has had at least one fragment of it returned when `Produce` was // called prior to calling this method). @@ -67,7 +71,7 @@ class SendQueue { // // This function returns true if this message had unsent fragments still in // the queue that were discarded, and false if there were no such fragments. - virtual bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) = 0; + virtual bool Discard(StreamID stream_id, OutgoingMessageId message_id) = 0; // Prepares the stream to be reset. This is used to close a WebRTC data // channel and will be signaled to the other side. diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc index e55272b16b..4f5fb0fb84 100644 --- a/net/dcsctp/tx/stream_scheduler_test.cc +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -39,11 +39,16 @@ MATCHER_P(HasDataWithMid, mid, "") { } std::function(TimeMs, size_t)> -CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) { - return [sid, mid, payload_size](TimeMs now, size_t max_size) { - return SendQueue::DataToSend(Data( - sid, SSN(0), mid, FSN(0), PPID(42), std::vector(payload_size), - Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true))); +CreateChunk(OutgoingMessageId message_id, + StreamID sid, + MID mid, + size_t payload_size = kPayloadSize) { + return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) { + return SendQueue::DataToSend( + message_id, + Data(sid, SSN(0), mid, FSN(0), PPID(42), + std::vector(payload_size), Data::IsBeginning(true), + Data::IsEnd(true), IsUnordered(true))); }; } @@ -76,7 +81,8 @@ class TestStream { StreamPriority priority, size_t packet_size = kPayloadSize) { EXPECT_CALL(producer_, Produce) - .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size)); + .WillRepeatedly( + CreateChunk(OutgoingMessageId(0), stream_id, MID(0), packet_size)); EXPECT_CALL(producer_, bytes_to_send_in_next_message) .WillRepeatedly(Return(packet_size)); stream_ = scheduler.CreateStream(&producer_, stream_id, priority); @@ -117,7 +123,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { StreamScheduler scheduler("", kMtu); StrictMock producer; - EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); + EXPECT_CALL(producer, Produce) + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0))); EXPECT_CALL(producer, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(0)); @@ -135,9 +142,9 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) - .WillOnce(CreateChunk(StreamID(1), MID(101))) - .WillOnce(CreateChunk(StreamID(1), MID(102))); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101))) + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102))); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -149,9 +156,9 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { StrictMock producer2; EXPECT_CALL(producer2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(200))) - .WillOnce(CreateChunk(StreamID(2), MID(201))) - .WillOnce(CreateChunk(StreamID(2), MID(202))); + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(200))) + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(201))) + .WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(2), MID(202))); EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -177,26 +184,29 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) .WillOnce([](...) { return SendQueue::DataToSend( + OutgoingMessageId(1), Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), std::vector(4), Data::IsBeginning(true), Data::IsEnd(false), IsUnordered(true))); }) .WillOnce([](...) { return SendQueue::DataToSend( + OutgoingMessageId(1), Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), std::vector(4), Data::IsBeginning(false), Data::IsEnd(false), IsUnordered(true))); }) .WillOnce([](...) { return SendQueue::DataToSend( + OutgoingMessageId(1), Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), std::vector(4), Data::IsBeginning(false), Data::IsEnd(true), IsUnordered(true))); }) - .WillOnce(CreateChunk(StreamID(1), MID(102))); + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102))); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -210,9 +220,9 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { StrictMock producer2; EXPECT_CALL(producer2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(200))) - .WillOnce(CreateChunk(StreamID(2), MID(201))) - .WillOnce(CreateChunk(StreamID(2), MID(202))); + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200))) + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201))) + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202))); EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -239,8 +249,8 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) - .WillOnce(CreateChunk(StreamID(1), MID(101))); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101))); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -264,9 +274,9 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { StrictMock producer1; // Callbacks are setup so that they hint that there is a MID(2) coming... EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) - .WillOnce(CreateChunk(StreamID(1), MID(101))) - .WillOnce(CreateChunk(StreamID(1), MID(102))); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101))) + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102))); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -293,9 +303,9 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) - .WillOnce(CreateChunk(StreamID(1), MID(101))) - .WillOnce(CreateChunk(StreamID(1), MID(102))); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101))) + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102))); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -308,9 +318,9 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { StrictMock producer2; EXPECT_CALL(producer2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(200))) - .WillOnce(CreateChunk(StreamID(2), MID(201))) - .WillOnce(CreateChunk(StreamID(2), MID(202))); + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200))) + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201))) + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202))); EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -384,9 +394,12 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { StrictMock callback1; EXPECT_CALL(callback1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket)) - .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) - .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket)); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100), + kSmallPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101), + kSmallPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102), + kSmallPacket)); EXPECT_CALL(callback1, bytes_to_send_in_next_message) .WillOnce(Return(kSmallPacket)) // When making active .WillOnce(Return(kSmallPacket)) @@ -398,9 +411,12 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { StrictMock callback2; EXPECT_CALL(callback2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket)) - .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) - .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket)); + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200), + kLargePacket)) + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201), + kLargePacket)) + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202), + kLargePacket)); EXPECT_CALL(callback2, bytes_to_send_in_next_message) .WillOnce(Return(kLargePacket)) // When making active .WillOnce(Return(kLargePacket)) @@ -432,9 +448,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { StrictMock callback1; EXPECT_CALL(callback1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(100))) - .WillOnce(CreateChunk(StreamID(1), MID(101))) - .WillOnce(CreateChunk(StreamID(1), MID(102))); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100))) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101))) + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102))); EXPECT_CALL(callback1, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -447,9 +463,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { StrictMock callback2; EXPECT_CALL(callback2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(200))) - .WillOnce(CreateChunk(StreamID(2), MID(201))) - .WillOnce(CreateChunk(StreamID(2), MID(202))); + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200))) + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201))) + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202))); EXPECT_CALL(callback2, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -462,9 +478,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { StrictMock callback3; EXPECT_CALL(callback3, Produce) - .WillOnce(CreateChunk(StreamID(3), MID(300))) - .WillOnce(CreateChunk(StreamID(3), MID(301))) - .WillOnce(CreateChunk(StreamID(3), MID(302))); + .WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300))) + .WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301))) + .WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302))); EXPECT_CALL(callback3, bytes_to_send_in_next_message) .WillOnce(Return(kPayloadSize)) // When making active .WillOnce(Return(kPayloadSize)) @@ -510,11 +526,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { StrictMock callback1; EXPECT_CALL(callback1, Produce) // virtual finish time ~ 0 + 50 * 80 = 4000 - .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100), + kMediumPacket)) // virtual finish time ~ 4000 + 20 * 80 = 5600 - .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101), + kSmallPacket)) // virtual finish time ~ 5600 + 70 * 80 = 11200 - .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket)); + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102), + kLargePacket)); EXPECT_CALL(callback1, bytes_to_send_in_next_message) .WillOnce(Return(kMediumPacket)) // When making active .WillOnce(Return(kSmallPacket)) @@ -528,11 +547,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { StrictMock callback2; EXPECT_CALL(callback2, Produce) // virtual finish time ~ 0 + 50 * 50 = 2500 - .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200), + kMediumPacket)) // virtual finish time ~ 2500 + 70 * 50 = 6000 - .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + .WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201), + kLargePacket)) // virtual finish time ~ 6000 + 20 * 50 = 7000 - .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket)); + .WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202), + kSmallPacket)); EXPECT_CALL(callback2, bytes_to_send_in_next_message) .WillOnce(Return(kMediumPacket)) // When making active .WillOnce(Return(kLargePacket)) @@ -546,11 +568,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { StrictMock callback3; EXPECT_CALL(callback3, Produce) // virtual finish time ~ 0 + 20 * 20 = 400 - .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300), + kSmallPacket)) // virtual finish time ~ 400 + 50 * 20 = 1400 - .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket)) + .WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301), + kMediumPacket)) // virtual finish time ~ 1400 + 70 * 20 = 2800 - .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket)); + .WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302), + kLargePacket)); EXPECT_CALL(callback3, bytes_to_send_in_next_message) .WillOnce(Return(kSmallPacket)) // When making active .WillOnce(Return(kMediumPacket)) @@ -677,8 +702,8 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(0), 100)) - .WillOnce(CreateChunk(StreamID(1), MID(0), 100)); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 100)) + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(0), 100)); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(200)) // When making active .WillOnce(Return(100)) @@ -689,8 +714,8 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { StrictMock producer2; EXPECT_CALL(producer2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(1), 100)) - .WillOnce(CreateChunk(StreamID(2), MID(1), 50)); + .WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(2), MID(1), 100)) + .WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(1), 50)); EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(150)) // When making active .WillOnce(Return(50)) @@ -714,7 +739,7 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { StrictMock producer1; EXPECT_CALL(producer1, Produce) - .WillOnce(CreateChunk(StreamID(1), MID(0), 200)); + .WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 200)); EXPECT_CALL(producer1, bytes_to_send_in_next_message) .WillOnce(Return(200)) // When making active .WillOnce(Return(0)); @@ -724,7 +749,7 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { StrictMock producer2; EXPECT_CALL(producer2, Produce) - .WillOnce(CreateChunk(StreamID(2), MID(1), 150)); + .WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(2), MID(1), 150)); EXPECT_CALL(producer2, bytes_to_send_in_next_message) .WillOnce(Return(150)) // When making active .WillOnce(Return(0));