diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 9d6c9dc481..692bed3bee 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h @@ -43,11 +43,33 @@ struct DcSctpOptions { // port number as destination port. int remote_port = 5000; + // The announced maximum number of incoming streams. Note that this value is + // constant and can't be currently increased in run-time as "Add Incoming + // Streams Request" in RFC6525 isn't supported. + // + // The socket implementation doesn't have any per-stream fixed costs, which is + // why the default value is set to be the maximum value. + uint16_t announced_maximum_incoming_streams = 65535; + + // The announced maximum number of outgoing streams. Note that this value is + // constant and can't be currently increased in run-time as "Add Outgoing + // Streams Request" in RFC6525 isn't supported. + // + // The socket implementation doesn't have any per-stream fixed costs, which is + // why the default value is set to be the maximum value. + uint16_t announced_maximum_outgoing_streams = 65535; + // Maximum SCTP packet size. The library will limit the size of generated // packets to be less than or equal to this number. This does not include any // overhead of DTLS, TURN, UDP or IP headers. size_t mtu = kMaxSafeMTUSize; + // The largest allowed message payload to be sent. Messages will be rejected + // if their payload is larger than this value. Note that this doesn't affect + // incoming messages, which may larger than this value (but smaller than + // `max_receiver_window_buffer_size`). + size_t max_message_size = 256 * 1024; + // Maximum received window buffer size. This should be a bit larger than the // largest sized message you want to be able to receive. This essentially // limits the memory usage on the receive side. Note that memory is allocated @@ -65,7 +87,7 @@ struct DcSctpOptions { // than this value, it will be discarded and not used for e.g. any RTO // calculation. The default value is an extreme maximum but can be adapted // to better match the environment. - DurationMs rtt_max = DurationMs(8'000); + DurationMs rtt_max = DurationMs(8000); // Initial RTO value. DurationMs rto_initial = DurationMs(500); @@ -86,7 +108,7 @@ struct DcSctpOptions { DurationMs t2_shutdown_timeout = DurationMs(1000); // Hearbeat interval (on idle connections only). - DurationMs heartbeat_interval = DurationMs(30'000); + DurationMs heartbeat_interval = DurationMs(30000); // The maximum time when a SACK will be sent from the arrival of an // unacknowledged packet. Whatever is smallest of RTO/2 and this will be used. diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h index e7f21349bd..f340cd9ca0 100644 --- a/net/dcsctp/public/dcsctp_socket.h +++ b/net/dcsctp/public/dcsctp_socket.h @@ -18,12 +18,27 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" #include "net/dcsctp/public/packet_observer.h" #include "net/dcsctp/public/timeout.h" #include "net/dcsctp/public/types.h" namespace dcsctp { +// The socket/association state +enum class SocketState { + // The socket is closed. + kClosed, + // The socket has initiated a connection, which is not yet established. Note + // that for incoming connections and for reconnections when the socket is + // already connected, the socket will not transition to this state. + kConnecting, + // The socket is connected, and the connection is established. + kConnected, + // The socket is shutting down, and the connection is not yet closed. + kShuttingDown, +}; + // Send options for sending messages struct SendOptions { // If the message should be sent with unordered message delivery. @@ -59,6 +74,8 @@ enum class ErrorKind { kProtocolViolation, // The receive or send buffers have been exhausted. kResourceExhaustion, + // The client has performed an invalid operation. + kUnsupportedOperation, }; inline constexpr absl::string_view ToString(ErrorKind error) { @@ -79,19 +96,65 @@ inline constexpr absl::string_view ToString(ErrorKind error) { return "PROTOCOL_VIOLATION"; case ErrorKind::kResourceExhaustion: return "RESOURCE_EXHAUSTION"; + case ErrorKind::kUnsupportedOperation: + return "UNSUPPORTED_OPERATION"; } } -// Return value of SupportsStreamReset. -enum class StreamResetSupport { +enum class SendStatus { + // The message was enqueued successfully. As sending the message is done + // asynchronously, this is no guarantee that the message has been actually + // sent. + kSuccess, + // The message was rejected as the payload was empty (which is not allowed in + // SCTP). + kErrorMessageEmpty, + // The message was rejected as the payload was larger than what has been set + // as `DcSctpOptions.max_message_size`. + kErrorMessageTooLarge, + // The message could not be enqueued as the socket is out of resources. This + // mainly indicates that the send queue is full. + kErrorResourceExhaustion, + // The message could not be sent as the socket is shutting down. + kErrorShuttingDown, +}; + +inline constexpr absl::string_view ToString(SendStatus error) { + switch (error) { + case SendStatus::kSuccess: + return "SUCCESS"; + case SendStatus::kErrorMessageEmpty: + return "ERROR_MESSAGE_EMPTY"; + case SendStatus::kErrorMessageTooLarge: + return "ERROR_MESSAGE_TOO_LARGE"; + case SendStatus::kErrorResourceExhaustion: + return "ERROR_RESOURCE_EXHAUSTION"; + case SendStatus::kErrorShuttingDown: + return "ERROR_SHUTTING_DOWN"; + } +} + +// Return value of ResetStreams. +enum class ResetStreamsStatus { // If the connection is not yet established, this will be returned. - kUnknown, - // Indicates that Stream Reset is supported by the peer. - kSupported, - // Indicates that Stream Reset is not supported by the peer. + kNotConnected, + // Indicates that ResetStreams operation has been successfully initiated. + kPerformed, + // Indicates that ResetStreams has failed as it's not supported by the peer. kNotSupported, }; +inline constexpr absl::string_view ToString(ResetStreamsStatus error) { + switch (error) { + case ResetStreamsStatus::kNotConnected: + return "NOT_CONNECTED"; + case ResetStreamsStatus::kPerformed: + return "PERFORMED"; + case ResetStreamsStatus::kNotSupported: + return "NOT_SUPPORTED"; + } +} + // Callbacks that the DcSctpSocket will be done synchronously to the owning // client. It is allowed to call back into the library from callbacks that start // with "On". It has been explicitly documented when it's not allowed to call @@ -123,9 +186,9 @@ class DcSctpSocketCallbacks { virtual TimeMs TimeMillis() = 0; // Called when the library needs a random number uniformly distributed between - // `low` (inclusive) and `high` (exclusive). The random number used by the - // library are not used for cryptographic purposes there are no requirements - // on a secure random number generator. + // `low` (inclusive) and `high` (exclusive). The random numbers used by the + // library are not used for cryptographic purposes. There are no requirements + // that the random number generator must be secure. // // Note that it's NOT ALLOWED to call into this library from within this // callback. @@ -200,15 +263,6 @@ class DcSctpSocketCallbacks { // It is allowed to call into this library from within this callback. virtual void OnIncomingStreamsReset( rtc::ArrayView incoming_streams) = 0; - - // If an outgoing message has expired before being completely sent. - // TODO(boivie) Add some kind of message identifier. - // TODO(boivie) Add callbacks for OnMessageSent and OnSentMessageAcked - // - // It is allowed to call into this library from within this callback. - virtual void OnSentMessageExpired(StreamID stream_id, - PPID ppid, - bool unsent) = 0; }; // The DcSctpSocket implementation implements the following interface. @@ -236,6 +290,22 @@ class DcSctpSocketInterface { // not already closed. No callbacks will be made after Close() has returned. virtual void Close() = 0; + // The socket state. + virtual SocketState state() const = 0; + + // The options it was created with. + virtual const DcSctpOptions& options() const = 0; + + // Sends the message `message` using the provided send options. + // Sending a message is an asynchrous operation, and the `OnError` callback + // may be invoked to indicate any errors in sending the message. + // + // The association does not have to be established before calling this method. + // If it's called before there is an established association, the message will + // be queued. + virtual SendStatus Send(DcSctpMessage message, + const SendOptions& send_options) = 0; + // Resetting streams is an asynchronous operation and the results will // be notified using `DcSctpSocketCallbacks::OnStreamsResetDone()` on success // and `DcSctpSocketCallbacks::OnStreamsResetFailed()` on failure. Note that @@ -251,27 +321,8 @@ class DcSctpSocketInterface { // Resetting streams can only be done on an established association that // supports stream resetting. Calling this method on e.g. a closed association // or streams that don't support resetting will not perform any operation. - virtual void ResetStreams( + virtual ResetStreamsStatus ResetStreams( rtc::ArrayView outgoing_streams) = 0; - - // Indicates if the peer supports resetting streams (RFC6525). Please note - // that the connection must be established for support to be known. - virtual StreamResetSupport SupportsStreamReset() const = 0; - - // Sends the message `message` using the provided send options. - // Sending a message is an asynchrous operation, and the `OnError` callback - // may be invoked to indicate any errors in sending the message. - // - // The association does not have to be established before calling this method. - // If it's called before there is an established association, the message will - // be queued. - void Send(DcSctpMessage message, const SendOptions& send_options = {}) { - SendMessage(std::move(message), send_options); - } - - private: - virtual void SendMessage(DcSctpMessage message, - const SendOptions& send_options) = 0; }; } // namespace dcsctp diff --git a/net/dcsctp/socket/BUILD.gn b/net/dcsctp/socket/BUILD.gn index b5cdf67540..b48c3f6097 100644 --- a/net/dcsctp/socket/BUILD.gn +++ b/net/dcsctp/socket/BUILD.gn @@ -76,6 +76,27 @@ rtc_library("transmission_control_block") { ] } +rtc_library("dcsctp_socket") { + deps = [ + ":context", + ":transmission_control_block", + "../../../api:array_view", + "../../../rtc_base", + "../../../rtc_base:checks", + "../../../rtc_base:rtc_base_approved", + "../packet:chunk_validators", + "../public:types", + "../tx:fcfs_send_queue", + ] + sources = [ + "callback_deferrer.h", + "dcsctp_socket.cc", + "dcsctp_socket.h", + "state_cookie.cc", + "state_cookie.h", + ] +} + if (rtc_include_tests) { rtc_source_set("mock_callbacks") { testonly = true @@ -100,6 +121,7 @@ if (rtc_include_tests) { testonly = true deps = [ + ":dcsctp_socket", ":heartbeat_handler", ":stream_reset_handler", "../../../api:array_view", @@ -109,7 +131,9 @@ if (rtc_include_tests) { "../../../test:test_support", ] sources = [ + "dcsctp_socket_test.cc", "heartbeat_handler_test.cc", + "state_cookie_test.cc", "stream_reset_handler_test.cc", ] } diff --git a/net/dcsctp/socket/callback_deferrer.h b/net/dcsctp/socket/callback_deferrer.h new file mode 100644 index 0000000000..79f3f36d15 --- /dev/null +++ b/net/dcsctp/socket/callback_deferrer.h @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_SOCKET_CALLBACK_DEFERRER_H_ +#define NET_DCSCTP_SOCKET_CALLBACK_DEFERRER_H_ + +#include +#include +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "api/array_view.h" +#include "api/ref_counted_base.h" +#include "api/scoped_refptr.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "rtc_base/ref_counted_object.h" + +namespace dcsctp { + +// Defers callbacks until they can be safely triggered. +// +// There are a lot of callbacks from the dcSCTP library to the client, +// such as when messages are received or streams are closed. When the client +// receives these callbacks, the client is expected to be able to call into the +// library - from within the callback. For example, sending a reply message when +// a certain SCTP message has been received, or to reconnect when the connection +// was closed for any reason. This means that the dcSCTP library must always be +// in a consistent and stable state when these callbacks are delivered, and to +// ensure that's the case, callbacks are not immediately delivered from where +// they originate, but instead queued (deferred) by this class. At the end of +// any public API method that may result in callbacks, they are triggered and +// then delivered. +// +// There are a number of exceptions, which is clearly annotated in the API. +class CallbackDeferrer : public DcSctpSocketCallbacks { + public: + explicit CallbackDeferrer(DcSctpSocketCallbacks& underlying) + : underlying_(underlying) {} + + void TriggerDeferred() { + // Need to swap here. The client may call into the library from within a + // callback, and that might result in adding new callbacks to this instance, + // and the vector can't be modified while iterated on. + std::vector> deferred; + deferred.swap(deferred_); + + for (auto& cb : deferred) { + cb(underlying_); + } + } + + void SendPacket(rtc::ArrayView data) override { + // Will not be deferred - call directly. + underlying_.SendPacket(data); + } + + std::unique_ptr CreateTimeout() override { + // Will not be deferred - call directly. + return underlying_.CreateTimeout(); + } + + TimeMs TimeMillis() override { + // Will not be deferred - call directly. + return underlying_.TimeMillis(); + } + + uint32_t GetRandomInt(uint32_t low, uint32_t high) override { + // Will not be deferred - call directly. + return underlying_.GetRandomInt(low, high); + } + + void NotifyOutgoingMessageBufferEmpty() override { + // Will not be deferred - call directly. + underlying_.NotifyOutgoingMessageBufferEmpty(); + } + + void OnMessageReceived(DcSctpMessage message) override { + deferred_.emplace_back( + [deliverer = MessageDeliverer(std::move(message))]( + DcSctpSocketCallbacks& cb) mutable { deliverer.Deliver(cb); }); + } + + void OnError(ErrorKind error, absl::string_view message) override { + deferred_.emplace_back( + [error, message = std::string(message)](DcSctpSocketCallbacks& cb) { + cb.OnError(error, message); + }); + } + + void OnAborted(ErrorKind error, absl::string_view message) override { + deferred_.emplace_back( + [error, message = std::string(message)](DcSctpSocketCallbacks& cb) { + cb.OnAborted(error, message); + }); + } + + void OnConnected() override { + deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnConnected(); }); + } + + void OnClosed() override { + deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnClosed(); }); + } + + void OnConnectionRestarted() override { + deferred_.emplace_back( + [](DcSctpSocketCallbacks& cb) { cb.OnConnectionRestarted(); }); + } + + void OnStreamsResetFailed(rtc::ArrayView outgoing_streams, + absl::string_view reason) override { + deferred_.emplace_back( + [streams = std::vector(outgoing_streams.begin(), + outgoing_streams.end()), + reason = std::string(reason)](DcSctpSocketCallbacks& cb) { + cb.OnStreamsResetFailed(streams, reason); + }); + } + + void OnStreamsResetPerformed( + rtc::ArrayView outgoing_streams) override { + deferred_.emplace_back( + [streams = std::vector(outgoing_streams.begin(), + outgoing_streams.end())]( + DcSctpSocketCallbacks& cb) { + cb.OnStreamsResetPerformed(streams); + }); + } + + void OnIncomingStreamsReset( + rtc::ArrayView incoming_streams) override { + deferred_.emplace_back( + [streams = std::vector(incoming_streams.begin(), + incoming_streams.end())]( + DcSctpSocketCallbacks& cb) { cb.OnIncomingStreamsReset(streams); }); + } + + private: + // A wrapper around the move-only DcSctpMessage, to let it be captured in a + // lambda. + class MessageDeliverer { + public: + explicit MessageDeliverer(DcSctpMessage&& message) + : state_(rtc::make_ref_counted(std::move(message))) {} + + void Deliver(DcSctpSocketCallbacks& c) { + // Really ensure that it's only called once. + RTC_DCHECK(!state_->has_delivered); + state_->has_delivered = true; + c.OnMessageReceived(std::move(state_->message)); + } + + private: + struct State : public rtc::RefCountInterface { + explicit State(DcSctpMessage&& m) + : has_delivered(false), message(std::move(m)) {} + bool has_delivered; + DcSctpMessage message; + }; + rtc::scoped_refptr state_; + }; + + DcSctpSocketCallbacks& underlying_; + std::vector> deferred_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_SOCKET_CALLBACK_DEFERRER_H_ diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc new file mode 100644 index 0000000000..b54ad80b9c --- /dev/null +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -0,0 +1,1522 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/dcsctp_socket.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/abort_chunk.h" +#include "net/dcsctp/packet/chunk/chunk.h" +#include "net/dcsctp/packet/chunk/cookie_ack_chunk.h" +#include "net/dcsctp/packet/chunk/cookie_echo_chunk.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/data_common.h" +#include "net/dcsctp/packet/chunk/error_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/heartbeat_ack_chunk.h" +#include "net/dcsctp/packet/chunk/heartbeat_request_chunk.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/init_ack_chunk.h" +#include "net/dcsctp/packet/chunk/init_chunk.h" +#include "net/dcsctp/packet/chunk/reconfig_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_ack_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_complete_chunk.h" +#include "net/dcsctp/packet/chunk_validators.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/packet/error_cause/cookie_received_while_shutting_down_cause.h" +#include "net/dcsctp/packet/error_cause/error_cause.h" +#include "net/dcsctp/packet/error_cause/no_user_data_cause.h" +#include "net/dcsctp/packet/error_cause/out_of_resource_error_cause.h" +#include "net/dcsctp/packet/error_cause/protocol_violation_cause.h" +#include "net/dcsctp/packet/error_cause/unrecognized_chunk_type_cause.h" +#include "net/dcsctp/packet/error_cause/user_initiated_abort_cause.h" +#include "net/dcsctp/packet/parameter/forward_tsn_supported_parameter.h" +#include "net/dcsctp/packet/parameter/parameter.h" +#include "net/dcsctp/packet/parameter/state_cookie_parameter.h" +#include "net/dcsctp/packet/parameter/supported_extensions_parameter.h" +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/packet/tlv_trait.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/packet_observer.h" +#include "net/dcsctp/rx/data_tracker.h" +#include "net/dcsctp/rx/reassembly_queue.h" +#include "net/dcsctp/socket/callback_deferrer.h" +#include "net/dcsctp/socket/capabilities.h" +#include "net/dcsctp/socket/heartbeat_handler.h" +#include "net/dcsctp/socket/state_cookie.h" +#include "net/dcsctp/socket/stream_reset_handler.h" +#include "net/dcsctp/socket/transmission_control_block.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/retransmission_queue.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/strings/string_builder.h" +#include "rtc_base/strings/string_format.h" + +namespace dcsctp { +namespace { + +// https://tools.ietf.org/html/rfc4960#section-5.1 +constexpr uint32_t kMinVerificationTag = 1; +constexpr uint32_t kMaxVerificationTag = std::numeric_limits::max(); + +// https://tools.ietf.org/html/rfc4960#section-3.3.2 +constexpr uint32_t kMinInitialTsn = 0; +constexpr uint32_t kMaxInitialTsn = std::numeric_limits::max(); + +Capabilities GetCapabilities(const DcSctpOptions& options, + const Parameters& parameters) { + Capabilities capabilities; + absl::optional supported_extensions = + parameters.get(); + + if (options.enable_partial_reliability) { + capabilities.partial_reliability = + parameters.get().has_value(); + if (supported_extensions.has_value()) { + capabilities.partial_reliability |= + supported_extensions->supports(ForwardTsnChunk::kType); + } + } + + if (options.enable_message_interleaving && supported_extensions.has_value()) { + capabilities.message_interleaving = + supported_extensions->supports(IDataChunk::kType) && + supported_extensions->supports(IForwardTsnChunk::kType); + } + if (supported_extensions.has_value() && + supported_extensions->supports(ReConfigChunk::kType)) { + capabilities.reconfig = true; + } + return capabilities; +} + +void AddCapabilityParameters(const DcSctpOptions& options, + Parameters::Builder& builder) { + std::vector chunk_types = {ReConfigChunk::kType}; + + if (options.enable_partial_reliability) { + builder.Add(ForwardTsnSupportedParameter()); + chunk_types.push_back(ForwardTsnChunk::kType); + } + if (options.enable_message_interleaving) { + chunk_types.push_back(IDataChunk::kType); + chunk_types.push_back(IForwardTsnChunk::kType); + } + builder.Add(SupportedExtensionsParameter(std::move(chunk_types))); +} + +TieTag MakeTieTag(DcSctpSocketCallbacks& cb) { + uint32_t tie_tag_upper = + cb.GetRandomInt(0, std::numeric_limits::max()); + uint32_t tie_tag_lower = + cb.GetRandomInt(1, std::numeric_limits::max()); + return TieTag(static_cast(tie_tag_upper) << 32 | + static_cast(tie_tag_lower)); +} + +} // namespace + +DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, + DcSctpSocketCallbacks& callbacks, + std::unique_ptr packet_observer, + const DcSctpOptions& options) + : log_prefix_(std::string(log_prefix) + ": "), + packet_observer_(std::move(packet_observer)), + options_(options), + callbacks_(callbacks), + timer_manager_([this]() { return callbacks_.CreateTimeout(); }), + t1_init_(timer_manager_.CreateTimer( + "t1-init", + [this]() { return OnInitTimerExpiry(); }, + TimerOptions(options.t1_init_timeout, + TimerBackoffAlgorithm::kExponential, + options.max_init_retransmits))), + t1_cookie_(timer_manager_.CreateTimer( + "t1-cookie", + [this]() { return OnCookieTimerExpiry(); }, + TimerOptions(options.t1_cookie_timeout, + TimerBackoffAlgorithm::kExponential, + options.max_init_retransmits))), + t2_shutdown_(timer_manager_.CreateTimer( + "t2-shutdown", + [this]() { return OnShutdownTimerExpiry(); }, + TimerOptions(options.t2_shutdown_timeout, + TimerBackoffAlgorithm::kExponential, + options.max_retransmissions))), + send_queue_(log_prefix_, options_.max_send_buffer_size) {} + +std::string DcSctpSocket::log_prefix() const { + return log_prefix_ + "[" + std::string(ToString(state_)) + "] "; +} + +bool DcSctpSocket::IsConsistent() const { + switch (state_) { + case State::kClosed: + return (tcb_ == nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && !t2_shutdown_->is_running()); + case State::kCookieWait: + return (tcb_ == nullptr && t1_init_->is_running() && + !t1_cookie_->is_running() && !t2_shutdown_->is_running()); + case State::kCookieEchoed: + return (tcb_ != nullptr && !t1_init_->is_running() && + t1_cookie_->is_running() && !t2_shutdown_->is_running() && + cookie_echo_chunk_.has_value()); + case State::kEstablished: + return (tcb_ != nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && !t2_shutdown_->is_running()); + case State::kShutdownPending: + return (tcb_ != nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && !t2_shutdown_->is_running()); + case State::kShutdownSent: + return (tcb_ != nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && t2_shutdown_->is_running()); + case State::kShutdownReceived: + return (tcb_ != nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && !t2_shutdown_->is_running()); + case State::kShutdownAckSent: + return (tcb_ != nullptr && !t1_init_->is_running() && + !t1_cookie_->is_running() && t2_shutdown_->is_running()); + } +} + +constexpr absl::string_view DcSctpSocket::ToString(DcSctpSocket::State state) { + switch (state) { + case DcSctpSocket::State::kClosed: + return "CLOSED"; + case DcSctpSocket::State::kCookieWait: + return "COOKIE_WAIT"; + case DcSctpSocket::State::kCookieEchoed: + return "COOKIE_ECHOED"; + case DcSctpSocket::State::kEstablished: + return "ESTABLISHED"; + case DcSctpSocket::State::kShutdownPending: + return "SHUTDOWN_PENDING"; + case DcSctpSocket::State::kShutdownSent: + return "SHUTDOWN_SENT"; + case DcSctpSocket::State::kShutdownReceived: + return "SHUTDOWN_RECEIVED"; + case DcSctpSocket::State::kShutdownAckSent: + return "SHUTDOWN_ACK_SENT"; + } +} + +void DcSctpSocket::SetState(State state, absl::string_view reason) { + if (state_ != state) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Socket state changed from " + << ToString(state_) << " to " << ToString(state) + << " due to " << reason; + state_ = state; + } +} + +void DcSctpSocket::SendInit() { + Parameters::Builder params_builder; + AddCapabilityParameters(options_, params_builder); + InitChunk init(/*initiate_tag=*/connect_params_.verification_tag, + /*a_rwnd=*/options_.max_receiver_window_buffer_size, + options_.announced_maximum_outgoing_streams, + options_.announced_maximum_incoming_streams, + connect_params_.initial_tsn, params_builder.Build()); + SctpPacket::Builder b(VerificationTag(0), options_); + b.Add(init); + SendPacket(b); +} + +void DcSctpSocket::MakeConnectionParameters() { + VerificationTag new_verification_tag( + callbacks_.GetRandomInt(kMinVerificationTag, kMaxVerificationTag)); + TSN initial_tsn(callbacks_.GetRandomInt(kMinInitialTsn, kMaxInitialTsn)); + connect_params_.initial_tsn = initial_tsn; + connect_params_.verification_tag = new_verification_tag; +} + +void DcSctpSocket::Connect() { + if (state_ == State::kClosed) { + MakeConnectionParameters(); + RTC_DLOG(LS_INFO) + << log_prefix() + << rtc::StringFormat( + "Connecting. my_verification_tag=%08x, my_initial_tsn=%u", + *connect_params_.verification_tag, *connect_params_.initial_tsn); + SendInit(); + t1_init_->Start(); + SetState(State::kCookieWait, "Connect called"); + } else { + RTC_DLOG(LS_WARNING) << log_prefix() + << "Called Connect on a socket that is not closed"; + } + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); +} + +void DcSctpSocket::Shutdown() { + if (tcb_ != nullptr) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "Upon receipt of the SHUTDOWN primitive from its upper layer, the + // endpoint enters the SHUTDOWN-PENDING state and remains there until all + // outstanding data has been acknowledged by its peer." + SetState(State::kShutdownPending, "Shutdown called"); + MaybeSendShutdownOrAck(); + } else { + // Connection closed before even starting to connect, or during the initial + // connection phase. There is no outstanding data, so the socket can just + // be closed (stopping any connection timers, if any), as this is the + // client's intention, by calling Shutdown. + InternalClose(ErrorKind::kNoError, ""); + } + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); +} + +void DcSctpSocket::Close() { + if (state_ != State::kClosed) { + if (tcb_ != nullptr) { + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(AbortChunk(/*filled_in_verification_tag=*/true, + Parameters::Builder() + .Add(UserInitiatedAbortCause("Close called")) + .Build())); + SendPacket(b); + } + InternalClose(ErrorKind::kNoError, ""); + } else { + RTC_DLOG(LS_INFO) << log_prefix() << "Called Close on a closed socket"; + } + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); +} + +void DcSctpSocket::CloseConnectionBecauseOfTooManyTransmissionErrors() { + SendPacket(tcb_->PacketBuilder().Add(AbortChunk( + true, Parameters::Builder() + .Add(UserInitiatedAbortCause("Too many retransmissions")) + .Build()))); + InternalClose(ErrorKind::kTooManyRetries, "Too many retransmissions"); +} + +void DcSctpSocket::InternalClose(ErrorKind error, absl::string_view message) { + if (state_ != State::kClosed) { + t1_init_->Stop(); + t1_cookie_->Stop(); + t2_shutdown_->Stop(); + tcb_ = nullptr; + cookie_echo_chunk_ = absl::nullopt; + + if (error == ErrorKind::kNoError) { + callbacks_.OnClosed(); + } else { + callbacks_.OnAborted(error, message); + } + SetState(State::kClosed, message); + } + // This method's purpose is to abort/close and make it consistent by ensuring + // that e.g. all timers really are stopped. + RTC_DCHECK(IsConsistent()); +} + +SendStatus DcSctpSocket::Send(DcSctpMessage message, + const SendOptions& send_options) { + if (message.payload().empty()) { + callbacks_.OnError(ErrorKind::kProtocolViolation, + "Unable to send empty message"); + return SendStatus::kErrorMessageEmpty; + } + if (message.payload().size() > options_.max_message_size) { + callbacks_.OnError(ErrorKind::kProtocolViolation, + "Unable to send too large message"); + return SendStatus::kErrorMessageTooLarge; + } + if (state_ == State::kShutdownPending || state_ == State::kShutdownSent || + state_ == State::kShutdownReceived || state_ == State::kShutdownAckSent) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "An endpoint should reject any new data request from its upper layer + // if it is in the SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED, or + // SHUTDOWN-ACK-SENT state." + callbacks_.OnError(ErrorKind::kWrongSequence, + "Unable to send message as the socket is shutting down"); + return SendStatus::kErrorShuttingDown; + } + if (send_queue_.IsFull()) { + callbacks_.OnError(ErrorKind::kResourceExhaustion, + "Unable to send message as the send queue is full"); + return SendStatus::kErrorResourceExhaustion; + } + + send_queue_.Add(callbacks_.TimeMillis(), std::move(message), send_options); + if (tcb_ != nullptr) { + tcb_->SendBufferedPackets(); + } + + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); + return SendStatus::kSuccess; +} + +ResetStreamsStatus DcSctpSocket::ResetStreams( + rtc::ArrayView outgoing_streams) { + if (tcb_ == nullptr) { + callbacks_.OnError(ErrorKind::kWrongSequence, + "Can't reset streams as the socket is not connected"); + return ResetStreamsStatus::kNotConnected; + } + if (!tcb_->capabilities().reconfig) { + callbacks_.OnError(ErrorKind::kUnsupportedOperation, + "Can't reset streams as the peer doesn't support it"); + return ResetStreamsStatus::kNotSupported; + } + + tcb_->stream_reset_handler().ResetStreams(outgoing_streams); + absl::optional reconfig = + tcb_->stream_reset_handler().MakeStreamResetRequest(); + if (reconfig.has_value()) { + SctpPacket::Builder builder = tcb_->PacketBuilder(); + builder.Add(*reconfig); + SendPacket(builder); + } + + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); + return ResetStreamsStatus::kPerformed; +} + +SocketState DcSctpSocket::state() const { + switch (state_) { + case State::kClosed: + return SocketState::kClosed; + case State::kCookieWait: + ABSL_FALLTHROUGH_INTENDED; + case State::kCookieEchoed: + return SocketState::kConnecting; + case State::kEstablished: + return SocketState::kConnected; + case State::kShutdownPending: + ABSL_FALLTHROUGH_INTENDED; + case State::kShutdownSent: + ABSL_FALLTHROUGH_INTENDED; + case State::kShutdownReceived: + ABSL_FALLTHROUGH_INTENDED; + case State::kShutdownAckSent: + return SocketState::kShuttingDown; + } +} + +void DcSctpSocket::MaybeSendShutdownOnPacketReceived(const SctpPacket& packet) { + if (state_ == State::kShutdownSent) { + bool has_data_chunk = + std::find_if(packet.descriptors().begin(), packet.descriptors().end(), + [](const SctpPacket::ChunkDescriptor& descriptor) { + return descriptor.type == DataChunk::kType; + }) != packet.descriptors().end(); + if (has_data_chunk) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "While in the SHUTDOWN-SENT state, the SHUTDOWN sender MUST immediately + // respond to each received packet containing one or more DATA chunks with + // a SHUTDOWN chunk and restart the T2-shutdown timer."" + SendShutdown(); + t2_shutdown_->set_duration(tcb_->current_rto()); + t2_shutdown_->Start(); + } + } +} + +bool DcSctpSocket::ValidatePacket(const SctpPacket& packet) { + const CommonHeader& header = packet.common_header(); + VerificationTag my_verification_tag = + tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0); + + if (header.verification_tag == VerificationTag(0)) { + if (packet.descriptors().size() == 1 && + packet.descriptors()[0].type == InitChunk::kType) { + // https://tools.ietf.org/html/rfc4960#section-8.5.1 + // "When an endpoint receives an SCTP packet with the Verification Tag + // set to 0, it should verify that the packet contains only an INIT chunk. + // Otherwise, the receiver MUST silently discard the packet."" + return true; + } + callbacks_.OnError( + ErrorKind::kParseFailed, + "Only a single INIT chunk can be present in packets sent on " + "verification_tag = 0"); + return false; + } + + if (packet.descriptors().size() == 1 && + packet.descriptors()[0].type == AbortChunk::kType) { + // https://tools.ietf.org/html/rfc4960#section-8.5.1 + // "The receiver of an ABORT MUST accept the packet if the Verification + // Tag field of the packet matches its own tag and the T bit is not set OR + // if it is set to its peer's tag and the T bit is set in the Chunk Flags. + // Otherwise, the receiver MUST silently discard the packet and take no + // further action." + bool t_bit = (packet.descriptors()[0].flags & 0x01) != 0; + if (t_bit && tcb_ == nullptr) { + // Can't verify the tag - assume it's okey. + return true; + } + if ((!t_bit && header.verification_tag == my_verification_tag) || + (t_bit && header.verification_tag == tcb_->peer_verification_tag())) { + return true; + } + callbacks_.OnError(ErrorKind::kParseFailed, + "ABORT chunk verification tag was wrong"); + return false; + } + + if (packet.descriptors()[0].type == InitAckChunk::kType) { + if (header.verification_tag == connect_params_.verification_tag) { + return true; + } + callbacks_.OnError( + ErrorKind::kParseFailed, + rtc::StringFormat( + "Packet has invalid verification tag: %08x, expected %08x", + *header.verification_tag, *connect_params_.verification_tag)); + return false; + } + + if (packet.descriptors()[0].type == CookieEchoChunk::kType) { + // Handled in chunk handler (due to RFC 4960, section 5.2.4). + return true; + } + + if (packet.descriptors().size() == 1 && + packet.descriptors()[0].type == ShutdownCompleteChunk::kType) { + // https://tools.ietf.org/html/rfc4960#section-8.5.1 + // "The receiver of a SHUTDOWN COMPLETE shall accept the packet if the + // Verification Tag field of the packet matches its own tag and the T bit is + // not set OR if it is set to its peer's tag and the T bit is set in the + // Chunk Flags. Otherwise, the receiver MUST silently discard the packet + // and take no further action." + bool t_bit = (packet.descriptors()[0].flags & 0x01) != 0; + if (t_bit && tcb_ == nullptr) { + // Can't verify the tag - assume it's okey. + return true; + } + if ((!t_bit && header.verification_tag == my_verification_tag) || + (t_bit && header.verification_tag == tcb_->peer_verification_tag())) { + return true; + } + callbacks_.OnError(ErrorKind::kParseFailed, + "SHUTDOWN_COMPLETE chunk verification tag was wrong"); + return false; + } + + // https://tools.ietf.org/html/rfc4960#section-8.5 + // "When receiving an SCTP packet, the endpoint MUST ensure that the value + // in the Verification Tag field of the received SCTP packet matches its own + // tag. If the received Verification Tag value does not match the receiver's + // own tag value, the receiver shall silently discard the packet and shall not + // process it any further..." + if (header.verification_tag == my_verification_tag) { + return true; + } + + callbacks_.OnError( + ErrorKind::kParseFailed, + rtc::StringFormat( + "Packet has invalid verification tag: %08x, expected %08x", + *header.verification_tag, *my_verification_tag)); + return false; +} + +void DcSctpSocket::HandleTimeout(TimeoutID timeout_id) { + timer_manager_.HandleTimeout(timeout_id); + + if (tcb_ != nullptr && tcb_->HasTooManyTxErrors()) { + // Tearing down the TCB has to be done outside the handlers. + CloseConnectionBecauseOfTooManyTransmissionErrors(); + } + + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); +} + +void DcSctpSocket::ReceivePacket(rtc::ArrayView data) { + if (packet_observer_ != nullptr) { + packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data); + } + + absl::optional packet = + SctpPacket::Parse(data, options_.disable_checksum_verification); + if (!packet.has_value()) { + // https://tools.ietf.org/html/rfc4960#section-6.8 + // "The default procedure for handling invalid SCTP packets is to + // silently discard them." + callbacks_.OnError(ErrorKind::kParseFailed, + "Failed to parse received SCTP packet"); + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); + return; + } + + if (RTC_DLOG_IS_ON) { + for (const auto& descriptor : packet->descriptors()) { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Received " + << DebugConvertChunkToString(descriptor.data); + } + } + + if (!ValidatePacket(*packet)) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Packet failed verification tag check - dropping"; + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); + return; + } + + MaybeSendShutdownOnPacketReceived(*packet); + + for (const auto& descriptor : packet->descriptors()) { + if (!Dispatch(packet->common_header(), descriptor)) { + break; + } + } + + if (tcb_ != nullptr) { + tcb_->data_tracker().ObservePacketEnd(); + tcb_->MaybeSendSack(); + } + + RTC_DCHECK(IsConsistent()); + callbacks_.TriggerDeferred(); +} + +void DcSctpSocket::DebugPrintOutgoing(rtc::ArrayView payload) { + auto packet = SctpPacket::Parse(payload); + RTC_DCHECK(packet.has_value()); + + for (const auto& desc : packet->descriptors()) { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Sent " + << DebugConvertChunkToString(desc.data); + } +} + +bool DcSctpSocket::Dispatch(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + switch (descriptor.type) { + case DataChunk::kType: + HandleData(header, descriptor); + break; + case InitChunk::kType: + HandleInit(header, descriptor); + break; + case InitAckChunk::kType: + HandleInitAck(header, descriptor); + break; + case SackChunk::kType: + HandleSack(header, descriptor); + break; + case HeartbeatRequestChunk::kType: + HandleHeartbeatRequest(header, descriptor); + break; + case HeartbeatAckChunk::kType: + HandleHeartbeatAck(header, descriptor); + break; + case AbortChunk::kType: + HandleAbort(header, descriptor); + break; + case ErrorChunk::kType: + HandleError(header, descriptor); + break; + case CookieEchoChunk::kType: + HandleCookieEcho(header, descriptor); + break; + case CookieAckChunk::kType: + HandleCookieAck(header, descriptor); + break; + case ShutdownChunk::kType: + HandleShutdown(header, descriptor); + break; + case ShutdownAckChunk::kType: + HandleShutdownAck(header, descriptor); + break; + case ShutdownCompleteChunk::kType: + HandleShutdownComplete(header, descriptor); + break; + case ReConfigChunk::kType: + HandleReconfig(header, descriptor); + break; + case ForwardTsnChunk::kType: + HandleForwardTsn(header, descriptor); + break; + case IDataChunk::kType: + HandleIData(header, descriptor); + break; + case IForwardTsnChunk::kType: + HandleForwardTsn(header, descriptor); + break; + default: + return HandleUnrecognizedChunk(descriptor); + } + return true; +} + +bool DcSctpSocket::HandleUnrecognizedChunk( + const SctpPacket::ChunkDescriptor& descriptor) { + bool report_as_error = (descriptor.type & 0x40) != 0; + bool continue_processing = (descriptor.type & 0x80) != 0; + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Received unknown chunk: " + << static_cast(descriptor.type); + if (report_as_error) { + rtc::StringBuilder sb; + sb << "Received unknown chunk of type: " + << static_cast(descriptor.type) << " with report-error bit set"; + callbacks_.OnError(ErrorKind::kParseFailed, sb.str()); + RTC_DLOG(LS_VERBOSE) + << log_prefix() + << "Unknown chunk, with type indicating it should be reported."; + + // https://tools.ietf.org/html/rfc4960#section-3.2 + // "... report in an ERROR chunk using the 'Unrecognized Chunk Type' + // cause." + if (tcb_ != nullptr) { + // Need TCB - this chunk must be sent with a correct verification tag. + SendPacket(tcb_->PacketBuilder().Add( + ErrorChunk(Parameters::Builder() + .Add(UnrecognizedChunkTypeCause(std::vector( + descriptor.data.begin(), descriptor.data.end()))) + .Build()))); + } + } + if (!continue_processing) { + // https://tools.ietf.org/html/rfc4960#section-3.2 + // "Stop processing this SCTP packet and discard it, do not process any + // further chunks within it." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Unknown chunk, with type indicating not to " + "process any further chunks"; + } + + return continue_processing; +} + +absl::optional DcSctpSocket::OnInitTimerExpiry() { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Timer " << t1_init_->name() + << " has expired: " << t1_init_->expiration_count() + << "/" << t1_init_->options().max_restarts; + RTC_DCHECK(state_ == State::kCookieWait); + + if (t1_init_->is_running()) { + SendInit(); + } else { + InternalClose(ErrorKind::kTooManyRetries, "No INIT_ACK received"); + } + RTC_DCHECK(IsConsistent()); + return absl::nullopt; +} + +absl::optional DcSctpSocket::OnCookieTimerExpiry() { + // https://tools.ietf.org/html/rfc4960#section-4 + // "If the T1-cookie timer expires, the endpoint MUST retransmit COOKIE + // ECHO and restart the T1-cookie timer without changing state. This MUST + // be repeated up to 'Max.Init.Retransmits' times. After that, the endpoint + // MUST abort the initialization process and report the error to the SCTP + // user." + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Timer " << t1_cookie_->name() + << " has expired: " << t1_cookie_->expiration_count() + << "/" << t1_cookie_->options().max_restarts; + + RTC_DCHECK(state_ == State::kCookieEchoed); + + if (t1_cookie_->is_running()) { + SendCookieEcho(); + } else { + InternalClose(ErrorKind::kTooManyRetries, "No COOKIE_ACK received"); + } + + RTC_DCHECK(IsConsistent()); + return absl::nullopt; +} + +absl::optional DcSctpSocket::OnShutdownTimerExpiry() { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Timer " << t2_shutdown_->name() + << " has expired: " << t2_shutdown_->expiration_count() + << "/" << t2_shutdown_->options().max_restarts; + + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "If the timer expires, the endpoint must resend the SHUTDOWN with the + // updated last sequential TSN received from its peer." + if (t2_shutdown_->is_running()) { + SendShutdown(); + } else { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "An endpoint should limit the number of retransmissions of the SHUTDOWN + // chunk to the protocol parameter 'Association.Max.Retrans'. If this + // threshold is exceeded, the endpoint should destroy the TCB..." + + SendPacket(tcb_->PacketBuilder().Add( + AbortChunk(true, Parameters::Builder() + .Add(UserInitiatedAbortCause( + "Too many retransmissions of SHUTDOWN")) + .Build()))); + + InternalClose(ErrorKind::kTooManyRetries, "No SHUTDOWN_ACK received"); + } + RTC_DCHECK(IsConsistent()); + return tcb_->current_rto(); +} + +void DcSctpSocket::SendPacket(SctpPacket::Builder& builder) { + if (builder.empty()) { + return; + } + + std::vector payload = builder.Build(); + + if (RTC_DLOG_IS_ON) { + DebugPrintOutgoing(payload); + } + + // The heartbeat interval timer is restarted for every sent packet, to + // fire when the outgoing channel is inactive. + if (tcb_ != nullptr) { + tcb_->heartbeat_handler().RestartTimer(); + } + + if (packet_observer_ != nullptr) { + packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload); + } + callbacks_.SendPacket(payload); +} + +bool DcSctpSocket::ValidateHasTCB() { + if (tcb_ != nullptr) { + return true; + } + + callbacks_.OnError( + ErrorKind::kNotConnected, + "Received unexpected commands on socket that is not connected"); + return false; +} + +void DcSctpSocket::ReportFailedToParseChunk(int chunk_type) { + rtc::StringBuilder sb; + sb << "Failed to parse chunk of type: " << chunk_type; + callbacks_.OnError(ErrorKind::kParseFailed, sb.str()); +} + +void DcSctpSocket::HandleData(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = DataChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + HandleDataCommon(*chunk); + } +} + +void DcSctpSocket::HandleIData(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = IDataChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + HandleDataCommon(*chunk); + } +} + +void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) { + TSN tsn = chunk.tsn(); + AnyDataChunk::ImmediateAckFlag immediate_ack = chunk.options().immediate_ack; + Data data = std::move(chunk).extract(); + + if (data.payload.empty()) { + // Empty DATA chunks are illegal. + SendPacket(tcb_->PacketBuilder().Add( + ErrorChunk(Parameters::Builder().Add(NoUserDataCause(tsn)).Build()))); + callbacks_.OnError(ErrorKind::kProtocolViolation, + "Received DATA chunk with no user data"); + return; + } + + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Handle DATA, queue_size=" + << tcb_->reassembly_queue().queued_bytes() + << ", water_mark=" + << tcb_->reassembly_queue().watermark_bytes() + << ", full=" << tcb_->reassembly_queue().is_full() + << ", above=" + << tcb_->reassembly_queue().is_above_watermark(); + + if (tcb_->reassembly_queue().is_full()) { + // If the reassembly queue is full, there is nothing that can be done. The + // specification only allows dropping gap-ack-blocks, and that's not + // likely to help as the socket has been trying to fill gaps since the + // watermark was reached. + SendPacket(tcb_->PacketBuilder().Add(AbortChunk( + true, Parameters::Builder().Add(OutOfResourceErrorCause()).Build()))); + InternalClose(ErrorKind::kResourceExhaustion, + "Reassembly Queue is exhausted"); + return; + } + + if (tcb_->reassembly_queue().is_above_watermark()) { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Is above high watermark"; + // If the reassembly queue is above its high watermark, only accept data + // chunks that increase its cumulative ack tsn in an attempt to fill gaps + // to deliver messages. + if (!tcb_->data_tracker().will_increase_cum_ack_tsn(tsn)) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Rejected data because of exceeding watermark"; + tcb_->data_tracker().ForceImmediateSack(); + return; + } + } + + if (!tcb_->data_tracker().IsTSNValid(tsn)) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Rejected data because of failing TSN validity"; + return; + } + + tcb_->data_tracker().Observe(tsn, immediate_ack); + tcb_->reassembly_queue().MaybeResetStreamsDeferred( + tcb_->data_tracker().last_cumulative_acked_tsn()); + tcb_->reassembly_queue().Add(tsn, std::move(data)); + DeliverReassembledMessages(); +} + +void DcSctpSocket::HandleInit(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = InitChunk::Parse(descriptor.data); + if (!ValidateParseSuccess(chunk)) { + return; + } + + if (chunk->initiate_tag() == VerificationTag(0) || + chunk->nbr_outbound_streams() == 0 || chunk->nbr_inbound_streams() == 0) { + // https://tools.ietf.org/html/rfc4960#section-3.3.2 + // "If the value of the Initiate Tag in a received INIT chunk is found + // to be 0, the receiver MUST treat it as an error and close the + // association by transmitting an ABORT." + + // "A receiver of an INIT with the OS value set to 0 SHOULD abort the + // association." + + // "A receiver of an INIT with the MIS value of 0 SHOULD abort the + // association." + + SendPacket(SctpPacket::Builder(VerificationTag(0), options_) + .Add(AbortChunk( + /*filled_in_verification_tag=*/false, + Parameters::Builder() + .Add(ProtocolViolationCause("INIT malformed")) + .Build()))); + InternalClose(ErrorKind::kProtocolViolation, "Received invalid INIT"); + return; + } + + if (state_ == State::kShutdownAckSent) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "If an endpoint is in the SHUTDOWN-ACK-SENT state and receives an + // INIT chunk (e.g., if the SHUTDOWN COMPLETE was lost) with source and + // destination transport addresses (either in the IP addresses or in the + // INIT chunk) that belong to this association, it should discard the INIT + // chunk and retransmit the SHUTDOWN ACK chunk." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received Init indicating lost ShutdownComplete"; + SendShutdownAck(); + return; + } + + TieTag tie_tag(0); + if (state_ == State::kClosed) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received Init in closed state (normal)"; + + MakeConnectionParameters(); + } else if (state_ == State::kCookieWait || state_ == State::kCookieEchoed) { + // https://tools.ietf.org/html/rfc4960#section-5.2.1 + // "This usually indicates an initialization collision, i.e., each + // endpoint is attempting, at about the same time, to establish an + // association with the other endpoint. Upon receipt of an INIT in the + // COOKIE-WAIT state, an endpoint MUST respond with an INIT ACK using the + // same parameters it sent in its original INIT chunk (including its + // Initiate Tag, unchanged). When responding, the endpoint MUST send the + // INIT ACK back to the same address that the original INIT (sent by this + // endpoint) was sent." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received Init indicating simultaneous connections"; + } else { + RTC_DCHECK(tcb_ != nullptr); + // https://tools.ietf.org/html/rfc4960#section-5.2.2 + // "The outbound SCTP packet containing this INIT ACK MUST carry a + // Verification Tag value equal to the Initiate Tag found in the + // unexpected INIT. And the INIT ACK MUST contain a new Initiate Tag + // (randomly generated; see Section 5.3.1). Other parameters for the + // endpoint SHOULD be copied from the existing parameters of the + // association (e.g., number of outbound streams) into the INIT ACK and + // cookie." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received Init indicating restarted connection"; + // Create a new verification tag - different from the previous one. + for (int tries = 0; tries < 10; ++tries) { + connect_params_.verification_tag = VerificationTag( + callbacks_.GetRandomInt(kMinVerificationTag, kMaxVerificationTag)); + if (connect_params_.verification_tag != tcb_->my_verification_tag()) { + break; + } + } + + // Make the initial TSN make a large jump, so that there is no overlap + // with the old and new association. + connect_params_.initial_tsn = + TSN(*tcb_->retransmission_queue().next_tsn() + 1000000); + tie_tag = tcb_->tie_tag(); + } + + RTC_DLOG(LS_VERBOSE) + << log_prefix() + << rtc::StringFormat( + "Proceeding with connection. my_verification_tag=%08x, " + "my_initial_tsn=%u, peer_verification_tag=%08x, " + "peer_initial_tsn=%u", + *connect_params_.verification_tag, *connect_params_.initial_tsn, + *chunk->initiate_tag(), *chunk->initial_tsn()); + + Capabilities capabilities = GetCapabilities(options_, chunk->parameters()); + + SctpPacket::Builder b(chunk->initiate_tag(), options_); + Parameters::Builder params_builder = + Parameters::Builder().Add(StateCookieParameter( + StateCookie(chunk->initiate_tag(), chunk->initial_tsn(), + chunk->a_rwnd(), tie_tag, capabilities) + .Serialize())); + AddCapabilityParameters(options_, params_builder); + + InitAckChunk init_ack(/*initiate_tag=*/connect_params_.verification_tag, + options_.max_receiver_window_buffer_size, + options_.announced_maximum_outgoing_streams, + options_.announced_maximum_incoming_streams, + connect_params_.initial_tsn, params_builder.Build()); + b.Add(init_ack); + SendPacket(b); +} + +void DcSctpSocket::SendCookieEcho() { + RTC_DCHECK(tcb_ != nullptr); + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(*cookie_echo_chunk_); + + // https://tools.ietf.org/html/rfc4960#section-5.1 + // "The COOKIE ECHO chunk can be bundled with any pending outbound DATA + // chunks, but it MUST be the first chunk in the packet and until the COOKIE + // ACK is returned the sender MUST NOT send any other packets to the peer." + tcb_->SendBufferedPackets(b, /*only_one_packet=*/true); +} + +void DcSctpSocket::HandleInitAck( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = InitAckChunk::Parse(descriptor.data); + if (!ValidateParseSuccess(chunk)) { + return; + } + + if (state_ != State::kCookieWait) { + // https://tools.ietf.org/html/rfc4960#section-5.2.3 + // "If an INIT ACK is received by an endpoint in any state other than + // the COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received INIT_ACK in unexpected state"; + return; + } + + auto cookie = chunk->parameters().get(); + if (!cookie.has_value()) { + SendPacket(SctpPacket::Builder(connect_params_.verification_tag, options_) + .Add(AbortChunk( + /*filled_in_verification_tag=*/false, + Parameters::Builder() + .Add(ProtocolViolationCause("INIT-ACK malformed")) + .Build()))); + InternalClose(ErrorKind::kProtocolViolation, + "InitAck chunk doesn't contain a cookie"); + return; + } + Capabilities capabilities = GetCapabilities(options_, chunk->parameters()); + t1_init_->Stop(); + + tcb_ = std::make_unique( + timer_manager_, log_prefix_, options_, capabilities, callbacks_, + send_queue_, connect_params_.verification_tag, + connect_params_.initial_tsn, chunk->initiate_tag(), chunk->initial_tsn(), + chunk->a_rwnd(), MakeTieTag(callbacks_), + [this]() { return state_ == State::kEstablished; }, + [this](SctpPacket::Builder& builder) { return SendPacket(builder); }); + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Created peer TCB: " << tcb_->ToString(); + + SetState(State::kCookieEchoed, "INIT_ACK received"); + + // The connection isn't fully established just yet. + cookie_echo_chunk_ = CookieEchoChunk(cookie->data()); + SendCookieEcho(); + t1_cookie_->Start(); +} + +void DcSctpSocket::HandleCookieEcho( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = + CookieEchoChunk::Parse(descriptor.data); + if (!ValidateParseSuccess(chunk)) { + return; + } + + absl::optional cookie = + StateCookie::Deserialize(chunk->cookie()); + if (!cookie.has_value()) { + callbacks_.OnError(ErrorKind::kParseFailed, "Failed to parse state cookie"); + return; + } + + if (tcb_ != nullptr) { + if (!HandleCookieEchoWithTCB(header, *cookie)) { + return; + } + } else { + if (header.verification_tag != connect_params_.verification_tag) { + callbacks_.OnError( + ErrorKind::kParseFailed, + rtc::StringFormat( + "Received CookieEcho with invalid verification tag: %08x, " + "expected %08x", + *header.verification_tag, *connect_params_.verification_tag)); + return; + } + } + + // The init timer can be running on simultaneous connections. + t1_init_->Stop(); + t1_cookie_->Stop(); + if (state_ != State::kEstablished) { + cookie_echo_chunk_ = absl::nullopt; + SetState(State::kEstablished, "COOKIE_ECHO received"); + callbacks_.OnConnected(); + } + + if (tcb_ == nullptr) { + tcb_ = std::make_unique( + timer_manager_, log_prefix_, options_, cookie->capabilities(), + callbacks_, send_queue_, connect_params_.verification_tag, + connect_params_.initial_tsn, cookie->initiate_tag(), + cookie->initial_tsn(), cookie->a_rwnd(), MakeTieTag(callbacks_), + [this]() { return state_ == State::kEstablished; }, + [this](SctpPacket::Builder& builder) { return SendPacket(builder); }); + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Created peer TCB: " << tcb_->ToString(); + } + + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(CookieAckChunk()); + + // https://tools.ietf.org/html/rfc4960#section-5.1 + // "A COOKIE ACK chunk may be bundled with any pending DATA chunks (and/or + // SACK chunks), but the COOKIE ACK chunk MUST be the first chunk in the + // packet." + tcb_->SendBufferedPackets(b); +} + +bool DcSctpSocket::HandleCookieEchoWithTCB(const CommonHeader& header, + const StateCookie& cookie) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Handling CookieEchoChunk with TCB. local_tag=" + << *tcb_->my_verification_tag() + << ", peer_tag=" << *header.verification_tag + << ", tcb_tag=" << *tcb_->peer_verification_tag() + << ", cookie_tag=" << *cookie.initiate_tag() + << ", local_tie_tag=" << *tcb_->tie_tag() + << ", peer_tie_tag=" << *cookie.tie_tag(); + // https://tools.ietf.org/html/rfc4960#section-5.2.4 + // "Handle a COOKIE ECHO when a TCB Exists" + if (header.verification_tag != tcb_->my_verification_tag() && + tcb_->peer_verification_tag() != cookie.initiate_tag() && + cookie.tie_tag() == tcb_->tie_tag()) { + // "A) In this case, the peer may have restarted." + if (state_ == State::kShutdownAckSent) { + // "If the endpoint is in the SHUTDOWN-ACK-SENT state and recognizes + // that the peer has restarted ... it MUST NOT set up a new association + // but instead resend the SHUTDOWN ACK and send an ERROR chunk with a + // "Cookie Received While Shutting Down" error cause to its peer." + SctpPacket::Builder b(cookie.initiate_tag(), options_); + b.Add(ShutdownAckChunk()); + b.Add(ErrorChunk(Parameters::Builder() + .Add(CookieReceivedWhileShuttingDownCause()) + .Build())); + SendPacket(b); + callbacks_.OnError(ErrorKind::kWrongSequence, + "Received COOKIE-ECHO while shutting down"); + return false; + } + + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received COOKIE-ECHO indicating a restarted peer"; + + // If a message was partly sent, and the peer restarted, resend it in + // full by resetting the send queue. + send_queue_.Reset(); + tcb_ = nullptr; + callbacks_.OnConnectionRestarted(); + } else if (header.verification_tag == tcb_->my_verification_tag() && + tcb_->peer_verification_tag() != cookie.initiate_tag()) { + // TODO(boivie): Handle the peer_tag == 0? + // "B) In this case, both sides may be attempting to start an + // association at about the same time, but the peer endpoint started its + // INIT after responding to the local endpoint's INIT." + RTC_DLOG(LS_VERBOSE) + << log_prefix() + << "Received COOKIE-ECHO indicating simultaneous connections"; + tcb_ = nullptr; + } else if (header.verification_tag != tcb_->my_verification_tag() && + tcb_->peer_verification_tag() == cookie.initiate_tag() && + cookie.tie_tag() == TieTag(0)) { + // "C) In this case, the local endpoint's cookie has arrived late. + // Before it arrived, the local endpoint sent an INIT and received an + // INIT ACK and finally sent a COOKIE ECHO with the peer's same tag but + // a new tag of its own. The cookie should be silently discarded. The + // endpoint SHOULD NOT change states and should leave any timers + // running." + RTC_DLOG(LS_VERBOSE) + << log_prefix() + << "Received COOKIE-ECHO indicating a late COOKIE-ECHO. Discarding"; + return false; + } else if (header.verification_tag == tcb_->my_verification_tag() && + tcb_->peer_verification_tag() == cookie.initiate_tag()) { + // "D) When both local and remote tags match, the endpoint should enter + // the ESTABLISHED state, if it is in the COOKIE-ECHOED state. It + // should stop any cookie timer that may be running and send a COOKIE + // ACK." + RTC_DLOG(LS_VERBOSE) + << log_prefix() + << "Received duplicate COOKIE-ECHO, probably because of peer not " + "receiving COOKIE-ACK and retransmitting COOKIE-ECHO. Continuing."; + } + return true; +} + +void DcSctpSocket::HandleCookieAck( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = CookieAckChunk::Parse(descriptor.data); + if (!ValidateParseSuccess(chunk)) { + return; + } + + if (state_ != State::kCookieEchoed) { + // https://tools.ietf.org/html/rfc4960#section-5.2.5 + // "At any state other than COOKIE-ECHOED, an endpoint should silently + // discard a received COOKIE ACK chunk." + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received COOKIE_ACK not in COOKIE_ECHOED state"; + return; + } + + // RFC 4960, Errata ID: 4400 + t1_cookie_->Stop(); + cookie_echo_chunk_ = absl::nullopt; + SetState(State::kEstablished, "COOKIE_ACK received"); + tcb_->SendBufferedPackets(); + callbacks_.OnConnected(); +} + +void DcSctpSocket::DeliverReassembledMessages() { + if (tcb_->reassembly_queue().HasMessages()) { + for (auto& message : tcb_->reassembly_queue().FlushMessages()) { + callbacks_.OnMessageReceived(std::move(message)); + } + } +} + +void DcSctpSocket::HandleSack(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = SackChunk::Parse(descriptor.data); + + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + SackChunk sack = ChunkValidators::Clean(*std::move(chunk)); + + if (tcb_->retransmission_queue().HandleSack(callbacks_.TimeMillis(), + sack)) { + MaybeSendShutdownOrAck(); + // Receiving an ACK will decrease outstanding bytes (maybe now below + // cwnd?) or indicate packet loss that may result in sending FORWARD-TSN. + tcb_->SendBufferedPackets(); + } else { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Dropping out-of-order SACK with TSN " + << *sack.cumulative_tsn_ack(); + } + } +} + +void DcSctpSocket::HandleHeartbeatRequest( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = + HeartbeatRequestChunk::Parse(descriptor.data); + + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + tcb_->heartbeat_handler().HandleHeartbeatRequest(*std::move(chunk)); + } +} + +void DcSctpSocket::HandleHeartbeatAck( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = + HeartbeatAckChunk::Parse(descriptor.data); + + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + tcb_->heartbeat_handler().HandleHeartbeatAck(*std::move(chunk)); + } +} + +void DcSctpSocket::HandleAbort(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = AbortChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk)) { + std::string error_string = ErrorCausesToString(chunk->error_causes()); + if (tcb_ == nullptr) { + // https://tools.ietf.org/html/rfc4960#section-3.3.7 + // "If an endpoint receives an ABORT with a format error or no TCB is + // found, it MUST silently discard it." + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Received ABORT (" << error_string + << ") on a connection with no TCB. Ignoring"; + return; + } + + RTC_DLOG(LS_WARNING) << log_prefix() << "Received ABORT (" << error_string + << ") - closing connection."; + InternalClose(ErrorKind::kPeerReported, error_string); + } +} + +void DcSctpSocket::HandleError(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = ErrorChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk)) { + std::string error_string = ErrorCausesToString(chunk->error_causes()); + if (tcb_ == nullptr) { + RTC_DLOG(LS_VERBOSE) << log_prefix() << "Received ERROR (" << error_string + << ") on a connection with no TCB. Ignoring"; + return; + } + + RTC_DLOG(LS_WARNING) << log_prefix() << "Received ERROR: " << error_string; + callbacks_.OnError(ErrorKind::kPeerReported, + "Peer reported error: " + error_string); + } +} + +void DcSctpSocket::HandleReconfig( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = ReConfigChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + tcb_->stream_reset_handler().HandleReConfig(*std::move(chunk)); + } +} + +void DcSctpSocket::HandleShutdown( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + if (!ValidateParseSuccess(ShutdownChunk::Parse(descriptor.data))) { + return; + } + + if (state_ == State::kClosed) { + return; + } else if (state_ == State::kCookieWait || state_ == State::kCookieEchoed) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "If a SHUTDOWN is received in the COOKIE-WAIT or COOKIE ECHOED state, + // the SHUTDOWN chunk SHOULD be silently discarded." + } else if (state_ == State::kShutdownSent) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "If an endpoint is in the SHUTDOWN-SENT state and receives a + // SHUTDOWN chunk from its peer, the endpoint shall respond immediately + // with a SHUTDOWN ACK to its peer, and move into the SHUTDOWN-ACK-SENT + // state restarting its T2-shutdown timer." + SendShutdownAck(); + SetState(State::kShutdownAckSent, "SHUTDOWN received"); + } else if (state_ != State::kShutdownReceived) { + RTC_DLOG(LS_VERBOSE) << log_prefix() + << "Received SHUTDOWN - shutting down the socket"; + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "Upon reception of the SHUTDOWN, the peer endpoint shall enter the + // SHUTDOWN-RECEIVED state, stop accepting new data from its SCTP user, + // and verify, by checking the Cumulative TSN Ack field of the chunk, that + // all its outstanding DATA chunks have been received by the SHUTDOWN + // sender." + SetState(State::kShutdownReceived, "SHUTDOWN received"); + MaybeSendShutdownOrAck(); + } +} + +void DcSctpSocket::HandleShutdownAck( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + if (!ValidateParseSuccess(ShutdownAckChunk::Parse(descriptor.data))) { + return; + } + + if (state_ == State::kShutdownSent || state_ == State::kShutdownAckSent) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "Upon the receipt of the SHUTDOWN ACK, the SHUTDOWN sender shall stop + // the T2-shutdown timer, send a SHUTDOWN COMPLETE chunk to its peer, and + // remove all record of the association." + + // "If an endpoint is in the SHUTDOWN-ACK-SENT state and receives a + // SHUTDOWN ACK, it shall stop the T2-shutdown timer, send a SHUTDOWN + // COMPLETE chunk to its peer, and remove all record of the association." + + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(ShutdownCompleteChunk(/*tag_reflected=*/false)); + SendPacket(b); + InternalClose(ErrorKind::kNoError, ""); + } else { + // https://tools.ietf.org/html/rfc4960#section-8.5.1 + // "If the receiver is in COOKIE-ECHOED or COOKIE-WAIT state + // the procedures in Section 8.4 SHOULD be followed; in other words, it + // should be treated as an Out Of The Blue packet." + + // https://tools.ietf.org/html/rfc4960#section-8.4 + // "If the packet contains a SHUTDOWN ACK chunk, the receiver + // should respond to the sender of the OOTB packet with a SHUTDOWN + // COMPLETE. When sending the SHUTDOWN COMPLETE, the receiver of the OOTB + // packet must fill in the Verification Tag field of the outbound packet + // with the Verification Tag received in the SHUTDOWN ACK and set the T + // bit in the Chunk Flags to indicate that the Verification Tag is + // reflected." + + SctpPacket::Builder b(header.verification_tag, options_); + b.Add(ShutdownCompleteChunk(/*tag_reflected=*/true)); + SendPacket(b); + } +} + +void DcSctpSocket::HandleShutdownComplete( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + if (!ValidateParseSuccess(ShutdownCompleteChunk::Parse(descriptor.data))) { + return; + } + + if (state_ == State::kShutdownAckSent) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "Upon reception of the SHUTDOWN COMPLETE chunk, the endpoint will + // verify that it is in the SHUTDOWN-ACK-SENT state; if it is not, the + // chunk should be discarded. If the endpoint is in the SHUTDOWN-ACK-SENT + // state, the endpoint should stop the T2-shutdown timer and remove all + // knowledge of the association (and thus the association enters the + // CLOSED state)." + InternalClose(ErrorKind::kNoError, ""); + } +} + +void DcSctpSocket::HandleForwardTsn( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = + ForwardTsnChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + HandleForwardTsnCommon(*chunk); + } +} + +void DcSctpSocket::HandleIForwardTsn( + const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor) { + absl::optional chunk = + IForwardTsnChunk::Parse(descriptor.data); + if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { + HandleForwardTsnCommon(*chunk); + } +} + +void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) { + if (!tcb_->capabilities().partial_reliability) { + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(AbortChunk(/*filled_in_verification_tag=*/true, + Parameters::Builder() + .Add(ProtocolViolationCause( + "I-FORWARD-TSN received, but not indicated " + "during connection establishment")) + .Build())); + SendPacket(b); + + callbacks_.OnError(ErrorKind::kProtocolViolation, + "Received a FORWARD_TSN without announced peer support"); + return; + } + tcb_->data_tracker().HandleForwardTsn(chunk.new_cumulative_tsn()); + tcb_->reassembly_queue().Handle(chunk); + // A forward TSN - for ordered streams - may allow messages to be + // delivered. + DeliverReassembledMessages(); + + // Processing a FORWARD_TSN might result in sending a SACK. + tcb_->MaybeSendSack(); +} + +void DcSctpSocket::MaybeSendShutdownOrAck() { + if (tcb_->retransmission_queue().outstanding_bytes() != 0) { + return; + } + + if (state_ == State::kShutdownPending) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "Once all its outstanding data has been acknowledged, the endpoint + // shall send a SHUTDOWN chunk to its peer including in the Cumulative TSN + // Ack field the last sequential TSN it has received from the peer. It + // shall then start the T2-shutdown timer and enter the SHUTDOWN-SENT + // state."" + + SendShutdown(); + t2_shutdown_->set_duration(tcb_->current_rto()); + t2_shutdown_->Start(); + SetState(State::kShutdownSent, "No more outstanding data"); + } else if (state_ == State::kShutdownReceived) { + // https://tools.ietf.org/html/rfc4960#section-9.2 + // "If the receiver of the SHUTDOWN has no more outstanding DATA + // chunks, the SHUTDOWN receiver MUST send a SHUTDOWN ACK and start a + // T2-shutdown timer of its own, entering the SHUTDOWN-ACK-SENT state. If + // the timer expires, the endpoint must resend the SHUTDOWN ACK." + + SendShutdownAck(); + SetState(State::kShutdownAckSent, "No more outstanding data"); + } +} + +void DcSctpSocket::SendShutdown() { + SctpPacket::Builder b = tcb_->PacketBuilder(); + b.Add(ShutdownChunk(tcb_->data_tracker().last_cumulative_acked_tsn())); + SendPacket(b); +} + +void DcSctpSocket::SendShutdownAck() { + SendPacket(tcb_->PacketBuilder().Add(ShutdownAckChunk())); + t2_shutdown_->set_duration(tcb_->current_rto()); + t2_shutdown_->Start(); +} + +} // namespace dcsctp diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h new file mode 100644 index 0000000000..271e82e42e --- /dev/null +++ b/net/dcsctp/socket/dcsctp_socket.h @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_SOCKET_DCSCTP_SOCKET_H_ +#define NET_DCSCTP_SOCKET_DCSCTP_SOCKET_H_ + +#include +#include +#include +#include + +#include "absl/strings/string_view.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/abort_chunk.h" +#include "net/dcsctp/packet/chunk/chunk.h" +#include "net/dcsctp/packet/chunk/cookie_ack_chunk.h" +#include "net/dcsctp/packet/chunk/cookie_echo_chunk.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/data_common.h" +#include "net/dcsctp/packet/chunk/error_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/chunk/heartbeat_ack_chunk.h" +#include "net/dcsctp/packet/chunk/heartbeat_request_chunk.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" +#include "net/dcsctp/packet/chunk/init_ack_chunk.h" +#include "net/dcsctp/packet/chunk/init_chunk.h" +#include "net/dcsctp/packet/chunk/reconfig_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_ack_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_chunk.h" +#include "net/dcsctp/packet/chunk/shutdown_complete_chunk.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/packet_observer.h" +#include "net/dcsctp/rx/data_tracker.h" +#include "net/dcsctp/rx/reassembly_queue.h" +#include "net/dcsctp/socket/callback_deferrer.h" +#include "net/dcsctp/socket/state_cookie.h" +#include "net/dcsctp/socket/transmission_control_block.h" +#include "net/dcsctp/timer/timer.h" +#include "net/dcsctp/tx/fcfs_send_queue.h" +#include "net/dcsctp/tx/retransmission_error_counter.h" +#include "net/dcsctp/tx/retransmission_queue.h" +#include "net/dcsctp/tx/retransmission_timeout.h" + +namespace dcsctp { + +// DcSctpSocket represents a single SCTP socket, to be used over DTLS. +// +// Every dcSCTP is completely isolated from any other socket. +// +// This class manages all packet and chunk dispatching and mainly handles the +// connection sequences (connect, close, shutdown, etc) as well as managing +// the Transmission Control Block (tcb). +// +// This class is thread-compatible. +class DcSctpSocket : public DcSctpSocketInterface { + public: + // Instantiates a DcSctpSocket, which interacts with the world through the + // `callbacks` interface and is configured using `options`. + // + // For debugging, `log_prefix` will prefix all debug logs, and a + // `packet_observer` can be attached to e.g. dump sent and received packets. + DcSctpSocket(absl::string_view log_prefix, + DcSctpSocketCallbacks& callbacks, + std::unique_ptr packet_observer, + const DcSctpOptions& options); + + DcSctpSocket(const DcSctpSocket&) = delete; + DcSctpSocket& operator=(const DcSctpSocket&) = delete; + + // Implementation of `DcSctpSocketInterface`. + void ReceivePacket(rtc::ArrayView data) override; + void HandleTimeout(TimeoutID timeout_id) override; + void Connect() override; + void Shutdown() override; + void Close() override; + SendStatus Send(DcSctpMessage message, + const SendOptions& send_options) override; + ResetStreamsStatus ResetStreams( + rtc::ArrayView outgoing_streams) override; + SocketState state() const override; + const DcSctpOptions& options() const override { return options_; } + + // Returns this socket's verification tag, or zero if not yet connected. + VerificationTag verification_tag() const { + return tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0); + } + + private: + // Parameter proposals valid during the connect phase. + struct ConnectParameters { + TSN initial_tsn = TSN(0); + VerificationTag verification_tag = VerificationTag(0); + }; + + // Detailed state (separate from SocketState, which is the public state). + enum class State { + kClosed, + kCookieWait, + // TCB valid in these: + kCookieEchoed, + kEstablished, + kShutdownPending, + kShutdownSent, + kShutdownReceived, + kShutdownAckSent, + }; + + // Returns the log prefix used for debug logging. + std::string log_prefix() const; + + bool IsConsistent() const; + static constexpr absl::string_view ToString(DcSctpSocket::State state); + + // Changes the socket state, given a `reason` (for debugging/logging). + void SetState(State state, absl::string_view reason); + // Fills in `connect_params` with random verification tag and initial TSN. + void MakeConnectionParameters(); + // Closes the association. Note that the TCB will not be valid past this call. + void InternalClose(ErrorKind error, absl::string_view message); + // Closes the association, because of too many retransmission errors. + void CloseConnectionBecauseOfTooManyTransmissionErrors(); + // Timer expiration handlers + absl::optional OnInitTimerExpiry(); + absl::optional OnCookieTimerExpiry(); + absl::optional OnShutdownTimerExpiry(); + // Builds the packet from `builder` and sends it (through callbacks). + void SendPacket(SctpPacket::Builder& builder); + // Sends SHUTDOWN or SHUTDOWN-ACK if the socket is shutting down and if all + // outstanding data has been acknowledged. + void MaybeSendShutdownOrAck(); + // If the socket is shutting down, responds SHUTDOWN to any incoming DATA. + void MaybeSendShutdownOnPacketReceived(const SctpPacket& packet); + // Sends a INIT chunk. + void SendInit(); + // Sends a CookieEcho chunk. + void SendCookieEcho(); + // Sends a SHUTDOWN chunk. + void SendShutdown(); + // Sends a SHUTDOWN-ACK chunk. + void SendShutdownAck(); + // Validates the SCTP packet, as a whole - not the validity of individual + // chunks within it, as that's done in the different chunk handlers. + bool ValidatePacket(const SctpPacket& packet); + // Parses `payload`, which is a serialized packet that is just going to be + // sent and prints all chunks. + void DebugPrintOutgoing(rtc::ArrayView payload); + // Called whenever there may be reassembled messages, and delivers those. + void DeliverReassembledMessages(); + // Returns true if there is a TCB, and false otherwise (and reports an error). + bool ValidateHasTCB(); + + // Returns true if the parsing of a chunk of type `T` succeeded. If it didn't, + // it reports an error and returns false. + template + bool ValidateParseSuccess(const absl::optional& c) { + if (c.has_value()) { + return true; + } + + ReportFailedToParseChunk(T::kType); + return false; + } + + // Reports failing to have parsed a chunk with the provided `chunk_type`. + void ReportFailedToParseChunk(int chunk_type); + // Called when unknown chunks are received. May report an error. + bool HandleUnrecognizedChunk(const SctpPacket::ChunkDescriptor& descriptor); + + // Will dispatch more specific chunk handlers. + bool Dispatch(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming DATA chunks. + void HandleData(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming I-DATA chunks. + void HandleIData(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Common handler for DATA and I-DATA chunks. + void HandleDataCommon(AnyDataChunk& chunk); + // Handles incoming INIT chunks. + void HandleInit(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming INIT-ACK chunks. + void HandleInitAck(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming SACK chunks. + void HandleSack(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming HEARTBEAT chunks. + void HandleHeartbeatRequest(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming HEARTBEAT-ACK chunks. + void HandleHeartbeatAck(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming ABORT chunks. + void HandleAbort(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming ERROR chunks. + void HandleError(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming COOKIE-ECHO chunks. + void HandleCookieEcho(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles receiving COOKIE-ECHO when there already is a TCB. The return value + // indicates if the processing should continue. + bool HandleCookieEchoWithTCB(const CommonHeader& header, + const StateCookie& cookie); + // Handles incoming COOKIE-ACK chunks. + void HandleCookieAck(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming SHUTDOWN chunks. + void HandleShutdown(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming SHUTDOWN-ACK chunks. + void HandleShutdownAck(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming FORWARD-TSN chunks. + void HandleForwardTsn(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming I-FORWARD-TSN chunks. + void HandleIForwardTsn(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Handles incoming RE-CONFIG chunks. + void HandleReconfig(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + // Common handled for FORWARD-TSN/I-FORWARD-TSN. + void HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk); + // Handles incoming SHUTDOWN-COMPLETE chunks + void HandleShutdownComplete(const CommonHeader& header, + const SctpPacket::ChunkDescriptor& descriptor); + + const std::string log_prefix_; + const std::unique_ptr packet_observer_; + const DcSctpOptions options_; + + // Enqueues callbacks and dispatches them just before returning to the caller. + CallbackDeferrer callbacks_; + + TimerManager timer_manager_; + const std::unique_ptr t1_init_; + const std::unique_ptr t1_cookie_; + const std::unique_ptr t2_shutdown_; + + // The actual SendQueue implementation. As data can be sent on a socket before + // the connection is established, this component is not in the TCB. + FCFSSendQueue send_queue_; + + // Only valid when state == State::kCookieEchoed + // A cached Cookie Echo Chunk, to be re-sent on timer expiry. + absl::optional cookie_echo_chunk_ = absl::nullopt; + + // Contains verification tag and initial TSN between having sent the INIT + // until the connection is established (there is no TCB at this point). + ConnectParameters connect_params_; + // The socket state. + State state_ = State::kClosed; + // If the connection is established, contains a transmission control block. + std::unique_ptr tcb_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_SOCKET_DCSCTP_SOCKET_H_ diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc new file mode 100644 index 0000000000..1848448587 --- /dev/null +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -0,0 +1,1089 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/dcsctp_socket.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/chunk.h" +#include "net/dcsctp/packet/chunk/cookie_echo_chunk.h" +#include "net/dcsctp/packet/chunk/data_chunk.h" +#include "net/dcsctp/packet/chunk/data_common.h" +#include "net/dcsctp/packet/chunk/error_chunk.h" +#include "net/dcsctp/packet/chunk/heartbeat_ack_chunk.h" +#include "net/dcsctp/packet/chunk/heartbeat_request_chunk.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/chunk/init_chunk.h" +#include "net/dcsctp/packet/chunk/sack_chunk.h" +#include "net/dcsctp/packet/error_cause/error_cause.h" +#include "net/dcsctp/packet/error_cause/unrecognized_chunk_type_cause.h" +#include "net/dcsctp/packet/parameter/heartbeat_info_parameter.h" +#include "net/dcsctp/packet/parameter/parameter.h" +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/packet/tlv_trait.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_options.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/rx/reassembly_queue.h" +#include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h" +#include "net/dcsctp/testing/testing_macros.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::_; +using ::testing::AllOf; +using ::testing::ElementsAre; +using ::testing::HasSubstr; +using ::testing::IsEmpty; +using ::testing::SizeIs; + +constexpr SendOptions kSendOptions; + +MATCHER_P(HasDataChunkWithSsn, ssn, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != DataChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional dc = + DataChunk::Parse(packet->descriptors()[0].data); + if (!dc.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + if (dc->ssn() != ssn) { + *result_listener << "the ssn is " << *dc->ssn(); + return false; + } + + return true; +} + +MATCHER_P(HasDataChunkWithMid, mid, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != IDataChunk::kType) { + *result_listener << "the first chunk in the packet is not an i-data chunk"; + return false; + } + + absl::optional dc = + IDataChunk::Parse(packet->descriptors()[0].data); + if (!dc.has_value()) { + *result_listener << "The first chunk didn't parse as an i-data chunk"; + return false; + } + + if (dc->message_id() != mid) { + *result_listener << "the mid is " << *dc->message_id(); + return false; + } + + return true; +} + +MATCHER_P(HasSackWithCumAckTsn, tsn, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != SackChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional sc = + SackChunk::Parse(packet->descriptors()[0].data); + if (!sc.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + if (sc->cumulative_tsn_ack() != tsn) { + *result_listener << "the cum_ack_tsn is " << *sc->cumulative_tsn_ack(); + return false; + } + + return true; +} + +MATCHER(HasSackWithNoGapAckBlocks, "") { + absl::optional packet = SctpPacket::Parse(arg); + if (!packet.has_value()) { + *result_listener << "data didn't parse as an SctpPacket"; + return false; + } + + if (packet->descriptors()[0].type != SackChunk::kType) { + *result_listener << "the first chunk in the packet is not a data chunk"; + return false; + } + + absl::optional sc = + SackChunk::Parse(packet->descriptors()[0].data); + if (!sc.has_value()) { + *result_listener << "The first chunk didn't parse as a data chunk"; + return false; + } + + if (!sc->gap_ack_blocks().empty()) { + *result_listener << "there are gap ack blocks"; + return false; + } + + return true; +} + +TSN AddTo(TSN tsn, int delta) { + return TSN(*tsn + delta); +} + +DcSctpOptions MakeOptionsForTest(bool enable_message_interleaving) { + DcSctpOptions options; + // To make the interval more predictable in tests. + options.heartbeat_interval_include_rtt = false; + options.enable_message_interleaving = enable_message_interleaving; + return options; +} + +class DcSctpSocketTest : public testing::Test { + protected: + explicit DcSctpSocketTest(bool enable_message_interleaving = false) + : options_(MakeOptionsForTest(enable_message_interleaving)), + sock_a_("A", cb_a_, nullptr, options_), + sock_z_("Z", cb_z_, nullptr, options_) {} + + void AdvanceTime(DurationMs duration) { + cb_a_.AdvanceTime(duration); + cb_z_.AdvanceTime(duration); + } + + static void ExchangeMessages(DcSctpSocket& sock_a, + MockDcSctpSocketCallbacks& cb_a, + DcSctpSocket& sock_z, + MockDcSctpSocketCallbacks& cb_z) { + bool delivered_packet = false; + do { + delivered_packet = false; + std::vector packet_from_a = cb_a.ConsumeSentPacket(); + if (!packet_from_a.empty()) { + delivered_packet = true; + sock_z.ReceivePacket(std::move(packet_from_a)); + } + std::vector packet_from_z = cb_z.ConsumeSentPacket(); + if (!packet_from_z.empty()) { + delivered_packet = true; + sock_a.ReceivePacket(std::move(packet_from_z)); + } + } while (delivered_packet); + } + + void RunTimers() { + for (const auto timeout_id : cb_a_.RunTimers()) { + sock_a_.HandleTimeout(timeout_id); + } + for (const auto timeout_id : cb_z_.RunTimers()) { + sock_z_.HandleTimeout(timeout_id); + } + } + + const DcSctpOptions options_; + testing::NiceMock cb_a_; + testing::NiceMock cb_z_; + DcSctpSocket sock_a_; + DcSctpSocket sock_z_; +}; + +TEST_F(DcSctpSocketTest, EstablishConnection) { + EXPECT_CALL(cb_a_, OnConnected).Times(1); + EXPECT_CALL(cb_z_, OnConnected).Times(1); + EXPECT_CALL(cb_a_, OnConnectionRestarted).Times(0); + EXPECT_CALL(cb_z_, OnConnectionRestarted).Times(0); + + sock_a_.Connect(); + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, EstablishConnectionWithSetupCollision) { + EXPECT_CALL(cb_a_, OnConnected).Times(1); + EXPECT_CALL(cb_z_, OnConnected).Times(1); + EXPECT_CALL(cb_a_, OnConnectionRestarted).Times(0); + EXPECT_CALL(cb_z_, OnConnectionRestarted).Times(0); + sock_a_.Connect(); + sock_z_.Connect(); + + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, EstablishSimultaneousConnection) { + EXPECT_CALL(cb_a_, OnConnected).Times(1); + EXPECT_CALL(cb_z_, OnConnected).Times(1); + EXPECT_CALL(cb_a_, OnConnectionRestarted).Times(0); + EXPECT_CALL(cb_z_, OnConnectionRestarted).Times(0); + sock_a_.Connect(); + + // INIT isn't received by Z, as it wasn't ready yet. + cb_a_.ConsumeSentPacket(); + + sock_z_.Connect(); + + // A reads INIT, produces INIT_ACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // Z reads INIT_ACK, sends COOKIE_ECHO + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + // A reads COOKIE_ECHO - establishes connection. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + + // Proceed with the remaining packets. + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, EstablishConnectionLostCookieAck) { + EXPECT_CALL(cb_a_, OnConnected).Times(1); + EXPECT_CALL(cb_z_, OnConnected).Times(1); + EXPECT_CALL(cb_a_, OnConnectionRestarted).Times(0); + EXPECT_CALL(cb_z_, OnConnectionRestarted).Times(0); + + sock_a_.Connect(); + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // COOKIE_ACK is lost. + cb_z_.ConsumeSentPacket(); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnecting); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + // This will make A re-send the COOKIE_ECHO + AdvanceTime(DurationMs(options_.t1_cookie_timeout)); + RunTimers(); + + // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, ResendInitAndEstablishConnection) { + sock_a_.Connect(); + // INIT is never received by Z. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(init_packet.descriptors()[0].type, InitChunk::kType); + + AdvanceTime(options_.t1_init_timeout); + RunTimers(); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, ResendingInitTooManyTimesAborts) { + sock_a_.Connect(); + + // INIT is never received by Z. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(init_packet.descriptors()[0].type, InitChunk::kType); + + for (int i = 0; i < options_.max_init_retransmits; ++i) { + AdvanceTime(options_.t1_init_timeout * (1 << i)); + RunTimers(); + + // INIT is resent + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket resent_init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(resent_init_packet.descriptors()[0].type, InitChunk::kType); + } + + // Another timeout, after the max init retransmits. + AdvanceTime(options_.t1_init_timeout * (1 << options_.max_init_retransmits)); + EXPECT_CALL(cb_a_, OnAborted).Times(1); + RunTimers(); + + EXPECT_EQ(sock_a_.state(), SocketState::kClosed); +} + +TEST_F(DcSctpSocketTest, ResendCookieEchoAndEstablishConnection) { + sock_a_.Connect(); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // COOKIE_ECHO is never received by Z. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(init_packet.descriptors()[0].type, CookieEchoChunk::kType); + + AdvanceTime(options_.t1_init_timeout); + RunTimers(); + + // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); +} + +TEST_F(DcSctpSocketTest, ResendingCookieEchoTooManyTimesAborts) { + sock_a_.Connect(); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // COOKIE_ECHO is never received by Z. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(init_packet.descriptors()[0].type, CookieEchoChunk::kType); + + for (int i = 0; i < options_.max_init_retransmits; ++i) { + AdvanceTime(options_.t1_cookie_timeout * (1 << i)); + RunTimers(); + + // COOKIE_ECHO is resent + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket resent_init_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(resent_init_packet.descriptors()[0].type, CookieEchoChunk::kType); + } + + // Another timeout, after the max init retransmits. + AdvanceTime(options_.t1_cookie_timeout * + (1 << options_.max_init_retransmits)); + EXPECT_CALL(cb_a_, OnAborted).Times(1); + RunTimers(); + + EXPECT_EQ(sock_a_.state(), SocketState::kClosed); +} + +TEST_F(DcSctpSocketTest, ShutdownConnection) { + sock_a_.Connect(); + + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + RTC_LOG(LS_INFO) << "Shutting down"; + + sock_a_.Shutdown(); + // Z reads SHUTDOWN, produces SHUTDOWN_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // A reads SHUTDOWN_ACK, produces SHUTDOWN_COMPLETE + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // Z reads SHUTDOWN_COMPLETE. + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kClosed); + EXPECT_EQ(sock_z_.state(), SocketState::kClosed); +} + +TEST_F(DcSctpSocketTest, EstablishConnectionWhileSendingData) { + sock_a_.Connect(); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + absl::optional msg = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); +} + +TEST_F(DcSctpSocketTest, SendMessageAfterEstablished) { + sock_a_.Connect(); + + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + absl::optional msg = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); +} + +TEST_F(DcSctpSocketTest, TimeoutResendsPacket) { + sock_a_.Connect(); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions); + cb_a_.ConsumeSentPacket(); + + RTC_LOG(LS_INFO) << "Advancing time"; + AdvanceTime(options_.rto_initial); + RunTimers(); + + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + absl::optional msg = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); +} + +TEST_F(DcSctpSocketTest, SendALotOfBytesMissedSecondPacket) { + sock_a_.Connect(); + + // Z reads INIT, produces INIT_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads INIT_ACK, produces COOKIE_ECHO + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + // // Z reads COOKIE_ECHO, produces COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // // A reads COOKIE_ACK. + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + std::vector payload(options_.mtu * 10); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), kSendOptions); + + // First DATA + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // Second DATA (lost) + cb_a_.ConsumeSentPacket(); + + // Retransmit and handle the rest + ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_); + + absl::optional msg = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); + EXPECT_THAT(msg->payload(), testing::ElementsAreArray(payload)); +} + +TEST_F(DcSctpSocketTest, SendingHeartbeatAnswersWithAck) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + // Inject a HEARTBEAT chunk + SctpPacket::Builder b(sock_a_.verification_tag(), DcSctpOptions()); + uint8_t info[] = {1, 2, 3, 4}; + Parameters::Builder params_builder; + params_builder.Add(HeartbeatInfoParameter(info)); + b.Add(HeartbeatRequestChunk(params_builder.Build())); + sock_a_.ReceivePacket(b.Build()); + + // HEARTBEAT_ACK is sent as a reply. Capture it. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket ack_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + ASSERT_THAT(ack_packet.descriptors(), SizeIs(1)); + ASSERT_HAS_VALUE_AND_ASSIGN( + HeartbeatAckChunk ack, + HeartbeatAckChunk::Parse(ack_packet.descriptors()[0].data)); + ASSERT_HAS_VALUE_AND_ASSIGN(HeartbeatInfoParameter info_param, ack.info()); + EXPECT_THAT(info_param.info(), ElementsAre(1, 2, 3, 4)); +} + +TEST_F(DcSctpSocketTest, ExpectHeartbeatToBeSent) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + EXPECT_THAT(cb_a_.ConsumeSentPacket(), IsEmpty()); + + AdvanceTime(options_.heartbeat_interval); + RunTimers(); + + std::vector hb_packet_raw = cb_a_.ConsumeSentPacket(); + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket hb_packet, + SctpPacket::Parse(hb_packet_raw)); + ASSERT_THAT(hb_packet.descriptors(), SizeIs(1)); + ASSERT_HAS_VALUE_AND_ASSIGN( + HeartbeatRequestChunk hb, + HeartbeatRequestChunk::Parse(hb_packet.descriptors()[0].data)); + ASSERT_HAS_VALUE_AND_ASSIGN(HeartbeatInfoParameter info_param, hb.info()); + + // The info is a single 64-bit number. + EXPECT_THAT(hb.info()->info(), SizeIs(8)); + + // Feed it to Sock-z and expect a HEARTBEAT_ACK that will be propagated back. + sock_z_.ReceivePacket(hb_packet_raw); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); +} + +TEST_F(DcSctpSocketTest, CloseConnectionAfterTooManyLostHeartbeats) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + EXPECT_THAT(cb_a_.ConsumeSentPacket(), testing::IsEmpty()); + // Force-close socket Z so that it doesn't interfere from now on. + sock_z_.Close(); + + DurationMs time_to_next_hearbeat = options_.heartbeat_interval; + + for (int i = 0; i < options_.max_retransmissions; ++i) { + RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; + AdvanceTime(time_to_next_hearbeat); + RunTimers(); + + // Dropping every heartbeat. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket hb_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(hb_packet.descriptors()[0].type, HeartbeatRequestChunk::kType); + + RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; + AdvanceTime(DurationMs(1000)); + RunTimers(); + + time_to_next_hearbeat = options_.heartbeat_interval - DurationMs(1000); + } + + RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; + AdvanceTime(time_to_next_hearbeat); + RunTimers(); + + // Last heartbeat + EXPECT_THAT(cb_a_.ConsumeSentPacket(), Not(IsEmpty())); + + EXPECT_CALL(cb_a_, OnAborted).Times(1); + // Should suffice as exceeding RTO + AdvanceTime(DurationMs(1000)); + RunTimers(); +} + +TEST_F(DcSctpSocketTest, RecoversAfterASuccessfulAck) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + EXPECT_THAT(cb_a_.ConsumeSentPacket(), testing::IsEmpty()); + // Force-close socket Z so that it doesn't interfere from now on. + sock_z_.Close(); + + DurationMs time_to_next_hearbeat = options_.heartbeat_interval; + + for (int i = 0; i < options_.max_retransmissions; ++i) { + AdvanceTime(time_to_next_hearbeat); + RunTimers(); + + // Dropping every heartbeat. + cb_a_.ConsumeSentPacket(); + + RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; + AdvanceTime(DurationMs(1000)); + RunTimers(); + + time_to_next_hearbeat = options_.heartbeat_interval - DurationMs(1000); + } + + RTC_LOG(LS_INFO) << "Getting the last heartbeat - and acking it"; + AdvanceTime(time_to_next_hearbeat); + RunTimers(); + + std::vector hb_packet_raw = cb_a_.ConsumeSentPacket(); + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket hb_packet, + SctpPacket::Parse(hb_packet_raw)); + ASSERT_THAT(hb_packet.descriptors(), SizeIs(1)); + ASSERT_HAS_VALUE_AND_ASSIGN( + HeartbeatRequestChunk hb, + HeartbeatRequestChunk::Parse(hb_packet.descriptors()[0].data)); + + SctpPacket::Builder b(sock_a_.verification_tag(), options_); + b.Add(HeartbeatAckChunk(std::move(hb).extract_parameters())); + sock_a_.ReceivePacket(b.Build()); + + // Should suffice as exceeding RTO - which will not fire. + EXPECT_CALL(cb_a_, OnAborted).Times(0); + AdvanceTime(DurationMs(1000)); + RunTimers(); + EXPECT_THAT(cb_a_.ConsumeSentPacket(), IsEmpty()); + + // Verify that we get new heartbeats again. + RTC_LOG(LS_INFO) << "Expecting a new heartbeat"; + AdvanceTime(time_to_next_hearbeat); + RunTimers(); + + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket another_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + EXPECT_EQ(another_packet.descriptors()[0].type, HeartbeatRequestChunk::kType); +} + +TEST_F(DcSctpSocketTest, ResetStream) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), {}); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + absl::optional msg = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); + + // Handle SACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // Reset the outgoing stream. This will directly send a RE-CONFIG. + sock_a_.ResetStreams(std::vector({StreamID(1)})); + + // Receiving the packet will trigger a callback, indicating that A has + // reset its stream. It will also send a RE-CONFIG with a response. + EXPECT_CALL(cb_z_, OnIncomingStreamsReset).Times(1); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + // Receiving a response will trigger a callback. Streams are now reset. + EXPECT_CALL(cb_a_, OnStreamsResetPerformed).Times(1); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); +} + +TEST_F(DcSctpSocketTest, ResetStreamWillMakeChunksStartAtZeroSsn) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + std::vector payload(options_.mtu - 100); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {}); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {}); + + auto packet1 = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet1, HasDataChunkWithSsn(SSN(0))); + sock_z_.ReceivePacket(packet1); + + auto packet2 = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet2, HasDataChunkWithSsn(SSN(1))); + sock_z_.ReceivePacket(packet2); + + // Handle SACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + absl::optional msg1 = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->stream_id(), StreamID(1)); + + absl::optional msg2 = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->stream_id(), StreamID(1)); + + // Reset the outgoing stream. This will directly send a RE-CONFIG. + sock_a_.ResetStreams(std::vector({StreamID(1)})); + // RE-CONFIG, req + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // RE-CONFIG, resp + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {}); + + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {}); + + auto packet3 = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet3, HasDataChunkWithSsn(SSN(0))); + sock_z_.ReceivePacket(packet3); + + auto packet4 = cb_a_.ConsumeSentPacket(); + EXPECT_THAT(packet4, HasDataChunkWithSsn(SSN(1))); + sock_z_.ReceivePacket(packet4); + + // Handle SACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); +} + +TEST_F(DcSctpSocketTest, OnePeerReconnects) { + EXPECT_CALL(cb_a_, OnConnectionRestarted).Times(1); + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + // Let's be evil here - reconnect while a fragmented packet was about to be + // sent. The receiving side should get it in full. + std::vector payload(options_.mtu * 10); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), kSendOptions); + + // First DATA + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + // Create a new association, z2 - and don't use z anymore. + testing::NiceMock cb_z2; + DcSctpSocket sock_z2("Z2", cb_z2, nullptr, options_); + + sock_z2.Connect(); + + // Retransmit and handle the rest. As there will be some chunks in-flight that + // have the wrong verification tag, those will yield errors. + ExchangeMessages(sock_a_, cb_a_, sock_z2, cb_z2); + + absl::optional msg = cb_z2.ConsumeReceivedMessage(); + ASSERT_TRUE(msg.has_value()); + EXPECT_EQ(msg->stream_id(), StreamID(1)); + EXPECT_THAT(msg->payload(), testing::ElementsAreArray(payload)); +} + +TEST_F(DcSctpSocketTest, SendMessageWithLimitedRtx) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + SendOptions send_options; + send_options.max_retransmissions = 0; + std::vector payload(options_.mtu - 100); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(51), payload), send_options); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(52), payload), send_options); + sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), send_options); + + // First DATA + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + // Second DATA (lost) + cb_a_.ConsumeSentPacket(); + // Third DATA + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + // Handle SACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + // Now the missing data chunk will be marked as nacked, but it might still be + // in-flight and the reported gap could be due to out-of-order delivery. So + // the RetransmissionQueue will not mark it as "to be retransmitted" until + // after the t3-rtx timer has expired. + AdvanceTime(options_.rto_initial); + RunTimers(); + + // The chunk will be marked as retransmitted, and then as abandoned, which + // will trigger a FORWARD-TSN to be sent. + + // FORWARD-TSN (third) + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + + // The receiver might have moved into delayed ack mode. + AdvanceTime(options_.rto_initial); + RunTimers(); + + // Handle SACK + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + absl::optional msg1 = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg1.has_value()); + EXPECT_EQ(msg1->ppid(), PPID(51)); + + absl::optional msg2 = cb_z_.ConsumeReceivedMessage(); + ASSERT_TRUE(msg2.has_value()); + EXPECT_EQ(msg2->ppid(), PPID(53)); + + absl::optional msg3 = cb_z_.ConsumeReceivedMessage(); + EXPECT_FALSE(msg3.has_value()); +} + +struct FakeChunkConfig : ChunkConfig { + static constexpr int kType = 0x49; + static constexpr size_t kHeaderSize = 4; + static constexpr int kVariableLengthAlignment = 0; +}; + +class FakeChunk : public Chunk, public TLVTrait { + public: + FakeChunk() {} + + FakeChunk(FakeChunk&& other) = default; + FakeChunk& operator=(FakeChunk&& other) = default; + + void SerializeTo(std::vector& out) const override { + AllocateTLV(out); + } + std::string ToString() const override { return "FAKE"; } +}; + +TEST_F(DcSctpSocketTest, ReceivingUnknownChunkRespondsWithError) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + // Inject a FAKE chunk + SctpPacket::Builder b(sock_a_.verification_tag(), DcSctpOptions()); + b.Add(FakeChunk()); + sock_a_.ReceivePacket(b.Build()); + + // ERROR is sent as a reply. Capture it. + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket reply_packet, + SctpPacket::Parse(cb_a_.ConsumeSentPacket())); + ASSERT_THAT(reply_packet.descriptors(), SizeIs(1)); + ASSERT_HAS_VALUE_AND_ASSIGN( + ErrorChunk error, ErrorChunk::Parse(reply_packet.descriptors()[0].data)); + ASSERT_HAS_VALUE_AND_ASSIGN( + UnrecognizedChunkTypeCause cause, + error.error_causes().get()); + EXPECT_THAT(cause.unrecognized_chunk(), ElementsAre(0x49, 0x00, 0x00, 0x04)); +} + +TEST_F(DcSctpSocketTest, ReceivingErrorChunkReportsAsCallback) { + sock_a_.Connect(); + // Z reads INIT, INIT_ACK, COOKIE_ECHO, COOKIE_ACK + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); + + EXPECT_EQ(sock_a_.state(), SocketState::kConnected); + EXPECT_EQ(sock_z_.state(), SocketState::kConnected); + + // Inject a ERROR chunk + SctpPacket::Builder b(sock_a_.verification_tag(), DcSctpOptions()); + b.Add( + ErrorChunk(Parameters::Builder() + .Add(UnrecognizedChunkTypeCause({0x49, 0x00, 0x00, 0x04})) + .Build())); + + EXPECT_CALL(cb_a_, OnError(ErrorKind::kPeerReported, + HasSubstr("Unrecognized Chunk Type"))); + sock_a_.ReceivePacket(b.Build()); +} + +TEST_F(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { + // Create a new association, z2 - and don't use z anymore. + testing::NiceMock cb_z2; + DcSctpOptions options = options_; + options.max_receiver_window_buffer_size = 100; + DcSctpSocket sock_z2("Z2", cb_z2, nullptr, options); + + EXPECT_CALL(cb_z2, OnClosed).Times(0); + EXPECT_CALL(cb_z2, OnAborted).Times(0); + + sock_a_.Connect(); + std::vector init_data = cb_a_.ConsumeSentPacket(); + ASSERT_HAS_VALUE_AND_ASSIGN(SctpPacket init_packet, + SctpPacket::Parse(init_data)); + ASSERT_HAS_VALUE_AND_ASSIGN( + InitChunk init_chunk, + InitChunk::Parse(init_packet.descriptors()[0].data)); + sock_z2.ReceivePacket(init_data); + sock_a_.ReceivePacket(cb_z2.ConsumeSentPacket()); + sock_z2.ReceivePacket(cb_a_.ConsumeSentPacket()); + sock_a_.ReceivePacket(cb_z2.ConsumeSentPacket()); + + // Fill up Z2 to the high watermark limit. + TSN tsn = init_chunk.initial_tsn(); + AnyDataChunk::Options opts; + opts.is_beginning = Data::IsBeginning(true); + sock_z2.ReceivePacket( + SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(tsn, StreamID(1), SSN(0), PPID(53), + std::vector( + 100 * ReassemblyQueue::kHighWatermarkLimit + 1), + opts)) + .Build()); + + // First DATA will always trigger a SACK. It's not interesting. + EXPECT_THAT(cb_z2.ConsumeSentPacket(), + AllOf(HasSackWithCumAckTsn(tsn), HasSackWithNoGapAckBlocks())); + + // This DATA should be accepted - it's advancing cum ack tsn. + sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(AddTo(tsn, 1), StreamID(1), SSN(0), + PPID(53), std::vector(1), + /*options=*/{})) + .Build()); + + // The receiver might have moved into delayed ack mode. + cb_z2.AdvanceTime(options.rto_initial); + for (const auto timeout_id : cb_z2.RunTimers()) { + sock_z2.HandleTimeout(timeout_id); + } + + EXPECT_THAT( + cb_z2.ConsumeSentPacket(), + AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks())); + + // This DATA will not be accepted - it's not advancing cum ack tsn. + sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), + PPID(53), std::vector(1), + /*options=*/{})) + .Build()); + + // Sack will be sent in IMMEDIATE mode when this is happening. + EXPECT_THAT( + cb_z2.ConsumeSentPacket(), + AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks())); + + // This DATA will not be accepted either. + sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(AddTo(tsn, 4), StreamID(1), SSN(0), + PPID(53), std::vector(1), + /*options=*/{})) + .Build()); + + // Sack will be sent in IMMEDIATE mode when this is happening. + EXPECT_THAT( + cb_z2.ConsumeSentPacket(), + AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks())); + + // This DATA should be accepted, and it fills the reassembly queue. + sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0), + PPID(53), std::vector(10), + /*options=*/{})) + .Build()); + + // The receiver might have moved into delayed ack mode. + cb_z2.AdvanceTime(options.rto_initial); + for (const auto timeout_id : cb_z2.RunTimers()) { + sock_z2.HandleTimeout(timeout_id); + } + + EXPECT_THAT( + cb_z2.ConsumeSentPacket(), + AllOf(HasSackWithCumAckTsn(AddTo(tsn, 2)), HasSackWithNoGapAckBlocks())); + + EXPECT_CALL(cb_z2, OnAborted(ErrorKind::kResourceExhaustion, _)); + EXPECT_CALL(cb_z2, OnClosed).Times(0); + + // This DATA will make the connection close. It's too full now. + sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options) + .Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), + PPID(53), std::vector(10), + /*options=*/{})) + .Build()); +} + +} // namespace +} // namespace dcsctp diff --git a/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h b/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h index fce75c30aa..bad1aa697d 100644 --- a/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h +++ b/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h @@ -111,10 +111,6 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { OnIncomingStreamsReset, (rtc::ArrayView incoming_streams), (override)); - MOCK_METHOD(void, - OnSentMessageExpired, - (StreamID stream_id, PPID ppid, bool unsent), - (override)); bool HasPacket() const { return !sent_packets_.empty(); } diff --git a/net/dcsctp/socket/state_cookie.cc b/net/dcsctp/socket/state_cookie.cc new file mode 100644 index 0000000000..7d04cbb0d7 --- /dev/null +++ b/net/dcsctp/socket/state_cookie.cc @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/state_cookie.h" + +#include +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/bounded_byte_reader.h" +#include "net/dcsctp/packet/bounded_byte_writer.h" +#include "net/dcsctp/socket/capabilities.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +// Magic values, which the state cookie is prefixed with. +constexpr uint32_t kMagic1 = 1684230979; +constexpr uint32_t kMagic2 = 1414541360; +constexpr size_t StateCookie::kCookieSize; + +std::vector StateCookie::Serialize() { + std::vector cookie; + cookie.resize(kCookieSize); + BoundedByteWriter buffer(cookie); + buffer.Store32<0>(kMagic1); + buffer.Store32<4>(kMagic2); + buffer.Store32<8>(*initiate_tag_); + buffer.Store32<12>(*initial_tsn_); + buffer.Store32<16>(a_rwnd_); + buffer.Store32<20>(static_cast(*tie_tag_ >> 32)); + buffer.Store32<24>(static_cast(*tie_tag_)); + buffer.Store8<28>(capabilities_.partial_reliability); + buffer.Store8<29>(capabilities_.message_interleaving); + buffer.Store8<30>(capabilities_.reconfig); + return cookie; +} + +absl::optional StateCookie::Deserialize( + rtc::ArrayView cookie) { + if (cookie.size() != kCookieSize) { + RTC_DLOG(LS_WARNING) << "Invalid state cookie: " << cookie.size() + << " bytes"; + return absl::nullopt; + } + + BoundedByteReader buffer(cookie); + uint32_t magic1 = buffer.Load32<0>(); + uint32_t magic2 = buffer.Load32<4>(); + if (magic1 != kMagic1 || magic2 != kMagic2) { + RTC_DLOG(LS_WARNING) << "Invalid state cookie; wrong magic"; + return absl::nullopt; + } + + VerificationTag verification_tag(buffer.Load32<8>()); + TSN initial_tsn(buffer.Load32<12>()); + uint32_t a_rwnd = buffer.Load32<16>(); + uint32_t tie_tag_upper = buffer.Load32<20>(); + uint32_t tie_tag_lower = buffer.Load32<24>(); + TieTag tie_tag(static_cast(tie_tag_upper) << 32 | + static_cast(tie_tag_lower)); + Capabilities capabilities; + capabilities.partial_reliability = buffer.Load8<28>() != 0; + capabilities.message_interleaving = buffer.Load8<29>() != 0; + capabilities.reconfig = buffer.Load8<30>() != 0; + + return StateCookie(verification_tag, initial_tsn, a_rwnd, tie_tag, + capabilities); +} + +} // namespace dcsctp diff --git a/net/dcsctp/socket/state_cookie.h b/net/dcsctp/socket/state_cookie.h new file mode 100644 index 0000000000..df4b801397 --- /dev/null +++ b/net/dcsctp/socket/state_cookie.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_SOCKET_STATE_COOKIE_H_ +#define NET_DCSCTP_SOCKET_STATE_COOKIE_H_ + +#include +#include + +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/internal_types.h" +#include "net/dcsctp/socket/capabilities.h" + +namespace dcsctp { + +// This is serialized as a state cookie and put in INIT_ACK. The client then +// responds with this in COOKIE_ECHO. +// +// NOTE: Expect that the client will modify it to try to exploit the library. +// Do not trust anything in it; no pointers or anything like that. +class StateCookie { + public: + static constexpr size_t kCookieSize = 31; + + StateCookie(VerificationTag initiate_tag, + TSN initial_tsn, + uint32_t a_rwnd, + TieTag tie_tag, + Capabilities capabilities) + : initiate_tag_(initiate_tag), + initial_tsn_(initial_tsn), + a_rwnd_(a_rwnd), + tie_tag_(tie_tag), + capabilities_(capabilities) {} + + // Returns a serialized version of this cookie. + std::vector Serialize(); + + // Deserializes the cookie, and returns absl::nullopt if that failed. + static absl::optional Deserialize( + rtc::ArrayView cookie); + + VerificationTag initiate_tag() const { return initiate_tag_; } + TSN initial_tsn() const { return initial_tsn_; } + uint32_t a_rwnd() const { return a_rwnd_; } + TieTag tie_tag() const { return tie_tag_; } + const Capabilities& capabilities() const { return capabilities_; } + + private: + const VerificationTag initiate_tag_; + const TSN initial_tsn_; + const uint32_t a_rwnd_; + const TieTag tie_tag_; + const Capabilities capabilities_; +}; +} // namespace dcsctp + +#endif // NET_DCSCTP_SOCKET_STATE_COOKIE_H_ diff --git a/net/dcsctp/socket/state_cookie_test.cc b/net/dcsctp/socket/state_cookie_test.cc new file mode 100644 index 0000000000..eab41a7a56 --- /dev/null +++ b/net/dcsctp/socket/state_cookie_test.cc @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/socket/state_cookie.h" + +#include "net/dcsctp/testing/testing_macros.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::SizeIs; + +TEST(StateCookieTest, SerializeAndDeserialize) { + Capabilities capabilities = {/*partial_reliability=*/true, + /*message_interleaving=*/false, + /*reconfig=*/true}; + StateCookie cookie(VerificationTag(123), TSN(456), + /*a_rwnd=*/789, TieTag(101112), capabilities); + std::vector serialized = cookie.Serialize(); + EXPECT_THAT(serialized, SizeIs(StateCookie::kCookieSize)); + ASSERT_HAS_VALUE_AND_ASSIGN(StateCookie deserialized, + StateCookie::Deserialize(serialized)); + EXPECT_EQ(deserialized.initiate_tag(), VerificationTag(123)); + EXPECT_EQ(deserialized.initial_tsn(), TSN(456)); + EXPECT_EQ(deserialized.a_rwnd(), 789u); + EXPECT_EQ(deserialized.tie_tag(), TieTag(101112)); + EXPECT_TRUE(deserialized.capabilities().partial_reliability); + EXPECT_FALSE(deserialized.capabilities().message_interleaving); + EXPECT_TRUE(deserialized.capabilities().reconfig); +} + +} // namespace +} // namespace dcsctp