From d7fd0f9744bc0ed89857d1ec78c4299f19b9fb97 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Mon, 2 May 2022 17:15:57 +0200 Subject: [PATCH] dcsctp: Handle rapid closing of streams When streams were to be reset, but there was already an ongoing stream reset command in-flight, those streams wouldn't be properly reset. When multiple streams were reset close to each other (within an RTT), some streams would not have their SSNs reset, which resulted in the stream resuming the SSN sequence. This could result in ordered streams not delivering all messages as the receiver wouldn't deliver any messages with SSN different from the expected SSN=0. In WebRTC data channels, this would be triggered if multiple channels were closed at roughly the same time, then re-opened, and continued to be used in ordered mode. Unordered messages would still be delivered, but the stream state could be wrong as the DATA_CHANNEL_ACK message is sent ordered, and possibly not delivered. There were unit tests for this, but not on the socket level using real components, but just on the stream reset handler using mocks, where this issue wasn't found. Also, those mocks didn't validate that the correct parameters were provided, so that's fixed now. The root cause was the PrepareResetStreams was only called if there wasn't an ongoing stream reset operation in progress. One may try to solve it by always calling PrepareResetStreams also when there is an ongoing request, or to call it when the request has finished. One would then realize that when the response of the outgoing stream request is received, and CommitResetStreams is called, it would reset all paused and (prepared) to-be-reset streams - not just the ones in the outgoing stream request. One cause of this was the lack of a single source of truth of the stream states. The SendQueue kept track of which streams that were paused, but the stream reset handler kept track of which streams that were resetting. As that's error prone, this CL moves the source of truth completely to the SendQueue and defining explicit stream pause states. A stream can be in one of these possible states: * Not paused. This is the default for an active stream. * Pending to be paused. This is when it's about to be reset, but there is a message that has been partly sent, with fragments remaining to be sent before it can be paused. * Paused, with no partly sent message. In this state, it's ready to be reset. * Resetting. A stream transitions into this state when it has been paused and has been included in an outgoing stream reset request. When this request has been responded to, the stream can really be reset (SSN=0, MID=0). This CL also improves logging, and adds socket tests to catch this issue. Bug: webrtc:13994, chromium:1320194 Change-Id: I883570d1f277bc01e52b1afad62d6be2aca930a2 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261180 Reviewed-by: Harald Alvestrand Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/main@{#36771} --- net/dcsctp/socket/dcsctp_socket_test.cc | 110 ++++++++++++++++++ net/dcsctp/socket/stream_reset_handler.cc | 23 +--- net/dcsctp/socket/stream_reset_handler.h | 4 - .../socket/stream_reset_handler_test.cc | 110 +++++++++++++----- net/dcsctp/tx/mock_send_queue.h | 9 +- net/dcsctp/tx/retransmission_queue.cc | 9 +- net/dcsctp/tx/retransmission_queue.h | 7 +- net/dcsctp/tx/rr_send_queue.cc | 98 +++++++++++++--- net/dcsctp/tx/rr_send_queue.h | 51 ++++++-- net/dcsctp/tx/rr_send_queue_test.cc | 44 ++++--- net/dcsctp/tx/send_queue.h | 36 +++--- 11 files changed, 382 insertions(+), 119 deletions(-) diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index b4bc9c486e..914bea34c4 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -36,6 +36,7 @@ #include "net/dcsctp/packet/error_cause/error_cause.h" #include "net/dcsctp/packet/error_cause/unrecognized_chunk_type_cause.h" #include "net/dcsctp/packet/parameter/heartbeat_info_parameter.h" +#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/parameter.h" #include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/packet/tlv_trait.h" @@ -229,6 +230,44 @@ MATCHER(HasSackWithNoGapAckBlocks, "") { return true; } +MATCHER_P(HasReconfigWithStreams, streams_matcher, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != ReConfigChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional reconfig = + ReConfigChunk::Parse(packet->descriptors()[0].data); + if (!reconfig.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + const Parameters& parameters = reconfig->parameters(); + if (parameters.descriptors().size() != 1 || + parameters.descriptors()[0].type != + OutgoingSSNResetRequestParameter::kType) { + *result_listener << "Expected the reconfig chunk to have an outgoing SSN " + "reset request parameter"; + return false; + } + + absl::optional p = + OutgoingSSNResetRequestParameter::Parse(parameters.descriptors()[0].data); + testing::Matcher> matcher = streams_matcher; + if (!matcher.MatchAndExplain(p->stream_ids(), result_listener)) { + return false; + } + + return true; +} + TSN AddTo(TSN tsn, int delta) { return TSN(*tsn + delta); } @@ -2232,5 +2271,76 @@ TEST(DcSctpSocketTest, CloseThreeStreamsAtTheSameTime) { ExchangeMessages(a, z); } + +TEST(DcSctpSocketTest, CloseStreamsWithPendingRequest) { + // Checks that stream reset requests are properly paused when they can't be + // immediately reset - i.e. when there is already an ongoing stream reset + // request (and there can only be a single one in-flight). + SocketUnderTest a("A"); + SocketUnderTest z("Z"); + + EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1)))).Times(1); + EXPECT_CALL(z.cb, OnIncomingStreamsReset( + UnorderedElementsAre(StreamID(2), StreamID(3)))) + .Times(1); + EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1)))).Times(1); + EXPECT_CALL(a.cb, OnStreamsResetPerformed( + UnorderedElementsAre(StreamID(2), StreamID(3)))) + .Times(1); + + ConnectSockets(a, z); + + SendOptions send_options = {.unordered = IsUnordered(false)}; + + // Send a few ordered messages + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options); + + ExchangeMessages(a, z); + + // Receive these messages + absl::optional msg1 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->stream_id(), StreamID(1)); + absl::optional msg2 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->stream_id(), StreamID(2)); + absl::optional msg3 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg3.has_value()); + EXPECT_EQ(msg3->stream_id(), StreamID(3)); + + // Reset the streams - not all at once. + a.socket.ResetStreams(std::vector({StreamID(1)})); + + std::vector packet = a.cb.ConsumeSentPacket(); + EXPECT_THAT(packet, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + z.socket.ReceivePacket(std::move(packet)); + + // Sending more reset requests while this one is ongoing. + + a.socket.ResetStreams(std::vector({StreamID(2)})); + a.socket.ResetStreams(std::vector({StreamID(3)})); + + ExchangeMessages(a, z); + + // Send a few more ordered messages + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options); + a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options); + + ExchangeMessages(a, z); + + // Receive these messages + absl::optional msg4 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg4.has_value()); + EXPECT_EQ(msg4->stream_id(), StreamID(1)); + absl::optional msg5 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg5.has_value()); + EXPECT_EQ(msg5->stream_id(), StreamID(2)); + absl::optional msg6 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg6.has_value()); + EXPECT_EQ(msg6->stream_id(), StreamID(3)); +} // namespace } // namespace } // namespace dcsctp diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc index 1c6ce09e56..2d66658c95 100644 --- a/net/dcsctp/socket/stream_reset_handler.cc +++ b/net/dcsctp/socket/stream_reset_handler.cc @@ -270,16 +270,13 @@ absl::optional StreamResetHandler::MakeStreamResetRequest() { // Only send stream resets if there are streams to reset, and no current // ongoing request (there can only be one at a time), and if the stream // can be reset. - if (streams_to_reset_.empty() || current_request_.has_value() || - !retransmission_queue_->CanResetStreams()) { + if (current_request_.has_value() || + !retransmission_queue_->HasStreamsReadyToBeReset()) { return absl::nullopt; } - std::vector streams_to_reset(streams_to_reset_.begin(), - streams_to_reset_.end()); current_request_.emplace(TSN(*retransmission_queue_->next_tsn() - 1), - std::move(streams_to_reset)); - streams_to_reset_.clear(); + retransmission_queue_->GetStreamsReadyToBeReset()); reconfig_timer_->set_duration(ctx_->current_rto()); reconfig_timer_->Start(); return MakeReconfigChunk(); @@ -310,18 +307,8 @@ ReConfigChunk StreamResetHandler::MakeReconfigChunk() { void StreamResetHandler::ResetStreams( rtc::ArrayView outgoing_streams) { - // Enqueue streams to be reset - as this may be called multiple times - // while a request is already in progress (and there can only be one). for (StreamID stream_id : outgoing_streams) { - streams_to_reset_.insert(stream_id); - } - if (current_request_.has_value()) { - // Already an ongoing request - will need to wait for it to finish as - // there can only be one in-flight ReConfig chunk with requests at any - // time. - } else { - retransmission_queue_->PrepareResetStreams(std::vector( - streams_to_reset_.begin(), streams_to_reset_.end())); + retransmission_queue_->PrepareResetStream(stream_id); } } @@ -345,7 +332,7 @@ absl::optional StreamResetHandler::OnReconfigTimerExpiry() { HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const { HandoverReadinessStatus status; - if (!streams_to_reset_.empty()) { + if (retransmission_queue_->HasStreamsReadyToBeReset()) { status.Add(HandoverUnreadinessReason::kPendingStreamReset); } if (current_request_.has_value()) { diff --git a/net/dcsctp/socket/stream_reset_handler.h b/net/dcsctp/socket/stream_reset_handler.h index a691eb8312..6e49665538 100644 --- a/net/dcsctp/socket/stream_reset_handler.h +++ b/net/dcsctp/socket/stream_reset_handler.h @@ -216,10 +216,6 @@ class StreamResetHandler { RetransmissionQueue* retransmission_queue_; const std::unique_ptr reconfig_timer_; - // Outgoing streams that have been requested to be reset, but hasn't yet - // been included in an outgoing request. - webrtc::flat_set streams_to_reset_; - // The next sequence number for outgoing stream requests. ReconfigRequestSN next_outgoing_req_seq_nbr_; diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc index 6f6874f2a0..1b7463e8d7 100644 --- a/net/dcsctp/socket/stream_reset_handler_test.cc +++ b/net/dcsctp/socket/stream_reset_handler_test.cc @@ -343,10 +343,13 @@ TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) { } TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -360,13 +363,21 @@ TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) { } TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(3); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(40))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))).Times(2); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(44))); handler_->ResetStreams(std::vector({StreamID(42)})); handler_->ResetStreams( std::vector({StreamID(43), StreamID(44), StreamID(41)})); handler_->ResetStreams(std::vector({StreamID(42), StreamID(40)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return( + std::vector({StreamID(40), StreamID(41), StreamID(42), + StreamID(43), StreamID(44)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -382,10 +393,10 @@ TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) { } TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()) + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) .WillOnce(Return(false)) .WillOnce(Return(false)) .WillOnce(Return(true)); @@ -396,10 +407,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) { } TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); @@ -412,8 +425,8 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { req.request_sequence_number(), ResponseResult::kSuccessPerformed)); ReConfigChunk response_reconfig(builder.Build()); - EXPECT_CALL(producer_, CommitResetStreams()).Times(1); - EXPECT_CALL(producer_, RollbackResetStreams()).Times(0); + EXPECT_CALL(producer_, CommitResetStreams); + EXPECT_CALL(producer_, RollbackResetStreams).Times(0); // Processing a response shouldn't result in sending anything. EXPECT_CALL(callbacks_, OnError).Times(0); @@ -422,10 +435,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) { } TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); @@ -438,8 +453,8 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { req.request_sequence_number(), ResponseResult::kErrorBadSequenceNumber)); ReConfigChunk response_reconfig(builder.Build()); - EXPECT_CALL(producer_, CommitResetStreams()).Times(0); - EXPECT_CALL(producer_, RollbackResetStreams()).Times(1); + EXPECT_CALL(producer_, CommitResetStreams).Times(0); + EXPECT_CALL(producer_, RollbackResetStreams); // Only requests should result in sending responses. EXPECT_CALL(callbacks_, OnError).Times(0); @@ -450,10 +465,12 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) { TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) { static constexpr StreamID kStreamToReset = StreamID(42); - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(kStreamToReset)); handler_->ResetStreams(std::vector({kStreamToReset})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({kStreamToReset}))); absl::optional reconfig1 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig1.has_value()); @@ -499,10 +516,13 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) { } TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig1 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig1.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -514,6 +534,8 @@ TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { EXPECT_THAT(req1.stream_ids(), UnorderedElementsAre(StreamID(42))); // Streams reset while the request is in-flight will be queued. + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); StreamID stream_ids[] = {StreamID(41), StreamID(43)}; handler_->ResetStreams(stream_ids); EXPECT_EQ(handler_->MakeStreamResetRequest(), absl::nullopt); @@ -532,7 +554,10 @@ TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) { handler_->HandleReConfig(std::move(response_reconfig)); // Response has been processed. A new request can be sent. - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(41), StreamID(43)}))); + absl::optional reconfig2 = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig2.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -591,21 +616,31 @@ TEST_F(StreamResetHandlerTest, SendSameRequestTwiceReturnsNothingToDo) { TEST_F(StreamResetHandlerTest, HandoverIsAllowedOnlyWhenNoStreamIsBeingOrWillBeReset) { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ( handler_->GetHandoverReadiness(), HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset)); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value()); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus( HandoverUnreadinessReason::kPendingStreamResetRequest)); // Reset more streams while the request is in-flight. + EXPECT_CALL(producer_, PrepareResetStream(StreamID(41))); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); StreamID stream_ids[] = {StreamID(41), StreamID(43)}; handler_->ResetStreams(stream_ids); + + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus() .Add(HandoverUnreadinessReason::kPendingStreamResetRequest) @@ -618,12 +653,18 @@ TEST_F(StreamResetHandlerTest, .Add(ReconfigurationResponseParameter( kMyInitialReqSn, ResponseResult::kSuccessPerformed)) .Build())); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); EXPECT_EQ( handler_->GetHandoverReadiness(), HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset)); // Second request can be sent. - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(41), StreamID(43)}))); + ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value()); EXPECT_EQ(handler_->GetHandoverReadiness(), HandoverReadinessStatus( @@ -638,16 +679,21 @@ TEST_F(StreamResetHandlerTest, .Build())); // Seconds response has been processed. No pending resets. + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(false)); + EXPECT_TRUE(handler_->GetHandoverReadiness().IsReady()); } TEST_F(StreamResetHandlerTest, HandoverInInitialState) { PerformHandover(); - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + absl::optional reconfig = handler_->MakeStreamResetRequest(); ASSERT_TRUE(reconfig.has_value()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -663,10 +709,15 @@ TEST_F(StreamResetHandlerTest, HandoverInInitialState) { TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) { // Reset one stream { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))); handler_->ResetStreams(std::vector({StreamID(42)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(42)}))); + ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig, handler_->MakeStreamResetRequest()); ASSERT_HAS_VALUE_AND_ASSIGN( @@ -690,10 +741,13 @@ TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) { // Reset another stream after handover { - EXPECT_CALL(producer_, PrepareResetStreams).Times(1); + EXPECT_CALL(producer_, PrepareResetStream(StreamID(43))); handler_->ResetStreams(std::vector({StreamID(43)})); - EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true)); + EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true)); + EXPECT_CALL(producer_, GetStreamsReadyToBeReset()) + .WillOnce(Return(std::vector({StreamID(43)}))); + ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig, handler_->MakeStreamResetRequest()); ASSERT_HAS_VALUE_AND_ASSIGN( diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index 0cf64583ae..82e96b7084 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -11,6 +11,7 @@ #define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_ #include +#include #include "absl/types/optional.h" #include "api/array_view.h" @@ -35,11 +36,9 @@ class MockSendQueue : public SendQueue { Discard, (IsUnordered unordered, StreamID stream_id, MID message_id), (override)); - MOCK_METHOD(void, - PrepareResetStreams, - (rtc::ArrayView streams), - (override)); - MOCK_METHOD(bool, CanResetStreams, (), (const, override)); + MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override)); + MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override)); + MOCK_METHOD(std::vector, GetStreamsReadyToBeReset, (), (override)); MOCK_METHOD(void, CommitResetStreams, (), (override)); MOCK_METHOD(void, RollbackResetStreams, (), (override)); MOCK_METHOD(void, Reset, (), (override)); diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 4afc01bf6e..57559195f0 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -529,16 +529,15 @@ size_t RetransmissionQueue::max_bytes_to_send() const { return std::min(rwnd(), left); } -void RetransmissionQueue::PrepareResetStreams( - rtc::ArrayView streams) { +void RetransmissionQueue::PrepareResetStream(StreamID stream_id) { // TODO(boivie): These calls are now only affecting the send queue. The // packet buffer can also change behavior - for example draining the chunk // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request" // can be sent quickly, with a known `sender_last_assigned_tsn`. - send_queue_.PrepareResetStreams(streams); + send_queue_.PrepareResetStream(stream_id); } -bool RetransmissionQueue::CanResetStreams() const { - return send_queue_.CanResetStreams(); +bool RetransmissionQueue::HasStreamsReadyToBeReset() const { + return send_queue_.HasStreamsReadyToBeReset(); } void RetransmissionQueue::CommitResetStreams() { send_queue_.CommitResetStreams(); diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index 1e866b30b5..1958dfd643 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -143,8 +143,11 @@ class RetransmissionQueue { // See the SendQueue for a longer description of these methods related // to stream resetting. - void PrepareResetStreams(rtc::ArrayView streams); - bool CanResetStreams() const; + void PrepareResetStream(StreamID stream_id); + bool HasStreamsReadyToBeReset() const; + std::vector GetStreamsReadyToBeReset() const { + return send_queue_.GetStreamsReadyToBeReset(); + } void CommitResetStreams(); void RollbackResetStreams(); diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index b3e695bc5c..d4ce59d58c 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -41,6 +41,12 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, } bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { + if (pause_state_ == PauseState::kPaused || + pause_state_ == PauseState::kResetting) { + // The stream has paused (and there is no partially sent message). + return false; + } + while (!items_.empty()) { RRSendQueue::OutgoingStream::Item& item = items_.front(); if (item.message_id.has_value()) { @@ -59,10 +65,6 @@ bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) { continue; } - if (is_paused_) { - // The stream has paused (and there is no partially sent message). - return false; - } return true; } return false; @@ -139,6 +141,8 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now, size_t max_size) { RTC_DCHECK(!items_.empty()); + RTC_DCHECK(pause_state_ != PauseState::kPaused && + pause_state_ != PauseState::kResetting); Item* item = &items_.front(); DcSctpMessage& message = item->message; @@ -196,6 +200,12 @@ SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now, // The entire message has been sent, and its last data copied to `chunk`, so // it can safely be discarded. items_.pop_front(); + + if (pause_state_ == PauseState::kPending) { + RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id + << " is moving from pending to paused"; + pause_state_ = PauseState::kPaused; + } } else { item->remaining_offset += chunk_payload.size(); item->remaining_size -= chunk_payload.size(); @@ -217,6 +227,11 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, buffered_amount_.Decrease(item.remaining_size); total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); + + if (pause_state_ == PauseState::kPending) { + pause_state_ = PauseState::kPaused; + } + // As the item still existed, it had unsent data. result = true; } @@ -226,7 +241,12 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, } void RRSendQueue::OutgoingStream::Pause() { - is_paused_ = true; + if (pause_state_ != PauseState::kNotPaused) { + // Already in progress. + return; + } + + bool had_pending_items = !items_.empty(); // https://datatracker.ietf.org/doc/html/rfc8831#section-6.7 // "Closing of a data channel MUST be signaled by resetting the corresponding @@ -250,10 +270,33 @@ void RRSendQueue::OutgoingStream::Pause() { ++it; } } + + pause_state_ = (items_.empty() || items_.front().remaining_offset == 0) + ? PauseState::kPaused + : PauseState::kPending; + + if (had_pending_items && pause_state_ == PauseState::kPaused) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() + << " was previously active, but is now paused."; + } + + RTC_DCHECK(IsConsistent()); +} + +void RRSendQueue::OutgoingStream::Resume() { + RTC_DCHECK(pause_state_ == PauseState::kResetting); + if (!items_.empty()) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id() + << " was previously paused, but is now active."; + } + pause_state_ = PauseState::kNotPaused; RTC_DCHECK(IsConsistent()); } void RRSendQueue::OutgoingStream::Reset() { + // This can be called both when an outgoing stream reset has been responded + // to, or when the entire SendQueue is reset due to detecting the peer having + // restarted. The stream may be in any state at this time. if (!items_.empty()) { // If this message has been partially sent, reset it so that it will be // re-sent. @@ -268,7 +311,7 @@ void RRSendQueue::OutgoingStream::Reset() { item.ssn = absl::nullopt; item.current_fsn = FSN(0); } - is_paused_ = false; + pause_state_ = PauseState::kNotPaused; next_ordered_mid_ = MID(0); next_unordered_mid_ = MID(0); next_ssn_ = SSN(0); @@ -381,27 +424,39 @@ bool RRSendQueue::Discard(IsUnordered unordered, return has_discarded; } -void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { - for (StreamID stream_id : streams) { - GetOrCreateStreamInfo(stream_id).Pause(); - } +void RRSendQueue::PrepareResetStream(StreamID stream_id) { + GetOrCreateStreamInfo(stream_id).Pause(); RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::CanResetStreams() const { - // Streams can be reset if those streams that are paused don't have any - // messages that are partially sent. +bool RRSendQueue::HasStreamsReadyToBeReset() const { for (auto& [unused, stream] : streams_) { - if (stream.is_paused() && stream.has_partially_sent_message()) { - return false; + if (stream.IsReadyToBeReset()) { + return true; } } - return true; + return false; +} +std::vector RRSendQueue::GetStreamsReadyToBeReset() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) == 0); + std::vector ready; + for (auto& [stream_id, stream] : streams_) { + if (stream.IsReadyToBeReset()) { + stream.SetAsResetting(); + ready.push_back(stream_id); + } + } + return ready; } void RRSendQueue::CommitResetStreams() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) > 0); for (auto& [unused, stream] : streams_) { - if (stream.is_paused()) { + if (stream.IsResetting()) { stream.Reset(); } } @@ -409,8 +464,13 @@ void RRSendQueue::CommitResetStreams() { } void RRSendQueue::RollbackResetStreams() { + RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) { + return p.second.IsResetting(); + }) > 0); for (auto& [unused, stream] : streams_) { - stream.Resume(); + if (stream.IsResetting()) { + stream.Resume(); + } } RTC_DCHECK(IsConsistent()); } @@ -455,6 +515,7 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( return streams_ .emplace(stream_id, OutgoingStream( + stream_id, [this, stream_id]() { on_buffered_amount_low_(stream_id); }, total_buffered_amount_)) .first->second; @@ -482,6 +543,7 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { state.tx.streams) { StreamID stream_id(state_stream.id); streams_.emplace(stream_id, OutgoingStream( + stream_id, [this, stream_id]() { on_buffered_amount_low_(stream_id); }, diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index 6da585df85..57a43ccd66 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -15,6 +15,7 @@ #include #include #include +#include #include "absl/algorithm/container.h" #include "absl/strings/string_view.h" @@ -65,8 +66,9 @@ class RRSendQueue : public SendQueue { bool Discard(IsUnordered unordered, StreamID stream_id, MID message_id) override; - void PrepareResetStreams(rtc::ArrayView streams) override; - bool CanResetStreams() const override; + void PrepareResetStream(StreamID streams) override; + bool HasStreamsReadyToBeReset() const override; + std::vector GetStreamsReadyToBeReset() override; void CommitResetStreams() override; void RollbackResetStreams() override; void Reset() override; @@ -108,16 +110,20 @@ class RRSendQueue : public SendQueue { // Per-stream information. class OutgoingStream { public: - explicit OutgoingStream( + OutgoingStream( + StreamID stream_id, std::function on_buffered_amount_low, ThresholdWatcher& total_buffered_amount, const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) - : next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), + : stream_id_(stream_id), + next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)), buffered_amount_(std::move(on_buffered_amount_low)), total_buffered_amount_(total_buffered_amount) {} + StreamID stream_id() const { return stream_id_; } + // Enqueues a message to this stream. void Add(DcSctpMessage message, TimeMs expires_at, @@ -137,9 +143,18 @@ class RRSendQueue : public SendQueue { void Pause(); // Resumes a paused stream. - void Resume() { is_paused_ = false; } + void Resume(); - bool is_paused() const { return is_paused_; } + bool IsReadyToBeReset() const { + return pause_state_ == PauseState::kPaused; + } + + bool IsResetting() const { return pause_state_ == PauseState::kResetting; } + + void SetAsResetting() { + RTC_DCHECK(pause_state_ == PauseState::kPaused); + pause_state_ = PauseState::kResetting; + } // Resets this stream, meaning MIDs and SSNs are set to zero. void Reset(); @@ -155,6 +170,26 @@ class RRSendQueue : public SendQueue { DcSctpSocketHandoverState::OutgoingStream& state) const; private: + // Streams are paused before they can be reset. To reset a stream, the + // socket sends an outgoing stream reset command with the TSN of the last + // fragment of the last message, so that receivers and senders can agree on + // when it stopped. And if the send queue is in the middle of sending a + // message, and without fragments not yet sent and without TSNs allocated to + // them, it will keep sending data until that message has ended. + enum class PauseState { + // The stream is not paused, and not scheduled to be reset. + kNotPaused, + // The stream has requested to be reset/paused but is still producing + // fragments of a message that hasn't ended yet. When it does, it will + // transition to the `kPaused` state. + kPending, + // The stream is fully paused and can be reset. + kPaused, + // The stream has been added to an outgoing stream reset request and a + // response from the peer hasn't been received yet. + kResetting, + }; + // An enqueued message and metadata. struct Item { explicit Item(DcSctpMessage msg, @@ -182,8 +217,8 @@ class RRSendQueue : public SendQueue { bool IsConsistent() const; - // Streams are pause when they are about to be reset. - bool is_paused_ = false; + const StreamID stream_id_; + PauseState pause_state_ = PauseState::kNotPaused; // MIDs are different for unordered and ordered messages sent on a stream. MID next_unordered_mid_; MID next_ordered_mid_; diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index a93c4a3d1c..fbbce58de1 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc @@ -26,6 +26,7 @@ namespace dcsctp { namespace { using ::testing::SizeIs; +using ::testing::UnorderedElementsAre; constexpr TimeMs kNow = TimeMs(0); constexpr StreamID kStreamID(1); @@ -252,10 +253,13 @@ TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); EXPECT_EQ(buf_.total_buffered_amount(), 8u); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 5u); + + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.CommitResetStreams(); - buf_.PrepareResetStreams(std::vector({StreamID(2)})); + buf_.PrepareResetStream(StreamID(2)); EXPECT_EQ(buf_.total_buffered_amount(), 0u); } @@ -270,21 +274,27 @@ TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); } TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { std::vector payload(50); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 0u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); + + EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); + buf_.CommitResetStreams(); EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); @@ -309,8 +319,7 @@ TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) { EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize); // This will stop the second message from being sent. - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize); // Should still produce fragments until end of message. @@ -340,13 +349,14 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) { ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - StreamID stream_ids[] = {StreamID(1)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(1)); // Buffered buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.CommitResetStreams(); absl::optional chunk_three = @@ -373,14 +383,16 @@ TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) { EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ssn, SSN(0)); - StreamID stream_ids[] = {StreamID(3)}; - buf_.PrepareResetStreams(stream_ids); + buf_.PrepareResetStream(StreamID(3)); // Send two more messages - SID 3 will buffer, SID 1 will send. buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload)); buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(3))); + buf_.CommitResetStreams(); absl::optional chunk_three = @@ -412,12 +424,14 @@ TEST_F(RRSendQueueTest, RollBackResumesSSN) { ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.ssn, SSN(1)); - buf_.PrepareResetStreams(std::vector({StreamID(1)})); + buf_.PrepareResetStream(StreamID(1)); // Buffered buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_TRUE(buf_.CanResetStreams()); + EXPECT_TRUE(buf_.HasStreamsReadyToBeReset()); + EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), + UnorderedElementsAre(StreamID(1))); buf_.RollbackResetStreams(); absl::optional chunk_three = diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index a821d20785..b2e5a9d436 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -67,11 +67,11 @@ class SendQueue { StreamID stream_id, MID message_id) = 0; - // Prepares the streams to be reset. This is used to close a WebRTC data + // Prepares the stream to be reset. This is used to close a WebRTC data // channel and will be signaled to the other side. // // Concretely, it discards all whole (not partly sent) messages in the given - // streams and pauses those streams so that future added messages aren't + // stream and pauses that stream so that future added messages aren't // produced until `ResumeStreams` is called. // // TODO(boivie): Investigate if it really should discard any message at all. @@ -82,24 +82,28 @@ class SendQueue { // reset, and paused while they are resetting. This is the first part of the // two-phase commit protocol to reset streams, where the caller completes the // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`. - virtual void PrepareResetStreams(rtc::ArrayView streams) = 0; + virtual void PrepareResetStream(StreamID stream_id) = 0; - // Returns true if all non-discarded messages during `PrepareResetStreams` - // (which are those that was partially sent before that method was called) - // have been sent. - virtual bool CanResetStreams() const = 0; + // Indicates if there are any streams that are ready to be reset. + virtual bool HasStreamsReadyToBeReset() const = 0; - // Called to commit to reset the streams provided to `PrepareResetStreams`. - // It will reset the stream sequence numbers (SSNs) and message identifiers - // (MIDs) and resume the paused streams. + // Returns a list of streams that are ready to be included in an outgoing + // stream reset request. Any streams that are returned here must be included + // in an outgoing stream reset request, and there must not be concurrent + // requests. Before calling this method again, you must have called + virtual std::vector GetStreamsReadyToBeReset() = 0; + + // Called to commit to reset the streams returned by + // `GetStreamsReadyToBeReset`. It will reset the stream sequence numbers + // (SSNs) and message identifiers (MIDs) and resume the paused streams. virtual void CommitResetStreams() = 0; - // Called to abort the resetting of streams provided to `PrepareResetStreams`. - // Will resume the paused streams without resetting the stream sequence - // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial - // messages that were discarded when calling `PrepareResetStreams` will not be - // recovered, to better match the intention from the sender to "close the - // channel". + // Called to abort the resetting of streams returned by + // `GetStreamsReadyToBeReset`. Will resume the paused streams without + // resetting the stream sequence numbers (SSNs) or message identifiers (MIDs). + // Note that the non-partial messages that were discarded when calling + // `PrepareResetStreams` will not be recovered, to better match the intention + // from the sender to "close the channel". virtual void RollbackResetStreams() = 0; // Resets all message identifier counters (MID, SSN) and makes all partially