diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 78c88c36b9..d77e591c7b 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -1086,9 +1086,9 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { } if (tcb_->data_tracker().Observe(tsn, immediate_ack)) { + tcb_->reassembly_queue().Add(tsn, std::move(data)); tcb_->reassembly_queue().MaybeResetStreamsDeferred( tcb_->data_tracker().last_cumulative_acked_tsn()); - tcb_->reassembly_queue().Add(tsn, std::move(data)); DeliverReassembledMessages(); } } diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index ab854da290..46ac4bd183 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -268,6 +268,45 @@ MATCHER_P(HasReconfigWithStreams, streams_matcher, "") { return true; } +MATCHER_P(HasReconfigWithResponse, result, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != ReConfigChunk::kType) { + *result_listener << "the first chunk in the packet is not a reconfig chunk"; + return false; + } + + absl::optional reconfig = + ReConfigChunk::Parse(packet->descriptors()[0].data); + if (!reconfig.has_value()) { + *result_listener << "The first chunk didn't parse as a reconfig chunk"; + return false; + } + + const Parameters& parameters = reconfig->parameters(); + if (parameters.descriptors().size() != 1 || + parameters.descriptors()[0].type != + ReconfigurationResponseParameter::kType) { + *result_listener << "Expected the reconfig chunk to have a " + "ReconfigurationResponse Parameter"; + return false; + } + + absl::optional p = + ReconfigurationResponseParameter::Parse(parameters.descriptors()[0].data); + if (p->result() != result) { + *result_listener << "ReconfigurationResponse Parameter doesn't contain the " + "expected result"; + return false; + } + + return true; +} + TSN AddTo(TSN tsn, int delta) { return TSN(*tsn + delta); } @@ -2692,5 +2731,89 @@ TEST_P(DcSctpSocketParametrizedTest, ExposesTheNumberOfNegotiatedStreams) { EXPECT_EQ(metrics_z.negotiated_maximum_incoming_streams, 23); EXPECT_EQ(metrics_z.negotiated_maximum_outgoing_streams, 12); } + +TEST(DcSctpSocketTest, ResetStreamsDeferred) { + // Guaranteed to be fragmented into two fragments. + constexpr size_t kTwoFragmentsSize = DcSctpOptions::kMaxSafeMTUSize + 100; + + SocketUnderTest a("A"); + SocketUnderTest z("Z"); + + ConnectSockets(a, z); + + a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), + std::vector(kTwoFragmentsSize)), + {}); + a.socket.Send(DcSctpMessage(StreamID(1), PPID(54), + std::vector(kSmallMessageSize)), + {}); + + a.socket.ResetStreams(std::vector({StreamID(1)})); + + auto data1 = a.cb.ConsumeSentPacket(); + auto data2 = a.cb.ConsumeSentPacket(); + auto data3 = a.cb.ConsumeSentPacket(); + auto reconfig = a.cb.ConsumeSentPacket(); + + EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1))); + EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + + // Receive them slightly out of order to make stream resetting deferred. + z.socket.ReceivePacket(reconfig); + + z.socket.ReceivePacket(data1); + z.socket.ReceivePacket(data2); + z.socket.ReceivePacket(data3); + + absl::optional msg1 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->stream_id(), StreamID(1)); + EXPECT_EQ(msg1->ppid(), PPID(53)); + EXPECT_EQ(msg1->payload().size(), kTwoFragmentsSize); + + absl::optional msg2 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->stream_id(), StreamID(1)); + EXPECT_EQ(msg2->ppid(), PPID(54)); + EXPECT_EQ(msg2->payload().size(), kSmallMessageSize); + + EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1)))); + ExchangeMessages(a, z); + + // Z sent "in progress", which will make A buffer packets until it's sure + // that the reconfiguration has been applied. A will retry - wait for that. + AdvanceTime(a, z, a.options.rto_initial); + + auto reconfig2 = a.cb.ConsumeSentPacket(); + EXPECT_THAT(reconfig2, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1)))); + z.socket.ReceivePacket(reconfig2); + + auto reconfig3 = z.cb.ConsumeSentPacket(); + EXPECT_THAT(reconfig3, + HasReconfigWithResponse( + ReconfigurationResponseParameter::Result::kSuccessPerformed)); + a.socket.ReceivePacket(reconfig3); + + EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0))); + EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1))); + EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1)))); + + // Send a new message after the stream has been reset. + a.socket.Send(DcSctpMessage(StreamID(1), PPID(55), + std::vector(kSmallMessageSize)), + {}); + ExchangeMessages(a, z); + + absl::optional msg3 = z.cb.ConsumeReceivedMessage(); + ASSERT_TRUE(msg3.has_value()); + EXPECT_EQ(msg3->stream_id(), StreamID(1)); + EXPECT_EQ(msg3->ppid(), PPID(55)); + EXPECT_EQ(msg3->payload().size(), kSmallMessageSize); +} + } // namespace } // namespace dcsctp