diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h index d0a81eaeb2..9989ae8d43 100644 --- a/net/dcsctp/public/dcsctp_socket.h +++ b/net/dcsctp/public/dcsctp_socket.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -577,6 +578,16 @@ class DcSctpSocketInterface { virtual SendStatus Send(DcSctpMessage message, const SendOptions& send_options) = 0; + // Sends the messages `messages` using the provided send options. + // Sending a message is an asynchronous operation, and the `OnError` callback + // may be invoked to indicate any errors in sending the message. + // + // This has identical semantics to Send, except that it may coalesce many + // messages into a single SCTP packet if they would fit. + virtual std::vector SendMany( + rtc::ArrayView messages, + const SendOptions& send_options) = 0; + // Resetting streams is an asynchronous operation and the results will // be notified using `DcSctpSocketCallbacks::OnStreamsResetDone()` on success // and `DcSctpSocketCallbacks::OnStreamsResetFailed()` on failure. Note that diff --git a/net/dcsctp/public/mock_dcsctp_socket.h b/net/dcsctp/public/mock_dcsctp_socket.h index 0fd572bd94..c71c3ae16f 100644 --- a/net/dcsctp/public/mock_dcsctp_socket.h +++ b/net/dcsctp/public/mock_dcsctp_socket.h @@ -10,6 +10,8 @@ #ifndef NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_ #define NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_ +#include + #include "net/dcsctp/public/dcsctp_socket.h" #include "test/gmock.h" @@ -56,6 +58,12 @@ class MockDcSctpSocket : public DcSctpSocketInterface { (DcSctpMessage message, const SendOptions& send_options), (override)); + MOCK_METHOD(std::vector, + SendMany, + (rtc::ArrayView messages, + const SendOptions& send_options), + (override)); + MOCK_METHOD(ResetStreamsStatus, ResetStreams, (rtc::ArrayView outgoing_streams), diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 5fc9bf57bb..bdf79e4e47 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc @@ -475,8 +475,43 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message, const SendOptions& send_options) { RTC_DCHECK_RUN_ON(&thread_checker_); CallbackDeferrer::ScopedDeferrer deferrer(callbacks_); - LifecycleId lifecycle_id = send_options.lifecycle_id; + SendStatus send_status = InternalSend(message, send_options); + if (send_status != SendStatus::kSuccess) + return send_status; + Timestamp now = callbacks_.Now(); + ++metrics_.tx_messages_count; + send_queue_.Add(now, std::move(message), send_options); + if (tcb_ != nullptr) + tcb_->SendBufferedPackets(now); + RTC_DCHECK(IsConsistent()); + return SendStatus::kSuccess; +} +std::vector DcSctpSocket::SendMany( + rtc::ArrayView messages, + const SendOptions& send_options) { + RTC_DCHECK_RUN_ON(&thread_checker_); + CallbackDeferrer::ScopedDeferrer deferrer(callbacks_); + Timestamp now = callbacks_.Now(); + std::vector send_statuses; + send_statuses.reserve(messages.size()); + for (DcSctpMessage& message : messages) { + SendStatus send_status = InternalSend(message, send_options); + send_statuses.push_back(send_status); + if (send_status != SendStatus::kSuccess) + continue; + ++metrics_.tx_messages_count; + send_queue_.Add(now, std::move(message), send_options); + } + if (tcb_ != nullptr) + tcb_->SendBufferedPackets(now); + RTC_DCHECK(IsConsistent()); + return send_statuses; +} + +SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message, + const SendOptions& send_options) { + LifecycleId lifecycle_id = send_options.lifecycle_id; if (message.payload().empty()) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); @@ -514,15 +549,6 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message, "Unable to send message as the send queue is full"); return SendStatus::kErrorResourceExhaustion; } - - Timestamp now = callbacks_.Now(); - ++metrics_.tx_messages_count; - send_queue_.Add(now, std::move(message), send_options); - if (tcb_ != nullptr) { - tcb_->SendBufferedPackets(now); - } - - RTC_DCHECK(IsConsistent()); return SendStatus::kSuccess; } diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h index c59d6cac57..2712d70df9 100644 --- a/net/dcsctp/socket/dcsctp_socket.h +++ b/net/dcsctp/socket/dcsctp_socket.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "absl/strings/string_view.h" #include "api/array_view.h" @@ -91,6 +92,8 @@ class DcSctpSocket : public DcSctpSocketInterface { void Close() override; SendStatus Send(DcSctpMessage message, const SendOptions& send_options) override; + std::vector SendMany(rtc::ArrayView messages, + const SendOptions& send_options) override; ResetStreamsStatus ResetStreams( rtc::ArrayView outgoing_streams) override; SocketState state() const override; @@ -165,6 +168,9 @@ class DcSctpSocket : public DcSctpSocketInterface { void MaybeSendShutdownOnPacketReceived(const SctpPacket& packet); // If there are streams pending to be reset, send a request to reset them. void MaybeSendResetStreamsRequest(); + // Performs internal processing shared between Send and SendMany. + SendStatus InternalSend(const DcSctpMessage& message, + const SendOptions& send_options); // Sends a INIT chunk. void SendInit(); // Sends a SHUTDOWN chunk. diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index bb080d663a..413516bae0 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc @@ -66,6 +66,7 @@ namespace { using ::testing::_; using ::testing::AllOf; using ::testing::ElementsAre; +using ::testing::ElementsAreArray; using ::testing::Eq; using ::testing::HasSubstr; using ::testing::IsEmpty; @@ -1561,6 +1562,33 @@ TEST(DcSctpSocketTest, SetMaxMessageSize) { EXPECT_EQ(a.socket.options().max_message_size, 42u); } +TEST_P(DcSctpSocketParametrizedTest, SendManyMessages) { + SocketUnderTest a("A"); + auto z = std::make_unique("Z"); + + ConnectSockets(a, *z); + z = MaybeHandoverSocket(std::move(z)); + + static constexpr int kIterations = 100; + std::vector messages; + std::vector statuses; + for (int i = 0; i < kIterations; ++i) { + messages.push_back(DcSctpMessage(StreamID(1), PPID(53), {1, 2})); + statuses.push_back(SendStatus::kSuccess); + } + EXPECT_THAT(a.socket.SendMany(messages, {}), ElementsAreArray(statuses)); + + ExchangeMessages(a, *z); + + for (int i = 0; i < kIterations; ++i) { + EXPECT_TRUE(z->cb.ConsumeReceivedMessage().has_value()); + } + + EXPECT_FALSE(z->cb.ConsumeReceivedMessage().has_value()); + + MaybeHandoverSocketAndSendMessage(a, std::move(z)); +} + TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) { SocketUnderTest a("A"); auto z = std::make_unique("Z");