diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index b4bc9c486e..914bea34c4 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -36,6 +36,7 @@ #include "net/dcsctp/packet/error_cause/error_cause.h" #include "net/dcsctp/packet/error_cause/unrecognized_chunk_type_cause.h" #include "net/dcsctp/packet/parameter/heartbeat_info_parameter.h" +#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/parameter.h" #include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/packet/tlv_trait.h" @@ -229,6 +230,44 @@ MATCHER(HasSackWithNoGapAckBlocks, "") { return true; } +MATCHER_P(HasReconfigWithStreams, streams_matcher, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != ReConfigChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional reconfig = + ReConfigChunk::Parse(packet->descriptors()[0].data); + if (!reconfig.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + const Parameters& parameters = reconfig->parameters(); + if (parameters.descriptors().size() != 1 || + parameters.descriptors()[0].type != + OutgoingSSNResetRequestParameter::kType) { + *result_listener << "Expected the reconfig chunk to have an outgoing SSN " + "reset request parameter"; + return false; + } + + absl::optional p = + OutgoingSSNResetRequestParameter::Parse(parameters.descriptors()[0].data); + testing::Matcher> matcher = streams_matcher; + if (!matcher.MatchAndExplain(p->stream_ids(), result_listener)) { + return false; + } + + return true; +} + TSN AddTo(TSN tsn, int delta) { return TSN(*tsn + delta); } @@ -2232,5 +2271,76 @@ TEST(DcSctpSocketTest, CloseThreeStreamsAtTheSameTime) { ExchangeMessages(a, z); } + +TEST(DcSctpSocketTest, CloseStreamsWithPendingRequest) { + // Checks that stream reset requests are properly paused when they can't be + // immediately reset - i.e. when there is already an ongoing stream reset + // request (and there can only be a single one in-flight). + SocketUnderTest a("A"); + SocketUnderTest z("Z"); + + EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1)))).Times(1); + EXPECT_CALL(z.cb, OnIncomingStreamsReset( + UnorderedElementsAre(StreamID(2), StreamID(3)))) + .Times(1); + EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1)))).Times(1); + EXPECT_CALL(a.cb, OnStreamsResetPerformed( + UnorderedElementsAre(StreamID(2), StreamID(3)))) + .Times(1); + + ConnectSockets(a, z); + + SendOptions send_options = {.unordered = IsUnordered(false)}; + + // Send a few ordered messages + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options); + + ExchangeMessages(a, z); + + // Receive these messages + absl::optional msg1 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->stream_id(), StreamID(1)); + absl::optional msg2 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->stream_id(), StreamID(2)); + absl::optional msg3 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg3.has_value()); + EXPECT_EQ(msg3->stream_id(), StreamID(3)); + + // Reset the streams - not all at once. + a.socket.ResetStreams(std::vector({StreamID(1)})); + + std::vector packet = a.cb.ConsumeSentPacket(); + EXPECT_THAT(packet, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + z.socket.ReceivePacket(std::move(packet)); + + // Sending more reset requests while this one is ongoing. + + a.socket.ResetStreams(std::vector({StreamID(2)})); + a.socket.ResetStreams(std::vector({StreamID(3)})); + + ExchangeMessages(a, z); + + // Send a few more ordered messages + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options); + + ExchangeMessages(a, z); + + // Receive these messages + absl::optional msg4 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg4.has_value()); + EXPECT_EQ(msg4->stream_id(), StreamID(1)); + absl::optional msg5 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg5.has_value()); + EXPECT_EQ(msg5->stream_id(), StreamID(2)); + absl::optional msg6 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg6.has_value()); + EXPECT_EQ(msg6->stream_id(), StreamID(3)); +} // namespace } // namespace } // namespace dcsctp diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc index 1c6ce09e56..2d66658c95 100644 --- a/net/dcsctp/socket/stream_reset_handler.cc +++ b/net/dcsctp/socket/stream_reset_handler.cc @@ -270,16 +270,13 @@ absl::optional StreamResetHandler::MakeStreamResetRequest() { // Only send stream resets if there are streams to reset, and no current // ongoing request (there can only be one at a time), and if the stream // can be reset. - if (streams_to_reset_.empty() || current_request_.has_value() || - !retransmission_queue_->CanResetStreams()) { + if (current_request_.has_value() || + !retransmission_queue_->HasStreamsReadyToBeReset()) { return absl::nullopt; } - std::vector streams_to_reset(streams_to_reset_.begin(), - streams_to_reset_.end()); current_request_.emplace(TSN(*retransmission_queue_->next_tsn() - 1), - std::move(streams_to_reset)); - streams_to_reset_.clear(); + retransmission_queue_->GetStreamsReadyToBeReset()); reconfig_timer_->set_duration(ctx_->current_rto()); reconfig_timer_->Start(); return MakeReconfigChunk(); @@ -310,18 +307,8 @@ ReConfigChunk StreamResetHandler::MakeReconfigChunk() { void StreamResetHandler::ResetStreams( rtc::ArrayView outgoing_streams) { - // Enqueue streams to be reset - as this may be called multiple times - // while a request is already in progress (and there can only be one). for (StreamID stream_id : outgoing_streams) { - streams_to_reset_.insert(stream_id); - } - if (current_request_.has_value()) { - // Already an ongoing request - will need to wait for it to finish as - // there can only be one in-flight ReConfig chunk with requests at any - // time. - } else { - retransmission_queue_->PrepareResetStreams(std::vector( - streams_to_reset_.begin(), streams_to_reset_.end())); + retransmission_queue_->PrepareResetStream(stream_id); } } @@ -345,7 +332,7 @@ absl::optional StreamResetHandler::OnReconfigTimerExpiry() { HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const { HandoverReadinessStatus status; - if (!streams_to_reset_.empty()) { + if (retransmission_queue_->HasStreamsReadyToBeReset()) { status.Add(HandoverUnreadinessReason::kPendingStreamReset); } if (current_request_.has_value()) { diff --git a/net/dcsctp/socket/stream_reset_handler.h b/net/dcsctp/socket/stream_reset_handler.h index a691eb8312..6e49665538 100644 --- a/net/dcsctp/socket/stream_reset_handler.h +++ b/net/dcsctp/socket/stream_reset_handler.h @@ -216,10 +216,6 @@ class StreamResetHandler { RetransmissionQueue* retransmission_queue_; const std::unique_ptr reconfig_timer_; - // Outgoing streams that have been requested to be reset, but hasn't yet - // been included in an outgoing request. - webrtc::flat_set streams_to_reset_; - // The next sequence number for outgoing stream requests. ReconfigRequestSN next_outgoing_req_seq_nbr_; diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc index 6f6874f2a0..1b7463e8d7 100644 --- a/net/dcsctp/socket/stream_reset_handler_test.cc +++ b/net/dcsctp/socket/stream_reset_handler_test.cc @@ -343,10 +343,13 @@ TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) { } TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -360,13 +363,21 @@ TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) { } TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(3); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(40))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))).Times(2); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(44))); handler_->ResetStreams(std::vector({StreamID(42)})); handler_->ResetStreams( std::vector({StreamID(43), StreamID(44), StreamID(41)})); handler_->ResetStreams(std::vector({StreamID(42), StreamID(40)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return( + std::vector({StreamID(40), StreamID(41), StreamID(42), + StreamID(43), StreamID(44)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -382,10 +393,10 @@ TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) { } TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()) + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) .WillOnce(Return(false)) .WillOnce(Return(false)) .WillOnce(Return(true)); @@ -396,10 +407,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) { } TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); @@ -412,8 +425,8 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { req.request_sequence_number(), ResponseResult::kSuccessPerformed)); ReConfigChunk response_reconfig(builder.Build()); - EXPECT_CALL(producer_, CommitResetStreams()).Times(1); - EXPECT_CALL(producer_, RollbackResetStreams()).Times(0); + EXPECT_CALL(producer_, CommitResetStreams); + EXPECT_CALL(producer_, RollbackResetStreams).Times(0); // Processing a response shouldn't result in sending anything. EXPECT_CALL(callbacks_, OnError).Times(0); @@ -422,10 +435,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { } TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); @@ -438,8 +453,8 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { req.request_sequence_number(), ResponseResult::kErrorBadSequenceNumber)); ReConfigChunk response_reconfig(builder.Build()); - EXPECT_CALL(producer_, CommitResetStreams()).Times(0); - EXPECT_CALL(producer_, RollbackResetStreams()).Times(1); + EXPECT_CALL(producer_, CommitResetStreams).Times(0); + EXPECT_CALL(producer_, RollbackResetStreams); // Only requests should result in sending responses. EXPECT_CALL(callbacks_, OnError).Times(0); @@ -450,10 +465,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) { static constexpr StreamID kStreamToReset = StreamID(42); - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(kStreamToReset)); handler_->ResetStreams(std::vector({kStreamToReset})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({kStreamToReset}))); absl::optional reconfig1 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig1.has_value()); @@ -499,10 +516,13 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) { } TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig1 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig1.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -514,6 +534,8 @@ TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { EXPECT_THAT(req1.stream_ids(), UnorderedElementsAre(StreamID(42))); // Streams reset while the request is in-flight will be queued. + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); StreamID stream_ids[] = {StreamID(41), StreamID(43)}; handler_->ResetStreams(stream_ids); EXPECT_EQ(handler_->MakeStreamResetRequest(), absl::nullopt); @@ -532,7 +554,10 @@ TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { handler_->HandleReConfig(std::move(response_reconfig)); // Response has been processed. A new request can be sent. - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(41), StreamID(43)}))); + absl::optional reconfig2 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig2.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -591,21 +616,31 @@ TEST_F(StreamResetHandlerTest, SendSameRequestTwiceReturnsNothingToDo) { TEST_F(StreamResetHandlerTest, HandoverIsAllowedOnlyWhenNoStreamIsBeingOrWillBeReset) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ( handler_->GetHandoverReadiness(), HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset)); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value()); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus( HandoverUnreadinessReason::kPendingStreamResetRequest)); // Reset more streams while the request is in-flight. + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); StreamID stream_ids[] = {StreamID(41), StreamID(43)}; handler_->ResetStreams(stream_ids); + + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus() .Add(HandoverUnreadinessReason::kPendingStreamResetRequest) @@ -618,12 +653,18 @@ TEST_F(StreamResetHandlerTest, .Add(ReconfigurationResponseParameter( kMyInitialReqSn, ResponseResult::kSuccessPerformed)) .Build())); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ( handler_->GetHandoverReadiness(), HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset)); // Second request can be sent. - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(41), StreamID(43)}))); + ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value()); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus( @@ -638,16 +679,21 @@ TEST_F(StreamResetHandlerTest, .Build())); // Seconds response has been processed. No pending resets. + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(false)); + EXPECT_TRUE(handler_->GetHandoverReadiness().IsReady()); } TEST_F(StreamResetHandlerTest, HandoverInInitialState) { PerformHandover(); - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -663,10 +709,15 @@ TEST_F(StreamResetHandlerTest, HandoverInInitialState) { TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) { // Reset one stream { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig, handler_->MakeStreamResetRequest()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -690,10 +741,13 @@ TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) { // Reset another stream after handover { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); handler_->ResetStreams(std::vector({StreamID(43)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(43)}))); + ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig, handler_->MakeStreamResetRequest()); ASSERT_HAS_VALUE_AND_ASSIGN( diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index 0cf64583ae..82e96b7084 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -11,6 +11,7 @@ #define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ #include +#include #include "absl/types/optional.h" #include "api/array_view.h" @@ -35,11 +36,9 @@ class MockSendQueue : public SendQueue { Discard, (IsUnordered unordered, StreamID stream_id, MID message_id), (override)); - MOCK_METHOD(void, - PrepareResetStreams, - (rtc::ArrayView streams), - (override)); - MOCK_METHOD(bool, CanResetStreams, (), (const, override)); + MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override)); + MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override)); + MOCK_METHOD(std::vector, GetStreamsReadyToBeReset, (), (override)); MOCK_METHOD(void, CommitResetStreams, (), (override)); MOCK_METHOD(void, RollbackResetStreams, (), (override)); MOCK_METHOD(void, Reset, (), (override)); diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 4afc01bf6e..57559195f0 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -529,16 +529,15 @@ size_t RetransmissionQueue::max_bytes_to_send() const { return std::min(rwnd(), left); } -void RetransmissionQueue::PrepareResetStreams( - rtc::ArrayView streams) { +void RetransmissionQueue::PrepareResetStream(StreamID stream_id) { // TODO(boivie): These calls are now only affecting the send queue. The // packet buffer can also change behavior - for example draining the chunk // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request" // can be sent quickly, with a known `sender_last_assigned_tsn`. - send_queue_.PrepareResetStreams(streams); + send_queue_.PrepareResetStream(stream_id); } -bool RetransmissionQueue::CanResetStreams() const { - return send_queue_.CanResetStreams(); +bool RetransmissionQueue::HasStreamsReadyToBeReset() const { + return send_queue_.HasStreamsReadyToBeReset(); } void RetransmissionQueue::CommitResetStreams() { send_queue_.CommitResetStreams(); diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index 1e866b30b5..1958dfd643 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -143,8 +143,11 @@ class RetransmissionQueue { // See the SendQueue for a longer description of these methods related // to stream resetting. - void PrepareResetStreams(rtc::ArrayView streams); - bool CanResetStreams() const; + void PrepareResetStream(StreamID stream_id); + bool HasStreamsReadyToBeReset() const; + std::vector GetStreamsReadyToBeReset() const { + return send_queue_.GetStreamsReadyToBeReset(); + } void CommitResetStreams(); void RollbackResetStreams(); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index b3e695bc5c..d4ce59d58c 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -41,6 +41,12 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, } bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { + if (pause_state_ == PauseState::kPaused || + pause_state_ == PauseState::kResetting) { + // The stream has paused (and there is no partially sent message). + return false; + } + while (!items_.empty()) { RRSendQueue::OutgoingStream::Item& item = items_.front(); if (item.message_id.has_value()) { @@ -59,10 +65,6 @@ bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { continue; } - if (is_paused_) { - // The stream has paused (and there is no partially sent message). - return false; - } return true; } return false; @@ -139,6 +141,8 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now, size_t max_size) { RTC_DCHECK(!items_.empty()); + RTC_DCHECK(pause_state_ != PauseState::kPaused && + pause_state_ != PauseState::kResetting); Item* item = &items_.front(); DcSctpMessage& message = item->message; @@ -196,6 +200,12 @@ SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now, // The entire message has been sent, and its last data copied to `chunk`, so // it can safely be discarded. items_.pop_front(); + + if (pause_state_ == PauseState::kPending) { + RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id + << " is moving from pending to paused"; + pause_state_ = PauseState::kPaused; + } } else { item->remaining_offset += chunk_payload.size(); item->remaining_size -= chunk_payload.size(); @@ -217,6 +227,11 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, buffered_amount_.Decrease(item.remaining_size); total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); + + if (pause_state_ == PauseState::kPending) { + pause_state_ = PauseState::kPaused; + } + // As the item still existed, it had unsent data. result = true; } @@ -226,7 +241,12 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, } void RRSendQueue::OutgoingStream::Pause() { - is_paused_ = true; + if (pause_state_ != PauseState::kNotPaused) { + // Already in progress. + return; + } + + bool had_pending_items = !items_.empty(); // https://datatracker.ietf.org/doc/html/rfc8831#section-6.7 // "Closing of a data channel MUST be signaled by resetting the corresponding @@ -250,10 +270,33 @@ void RRSendQueue::OutgoingStream::Pause() { ++it; } } + + pause_state_ = (items_.empty() || items_.front().remaining_offset == 0) + ? PauseState::kPaused + : PauseState::kPending; + + if (had_pending_items && pause_state_ == PauseState::kPaused) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() + << " was previously active, but is now paused."; + } + + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::OutgoingStream::Resume() { + RTC_DCHECK(pause_state_ == PauseState::kResetting); + if (!items_.empty()) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() + << " was previously paused, but is now active."; + } + pause_state_ = PauseState::kNotPaused; RTC_DCHECK(IsConsistent()); } void RRSendQueue::OutgoingStream::Reset() { + // This can be called both when an outgoing stream reset has been responded + // to, or when the entire SendQueue is reset due to detecting the peer having + // restarted. The stream may be in any state at this time. if (!items_.empty()) { // If this message has been partially sent, reset it so that it will be // re-sent. @@ -268,7 +311,7 @@ void RRSendQueue::OutgoingStream::Reset() { item.ssn = absl::nullopt; item.current_fsn = FSN(0); } - is_paused_ = false; + pause_state_ = PauseState::kNotPaused; next_ordered_mid_ = MID(0); next_unordered_mid_ = MID(0); next_ssn_ = SSN(0); @@ -381,27 +424,39 @@ bool RRSendQueue::Discard(IsUnordered unordered, return has_discarded; } -void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { - for (StreamID stream_id : streams) { - GetOrCreateStreamInfo(stream_id).Pause(); - } +void RRSendQueue::PrepareResetStream(StreamID stream_id) { + GetOrCreateStreamInfo(stream_id).Pause(); RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::CanResetStreams() const { - // Streams can be reset if those streams that are paused don't have any - // messages that are partially sent. +bool RRSendQueue::HasStreamsReadyToBeReset() const { for (auto& [unused, stream] : streams_) { - if (stream.is_paused() && stream.has_partially_sent_message()) { - return false; + if (stream.IsReadyToBeReset()) { + return true; } } - return true; + return false; +} +std::vector RRSendQueue::GetStreamsReadyToBeReset() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) == 0); + std::vector ready; + for (auto& [stream_id, stream] : streams_) { + if (stream.IsReadyToBeReset()) { + stream.SetAsResetting(); + ready.push_back(stream_id); + } + } + return ready; } void RRSendQueue::CommitResetStreams() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) > 0); for (auto& [unused, stream] : streams_) { - if (stream.is_paused()) { + if (stream.IsResetting()) { stream.Reset(); } } @@ -409,8 +464,13 @@ void RRSendQueue::CommitResetStreams() { } void RRSendQueue::RollbackResetStreams() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) > 0); for (auto& [unused, stream] : streams_) { - stream.Resume(); + if (stream.IsResetting()) { + stream.Resume(); + } } RTC_DCHECK(IsConsistent()); } @@ -455,6 +515,7 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( return streams_ .emplace(stream_id, OutgoingStream( + stream_id, [this, stream_id]() { on_buffered_amount_low_(stream_id); }, total_buffered_amount_)) .first->second; @@ -482,6 +543,7 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { state.tx.streams) { StreamID stream_id(state_stream.id); streams_.emplace(stream_id, OutgoingStream( + stream_id, [this, stream_id]() { on_buffered_amount_low_(stream_id); }, diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 6da585df85..57a43ccd66 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -15,6 +15,7 @@ #include #include #include +#include #include "absl/algorithm/container.h" #include "absl/strings/string_view.h" @@ -65,8 +66,9 @@ class RRSendQueue : public SendQueue { bool Discard(IsUnordered unordered, StreamID stream_id, MID message_id) override; - void PrepareResetStreams(rtc::ArrayView streams) override; - bool CanResetStreams() const override; + void PrepareResetStream(StreamID streams) override; + bool HasStreamsReadyToBeReset() const override; + std::vector GetStreamsReadyToBeReset() override; void CommitResetStreams() override; void RollbackResetStreams() override; void Reset() override; @@ -108,16 +110,20 @@ class RRSendQueue : public SendQueue { // Per-stream information. class OutgoingStream { public: - explicit OutgoingStream( + OutgoingStream( + StreamID stream_id, std::function on_buffered_amount_low, ThresholdWatcher& total_buffered_amount, const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) - : next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), + : stream_id_(stream_id), + next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)), buffered_amount_(std::move(on_buffered_amount_low)), total_buffered_amount_(total_buffered_amount) {} + StreamID stream_id() const { return stream_id_; } + // Enqueues a message to this stream. void Add(DcSctpMessage message, TimeMs expires_at, @@ -137,9 +143,18 @@ class RRSendQueue : public SendQueue { void Pause(); // Resumes a paused stream. - void Resume() { is_paused_ = false; } + void Resume(); - bool is_paused() const { return is_paused_; } + bool IsReadyToBeReset() const { + return pause_state_ == PauseState::kPaused; + } + + bool IsResetting() const { return pause_state_ == PauseState::kResetting; } + + void SetAsResetting() { + RTC_DCHECK(pause_state_ == PauseState::kPaused); + pause_state_ = PauseState::kResetting; + } // Resets this stream, meaning MIDs and SSNs are set to zero. void Reset(); @@ -155,6 +170,26 @@ class RRSendQueue : public SendQueue { DcSctpSocketHandoverState::OutgoingStream& state) const; private: + // Streams are paused before they can be reset. To reset a stream, the + // socket sends an outgoing stream reset command with the TSN of the last + // fragment of the last message, so that receivers and senders can agree on + // when it stopped. And if the send queue is in the middle of sending a + // message, and without fragments not yet sent and without TSNs allocated to + // them, it will keep sending data until that message has ended. + enum class PauseState { + // The stream is not paused, and not scheduled to be reset. + kNotPaused, + // The stream has requested to be reset/paused but is still producing + // fragments of a message that hasn't ended yet. When it does, it will + // transition to the `kPaused` state. + kPending, + // The stream is fully paused and can be reset. + kPaused, + // The stream has been added to an outgoing stream reset request and a + // response from the peer hasn't been received yet. + kResetting, + }; + // An enqueued message and metadata. struct Item { explicit Item(DcSctpMessage msg, @@ -182,8 +217,8 @@ class RRSendQueue : public SendQueue { bool IsConsistent() const; - // Streams are pause when they are about to be reset. - bool is_paused_ = false; + const StreamID stream_id_; + PauseState pause_state_ = PauseState::kNotPaused; // MIDs are different for unordered and ordered messages sent on a stream. MID next_unordered_mid_; MID next_ordered_mid_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index a93c4a3d1c..fbbce58de1 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -26,6 +26,7 @@ namespace dcsctp { namespace { using ::testing::SizeIs; +using ::testing::UnorderedElementsAre; constexpr TimeMs kNow = TimeMs(0); constexpr StreamID kStreamID(1); @@ -252,10 +253,13 @@ TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); EXPECT_EQ(buf_.total_buffered_amount(), 8u); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 5u); + + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.CommitResetStreams(); - buf_.PrepareResetStreams(std::vector({StreamID(2)})); + buf_.PrepareResetStream(StreamID(2)); EXPECT_EQ(buf_.total_buffered_amount(), 0u); } @@ -270,21 +274,27 @@ TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); } TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { std::vector payload(50); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 0u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + buf_.CommitResetStreams(); EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); @@ -309,8 +319,7 @@ TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) { EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize); // This will stop the second message from being sent. - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize); // Should still produce fragments until end of message. @@ -340,13 +349,14 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) { ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); // Buffered buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.CommitResetStreams(); absl::optional chunk_three = @@ -373,14 +383,16 @@ TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) { EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ssn, SSN(0)); - StreamID stream_ids[] = {StreamID(3)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(3)); // Send two more messages - SID 3 will buffer, SID 1 will send. buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(3))); + buf_.CommitResetStreams(); absl::optional chunk_three = @@ -412,12 +424,14 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) { ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); // Buffered buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.RollbackResetStreams(); absl::optional chunk_three = diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index a821d20785..b2e5a9d436 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -67,11 +67,11 @@ class SendQueue { StreamID stream_id, MID message_id) = 0; - // Prepares the streams to be reset. This is used to close a WebRTC data + // Prepares the stream to be reset. This is used to close a WebRTC data // channel and will be signaled to the other side. // // Concretely, it discards all whole (not partly sent) messages in the given - // streams and pauses those streams so that future added messages aren't + // stream and pauses that stream so that future added messages aren't // produced until `ResumeStreams` is called. // // TODO(boivie): Investigate if it really should discard any message at all. @@ -82,24 +82,28 @@ class SendQueue { // reset, and paused while they are resetting. This is the first part of the // two-phase commit protocol to reset streams, where the caller completes the // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`. - virtual void PrepareResetStreams(rtc::ArrayView streams) = 0; + virtual void PrepareResetStream(StreamID stream_id) = 0; - // Returns true if all non-discarded messages during `PrepareResetStreams` - // (which are those that was partially sent before that method was called) - // have been sent. - virtual bool CanResetStreams() const = 0; + // Indicates if there are any streams that are ready to be reset. + virtual bool HasStreamsReadyToBeReset() const = 0; - // Called to commit to reset the streams provided to `PrepareResetStreams`. - // It will reset the stream sequence numbers (SSNs) and message identifiers - // (MIDs) and resume the paused streams. + // Returns a list of streams that are ready to be included in an outgoing + // stream reset request. Any streams that are returned here must be included + // in an outgoing stream reset request, and there must not be concurrent + // requests. Before calling this method again, you must have called + virtual std::vector GetStreamsReadyToBeReset() = 0; + + // Called to commit to reset the streams returned by + // `GetStreamsReadyToBeReset`. It will reset the stream sequence numbers + // (SSNs) and message identifiers (MIDs) and resume the paused streams. virtual void CommitResetStreams() = 0; - // Called to abort the resetting of streams provided to `PrepareResetStreams`. - // Will resume the paused streams without resetting the stream sequence - // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial - // messages that were discarded when calling `PrepareResetStreams` will not be - // recovered, to better match the intention from the sender to "close the - // channel". + // Called to abort the resetting of streams returned by + // `GetStreamsReadyToBeReset`. Will resume the paused streams without + // resetting the stream sequence numbers (SSNs) or message identifiers (MIDs). + // Note that the non-partial messages that were discarded when calling + // `PrepareResetStreams` will not be recovered, to better match the intention + // from the sender to "close the channel". virtual void RollbackResetStreams() = 0; // Resets all message identifier counters (MID, SSN) and makes all partially