From 504198a50ef116cc877e1fcac5f02481cdee9718 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 1 Sep 2022 08:46:32 +0000 Subject: [PATCH] dcsctp: Apply chunk before apply deferred reset When a RECONFIG has been received with a last assigned TSN that is not yet seen by the receiver, it will enter deferred reset mode (https://www.rfc-editor.org/rfc/rfc6525#section-5.2.2, E2). When more DATA is received, moving the cumulative acknowledgment point, the request will finally be processed. But the last chunk that has the same TSN as the last assigned TSN was before this CL not applied before doing the reset - it was applied after. This would result of a message getting lost or possibly getting truncated or incorrectly merged with another. Handling the message before resetting the stream is the simple solution here. Bug: webrtc:14277 Change-Id: Iea9fa227778077a9ff2f78bc77b5d93cc32b702b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/273323 Reviewed-by: Florent Castelli Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/main@{#37993} --- net/dcsctp/socket/dcsctp_socket.cc | 2 +- net/dcsctp/socket/dcsctp_socket_test.cc | 123 ++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 78c88c36b9..d77e591c7b 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -1086,9 +1086,9 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { } if (tcb_->data_tracker().Observe(tsn, immediate_ack)) { + tcb_->reassembly_queue().Add(tsn, std::move(data)); tcb_->reassembly_queue().MaybeResetStreamsDeferred( tcb_->data_tracker().last_cumulative_acked_tsn()); - tcb_->reassembly_queue().Add(tsn, std::move(data)); DeliverReassembledMessages(); } } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index ab854da290..46ac4bd183 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -268,6 +268,45 @@ MATCHER_P(HasReconfigWithStreams, streams_matcher, "") { return true; } +MATCHER_P(HasReconfigWithResponse, result, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != ReConfigChunk::kType) { + *result_listener << "the first chunk in the packet is not a reconfig chunk"; + return false; + } + + absl::optional reconfig = + ReConfigChunk::Parse(packet->descriptors()[0].data); + if (!reconfig.has_value()) { + *result_listener << "The first chunk didn't parse as a reconfig chunk"; + return false; + } + + const Parameters& parameters = reconfig->parameters(); + if (parameters.descriptors().size() != 1 || + parameters.descriptors()[0].type != + ReconfigurationResponseParameter::kType) { + *result_listener << "Expected the reconfig chunk to have a " + "ReconfigurationResponse Parameter"; + return false; + } + + absl::optional p = + ReconfigurationResponseParameter::Parse(parameters.descriptors()[0].data); + if (p->result() != result) { + *result_listener << "ReconfigurationResponse Parameter doesn't contain the " + "expected result"; + return false; + } + + return true; +} + TSN AddTo(TSN tsn, int delta) { return TSN(*tsn + delta); } @@ -2692,5 +2731,89 @@ TEST_P(DcSctpSocketParametrizedTest, ExposesTheNumberOfNegotiatedStreams) { EXPECT_EQ(metrics_z.negotiated_maximum_incoming_streams, 23); EXPECT_EQ(metrics_z.negotiated_maximum_outgoing_streams, 12); } + +TEST(DcSctpSocketTest, ResetStreamsDeferred) { + // Guaranteed to be fragmented into two fragments. + constexpr size_t kTwoFragmentsSize = DcSctpOptions::kMaxSafeMTUSize + 100; + + SocketUnderTest a("A"); + SocketUnderTest z("Z"); + + ConnectSockets(a, z); + + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), + std::vector(kTwoFragmentsSize)), + {}); + a.socket.Send(DcSctpMessage(StreamID(1), PPID(54), + std::vector(kSmallMessageSize)), + {}); + + a.socket.ResetStreams(std::vector({StreamID(1)})); + + auto data1 = a.cb.ConsumeSentPacket(); + auto data2 = a.cb.ConsumeSentPacket(); + auto data3 = a.cb.ConsumeSentPacket(); + auto reconfig = a.cb.ConsumeSentPacket(); + + EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1))); + EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + + // Receive them slightly out of order to make stream resetting deferred. + z.socket.ReceivePacket(reconfig); + + z.socket.ReceivePacket(data1); + z.socket.ReceivePacket(data2); + z.socket.ReceivePacket(data3); + + absl::optional msg1 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->stream_id(), StreamID(1)); + EXPECT_EQ(msg1->ppid(), PPID(53)); + EXPECT_EQ(msg1->payload().size(), kTwoFragmentsSize); + + absl::optional msg2 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->stream_id(), StreamID(1)); + EXPECT_EQ(msg2->ppid(), PPID(54)); + EXPECT_EQ(msg2->payload().size(), kSmallMessageSize); + + EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1)))); + ExchangeMessages(a, z); + + // Z sent "in progress", which will make A buffer packets until it's sure + // that the reconfiguration has been applied. A will retry - wait for that. + AdvanceTime(a, z, a.options.rto_initial); + + auto reconfig2 = a.cb.ConsumeSentPacket(); + EXPECT_THAT(reconfig2, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1)))); + z.socket.ReceivePacket(reconfig2); + + auto reconfig3 = z.cb.ConsumeSentPacket(); + EXPECT_THAT(reconfig3, + HasReconfigWithResponse( + ReconfigurationResponseParameter::Result::kSuccessPerformed)); + a.socket.ReceivePacket(reconfig3); + + EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1))); + EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + + // Send a new message after the stream has been reset. + a.socket.Send(DcSctpMessage(StreamID(1), PPID(55), + std::vector(kSmallMessageSize)), + {}); + ExchangeMessages(a, z); + + absl::optional msg3 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg3.has_value()); + EXPECT_EQ(msg3->stream_id(), StreamID(1)); + EXPECT_EQ(msg3->ppid(), PPID(55)); + EXPECT_EQ(msg3->payload().size(), kSmallMessageSize); +} + } // namespace } // namespace dcsctp