From abf6188cbaaa1a468ad9b3bb08179c7688a6a771 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 12 Aug 2021 15:57:49 +0200 Subject: [PATCH] dcsctp: Add PacketSender This is mainly a refactoring commit, to break out packet sending to a dedicated component. Bug: webrtc:12943 Change-Id: I78f18933776518caf49737d3952bda97f19ef335 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228565 Reviewed-by: Florent Castelli Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/master@{#34772} --- net/dcsctp/socket/BUILD.gn | 18 +++ net/dcsctp/socket/dcsctp_socket.cc | 105 +++++++++--------- net/dcsctp/socket/dcsctp_socket.h | 8 +- net/dcsctp/socket/packet_sender.cc | 48 ++++++++ net/dcsctp/socket/packet_sender.h | 40 +++++++ net/dcsctp/socket/packet_sender_test.cc | 50 +++++++++ .../socket/transmission_control_block.cc | 4 +- .../socket/transmission_control_block.h | 13 ++- 8 files changed, 225 insertions(+), 61 deletions(-) create mode 100644 net/dcsctp/socket/packet_sender.cc create mode 100644 net/dcsctp/socket/packet_sender.h create mode 100644 net/dcsctp/socket/packet_sender_test.cc diff --git a/net/dcsctp/socket/BUILD.gn b/net/dcsctp/socket/BUILD.gn index f24e60b3c9..805c6779b9 100644 --- a/net/dcsctp/socket/BUILD.gn +++ b/net/dcsctp/socket/BUILD.gn @@ -76,10 +76,25 @@ rtc_library("stream_reset_handler") { ] } +rtc_library("packet_sender") { + deps = [ + "../packet:sctp_packet", + "../public:socket", + "../public:types", + "../timer", + ] + sources = [ + "packet_sender.cc", + "packet_sender.h", + ] + absl_deps = [] +} + rtc_library("transmission_control_block") { deps = [ ":context", ":heartbeat_handler", + ":packet_sender", ":stream_reset_handler", "../../../api:array_view", "../../../rtc_base", @@ -114,6 +129,7 @@ rtc_library("dcsctp_socket") { deps = [ ":context", ":heartbeat_handler", + ":packet_sender", ":stream_reset_handler", ":transmission_control_block", "../../../api:array_view", @@ -201,6 +217,7 @@ if (rtc_include_tests) { ":heartbeat_handler", ":mock_callbacks", ":mock_context", + ":packet_sender", ":stream_reset_handler", "../../../api:array_view", "../../../rtc_base:checks", @@ -233,6 +250,7 @@ if (rtc_include_tests) { sources = [ "dcsctp_socket_test.cc", "heartbeat_handler_test.cc", + "packet_sender_test.cc", "state_cookie_test.cc", "stream_reset_handler_test.cc", ] diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index bb84d94e8b..a39ec5ce88 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -168,6 +168,8 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, TimerOptions(options.t2_shutdown_timeout, TimerBackoffAlgorithm::kExponential, options.max_retransmissions))), + packet_sender_(callbacks_, + absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_( log_prefix_, options_.max_send_buffer_size, @@ -251,7 +253,7 @@ void DcSctpSocket::SendInit() { connect_params_.initial_tsn, params_builder.Build()); SctpPacket::Builder b(VerificationTag(0), options_); b.Add(init); - SendPacket(b); + packet_sender_.Send(b); } void DcSctpSocket::MakeConnectionParameters() { @@ -316,7 +318,7 @@ void DcSctpSocket::Close() { Parameters::Builder() .Add(UserInitiatedAbortCause("Close called")) .Build())); - SendPacket(b); + packet_sender_.Send(b); } InternalClose(ErrorKind::kNoError, ""); } else { @@ -327,7 +329,7 @@ void DcSctpSocket::Close() { } void DcSctpSocket::CloseConnectionBecauseOfTooManyTransmissionErrors() { - SendPacket(tcb_->PacketBuilder().Add(AbortChunk( + packet_sender_.Send(tcb_->PacketBuilder().Add(AbortChunk( true, Parameters::Builder() .Add(UserInitiatedAbortCause("Too many retransmissions")) .Build()))); @@ -412,7 +414,7 @@ ResetStreamsStatus DcSctpSocket::ResetStreams( if (reconfig.has_value()) { SctpPacket::Builder builder = tcb_->PacketBuilder(); builder.Add(*reconfig); - SendPacket(builder); + packet_sender_.Send(builder); } RTC_DCHECK(IsConsistent()); @@ -751,7 +753,7 @@ bool DcSctpSocket::HandleUnrecognizedChunk( // cause." if (tcb_ != nullptr) { // Need TCB - this chunk must be sent with a correct verification tag. - SendPacket(tcb_->PacketBuilder().Add( + packet_sender_.Send(tcb_->PacketBuilder().Add( ErrorChunk(Parameters::Builder() .Add(UnrecognizedChunkTypeCause(std::vector( descriptor.data.begin(), descriptor.data.end()))) @@ -819,7 +821,7 @@ absl::optional DcSctpSocket::OnShutdownTimerExpiry() { // chunk to the protocol parameter 'Association.Max.Retrans'. If this // threshold is exceeded, the endpoint should destroy the TCB..." - SendPacket(tcb_->PacketBuilder().Add( + packet_sender_.Send(tcb_->PacketBuilder().Add( AbortChunk(true, Parameters::Builder() .Add(UserInitiatedAbortCause( "Too many retransmissions of SHUTDOWN")) @@ -838,28 +840,27 @@ absl::optional DcSctpSocket::OnShutdownTimerExpiry() { return tcb_->current_rto(); } -void DcSctpSocket::SendPacket(SctpPacket::Builder& builder) { - if (builder.empty()) { - return; - } - - std::vector payload = builder.Build(); - - if (RTC_DLOG_IS_ON) { - DebugPrintOutgoing(payload); - } - - // The heartbeat interval timer is restarted for every sent packet, to - // fire when the outgoing channel is inactive. - if (tcb_ != nullptr) { - tcb_->heartbeat_handler().RestartTimer(); - } - +void DcSctpSocket::OnSentPacket(rtc::ArrayView packet, + SendPacketStatus status) { + // The packet observer is invoked even if the packet was failed to be sent, to + // indicate an attempt was made. if (packet_observer_ != nullptr) { - packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload); + packet_observer_->OnSentPacket(callbacks_.TimeMillis(), packet); + } + + if (status == SendPacketStatus::kSuccess) { + if (RTC_DLOG_IS_ON) { + DebugPrintOutgoing(packet); + } + + // The heartbeat interval timer is restarted for every sent packet, to + // fire when the outgoing channel is inactive. + if (tcb_ != nullptr) { + tcb_->heartbeat_handler().RestartTimer(); + } + + ++metrics_.tx_packets_count; } - ++metrics_.tx_packets_count; - callbacks_.SendPacketWithStatus(payload); } bool DcSctpSocket::ValidateHasTCB() { @@ -902,7 +903,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { if (data.payload.empty()) { // Empty DATA chunks are illegal. - SendPacket(tcb_->PacketBuilder().Add( + packet_sender_.Send(tcb_->PacketBuilder().Add( ErrorChunk(Parameters::Builder().Add(NoUserDataCause(tsn)).Build()))); callbacks_.OnError(ErrorKind::kProtocolViolation, "Received DATA chunk with no user data"); @@ -922,7 +923,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { // specification only allows dropping gap-ack-blocks, and that's not // likely to help as the socket has been trying to fill gaps since the // watermark was reached. - SendPacket(tcb_->PacketBuilder().Add(AbortChunk( + packet_sender_.Send(tcb_->PacketBuilder().Add(AbortChunk( true, Parameters::Builder().Add(OutOfResourceErrorCause()).Build()))); InternalClose(ErrorKind::kResourceExhaustion, "Reassembly Queue is exhausted"); @@ -975,12 +976,13 @@ void DcSctpSocket::HandleInit(const CommonHeader& header, // "A receiver of an INIT with the MIS value of 0 SHOULD abort the // association." - SendPacket(SctpPacket::Builder(VerificationTag(0), options_) - .Add(AbortChunk( - /*filled_in_verification_tag=*/false, - Parameters::Builder() - .Add(ProtocolViolationCause("INIT malformed")) - .Build()))); + packet_sender_.Send( + SctpPacket::Builder(VerificationTag(0), options_) + .Add(AbortChunk( + /*filled_in_verification_tag=*/false, + Parameters::Builder() + .Add(ProtocolViolationCause("INIT malformed")) + .Build()))); InternalClose(ErrorKind::kProtocolViolation, "Received invalid INIT"); return; } @@ -1069,7 +1071,7 @@ void DcSctpSocket::HandleInit(const CommonHeader& header, options_.announced_maximum_incoming_streams, connect_params_.initial_tsn, params_builder.Build()); b.Add(init_ack); - SendPacket(b); + packet_sender_.Send(b); } void DcSctpSocket::HandleInitAck( @@ -1091,12 +1093,13 @@ void DcSctpSocket::HandleInitAck( auto cookie = chunk->parameters().get(); if (!cookie.has_value()) { - SendPacket(SctpPacket::Builder(connect_params_.verification_tag, options_) - .Add(AbortChunk( - /*filled_in_verification_tag=*/false, - Parameters::Builder() - .Add(ProtocolViolationCause("INIT-ACK malformed")) - .Build()))); + packet_sender_.Send( + SctpPacket::Builder(connect_params_.verification_tag, options_) + .Add(AbortChunk( + /*filled_in_verification_tag=*/false, + Parameters::Builder() + .Add(ProtocolViolationCause("INIT-ACK malformed")) + .Build()))); InternalClose(ErrorKind::kProtocolViolation, "InitAck chunk doesn't contain a cookie"); return; @@ -1108,9 +1111,8 @@ void DcSctpSocket::HandleInitAck( timer_manager_, log_prefix_, options_, capabilities, callbacks_, send_queue_, connect_params_.verification_tag, connect_params_.initial_tsn, chunk->initiate_tag(), chunk->initial_tsn(), - chunk->a_rwnd(), MakeTieTag(callbacks_), - [this]() { return state_ == State::kEstablished; }, - absl::bind_front(&DcSctpSocket::SendPacket, this)); + chunk->a_rwnd(), MakeTieTag(callbacks_), packet_sender_, + [this]() { return state_ == State::kEstablished; }); RTC_DLOG(LS_VERBOSE) << log_prefix() << "Created peer TCB: " << tcb_->ToString(); @@ -1171,8 +1173,7 @@ void DcSctpSocket::HandleCookieEcho( callbacks_, send_queue_, connect_params_.verification_tag, connect_params_.initial_tsn, cookie->initiate_tag(), cookie->initial_tsn(), cookie->a_rwnd(), MakeTieTag(callbacks_), - [this]() { return state_ == State::kEstablished; }, - absl::bind_front(&DcSctpSocket::SendPacket, this)); + packet_sender_, [this]() { return state_ == State::kEstablished; }); RTC_DLOG(LS_VERBOSE) << log_prefix() << "Created peer TCB: " << tcb_->ToString(); } @@ -1213,7 +1214,7 @@ bool DcSctpSocket::HandleCookieEchoWithTCB(const CommonHeader& header, b.Add(ErrorChunk(Parameters::Builder() .Add(CookieReceivedWhileShuttingDownCause()) .Build())); - SendPacket(b); + packet_sender_.Send(b); callbacks_.OnError(ErrorKind::kWrongSequence, "Received COOKIE-ECHO while shutting down"); return false; @@ -1445,7 +1446,7 @@ void DcSctpSocket::HandleShutdownAck( SctpPacket::Builder b = tcb_->PacketBuilder(); b.Add(ShutdownCompleteChunk(/*tag_reflected=*/false)); - SendPacket(b); + packet_sender_.Send(b); InternalClose(ErrorKind::kNoError, ""); } else { // https://tools.ietf.org/html/rfc4960#section-8.5.1 @@ -1464,7 +1465,7 @@ void DcSctpSocket::HandleShutdownAck( SctpPacket::Builder b(header.verification_tag, options_); b.Add(ShutdownCompleteChunk(/*tag_reflected=*/true)); - SendPacket(b); + packet_sender_.Send(b); } } @@ -1516,7 +1517,7 @@ void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) { "I-FORWARD-TSN received, but not indicated " "during connection establishment")) .Build())); - SendPacket(b); + packet_sender_.Send(b); callbacks_.OnError(ErrorKind::kProtocolViolation, "Received a FORWARD_TSN without announced peer support"); @@ -1564,11 +1565,11 @@ void DcSctpSocket::MaybeSendShutdownOrAck() { void DcSctpSocket::SendShutdown() { SctpPacket::Builder b = tcb_->PacketBuilder(); b.Add(ShutdownChunk(tcb_->data_tracker().last_cumulative_acked_tsn())); - SendPacket(b); + packet_sender_.Send(b); } void DcSctpSocket::SendShutdownAck() { - SendPacket(tcb_->PacketBuilder().Add(ShutdownAckChunk())); + packet_sender_.Send(tcb_->PacketBuilder().Add(ShutdownAckChunk())); t2_shutdown_->set_duration(tcb_->current_rto()); t2_shutdown_->Start(); } diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h index e0aae38e28..60359bd173 100644 --- a/net/dcsctp/socket/dcsctp_socket.h +++ b/net/dcsctp/socket/dcsctp_socket.h @@ -46,6 +46,7 @@ #include "net/dcsctp/rx/data_tracker.h" #include "net/dcsctp/rx/reassembly_queue.h" #include "net/dcsctp/socket/callback_deferrer.h" +#include "net/dcsctp/socket/packet_sender.h" #include "net/dcsctp/socket/state_cookie.h" #include "net/dcsctp/socket/transmission_control_block.h" #include "net/dcsctp/timer/timer.h" @@ -141,8 +142,8 @@ class DcSctpSocket : public DcSctpSocketInterface { absl::optional OnInitTimerExpiry(); absl::optional OnCookieTimerExpiry(); absl::optional OnShutdownTimerExpiry(); - // Builds the packet from `builder` and sends it (through callbacks). - void SendPacket(SctpPacket::Builder& builder); + void OnSentPacket(rtc::ArrayView packet, + SendPacketStatus status); // Sends SHUTDOWN or SHUTDOWN-ACK if the socket is shutting down and if all // outstanding data has been acknowledged. void MaybeSendShutdownOrAck(); @@ -258,6 +259,9 @@ class DcSctpSocket : public DcSctpSocketInterface { const std::unique_ptr t1_cookie_; const std::unique_ptr t2_shutdown_; + // Packets that failed to be sent, but should be retried. + PacketSender packet_sender_; + // The actual SendQueue implementation. As data can be sent on a socket before // the connection is established, this component is not in the TCB. RRSendQueue send_queue_; diff --git a/net/dcsctp/socket/packet_sender.cc b/net/dcsctp/socket/packet_sender.cc new file mode 100644 index 0000000000..85392e205d --- /dev/null +++ b/net/dcsctp/socket/packet_sender.cc @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/packet_sender.h" + +#include +#include + +#include "net/dcsctp/public/types.h" + +namespace dcsctp { + +PacketSender::PacketSender(DcSctpSocketCallbacks& callbacks, + std::function, + SendPacketStatus)> on_sent_packet) + : callbacks_(callbacks), on_sent_packet_(std::move(on_sent_packet)) {} + +bool PacketSender::Send(SctpPacket::Builder& builder) { + if (builder.empty()) { + return false; + } + + std::vector payload = builder.Build(); + + SendPacketStatus status = callbacks_.SendPacketWithStatus(payload); + on_sent_packet_(payload, status); + switch (status) { + case SendPacketStatus::kSuccess: { + return true; + } + case SendPacketStatus::kTemporaryFailure: { + // TODO(boivie): Queue this packet to be retried to be sent later. + return false; + } + + case SendPacketStatus::kError: { + // Nothing that can be done. + return false; + } + } +} +} // namespace dcsctp diff --git a/net/dcsctp/socket/packet_sender.h b/net/dcsctp/socket/packet_sender.h new file mode 100644 index 0000000000..7af4d3c47b --- /dev/null +++ b/net/dcsctp/socket/packet_sender.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_SOCKET_PACKET_SENDER_H_ +#define NET_DCSCTP_SOCKET_PACKET_SENDER_H_ + +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/public/dcsctp_socket.h" + +namespace dcsctp { + +// The PacketSender sends packets to the network using the provided callback +// interface. When an attempt to send a packet is made, the `on_sent_packet` +// callback will be triggered. +class PacketSender { + public: + PacketSender(DcSctpSocketCallbacks& callbacks, + std::function, + SendPacketStatus)> on_sent_packet); + + // Sends the packet, and returns true if it was sent successfully. + bool Send(SctpPacket::Builder& builder); + + private: + DcSctpSocketCallbacks& callbacks_; + + // Callback that will be triggered for every send attempt, indicating the + // status of the operation. + std::function, SendPacketStatus)> + on_sent_packet_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_SOCKET_PACKET_SENDER_H_ diff --git a/net/dcsctp/socket/packet_sender_test.cc b/net/dcsctp/socket/packet_sender_test.cc new file mode 100644 index 0000000000..079dc36a41 --- /dev/null +++ b/net/dcsctp/socket/packet_sender_test.cc @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/packet_sender.h" + +#include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/packet/chunk/cookie_ack_chunk.h" +#include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::_; + +constexpr VerificationTag kVerificationTag(123); + +class PacketSenderTest : public testing::Test { + protected: + PacketSenderTest() : sender_(callbacks_, on_send_fn_.AsStdFunction()) {} + + SctpPacket::Builder PacketBuilder() const { + return SctpPacket::Builder(kVerificationTag, options_); + } + + DcSctpOptions options_; + testing::NiceMock callbacks_; + testing::MockFunction, SendPacketStatus)> + on_send_fn_; + PacketSender sender_; +}; + +TEST_F(PacketSenderTest, SendPacketCallsCallback) { + EXPECT_CALL(on_send_fn_, Call(_, SendPacketStatus::kSuccess)); + EXPECT_TRUE(sender_.Send(PacketBuilder().Add(CookieAckChunk()))); + + EXPECT_CALL(callbacks_, SendPacketWithStatus) + .WillOnce(testing::Return(SendPacketStatus::kError)); + EXPECT_CALL(on_send_fn_, Call(_, SendPacketStatus::kError)); + EXPECT_FALSE(sender_.Send(PacketBuilder().Add(CookieAckChunk()))); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/socket/transmission_control_block.cc b/net/dcsctp/socket/transmission_control_block.cc index 167534db52..9ec275febd 100644 --- a/net/dcsctp/socket/transmission_control_block.cc +++ b/net/dcsctp/socket/transmission_control_block.cc @@ -131,10 +131,10 @@ void TransmissionControlBlock::SendBufferedPackets(SctpPacket::Builder& builder, builder.Add(DataChunk(tsn, std::move(data), false)); } } - if (builder.empty()) { + + if (!packet_sender_.Send(builder)) { break; } - Send(builder); if (cookie_echo_chunk_.has_value()) { // https://tools.ietf.org/html/rfc4960#section-5.1 diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h index e846451c21..b189ae82e0 100644 --- a/net/dcsctp/socket/transmission_control_block.h +++ b/net/dcsctp/socket/transmission_control_block.h @@ -29,6 +29,7 @@ #include "net/dcsctp/socket/capabilities.h" #include "net/dcsctp/socket/context.h" #include "net/dcsctp/socket/heartbeat_handler.h" +#include "net/dcsctp/socket/packet_sender.h" #include "net/dcsctp/socket/stream_reset_handler.h" #include "net/dcsctp/timer/timer.h" #include "net/dcsctp/tx/retransmission_error_counter.h" @@ -55,8 +56,8 @@ class TransmissionControlBlock : public Context { TSN peer_initial_tsn, size_t a_rwnd, TieTag tie_tag, - std::function is_connection_established, - std::function send_fn) + PacketSender& packet_sender, + std::function is_connection_established) : log_prefix_(log_prefix), options_(options), timer_manager_(timer_manager), @@ -79,7 +80,7 @@ class TransmissionControlBlock : public Context { peer_initial_tsn_(peer_initial_tsn), tie_tag_(tie_tag), is_connection_established_(std::move(is_connection_established)), - send_fn_(std::move(send_fn)), + packet_sender_(packet_sender), rto_(options), tx_error_counter_(log_prefix, options), data_tracker_(log_prefix, delayed_ack_timer_.get(), peer_initial_tsn), @@ -124,7 +125,9 @@ class TransmissionControlBlock : public Context { bool HasTooManyTxErrors() const override { return tx_error_counter_.IsExhausted(); } - void Send(SctpPacket::Builder& builder) override { send_fn_(builder); } + void Send(SctpPacket::Builder& builder) override { + packet_sender_.Send(builder); + } // Other accessors DataTracker& data_tracker() { return data_tracker_; } @@ -202,7 +205,7 @@ class TransmissionControlBlock : public Context { // Nonce, used to detect reconnections. const TieTag tie_tag_; const std::function is_connection_established_; - const std::function send_fn_; + PacketSender& packet_sender_; RetransmissionTimeout rto_; RetransmissionErrorCounter tx_error_counter_;