Add SendMany method to dcsctp socket

Bug: webrtc:15724
Change-Id: Ib1689cd46395e2315803714ef50c009580fd71bb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331021
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41397}
This commit is contained in:
Daniel Collins 2023-12-15 09:59:46 -05:00 committed by WebRTC LUCI CQ
parent 0f86cd126b
commit c9d44b3fb9
5 changed files with 89 additions and 10 deletions

View File

@ -13,6 +13,7 @@
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -577,6 +578,16 @@ class DcSctpSocketInterface {
virtual SendStatus Send(DcSctpMessage message,
const SendOptions& send_options) = 0;
// Sends the messages `messages` using the provided send options.
// Sending a message is an asynchronous operation, and the `OnError` callback
// may be invoked to indicate any errors in sending the message.
//
// This has identical semantics to Send, except that it may coalesce many
// messages into a single SCTP packet if they would fit.
virtual std::vector<SendStatus> SendMany(
rtc::ArrayView<DcSctpMessage> messages,
const SendOptions& send_options) = 0;
// Resetting streams is an asynchronous operation and the results will
// be notified using `DcSctpSocketCallbacks::OnStreamsResetDone()` on success
// and `DcSctpSocketCallbacks::OnStreamsResetFailed()` on failure. Note that

View File

@ -10,6 +10,8 @@
#ifndef NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_
#define NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_
#include <vector>
#include "net/dcsctp/public/dcsctp_socket.h"
#include "test/gmock.h"
@ -56,6 +58,12 @@ class MockDcSctpSocket : public DcSctpSocketInterface {
(DcSctpMessage message, const SendOptions& send_options),
(override));
MOCK_METHOD(std::vector<SendStatus>,
SendMany,
(rtc::ArrayView<DcSctpMessage> messages,
const SendOptions& send_options),
(override));
MOCK_METHOD(ResetStreamsStatus,
ResetStreams,
(rtc::ArrayView<const StreamID> outgoing_streams),

View File

@ -475,8 +475,43 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message,
const SendOptions& send_options) {
RTC_DCHECK_RUN_ON(&thread_checker_);
CallbackDeferrer::ScopedDeferrer deferrer(callbacks_);
LifecycleId lifecycle_id = send_options.lifecycle_id;
SendStatus send_status = InternalSend(message, send_options);
if (send_status != SendStatus::kSuccess)
return send_status;
Timestamp now = callbacks_.Now();
++metrics_.tx_messages_count;
send_queue_.Add(now, std::move(message), send_options);
if (tcb_ != nullptr)
tcb_->SendBufferedPackets(now);
RTC_DCHECK(IsConsistent());
return SendStatus::kSuccess;
}
std::vector<SendStatus> DcSctpSocket::SendMany(
rtc::ArrayView<DcSctpMessage> messages,
const SendOptions& send_options) {
RTC_DCHECK_RUN_ON(&thread_checker_);
CallbackDeferrer::ScopedDeferrer deferrer(callbacks_);
Timestamp now = callbacks_.Now();
std::vector<SendStatus> send_statuses;
send_statuses.reserve(messages.size());
for (DcSctpMessage& message : messages) {
SendStatus send_status = InternalSend(message, send_options);
send_statuses.push_back(send_status);
if (send_status != SendStatus::kSuccess)
continue;
++metrics_.tx_messages_count;
send_queue_.Add(now, std::move(message), send_options);
}
if (tcb_ != nullptr)
tcb_->SendBufferedPackets(now);
RTC_DCHECK(IsConsistent());
return send_statuses;
}
SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message,
const SendOptions& send_options) {
LifecycleId lifecycle_id = send_options.lifecycle_id;
if (message.payload().empty()) {
if (lifecycle_id.IsSet()) {
callbacks_.OnLifecycleEnd(lifecycle_id);
@ -514,15 +549,6 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message,
"Unable to send message as the send queue is full");
return SendStatus::kErrorResourceExhaustion;
}
Timestamp now = callbacks_.Now();
++metrics_.tx_messages_count;
send_queue_.Add(now, std::move(message), send_options);
if (tcb_ != nullptr) {
tcb_->SendBufferedPackets(now);
}
RTC_DCHECK(IsConsistent());
return SendStatus::kSuccess;
}

View File

@ -14,6 +14,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "api/array_view.h"
@ -91,6 +92,8 @@ class DcSctpSocket : public DcSctpSocketInterface {
void Close() override;
SendStatus Send(DcSctpMessage message,
const SendOptions& send_options) override;
std::vector<SendStatus> SendMany(rtc::ArrayView<DcSctpMessage> messages,
const SendOptions& send_options) override;
ResetStreamsStatus ResetStreams(
rtc::ArrayView<const StreamID> outgoing_streams) override;
SocketState state() const override;
@ -165,6 +168,9 @@ class DcSctpSocket : public DcSctpSocketInterface {
void MaybeSendShutdownOnPacketReceived(const SctpPacket& packet);
// If there are streams pending to be reset, send a request to reset them.
void MaybeSendResetStreamsRequest();
// Performs internal processing shared between Send and SendMany.
SendStatus InternalSend(const DcSctpMessage& message,
const SendOptions& send_options);
// Sends a INIT chunk.
void SendInit();
// Sends a SHUTDOWN chunk.

View File

@ -66,6 +66,7 @@ namespace {
using ::testing::_;
using ::testing::AllOf;
using ::testing::ElementsAre;
using ::testing::ElementsAreArray;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::IsEmpty;
@ -1561,6 +1562,33 @@ TEST(DcSctpSocketTest, SetMaxMessageSize) {
EXPECT_EQ(a.socket.options().max_message_size, 42u);
}
TEST_P(DcSctpSocketParametrizedTest, SendManyMessages) {
SocketUnderTest a("A");
auto z = std::make_unique<SocketUnderTest>("Z");
ConnectSockets(a, *z);
z = MaybeHandoverSocket(std::move(z));
static constexpr int kIterations = 100;
std::vector<DcSctpMessage> messages;
std::vector<SendStatus> statuses;
for (int i = 0; i < kIterations; ++i) {
messages.push_back(DcSctpMessage(StreamID(1), PPID(53), {1, 2}));
statuses.push_back(SendStatus::kSuccess);
}
EXPECT_THAT(a.socket.SendMany(messages, {}), ElementsAreArray(statuses));
ExchangeMessages(a, *z);
for (int i = 0; i < kIterations; ++i) {
EXPECT_TRUE(z->cb.ConsumeReceivedMessage().has_value());
}
EXPECT_FALSE(z->cb.ConsumeReceivedMessage().has_value());
MaybeHandoverSocketAndSendMessage(a, std::move(z));
}
TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) {
SocketUnderTest a("A");
auto z = std::make_unique<SocketUnderTest>("Z");