dcsctp: Abandon correct message on stream reset

Before this CL, a message was identified by the triple (stream_id,
is_unordered, MID) (and yes, the MID is always present in the send
queue, even when interleaved message is not enabled.). So when a chunk
was abandoned due to e.g. having reached the retransmission limit, all
other chunks for that message in the retransmission queue, and all
unsent chunks in the send queue were discarded as well.

This works well, except for the fact that resetting a stream will result
in the MID being set to zero again, which can result in two different
messages having the same identifying triple. And due to the
implementation, both messages would get abandoned.

In WebRTC, an entire data channels is either reliable or unreliable, and
for a message to be abandoned, the channel must be unreliable. So this
means that in the case of stream resets - meaning that a channel was
closed and then reopened, an abandoned message from the old (now closed)
channel would result in abandoning another message sent on the re-opened
data channel.

This CL introduces a new internal property on messages while in the
retransmission and send queue; The "outgoing message id". It's a
monotonically increasing identifier - shared among all streams - that is
never reset to zero in the event of a stream reset. And now a message is
actually only identified by the outgoing message id, but often used
together with the stream identifier, as all data in the send queue is
partitioned by stream. This identifier is 32 bits wide, allowing at most
four billion messages to be in-flight, which is not a limitation, as the
TSN is also 32 bits wide.

Bug: webrtc:14600
Change-Id: I33c23fb0e4bde95327b15d1999e76aa43f5fa7db
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/322603
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40881}
This commit is contained in:
Victor Boivie 2023-10-04 14:25:26 +02:00 committed by WebRTC LUCI CQ
parent b01cd19fd7
commit 9cf825ded9
13 changed files with 486 additions and 351 deletions

View File

@ -40,5 +40,10 @@ using VerificationTag = webrtc::StrongAlias<class VerificationTagTag, uint32_t>;
// Tie Tag, used as a nonce when connecting.
using TieTag = webrtc::StrongAlias<class TieTagTag, uint64_t>;
// An ID for every outgoing message, to correlate outgoing data chunks with the
// message it was carved from.
using OutgoingMessageId =
webrtc::StrongAlias<class OutgoingMessageIdTag, uint32_t>;
} // namespace dcsctp
#endif // NET_DCSCTP_COMMON_INTERNAL_TYPES_H_

View File

@ -29,6 +29,7 @@ rtc_library("rr_send_queue") {
"../../../rtc_base:checks",
"../../../rtc_base:logging",
"../../../rtc_base/containers:flat_map",
"../common:internal_types",
"../common:str_join",
"../packet:data",
"../public:socket",
@ -105,6 +106,7 @@ rtc_library("outstanding_data") {
"../../../rtc_base:checks",
"../../../rtc_base:logging",
"../../../rtc_base/containers:flat_set",
"../common:internal_types",
"../common:math",
"../common:sequence_numbers",
"../common:str_join",
@ -184,6 +186,7 @@ if (rtc_include_tests) {
"../../../rtc_base:gunit_helpers",
"../../../test:test_support",
"../common:handover_testing",
"../common:internal_types",
"../common:math",
"../common:sequence_numbers",
"../packet:chunk",

View File

@ -34,7 +34,7 @@ class MockSendQueue : public SendQueue {
(override));
MOCK_METHOD(bool,
Discard,
(IsUnordered unordered, StreamID stream_id, MID mid),
(StreamID stream_id, OutgoingMessageId message_id),
(override));
MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override));
MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override));

View File

