dcsctp: Apply chunk before apply deferred reset
When a RECONFIG has been received with a last assigned TSN that is not yet seen by the receiver, it will enter deferred reset mode (https://www.rfc-editor.org/rfc/rfc6525#section-5.2.2, E2). When more DATA is received, moving the cumulative acknowledgment point, the request will finally be processed. But the last chunk that has the same TSN as the last assigned TSN was before this CL not applied before doing the reset - it was applied after. This would result of a message getting lost or possibly getting truncated or incorrectly merged with another. Handling the message before resetting the stream is the simple solution here. Bug: webrtc:14277 Change-Id: Iea9fa227778077a9ff2f78bc77b5d93cc32b702b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/273323 Reviewed-by: Florent Castelli <orphis@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37993}
This commit is contained in:
parent
8dfc90f947
commit
504198a50e
@ -1086,9 +1086,9 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) {
|
||||
}
|
||||
|
||||
if (tcb_->data_tracker().Observe(tsn, immediate_ack)) {
|
||||
tcb_->reassembly_queue().Add(tsn, std::move(data));
|
||||
tcb_->reassembly_queue().MaybeResetStreamsDeferred(
|
||||
tcb_->data_tracker().last_cumulative_acked_tsn());
|
||||
tcb_->reassembly_queue().Add(tsn, std::move(data));
|
||||
DeliverReassembledMessages();
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,6 +268,45 @@ MATCHER_P(HasReconfigWithStreams, streams_matcher, "") {
|
||||
return true;
|
||||
}
|
||||
|
||||
MATCHER_P(HasReconfigWithResponse, result, "") {
|
||||
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
|
||||
if (!packet.has_value()) {
|
||||
*result_listener << "data didn't parse as an SctpPacket";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (packet->descriptors()[0].type != ReConfigChunk::kType) {
|
||||
*result_listener << "the first chunk in the packet is not a reconfig chunk";
|
||||
return false;
|
||||
}
|
||||
|
||||
absl::optional<ReConfigChunk> reconfig =
|
||||
ReConfigChunk::Parse(packet->descriptors()[0].data);
|
||||
if (!reconfig.has_value()) {
|
||||
*result_listener << "The first chunk didn't parse as a reconfig chunk";
|
||||
return false;
|
||||
}
|
||||
|
||||
const Parameters& parameters = reconfig->parameters();
|
||||
if (parameters.descriptors().size() != 1 ||
|
||||
parameters.descriptors()[0].type !=
|
||||
ReconfigurationResponseParameter::kType) {
|
||||
*result_listener << "Expected the reconfig chunk to have a "
|
||||
"ReconfigurationResponse Parameter";
|
||||
return false;
|
||||
}
|
||||
|
||||
absl::optional<ReconfigurationResponseParameter> p =
|
||||
ReconfigurationResponseParameter::Parse(parameters.descriptors()[0].data);
|
||||
if (p->result() != result) {
|
||||
*result_listener << "ReconfigurationResponse Parameter doesn't contain the "
|
||||
"expected result";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
TSN AddTo(TSN tsn, int delta) {
|
||||
return TSN(*tsn + delta);
|
||||
}
|
||||
@ -2692,5 +2731,89 @@ TEST_P(DcSctpSocketParametrizedTest, ExposesTheNumberOfNegotiatedStreams) {
|
||||
EXPECT_EQ(metrics_z.negotiated_maximum_incoming_streams, 23);
|
||||
EXPECT_EQ(metrics_z.negotiated_maximum_outgoing_streams, 12);
|
||||
}
|
||||
|
||||
TEST(DcSctpSocketTest, ResetStreamsDeferred) {
|
||||
// Guaranteed to be fragmented into two fragments.
|
||||
constexpr size_t kTwoFragmentsSize = DcSctpOptions::kMaxSafeMTUSize + 100;
|
||||
|
||||
SocketUnderTest a("A");
|
||||
SocketUnderTest z("Z");
|
||||
|
||||
ConnectSockets(a, z);
|
||||
|
||||
a.socket.Send(DcSctpMessage(StreamID(1), PPID(53),
|
||||
std::vector<uint8_t>(kTwoFragmentsSize)),
|
||||
{});
|
||||
a.socket.Send(DcSctpMessage(StreamID(1), PPID(54),
|
||||
std::vector<uint8_t>(kSmallMessageSize)),
|
||||
{});
|
||||
|
||||
a.socket.ResetStreams(std::vector<StreamID>({StreamID(1)}));
|
||||
|
||||
auto data1 = a.cb.ConsumeSentPacket();
|
||||
auto data2 = a.cb.ConsumeSentPacket();
|
||||
auto data3 = a.cb.ConsumeSentPacket();
|
||||
auto reconfig = a.cb.ConsumeSentPacket();
|
||||
|
||||
EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0)));
|
||||
EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0)));
|
||||
EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1)));
|
||||
EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1))));
|
||||
|
||||
// Receive them slightly out of order to make stream resetting deferred.
|
||||
z.socket.ReceivePacket(reconfig);
|
||||
|
||||
z.socket.ReceivePacket(data1);
|
||||
z.socket.ReceivePacket(data2);
|
||||
z.socket.ReceivePacket(data3);
|
||||
|
||||
absl::optional<DcSctpMessage> msg1 = z.cb.ConsumeReceivedMessage();
|
||||
ASSERT_TRUE(msg1.has_value());
|
||||
EXPECT_EQ(msg1->stream_id(), StreamID(1));
|
||||
EXPECT_EQ(msg1->ppid(), PPID(53));
|
||||
EXPECT_EQ(msg1->payload().size(), kTwoFragmentsSize);
|
||||
|
||||
absl::optional<DcSctpMessage> msg2 = z.cb.ConsumeReceivedMessage();
|
||||
ASSERT_TRUE(msg2.has_value());
|
||||
EXPECT_EQ(msg2->stream_id(), StreamID(1));
|
||||
EXPECT_EQ(msg2->ppid(), PPID(54));
|
||||
EXPECT_EQ(msg2->payload().size(), kSmallMessageSize);
|
||||
|
||||
EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1))));
|
||||
ExchangeMessages(a, z);
|
||||
|
||||
// Z sent "in progress", which will make A buffer packets until it's sure
|
||||
// that the reconfiguration has been applied. A will retry - wait for that.
|
||||
AdvanceTime(a, z, a.options.rto_initial);
|
||||
|
||||
auto reconfig2 = a.cb.ConsumeSentPacket();
|
||||
EXPECT_THAT(reconfig2, HasReconfigWithStreams(ElementsAre(StreamID(1))));
|
||||
EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1))));
|
||||
z.socket.ReceivePacket(reconfig2);
|
||||
|
||||
auto reconfig3 = z.cb.ConsumeSentPacket();
|
||||
EXPECT_THAT(reconfig3,
|
||||
HasReconfigWithResponse(
|
||||
ReconfigurationResponseParameter::Result::kSuccessPerformed));
|
||||
a.socket.ReceivePacket(reconfig3);
|
||||
|
||||
EXPECT_THAT(data1, HasDataChunkWithSsn(SSN(0)));
|
||||
EXPECT_THAT(data2, HasDataChunkWithSsn(SSN(0)));
|
||||
EXPECT_THAT(data3, HasDataChunkWithSsn(SSN(1)));
|
||||
EXPECT_THAT(reconfig, HasReconfigWithStreams(ElementsAre(StreamID(1))));
|
||||
|
||||
// Send a new message after the stream has been reset.
|
||||
a.socket.Send(DcSctpMessage(StreamID(1), PPID(55),
|
||||
std::vector<uint8_t>(kSmallMessageSize)),
|
||||
{});
|
||||
ExchangeMessages(a, z);
|
||||
|
||||
absl::optional<DcSctpMessage> msg3 = z.cb.ConsumeReceivedMessage();
|
||||
ASSERT_TRUE(msg3.has_value());
|
||||
EXPECT_EQ(msg3->stream_id(), StreamID(1));
|
||||
EXPECT_EQ(msg3->ppid(), PPID(55));
|
||||
EXPECT_EQ(msg3->payload().size(), kSmallMessageSize);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user