Remove SctpDataChannelControllerInterface::ConnectDataChannel

Bug: webrtc:11547
Change-Id: I389cb641746ef892106c22fd46b8d70218b99f58
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/297421
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39608}
This commit is contained in:
Tommi 2023-03-20 14:43:09 +01:00 committed by WebRTC LUCI CQ
parent d20b1cf215
commit e9aa8675d7
11 changed files with 92 additions and 125 deletions

View File

@ -887,11 +887,9 @@ rtc_library("sctp_data_channel") {
"../rtc_base:macromagic",
"../rtc_base:ssl",
"../rtc_base:threading",
"../rtc_base:threading",
"../rtc_base:weak_ptr",
"../rtc_base/containers:flat_set",
"../rtc_base/system:unused",
"../rtc_base/third_party/sigslot:sigslot",
]
absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
@ -973,7 +971,6 @@ rtc_source_set("data_channel_controller") {
"../rtc_base:ssl",
"../rtc_base:threading",
"../rtc_base:weak_ptr",
"../rtc_base/third_party/sigslot",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",

View File

@ -43,14 +43,6 @@ bool DataChannelController::SendData(int sid,
return false;
}
bool DataChannelController::ConnectDataChannel(
SctpDataChannel* webrtc_data_channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): This method can be removed once not
// needed by `SctpDataChannel`.
return data_channel_transport() ? true : false;
}
void DataChannelController::AddSctpDataStream(int sid) {
if (data_channel_transport()) {
network_thread()->BlockingCall([this, sid] {
@ -144,7 +136,7 @@ void DataChannelController::OnReadyToSend() {
data_channel_transport_ready_to_send_ = true;
auto copy = sctp_data_channels_;
for (const auto& channel : copy)
channel->OnTransportReady(true);
channel->OnTransportReady();
}));
}
@ -289,9 +281,19 @@ DataChannelController::InternalCreateSctpDataChannel(
}
// In case `sid` has changed. Update `new_config` accordingly.
new_config.id = sid.stream_id_int();
rtc::scoped_refptr<SctpDataChannel> channel(
SctpDataChannel::Create(weak_factory_.GetWeakPtr(), label, new_config,
signaling_thread(), network_thread()));
// TODO(bugs.webrtc.org/11547): The `data_channel_transport_` pointer belongs
// to the network thread but there are a few places where we check this
// pointer from the signaling thread. Instead of this approach, we should have
// a separate channel initialization step that runs on the network thread
// where we inform the channel of information about whether there's a
// transport or not, what the role is, and supply an id if any. Subsequently
// all that state in the channel code, is needed for callbacks from the
// transport which is already initiated from the network thread. Then we can
// Remove the trampoline code (see e.g. PostTask() calls in this file) that
// travels between the signaling and network threads.
rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
weak_factory_.GetWeakPtr(), label, data_channel_transport() != nullptr,
new_config, signaling_thread(), network_thread()));
if (!channel) {
sid_allocator_.ReleaseSid(sid);
return nullptr;
@ -402,6 +404,7 @@ bool DataChannelController::DataChannelSendData(
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(data_channel_transport());
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto copy = sctp_data_channels_;

View File

@ -51,7 +51,6 @@ class DataChannelController : public SctpDataChannelControllerInterface,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) override;
bool ConnectDataChannel(SctpDataChannel* webrtc_data_channel) override;
void AddSctpDataStream(int sid) override;
void RemoveSctpDataStream(int sid) override;
bool ReadyToSendData() const override;

View File

@ -98,11 +98,6 @@ TEST_F(DataChannelControllerTest, CloseAfterControllerDestroyed) {
auto channel = dcc->InternalCreateDataChannelWithProxy(
"label",
std::make_unique<InternalDataChannelInit>(DataChannelInit()).get());
// Connect to provider
auto inner_channel =
DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
channel.get());
dcc->ConnectDataChannel(inner_channel);
dcc.reset();
channel->Close();
}
@ -122,9 +117,6 @@ TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) {
channel = nullptr; // dcc still holds a reference to `channel`.
EXPECT_TRUE(dcc.HasDataChannels());
// Make sure callbacks to dcc are set up.
dcc.ConnectDataChannel(inner_channel);
// Trigger a Close() for the channel. This will send events back to dcc,
// eventually reaching `OnSctpDataChannelClosed` where dcc removes
// the channel from the internal list of data channels, but does not release

View File

@ -71,19 +71,12 @@ class FakeDataChannelObserver : public DataChannelObserver {
size_t on_buffered_amount_change_count_;
};
// TODO(deadbeef): The fact that these tests use a fake controller makes them
// not too valuable. Should rewrite using the
// peerconnection_datachannel_unittest.cc infrastructure.
// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
class SctpDataChannelTest : public ::testing::Test {
protected:
SctpDataChannelTest()
: controller_(new FakeDataChannelController()),
webrtc_data_channel_(SctpDataChannel::Create(controller_->weak_ptr(),
"test",
init_,
rtc::Thread::Current(),
rtc::Thread::Current())) {}
webrtc_data_channel_(controller_->CreateDataChannel("test", init_)) {}
void SetChannelReady() {
controller_->set_transport_available(true);
@ -137,8 +130,7 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) {
TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
controller_->set_transport_available(true);
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", init_,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", init_);
EXPECT_TRUE(controller_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams.
@ -150,16 +142,6 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
EXPECT_TRUE(controller_->IsRecvStreamAdded(dc->id()));
}
// Verifies that the data channel is connected to the transport if the transport
// is not available initially and becomes available later.
TEST_F(SctpDataChannelTest, ConnectedAfterTransportBecomesAvailable) {
EXPECT_FALSE(controller_->IsConnected(webrtc_data_channel_.get()));
controller_->set_transport_available(true);
webrtc_data_channel_->OnTransportChannelCreated();
EXPECT_TRUE(controller_->IsConnected(webrtc_data_channel_.get()));
}
// Tests the state of the data channel.
TEST_F(SctpDataChannelTest, StateTransition) {
AddObserver();
@ -327,8 +309,7 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
InternalDataChannelInit init;
init.id = 1;
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", init);
EXPECT_EQ(DataChannelInterface::kConnecting, dc->state());
EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000);
}
@ -341,8 +322,7 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) {
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", init);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
@ -369,8 +349,7 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", init,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", init);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
@ -449,8 +428,7 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", config,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", config);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ(0, controller_->last_sid());
@ -511,8 +489,7 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
SctpDataChannel::Create(controller_->weak_ptr(), "test1", config,
rtc::Thread::Current(), rtc::Thread::Current());
controller_->CreateDataChannel("test1", config);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);