@ -63,6 +63,8 @@ void OutstandingData::Item::MarkAsRetransmitted() {
}
void OutstandingData::Item::Abandon() {
RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() ||
max_retransmissions_ != MaxRetransmits::NoLimit());
lifecycle_ = Lifecycle::kAbandoned;
}
@ -252,6 +254,7 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn,
RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " marked for retransmission";
break;
case Item::NackAction::kAbandon:
RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " Nacked, resulted in abandoning";
AbandonAllFor(item);
break;
}
@ -260,8 +263,7 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn,
void OutstandingData::AbandonAllFor(const Item& item) {
// Erase all remaining chunks from the producer, if any.
if (discard_from_send_queue_(item.data().is_unordered, item.data().stream_id,
item.data().mid)) {
if (discard_from_send_queue_(item.data().stream_id, item.message_id())) {
// There were remaining chunks to be produced for this message. Since the
// receiver may have already received all chunks (up till now) for this
// message, we can't just FORWARD-TSN to the last fragment in this
@ -279,10 +281,10 @@ void OutstandingData::AbandonAllFor(const Item& item) {
Item& added_item =
outstanding_data_
.emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
std::forward_as_tuple(std::move(message_end), TimeMs(0),
MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(),
LifecycleId::NotSet()))
std::forward_as_tuple(
item.message_id(), std::move(message_end), TimeMs(0),
MaxRetransmits(0), TimeMs::InfiniteFuture(),
LifecycleId::NotSet()))
.first->second;
// The added chunk shouldn't be included in `outstanding_bytes`, so set it
// as acked.
@ -294,8 +296,7 @@ void OutstandingData::AbandonAllFor(const Item& item) {
for (auto& [tsn, other] : outstanding_data_) {
if (!other.is_abandoned() &&
other.data().stream_id == item.data().stream_id &&
other.data().is_unordered == item.data().is_unordered &&
other.data().mid == item.data().mid) {
other.message_id() == item.message_id()) {
RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap()
<< " as abandoned";
if (other.should_be_retransmitted()) {
@ -395,6 +396,7 @@ UnwrappedTSN OutstandingData::highest_outstanding_tsn() const {
}
absl::optional<UnwrappedTSN> OutstandingData::Insert(
OutgoingMessageId message_id,
const Data& data,
TimeMs time_sent,
MaxRetransmits max_retransmissions,
@ -409,9 +411,9 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
++outstanding_items_;
auto it = outstanding_data_
.emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
std::forward_as_tuple(data.Clone(), time_sent,
max_retransmissions, expires_at,
lifecycle_id))
std::forward_as_tuple(message_id, data.Clone(),
time_sent, max_retransmissions,
expires_at, lifecycle_id))
.first;
if (it->second.has_expired(time_sent)) {

View File

@ -16,6 +16,7 @@
#include <vector>
#include "absl/types/optional.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
@ -75,7 +76,7 @@ class OutstandingData {
size_t data_chunk_header_size,
UnwrappedTSN next_tsn,
UnwrappedTSN last_cumulative_tsn_ack,
std::function<bool(IsUnordered, StreamID, MID)> discard_from_send_queue)
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue)
: data_chunk_header_size_(data_chunk_header_size),
next_tsn_(next_tsn),
last_cumulative_tsn_ack_(last_cumulative_tsn_ack),
@ -128,6 +129,7 @@ class OutstandingData {
// parameters. Returns the TSN if the item was actually added and scheduled to
// be sent, and absl::nullopt if it shouldn't be sent.
absl::optional<UnwrappedTSN> Insert(
OutgoingMessageId message_id,
const Data& data,
TimeMs time_sent,
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(),
@ -175,12 +177,14 @@ class OutstandingData {
kAbandon,
};
Item(Data data,
Item(OutgoingMessageId message_id,
Data data,
TimeMs time_sent,
MaxRetransmits max_retransmissions,
TimeMs expires_at,
LifecycleId lifecycle_id)
: time_sent_(time_sent),
: message_id_(message_id),
time_sent_(time_sent),
max_retransmissions_(max_retransmissions),
expires_at_(expires_at),
lifecycle_id_(lifecycle_id),
@ -189,6 +193,8 @@ class OutstandingData {
Item(const Item&) = delete;
Item& operator=(const Item&) = delete;
OutgoingMessageId message_id() const { return message_id_; }
TimeMs time_sent() const { return time_sent_; }
const Data& data() const { return data_; }
@ -249,6 +255,8 @@ class OutstandingData {
// NOTE: This data structure has been optimized for size, by ordering fields
// to avoid unnecessary padding.
const OutgoingMessageId message_id_;
// When the packet was sent, and placed in this queue.
const TimeMs time_sent_;
// If the message was sent with a maximum number of retransmissions, this is
@ -338,7 +346,7 @@ class OutstandingData {
// The last cumulative TSN ack number.
UnwrappedTSN last_cumulative_tsn_ack_;
// Callback when to discard items from the send queue.
std::function<bool(IsUnordered, StreamID, MID)> discard_from_send_queue_;
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_;
std::map<UnwrappedTSN, Item> outstanding_data_;
// The number of bytes that are in-flight (sent but not yet acked or nacked).

View File

@ -12,10 +12,12 @@
#include <vector>
#include "absl/types/optional.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/testing/data_generator.h"
#include "net/dcsctp/testing/testing_macros.h"
@ -37,6 +39,7 @@ using ::testing::StrictMock;
using ::testing::UnorderedElementsAre;
constexpr TimeMs kNow(42);
constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17);
class OutstandingDataTest : public testing::Test {
protected:
@ -49,7 +52,7 @@ class OutstandingDataTest : public testing::Test {
UnwrappedTSN::Unwrapper unwrapper_;
DataGenerator gen_;
StrictMock<MockFunction<bool(IsUnordered, StreamID, MID)>> on_discard_;
StrictMock<MockFunction<bool(StreamID, OutgoingMessageId)>> on_discard_;
OutstandingData buf_;
};
@ -67,8 +70,8 @@ TEST_F(OutstandingDataTest, HasInitialState) {
}
TEST_F(OutstandingDataTest, InsertChunk) {
ASSERT_HAS_VALUE_AND_ASSIGN(UnwrappedTSN tsn,
buf_.Insert(gen_.Ordered({1}, "BE"), kNow));
ASSERT_HAS_VALUE_AND_ASSIGN(
UnwrappedTSN tsn, buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow));
EXPECT_EQ(tsn.Wrap(), TSN(10));
@ -84,7 +87,7 @@ TEST_F(OutstandingDataTest, InsertChunk) {
}
TEST_F(OutstandingDataTest, AcksSingleChunk) {
buf_.Insert(gen_.Ordered({1}, "BE"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
OutstandingData::AckInfo ack =
buf_.HandleSack(unwrapper_.Unwrap(TSN(10)), {}, false);
@ -103,7 +106,7 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) {
}
TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
buf_.Insert(gen_.Ordered({1}, "BE"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false);
EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
@ -118,8 +121,8 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
}
TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow);
buf_.Insert(gen_.Ordered({1}, "E"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow);
std::vector<SackChunk::GapAckBlock> gab = {SackChunk::GapAckBlock(2, 2)};
OutstandingData::AckInfo ack =
@ -141,8 +144,8 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow);
buf_.Insert(gen_.Ordered({1}, "E"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow);
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -164,10 +167,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, "E"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow);
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -202,10 +205,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) {
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions);
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -217,7 +220,7 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
std::vector<SackChunk::GapAckBlock> gab3 = {SackChunk::GapAckBlock(2, 4)};
OutstandingData::AckInfo ack =
@ -238,10 +241,10 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -253,7 +256,7 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(true));
std::vector<SackChunk::GapAckBlock> gab3 = {SackChunk::GapAckBlock(2, 4)};
OutstandingData::AckInfo ack =
@ -275,17 +278,19 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) {
static constexpr TimeMs kExpiresAt = kNow + DurationMs(1);
EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, "B"), kNow,
EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow,
MaxRetransmits::NoLimit(), kExpiresAt)
.has_value());
EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, ""), kNow + DurationMs(0),
MaxRetransmits::NoLimit(), kExpiresAt)
EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""),
kNow + DurationMs(0), MaxRetransmits::NoLimit(),
kExpiresAt)
.has_value());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
EXPECT_FALSE(buf_.Insert(gen_.Ordered({1}, "E"), kNow + DurationMs(1),
MaxRetransmits::NoLimit(), kExpiresAt)
EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"),
kNow + DurationMs(1), MaxRetransmits::NoLimit(),
kExpiresAt)
.has_value());
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
@ -301,11 +306,11 @@ TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) {
TEST_F(OutstandingDataTest, CanGenerateForwardTsn) {
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, kMaxRetransmissions);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, kMaxRetransmissions);
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
buf_.NackAll();
@ -322,14 +327,14 @@ TEST_F(OutstandingDataTest, CanGenerateForwardTsn) {
}
TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, ""), kNow);
buf_.Insert(gen_.Ordered({1}, "E"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow);
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
@ -356,9 +361,9 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) {
}
TEST_F(OutstandingDataTest, MeasureRTT) {
buf_.Insert(gen_.Ordered({1}, "BE"), kNow);
buf_.Insert(gen_.Ordered({1}, "BE"), kNow + DurationMs(1));
buf_.Insert(gen_.Ordered({1}, "BE"), kNow + DurationMs(2));
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1));
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2));
static constexpr DurationMs kDuration(123);
ASSERT_HAS_VALUE_AND_ASSIGN(
@ -375,7 +380,8 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
static constexpr MaxRetransmits kOneRetransmission(1);
for (int tsn = 10; tsn <= 20; ++tsn) {
buf_.Insert(gen_.Ordered({1}, tsn == 10 ? "B"
buf_.Insert(kMessageId,
gen_.Ordered({1}, tsn == 10 ? "B"
: tsn == 20 ? "E"
: ""),
kNow, kOneRetransmission);
@ -434,7 +440,7 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab8, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
std::vector<SackChunk::GapAckBlock> gab9 = {SackChunk::GapAckBlock(2, 10)};
@ -445,54 +451,16 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
}
TEST_F(OutstandingDataTest, CanAbandonChunksMarkedForFastRetransmit) {
// This test is a bit convoluted, and can't really happen with a well behaving
// client, but this was found by fuzzers. This test will verify that a message
// that was both marked as "to be fast retransmitted" and "abandoned" at the
// same time doesn't cause any consistency issues.
// Add chunks 10-14, but chunk 11 has zero retransmissions. When chunk 10 and
// 11 are NACKed three times, chunk 10 will be marked for retransmission, but
// chunk 11 will be abandoned, which also abandons chunk 10, as it's part of
// the same message.
buf_.Insert(gen_.Ordered({1}, "B"), kNow); // 10
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); // 11
buf_.Insert(gen_.Ordered({1}, ""), kNow); // 12
buf_.Insert(gen_.Ordered({1}, ""), kNow); // 13
buf_.Insert(gen_.Ordered({1}, "E"), kNow); // 14
// ACK 9, 12
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(3, 3)};
EXPECT_FALSE(
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
// ACK 9, 12, 13
std::vector<SackChunk::GapAckBlock> gab2 = {SackChunk::GapAckBlock(3, 4)};
EXPECT_FALSE(
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
// ACK 9, 12, 13, 14
std::vector<SackChunk::GapAckBlock> gab3 = {SackChunk::GapAckBlock(3, 5)};
OutstandingData::AckInfo ack =
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false);
EXPECT_TRUE(ack.has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_THAT(buf_.GetChunksToBeFastRetransmitted(1000), IsEmpty());
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), IsEmpty());
}
TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) {
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(42));
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(43));
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(44));
buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow,
MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
LifecycleId(42));
buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow,
MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
LifecycleId(43));
buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow,
MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
LifecycleId(44));
OutstandingData::AckInfo ack1 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(11)), {}, false);
@ -507,10 +475,10 @@ TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) {
}
TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
TimeMs::InfiniteFuture(), LifecycleId(42));
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
@ -524,7 +492,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) {
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
std::vector<SackChunk::GapAckBlock> gab3 = {SackChunk::GapAckBlock(2, 4)};
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
OutstandingData::AckInfo ack1 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false);
@ -543,10 +511,10 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) {
}
TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
TimeMs::InfiniteFuture(), LifecycleId(42));
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
@ -569,7 +537,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) {
Pair(TSN(13), State::kAcked)));
// T3-rtx triggered.
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
buf_.NackAll();
@ -609,24 +577,31 @@ TEST_F(OutstandingDataTest, GeneratesForwardTsnUntilNextStreamResetTsn) {
constexpr DataGeneratorOptions kStream1 = {.stream_id = StreamID(1)};
constexpr DataGeneratorOptions kStream2 = {.stream_id = StreamID(2)};
constexpr DataGeneratorOptions kStream3 = {.stream_id = StreamID(3)};
constexpr MaxRetransmits kNoRtx = MaxRetransmits(0);
EXPECT_CALL(on_discard_, Call).WillRepeatedly(Return(false));
// TSN 10-12
buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow);
buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow);
buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow, MaxRetransmits(0));
buf_.Insert(OutgoingMessageId(0), gen_.Ordered({1}, "BE", kStream1), kNow,
kNoRtx);
buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE", kStream1), kNow,
kNoRtx);
buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE", kStream1), kNow,
kNoRtx);
buf_.BeginResetStreams();
// TSN 13, 14
buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0));
buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE", kStream2), kNow,
kNoRtx);
buf_.Insert(OutgoingMessageId(4), gen_.Ordered({1}, "BE", kStream2), kNow,
kNoRtx);
buf_.BeginResetStreams();
// TSN 15, 16
buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow);
buf_.Insert(OutgoingMessageId(5), gen_.Ordered({1}, "BE", kStream3), kNow,
kNoRtx);
buf_.Insert(OutgoingMessageId(6), gen_.Ordered({1}, "BE", kStream3), kNow);
EXPECT_FALSE(buf_.ShouldSendForwardTsn());

View File

@ -86,8 +86,8 @@ RetransmissionQueue::RetransmissionQueue(
data_chunk_header_size_,
tsn_unwrapper_.Unwrap(my_initial_tsn),
tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)),
[this](IsUnordered unordered, StreamID stream_id, MID mid) {
return send_queue_.Discard(unordered, stream_id, mid);
[this](StreamID stream_id, OutgoingMessageId message_id) {
return send_queue_.Discard(stream_id, message_id);
}) {}
bool RetransmissionQueue::IsConsistent() const {
@ -491,7 +491,7 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
rwnd_ -= chunk_size;
absl::optional<UnwrappedTSN> tsn = outstanding_data_.Insert(
chunk_opt->data, now,
chunk_opt->message_id, chunk_opt->data, now,
partial_reliability_ ? chunk_opt->max_retransmissions
: MaxRetransmits::NoLimit(),
partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(),

View File

@ -20,6 +20,7 @@
#include "api/array_view.h"
#include "api/task_queue/task_queue_base.h"
#include "net/dcsctp/common/handover_testing.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
@ -44,6 +45,7 @@ using ::testing::MockFunction;
using State = ::dcsctp::RetransmissionQueue::State;
using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Field;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::Pair;
@ -53,6 +55,7 @@ using ::testing::UnorderedElementsAre;
constexpr uint32_t kArwnd = 100000;
constexpr uint32_t kMaxMtu = 1191;
constexpr OutgoingMessageId kMessageId = OutgoingMessageId(42);
DcSctpOptions MakeOptions() {
DcSctpOptions options;
@ -74,9 +77,11 @@ class RetransmissionQueueTest : public testing::Test {
[]() { return absl::nullopt; },
TimerOptions(options_.rto_initial))) {}
std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk() {
return [this](TimeMs now, size_t max_size) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk(
OutgoingMessageId message_id) {
return [this, message_id](TimeMs now, size_t max_size) {
return SendQueue::DataToSend(message_id,
gen_.Ordered({1, 2, 3, 4}, "BE"));
};
}
@ -140,7 +145,7 @@ TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) {
TEST_F(RetransmissionQueueTest, SendOneChunk) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
@ -153,7 +158,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) {
TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
@ -167,9 +172,9 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -185,14 +190,14 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -216,14 +221,14 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -235,7 +240,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 18
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(8)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));
@ -256,7 +261,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 19
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(9)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));
@ -268,7 +273,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 20
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(10)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));
@ -313,9 +318,9 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
// TSN, it will also restart T3-RTX.
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
static constexpr TimeMs kStartTime(100000);
@ -336,7 +341,7 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
// Send 13
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13)));
@ -347,7 +352,7 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
// Send 14
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14)));
@ -393,10 +398,12 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(1),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -414,7 +421,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -452,7 +460,7 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
CreateQueue(/*supports_partial_reliability=*/false);
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -473,8 +481,7 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
} // namespace dcsctp
@ -482,7 +489,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -497,8 +504,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
queue.HandleT3RtxTimerExpiry();
@ -524,7 +530,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(3);
return dts;
})
@ -538,8 +544,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);
// Retransmission 1
queue.HandleT3RtxTimerExpiry();
@ -557,8 +562,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 4 - not allowed.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
queue.HandleT3RtxTimerExpiry();
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
@ -579,7 +583,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce)
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -615,17 +620,18 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -645,7 +651,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
.WillOnce(Return(true));
queue.HandleT3RtxTimerExpiry();
@ -670,17 +676,18 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E"));
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, "E"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -700,7 +707,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
.WillOnce(Return(false));
queue.HandleT3RtxTimerExpiry();
@ -725,28 +732,32 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(1);
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts));
SendQueue::DataToSend dts(OutgoingMessageId(42),
gen_.Ordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(2);
SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts));
SendQueue::DataToSend dts(OutgoingMessageId(43),
gen_.Unordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(3);
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts));
SendQueue::DataToSend dts(OutgoingMessageId(44),
gen_.Ordered({9, 10, 11, 12}, "B", opts));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(4);
SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts));
SendQueue::DataToSend dts(OutgoingMessageId(45),
gen_.Ordered({13, 14, 15, 16}, "B", opts));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -773,11 +784,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
Pair(TSN(12), State::kNacked), //
Pair(TSN(13), State::kAcked)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(2), OutgoingMessageId(43)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(3), OutgoingMessageId(44)))
.WillOnce(Return(true));
queue.HandleT3RtxTimerExpiry();
@ -840,7 +851,8 @@ TEST_F(RetransmissionQueueTest, MeasureRTT) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
SendQueue::DataToSend dts(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -868,14 +880,14 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -898,14 +910,14 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -945,14 +957,14 @@ TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) {
TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
@ -987,13 +999,15 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(183);
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
.WillOnce([this](TimeMs, size_t size) {
EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(957);
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(1),
gen_.Ordered(payload, "BE"));
});
std::vector<std::pair<TSN, Data>> chunks_to_send =
@ -1005,17 +1019,18 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
@ -1035,8 +1050,7 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
EXPECT_EQ(queue.outstanding_items(), 3u);
// Mark the message as lost.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
queue.HandleT3RtxTimerExpiry();
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
@ -1071,12 +1085,14 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
TimeMs test_start = now_;
EXPECT_CALL(producer_, Produce)
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options));
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({1, 2, 3, 4}, "B", options));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options));
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({5, 6, 7, 8}, "", options));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
@ -1086,7 +1102,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
queue.GetChunksToSend(now_, 24);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId))
.WillOnce(Return(true));
now_ += DurationMs(100);
@ -1100,17 +1116,74 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
Pair(TSN(12), State::kAbandoned))); // Placeholder end
}
TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) {
RetransmissionQueue queue = CreateQueue();
TimeMs test_start = now_;
EXPECT_CALL(producer_, Produce)
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(42),
gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(43),
gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
// Stream reset - MID reset to zero again.
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(44),
gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)}));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillOnce([&](TimeMs, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(44),
gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)}));
dts.expires_at = TimeMs(test_start + DurationMs(10));
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44)))
.WillOnce(Return(true));
EXPECT_THAT(queue.GetChunksToSend(now_, 24),
ElementsAre(Pair(TSN(10), Field(&Data::mid, MID(0)))));
EXPECT_THAT(queue.GetChunksToSend(now_, 24),
ElementsAre(Pair(TSN(11), Field(&Data::mid, MID(1)))));
EXPECT_THAT(queue.GetChunksToSend(now_, 24),
ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0)))));
now_ += DurationMs(100);
EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
EXPECT_THAT(
queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN
Pair(TSN(10), State::kInFlight), // OutgoingMessageId=42, BE
Pair(TSN(11), State::kInFlight), // OutgoingMessageId=43, BE
Pair(TSN(12), State::kAbandoned), // OutgoingMessageId=44, B
Pair(TSN(13), State::kAbandoned), // Produced and expired
Pair(TSN(14), State::kAbandoned))); // Placeholder end
}
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
@ -1128,8 +1201,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));
@ -1155,7 +1227,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
.WillOnce(Return(false));
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));
@ -1175,19 +1247,19 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(2);
return dts;
})
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillOnce(CreateChunk(OutgoingMessageId(8)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
@ -1213,8 +1285,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
Pair(TSN(18), State::kInFlight), //
Pair(TSN(19), State::kInFlight)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(8))).Times(0);
// Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it.
for (int tsn = 11; tsn <= 13; ++tsn) {
@ -1284,7 +1355,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
// Ack TSN 19 - three more nacks for TSN 10, no more retransmissions.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
.WillOnce(Return(false));
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {}));
@ -1316,7 +1387,8 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) {
std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce)
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -1344,6 +1416,7 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) {
EXPECT_CALL(producer_, Produce)
.WillOnce([chunk_size, this](TimeMs, size_t) {
return SendQueue::DataToSend(
OutgoingMessageId(0),
gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -1376,6 +1449,7 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) {
EXPECT_CALL(producer_, Produce)
.WillOnce([chunk_size, this](TimeMs, size_t) {
return SendQueue::DataToSend(
OutgoingMessageId(0),
gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -1392,7 +1466,7 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) {
TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
@ -1408,14 +1482,14 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) {
TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8));
EXPECT_EQ(
@ -1428,7 +1502,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 18
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(8)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
@ -1440,7 +1514,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 19
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(9)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
@ -1452,7 +1526,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 20
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(10)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
@ -1487,8 +1561,8 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
TEST_F(RetransmissionQueueTest, HandoverTest) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2));
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
@ -1497,9 +1571,9 @@ TEST_F(RetransmissionQueueTest, HandoverTest) {
CreateQueueByHandover(queue);
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(*handedover_queue),
testing::ElementsAre(TSN(12), TSN(13), TSN(14)));
@ -1519,19 +1593,24 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, "B"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "B"));
})
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, ""));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, ""));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, ""));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, "E"));
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "E"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });

View File

@ -20,6 +20,7 @@
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
@ -123,7 +124,10 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
parent_.total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), std::move(attributes));
OutgoingMessageId message_id = parent_.current_message_id;
parent_.current_message_id =
OutgoingMessageId(*parent_.current_message_id + 1);
items_.emplace_back(message_id, std::move(message), std::move(attributes));
if (!was_active) {
scheduler_stream_->MaybeMakeActive();
@ -184,9 +188,10 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
buffered_amount_.Decrease(payload.size());
parent_.total_buffered_amount_.Decrease(payload.size());
SendQueue::DataToSend chunk(Data(
stream_id, item.ssn.value_or(SSN(0)), item.mid.value(), fsn, ppid,
std::move(payload), is_beginning, is_end, item.attributes.unordered));
SendQueue::DataToSend chunk(
item.message_id, Data(stream_id, item.ssn.value_or(SSN(0)), *item.mid,
fsn, ppid, std::move(payload), is_beginning,
is_end, item.attributes.unordered));
chunk.max_retransmissions = item.attributes.max_retransmissions;
chunk.expires_at = item.attributes.expires_at;
chunk.lifecycle_id =
@ -230,12 +235,11 @@ void RRSendQueue::OutgoingStream::HandleMessageExpired(
}
}
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, MID mid) {
bool RRSendQueue::OutgoingStream::Discard(OutgoingMessageId message_id) {
bool result = false;
if (!items_.empty()) {
Item& item = items_.front();
if (item.attributes.unordered == unordered && item.mid.has_value() &&
*item.mid == mid) {
if (item.message_id == message_id) {
HandleMessageExpired(item);
items_.pop_front();
@ -384,8 +388,8 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
return scheduler_.Produce(now, max_size);
}
bool RRSendQueue::Discard(IsUnordered unordered, StreamID stream_id, MID mid) {
bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(unordered, mid);
bool RRSendQueue::Discard(StreamID stream_id, OutgoingMessageId message_id) {
bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(message_id);
RTC_DCHECK(IsConsistent());
return has_discarded;

View File

@ -22,6 +22,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
@ -76,7 +77,7 @@ class RRSendQueue : public SendQueue {
// Implementation of `SendQueue`.
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) override;
bool Discard(StreamID stream_id, OutgoingMessageId message_id) override;
void PrepareResetStream(StreamID streams) override;
bool HasStreamsReadyToBeReset() const override;
std::vector<StreamID> GetStreamsReadyToBeReset() override;
@ -161,7 +162,7 @@ class RRSendQueue : public SendQueue {
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
// Discards a partially sent message, see `SendQueue::Discard`.
bool Discard(IsUnordered unordered, MID mid);
bool Discard(OutgoingMessageId message_id);
// Pauses this stream, which is used before resetting it.
void Pause();
@ -217,11 +218,15 @@ class RRSendQueue : public SendQueue {
// An enqueued message and metadata.
struct Item {
explicit Item(DcSctpMessage msg, MessageAttributes attributes)
: message(std::move(msg)),
explicit Item(OutgoingMessageId message_id,
DcSctpMessage msg,
MessageAttributes attributes)
: message_id(message_id),
message(std::move(msg)),
attributes(std::move(attributes)),
remaining_offset(0),
remaining_size(message.payload().size()) {}
OutgoingMessageId message_id;
DcSctpMessage message;
MessageAttributes attributes;
// The remaining payload (offset and size) to be sent, when it has been
@ -267,6 +272,7 @@ class RRSendQueue : public SendQueue {
DcSctpSocketCallbacks& callbacks_;
const size_t buffer_size_;
const StreamPriority default_priority_;
OutgoingMessageId current_message_id = OutgoingMessageId(0);
StreamScheduler scheduler_;
// The total amount of buffer data, for all streams.

View File

@ -13,6 +13,7 @@
#include <type_traits>
#include <vector>
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_options.h"
@ -227,8 +228,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) {
ASSERT_TRUE(chunk_one.has_value());
EXPECT_FALSE(chunk_one->data.is_end);
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.mid);
buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
absl::optional<SendQueue::DataToSend> chunk_two =
buf_.Produce(kNow, kOneFragmentPacketSize);
@ -244,8 +244,7 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) {
ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
// Calling it again shouldn't cause issues.
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.mid);
buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
}
@ -366,6 +365,32 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) {
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
TEST_F(RRSendQueueTest, CommittingDoesNotResetMessageId) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk1.data.ssn, SSN(0));
EXPECT_EQ(chunk1.message_id, OutgoingMessageId(0));
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk2.data.ssn, SSN(1));
EXPECT_EQ(chunk2.message_id, OutgoingMessageId(1));
buf_.PrepareResetStream(kStreamID);
EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), UnorderedElementsAre(kStreamID));
buf_.CommitResetStreams();
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
buf_.Produce(kNow, kOneFragmentPacketSize));
EXPECT_EQ(chunk3.data.ssn, SSN(0));
EXPECT_EQ(chunk3.message_id, OutgoingMessageId(2));
}
TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
std::vector<uint8_t> payload(50);
@ -859,8 +884,7 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
/*maybe_delivered=*/false));
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
chunk_one->data.mid);
buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
}
} // namespace
} // namespace dcsctp

