Remove sigslot usage from SctpTransportInternal
Bug: webrtc:11943 Change-Id: I42edf8e2e15e580bcda090447a7aae4a56366b33 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/270661 Commit-Queue: Fredrik Solenberg <solenberg@webrtc.org> Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37867}
This commit is contained in:
parent
bd02e70629
commit
5cb3a90870
@ -406,7 +406,6 @@ rtc_source_set("rtc_data_sctp_transport_internal") {
|
||||
"../p2p:rtc_p2p",
|
||||
"../rtc_base:copy_on_write_buffer",
|
||||
"../rtc_base:threading",
|
||||
"../rtc_base/third_party/sigslot",
|
||||
]
|
||||
}
|
||||
|
||||
@ -432,6 +431,7 @@ if (rtc_build_dcsctp) {
|
||||
"../rtc_base:copy_on_write_buffer",
|
||||
"../rtc_base:event_tracer",
|
||||
"../rtc_base:logging",
|
||||
"../rtc_base:macromagic",
|
||||
"../rtc_base:random",
|
||||
"../rtc_base:socket",
|
||||
"../rtc_base:stringutils",
|
||||
|
||||
@ -151,6 +151,19 @@ DcSctpTransport::~DcSctpTransport() {
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::SetOnConnectedCallback(std::function<void()> callback) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
on_connected_callback_ = std::move(callback);
|
||||
}
|
||||
|
||||
void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
data_channel_sink_ = sink;
|
||||
if (data_channel_sink_ && ready_to_send_data_) {
|
||||
data_channel_sink_->OnReadyToSend();
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::SetDtlsTransport(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
@ -165,10 +178,9 @@ bool DcSctpTransport::Start(int local_sctp_port,
|
||||
int max_message_size) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK(max_message_size > 0);
|
||||
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port
|
||||
<< ", remote=" << remote_sctp_port
|
||||
<< ", max_message_size=" << max_message_size << ")";
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port
|
||||
<< ", remote=" << remote_sctp_port
|
||||
<< ", max_message_size=" << max_message_size << ")";
|
||||
|
||||
if (!socket_) {
|
||||
dcsctp::DcSctpOptions options;
|
||||
@ -206,7 +218,7 @@ bool DcSctpTransport::Start(int local_sctp_port,
|
||||
}
|
||||
|
||||
bool DcSctpTransport::OpenStream(int sid) {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
|
||||
<< "): Transport is not started.";
|
||||
@ -216,7 +228,7 @@ bool DcSctpTransport::OpenStream(int sid) {
|
||||
}
|
||||
|
||||
bool DcSctpTransport::ResetStream(int sid) {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
|
||||
<< "): Transport is not started.";
|
||||
@ -242,10 +254,9 @@ bool DcSctpTransport::SendData(int sid,
|
||||
const rtc::CopyOnWriteBuffer& payload,
|
||||
cricket::SendDataResult* result) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
|
||||
<< ", type=" << static_cast<int>(params.type)
|
||||
<< ", length=" << payload.size() << ").";
|
||||
RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
|
||||
<< ", type=" << static_cast<int>(params.type)
|
||||
<< ", length=" << payload.size() << ").";
|
||||
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
@ -308,6 +319,7 @@ bool DcSctpTransport::SendData(int sid,
|
||||
<< "->SendData(...): send() failed with error "
|
||||
<< dcsctp::ToString(error) << ".";
|
||||
*result = cricket::SDR_ERROR;
|
||||
break;
|
||||
}
|
||||
|
||||
return *result == cricket::SDR_SUCCESS;
|
||||
@ -394,18 +406,21 @@ uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnTotalBufferedAmountLow() {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
if (!ready_to_send_data_) {
|
||||
ready_to_send_data_ = true;
|
||||
SignalReadyToSendData();
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnReadyToSend();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid="
|
||||
<< message.stream_id().value()
|
||||
<< ", ppid=" << message.ppid().value()
|
||||
<< ", length=" << message.payload().size() << ").";
|
||||
RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid="
|
||||
<< message.stream_id().value()
|
||||
<< ", ppid=" << message.ppid().value()
|
||||
<< ", length=" << message.payload().size() << ").";
|
||||
cricket::ReceiveDataParams receive_data_params;
|
||||
receive_data_params.sid = message.stream_id().value();
|
||||
auto type = ToDataMessageType(message.ppid());
|
||||
@ -423,7 +438,10 @@ void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
|
||||
receive_buffer_.AppendData(message.payload().data(),
|
||||
message.payload().size());
|
||||
|
||||
SignalDataReceived(receive_data_params, receive_buffer_);
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnDataReceived(
|
||||
receive_data_params.sid, receive_data_params.type, receive_buffer_);
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnError(dcsctp::ErrorKind error,
|
||||
@ -444,6 +462,7 @@ void DcSctpTransport::OnError(dcsctp::ErrorKind error,
|
||||
|
||||
void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
|
||||
absl::string_view message) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->OnAborted(error=" << dcsctp::ToString(error)
|
||||
<< ", message=" << message << ").";
|
||||
@ -455,23 +474,30 @@ void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
|
||||
if (code.has_value()) {
|
||||
rtc_error.set_sctp_cause_code(static_cast<uint16_t>(*code));
|
||||
}
|
||||
SignalClosedAbruptly(rtc_error);
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnTransportClosed(rtc_error);
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnConnected() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnConnected().";
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected().";
|
||||
ready_to_send_data_ = true;
|
||||
SignalReadyToSendData();
|
||||
SignalAssociationChangeCommunicationUp();
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnReadyToSend();
|
||||
}
|
||||
if (on_connected_callback_) {
|
||||
on_connected_callback_();
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnClosed() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnClosed().";
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed().";
|
||||
ready_to_send_data_ = false;
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnConnectionRestarted() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
|
||||
RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnStreamsResetFailed(
|
||||
@ -488,6 +514,7 @@ void DcSctpTransport::OnStreamsResetFailed(
|
||||
|
||||
void DcSctpTransport::OnStreamsResetPerformed(
|
||||
rtc::ArrayView<const dcsctp::StreamID> outgoing_streams) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
for (auto& stream_id : outgoing_streams) {
|
||||
RTC_LOG(LS_INFO) << debug_name_
|
||||
<< "->OnStreamsResetPerformed(...): Outgoing stream reset"
|
||||
@ -498,7 +525,9 @@ void DcSctpTransport::OnStreamsResetPerformed(
|
||||
if (closing_state.incoming_reset_done) {
|
||||
// When the close was not initiated locally, we can signal the end of the
|
||||
// data channel close procedure when the remote ACKs the reset.
|
||||
SignalClosingProcedureComplete(stream_id.value());
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnChannelClosed(stream_id.value());
|
||||
}
|
||||
closing_states_.erase(stream_id);
|
||||
}
|
||||
}
|
||||
@ -506,6 +535,7 @@ void DcSctpTransport::OnStreamsResetPerformed(
|
||||
|
||||
void DcSctpTransport::OnIncomingStreamsReset(
|
||||
rtc::ArrayView<const dcsctp::StreamID> incoming_streams) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
for (auto& stream_id : incoming_streams) {
|
||||
RTC_LOG(LS_INFO) << debug_name_
|
||||
<< "->OnIncomingStreamsReset(...): Incoming stream reset"
|
||||
@ -519,13 +549,17 @@ void DcSctpTransport::OnIncomingStreamsReset(
|
||||
// direction too.
|
||||
dcsctp::StreamID streams[1] = {stream_id};
|
||||
socket_->ResetStreams(streams);
|
||||
SignalClosingProcedureStartedRemotely(stream_id.value());
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnChannelClosing(stream_id.value());
|
||||
}
|
||||
}
|
||||
|
||||
if (closing_state.outgoing_reset_done) {
|
||||
// The close procedure that was initiated locally is complete when we
|
||||
// receive and incoming reset event.
|
||||
SignalClosingProcedureComplete(stream_id.value());
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnChannelClosed(stream_id.value());
|
||||
}
|
||||
closing_states_.erase(stream_id);
|
||||
}
|
||||
}
|
||||
@ -557,11 +591,9 @@ void DcSctpTransport::OnTransportWritableState(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK_EQ(transport_, transport);
|
||||
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_
|
||||
<< "->OnTransportWritableState(), writable="
|
||||
<< transport->writable();
|
||||
|
||||
RTC_DLOG(LS_VERBOSE) << debug_name_
|
||||
<< "->OnTransportWritableState(), writable="
|
||||
<< transport->writable();
|
||||
MaybeConnectSocket();
|
||||
}
|
||||
|
||||
@ -587,8 +619,11 @@ void DcSctpTransport::OnTransportReadPacket(
|
||||
|
||||
void DcSctpTransport::OnTransportClosed(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
|
||||
SignalClosedAbruptly({});
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
|
||||
if (data_channel_sink_) {
|
||||
data_channel_sink_->OnTransportClosed({});
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::MaybeConnectSocket() {
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "rtc_base/random.h"
|
||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -48,6 +49,8 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
|
||||
~DcSctpTransport() override;
|
||||
|
||||
// cricket::SctpTransportInternal
|
||||
void SetOnConnectedCallback(std::function<void()> callback) override;
|
||||
void SetDataChannelSink(DataChannelSink* sink) override;
|
||||
void SetDtlsTransport(rtc::PacketTransportInternal* transport) override;
|
||||
bool Start(int local_sctp_port,
|
||||
int remote_sctp_port,
|
||||
@ -128,6 +131,8 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
|
||||
|
||||
flat_map<dcsctp::StreamID, StreamClosingState> closing_states_;
|
||||
bool ready_to_send_data_ = false;
|
||||
std::function<void()> on_connected_callback_ RTC_GUARDED_BY(network_thread_);
|
||||
DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
@ -29,12 +29,18 @@ using ::testing::Return;
|
||||
namespace webrtc {
|
||||
|
||||
namespace {
|
||||
class SctpInternalTransportObserver : public sigslot::has_slots<> {
|
||||
class MockDataChannelObserver : public DataChannelSink {
|
||||
public:
|
||||
MOCK_METHOD(void, OnSignalReadyToSendData, ());
|
||||
MOCK_METHOD(void, OnSignalAssociationChangeCommunicationUp, ());
|
||||
MOCK_METHOD(void, OnSignalClosingProcedureStartedRemotely, (int));
|
||||
MOCK_METHOD(void, OnSignalClosingProcedureComplete, (int));
|
||||
MOCK_METHOD(void, OnConnected, ());
|
||||
|
||||
// DataChannelSink
|
||||
MOCK_METHOD(void,
|
||||
OnDataReceived,
|
||||
(int, DataMessageType, const rtc::CopyOnWriteBuffer&));
|
||||
MOCK_METHOD(void, OnChannelClosing, (int));
|
||||
MOCK_METHOD(void, OnChannelClosed, (int));
|
||||
MOCK_METHOD(void, OnReadyToSend, ());
|
||||
MOCK_METHOD(void, OnTransportClosed, (RTCError));
|
||||
};
|
||||
|
||||
class Peer {
|
||||
@ -52,28 +58,16 @@ class Peer {
|
||||
sctp_transport_ = std::make_unique<webrtc::DcSctpTransport>(
|
||||
rtc::Thread::Current(), &fake_packet_transport_, &simulated_clock_,
|
||||
std::move(mock_dcsctp_socket_factory));
|
||||
|
||||
sctp_transport_->SignalAssociationChangeCommunicationUp.connect(
|
||||
static_cast<SctpInternalTransportObserver*>(&observer_),
|
||||
&SctpInternalTransportObserver::OnSignalReadyToSendData);
|
||||
sctp_transport_->SignalAssociationChangeCommunicationUp.connect(
|
||||
static_cast<SctpInternalTransportObserver*>(&observer_),
|
||||
&SctpInternalTransportObserver::
|
||||
OnSignalAssociationChangeCommunicationUp);
|
||||
sctp_transport_->SignalClosingProcedureStartedRemotely.connect(
|
||||
static_cast<SctpInternalTransportObserver*>(&observer_),
|
||||
&SctpInternalTransportObserver::
|
||||
OnSignalClosingProcedureStartedRemotely);
|
||||
sctp_transport_->SignalClosingProcedureComplete.connect(
|
||||
static_cast<SctpInternalTransportObserver*>(&observer_),
|
||||
&SctpInternalTransportObserver::OnSignalClosingProcedureComplete);
|
||||
sctp_transport_->SetDataChannelSink(&observer_);
|
||||
sctp_transport_->SetOnConnectedCallback(
|
||||
[this]() { observer_.OnConnected(); });
|
||||
}
|
||||
|
||||
rtc::FakePacketTransport fake_packet_transport_;
|
||||
webrtc::SimulatedClock simulated_clock_;
|
||||
dcsctp::MockDcSctpSocket* socket_;
|
||||
std::unique_ptr<webrtc::DcSctpTransport> sctp_transport_;
|
||||
NiceMock<SctpInternalTransportObserver> observer_;
|
||||
NiceMock<MockDataChannelObserver> observer_;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
@ -86,8 +80,8 @@ TEST(DcSctpTransportTest, OpenSequence) {
|
||||
.Times(1)
|
||||
.WillOnce(Invoke(peer_a.sctp_transport_.get(),
|
||||
&dcsctp::DcSctpSocketCallbacks::OnConnected));
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalReadyToSendData);
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalAssociationChangeCommunicationUp);
|
||||
EXPECT_CALL(peer_a.observer_, OnReadyToSend);
|
||||
EXPECT_CALL(peer_a.observer_, OnConnected);
|
||||
|
||||
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
|
||||
}
|
||||
@ -109,11 +103,10 @@ TEST(DcSctpTransportTest, CloseSequence) {
|
||||
EXPECT_CALL(*peer_b.socket_, ResetStreams(ElementsAre(dcsctp::StreamID(1))))
|
||||
.WillOnce(Return(dcsctp::ResetStreamsStatus::kPerformed));
|
||||
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureStartedRemotely(1))
|
||||
.Times(0);
|
||||
EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureStartedRemotely(1));
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureComplete(1));
|
||||
EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureComplete(1));
|
||||
EXPECT_CALL(peer_a.observer_, OnChannelClosing(1)).Times(0);
|
||||
EXPECT_CALL(peer_b.observer_, OnChannelClosing(1));
|
||||
EXPECT_CALL(peer_a.observer_, OnChannelClosed(1));
|
||||
EXPECT_CALL(peer_b.observer_, OnChannelClosed(1));
|
||||
}
|
||||
|
||||
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
|
||||
@ -135,7 +128,7 @@ TEST(DcSctpTransportTest, CloseSequence) {
|
||||
|
||||
// Tests that the close sequence initiated from both peers at the same time
|
||||
// terminates properly. Both peers will think they initiated it, so no
|
||||
// OnSignalClosingProcedureStartedRemotely should be called.
|
||||
// OnClosingProcedureStartedRemotely should be called.
|
||||
TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
|
||||
rtc::AutoThread main_thread;
|
||||
Peer peer_a;
|
||||
@ -151,12 +144,10 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
|
||||
EXPECT_CALL(*peer_b.socket_, ResetStreams(ElementsAre(dcsctp::StreamID(1))))
|
||||
.WillOnce(Return(dcsctp::ResetStreamsStatus::kPerformed));
|
||||
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureStartedRemotely(1))
|
||||
.Times(0);
|
||||
EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureStartedRemotely(1))
|
||||
.Times(0);
|
||||
EXPECT_CALL(peer_a.observer_, OnSignalClosingProcedureComplete(1));
|
||||
EXPECT_CALL(peer_b.observer_, OnSignalClosingProcedureComplete(1));
|
||||
EXPECT_CALL(peer_a.observer_, OnChannelClosing(1)).Times(0);
|
||||
EXPECT_CALL(peer_b.observer_, OnChannelClosing(1)).Times(0);
|
||||
EXPECT_CALL(peer_a.observer_, OnChannelClosed(1));
|
||||
EXPECT_CALL(peer_b.observer_, OnChannelClosed(1));
|
||||
}
|
||||
|
||||
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
|
||||
|
||||
@ -19,13 +19,13 @@
|
||||
#include <vector>
|
||||
|
||||
#include "api/transport/data_channel_transport_interface.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/thread.h"
|
||||
// For SendDataParams/ReceiveDataParams.
|
||||
// TODO(deadbeef): Use something else for SCTP. It's confusing that we use an
|
||||
// SSRC field for SID.
|
||||
#include "media/base/media_channel.h"
|
||||
#include "p2p/base/packet_transport_internal.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/thread.h"
|
||||
|
||||
namespace cricket {
|
||||
|
||||
@ -77,6 +77,9 @@ class SctpTransportInternal {
|
||||
public:
|
||||
virtual ~SctpTransportInternal() {}
|
||||
|
||||
virtual void SetOnConnectedCallback(std::function<void()> callback) = 0;
|
||||
virtual void SetDataChannelSink(webrtc::DataChannelSink* sink) = 0;
|
||||
|
||||
// Changes what underlying DTLS transport is uses. Used when switching which
|
||||
// bundled transport the SctpTransport uses.
|
||||
virtual void SetDtlsTransport(rtc::PacketTransportInternal* transport) = 0;
|
||||
@ -140,24 +143,6 @@ class SctpTransportInternal {
|
||||
// Returns the current negotiated max # of inbound streams.
|
||||
virtual absl::optional<int> max_inbound_streams() const = 0;
|
||||
|
||||
sigslot::signal0<> SignalReadyToSendData;
|
||||
sigslot::signal0<> SignalAssociationChangeCommunicationUp;
|
||||
// ReceiveDataParams includes SID, seq num, timestamp, etc. CopyOnWriteBuffer
|
||||
// contains message payload.
|
||||
sigslot::signal2<const ReceiveDataParams&, const rtc::CopyOnWriteBuffer&>
|
||||
SignalDataReceived;
|
||||
// Parameter is SID; fired when we receive an incoming stream reset on an
|
||||
// open stream, indicating that the other side started the closing procedure.
|
||||
// After resetting the outgoing stream, SignalClosingProcedureComplete will
|
||||
// fire too.
|
||||
sigslot::signal1<int> SignalClosingProcedureStartedRemotely;
|
||||
// Parameter is SID; fired when closing procedure is complete (both incoming
|
||||
// and outgoing streams reset).
|
||||
sigslot::signal1<int> SignalClosingProcedureComplete;
|
||||
// Fired when the underlying DTLS transport has closed due to an error
|
||||
// or an incoming DTLS disconnect or SCTP transport errors.
|
||||
sigslot::signal1<webrtc::RTCError> SignalClosedAbruptly;
|
||||
|
||||
// Helper for debugging.
|
||||
virtual void set_debug_name_for_testing(const char* debug_name) = 0;
|
||||
};
|
||||
|
||||
18
pc/BUILD.gn
18
pc/BUILD.gn
@ -204,7 +204,6 @@ rtc_source_set("jsep_transport") {
|
||||
":rtcp_mux_filter",
|
||||
":rtp_transport",
|
||||
":rtp_transport_internal",
|
||||
":sctp_data_channel_transport",
|
||||
":sctp_transport",
|
||||
":session_description",
|
||||
":srtp_filter",
|
||||
@ -466,22 +465,6 @@ rtc_source_set("rtp_transport_internal") {
|
||||
]
|
||||
}
|
||||
|
||||
rtc_source_set("sctp_data_channel_transport") {
|
||||
visibility = [ ":*" ]
|
||||
sources = [
|
||||
"sctp_data_channel_transport.cc",
|
||||
"sctp_data_channel_transport.h",
|
||||
]
|
||||
deps = [
|
||||
"../api:rtc_error",
|
||||
"../api/transport:datagram_transport_interface",
|
||||
"../media:rtc_data_sctp_transport_internal",
|
||||
"../media:rtc_media_base",
|
||||
"../rtc_base:copy_on_write_buffer",
|
||||
"../rtc_base/third_party/sigslot",
|
||||
]
|
||||
}
|
||||
|
||||
rtc_source_set("sctp_transport") {
|
||||
visibility = [ ":*" ]
|
||||
sources = [
|
||||
@ -494,6 +477,7 @@ rtc_source_set("sctp_transport") {
|
||||
"../api:libjingle_peerconnection_api",
|
||||
"../api:scoped_refptr",
|
||||
"../api:sequence_checker",
|
||||
"../api/transport:datagram_transport_interface",
|
||||
"../media:rtc_data_sctp_transport_internal",
|
||||
"../p2p:rtc_p2p",
|
||||
"../rtc_base",
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
#include "api/candidate.h"
|
||||
#include "p2p/base/p2p_constants.h"
|
||||
#include "p2p/base/p2p_transport_channel.h"
|
||||
#include "pc/sctp_data_channel_transport.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/logging.h"
|
||||
@ -99,10 +98,6 @@ JsepTransport::JsepTransport(
|
||||
? rtc::make_ref_counted<webrtc::DtlsTransport>(
|
||||
std::move(rtcp_dtls_transport))
|
||||
: nullptr),
|
||||
sctp_data_channel_transport_(
|
||||
sctp_transport ? std::make_unique<webrtc::SctpDataChannelTransport>(
|
||||
sctp_transport.get())
|
||||
: nullptr),
|
||||
sctp_transport_(sctp_transport
|
||||
? rtc::make_ref_counted<webrtc::SctpTransport>(
|
||||
std::move(sctp_transport))
|
||||
|
||||
@ -223,10 +223,7 @@ class JsepTransport {
|
||||
// TODO(bugs.webrtc.org/9719): Delete method, update callers to use
|
||||
// SctpTransport() instead.
|
||||
webrtc::DataChannelTransportInterface* data_channel_transport() const {
|
||||
if (sctp_data_channel_transport_) {
|
||||
return sctp_data_channel_transport_.get();
|
||||
}
|
||||
return nullptr;
|
||||
return sctp_transport_.get();
|
||||
}
|
||||
|
||||
// TODO(deadbeef): The methods below are only public for testing. Should make
|
||||
@ -311,8 +308,6 @@ class JsepTransport {
|
||||
rtc::scoped_refptr<webrtc::DtlsTransport> rtcp_dtls_transport_
|
||||
RTC_GUARDED_BY(network_thread_);
|
||||
|
||||
const std::unique_ptr<webrtc::DataChannelTransportInterface>
|
||||
sctp_data_channel_transport_;
|
||||
const rtc::scoped_refptr<webrtc::SctpTransport> sctp_transport_;
|
||||
|
||||
SrtpFilter sdes_negotiator_ RTC_GUARDED_BY(network_thread_);
|
||||
|
||||
@ -1,108 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "pc/sctp_data_channel_transport.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
SctpDataChannelTransport::SctpDataChannelTransport(
|
||||
cricket::SctpTransportInternal* sctp_transport)
|
||||
: sctp_transport_(sctp_transport) {
|
||||
sctp_transport_->SignalReadyToSendData.connect(
|
||||
this, &SctpDataChannelTransport::OnReadyToSendData);
|
||||
sctp_transport_->SignalDataReceived.connect(
|
||||
this, &SctpDataChannelTransport::OnDataReceived);
|
||||
sctp_transport_->SignalClosingProcedureStartedRemotely.connect(
|
||||
this, &SctpDataChannelTransport::OnClosingProcedureStartedRemotely);
|
||||
sctp_transport_->SignalClosingProcedureComplete.connect(
|
||||
this, &SctpDataChannelTransport::OnClosingProcedureComplete);
|
||||
sctp_transport_->SignalClosedAbruptly.connect(
|
||||
this, &SctpDataChannelTransport::OnClosedAbruptly);
|
||||
}
|
||||
|
||||
RTCError SctpDataChannelTransport::OpenChannel(int channel_id) {
|
||||
sctp_transport_->OpenStream(channel_id);
|
||||
return RTCError::OK();
|
||||
}
|
||||
|
||||
RTCError SctpDataChannelTransport::SendData(
|
||||
int channel_id,
|
||||
const SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer) {
|
||||
cricket::SendDataResult result;
|
||||
sctp_transport_->SendData(channel_id, params, buffer, &result);
|
||||
|
||||
// TODO(mellem): See about changing the interfaces to not require mapping
|
||||
// SendDataResult to RTCError and back again.
|
||||
switch (result) {
|
||||
case cricket::SendDataResult::SDR_SUCCESS:
|
||||
return RTCError::OK();
|
||||
case cricket::SendDataResult::SDR_BLOCK: {
|
||||
// Send buffer is full.
|
||||
ready_to_send_ = false;
|
||||
return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
|
||||
}
|
||||
case cricket::SendDataResult::SDR_ERROR:
|
||||
return RTCError(RTCErrorType::NETWORK_ERROR);
|
||||
}
|
||||
return RTCError(RTCErrorType::NETWORK_ERROR);
|
||||
}
|
||||
|
||||
RTCError SctpDataChannelTransport::CloseChannel(int channel_id) {
|
||||
sctp_transport_->ResetStream(channel_id);
|
||||
return RTCError::OK();
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::SetDataSink(DataChannelSink* sink) {
|
||||
sink_ = sink;
|
||||
if (sink_ && ready_to_send_) {
|
||||
sink_->OnReadyToSend();
|
||||
}
|
||||
}
|
||||
|
||||
bool SctpDataChannelTransport::IsReadyToSend() const {
|
||||
return ready_to_send_;
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::OnReadyToSendData() {
|
||||
ready_to_send_ = true;
|
||||
if (sink_) {
|
||||
sink_->OnReadyToSend();
|
||||
}
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::OnDataReceived(
|
||||
const cricket::ReceiveDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer) {
|
||||
if (sink_) {
|
||||
sink_->OnDataReceived(params.sid, params.type, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::OnClosingProcedureStartedRemotely(
|
||||
int channel_id) {
|
||||
if (sink_) {
|
||||
sink_->OnChannelClosing(channel_id);
|
||||
}
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::OnClosingProcedureComplete(int channel_id) {
|
||||
if (sink_) {
|
||||
sink_->OnChannelClosed(channel_id);
|
||||
}
|
||||
}
|
||||
|
||||
void SctpDataChannelTransport::OnClosedAbruptly(RTCError error) {
|
||||
if (sink_) {
|
||||
sink_->OnTransportClosed(error);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef PC_SCTP_DATA_CHANNEL_TRANSPORT_H_
|
||||
#define PC_SCTP_DATA_CHANNEL_TRANSPORT_H_
|
||||
|
||||
#include "api/rtc_error.h"
|
||||
#include "api/transport/data_channel_transport_interface.h"
|
||||
#include "media/base/media_channel.h"
|
||||
#include "media/sctp/sctp_transport_internal.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
// SCTP implementation of DataChannelTransportInterface.
|
||||
class SctpDataChannelTransport : public DataChannelTransportInterface,
|
||||
public sigslot::has_slots<> {
|
||||
public:
|
||||
explicit SctpDataChannelTransport(
|
||||
cricket::SctpTransportInternal* sctp_transport);
|
||||
|
||||
RTCError OpenChannel(int channel_id) override;
|
||||
RTCError SendData(int channel_id,
|
||||
const SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer) override;
|
||||
RTCError CloseChannel(int channel_id) override;
|
||||
void SetDataSink(DataChannelSink* sink) override;
|
||||
bool IsReadyToSend() const override;
|
||||
|
||||
private:
|
||||
void OnReadyToSendData();
|
||||
void OnDataReceived(const cricket::ReceiveDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer);
|
||||
void OnClosingProcedureStartedRemotely(int channel_id);
|
||||
void OnClosingProcedureComplete(int channel_id);
|
||||
void OnClosedAbruptly(RTCError error);
|
||||
|
||||
cricket::SctpTransportInternal* const sctp_transport_;
|
||||
|
||||
DataChannelSink* sink_ = nullptr;
|
||||
bool ready_to_send_ = false;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
#endif // PC_SCTP_DATA_CHANNEL_TRANSPORT_H_
|
||||
@ -28,9 +28,8 @@ SctpTransport::SctpTransport(
|
||||
info_(SctpTransportState::kNew),
|
||||
internal_sctp_transport_(std::move(internal)) {
|
||||
RTC_DCHECK(internal_sctp_transport_.get());
|
||||
internal_sctp_transport_->SignalAssociationChangeCommunicationUp.connect(
|
||||
this, &SctpTransport::OnAssociationChangeCommunicationUp);
|
||||
// TODO(https://bugs.webrtc.org/10360): Add handlers for transport closing.
|
||||
internal_sctp_transport_->SetOnConnectedCallback(
|
||||
[this]() { OnAssociationChangeCommunicationUp(); });
|
||||
|
||||
if (dtls_transport_) {
|
||||
UpdateInformation(SctpTransportState::kConnecting);
|
||||
@ -70,6 +69,54 @@ void SctpTransport::UnregisterObserver() {
|
||||
observer_ = nullptr;
|
||||
}
|
||||
|
||||
RTCError SctpTransport::OpenChannel(int channel_id) {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
RTC_DCHECK(internal_sctp_transport_);
|
||||
internal_sctp_transport_->OpenStream(channel_id);
|
||||
return RTCError::OK();
|
||||
}
|
||||
|
||||
RTCError SctpTransport::SendData(int channel_id,
|
||||
const SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer) {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
RTC_DCHECK(internal_sctp_transport_);
|
||||
cricket::SendDataResult result;
|
||||
internal_sctp_transport_->SendData(channel_id, params, buffer, &result);
|
||||
|
||||
// TODO(mellem): See about changing the interfaces to not require mapping
|
||||
// SendDataResult to RTCError and back again.
|
||||
switch (result) {
|
||||
case cricket::SendDataResult::SDR_SUCCESS:
|
||||
return RTCError::OK();
|
||||
case cricket::SendDataResult::SDR_BLOCK:
|
||||
// Send buffer is full.
|
||||
return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
|
||||
case cricket::SendDataResult::SDR_ERROR:
|
||||
return RTCError(RTCErrorType::NETWORK_ERROR);
|
||||
}
|
||||
return RTCError(RTCErrorType::NETWORK_ERROR);
|
||||
}
|
||||
|
||||
RTCError SctpTransport::CloseChannel(int channel_id) {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
RTC_DCHECK(internal_sctp_transport_);
|
||||
internal_sctp_transport_->ResetStream(channel_id);
|
||||
return RTCError::OK();
|
||||
}
|
||||
|
||||
void SctpTransport::SetDataSink(DataChannelSink* sink) {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
RTC_DCHECK(internal_sctp_transport_);
|
||||
internal_sctp_transport_->SetDataChannelSink(sink);
|
||||
}
|
||||
|
||||
bool SctpTransport::IsReadyToSend() const {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
RTC_DCHECK(internal_sctp_transport_);
|
||||
return internal_sctp_transport_->ReadyToSendData();
|
||||
}
|
||||
|
||||
rtc::scoped_refptr<DtlsTransportInterface> SctpTransport::dtls_transport()
|
||||
const {
|
||||
RTC_DCHECK_RUN_ON(owner_thread_);
|
||||
|
||||
@ -17,11 +17,11 @@
|
||||
#include "api/scoped_refptr.h"
|
||||
#include "api/sctp_transport_interface.h"
|
||||
#include "api/sequence_checker.h"
|
||||
#include "api/transport/data_channel_transport_interface.h"
|
||||
#include "media/sctp/sctp_transport_internal.h"
|
||||
#include "p2p/base/dtls_transport_internal.h"
|
||||
#include "pc/dtls_transport.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
|
||||
@ -33,16 +33,26 @@ namespace webrtc {
|
||||
// the same thread as the one the cricket::SctpTransportInternal object
|
||||
// lives on.
|
||||
class SctpTransport : public SctpTransportInterface,
|
||||
public sigslot::has_slots<> {
|
||||
public DataChannelTransportInterface {
|
||||
public:
|
||||
explicit SctpTransport(
|
||||
std::unique_ptr<cricket::SctpTransportInternal> internal);
|
||||
|
||||
// SctpTransportInterface
|
||||
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport() const override;
|
||||
SctpTransportInformation Information() const override;
|
||||
void RegisterObserver(SctpTransportObserverInterface* observer) override;
|
||||
void UnregisterObserver() override;
|
||||
|
||||
// DataChannelTransportInterface
|
||||
RTCError OpenChannel(int channel_id) override;
|
||||
RTCError SendData(int channel_id,
|
||||
const SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& buffer) override;
|
||||
RTCError CloseChannel(int channel_id) override;
|
||||
void SetDataSink(DataChannelSink* sink) override;
|
||||
bool IsReadyToSend() const override;
|
||||
|
||||
// Internal functions
|
||||
void Clear();
|
||||
void SetDtlsTransport(rtc::scoped_refptr<DtlsTransport>);
|
||||
|
||||
@ -39,6 +39,10 @@ namespace {
|
||||
|
||||
class FakeCricketSctpTransport : public cricket::SctpTransportInternal {
|
||||
public:
|
||||
void SetOnConnectedCallback(std::function<void()> callback) override {
|
||||
on_connected_callback_ = std::move(callback);
|
||||
}
|
||||
void SetDataChannelSink(DataChannelSink* sink) override {}
|
||||
void SetDtlsTransport(rtc::PacketTransportInternal* transport) override {}
|
||||
bool Start(int local_port, int remote_port, int max_message_size) override {
|
||||
return true;
|
||||
@ -60,20 +64,12 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal {
|
||||
absl::optional<int> max_inbound_streams() const override {
|
||||
return max_inbound_streams_;
|
||||
}
|
||||
// Methods exposed for testing
|
||||
void SendSignalReadyToSendData() { SignalReadyToSendData(); }
|
||||
|
||||
void SendSignalAssociationChangeCommunicationUp() {
|
||||
SignalAssociationChangeCommunicationUp();
|
||||
ASSERT_TRUE(on_connected_callback_);
|
||||
on_connected_callback_();
|
||||
}
|
||||
|
||||
void SendSignalClosingProcedureStartedRemotely() {
|
||||
SignalClosingProcedureStartedRemotely(1);
|
||||
}
|
||||
|
||||
void SendSignalClosingProcedureComplete() {
|
||||
SignalClosingProcedureComplete(1);
|
||||
}
|
||||
void set_max_outbound_streams(int streams) {
|
||||
max_outbound_streams_ = streams;
|
||||
}
|
||||
@ -82,6 +78,7 @@ class FakeCricketSctpTransport : public cricket::SctpTransportInternal {
|
||||
private:
|
||||
absl::optional<int> max_outbound_streams_;
|
||||
absl::optional<int> max_inbound_streams_;
|
||||
std::function<void()> on_connected_callback_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
@ -134,7 +131,6 @@ class SctpTransportTest : public ::testing::Test {
|
||||
}
|
||||
|
||||
void CompleteSctpHandshake() {
|
||||
CricketSctpTransport()->SendSignalReadyToSendData();
|
||||
// The computed MaxChannels shall be the minimum of the outgoing
|
||||
// and incoming # of streams.
|
||||
CricketSctpTransport()->set_max_outbound_streams(kTestMaxSctpStreams);
|
||||
|
||||
@ -20,6 +20,8 @@
|
||||
// local/remote ports.
|
||||
class FakeSctpTransport : public cricket::SctpTransportInternal {
|
||||
public:
|
||||
void SetOnConnectedCallback(std::function<void()> callback) override {}
|
||||
void SetDataChannelSink(webrtc::DataChannelSink* sink) override {}
|
||||
void SetDtlsTransport(rtc::PacketTransportInternal* transport) override {}
|
||||
bool Start(int local_port, int remote_port, int max_message_size) override {
|
||||
local_port_.emplace(local_port);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user