View File

@ -2125,10 +2125,10 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
// TODO(bugs.webrtc.org/11547): Supply a separate network thread.
FakeDataChannelController controller;
rtc::scoped_refptr<SctpDataChannel> dummy_channel_a = SctpDataChannel::Create(
controller.weak_ptr(), "DummyChannelA", InternalDataChannelInit(),
controller.weak_ptr(), "DummyChannelA", false, InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
rtc::scoped_refptr<SctpDataChannel> dummy_channel_b = SctpDataChannel::Create(
controller.weak_ptr(), "DummyChannelB", InternalDataChannelInit(),
controller.weak_ptr(), "DummyChannelB", false, InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
stats_->stats_collector()->OnSctpDataChannelStateChanged(

View File

@ -143,6 +143,7 @@ void SctpSidAllocator::ReleaseSid(const StreamId& sid) {
rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
const std::string& label,
bool connected_to_transport,
const InternalDataChannelInit& config,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread) {
@ -155,7 +156,8 @@ rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
}
auto channel = rtc::make_ref_counted<SctpDataChannel>(
config, std::move(controller), label, signaling_thread, network_thread);
config, std::move(controller), label, connected_to_transport,
signaling_thread, network_thread);
channel->Init();
return channel;
}
@ -172,6 +174,7 @@ SctpDataChannel::SctpDataChannel(
const InternalDataChannelInit& config,
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
const std::string& label,
bool connected_to_transport,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread)
: signaling_thread_(signaling_thread),
@ -186,7 +189,8 @@ SctpDataChannel::SctpDataChannel(
negotiated_(config.negotiated),
ordered_(config.ordered),
observer_(nullptr),
controller_(std::move(controller)) {
controller_(std::move(controller)),
connected_to_transport_(connected_to_transport) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_UNUSED(network_thread_);
RTC_DCHECK(config.IsValid());
@ -203,27 +207,30 @@ SctpDataChannel::SctpDataChannel(
handshake_state_ = kHandshakeShouldSendAck;
break;
}
// Try to connect to the transport in case the transport channel already
// exists.
if (id_.HasValue()) {
controller_->AddSctpDataStream(id_.stream_id_int());
}
}
void SctpDataChannel::Init() {
RTC_DCHECK_RUN_ON(signaling_thread_);
// Try to connect to the transport in case the transport channel already
// exists.
OnTransportChannelCreated();
// Checks if the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after this
// function returns.
if (controller_->ReadyToSendData()) {
RTC_DCHECK(connected_to_transport_);
AddRef();
absl::Cleanup release = [this] { Release(); };
rtc::Thread::Current()->PostTask([this, release = std::move(release)] {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ != kClosed)
OnTransportReady(true);
OnTransportReady();
});
}
}
@ -362,10 +369,6 @@ void SctpDataChannel::SetSctpSid(const StreamId& sid) {
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting);
if (id_ == sid) {
return;
}
id_ = sid;
controller_->AddSctpDataStream(sid.stream_id_int());
}
@ -397,12 +400,10 @@ void SctpDataChannel::OnClosingProcedureComplete() {
void SctpDataChannel::OnTransportChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!controller_) {
return;
}
if (!connected_to_transport_) {
connected_to_transport_ = controller_->ConnectDataChannel(this);
}
RTC_DCHECK(controller_);
connected_to_transport_ = true;
// The sid may have been unassigned when controller_->ConnectDataChannel was
// done. So always add the streams even if connected_to_transport_ is true.
if (id_.HasValue()) {
@ -485,13 +486,23 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
}
}
void SctpDataChannel::OnTransportReady(bool writable) {
void SctpDataChannel::OnTransportReady() {
RTC_DCHECK_RUN_ON(signaling_thread_);
writable_ = writable;
if (!writable) {
return;
}
// TODO(tommi, hta): We don't need the `writable_` flag for SCTP datachannels.
// Remove it and just rely on `connected_to_transport_` instead.
// In practice the transport is configured inside
// `PeerConnection::SetupDataChannelTransport_n`, which results in
// `SctpDataChannel` getting the OnTransportChannelCreated callback, and then
// that's immediately followed by calling `transport->SetDataSink` which is
// what triggers the callback to `OnTransportReady()`.
// These steps are currently accomplished via two separate PostTask calls to
// the signaling thread, but could simply be done in single method call on
// the network thread (which incidentally is the thread that we'll need to
// be on for the below `Send*` calls, which currently do a BlockingCall
// from the signaling thread to the network thread.
RTC_DCHECK(connected_to_transport_);
writable_ = true;
SendQueuedControlMessages();
SendQueuedDataMessages();
@ -506,7 +517,7 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
return;
}
DisconnectFromTransport();
connected_to_transport_ = false;
// Closing abruptly means any queued data gets thrown away.
queued_send_data_.Clear();
@ -554,6 +565,8 @@ void SctpDataChannel::UpdateState() {
// Deliver them now.
DeliverQueuedReceivedData();
}
} else {
RTC_DCHECK(!id_.HasValue());
}
break;
}
@ -603,14 +616,6 @@ void SctpDataChannel::SetState(DataState state) {
controller_->OnChannelStateChanged(this, state_);
}
void SctpDataChannel::DisconnectFromTransport() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!connected_to_transport_ || !controller_)
return;
connected_to_transport_ = false;
}
void SctpDataChannel::DeliverQueuedReceivedData() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!observer_) {
@ -727,6 +732,7 @@ void SctpDataChannel::QueueControlMessage(
bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(writable_);
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_.HasValue());
if (!controller_) {

View File

@ -36,8 +36,6 @@ namespace webrtc {
class SctpDataChannel;
// TODO(deadbeef): Get rid of this and have SctpDataChannel depend on
// SctpTransportInternal (pure virtual SctpTransport interface) instead.
class SctpDataChannelControllerInterface {
public:
// Sends the data to the transport.
@ -45,8 +43,6 @@ class SctpDataChannelControllerInterface {
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) = 0;
// Connects to the transport signals.
virtual bool ConnectDataChannel(SctpDataChannel* data_channel) = 0;
// Adds the data channel SID to the transport for SCTP.
virtual void AddSctpDataStream(int sid) = 0;
// Begins the closing procedure by sending an outgoing stream reset. Still
@ -123,6 +119,7 @@ class SctpDataChannel : public DataChannelInterface {
static rtc::scoped_refptr<SctpDataChannel> Create(
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
const std::string& label,
bool connected_to_transport,
const InternalDataChannelInit& config,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread);
@ -175,7 +172,7 @@ class SctpDataChannel : public DataChannelInterface {
// Called when the SctpTransport's ready to use. That can happen when we've
// finished negotiation, or if the channel was created after negotiation has
// already finished.
void OnTransportReady(bool writable);
void OnTransportReady();
void OnDataReceived(DataMessageType type,
const rtc::CopyOnWriteBuffer& payload);
@ -192,7 +189,6 @@ class SctpDataChannel : public DataChannelInterface {
// asynchronously after RemoveSctpDataStream.
void OnClosingProcedureComplete();
// Called when the transport channel is created.
// Only needs to be called for SCTP data channels.
void OnTransportChannelCreated();
// Called when the transport channel is unusable.
// This method makes sure the DataChannel is disconnected and changes state
@ -211,6 +207,7 @@ class SctpDataChannel : public DataChannelInterface {
SctpDataChannel(const InternalDataChannelInit& config,
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
const std::string& label,
bool connected_to_transport,
rtc::Thread* signaling_thread,
rtc::Thread* network_thread);
~SctpDataChannel() override;
@ -228,7 +225,6 @@ class SctpDataChannel : public DataChannelInterface {
void Init();
void UpdateState();
void SetState(DataState state);
void DisconnectFromTransport();
void DeliverQueuedReceivedData();

View File

@ -12,6 +12,7 @@
#define PC_TEST_FAKE_DATA_CHANNEL_CONTROLLER_H_
#include <set>
#include <string>
#include "pc/sctp_data_channel.h"
#include "rtc_base/checks.h"
@ -31,6 +32,18 @@ class FakeDataChannelController
return weak_factory_.GetWeakPtr();
}
rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
absl::string_view label,
webrtc::InternalDataChannelInit init,
rtc::Thread* network_thread = rtc::Thread::Current()) {
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
webrtc::SctpDataChannel::Create(weak_ptr(), std::string(label),
transport_available_, init,
rtc::Thread::Current(), network_thread);
connected_channels_.insert(channel.get());
return channel;
}
bool SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
@ -52,17 +65,6 @@ class FakeDataChannelController
return true;
}
bool ConnectDataChannel(webrtc::SctpDataChannel* data_channel) override {
RTC_CHECK(connected_channels_.find(data_channel) ==
connected_channels_.end());
if (!transport_available_) {
return false;
}
RTC_LOG(LS_VERBOSE) << "DataChannel connected " << data_channel;
connected_channels_.insert(data_channel);
return true;
}
void AddSctpDataStream(int sid) override {
RTC_CHECK(sid >= 0);
if (!transport_available_) {
@ -78,15 +80,12 @@ class FakeDataChannelController
recv_ssrcs_.erase(sid);
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly, doing the same snapshot thing as below.
for (webrtc::SctpDataChannel* ch : std::set<webrtc::SctpDataChannel*>(
connected_channels_.begin(), connected_channels_.end())) {
if (connected_channels_.count(ch) && ch->id() == sid) {
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
ch->OnClosingProcedureComplete();
return;
}
}
auto it = absl::c_find_if(connected_channels_,
[&](const auto* c) { return c->id() == sid; });
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}
bool ReadyToSendData() const override { return ready_to_send_; }
@ -106,15 +105,12 @@ class FakeDataChannelController
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
if (!blocked) {
// Take a snapshot of the connected channels and check to see whether
// each value is still in connected_channels_ before calling
// OnTransportReady(). This avoids problems where the set gets modified
// in response to OnTransportReady().
for (webrtc::SctpDataChannel* ch : std::set<webrtc::SctpDataChannel*>(
connected_channels_.begin(), connected_channels_.end())) {
if (connected_channels_.count(ch)) {
ch->OnTransportReady(true);
}
RTC_CHECK(transport_available_);
// Make a copy since `connected_channels_` may change while
// OnTransportReady is called.
auto copy = connected_channels_;
for (webrtc::SctpDataChannel* ch : copy) {
ch->OnTransportReady();
}
}
}
@ -134,7 +130,7 @@ class FakeDataChannelController
std::set<webrtc::SctpDataChannel*>::iterator it;
for (it = connected_channels_.begin(); it != connected_channels_.end();
++it) {
(*it)->OnTransportReady(true);
(*it)->OnTransportReady();
}
}
}

View File

@ -343,7 +343,7 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase {
const InternalDataChannelInit& init) {
// TODO(bugs.webrtc.org/11547): Supply a separate network thread.
AddSctpDataChannel(SctpDataChannel::Create(
data_channel_controller_.weak_ptr(), label, init,
data_channel_controller_.weak_ptr(), label, false, init,
rtc::Thread::Current(), rtc::Thread::Current()));
}

View File

@ -50,6 +50,7 @@ class MockSctpDataChannel : public SctpDataChannel {
: SctpDataChannel(config,
std::move(controller),
label,
false,
signaling_thread,
network_thread) {
EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id));