diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index c00836e8e5..e357aa01a6 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -128,10 +128,20 @@ struct DcSctpOptions { // segments. size_t cwnd_mtus_initial = 10; - // The minimum congestion window size, in number of MTUs. - // See https://tools.ietf.org/html/rfc4960#section-7.2.3. + // The minimum congestion window size, in number of MTUs, upon detection of + // packet loss by SACK. Note that if the retransmission timer expires, the + // congestion window will be as small as one MTU. See + // https://tools.ietf.org/html/rfc4960#section-7.2.3. size_t cwnd_mtus_min = 4; + // When the congestion window is at or above this number of MTUs, the + // congestion control algorithm will avoid filling the congestion window + // fully, if that results in fragmenting large messages into quite small + // packets. When the congestion window is smaller than this option, it will + // aim to fill the congestion window as much as it can, even if it results in + // creating small fragmented packets. + size_t avoid_fragmentation_cwnd_mtus = 6; + // The number of packets that may be sent at once. This is limited to avoid // bursts that too quickly fill the send buffer. Typically in a a socket in // its "slow start" phase (when it sends as much as it can), it will send diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index 4faba56945..6e0bbf7b56 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -1246,7 +1246,9 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { // Create a new association, z2 - and don't use z anymore. testing::NiceMock cb_z2("Z2"); DcSctpOptions options = options_; - options.max_receiver_window_buffer_size = 100; + constexpr size_t kReceiveWindowBufferSize = 2000; + options.max_receiver_window_buffer_size = kReceiveWindowBufferSize; + options.mtu = 3000; DcSctpSocket sock_z2("Z2", cb_z2, nullptr, options); EXPECT_CALL(cb_z2, OnClosed).Times(0); @@ -1265,15 +1267,17 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { sock_a_.ReceivePacket(cb_z2.ConsumeSentPacket()); // Fill up Z2 to the high watermark limit. + constexpr size_t kWatermarkLimit = + kReceiveWindowBufferSize * ReassemblyQueue::kHighWatermarkLimit; + constexpr size_t kRemainingSize = kReceiveWindowBufferSize - kWatermarkLimit; + TSN tsn = init_chunk.initial_tsn(); AnyDataChunk::Options opts; opts.is_beginning = Data::IsBeginning(true); sock_z2.ReceivePacket( SctpPacket::Builder(sock_z2.verification_tag(), options) .Add(DataChunk(tsn, StreamID(1), SSN(0), PPID(53), - std::vector( - 100 * ReassemblyQueue::kHighWatermarkLimit + 1), - opts)) + std::vector(kWatermarkLimit + 1), opts)) .Build()); // First DATA will always trigger a SACK. It's not interesting. @@ -1323,7 +1327,7 @@ TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { sock_z2.ReceivePacket( SctpPacket::Builder(sock_z2.verification_tag(), options) .Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53), - std::vector(kSmallMessageSize), + std::vector(kRemainingSize), /*options=*/{})) .Build()); @@ -1552,27 +1556,13 @@ TEST_F(DcSctpSocketTest, TriggersOnBufferedAmountLowOnlyWhenCrossingThreshold) { // Add a few messages to fill up the congestion window. When that is full, // messages will start to be fully buffered. - while (sock_a_.buffered_amount(StreamID(1)) == 0) { + while (sock_a_.buffered_amount(StreamID(1)) <= kBufferedAmountLowThreshold) { sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), std::vector(kMessageSize)), kSendOptions); } size_t initial_buffered = sock_a_.buffered_amount(StreamID(1)); - ASSERT_GE(initial_buffered, 0u); - ASSERT_LT(initial_buffered, kMessageSize); - - // Up to kMessageSize (which is below the threshold) - sock_a_.Send( - DcSctpMessage(StreamID(1), PPID(53), - std::vector(kMessageSize - initial_buffered)), - kSendOptions); - EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), kMessageSize); - - // Up to 2*kMessageSize (which is above the threshold) - sock_a_.Send( - DcSctpMessage(StreamID(1), PPID(53), std::vector(kMessageSize)), - kSendOptions); - EXPECT_EQ(sock_a_.buffered_amount(StreamID(1)), 2 * kMessageSize); + ASSERT_GT(initial_buffered, kBufferedAmountLowThreshold); // Start ACKing packets, which will empty the send queue, and trigger the // callback. @@ -1731,5 +1721,48 @@ TEST_F(DcSctpSocketTest, DoesntSendMoreThanMaxBurstPackets) { EXPECT_THAT(cb_a_.ConsumeSentPacket(), IsEmpty()); } +TEST_F(DcSctpSocketTest, SendsOnlyLargePackets) { + ConnectSockets(); + + // A really large message, to ensure that the congestion window is often full. + constexpr size_t kMessageSize = 100000; + sock_a_.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector(kMessageSize)), + kSendOptions); + + bool delivered_packet = false; + std::vector data_packet_sizes; + do { + delivered_packet = false; + std::vector packet_from_a = cb_a_.ConsumeSentPacket(); + if (!packet_from_a.empty()) { + data_packet_sizes.push_back(packet_from_a.size()); + delivered_packet = true; + sock_z_.ReceivePacket(std::move(packet_from_a)); + } + std::vector packet_from_z = cb_z_.ConsumeSentPacket(); + if (!packet_from_z.empty()) { + delivered_packet = true; + sock_a_.ReceivePacket(std::move(packet_from_z)); + } + } while (delivered_packet); + + size_t packet_payload_bytes = + options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize; + // +1 accounts for padding, and rounding up. + size_t expected_packets = + (kMessageSize + packet_payload_bytes - 1) / packet_payload_bytes + 1; + EXPECT_THAT(data_packet_sizes, SizeIs(expected_packets)); + + // Remove the last size - it will be the remainder. But all other sizes should + // be large. + data_packet_sizes.pop_back(); + + for (size_t size : data_packet_sizes) { + // The 4 is for padding/alignment. + EXPECT_GE(size, options_.mtu - 4); + } +} + } // namespace } // namespace dcsctp diff --git a/net/dcsctp/socket/transmission_control_block.cc b/net/dcsctp/socket/transmission_control_block.cc index 9ec275febd..cc29ebd67e 100644 --- a/net/dcsctp/socket/transmission_control_block.cc +++ b/net/dcsctp/socket/transmission_control_block.cc @@ -84,7 +84,9 @@ void TransmissionControlBlock::MaybeSendSack() { void TransmissionControlBlock::SendBufferedPackets(SctpPacket::Builder& builder, TimeMs now) { - for (int packet_idx = 0; packet_idx < options_.max_burst; ++packet_idx) { + for (int packet_idx = 0; + packet_idx < options_.max_burst && retransmission_queue_.can_send_data(); + ++packet_idx) { // Only add control chunks to the first packet that is sent, if sending // multiple packets in one go (as allowed by the congestion window). if (packet_idx == 0) { diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 763a82d026..d73616eaf0 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -49,6 +49,9 @@ namespace { // The number of times a packet must be NACKed before it's retransmitted. // See https://tools.ietf.org/html/rfc4960#section-7.2.4 constexpr size_t kNumberOfNacksForRetransmission = 3; + +// Allow sending only slightly less than an MTU, to account for headers. +constexpr float kMinBytesRequiredToSendFactor = 0.9; } // namespace RetransmissionQueue::RetransmissionQueue( @@ -63,6 +66,7 @@ RetransmissionQueue::RetransmissionQueue( bool supports_partial_reliability, bool use_message_interleaving) : options_(options), + min_bytes_required_to_send_(options.mtu * kMinBytesRequiredToSendFactor), partial_reliability_(supports_partial_reliability), log_prefix_(std::string(log_prefix) + "tx: "), data_chunk_header_size_(use_message_interleaving @@ -602,10 +606,8 @@ std::vector> RetransmissionQueue::GetChunksToSend( // allowed to be sent), and fill that up first with chunks that are // scheduled to be retransmitted. If there is still budget, send new chunks // (which will have their TSN assigned here.) - size_t remaining_cwnd_bytes = - outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_; - size_t max_bytes = RoundDownTo4(std::min( - std::min(bytes_remaining_in_packet, rwnd()), remaining_cwnd_bytes)); + size_t max_bytes = + RoundDownTo4(std::min(max_bytes_to_send(), bytes_remaining_in_packet)); to_be_sent = GetChunksToBeRetransmitted(max_bytes); max_bytes -= absl::c_accumulate( @@ -707,6 +709,11 @@ RetransmissionQueue::GetChunkStatesForTesting() const { return states; } +bool RetransmissionQueue::can_send_data() const { + return cwnd_ < options_.avoid_fragmentation_cwnd_mtus * options_.mtu || + max_bytes_to_send() >= min_bytes_required_to_send_; +} + bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { if (!partial_reliability_) { return false; @@ -833,6 +840,11 @@ void RetransmissionQueue::AbandonAllFor( } } +size_t RetransmissionQueue::max_bytes_to_send() const { + size_t left = outstanding_bytes_ >= cwnd_ ? 0 : cwnd_ - outstanding_bytes_; + return std::min(rwnd(), left); +} + ForwardTsnChunk RetransmissionQueue::CreateForwardTsn() const { std::unordered_map skipped_per_ordered_stream; diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index f81c6042ad..e4175c6aab 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -118,6 +118,9 @@ class RetransmissionQueue { // Returns the number of DATA chunks that are in-flight. size_t outstanding_items() const { return outstanding_items_; } + // Indicates if the congestion control algorithm allows data to be sent. + bool can_send_data() const; + // Given the current time `now`, it will evaluate if there are chunks that // have expired and that need to be discarded. It returns true if a // FORWARD-TSN should be sent. @@ -343,7 +346,14 @@ class RetransmissionQueue { : CongestionAlgorithmPhase::kCongestionAvoidance; } + // Returns the number of bytes that may be sent in a single packet according + // to the congestion control algorithm. + size_t max_bytes_to_send() const; + const DcSctpOptions options_; + // The minimum bytes required to be available in the congestion window to + // allow packets to be sent - to avoid sending too small packets. + const size_t min_bytes_required_to_send_; // If the peer supports RFC3758 - SCTP Partial Reliability Extension. const bool partial_reliability_; const std::string log_prefix_; diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc index 5974e052a5..c64aeb19df 100644 --- a/net/dcsctp/tx/retransmission_queue_test.cc +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -49,10 +49,17 @@ using ::testing::UnorderedElementsAre; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; +DcSctpOptions MakeOptions() { + DcSctpOptions options; + options.mtu = kMaxMtu; + return options; +} + class RetransmissionQueueTest : public testing::Test { protected: RetransmissionQueueTest() - : gen_(MID(42)), + : options_(MakeOptions()), + gen_(MID(42)), timeout_manager_([this]() { return now_; }), timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }), timer_(timer_manager_.CreateTimer( @@ -76,14 +83,13 @@ class RetransmissionQueueTest : public testing::Test { RetransmissionQueue CreateQueue(bool supports_partial_reliability = true, bool use_message_interleaving = false) { - DcSctpOptions options; - options.mtu = kMaxMtu; return RetransmissionQueue( "", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(), - on_clear_retransmission_counter_.AsStdFunction(), *timer_, options, + on_clear_retransmission_counter_.AsStdFunction(), *timer_, options_, supports_partial_reliability, use_message_interleaving); } + DcSctpOptions options_; DataGenerator gen_; TimeMs now_ = TimeMs(0); FakeTimeoutManager timeout_manager_; @@ -1211,5 +1217,63 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { EXPECT_EQ(queue.cwnd(), kCwnd + serialized_size); } +// Verifies that it doesn't produce tiny packets, when getting close to +// the full congestion window. +TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { + RetransmissionQueue queue = CreateQueue(); + size_t intial_cwnd = options_.avoid_fragmentation_cwnd_mtus * options_.mtu; + queue.set_cwnd(intial_cwnd); + EXPECT_EQ(queue.cwnd(), intial_cwnd); + + // Fill the congestion window almost - leaving 500 bytes. + size_t chunk_size = intial_cwnd - 500; + EXPECT_CALL(producer_, Produce) + .WillOnce([chunk_size, this](TimeMs, size_t) { + return SendQueue::DataToSend( + gen_.Ordered(std::vector(chunk_size), "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_TRUE(queue.can_send_data()); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 10000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + + // To little space left - will not send more. + EXPECT_FALSE(queue.can_send_data()); + + // But when the first chunk is acked, it will continue. + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + + EXPECT_TRUE(queue.can_send_data()); + EXPECT_EQ(queue.outstanding_bytes(), 0u); + EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu); +} + +TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { + RetransmissionQueue queue = CreateQueue(); + size_t intial_cwnd = + options_.avoid_fragmentation_cwnd_mtus * options_.mtu - 1; + queue.set_cwnd(intial_cwnd); + EXPECT_EQ(queue.cwnd(), intial_cwnd); + + // Fill the congestion window almost - leaving 500 bytes. + size_t chunk_size = intial_cwnd - 500; + EXPECT_CALL(producer_, Produce) + .WillOnce([chunk_size, this](TimeMs, size_t) { + return SendQueue::DataToSend( + gen_.Ordered(std::vector(chunk_size), "BE")); + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + EXPECT_TRUE(queue.can_send_data()); + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 10000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); + + // With congestion window under limit, allow small packets to be created. + EXPECT_TRUE(queue.can_send_data()); +} + } // namespace } // namespace dcsctp