[DataChannel] Send and receive packets on the network thread.

This updates sctp channels, including work that happens between the
data channel controller and the transport, to run on the network
thread. Previously all network traffic related to data channels was
routed through the signaling thread before going to either the network
thread or the caller's thread (e.g. js thread in chrome). Now the
calls can go straight from the network thread to the JS thread with
enabling a special flag on the observer (see below) and similarly
calls to send data, involve 2 threads instead of 3.

* Custom data channel observer adapter implementation that
  maintains compatibility with existing observer implementations in
  that notifications are delivered on the signaling thread.
  The adapter can be explicitly disabled for implementations that
  want to optimize the callback path and promise to not block the
  network thread.
* Remove the signaling thread copy of data channels in the controller.
* Remove several PostTask operations that were needed to keep things
  in sync (but the need has gone away).
* Update tests for the controller to consistently call
  TeardownDataChannelTransport_n to match with production.
* Update stats collectors (current and legacy) to fetch the data
  channel stats on the network thread where they're maintained.
* Remove the AsyncChannelCloseTeardown test since the async teardown
  step has gone away.
* Remove `sid_s` in the channel code since we only need the network
  state now.
* For the custom observer support (with and without data adapter) and
  maintain compatibility with existing implementations, added a new
  proxy macro that allows an implementation to selectively provide
  its own implementation without being proxied. This is used for
  registering/unregistering a data channel observer.
* Update the data channel proxy to map most methods to the network
  thread, avoiding the interim jump to the signaling thread.
* Update a plethora of thread checkers from signaling to network.

Bug: webrtc:11547
Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39760}
This commit is contained in:
Tommi 2023-04-04 13:48:34 +02:00 committed by WebRTC LUCI CQ
parent 8481f6358e
commit fe53fec24e
15 changed files with 622 additions and 476 deletions

View File

@ -100,6 +100,17 @@ class DataChannelObserver {
// The data channel's buffered_amount has changed.
virtual void OnBufferedAmountChange(uint64_t sent_data_size) {}
// Override this to get callbacks directly on the network thread.
// An implementation that does that must not block the network thread
// but rather only use the callback to trigger asynchronous processing
// elsewhere as a result of the notification.
// The default return value, `false`, means that notifications will be
// delivered on the signaling thread associated with the peerconnection
// instance.
// TODO(webrtc:11547): Eventually all DataChannelObserver implementations
// should be called on the network thread and this method removed.
virtual bool IsOkToCallOnTheNetworkThread() { return false; }
protected:
virtual ~DataChannelObserver() = default;
};

View File

@ -29,8 +29,15 @@ DataChannelController::~DataChannelController() {
}
bool DataChannelController::HasDataChannelsForTest() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return !sctp_data_channels_.empty();
auto has_channels = [&] {
RTC_DCHECK_RUN_ON(network_thread());
return !sctp_data_channels_n_.empty();
};
if (network_thread()->IsCurrent())
return has_channels();
return network_thread()->BlockingCall(std::move(has_channels));
}
bool DataChannelController::HasUsedDataChannels() const {
@ -66,11 +73,15 @@ void DataChannelController::RemoveSctpDataStream(StreamId sid) {
void DataChannelController::OnChannelStateChanged(
SctpDataChannel* channel,
DataChannelInterface::DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
if (state == DataChannelInterface::DataState::kClosed)
OnSctpDataChannelClosed(channel);
pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(),
[this, channel_id = channel->internal_id(), state = state] {
pc_->OnSctpDataChannelStateChanged(channel_id, state);
}));
}
void DataChannelController::OnDataReceived(
@ -82,27 +93,22 @@ void DataChannelController::OnDataReceived(
if (HandleOpenMessage_n(channel_id, type, buffer))
return;
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be
// delivered on the network thread.
auto it = FindChannel(StreamId(channel_id));
if (it != sctp_data_channels_.end())
(*it)->OnDataReceived(type, buffer);
}));
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id;
});
if (it != sctp_data_channels_n_.end())
(*it)->OnDataReceived(type, buffer);
}
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, channel_id] {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): Should run on the network thread.
auto it = FindChannel(StreamId(channel_id));
if (it != sctp_data_channels_.end())
(*it)->OnClosingProcedureStartedRemotely();
}));
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id;
});
if (it != sctp_data_channels_n_.end())
(*it)->OnClosingProcedureStartedRemotely();
}
void DataChannelController::OnChannelClosed(int channel_id) {
@ -112,48 +118,44 @@ void DataChannelController::OnChannelClosed(int channel_id) {
auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c->sid_n() == sid; });
if (it != sctp_data_channels_n_.end())
if (it != sctp_data_channels_n_.end()) {
rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
sctp_data_channels_n_.erase(it);
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto it = FindChannel(sid);
// Remove the channel from our list, close it and free up resources.
if (it != sctp_data_channels_.end()) {
rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
// Note: this causes OnSctpDataChannelClosed() to not do anything
// when called from within `OnClosingProcedureComplete`.
sctp_data_channels_.erase(it);
channel->OnClosingProcedureComplete();
}
}));
channel->OnClosingProcedureComplete();
}
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto copy = sctp_data_channels_;
for (const auto& channel : copy)
auto copy = sctp_data_channels_n_;
for (const auto& channel : copy) {
if (channel->sid_n().HasValue()) {
channel->OnTransportReady();
}));
} else {
// This happens for role==SSL_SERVER channels when we get notified by
// the transport *before* the SDP code calls `AllocateSctpSids` to
// trigger assignment of sids. In this case OnTransportReady() will be
// called from within `AllocateSctpSids` below.
RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
}
}
}
void DataChannelController::OnTransportClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, error] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportChannelClosed(error);
}));
// This loop will close all data channels and trigger a callback to
// `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
// we create a local copy while we do the fan-out.
auto copy = sctp_data_channels_n_;
for (const auto& channel : copy)
channel->OnTransportChannelClosed(error);
}
void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
// There's a new data channel transport. This needs to be signaled to the
// `sctp_data_channels_` so that they can reopen and reconnect. This is
// `sctp_data_channels_n_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@ -165,11 +167,12 @@ void DataChannelController::PrepareForShutdown() {
void DataChannelController::TeardownDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
data_channel_transport()->SetDataSink(nullptr);
if (data_channel_transport_) {
data_channel_transport_->SetDataSink(nullptr);
set_data_channel_transport(nullptr);
}
set_data_channel_transport(nullptr);
sctp_data_channels_n_.clear();
weak_factory_.InvalidateWeakPtrs();
}
void DataChannelController::OnTransportChanged(
@ -185,7 +188,7 @@ void DataChannelController::OnTransportChanged(
new_data_channel_transport->SetDataSink(this);
// There's a new data channel transport. This needs to be signaled to the
// `sctp_data_channels_` so that they can reopen and reconnect. This is
// `sctp_data_channels_n_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@ -194,10 +197,10 @@ void DataChannelController::OnTransportChanged(
std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
std::vector<DataChannelStats> stats;
stats.reserve(sctp_data_channels_.size());
for (const auto& channel : sctp_data_channels_)
stats.reserve(sctp_data_channels_n_.size());
for (const auto& channel : sctp_data_channels_n_)
stats.push_back(channel->GetStats());
return stats;
}
@ -219,28 +222,38 @@ bool DataChannelController::HandleOpenMessage_n(
<< channel_id;
} else {
config.open_handshake_role = InternalDataChannelInit::kAcker;
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(),
[this, label = std::move(label), config = std::move(config)] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnDataChannelOpenMessage(label, config);
}));
auto channel_or_error = CreateDataChannel(label, config);
if (channel_or_error.ok()) {
signaling_thread()->PostTask(SafeTask(
signaling_safety_.flag(),
[this, channel = channel_or_error.MoveValue(),
ready_to_send = data_channel_transport_->IsReadyToSend()] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnDataChannelOpenMessage(std::move(channel), ready_to_send);
}));
} else {
RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
<< ToString(channel_or_error.error().type());
}
}
return true;
}
void DataChannelController::OnDataChannelOpenMessage(
const std::string& label,
const InternalDataChannelInit& config) {
auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
if (!channel_or_error.ok()) {
RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
<< ToString(channel_or_error.error().type());
return;
}
rtc::scoped_refptr<SctpDataChannel> channel,
bool ready_to_send) {
has_used_data_channels_ = true;
auto proxy = SctpDataChannel::CreateProxy(channel);
pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
pc_->Observer()->OnDataChannel(proxy);
pc_->NoteDataAddedEvent();
if (ready_to_send) {
network_thread()->PostTask([channel = std::move(channel)] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
});
}
}
// RTC_RUN_ON(network_thread())
@ -269,6 +282,31 @@ RTCError DataChannelController::ReserveOrAllocateSid(
return RTCError::OK();
}
// RTC_RUN_ON(network_thread())
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
DataChannelController::CreateDataChannel(const std::string& label,
InternalDataChannelInit& config) {
StreamId sid(config.id);
RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
if (!err.ok())
return err;
// In case `sid` has changed. Update `config` accordingly.
config.id = sid.stream_id_int();
rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
config, signaling_thread(), network_thread());
RTC_DCHECK(channel);
sctp_data_channels_n_.push_back(channel);
// If we have an id already, notify the transport.
if (sid.HasValue())
AddSctpDataStream(sid);
return channel;
}
RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
DataChannelController::InternalCreateDataChannelWithProxy(
const std::string& label,
@ -283,29 +321,25 @@ DataChannelController::InternalCreateDataChannelWithProxy(
bool ready_to_send = false;
InternalDataChannelInit new_config = config;
StreamId sid(new_config.id);
auto weak_ptr = weak_factory_.GetWeakPtr();
RTC_DCHECK(weak_ptr); // Associate with current thread.
auto ret = network_thread()->BlockingCall(
[&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
RTC_DCHECK_RUN_ON(network_thread());
RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
if (!err.ok())
return err;
// In case `sid` has changed. Update `new_config` accordingly.
new_config.id = sid.stream_id_int();
auto channel = CreateDataChannel(label, new_config);
if (!channel.ok())
return channel;
ready_to_send =
data_channel_transport_ && data_channel_transport_->IsReadyToSend();
rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
std::move(weak_ptr), label, data_channel_transport_ != nullptr,
new_config, signaling_thread(), network_thread()));
RTC_DCHECK(channel);
sctp_data_channels_n_.push_back(channel);
// If we have an id already, notify the transport.
if (sid.HasValue())
AddSctpDataStream(sid);
if (ready_to_send) {
// If the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after
// `InternalCreateDataChannelWithProxy` returns.
network_thread()->PostTask([channel = channel.value()] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
});
}
return channel;
});
@ -313,114 +347,71 @@ DataChannelController::InternalCreateDataChannelWithProxy(
if (!ret.ok())
return ret.MoveError();
if (ready_to_send) {
// Checks if the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after this
// function returns.
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
}));
}
sctp_data_channels_.push_back(ret.value());
has_used_data_channels_ = true;
return SctpDataChannel::CreateProxy(ret.MoveValue());
}
void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
const bool ready_to_send =
data_channel_transport_ && data_channel_transport_->IsReadyToSend();
std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
network_thread()->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread());
for (auto it = sctp_data_channels_n_.begin();
it != sctp_data_channels_n_.end();) {
if (!(*it)->sid_n().HasValue()) {
StreamId sid = sid_allocator_.AllocateSid(role);
if (sid.HasValue()) {
(*it)->SetSctpSid_n(sid);
AddSctpDataStream(sid);
channels_to_update.push_back(std::make_pair((*it).get(), sid));
} else {
channels_to_close.push_back(std::move(*it));
it = sctp_data_channels_n_.erase(it);
continue;
for (auto it = sctp_data_channels_n_.begin();
it != sctp_data_channels_n_.end();) {
if (!(*it)->sid_n().HasValue()) {
StreamId sid = sid_allocator_.AllocateSid(role);
if (sid.HasValue()) {
(*it)->SetSctpSid_n(sid);
AddSctpDataStream(sid);
if (ready_to_send) {
RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
(*it)->OnTransportReady();
}
channels_to_update.push_back(std::make_pair((*it).get(), sid));
} else {
channels_to_close.push_back(std::move(*it));
it = sctp_data_channels_n_.erase(it);
continue;
}
++it;
}
});
++it;
}
// Since closing modifies the list of channels, we have to do the actual
// closing outside the loop.
for (const auto& channel : channels_to_close) {
channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
// The channel should now have been removed from sctp_data_channels_.
RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c.get() == channel.get();
}) == sctp_data_channels_.end());
}
for (auto& pair : channels_to_update) {
auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c.get() == pair.first;
});
RTC_DCHECK(it != sctp_data_channels_.end());
(*it)->SetSctpSid_s(pair.second);
}
}
void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
network_thread()->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread());
// After the closing procedure is done, it's safe to use this ID for
// another data channel.
if (channel->sid_n().HasValue()) {
sid_allocator_.ReleaseSid(channel->sid_n());
}
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c.get() == channel;
});
if (it != sctp_data_channels_n_.end())
sctp_data_channels_n_.erase(it);
});
for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
++it) {
if (it->get() == channel) {
// Since this method is triggered by a signal from the DataChannel,
// we can't free it directly here; we need to free it asynchronously.
rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
sctp_data_channels_.erase(it);
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
[release = std::move(release)] {}));
return;
}
RTC_DCHECK_RUN_ON(network_thread());
// After the closing procedure is done, it's safe to use this ID for
// another data channel.
if (channel->sid_n().HasValue()) {
sid_allocator_.ReleaseSid(channel->sid_n());
}
auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c.get() == channel; });
if (it != sctp_data_channels_n_.end())
sctp_data_channels_n_.erase(it);
}
void DataChannelController::OnTransportChannelClosed(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
// Use a temporary copy of the SCTP DataChannel list because the
// DataChannel may callback to us and try to modify the list.
// TODO(tommi): `OnTransportChannelClosed` is called from
// `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
// `TeardownDataChannelTransport_n` is called (but on the network thread) from
// the same function. Once `sctp_data_channels_` moves to the network thread,
// we can get rid of this function (OnTransportChannelClosed) and run this
// loop from within the TeardownDataChannelTransport_n callback.
// the same function. We can now get rid of this function
// (OnTransportChannelClosed) and run this loop from within the
// TeardownDataChannelTransport_n callback.
std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
temp_sctp_dcs.swap(sctp_data_channels_);
temp_sctp_dcs.swap(sctp_data_channels_n_);
for (const auto& channel : temp_sctp_dcs) {
channel->OnTransportChannelClosed(error);
}
@ -444,16 +435,10 @@ RTCError DataChannelController::DataChannelSendData(
StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
// TODO(bugs.webrtc.org/11547): Expect method to be called on the network
// thread instead. Remove the BlockingCall() below and move associated state
// to the network thread.
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(data_channel_transport());
return network_thread()->BlockingCall([this, sid, params, payload] {
return data_channel_transport()->SendData(sid.stream_id_int(), params,
payload);
});
return data_channel_transport()->SendData(sid.stream_id_int(), params,
payload);
}
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@ -463,22 +448,8 @@ void DataChannelController::NotifyDataChannelsOfTransportCreated() {
for (const auto& channel : sctp_data_channels_n_) {
if (channel->sid_n().HasValue())
AddSctpDataStream(channel->sid_n());
channel->OnTransportChannelCreated();
}
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
for (const auto& channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
}));
}
std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
DataChannelController::FindChannel(StreamId stream_id) {
RTC_DCHECK_RUN_ON(signaling_thread());
return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c->sid_s() == stream_id;
});
}
rtc::Thread* DataChannelController::network_thread() const {

View File

@ -107,6 +107,11 @@ class DataChannelController : public SctpDataChannelControllerInterface,
rtc::Thread* signaling_thread() const;
private:
// Creates a new SctpDataChannel object on the network thread.
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> CreateDataChannel(
const std::string& label,
InternalDataChannelInit& config) RTC_RUN_ON(network_thread());
// Parses and handles open messages. Returns true if the message is an open
// message and should be considered to be handled, false otherwise.
bool HandleOpenMessage_n(int channel_id,
@ -114,8 +119,8 @@ class DataChannelController : public SctpDataChannelControllerInterface,
const rtc::CopyOnWriteBuffer& buffer)
RTC_RUN_ON(network_thread());
// Called when a valid data channel OPEN message is received.
void OnDataChannelOpenMessage(const std::string& label,
const InternalDataChannelInit& config)
void OnDataChannelOpenMessage(rtc::scoped_refptr<SctpDataChannel> channel,
bool ready_to_send)
RTC_RUN_ON(signaling_thread());
// Accepts a `StreamId` which may be pre-negotiated or unassigned. For
@ -139,9 +144,6 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// (calls OnTransportChannelCreated on the signaling thread).
void NotifyDataChannelsOfTransportCreated();
std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator FindChannel(
StreamId stream_id);
// Plugin transport used for data channels. Pointer may be accessed and
// checked from any thread, but the object may only be touched on the
// network thread.
@ -149,21 +151,16 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// thread.
DataChannelTransportInterface* data_channel_transport_ = nullptr;
SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread());
std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_
RTC_GUARDED_BY(signaling_thread());
// TODO(bugs.webrtc.org/11547): This vector will eventually take over from
// `sctp_data_channels_`. While we're migrating away from thread hops
// between the signaling and network threads, we need both, so this is
// a temporary situation.
std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_n_
RTC_GUARDED_BY(network_thread());
bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false;
// Owning PeerConnection.
PeerConnectionInternal* const pc_;
// The weak pointers must be dereferenced and invalidated on the signalling
// The weak pointers must be dereferenced and invalidated on the network
// thread only.
rtc::WeakPtrFactory<DataChannelController> weak_factory_{this};
rtc::WeakPtrFactory<DataChannelController> weak_factory_
RTC_GUARDED_BY(network_thread()){this};
ScopedTaskSafety signaling_safety_;
};

View File

@ -131,44 +131,6 @@ TEST_F(DataChannelControllerTest, CloseAfterControllerDestroyed) {
channel->Close();
}
TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) {
DataChannelControllerForTest dcc(pc_.get());
auto ret = dcc.InternalCreateDataChannelWithProxy(
"label", InternalDataChannelInit(DataChannelInit()));
ASSERT_TRUE(ret.ok());
auto channel = ret.MoveValue();
SctpDataChannel* inner_channel =
DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
channel.get());
// Grab a reference for testing purposes.
inner_channel->AddRef();
channel = nullptr; // dcc still holds a reference to `channel`.
EXPECT_TRUE(dcc.HasDataChannelsForTest());
// Trigger a Close() for the channel. This will send events back to dcc,
// eventually reaching `OnSctpDataChannelClosed` where dcc removes
// the channel from the internal list of data channels, but does not release
// the reference synchronously since that reference might be the last one.
inner_channel->Close();
// Now there should be no tracked data channels.
EXPECT_FALSE(dcc.HasDataChannelsForTest());
// But there should be an async operation queued that still holds a reference.
// That means that the test reference, must not be the last one.
ASSERT_NE(inner_channel->Release(),
rtc::RefCountReleaseStatus::kDroppedLastRef);
// Grab a reference again (using the pointer is safe since the object still
// exists and we control the single-threaded environment manually).
inner_channel->AddRef();
// Now run the queued up async operations on the signaling (current) thread.
// This time, the reference formerly owned by dcc, should be release and the
// truly last reference is now held by the test.
run_loop_.Flush();
// Check that this is the last reference.
EXPECT_EQ(inner_channel->Release(),
rtc::RefCountReleaseStatus::kDroppedLastRef);
}
// Allocate the maximum number of data channels and then one more.
// The last allocation should fail.
TEST_F(DataChannelControllerTest, MaxChannels) {

View File

@ -40,18 +40,19 @@ static constexpr int kDefaultTimeout = 10000;
class FakeDataChannelObserver : public DataChannelObserver {
public:
FakeDataChannelObserver()
: messages_received_(0),
on_state_change_count_(0),
on_buffered_amount_change_count_(0) {}
FakeDataChannelObserver() {
// This implementation relies on the SctpDataChannel::ObserverAdapter
// implementation to post events to the signaling thread.
RTC_DCHECK(!IsOkToCallOnTheNetworkThread());
}
void OnStateChange() { ++on_state_change_count_; }
void OnStateChange() override { ++on_state_change_count_; }
void OnBufferedAmountChange(uint64_t previous_amount) {
void OnBufferedAmountChange(uint64_t previous_amount) override {
++on_buffered_amount_change_count_;
}
void OnMessage(const DataBuffer& buffer) { ++messages_received_; }
void OnMessage(const DataBuffer& buffer) override { ++messages_received_; }
size_t messages_received() const { return messages_received_; }
@ -68,9 +69,9 @@ class FakeDataChannelObserver : public DataChannelObserver {
}
private:
size_t messages_received_;
size_t on_state_change_count_;
size_t on_buffered_amount_change_count_;
size_t messages_received_ = 0u;
size_t on_state_change_count_ = 0u;
size_t on_buffered_amount_change_count_ = 0u;
};
class SctpDataChannelTest : public ::testing::Test {
@ -93,11 +94,17 @@ class SctpDataChannelTest : public ::testing::Test {
void SetChannelReady() {
controller_->set_transport_available(true);
inner_channel_->OnTransportChannelCreated();
if (!inner_channel_->sid_s().HasValue()) {
SetChannelSid(inner_channel_, StreamId(0));
}
StreamId sid(0);
network_thread_.BlockingCall([&]() {
RTC_DCHECK_RUN_ON(&network_thread_);
if (!inner_channel_->sid_n().HasValue()) {
inner_channel_->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid);
}
inner_channel_->OnTransportChannelCreated();
});
controller_->set_ready_to_send(true);
run_loop_.Flush();
}
// TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController
@ -108,9 +115,10 @@ class SctpDataChannelTest : public ::testing::Test {
void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel,
StreamId sid) {
RTC_DCHECK(sid.HasValue());
network_thread_.BlockingCall(
[&]() { controller_->AddSctpDataStream(sid); });
channel->SetSctpSid_s(sid);
network_thread_.BlockingCall([&]() {
channel->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid);
});
}
void AddObserver() {
@ -146,11 +154,13 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) {
// Check the non-const part of the configuration.
EXPECT_EQ(channel_->id(), init_.id);
EXPECT_EQ(inner_channel_->sid_s(), StreamId());
network_thread_.BlockingCall(
[&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); });
SetChannelReady();
EXPECT_EQ(channel_->id(), 0);
EXPECT_EQ(inner_channel_->sid_s(), StreamId(0));
network_thread_.BlockingCall(
[&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); });
}
// Verifies that the data channel is connected to the transport after creation.
@ -158,13 +168,15 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
controller_->set_transport_available(true);
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init_);
EXPECT_TRUE(controller_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams.
EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s()));
StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_FALSE(controller_->IsStreamAdded(sid));
SetChannelSid(dc, StreamId(0));
EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s()));
sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_TRUE(controller_->IsStreamAdded(sid));
}
// Tests the state of the data channel.
@ -183,7 +195,7 @@ TEST_F(SctpDataChannelTest, StateTransition) {
channel_->Close();
// The (simulated) transport close notifications runs on the network thread
// and posts a completion notification to the signaling (current) thread.
// Allow that ooperation to complete before checking the state.
// Allow that operation to complete before checking the state.
run_loop_.Flush();
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
EXPECT_EQ(observer_->on_state_change_count(), 3u);
@ -201,6 +213,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
EXPECT_TRUE(channel_->Send(buffer));
size_t successful_send_count = 1;
run_loop_.Flush();
EXPECT_EQ(0U, channel_->buffered_amount());
EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
@ -217,6 +230,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
observer_->on_buffered_amount_change_count());
controller_->set_send_blocked(false);
run_loop_.Flush();
successful_send_count += number_of_packets;
EXPECT_EQ(0U, channel_->buffered_amount());
EXPECT_EQ(successful_send_count,
@ -337,10 +351,9 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
SetChannelReady();
InternalDataChannelInit init;
init.id = 1;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
EXPECT_EQ(DataChannelInterface::kConnecting, dc->state());
EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000);
auto dc = webrtc::SctpDataChannel::CreateProxy(
controller_->CreateDataChannel("test1", init));
EXPECT_EQ(DataChannelInterface::kOpen, dc->state());
}
// Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK
@ -352,21 +365,23 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) {
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Sends a message and verifies it's ordered.
DataBuffer buffer("some data");
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_TRUE(controller_->last_send_data_params().ordered);
// Emulates receiving an OPEN_ACK message.
rtc::CopyOnWriteBuffer payload;
WriteDataChannelOpenAckMessage(&payload);
dc->OnDataReceived(DataMessageType::kControl, payload);
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kControl, payload); });
// Sends another message and verifies it's unordered.
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
@ -379,15 +394,17 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Emulates receiving a DATA message.
DataBuffer buffer("data");
dc->OnDataReceived(DataMessageType::kText, buffer.data);
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); });
// Sends a message and verifies it's unordered.
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
@ -440,7 +457,10 @@ TEST_F(SctpDataChannelTest, ReceiveDataWithValidId) {
AddObserver();
DataBuffer buffer("abcd");
inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
network_thread_.BlockingCall([&] {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
});
run_loop_.Flush();
EXPECT_EQ(1U, observer_->messages_received());
}
@ -455,8 +475,9 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", config);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
EXPECT_EQ(0, controller_->last_sid());
}
@ -480,9 +501,10 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) {
EXPECT_EQ(0U, channel_->bytes_received());
// Receive three buffers while data channel isn't open.
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data);
network_thread_.BlockingCall([&] {
for (int i : {0, 1, 2})
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
});
EXPECT_EQ(0U, observer_->messages_received());
EXPECT_EQ(0U, channel_->messages_received());
EXPECT_EQ(0U, channel_->bytes_received());
@ -496,9 +518,11 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) {
EXPECT_EQ(bytes_received, channel_->bytes_received());
// Receive three buffers while open.
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data);
network_thread_.BlockingCall([&] {
for (int i : {3, 4, 5})
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
});
run_loop_.Flush();
bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size();
EXPECT_EQ(6U, observer_->messages_received());
EXPECT_EQ(6U, channel_->messages_received());
@ -516,8 +540,9 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", config);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
EXPECT_EQ(config.id, controller_->last_sid());
EXPECT_EQ(DataMessageType::kControl,
@ -554,9 +579,8 @@ TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) {
EXPECT_TRUE(channel_->Send(packet));
}
// The sending buffer shoul be full, send returns false.
// The sending buffer should be full, `Send()` returns false.
EXPECT_FALSE(channel_->Send(packet));
EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
}
@ -580,10 +604,12 @@ TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) {
rtc::CopyOnWriteBuffer buffer(1024);
memset(buffer.MutableData(), 0, buffer.size());
// Receiving data without having an observer will overflow the buffer.
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
}
network_thread_.BlockingCall([&] {
// Receiving data without having an observer will overflow the buffer.
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
}
});
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
EXPECT_FALSE(channel_->error().ok());
EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type());
@ -604,7 +630,8 @@ TEST_F(SctpDataChannelTest, SendEmptyData) {
// Tests that a channel can be closed without being opened or assigned an sid.
TEST_F(SctpDataChannelTest, NeverOpened) {
controller_->set_transport_available(true);
inner_channel_->OnTransportChannelCreated();
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelCreated(); });
channel_->Close();
}
@ -634,7 +661,8 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) {
// transition to the "closed" state.
RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
inner_channel_->OnTransportChannelClosed(error);
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelClosed(error); });
controller_.reset(nullptr);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
kDefaultTimeout);
@ -654,7 +682,8 @@ TEST_F(SctpDataChannelTest, TransportGotErrorCode) {
error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
error.set_sctp_cause_code(
static_cast<uint16_t>(cricket::SctpErrorCauseCode::kProtocolViolation));
inner_channel_->OnTransportChannelClosed(error);
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelClosed(error); });
controller_.reset(nullptr);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
kDefaultTimeout);

View File

@ -670,7 +670,7 @@ void LegacyStatsCollector::UpdateStats(
// to fetch stats, then applies them on the signaling thread. See if we need
// to do this synchronously or if updating the stats without blocking is safe.
std::map<std::string, std::string> transport_names_by_mid =
ExtractSessionInfo();
ExtractSessionAndDataInfo();
// TODO(tommi): All of these hop over to the worker thread to fetch
// information. We could post a task to run all of these and post
@ -681,7 +681,6 @@ void LegacyStatsCollector::UpdateStats(
ExtractBweInfo();
ExtractMediaInfo(transport_names_by_mid);
ExtractSenderInfo();
ExtractDataInfo();
UpdateTrackReports();
}
@ -856,19 +855,26 @@ StatsReport* LegacyStatsCollector::AddCandidateReport(
return report;
}
std::map<std::string, std::string> LegacyStatsCollector::ExtractSessionInfo() {
TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo");
std::map<std::string, std::string>
LegacyStatsCollector::ExtractSessionAndDataInfo() {
TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo");
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
SessionStats stats;
StatsCollection::Container data_report_collection;
auto transceivers = pc_->GetTransceiversInternal();
pc_->network_thread()->BlockingCall(
[&, sctp_transport_name = pc_->sctp_transport_name(),
sctp_mid = pc_->sctp_mid()]() mutable {
stats = ExtractSessionInfo_n(
transceivers, std::move(sctp_transport_name), std::move(sctp_mid));
StatsCollection data_reports;
ExtractDataInfo_n(&data_reports);
data_report_collection = data_reports.DetachCollection();
});
reports_.MergeCollection(std::move(data_report_collection));
ExtractSessionInfo_s(stats);
return std::move(stats.transport_names_by_mid);
@ -1292,8 +1298,8 @@ void LegacyStatsCollector::ExtractSenderInfo() {
}
}
void LegacyStatsCollector::ExtractDataInfo() {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) {
RTC_DCHECK_RUN_ON(pc_->network_thread());
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
@ -1301,7 +1307,7 @@ void LegacyStatsCollector::ExtractDataInfo() {
for (const auto& stats : data_stats) {
StatsReport::Id id(StatsReport::NewTypedIntId(
StatsReport::kStatsReportTypeDataChannel, stats.id));
StatsReport* report = reports_.ReplaceOrAddNew(id);
StatsReport* report = reports->ReplaceOrAddNew(id);
report->set_timestamp(stats_gathering_started_);
report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
// Filter out the initial id (-1).

View File

@ -165,11 +165,13 @@ class LegacyStatsCollector : public LegacyStatsCollectorInterface {
const StatsReport::Id& channel_report_id,
const cricket::ConnectionInfo& info);
void ExtractDataInfo();
void ExtractDataInfo_n(StatsCollection* reports);
// Returns the `transport_names_by_mid` member from the SessionStats as
// gathered and used to populate the stats.
std::map<std::string, std::string> ExtractSessionInfo();
// gathered and used to populate the stats. Contains one synchronous hop
// to the network thread to get this information along with querying data
// channel stats at the same time and populating `reports_`.
std::map<std::string, std::string> ExtractSessionAndDataInfo();
void ExtractBweInfo();
void ExtractMediaInfo(

View File

@ -2286,7 +2286,7 @@ bool PeerConnection::GetTransportDescription(
}
std::vector<DataChannelStats> PeerConnection::GetDataChannelStats() const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
return data_channel_controller_.GetDataChannelStats();
}

View File

@ -450,6 +450,24 @@ class ConstMethodCall {
return c_->method(); \
}
// Allows a custom implementation of a method where the otherwise proxied
// implementation can do a more efficient, yet thread-safe, job than the proxy
// can do by default or when more flexibility is needed than can be provided
// by a proxy.
// Note that calls to these methods should be expected to be made from unknown
// threads.
#define BYPASS_PROXY_METHOD0(r, method) \
r method() override { \
TRACE_BOILERPLATE(method); \
return c_->method(); \
}
// The 1 argument version of `BYPASS_PROXY_METHOD0`.
#define BYPASS_PROXY_METHOD1(r, method, t1) \
r method(t1 a1) override { \
TRACE_BOILERPLATE(method); \
return c_->method(std::move(a1)); \
}
} // namespace webrtc
#endif // PC_PROXY_H_

View File

@ -1499,7 +1499,6 @@ void RTCStatsCollector::ProducePartialResultsOnSignalingThreadImpl(
RTC_DCHECK_RUN_ON(signaling_thread_);
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
ProduceDataChannelStats_s(timestamp, partial_report);
ProduceMediaStreamStats_s(timestamp, partial_report);
ProduceMediaStreamTrackStats_s(timestamp, partial_report);
ProduceMediaSourceStats_s(timestamp, partial_report);
@ -1519,6 +1518,8 @@ void RTCStatsCollector::ProducePartialResultsOnNetworkThread(
// `network_report_event_` is reset before this method is invoked.
network_report_ = RTCStatsReport::Create(timestamp);
ProduceDataChannelStats_n(timestamp, network_report_.get());
std::set<std::string> transport_names;
if (sctp_transport_name) {
transport_names.emplace(std::move(*sctp_transport_name));
@ -1653,10 +1654,10 @@ void RTCStatsCollector::ProduceCertificateStats_n(
}
}
void RTCStatsCollector::ProduceDataChannelStats_s(
void RTCStatsCollector::ProduceDataChannelStats_n(
Timestamp timestamp,
RTCStatsReport* report) const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
std::vector<DataChannelStats> data_stats = pc_->GetDataChannelStats();
for (const auto& stats : data_stats) {

View File

@ -186,7 +186,7 @@ class RTCStatsCollector : public rtc::RefCountInterface {
const std::map<std::string, CertificateStatsPair>& transport_cert_stats,
RTCStatsReport* report) const;
// Produces `RTCDataChannelStats`.
void ProduceDataChannelStats_s(Timestamp timestamp,
void ProduceDataChannelStats_n(Timestamp timestamp,
RTCStatsReport* report) const;
// Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`.
void ProduceIceCandidateAndPairStats_n(

View File

@ -38,8 +38,8 @@ int GenerateUniqueId() {
// Define proxy for DataChannelInterface.
BEGIN_PROXY_MAP(DataChannel)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
PROXY_METHOD0(void, UnregisterObserver)
BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
BYPASS_PROXY_METHOD0(void, UnregisterObserver)
BYPASS_PROXY_CONSTMETHOD0(std::string, label)
BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
@ -50,20 +50,18 @@ BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
// Can't bypass the proxy since the id may change.
PROXY_CONSTMETHOD0(int, id)
PROXY_SECONDARY_CONSTMETHOD0(int, id)
BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
PROXY_CONSTMETHOD0(DataState, state)
PROXY_CONSTMETHOD0(RTCError, error)
PROXY_CONSTMETHOD0(uint32_t, messages_sent)
PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
PROXY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_METHOD0(void, Close)
// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
PROXY_METHOD1(bool, Send, const DataBuffer&)
BYPASS_PROXY_CONSTMETHOD0(DataState, state)
PROXY_SECONDARY_CONSTMETHOD0(RTCError, error)
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_SECONDARY_METHOD0(void, Close)
PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
END_PROXY_MAP(DataChannel)
} // namespace
InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
@ -142,6 +140,78 @@ void SctpSidAllocator::ReleaseSid(StreamId sid) {
used_sids_.erase(sid);
}
// A DataChannelObserver implementation that offers backwards compatibility with
// implementations that aren't yet ready to be called back on the network
// thread. This implementation posts events to the signaling thread where
// events are delivered.
// In the class, and together with the `SctpDataChannel` implementation, there's
// special handling for the `state()` property whereby if that property is
// queried on the channel object while inside an event callback, we return
// the state that was active at the time the event was issued. This is to avoid
// a problem with calling the `state()` getter on the proxy, which would do
// a blocking call to the network thread, effectively flushing operations on
// the network thread that could cause the state to change and eventually return
// a misleading or arguably, wrong, state value to the callback implementation.
// As a future improvement to the ObserverAdapter, we could do the same for
// other properties that need to be read on the network thread. Eventually
// all implementations should expect to be called on the network thread though
// and the ObserverAdapter no longer be necessary.
class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
public:
explicit ObserverAdapter(DataChannelObserver* delegate,
SctpDataChannel* channel)
: delegate_(delegate), channel_(channel) {}
bool IsInsideStateNotification() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return inside_state_change_;
}
DataChannelInterface::DataState cached_state() const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(IsInsideStateNotification());
return cached_state_;
}
private:
void OnStateChange() override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(safety_.flag(), [this, new_state = channel_->state()] {
RTC_DCHECK_RUN_ON(signaling_thread());
cached_state_ = new_state;
inside_state_change_ = true;
delegate_->OnStateChange();
inside_state_change_ = false;
}));
}
void OnMessage(const DataBuffer& buffer) override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(safety_.flag(),
[this, buffer = buffer] { delegate_->OnMessage(buffer); }));
}
void OnBufferedAmountChange(uint64_t sent_data_size) override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(safety_.flag(), [this, sent_data_size] {
delegate_->OnBufferedAmountChange(sent_data_size);
}));
}
rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; }
rtc::Thread* network_thread() const { return channel_->network_thread_; }
DataChannelObserver* const delegate_;
SctpDataChannel* const channel_;
ScopedTaskSafety safety_;
bool inside_state_change_ RTC_GUARDED_BY(signaling_thread()) = false;
DataChannelInterface::DataState cached_state_
RTC_GUARDED_BY(signaling_thread()) = DataChannelInterface::kConnecting;
};
// static
rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
@ -175,7 +245,6 @@ SctpDataChannel::SctpDataChannel(
rtc::Thread* network_thread)
: signaling_thread_(signaling_thread),
network_thread_(network_thread),
id_s_(config.id),
id_n_(config.id),
internal_id_(GenerateUniqueId()),
label_(label),
@ -207,19 +276,81 @@ SctpDataChannel::SctpDataChannel(
}
}
SctpDataChannel::~SctpDataChannel() {
RTC_DCHECK_RUN_ON(signaling_thread_);
}
SctpDataChannel::~SctpDataChannel() {}
void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
// Note: at this point, we do not know on which thread we're being called
// from since this method bypasses the proxy. On Android in particular,
// registration methods are called from unknown threads.
// Check if we should set up an observer adapter that will make sure that
// callbacks are delivered on the signaling thread rather than directly
// on the network thread.
const auto* current_thread = rtc::Thread::Current();
// TODO(webrtc:11547): Eventually all DataChannelObserver implementations
// should be called on the network thread and IsOkToCallOnTheNetworkThread().
if (!observer->IsOkToCallOnTheNetworkThread()) {
auto prepare_observer = [&]() {
RTC_DCHECK(!observer_adapter_);
observer_adapter_ = std::make_unique<ObserverAdapter>(observer, this);
return observer_adapter_.get();
};
// Instantiate the adapter in the right context and then substitute the
// observer pointer the SctpDataChannel will call back on, with the adapter.
if (signaling_thread_ == current_thread) {
observer = prepare_observer();
} else {
observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
}
}
// Now do the observer registration on the network thread.
auto register_observer = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
};
if (network_thread_ == current_thread) {
register_observer();
} else {
network_thread_->BlockingCall(std::move(register_observer));
}
}
void SctpDataChannel::UnregisterObserver() {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = nullptr;
// Note: As with `RegisterObserver`, the proxy is being bypassed.
const auto* current_thread = rtc::Thread::Current();
// Callers must not be invoking the unregistration from the network thread
// (assuming a multi-threaded environment where we have a dedicated network
// thread). That would indicate non-network related work happening on the
// network thread or that unregistration is being done from within a callback
// (without unwinding the stack, which is a requirement).
// The network thread is not allowed to make blocking calls to the signaling
// thread, so that would blow up if attempted. Since we support an adapter
// for observers that are not safe to call on the network thread, we do
// need to check+free it on the signaling thread.
RTC_DCHECK(current_thread != network_thread_ ||
network_thread_ == signaling_thread_);
auto unregister_observer = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
observer_ = nullptr;
};
if (current_thread == network_thread_) {
unregister_observer();
} else {
network_thread_->BlockingCall(std::move(unregister_observer));
}
auto clear_observer = [&]() { observer_adapter_.reset(); };
if (current_thread != signaling_thread_) {
signaling_thread_->BlockingCall(std::move(clear_observer));
} else {
clear_observer();
}
}
std::string SctpDataChannel::label() const {
@ -261,8 +392,11 @@ bool SctpDataChannel::negotiated() const {
}
int SctpDataChannel::id() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return id_s_.stream_id_int();
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(tommi): Once an ID has been assigned, it won't change (can be
// considered const). We could do special handling of this and allow bypassing
// the proxy so that we can return a valid id without thread hopping.
return id_n_.stream_id_int();
}
Priority SctpDataChannel::priority() const {
@ -270,12 +404,12 @@ Priority SctpDataChannel::priority() const {
}
uint64_t SctpDataChannel::buffered_amount() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return queued_send_data_.byte_count();
}
void SctpDataChannel::Close() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ == kClosing || state_ == kClosed)
return;
SetState(kClosing);
@ -284,40 +418,58 @@ void SctpDataChannel::Close() {
}
SctpDataChannel::DataState SctpDataChannel::state() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return state_;
// Note: The proxy is bypassed for the `state()` accessor. This is to allow
// observer callbacks to query what the new state is from within a state
// update notification without having to do a blocking call to the network
// thread from within a callback. This also makes it so that the returned
// state is guaranteed to be the new state that provoked the state change
// notification, whereby a blocking call to the network thread might end up
// getting put behind other messages on the network thread and eventually
// fetch a different state value (since pending messages might cause the
// state to change in the meantime).
const auto* current_thread = rtc::Thread::Current();
if (current_thread == signaling_thread_) {
if (observer_adapter_ && observer_adapter_->IsInsideStateNotification())
return observer_adapter_->cached_state();
}
auto return_state = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
return state_;
};
return current_thread == network_thread_
? return_state()
: network_thread_->BlockingCall(std::move(return_state));
}
RTCError SctpDataChannel::error() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return error_;
}
uint32_t SctpDataChannel::messages_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return messages_sent_;
}
uint64_t SctpDataChannel::bytes_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return bytes_sent_;
}
uint32_t SctpDataChannel::messages_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return messages_received_;
}
uint64_t SctpDataChannel::bytes_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return bytes_received_;
}
bool SctpDataChannel::Send(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
// TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
// thread. Bring buffer management etc to the network thread and keep the
// operational state management on the signaling thread.
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ != kOpen) {
return false;
@ -335,25 +487,17 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) {
return true;
}
void SctpDataChannel::SetSctpSid_s(StreamId sid) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(!id_s_.HasValue());
RTC_DCHECK(sid.HasValue());
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting);
id_s_ = sid;
}
void SctpDataChannel::SetSctpSid_n(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(!id_n_.HasValue());
RTC_DCHECK(sid.HasValue());
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting);
id_n_ = sid;
}
void SctpDataChannel::OnClosingProcedureStartedRemotely() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ != kClosing && state_ != kClosed) {
// Don't bother sending queued data since the side that initiated the
// closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
@ -369,7 +513,7 @@ void SctpDataChannel::OnClosingProcedureStartedRemotely() {
}
void SctpDataChannel::OnClosingProcedureComplete() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
// If the closing procedure is complete, we should have finished sending
// all pending data and transitioned to kClosing already.
RTC_DCHECK_EQ(state_, kClosing);
@ -378,12 +522,12 @@ void SctpDataChannel::OnClosingProcedureComplete() {
}
void SctpDataChannel::OnTransportChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
connected_to_transport_ = true;
}
void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread_);
// The SctpTransport is unusable, which could come from multiple reasons:
// - the SCTP m= section was rejected
// - the DTLS transport is closed
@ -392,7 +536,7 @@ void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
}
DataChannelStats SctpDataChannel::GetStats() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
DataChannelStats stats{internal_id_, id(), label(),
protocol(), state(), messages_sent(),
messages_received(), bytes_sent(), bytes_received()};
@ -401,25 +545,25 @@ DataChannelStats SctpDataChannel::GetStats() const {
void SctpDataChannel::OnDataReceived(DataMessageType type,
const rtc::CopyOnWriteBuffer& payload) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (type == DataMessageType::kControl) {
if (handshake_state_ != kHandshakeWaitingForAck) {
// Ignore it if we are not expecting an ACK message.
RTC_LOG(LS_WARNING)
<< "DataChannel received unexpected CONTROL message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
return;
}
if (ParseDataChannelOpenAckMessage(payload)) {
// We can send unordered as soon as we receive the ACK message.
handshake_state_ = kHandshakeReady;
RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
} else {
RTC_LOG(LS_WARNING)
<< "DataChannel failed to parse OPEN_ACK message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
}
return;
}
@ -428,7 +572,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
type == DataMessageType::kText);
RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
// We can send unordered as soon as we receive any DATA message since the
// remote side must have received the OPEN (and old clients do not send
// OPEN_ACK).
@ -459,7 +603,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
}
void SctpDataChannel::OnTransportReady() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(bugs.webrtc.org/11547): The transport is configured inside
// `PeerConnection::SetupDataChannelTransport_n`, which results in
@ -472,6 +616,7 @@ void SctpDataChannel::OnTransportReady() {
// be on for the below `Send*` calls, which currently do a BlockingCall
// from the signaling thread to the network thread.
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_n_.HasValue());
SendQueuedControlMessages();
SendQueuedDataMessages();
@ -480,7 +625,7 @@ void SctpDataChannel::OnTransportReady() {
}
void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ == kClosed) {
return;
@ -501,13 +646,14 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
const std::string& message) {
RTC_DCHECK_RUN_ON(network_thread_);
RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
CloseAbruptlyWithError(std::move(error));
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::UpdateState() {
RTC_DCHECK_RUN_ON(signaling_thread_);
// UpdateState determines what to do from a few state variables. Include
// all conditions required for each state transition here for
// clarity. OnTransportReady(true) will send any queued data and then invoke
@ -535,7 +681,7 @@ void SctpDataChannel::UpdateState() {
DeliverQueuedReceivedData();
}
} else {
RTC_DCHECK(!id_s_.HasValue());
RTC_DCHECK(!id_n_.HasValue());
}
break;
}
@ -551,11 +697,9 @@ void SctpDataChannel::UpdateState() {
// to complete; after calling RemoveSctpDataStream,
// OnClosingProcedureComplete will end up called asynchronously
// afterwards.
if (!started_closing_procedure_ && id_s_.HasValue()) {
if (!started_closing_procedure_ && id_n_.HasValue()) {
started_closing_procedure_ = true;
network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
c->RemoveSctpDataStream(sid);
});
controller_->RemoveSctpDataStream(id_n_);
}
}
} else {
@ -572,8 +716,8 @@ void SctpDataChannel::UpdateState() {
}
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SetState(DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == state) {
return;
}
@ -587,8 +731,8 @@ void SctpDataChannel::SetState(DataState state) {
controller_->OnChannelStateChanged(this, state_);
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::DeliverQueuedReceivedData() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!observer_) {
return;
}
@ -601,8 +745,8 @@ void SctpDataChannel::DeliverQueuedReceivedData() {
}
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SendQueuedDataMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (queued_send_data_.Empty()) {
return;
}
@ -619,9 +763,9 @@ void SctpDataChannel::SendQueuedDataMessages() {
}
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) {
RTC_DCHECK_RUN_ON(signaling_thread_);
SendDataParams send_params;
if (!controller_) {
return false;
@ -641,7 +785,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
send_params.type =
buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
RTCError error = controller_->SendData(id_n_, send_params, buffer.data);
if (error.ok()) {
++messages_sent_;
@ -669,8 +813,8 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
return false;
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
size_t start_buffered_amount = queued_send_data_.byte_count();
if (start_buffered_amount + buffer.size() >
DataChannelInterface::MaxSendQueueSize()) {
@ -681,8 +825,8 @@ bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
return true;
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SendQueuedControlMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@ -692,10 +836,10 @@ void SctpDataChannel::SendQueuedControlMessages() {
}
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_s_.HasValue());
RTC_DCHECK(id_n_.HasValue());
RTC_DCHECK(controller_);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@ -708,10 +852,10 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
send_params.ordered = ordered_ || is_open_message;
send_params.type = DataMessageType::kControl;
RTCError err = controller_->SendData(id_s_, send_params, buffer);
RTCError err = controller_->SendData(id_n_, send_params, buffer);
if (err.ok()) {
RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
if (handshake_state_ == kHandshakeShouldSendAck) {
handshake_state_ = kHandshakeReady;

View File

@ -192,7 +192,6 @@ class SctpDataChannel : public DataChannelInterface {
// Sets the SCTP sid and adds to transport layer if not set yet. Should only
// be called once.
void SetSctpSid_s(StreamId sid);
void SetSctpSid_n(StreamId sid);
// The remote side started the closing procedure by resetting its outgoing
@ -216,10 +215,6 @@ class SctpDataChannel : public DataChannelInterface {
// stats purposes (see also `GetStats()`).
int internal_id() const { return internal_id_; }
StreamId sid_s() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return id_s_;
}
StreamId sid_n() const {
RTC_DCHECK_RUN_ON(network_thread_);
return id_n_;
@ -239,6 +234,8 @@ class SctpDataChannel : public DataChannelInterface {
~SctpDataChannel() override;
private:
class ObserverAdapter;
// The OPEN(_ACK) signaling state.
enum HandshakeState {
kHandshakeInit,
@ -248,21 +245,23 @@ class SctpDataChannel : public DataChannelInterface {
kHandshakeReady
};
void UpdateState();
void SetState(DataState state);
void UpdateState() RTC_RUN_ON(network_thread_);
void SetState(DataState state) RTC_RUN_ON(network_thread_);
void DeliverQueuedReceivedData();
void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
void SendQueuedDataMessages();
bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked);
bool QueueSendDataMessage(const DataBuffer& buffer);
void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
RTC_RUN_ON(network_thread_);
bool QueueSendDataMessage(const DataBuffer& buffer)
RTC_RUN_ON(network_thread_);
void SendQueuedControlMessages();
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
void SendQueuedControlMessages() RTC_RUN_ON(network_thread_);
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
RTC_RUN_ON(network_thread_);
rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_;
StreamId id_s_ RTC_GUARDED_BY(signaling_thread_);
StreamId id_n_ RTC_GUARDED_BY(network_thread_);
const int internal_id_;
const std::string label_;
@ -273,25 +272,26 @@ class SctpDataChannel : public DataChannelInterface {
const bool negotiated_;
const bool ordered_;
DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr;
DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting;
RTCError error_ RTC_GUARDED_BY(signaling_thread_);
uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr;
std::unique_ptr<ObserverAdapter> observer_adapter_;
DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting;
RTCError error_ RTC_GUARDED_BY(network_thread_);
uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0;
uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0;
uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0;
uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0;
rtc::WeakPtr<SctpDataChannelControllerInterface> controller_
RTC_GUARDED_BY(signaling_thread_);
HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) =
RTC_GUARDED_BY(network_thread_);
HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) =
kHandshakeInit;
bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false;
bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false;
// Did we already start the graceful SCTP closing procedure?
bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
// Control messages that always have to get sent out before any queued
// data.
PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
};
// Downcast a PeerConnectionInterface that points to a proxy object

View File

@ -3268,20 +3268,16 @@ void SdpOfferAnswerHandler::AllocateSctpSids() {
return;
}
absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
RTC_DCHECK_RUN_ON(network_thread());
return pc_->GetSctpSslRole_n();
});
if (!role) {
role = GuessSslRole();
}
if (role) {
// TODO(webrtc:11547): Make this call on the network thread too once
// `AllocateSctpSids` has been updated.
data_channel_controller()->AllocateSctpSids(*role);
}
absl::optional<rtc::SSLRole> guessed_role = GuessSslRole();
network_thread()->BlockingCall(
[&, data_channel_controller = data_channel_controller()] {
RTC_DCHECK_RUN_ON(network_thread());
absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
if (!role)
role = guessed_role;
if (role)
data_channel_controller->AllocateSctpSids(*role);
});
}
absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GuessSslRole() const {
@ -5117,13 +5113,13 @@ void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread());
const bool has_sctp = pc_->sctp_mid().has_value();
if (has_sctp)
data_channel_controller()->OnTransportChannelClosed(error);
context_->network_thread()->BlockingCall([this] {
RTC_DCHECK_RUN_ON(context_->network_thread());
pc_->TeardownDataChannelTransport_n();
});
context_->network_thread()->BlockingCall(
[&, data_channel_controller = data_channel_controller()] {
RTC_DCHECK_RUN_ON(context_->network_thread());
if (has_sctp)
data_channel_controller->OnTransportChannelClosed(error);
pc_->TeardownDataChannelTransport_n();
});
if (has_sctp)
pc_->ResetSctpDataMid();

View File

@ -29,23 +29,31 @@ class FakeDataChannelController
transport_available_(false),
ready_to_send_(false),
transport_error_(false) {}
virtual ~FakeDataChannelController() {}
~FakeDataChannelController() override {
network_thread_->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread_);
weak_factory_.InvalidateWeakPtrs();
});
}
rtc::WeakPtr<FakeDataChannelController> weak_ptr() {
RTC_DCHECK_RUN_ON(network_thread_);
return weak_factory_.GetWeakPtr();
}
rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
absl::string_view label,
webrtc::InternalDataChannelInit init) {
rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
// Explicitly associate the weak ptr instance with the current thread to
// catch early any inappropriate referencing of it on the network thread.
RTC_CHECK(my_weak_ptr);
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
network_thread_->BlockingCall([&]() {
RTC_DCHECK_RUN_ON(network_thread_);
rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
// Explicitly associate the weak ptr instance with the current thread
// to catch early any inappropriate referencing of it on the network
// thread.
RTC_CHECK(my_weak_ptr);
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
webrtc::SctpDataChannel::Create(
std::move(my_weak_ptr), std::string(label),
@ -54,17 +62,16 @@ class FakeDataChannelController
if (transport_available_ && channel->sid_n().HasValue()) {
AddSctpDataStream(channel->sid_n());
}
if (ready_to_send_) {
network_thread_->PostTask([channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
channel->OnTransportReady();
}
});
}
return channel;
});
if (ready_to_send_) {
signaling_thread_->PostTask(
SafeTask(signaling_safety_.flag(), [channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
channel->OnTransportReady();
}
}));
}
connected_channels_.insert(channel.get());
return channel;
}
@ -72,6 +79,7 @@ class FakeDataChannelController
webrtc::RTCError SendData(webrtc::StreamId sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(ready_to_send_);
RTC_CHECK(transport_available_);
if (send_blocked_) {
@ -100,17 +108,14 @@ class FakeDataChannelController
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
known_stream_ids_.erase(sid);
signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly.
auto it = absl::c_find_if(connected_channels_, [&](const auto* c) {
return c->sid_s() == sid;
});
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}));
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly.
auto it = absl::c_find_if(connected_channels_,
[&](const auto* c) { return c->sid_n() == sid; });
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}
void OnChannelStateChanged(
@ -126,36 +131,40 @@ class FakeDataChannelController
// Set true to emulate the SCTP stream being blocked by congestion control.
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
if (!blocked) {
RTC_CHECK(transport_available_);
// Make a copy since `connected_channels_` may change while
// OnTransportReady is called.
auto copy = connected_channels_;
for (webrtc::SctpDataChannel* ch : copy) {
ch->OnTransportReady();
network_thread_->BlockingCall([&]() {
send_blocked_ = blocked;
if (!blocked) {
RTC_CHECK(transport_available_);
// Make a copy since `connected_channels_` may change while
// OnTransportReady is called.
auto copy = connected_channels_;
for (webrtc::SctpDataChannel* ch : copy) {
ch->OnTransportReady();
}
}
}
});
}
// Set true to emulate the transport channel creation, e.g. after
// setLocalDescription/setRemoteDescription called with data content.
void set_transport_available(bool available) {
transport_available_ = available;
network_thread_->BlockingCall([&]() { transport_available_ = available; });
}
// Set true to emulate the transport OnTransportReady signal when the
// transport becomes writable for the first time.
void set_ready_to_send(bool ready) {
RTC_CHECK(transport_available_);
ready_to_send_ = ready;
if (ready) {
std::set<webrtc::SctpDataChannel*>::iterator it;
for (it = connected_channels_.begin(); it != connected_channels_.end();
++it) {
(*it)->OnTransportReady();
network_thread_->BlockingCall([&]() {
ready_to_send_ = ready;
if (ready) {
std::set<webrtc::SctpDataChannel*>::iterator it;
for (it = connected_channels_.begin(); it != connected_channels_.end();
++it) {
(*it)->OnTransportReady();
}
}
}
});
}
void set_transport_error() { transport_error_ = true; }