View File

@ -27,7 +27,11 @@ class SendQueue {
public:
// Container for a data chunk that is produced by the SendQueue
struct DataToSend {
explicit DataToSend(Data data) : data(std::move(data)) {}
DataToSend(OutgoingMessageId message_id, Data data)
: message_id(message_id), data(std::move(data)) {}
OutgoingMessageId message_id;
// The data to send, including all parameters.
Data data;
@ -53,8 +57,8 @@ class SendQueue {
// including any headers.
virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0;
// Discards a partially sent message identified by the parameters `unordered`,
// `stream_id` and `mid`. The `mid` comes from the returned
// Discards a partially sent message identified by the parameters
// `stream_id` and `message_id`. The `message_id` comes from the returned
// information when having called `Produce`. A partially sent message means
// that it has had at least one fragment of it returned when `Produce` was
// called prior to calling this method).
@ -67,7 +71,7 @@ class SendQueue {
//
// This function returns true if this message had unsent fragments still in
// the queue that were discarded, and false if there were no such fragments.
virtual bool Discard(IsUnordered unordered, StreamID stream_id, MID mid) = 0;
virtual bool Discard(StreamID stream_id, OutgoingMessageId message_id) = 0;
// Prepares the stream to be reset. This is used to close a WebRTC data
// channel and will be signaled to the other side.

View File

@ -39,11 +39,16 @@ MATCHER_P(HasDataWithMid, mid, "") {
}
std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
return [sid, mid, payload_size](TimeMs now, size_t max_size) {
return SendQueue::DataToSend(Data(
sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
CreateChunk(OutgoingMessageId message_id,
StreamID sid,
MID mid,
size_t payload_size = kPayloadSize) {
return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) {
return SendQueue::DataToSend(
message_id,
Data(sid, SSN(0), mid, FSN(0), PPID(42),
std::vector<uint8_t>(payload_size), Data::IsBeginning(true),
Data::IsEnd(true), IsUnordered(true)));
};
}
@ -76,7 +81,8 @@ class TestStream {
StreamPriority priority,
size_t packet_size = kPayloadSize) {
EXPECT_CALL(producer_, Produce)
.WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
.WillRepeatedly(
CreateChunk(OutgoingMessageId(0), stream_id, MID(0), packet_size));
EXPECT_CALL(producer_, bytes_to_send_in_next_message)
.WillRepeatedly(Return(packet_size));
stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
@ -117,7 +123,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
StreamScheduler scheduler("", kMtu);
StrictMock<MockStreamProducer> producer;
EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
EXPECT_CALL(producer, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0)));
EXPECT_CALL(producer, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(0));
@ -135,9 +142,9 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -149,9 +156,9 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -177,26 +184,29 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(true),
Data::IsEnd(false), IsUnordered(true)));
})
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(false),
Data::IsEnd(false), IsUnordered(true)));
})
.WillOnce([](...) {
return SendQueue::DataToSend(
OutgoingMessageId(1),
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
std::vector<uint8_t>(4), Data::IsBeginning(false),
Data::IsEnd(true), IsUnordered(true)));
})
.WillOnce(CreateChunk(StreamID(1), MID(102)));
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -210,9 +220,9 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -239,8 +249,8 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -264,9 +274,9 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
StrictMock<MockStreamProducer> producer1;
// Callbacks are setup so that they hint that there is a MID(2) coming...
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -293,9 +303,9 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -308,9 +318,9 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -384,9 +394,12 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
.WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
.WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
kSmallPacket));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kSmallPacket)) // When making active
.WillOnce(Return(kSmallPacket))
@ -398,9 +411,12 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
.WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
.WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
kLargePacket))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
kLargePacket))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
kLargePacket));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kLargePacket)) // When making active
.WillOnce(Return(kLargePacket))
@ -432,9 +448,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -447,9 +463,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -462,9 +478,9 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
StrictMock<MockStreamProducer> callback3;
EXPECT_CALL(callback3, Produce)
.WillOnce(CreateChunk(StreamID(3), MID(300)))
.WillOnce(CreateChunk(StreamID(3), MID(301)))
.WillOnce(CreateChunk(StreamID(3), MID(302)));
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300)))
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301)))
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302)));
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
@ -510,11 +526,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
StrictMock<MockStreamProducer> callback1;
EXPECT_CALL(callback1, Produce)
// virtual finish time ~ 0 + 50 * 80 = 4000
.WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
kMediumPacket))
// virtual finish time ~ 4000 + 20 * 80 = 5600
.WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
kSmallPacket))
// virtual finish time ~ 5600 + 70 * 80 = 11200
.WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
kLargePacket));
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kMediumPacket)) // When making active
.WillOnce(Return(kSmallPacket))
@ -528,11 +547,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
StrictMock<MockStreamProducer> callback2;
EXPECT_CALL(callback2, Produce)
// virtual finish time ~ 0 + 50 * 50 = 2500
.WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
kMediumPacket))
// virtual finish time ~ 2500 + 70 * 50 = 6000
.WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
kLargePacket))
// virtual finish time ~ 6000 + 20 * 50 = 7000
.WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
kSmallPacket));
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kMediumPacket)) // When making active
.WillOnce(Return(kLargePacket))
@ -546,11 +568,14 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
StrictMock<MockStreamProducer> callback3;
EXPECT_CALL(callback3, Produce)
// virtual finish time ~ 0 + 20 * 20 = 400
.WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300),
kSmallPacket))
// virtual finish time ~ 400 + 50 * 20 = 1400
.WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301),
kMediumPacket))
// virtual finish time ~ 1400 + 70 * 20 = 2800
.WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302),
kLargePacket));
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
.WillOnce(Return(kSmallPacket)) // When making active
.WillOnce(Return(kMediumPacket))
@ -677,8 +702,8 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(0), 100))
.WillOnce(CreateChunk(StreamID(1), MID(0), 100));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 100))
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(0), 100));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(200)) // When making active
.WillOnce(Return(100))
@ -689,8 +714,8 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(1), 100))
.WillOnce(CreateChunk(StreamID(2), MID(1), 50));
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(2), MID(1), 100))
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(1), 50));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(150)) // When making active
.WillOnce(Return(50))
@ -714,7 +739,7 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
StrictMock<MockStreamProducer> producer1;
EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(0), 200));
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 200));
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(200)) // When making active
.WillOnce(Return(0));
@ -724,7 +749,7 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
StrictMock<MockStreamProducer> producer2;
EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(1), 150));
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(2), MID(1), 150));
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(150)) // When making active
.WillOnce(Return(0));