dcsctp: Only reset paused streams when peer acks

When a single stream is reset, and an outgoing SSN reset request is sent
and later acked by the peer sending a reconfiguration response with
status=Performed, the sender should unpause the paused stream and reset
the SSNs of that (ordered) stream. But only the single stream that was
paused, and not all streams. In this scenario, dcSCTP would - when the
peer acked the SSN reset request - reset the SSN of all streams.

This was found by orphis@webrtc.org using a data channel test
application. The peer, if it's a usrsctp client, will ABORT with
PROTOCOL_VIOLATION as it has already seen that SSN on that stream but
with a different TSN.

This bug was introduced when implementing the Round Robin scheduler in
https://webrtc-review.googlesource.com/c/src/+/219682. The FCFS
scheduler prior to this change was implemented correctly.

Bug: webrtc:12952
Change-Id: I3ea144a1df303145f69a5b03aada7f448c8c8163
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225266
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34436}
This commit is contained in:
Victor Boivie 2021-07-07 19:38:43 +02:00 committed by WebRTC LUCI CQ
parent 706ef1b913
commit 8bd26e12ed
3 changed files with 151 additions and 1 deletions

View File

@ -60,6 +60,33 @@ constexpr SendOptions kSendOptions;
constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20;
static constexpr size_t kSmallMessageSize = 10;
MATCHER_P(HasDataChunkWithStreamId, stream_id, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
*result_listener << "data didn't parse as an SctpPacket";
return false;
}
if (packet->descriptors()[0].type != DataChunk::kType) {
*result_listener << "the first chunk in the packet is not a data chunk";
return false;
}
absl::optional<DataChunk> dc =
DataChunk::Parse(packet->descriptors()[0].data);
if (!dc.has_value()) {
*result_listener << "The first chunk didn't parse as a data chunk";
return false;
}
if (dc->stream_id() != stream_id) {
*result_listener << "the stream_id is " << *dc->stream_id();
return false;
}
return true;
}
MATCHER_P(HasDataChunkWithSsn, ssn, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
@ -888,6 +915,84 @@ TEST_F(DcSctpSocketTest, ResetStreamWillMakeChunksStartAtZeroSsn) {
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
}
TEST_F(DcSctpSocketTest, ResetStreamWillOnlyResetTheRequestedStreams) {
ConnectSockets();
std::vector<uint8_t> payload(options_.mtu - 100);
// Send two ordered messages on SID 1
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
auto packet1 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet1, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(packet1);
auto packet2 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet2, HasDataChunkWithSsn(SSN(1)));
sock_z_.ReceivePacket(packet2);
// Handle SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Do the same, for SID 3
sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
auto packet3 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet3, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet3, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(packet3);
auto packet4 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet4, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet4, HasDataChunkWithSsn(SSN(1)));
sock_z_.ReceivePacket(packet4);
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Receive all messages.
absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg1.has_value());
EXPECT_EQ(msg1->stream_id(), StreamID(1));
absl::optional<DcSctpMessage> msg2 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg2.has_value());
EXPECT_EQ(msg2->stream_id(), StreamID(1));
absl::optional<DcSctpMessage> msg3 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg3.has_value());
EXPECT_EQ(msg3->stream_id(), StreamID(3));
absl::optional<DcSctpMessage> msg4 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg4.has_value());
EXPECT_EQ(msg4->stream_id(), StreamID(3));
// Reset SID 1. This will directly send a RE-CONFIG.
sock_a_.ResetStreams(std::vector<StreamID>({StreamID(3)}));
// RE-CONFIG, req
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
// RE-CONFIG, resp
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Send a message on SID 1 and 3 - SID 1 should not be reset, but 3 should.
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
auto packet5 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet5, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet5, HasDataChunkWithSsn(SSN(2))); // Unchanged.
sock_z_.ReceivePacket(packet5);
auto packet6 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet6, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet6, HasDataChunkWithSsn(SSN(0))); // Reset.
sock_z_.ReceivePacket(packet6);
// Handle SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
}
TEST_F(DcSctpSocketTest, OnePeerReconnects) {
ConnectSockets();

View File

@ -373,7 +373,11 @@ bool RRSendQueue::CanResetStreams() const {
}
void RRSendQueue::CommitResetStreams() {
Reset();
for (auto& stream_entry : streams_) {
if (stream_entry.second.is_paused()) {
stream_entry.second.Reset();
}
}
RTC_DCHECK(IsConsistent());
}

View File

@ -354,6 +354,47 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) {
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
absl::optional<SendQueue::DataToSend> chunk_one =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, StreamID(1));
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
absl::optional<SendQueue::DataToSend> chunk_two =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ssn, SSN(0));
StreamID stream_ids[] = {StreamID(3)};
buf_.PrepareResetStreams(stream_ids);
// Send two more messages - SID 3 will buffer, SID 1 will send.
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
EXPECT_TRUE(buf_.CanResetStreams());
buf_.CommitResetStreams();
absl::optional<SendQueue::DataToSend> chunk_three =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_EQ(chunk_three->data.stream_id, StreamID(1));
EXPECT_EQ(chunk_three->data.ssn, SSN(1));
absl::optional<SendQueue::DataToSend> chunk_four =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_four.has_value());
EXPECT_EQ(chunk_four->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_four->data.ssn, SSN(0));
}
TEST_F(RRSendQueueTest, RollBackResumesSSN) {
std::vector<uint8_t> payload(50);