Update SctpTransportInternal to use RTCError.

This avoids a couple of layers of error code conversion, reduces
dependency on cricket error types and allows us to preserve error
information from dcsctp. Along the way remove SendDataResult.

Bug: none
Change-Id: I1ad18a8f0b2fb181745b19c49f36f270708720c0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298305
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39619}
This commit is contained in:
Tommi 2023-03-21 14:48:51 +01:00 committed by WebRTC LUCI CQ
parent 4c842224e1
commit 1fabbac6b6
14 changed files with 80 additions and 141 deletions

View File

@ -569,6 +569,7 @@ rtc_source_set("rtc_data_sctp_transport_internal") {
sources = [ "sctp/sctp_transport_internal.h" ]
deps = [
":media_channel",
"../api:rtc_error",
"../api/transport:datagram_transport_interface",
"../media:rtc_media_base",
"../p2p:rtc_p2p",

View File

@ -999,8 +999,6 @@ class VideoMediaReceiveChannelInterface : public MediaReceiveChannelInterface {
absl::optional<int> rtx_time) = 0;
};
enum SendDataResult { SDR_SUCCESS, SDR_ERROR, SDR_BLOCK };
} // namespace cricket
#endif // MEDIA_BASE_MEDIA_CHANNEL_H_

View File

@ -59,11 +59,11 @@ enum class WebrtcPPID : dcsctp::PPID::UnderlyingType {
WebrtcPPID ToPPID(DataMessageType message_type, size_t size) {
switch (message_type) {
case webrtc::DataMessageType::kControl:
case DataMessageType::kControl:
return WebrtcPPID::kDCEP;
case webrtc::DataMessageType::kText:
case DataMessageType::kText:
return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty;
case webrtc::DataMessageType::kBinary:
case DataMessageType::kBinary:
return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty;
}
}
@ -71,15 +71,15 @@ WebrtcPPID ToPPID(DataMessageType message_type, size_t size) {
absl::optional<DataMessageType> ToDataMessageType(dcsctp::PPID ppid) {
switch (static_cast<WebrtcPPID>(ppid.value())) {
case WebrtcPPID::kDCEP:
return webrtc::DataMessageType::kControl;
return DataMessageType::kControl;
case WebrtcPPID::kString:
case WebrtcPPID::kStringPartial:
case WebrtcPPID::kStringEmpty:
return webrtc::DataMessageType::kText;
return DataMessageType::kText;
case WebrtcPPID::kBinary:
case WebrtcPPID::kBinaryPartial:
case WebrtcPPID::kBinaryEmpty:
return webrtc::DataMessageType::kBinary;
return DataMessageType::kBinary;
}
return absl::nullopt;
}
@ -257,10 +257,9 @@ bool DcSctpTransport::ResetStream(int sid) {
return true;
}
bool DcSctpTransport::SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
RTCError DcSctpTransport::SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
<< ", type=" << static_cast<int>(params.type)
@ -269,8 +268,7 @@ bool DcSctpTransport::SendData(int sid,
if (!socket_) {
RTC_LOG(LS_ERROR) << debug_name_
<< "->SendData(...): Transport is not started.";
*result = cricket::SDR_ERROR;
return false;
return RTCError(RTCErrorType::INVALID_STATE);
}
// It is possible for a message to be sent from the signaling thread at the
@ -284,8 +282,7 @@ bool DcSctpTransport::SendData(int sid,
if (stream_state == stream_states_.end()) {
RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
<< sid;
*result = cricket::SDR_ERROR;
return false;
return RTCError(RTCErrorType::INVALID_STATE);
}
if (stream_state->second.closure_initiated ||
@ -293,8 +290,7 @@ bool DcSctpTransport::SendData(int sid,
stream_state->second.outgoing_reset_done) {
RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
<< sid;
*result = cricket::SDR_ERROR;
return false;
return RTCError(RTCErrorType::INVALID_STATE);
}
auto max_message_size = socket_->options().max_message_size;
@ -304,8 +300,7 @@ bool DcSctpTransport::SendData(int sid,
"Trying to send packet bigger "
"than the max message size: "
<< payload.size() << " vs max of " << max_message_size;
*result = cricket::SDR_ERROR;
return false;
return RTCError(RTCErrorType::INVALID_RANGE);
}
std::vector<uint8_t> message_payload(payload.cdata(),
@ -337,24 +332,20 @@ bool DcSctpTransport::SendData(int sid,
send_options.max_retransmissions = *params.max_rtx_count;
}
auto error = socket_->Send(std::move(message), send_options);
dcsctp::SendStatus error = socket_->Send(std::move(message), send_options);
switch (error) {
case dcsctp::SendStatus::kSuccess:
*result = cricket::SDR_SUCCESS;
break;
return RTCError::OK();
case dcsctp::SendStatus::kErrorResourceExhaustion:
*result = cricket::SDR_BLOCK;
ready_to_send_data_ = false;
break;
return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
default:
absl::string_view message = dcsctp::ToString(error);
RTC_LOG(LS_ERROR) << debug_name_
<< "->SendData(...): send() failed with error "
<< dcsctp::ToString(error) << ".";
*result = cricket::SDR_ERROR;
break;
<< message << ".";
return RTCError(RTCErrorType::NETWORK_ERROR, message);
}
return *result == cricket::SDR_SUCCESS;
}
bool DcSctpTransport::ReadyToSendData() {
@ -425,7 +416,7 @@ SendPacketStatus DcSctpTransport::SendPacketWithStatus(
}
std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout(
webrtc::TaskQueueBase::DelayPrecision precision) {
TaskQueueBase::DelayPrecision precision) {
return task_queue_timeout_factory_.CreateTimeout(precision);
}

View File

@ -57,10 +57,9 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
int max_message_size) override;
bool OpenStream(int sid) override;
bool ResetStream(int sid) override;
bool SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result = nullptr) override;
RTCError SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override;
bool ReadyToSendData() override;
int max_message_size() const override;
absl::optional<int> max_outbound_streams() const override;
@ -72,7 +71,7 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
dcsctp::SendPacketStatus SendPacketWithStatus(
rtc::ArrayView<const uint8_t> data) override;
std::unique_ptr<dcsctp::Timeout> CreateTimeout(
webrtc::TaskQueueBase::DelayPrecision precision) override;
TaskQueueBase::DelayPrecision precision) override;
dcsctp::TimeMs TimeMillis() override;
uint32_t GetRandomInt(uint32_t low, uint32_t high) override;
void OnTotalBufferedAmountLow() override;

View File

@ -181,13 +181,10 @@ TEST(DcSctpTransportTest, DiscardMessageClosedChannel) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_FALSE(send_data_return);
EXPECT_EQ(cricket::SDR_ERROR, result);
EXPECT_EQ(peer_a.sctp_transport_->SendData(1, params, payload).type(),
RTCErrorType::INVALID_STATE);
}
TEST(DcSctpTransportTest, DiscardMessageClosingChannel) {
@ -200,14 +197,10 @@ TEST(DcSctpTransportTest, DiscardMessageClosingChannel) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->ResetStream(1);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_FALSE(send_data_return);
EXPECT_EQ(cricket::SDR_ERROR, result);
EXPECT_EQ(peer_a.sctp_transport_->SendData(1, params, payload).type(),
RTCErrorType::INVALID_STATE);
}
TEST(DcSctpTransportTest, SendDataOpenChannel) {
@ -221,14 +214,9 @@ TEST(DcSctpTransportTest, SendDataOpenChannel) {
peer_a.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_TRUE(send_data_return);
EXPECT_EQ(cricket::SDR_SUCCESS, result);
EXPECT_TRUE(peer_a.sctp_transport_->SendData(1, params, payload).ok());
}
TEST(DcSctpTransportTest, DeliversMessage) {

View File

@ -18,6 +18,7 @@
#include <string>
#include <vector>
#include "api/rtc_error.h"
#include "api/transport/data_channel_transport_interface.h"
#include "media/base/media_channel.h"
#include "p2p/base/packet_transport_internal.h"
@ -119,11 +120,11 @@ class SctpTransportInternal {
// SignalClosingProcedureComplete on the other side.
virtual bool ResetStream(int sid) = 0;
// Send data down this channel.
// Returns true iff successful data somewhere on the send-queue/network.
virtual bool SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
SendDataResult* result = nullptr) = 0;
// Returns RTCError::OK() if successful an error otherwise. Notably
// RTCErrorType::RESOURCE_EXHAUSTED for blocked operations.
virtual webrtc::RTCError SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) = 0;
// Indicates when the SCTP socket is created and not blocked by congestion
// control. This changes to false when SDR_BLOCK is returned from SendData,

View File

@ -33,14 +33,14 @@ bool DataChannelController::HasUsedDataChannels() const {
return has_used_data_channels_;
}
bool DataChannelController::SendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
RTCError DataChannelController::SendData(
StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
if (data_channel_transport())
return DataChannelSendData(sid, params, payload, result);
return DataChannelSendData(sid, params, payload);
RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
return false;
return RTCError(RTCErrorType::INVALID_STATE);
}
void DataChannelController::AddSctpDataStream(StreamId sid) {
@ -374,33 +374,20 @@ void DataChannelController::set_data_channel_transport(
data_channel_transport_ = transport;
}
bool DataChannelController::DataChannelSendData(
RTCError DataChannelController::DataChannelSendData(
StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
const rtc::CopyOnWriteBuffer& payload) {
// TODO(bugs.webrtc.org/11547): Expect method to be called on the network
// thread instead. Remove the BlockingCall() below and move associated state
// to the network thread.
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(data_channel_transport());
RTCError error = network_thread()->BlockingCall([this, sid, params, payload] {
return network_thread()->BlockingCall([this, sid, params, payload] {
return data_channel_transport()->SendData(sid.stream_id_int(), params,
payload);
});
if (error.ok()) {
*result = cricket::SendDataResult::SDR_SUCCESS;
return true;
} else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
// SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
// TODO(mellem): Stop using RTCError here and get rid of the mapping.
*result = cricket::SendDataResult::SDR_BLOCK;
return false;
}
*result = cricket::SendDataResult::SDR_ERROR;
return false;
}
void DataChannelController::NotifyDataChannelsOfTransportCreated() {

View File

@ -47,10 +47,9 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// Implements
// SctpDataChannelProviderInterface.
bool SendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) override;
RTCError SendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override;
void AddSctpDataStream(StreamId sid) override;
void RemoveSctpDataStream(StreamId sid) override;
bool ReadyToSendData() const override;
@ -124,10 +123,9 @@ class DataChannelController : public SctpDataChannelControllerInterface,
RTC_RUN_ON(signaling_thread());
// Called from SendData when data_channel_transport() is true.
bool DataChannelSendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result);
RTCError DataChannelSendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload);
// Called when all data channels need to be notified of a transport channel
// (calls OnTransportChannelCreated on the signaling thread).

View File

@ -670,11 +670,9 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
send_params.type =
buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
bool success =
controller_->SendData(id_, send_params, buffer.data, &send_result);
RTCError error = controller_->SendData(id_, send_params, buffer.data);
if (success) {
if (error.ok()) {
++messages_sent_;
bytes_sent_ += buffer.size();
@ -684,7 +682,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
return true;
}
if (send_result == cricket::SDR_BLOCK) {
if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
if (!queue_if_blocked || QueueSendDataMessage(buffer)) {
return false;
}
@ -693,7 +691,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
// message failed.
RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
"send_result = "
<< send_result;
<< ToString(error.type()) << ":" << error.message();
CloseAbruptlyWithError(
RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
@ -748,9 +746,8 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
send_params.ordered = ordered_ || is_open_message;
send_params.type = DataMessageType::kControl;
cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
bool retval = controller_->SendData(id_, send_params, buffer, &send_result);
if (retval) {
RTCError err = controller_->SendData(id_, send_params, buffer);
if (err.ok()) {
RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
<< id_.stream_id_int();
@ -759,16 +756,16 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
} else if (handshake_state_ == kHandshakeShouldSendOpen) {
handshake_state_ = kHandshakeWaitingForAck;
}
} else if (send_result == cricket::SDR_BLOCK) {
} else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
QueueControlMessage(buffer);
} else {
RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
" the CONTROL message, send_result = "
<< send_result;
CloseAbruptlyWithError(RTCError(RTCErrorType::NETWORK_ERROR,
"Failed to send a CONTROL message"));
<< ToString(err.type());
err.set_message("Failed to send a CONTROL message");
CloseAbruptlyWithError(err);
}
return retval;
return err.ok();
}
// static

View File

@ -39,10 +39,9 @@ class SctpDataChannel;
class SctpDataChannelControllerInterface {
public:
// Sends the data to the transport.
virtual bool SendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) = 0;
virtual RTCError SendData(StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) = 0;
// Adds the data channel SID to the transport for SCTP.
virtual void AddSctpDataStream(StreamId sid) = 0;
// Begins the closing procedure by sending an outgoing stream reset. Still

View File

@ -78,22 +78,7 @@ RTCError SctpTransport::SendData(int channel_id,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(owner_thread_);
RTC_DCHECK(internal_sctp_transport_);
cricket::SendDataResult result;
internal_sctp_transport_->SendData(channel_id, params, buffer, &result);
// TODO(mellem): See about changing the interfaces to not require mapping
// SendDataResult to RTCError and back again.
switch (result) {
case cricket::SendDataResult::SDR_SUCCESS:
return RTCError::OK();
case cricket::SendDataResult::SDR_BLOCK:
// Send buffer is full.
return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
case cricket::SendDataResult::SDR_ERROR:
return RTCError(RTCErrorType::NETWORK_ERROR);
}
return RTCError(RTCErrorType::NETWORK_ERROR);
return internal_sctp_transport_->SendData(channel_id, params, buffer);
}
RTCError SctpTransport::CloseChannel(int channel_id) {

View File

@ -49,11 +49,10 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal {
}
bool OpenStream(int sid) override { return true; }
bool ResetStream(int sid) override { return true; }
bool SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result = nullptr) override {
return true;
RTCError SendData(int sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override {
return RTCError::OK();
}
bool ReadyToSendData() override { return true; }
void set_debug_name_for_testing(const char* debug_name) override {}

View File

@ -44,25 +44,22 @@ class FakeDataChannelController
return channel;
}
bool SendData(webrtc::StreamId sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) override {
webrtc::RTCError SendData(webrtc::StreamId sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override {
RTC_CHECK(ready_to_send_);
RTC_CHECK(transport_available_);
if (send_blocked_) {
*result = cricket::SDR_BLOCK;
return false;
return webrtc::RTCError(webrtc::RTCErrorType::RESOURCE_EXHAUSTED);
}
if (transport_error_) {
*result = cricket::SDR_ERROR;
return false;
return webrtc::RTCError(webrtc::RTCErrorType::INTERNAL_ERROR);
}
last_sid_ = sid.stream_id_int();
last_send_data_params_ = params;
return true;
return webrtc::RTCError::OK();
}
void AddSctpDataStream(webrtc::StreamId sid) override {

View File

@ -32,11 +32,10 @@ class FakeSctpTransport : public cricket::SctpTransportInternal {
}
bool OpenStream(int sid) override { return true; }
bool ResetStream(int sid) override { return true; }
bool SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result = nullptr) override {
return true;
webrtc::RTCError SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override {
return webrtc::RTCError::OK();
}
bool ReadyToSendData() override { return true; }
void set_debug_name_for_testing(const char* debug_name) override {}