dcsctp: Estimate rwnd by payload bytes

The dcSCTP receiver was advertising available window space (arwnd) based
solely on payload bytes, while the sender's rwnd estimation included
packet headers. This mismatch caused the sender to underestimate the
receiver's available buffer, potentially leading to reduced throughput.

This commit resolves the issue by ensuring both sender and receiver use
payload bytes, as headers have been removed on the receiver side while
in the reassembly queue.

Bug: webrtc:396373001
Change-Id: I508419efb09cabf2fb011f952f5f4a06586a4019
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/377122
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43899}
This commit is contained in:
Victor Boivie 2025-02-13 16:08:52 +01:00 committed by WebRTC LUCI CQ
parent 6d2579ef25
commit bcf588da8f
7 changed files with 130 additions and 53 deletions

View File

@ -1735,7 +1735,7 @@ void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) {
} }
void DcSctpSocket::MaybeSendShutdownOrAck() { void DcSctpSocket::MaybeSendShutdownOrAck() {
if (tcb_->retransmission_queue().unacked_bytes() != 0) { if (tcb_->retransmission_queue().unacked_items() != 0) {
return; return;
} }

View File

@ -77,7 +77,8 @@ bool OutstandingData::Item::has_expired(Timestamp now) const {
} }
bool OutstandingData::IsConsistent() const { bool OutstandingData::IsConsistent() const {
size_t actual_unacked_bytes = 0; size_t actual_unacked_payload_bytes = 0;
size_t actual_unacked_packet_bytes = 0;
size_t actual_unacked_items = 0; size_t actual_unacked_items = 0;
std::set<UnwrappedTSN> combined_to_be_retransmitted; std::set<UnwrappedTSN> combined_to_be_retransmitted;
@ -91,7 +92,8 @@ bool OutstandingData::IsConsistent() const {
for (const Item& item : outstanding_data_) { for (const Item& item : outstanding_data_) {
tsn.Increment(); tsn.Increment();
if (item.is_outstanding()) { if (item.is_outstanding()) {
actual_unacked_bytes += GetSerializedChunkSize(item.data()); actual_unacked_payload_bytes += item.data().size();
actual_unacked_packet_bytes += GetSerializedChunkSize(item.data());
++actual_unacked_items; ++actual_unacked_items;
} }
@ -100,7 +102,8 @@ bool OutstandingData::IsConsistent() const {
} }
} }
return actual_unacked_bytes == unacked_bytes_ && return actual_unacked_payload_bytes == unacked_payload_bytes_ &&
actual_unacked_packet_bytes == unacked_packet_bytes_ &&
actual_unacked_items == unacked_items_ && actual_unacked_items == unacked_items_ &&
actual_combined_to_be_retransmitted == combined_to_be_retransmitted; actual_combined_to_be_retransmitted == combined_to_be_retransmitted;
} }
@ -112,7 +115,8 @@ void OutstandingData::AckChunk(AckInfo& ack_info,
size_t serialized_size = GetSerializedChunkSize(item.data()); size_t serialized_size = GetSerializedChunkSize(item.data());
ack_info.bytes_acked += serialized_size; ack_info.bytes_acked += serialized_size;
if (item.is_outstanding()) { if (item.is_outstanding()) {
unacked_bytes_ -= serialized_size; unacked_payload_bytes_ -= item.data().size();
unacked_packet_bytes_ -= serialized_size;
--unacked_items_; --unacked_items_;
} }
if (item.should_be_retransmitted()) { if (item.should_be_retransmitted()) {
@ -258,7 +262,8 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn,
bool do_fast_retransmit) { bool do_fast_retransmit) {
Item& item = GetItem(tsn); Item& item = GetItem(tsn);
if (item.is_outstanding()) { if (item.is_outstanding()) {
unacked_bytes_ -= GetSerializedChunkSize(item.data()); unacked_payload_bytes_ -= item.data().size();
unacked_packet_bytes_ -= GetSerializedChunkSize(item.data());
--unacked_items_; --unacked_items_;
} }
@ -343,7 +348,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
item.MarkAsRetransmitted(); item.MarkAsRetransmitted();
result.emplace_back(tsn.Wrap(), item.data().Clone()); result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size; max_size -= serialized_size;
unacked_bytes_ += serialized_size; unacked_payload_bytes_ += item.data().size();
unacked_packet_bytes_ += serialized_size;
++unacked_items_; ++unacked_items_;
it = chunks.erase(it); it = chunks.erase(it);
} else { } else {
@ -421,7 +427,8 @@ std::optional<UnwrappedTSN> OutstandingData::Insert(
LifecycleId lifecycle_id) { LifecycleId lifecycle_id) {
// All chunks are always padded to be even divisible by 4. // All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(data); size_t chunk_size = GetSerializedChunkSize(data);
unacked_bytes_ += chunk_size; unacked_payload_bytes_ += data.size();
unacked_packet_bytes_ += chunk_size;
++unacked_items_; ++unacked_items_;
UnwrappedTSN tsn = next_tsn(); UnwrappedTSN tsn = next_tsn();
Item& item = outstanding_data_.emplace_back(message_id, data.Clone(), Item& item = outstanding_data_.emplace_back(message_id, data.Clone(),

View File

@ -101,7 +101,11 @@ class OutstandingData {
// it? // it?
std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size); std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size);
size_t unacked_bytes() const { return unacked_bytes_; } // How many inflight bytes there are, as sent on the wire as packets.
size_t unacked_packet_bytes() const { return unacked_packet_bytes_; }
// How many inflight bytes there are, counting only the payload.
size_t unacked_payload_bytes() const { return unacked_payload_bytes_; }
// Returns the number of DATA chunks that are in-flight (not acked or nacked). // Returns the number of DATA chunks that are in-flight (not acked or nacked).
size_t unacked_items() const { return unacked_items_; } size_t unacked_items() const { return unacked_items_; }
@ -358,8 +362,10 @@ class OutstandingData {
// `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict // `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict
// increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`. // increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`.
std::deque<Item> outstanding_data_; std::deque<Item> outstanding_data_;
// The number of bytes that are in-flight (sent but not yet acked or nacked). // The number of bytes that are in-flight, counting only the payload.
size_t unacked_bytes_ = 0; size_t unacked_payload_bytes_ = 0;
// The number of bytes that are in-flight, as sent on the wire (as packets).
size_t unacked_packet_bytes_ = 0;
// The number of DATA chunks that are in-flight (sent but not yet acked or // The number of DATA chunks that are in-flight (sent but not yet acked or
// nacked). // nacked).
size_t unacked_items_ = 0; size_t unacked_items_ = 0;

View File

@ -59,7 +59,8 @@ class OutstandingDataTest : public testing::Test {
TEST_F(OutstandingDataTest, HasInitialState) { TEST_F(OutstandingDataTest, HasInitialState) {
EXPECT_TRUE(buf_.empty()); EXPECT_TRUE(buf_.empty());
EXPECT_EQ(buf_.unacked_bytes(), 0u); EXPECT_EQ(buf_.unacked_payload_bytes(), 0u);
EXPECT_EQ(buf_.unacked_packet_bytes(), 0u);
EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
@ -76,7 +77,7 @@ TEST_F(OutstandingDataTest, InsertChunk) {
EXPECT_EQ(tsn.Wrap(), TSN(10)); EXPECT_EQ(tsn.Wrap(), TSN(10));
EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); EXPECT_EQ(buf_.unacked_payload_bytes(), 1u);
EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_EQ(buf_.unacked_items(), 1u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
@ -96,7 +97,7 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) {
EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10)); EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10));
EXPECT_FALSE(ack.has_packet_loss); EXPECT_FALSE(ack.has_packet_loss);
EXPECT_EQ(buf_.unacked_bytes(), 0u); EXPECT_EQ(buf_.unacked_payload_bytes(), 0u);
EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10)); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10));
@ -110,7 +111,7 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false); buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false);
EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); EXPECT_EQ(buf_.unacked_payload_bytes(), 1u);
EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_EQ(buf_.unacked_items(), 1u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
@ -132,7 +133,7 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11)); EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11));
EXPECT_FALSE(ack.has_packet_loss); EXPECT_FALSE(ack.has_packet_loss);
EXPECT_EQ(buf_.unacked_bytes(), 0u); EXPECT_EQ(buf_.unacked_payload_bytes(), 0u);
EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
@ -657,5 +658,20 @@ TEST_F(OutstandingDataTest, GeneratesForwardTsnUntilNextStreamResetTsn) {
EXPECT_FALSE(buf_.ShouldSendForwardTsn()); EXPECT_FALSE(buf_.ShouldSendForwardTsn());
} }
TEST_F(OutstandingDataTest, TreatsUnackedPayloadBytesDifferentFromPacketBytes) {
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
EXPECT_EQ(buf_.unacked_payload_bytes(), 1u);
EXPECT_EQ(buf_.unacked_packet_bytes(),
DataChunk::kHeaderSize + RoundUpTo4(1));
EXPECT_EQ(buf_.unacked_items(), 1u);
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
EXPECT_EQ(buf_.unacked_payload_bytes(), 2u);
EXPECT_EQ(buf_.unacked_packet_bytes(),
2 * (DataChunk::kHeaderSize + RoundUpTo4(1)));
EXPECT_EQ(buf_.unacked_items(), 2u);
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp

View File

@ -111,12 +111,12 @@ void RetransmissionQueue::MaybeExitFastRecovery(
} }
void RetransmissionQueue::HandleIncreasedCumulativeTsnAck( void RetransmissionQueue::HandleIncreasedCumulativeTsnAck(
size_t unacked_bytes, size_t unacked_packet_bytes,
size_t total_bytes_acked) { size_t total_bytes_acked) {
// Allow some margin for classifying as fully utilized, due to e.g. that too // Allow some margin for classifying as fully utilized, due to e.g. that too
// small packets (less than kMinimumFragmentedPayload) are not sent + // small packets (less than kMinimumFragmentedPayload) are not sent +
// overhead. // overhead.
bool is_fully_utilized = unacked_bytes + options_.mtu >= cwnd_; bool is_fully_utilized = unacked_packet_bytes + options_.mtu >= cwnd_;
size_t old_cwnd = cwnd_; size_t old_cwnd = cwnd_;
if (phase() == CongestionAlgorithmPhase::kSlowStart) { if (phase() == CongestionAlgorithmPhase::kSlowStart) {
if (is_fully_utilized && !is_in_fast_recovery()) { if (is_fully_utilized && !is_in_fast_recovery()) {
@ -202,14 +202,14 @@ void RetransmissionQueue::HandlePacketLoss(
} }
void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) {
rwnd_ = outstanding_data_.unacked_bytes() >= a_rwnd rwnd_ = outstanding_data_.unacked_payload_bytes() >= a_rwnd
? 0 ? 0
: a_rwnd - outstanding_data_.unacked_bytes(); : a_rwnd - outstanding_data_.unacked_payload_bytes();
} }
void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() {
// Note: Can't use `unacked_bytes()` as that one doesn't count chunks to // Note: Can't use `unacked_packet_bytes()` as that one doesn't count chunks
// be retransmitted. // to be retransmitted.
if (outstanding_data_.empty()) { if (outstanding_data_.empty()) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2 // https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Whenever all outstanding data sent to an address have been // "Whenever all outstanding data sent to an address have been
@ -262,7 +262,7 @@ bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) {
UnwrappedTSN old_last_cumulative_tsn_ack = UnwrappedTSN old_last_cumulative_tsn_ack =
outstanding_data_.last_cumulative_tsn_ack(); outstanding_data_.last_cumulative_tsn_ack();
size_t old_unacked_bytes = outstanding_data_.unacked_bytes(); size_t old_unacked_packet_bytes = outstanding_data_.unacked_packet_bytes();
size_t old_rwnd = rwnd_; size_t old_rwnd = rwnd_;
UnwrappedTSN cumulative_tsn_ack = UnwrappedTSN cumulative_tsn_ack =
tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack());
@ -299,10 +299,10 @@ bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack=" RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack="
<< *cumulative_tsn_ack.Wrap() << " (" << *cumulative_tsn_ack.Wrap() << " ("
<< *old_last_cumulative_tsn_ack.Wrap() << *old_last_cumulative_tsn_ack.Wrap()
<< "), unacked_bytes=" << "), unacked_packet_bytes="
<< outstanding_data_.unacked_bytes() << " (" << outstanding_data_.unacked_packet_bytes() << " ("
<< old_unacked_bytes << "), rwnd=" << rwnd_ << " (" << old_unacked_packet_bytes << "), rwnd=" << rwnd_
<< old_rwnd << ")"; << " (" << old_rwnd << ")";
if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) { if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2 // https://tools.ietf.org/html/rfc4960#section-6.3.2
@ -313,7 +313,8 @@ bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) {
// Note: It may be started again in a bit further down. // Note: It may be started again in a bit further down.
t3_rtx_.Stop(); t3_rtx_.Stop();
HandleIncreasedCumulativeTsnAck(old_unacked_bytes, ack_info.bytes_acked); HandleIncreasedCumulativeTsnAck(old_unacked_packet_bytes,
ack_info.bytes_acked);
} }
if (ack_info.has_packet_loss) { if (ack_info.has_packet_loss) {
@ -351,7 +352,7 @@ void RetransmissionQueue::UpdateRTT(Timestamp now,
void RetransmissionQueue::HandleT3RtxTimerExpiry() { void RetransmissionQueue::HandleT3RtxTimerExpiry() {
size_t old_cwnd = cwnd_; size_t old_cwnd = cwnd_;
size_t old_unacked_bytes = unacked_bytes(); size_t old_unacked_packet_bytes = unacked_packet_bytes();
// https://tools.ietf.org/html/rfc4960#section-6.3.3 // https://tools.ietf.org/html/rfc4960#section-6.3.3
// "For the destination address for which the timer expires, adjust // "For the destination address for which the timer expires, adjust
// its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU."
@ -388,8 +389,8 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_ << " (" << old_cwnd << "), ssthresh=" << ssthresh_
<< ", unacked_bytes " << unacked_bytes() << " (" << ", unacked_packet_bytes " << unacked_packet_bytes()
<< old_unacked_bytes << ")"; << " (" << old_unacked_packet_bytes << ")";
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
} }
@ -398,7 +399,7 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) {
RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted()); RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted());
RTC_DCHECK(IsDivisibleBy4(bytes_in_packet)); RTC_DCHECK(IsDivisibleBy4(bytes_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent; std::vector<std::pair<TSN, Data>> to_be_sent;
size_t old_unacked_bytes = unacked_bytes(); size_t old_unacked_packet_bytes = unacked_packet_bytes();
to_be_sent = to_be_sent =
outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet); outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet);
@ -437,8 +438,9 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) {
sb << *c.first; sb << *c.first;
}) })
<< " - " << bytes_retransmitted << " - " << bytes_retransmitted
<< " bytes. unacked_bytes=" << unacked_bytes() << " (" << " bytes. unacked_packet_bytes="
<< old_unacked_bytes << ")"; << unacked_packet_bytes() << " ("
<< old_unacked_packet_bytes << ")";
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
return to_be_sent; return to_be_sent;
@ -451,7 +453,7 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent; std::vector<std::pair<TSN, Data>> to_be_sent;
size_t old_unacked_bytes = unacked_bytes(); size_t old_unacked_packet_bytes = unacked_packet_bytes();
size_t old_rwnd = rwnd_; size_t old_rwnd = rwnd_;
// Calculate the bandwidth budget (how many bytes that is // Calculate the bandwidth budget (how many bytes that is
@ -484,7 +486,7 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
max_bytes -= chunk_size; max_bytes -= chunk_size;
rwnd_ -= chunk_size; rwnd_ -= chunk_opt->data.size();
std::optional<UnwrappedTSN> tsn = outstanding_data_.Insert( std::optional<UnwrappedTSN> tsn = outstanding_data_.Insert(
chunk_opt->message_id, chunk_opt->data, now, chunk_opt->message_id, chunk_opt->data, now,
@ -523,8 +525,9 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
[&](size_t r, const std::pair<TSN, Data>& d) { [&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second); return r + GetSerializedChunkSize(d.second);
}) })
<< " bytes. unacked_bytes=" << unacked_bytes() << " (" << " bytes. unacked_packet_bytes="
<< old_unacked_bytes << "), cwnd=" << cwnd_ << unacked_packet_bytes() << " ("
<< old_unacked_packet_bytes << "), cwnd=" << cwnd_
<< ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")";
} }
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
@ -542,9 +545,10 @@ bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) {
} }
size_t RetransmissionQueue::max_bytes_to_send() const { size_t RetransmissionQueue::max_bytes_to_send() const {
size_t left = unacked_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_bytes(); size_t left =
unacked_packet_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_packet_bytes();
if (unacked_bytes() == 0) { if (unacked_packet_bytes() == 0) {
// https://datatracker.ietf.org/doc/html/rfc4960#section-6.1 // https://datatracker.ietf.org/doc/html/rfc4960#section-6.1
// ... However, regardless of the value of rwnd (including if it is 0), the // ... However, regardless of the value of rwnd (including if it is 0), the
// data sender can always have one DATA chunk in flight to the receiver if // data sender can always have one DATA chunk in flight to the receiver if

View File

@ -120,8 +120,10 @@ class RetransmissionQueue {
size_t rtx_packets_count() const { return rtx_packets_count_; } size_t rtx_packets_count() const { return rtx_packets_count_; }
uint64_t rtx_bytes_count() const { return rtx_bytes_count_; } uint64_t rtx_bytes_count() const { return rtx_bytes_count_; }
// Returns the number of bytes of packets that are in-flight. // How many inflight bytes there are, as sent on the wire as packets.
size_t unacked_bytes() const { return outstanding_data_.unacked_bytes(); } size_t unacked_packet_bytes() const {
return outstanding_data_.unacked_packet_bytes();
}
// Returns the number of DATA chunks that are in-flight. // Returns the number of DATA chunks that are in-flight.
size_t unacked_items() const { return outstanding_data_.unacked_items(); } size_t unacked_items() const { return outstanding_data_.unacked_items(); }
@ -190,7 +192,7 @@ class RetransmissionQueue {
// Update the congestion control algorithm given as the cumulative ack TSN // Update the congestion control algorithm given as the cumulative ack TSN
// value has increased, as reported in an incoming SACK chunk. // value has increased, as reported in an incoming SACK chunk.
void HandleIncreasedCumulativeTsnAck(size_t unacked_bytes, void HandleIncreasedCumulativeTsnAck(size_t unacked_packet_bytes,
size_t total_bytes_acked); size_t total_bytes_acked);
// Update the congestion control algorithm, given as packet loss has been // Update the congestion control algorithm, given as packet loss has been
// detected, as reported in an incoming SACK chunk. // detected, as reported in an incoming SACK chunk.

View File

@ -579,7 +579,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
static constexpr size_t kCwnd = 1200; static constexpr size_t kCwnd = 1200;
queue.set_cwnd(kCwnd); queue.set_cwnd(kCwnd);
EXPECT_EQ(queue.cwnd(), kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd);
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
std::vector<uint8_t> payload(1000); std::vector<uint8_t> payload(1000);
@ -596,7 +596,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(), EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight))); Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.unacked_packet_bytes(),
payload.size() + DataChunk::kHeaderSize);
EXPECT_EQ(queue.unacked_items(), 1u); EXPECT_EQ(queue.unacked_items(), 1u);
// Will force chunks to be retransmitted // Will force chunks to be retransmitted
@ -605,7 +606,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(), EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted))); Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
std::vector<std::pair<TSN, Data>> chunks_to_rtx = std::vector<std::pair<TSN, Data>> chunks_to_rtx =
@ -614,7 +615,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(), EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight))); Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.unacked_packet_bytes(),
payload.size() + DataChunk::kHeaderSize);
EXPECT_EQ(queue.unacked_items(), 1u); EXPECT_EQ(queue.unacked_items(), 1u);
} }
@ -1048,7 +1050,7 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(10), State::kInFlight), // Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), // Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight))); Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.unacked_bytes(), (16 + 4) * 3u); EXPECT_EQ(queue.unacked_packet_bytes(), (16 + 4) * 3u);
EXPECT_EQ(queue.unacked_items(), 3u); EXPECT_EQ(queue.unacked_items(), 3u);
// Mark the message as lost. // Mark the message as lost.
@ -1062,20 +1064,20 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(10), State::kAbandoned), // Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned))); Pair(TSN(12), State::kAbandoned)));
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
// Now ACK those, one at a time. // Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_packet_bytes(), 0u);
EXPECT_EQ(queue.unacked_items(), 0u); EXPECT_EQ(queue.unacked_items(), 0u);
} }
@ -1398,7 +1400,7 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) {
queue.GetChunksToSend(now_, 1500); queue.GetChunksToSend(now_, 1500);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
size_t serialized_size = payload.size() + DataChunk::kHeaderSize; size_t serialized_size = payload.size() + DataChunk::kHeaderSize;
EXPECT_EQ(queue.unacked_bytes(), serialized_size); EXPECT_EQ(queue.unacked_packet_bytes(), serialized_size);
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
@ -1610,5 +1612,45 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) {
EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty()); EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty());
} }
TEST_F(RetransmissionQueueTest, UpdatesRwndFromSackAndUnackedPayloadBytes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_EQ(queue.rwnd(), kArwnd);
constexpr size_t kChunkSize = 4;
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillRepeatedly([](Timestamp, size_t) { return std::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight),
Pair(TSN(11), State::kInFlight),
Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.rwnd(), kArwnd - kChunkSize * 3);
// Acknowledge TSN = 10.
queue.HandleSack(now_, SackChunk(TSN(10), 1000, {}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.rwnd(), 1000 - kChunkSize * 2);
// Acknowledge everything.
queue.HandleSack(now_, SackChunk(TSN(12), 2000, {}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(12), State::kAcked)));
EXPECT_EQ(queue.rwnd(), 2000u);
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp