Reland "[DataChannel] Send and receive packets on the network thread."

This reverts commit 7f16fcda0fd5bb625584b71311dd37b54c096136.

Reason for reland: Re-landing after addressing issues in downstream
code and hardening the ObserverAdapter from situations where attempted
usage of data channel proxies could occur after shutting down the
peer connection and terminating the network thread.

Original change's description:
> Revert "[DataChannel] Send and receive packets on the network thread."
>
> This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.
>
> Reason for revert: Speculative revert, may be breaking downstream project
>
> Original change's description:
> > [DataChannel] Send and receive packets on the network thread.
> >
> > This updates sctp channels, including work that happens between the
> > data channel controller and the transport, to run on the network
> > thread. Previously all network traffic related to data channels was
> > routed through the signaling thread before going to either the network
> > thread or the caller's thread (e.g. js thread in chrome). Now the
> > calls can go straight from the network thread to the JS thread with
> > enabling a special flag on the observer (see below) and similarly
> > calls to send data, involve 2 threads instead of 3.
> >
> > * Custom data channel observer adapter implementation that
> >   maintains compatibility with existing observer implementations in
> >   that notifications are delivered on the signaling thread.
> >   The adapter can be explicitly disabled for implementations that
> >   want to optimize the callback path and promise to not block the
> >   network thread.
> > * Remove the signaling thread copy of data channels in the controller.
> > * Remove several PostTask operations that were needed to keep things
> >   in sync (but the need has gone away).
> > * Update tests for the controller to consistently call
> >   TeardownDataChannelTransport_n to match with production.
> > * Update stats collectors (current and legacy) to fetch the data
> >   channel stats on the network thread where they're maintained.
> > * Remove the AsyncChannelCloseTeardown test since the async teardown
> >   step has gone away.
> > * Remove `sid_s` in the channel code since we only need the network
> >   state now.
> > * For the custom observer support (with and without data adapter) and
> >   maintain compatibility with existing implementations, added a new
> >   proxy macro that allows an implementation to selectively provide
> >   its own implementation without being proxied. This is used for
> >   registering/unregistering a data channel observer.
> > * Update the data channel proxy to map most methods to the network
> >   thread, avoiding the interim jump to the signaling thread.
> > * Update a plethora of thread checkers from signaling to network.
> >
> > Bug: webrtc:11547
> > Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> > Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> > Reviewed-by: Henrik Boström <hbos@webrtc.org>
> > Cr-Commit-Position: refs/heads/main@{#39760}
>
> Bug: webrtc:11547
> Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
> Auto-Submit: Andrey Logvin <landrey@webrtc.org>
> Owners-Override: Andrey Logvin <landrey@webrtc.org>
> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39764}

Bug: webrtc:11547
Change-Id: I47dfa7e7168be0cd2faab4f8f3ebf110c3728af5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300360
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39786}
This commit is contained in:
Tommi 2023-04-06 21:21:45 +02:00 committed by WebRTC LUCI CQ
parent fba851559b
commit f9e13f8813
15 changed files with 681 additions and 493 deletions

View File

@ -892,10 +892,7 @@ rtc_library("sctp_data_channel") {
"../rtc_base/system:no_unique_address",
"../rtc_base/system:unused",
]
absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/types:optional",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("data_channel_utils") {
@ -1326,7 +1323,6 @@ rtc_source_set("legacy_stats_collector") {
"../api/video:video_rtp_headers",
"../call:call_interfaces",
"../media:media_channel",
"../media:media_channel_impl",
"../media:rtc_media_base",
"../modules/audio_processing:audio_processing_statistics",
"../p2p:rtc_p2p",

View File

@ -29,8 +29,15 @@ DataChannelController::~DataChannelController() {
}
bool DataChannelController::HasDataChannelsForTest() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return !sctp_data_channels_.empty();
auto has_channels = [&] {
RTC_DCHECK_RUN_ON(network_thread());
return !sctp_data_channels_n_.empty();
};
if (network_thread()->IsCurrent())
return has_channels();
return network_thread()->BlockingCall(std::move(has_channels));
}
bool DataChannelController::HasUsedDataChannels() const {
@ -66,11 +73,15 @@ void DataChannelController::RemoveSctpDataStream(StreamId sid) {
void DataChannelController::OnChannelStateChanged(
SctpDataChannel* channel,
DataChannelInterface::DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
if (state == DataChannelInterface::DataState::kClosed)
OnSctpDataChannelClosed(channel);
pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(),
[this, channel_id = channel->internal_id(), state = state] {
pc_->OnSctpDataChannelStateChanged(channel_id, state);
}));
}
void DataChannelController::OnDataReceived(
@ -82,27 +93,22 @@ void DataChannelController::OnDataReceived(
if (HandleOpenMessage_n(channel_id, type, buffer))
return;
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be
// delivered on the network thread.
auto it = FindChannel(StreamId(channel_id));
if (it != sctp_data_channels_.end())
(*it)->OnDataReceived(type, buffer);
}));
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id;
});
if (it != sctp_data_channels_n_.end())
(*it)->OnDataReceived(type, buffer);
}
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, channel_id] {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/11547): Should run on the network thread.
auto it = FindChannel(StreamId(channel_id));
if (it != sctp_data_channels_.end())
(*it)->OnClosingProcedureStartedRemotely();
}));
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id;
});
if (it != sctp_data_channels_n_.end())
(*it)->OnClosingProcedureStartedRemotely();
}
void DataChannelController::OnChannelClosed(int channel_id) {
@ -112,48 +118,44 @@ void DataChannelController::OnChannelClosed(int channel_id) {
auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c->sid_n() == sid; });
if (it != sctp_data_channels_n_.end())
if (it != sctp_data_channels_n_.end()) {
rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
sctp_data_channels_n_.erase(it);
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto it = FindChannel(sid);
// Remove the channel from our list, close it and free up resources.
if (it != sctp_data_channels_.end()) {
rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
// Note: this causes OnSctpDataChannelClosed() to not do anything
// when called from within `OnClosingProcedureComplete`.
sctp_data_channels_.erase(it);
channel->OnClosingProcedureComplete();
}
}));
channel->OnClosingProcedureComplete();
}
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto copy = sctp_data_channels_;
for (const auto& channel : copy)
auto copy = sctp_data_channels_n_;
for (const auto& channel : copy) {
if (channel->sid_n().HasValue()) {
channel->OnTransportReady();
}));
} else {
// This happens for role==SSL_SERVER channels when we get notified by
// the transport *before* the SDP code calls `AllocateSctpSids` to
// trigger assignment of sids. In this case OnTransportReady() will be
// called from within `AllocateSctpSids` below.
RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
}
}
}
void DataChannelController::OnTransportClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [this, error] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportChannelClosed(error);
}));
// This loop will close all data channels and trigger a callback to
// `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
// we create a local copy while we do the fan-out.
auto copy = sctp_data_channels_n_;
for (const auto& channel : copy)
channel->OnTransportChannelClosed(error);
}
void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
// There's a new data channel transport. This needs to be signaled to the
// `sctp_data_channels_` so that they can reopen and reconnect. This is
// `sctp_data_channels_n_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@ -165,11 +167,12 @@ void DataChannelController::PrepareForShutdown() {
void DataChannelController::TeardownDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
data_channel_transport()->SetDataSink(nullptr);
if (data_channel_transport_) {
data_channel_transport_->SetDataSink(nullptr);
set_data_channel_transport(nullptr);
}
set_data_channel_transport(nullptr);
sctp_data_channels_n_.clear();
weak_factory_.InvalidateWeakPtrs();
}
void DataChannelController::OnTransportChanged(
@ -185,7 +188,7 @@ void DataChannelController::OnTransportChanged(
new_data_channel_transport->SetDataSink(this);
// There's a new data channel transport. This needs to be signaled to the
// `sctp_data_channels_` so that they can reopen and reconnect. This is
// `sctp_data_channels_n_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@ -194,10 +197,10 @@ void DataChannelController::OnTransportChanged(
std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
std::vector<DataChannelStats> stats;
stats.reserve(sctp_data_channels_.size());
for (const auto& channel : sctp_data_channels_)
stats.reserve(sctp_data_channels_n_.size());
for (const auto& channel : sctp_data_channels_n_)
stats.push_back(channel->GetStats());
return stats;
}
@ -219,28 +222,38 @@ bool DataChannelController::HandleOpenMessage_n(
<< channel_id;
} else {
config.open_handshake_role = InternalDataChannelInit::kAcker;
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(),
[this, label = std::move(label), config = std::move(config)] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnDataChannelOpenMessage(label, config);
}));
auto channel_or_error = CreateDataChannel(label, config);
if (channel_or_error.ok()) {
signaling_thread()->PostTask(SafeTask(
signaling_safety_.flag(),
[this, channel = channel_or_error.MoveValue(),
ready_to_send = data_channel_transport_->IsReadyToSend()] {
RTC_DCHECK_RUN_ON(signaling_thread());
OnDataChannelOpenMessage(std::move(channel), ready_to_send);
}));
} else {
RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
<< ToString(channel_or_error.error().type());
}
}
return true;
}
void DataChannelController::OnDataChannelOpenMessage(
const std::string& label,
const InternalDataChannelInit& config) {
auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
if (!channel_or_error.ok()) {
RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
<< ToString(channel_or_error.error().type());
return;
}
rtc::scoped_refptr<SctpDataChannel> channel,
bool ready_to_send) {
has_used_data_channels_ = true;
auto proxy = SctpDataChannel::CreateProxy(channel, signaling_safety_.flag());
pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
pc_->Observer()->OnDataChannel(proxy);
pc_->NoteDataAddedEvent();
if (ready_to_send) {
network_thread()->PostTask([channel = std::move(channel)] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
});
}
}
// RTC_RUN_ON(network_thread())
@ -269,6 +282,31 @@ RTCError DataChannelController::ReserveOrAllocateSid(
return RTCError::OK();
}
// RTC_RUN_ON(network_thread())
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
DataChannelController::CreateDataChannel(const std::string& label,
InternalDataChannelInit& config) {
StreamId sid(config.id);
RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
if (!err.ok())
return err;
// In case `sid` has changed. Update `config` accordingly.
config.id = sid.stream_id_int();
rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
config, signaling_thread(), network_thread());
RTC_DCHECK(channel);
sctp_data_channels_n_.push_back(channel);
// If we have an id already, notify the transport.
if (sid.HasValue())
AddSctpDataStream(sid);
return channel;
}
RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
DataChannelController::InternalCreateDataChannelWithProxy(
const std::string& label,
@ -283,29 +321,25 @@ DataChannelController::InternalCreateDataChannelWithProxy(
bool ready_to_send = false;
InternalDataChannelInit new_config = config;
StreamId sid(new_config.id);
auto weak_ptr = weak_factory_.GetWeakPtr();
RTC_DCHECK(weak_ptr); // Associate with current thread.
auto ret = network_thread()->BlockingCall(
[&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
RTC_DCHECK_RUN_ON(network_thread());
RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
if (!err.ok())
return err;
// In case `sid` has changed. Update `new_config` accordingly.
new_config.id = sid.stream_id_int();
auto channel = CreateDataChannel(label, new_config);
if (!channel.ok())
return channel;
ready_to_send =
data_channel_transport_ && data_channel_transport_->IsReadyToSend();
rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
std::move(weak_ptr), label, data_channel_transport_ != nullptr,
new_config, signaling_thread(), network_thread()));
RTC_DCHECK(channel);
sctp_data_channels_n_.push_back(channel);
// If we have an id already, notify the transport.
if (sid.HasValue())
AddSctpDataStream(sid);
if (ready_to_send) {
// If the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after
// `InternalCreateDataChannelWithProxy` returns.
network_thread()->PostTask([channel = channel.value()] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
});
}
return channel;
});
@ -313,114 +347,72 @@ DataChannelController::InternalCreateDataChannelWithProxy(
if (!ret.ok())
return ret.MoveError();
if (ready_to_send) {
// Checks if the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after this
// function returns.
signaling_thread()->PostTask(
SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
}));
}
sctp_data_channels_.push_back(ret.value());
has_used_data_channels_ = true;
return SctpDataChannel::CreateProxy(ret.MoveValue());
return SctpDataChannel::CreateProxy(ret.MoveValue(),
signaling_safety_.flag());
}
void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
const bool ready_to_send =
data_channel_transport_ && data_channel_transport_->IsReadyToSend();
std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
network_thread()->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread());
for (auto it = sctp_data_channels_n_.begin();
it != sctp_data_channels_n_.end();) {
if (!(*it)->sid_n().HasValue()) {
StreamId sid = sid_allocator_.AllocateSid(role);
if (sid.HasValue()) {
(*it)->SetSctpSid_n(sid);
AddSctpDataStream(sid);
channels_to_update.push_back(std::make_pair((*it).get(), sid));
} else {
channels_to_close.push_back(std::move(*it));
it = sctp_data_channels_n_.erase(it);
continue;
for (auto it = sctp_data_channels_n_.begin();
it != sctp_data_channels_n_.end();) {
if (!(*it)->sid_n().HasValue()) {
StreamId sid = sid_allocator_.AllocateSid(role);
if (sid.HasValue()) {
(*it)->SetSctpSid_n(sid);
AddSctpDataStream(sid);
if (ready_to_send) {
RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
(*it)->OnTransportReady();
}
channels_to_update.push_back(std::make_pair((*it).get(), sid));
} else {
channels_to_close.push_back(std::move(*it));
it = sctp_data_channels_n_.erase(it);
continue;
}
++it;
}
});
++it;
}
// Since closing modifies the list of channels, we have to do the actual
// closing outside the loop.
for (const auto& channel : channels_to_close) {
channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
// The channel should now have been removed from sctp_data_channels_.
RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c.get() == channel.get();
}) == sctp_data_channels_.end());
}
for (auto& pair : channels_to_update) {
auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c.get() == pair.first;
});
RTC_DCHECK(it != sctp_data_channels_.end());
(*it)->SetSctpSid_s(pair.second);
}
}
void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
network_thread()->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread());
// After the closing procedure is done, it's safe to use this ID for
// another data channel.
if (channel->sid_n().HasValue()) {
sid_allocator_.ReleaseSid(channel->sid_n());
}
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c.get() == channel;
});
if (it != sctp_data_channels_n_.end())
sctp_data_channels_n_.erase(it);
});
for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
++it) {
if (it->get() == channel) {
// Since this method is triggered by a signal from the DataChannel,
// we can't free it directly here; we need to free it asynchronously.
rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
sctp_data_channels_.erase(it);
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
[release = std::move(release)] {}));
return;
}
RTC_DCHECK_RUN_ON(network_thread());
// After the closing procedure is done, it's safe to use this ID for
// another data channel.
if (channel->sid_n().HasValue()) {
sid_allocator_.ReleaseSid(channel->sid_n());
}
auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c.get() == channel; });
if (it != sctp_data_channels_n_.end())
sctp_data_channels_n_.erase(it);
}
void DataChannelController::OnTransportChannelClosed(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
// Use a temporary copy of the SCTP DataChannel list because the
// DataChannel may callback to us and try to modify the list.
// TODO(tommi): `OnTransportChannelClosed` is called from
// `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
// `TeardownDataChannelTransport_n` is called (but on the network thread) from
// the same function. Once `sctp_data_channels_` moves to the network thread,
// we can get rid of this function (OnTransportChannelClosed) and run this
// loop from within the TeardownDataChannelTransport_n callback.
// the same function. We can now get rid of this function
// (OnTransportChannelClosed) and run this loop from within the
// TeardownDataChannelTransport_n callback.
std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
temp_sctp_dcs.swap(sctp_data_channels_);
temp_sctp_dcs.swap(sctp_data_channels_n_);
for (const auto& channel : temp_sctp_dcs) {
channel->OnTransportChannelClosed(error);
}
@ -444,16 +436,10 @@ RTCError DataChannelController::DataChannelSendData(
StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
// TODO(bugs.webrtc.org/11547): Expect method to be called on the network
// thread instead. Remove the BlockingCall() below and move associated state
// to the network thread.
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(data_channel_transport());
return network_thread()->BlockingCall([this, sid, params, payload] {
return data_channel_transport()->SendData(sid.stream_id_int(), params,
payload);
});
return data_channel_transport()->SendData(sid.stream_id_int(), params,
payload);
}
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@ -463,27 +449,14 @@ void DataChannelController::NotifyDataChannelsOfTransportCreated() {
for (const auto& channel : sctp_data_channels_n_) {
if (channel->sid_n().HasValue())
AddSctpDataStream(channel->sid_n());
channel->OnTransportChannelCreated();
}
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
for (const auto& channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
}));
}
std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
DataChannelController::FindChannel(StreamId stream_id) {
RTC_DCHECK_RUN_ON(signaling_thread());
return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
return c->sid_s() == stream_id;
});
}
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}
rtc::Thread* DataChannelController::signaling_thread() const {
return pc_->signaling_thread();
}

View File

@ -107,6 +107,11 @@ class DataChannelController : public SctpDataChannelControllerInterface,
rtc::Thread* signaling_thread() const;
private:
// Creates a new SctpDataChannel object on the network thread.
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> CreateDataChannel(
const std::string& label,
InternalDataChannelInit& config) RTC_RUN_ON(network_thread());
// Parses and handles open messages. Returns true if the message is an open
// message and should be considered to be handled, false otherwise.
bool HandleOpenMessage_n(int channel_id,
@ -114,8 +119,8 @@ class DataChannelController : public SctpDataChannelControllerInterface,
const rtc::CopyOnWriteBuffer& buffer)
RTC_RUN_ON(network_thread());
// Called when a valid data channel OPEN message is received.
void OnDataChannelOpenMessage(const std::string& label,
const InternalDataChannelInit& config)
void OnDataChannelOpenMessage(rtc::scoped_refptr<SctpDataChannel> channel,
bool ready_to_send)
RTC_RUN_ON(signaling_thread());
// Accepts a `StreamId` which may be pre-negotiated or unassigned. For
@ -139,9 +144,6 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// (calls OnTransportChannelCreated on the signaling thread).
void NotifyDataChannelsOfTransportCreated();
std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator FindChannel(
StreamId stream_id);
// Plugin transport used for data channels. Pointer may be accessed and
// checked from any thread, but the object may only be touched on the
// network thread.
@ -149,21 +151,16 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// thread.
DataChannelTransportInterface* data_channel_transport_ = nullptr;
SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread());
std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_
RTC_GUARDED_BY(signaling_thread());
// TODO(bugs.webrtc.org/11547): This vector will eventually take over from
// `sctp_data_channels_`. While we're migrating away from thread hops
// between the signaling and network threads, we need both, so this is
// a temporary situation.
std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_n_
RTC_GUARDED_BY(network_thread());
bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false;
// Owning PeerConnection.
PeerConnectionInternal* const pc_;
// The weak pointers must be dereferenced and invalidated on the signalling
// The weak pointers must be dereferenced and invalidated on the network
// thread only.
rtc::WeakPtrFactory<DataChannelController> weak_factory_{this};
rtc::WeakPtrFactory<DataChannelController> weak_factory_
RTC_GUARDED_BY(network_thread()){this};
ScopedTaskSafety signaling_safety_;
};

View File

@ -131,44 +131,6 @@ TEST_F(DataChannelControllerTest, CloseAfterControllerDestroyed) {
channel->Close();
}
TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) {
DataChannelControllerForTest dcc(pc_.get());
auto ret = dcc.InternalCreateDataChannelWithProxy(
"label", InternalDataChannelInit(DataChannelInit()));
ASSERT_TRUE(ret.ok());
auto channel = ret.MoveValue();
SctpDataChannel* inner_channel =
DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
channel.get());
// Grab a reference for testing purposes.
inner_channel->AddRef();
channel = nullptr; // dcc still holds a reference to `channel`.
EXPECT_TRUE(dcc.HasDataChannelsForTest());
// Trigger a Close() for the channel. This will send events back to dcc,
// eventually reaching `OnSctpDataChannelClosed` where dcc removes
// the channel from the internal list of data channels, but does not release
// the reference synchronously since that reference might be the last one.
inner_channel->Close();
// Now there should be no tracked data channels.
EXPECT_FALSE(dcc.HasDataChannelsForTest());
// But there should be an async operation queued that still holds a reference.
// That means that the test reference, must not be the last one.
ASSERT_NE(inner_channel->Release(),
rtc::RefCountReleaseStatus::kDroppedLastRef);
// Grab a reference again (using the pointer is safe since the object still
// exists and we control the single-threaded environment manually).
inner_channel->AddRef();
// Now run the queued up async operations on the signaling (current) thread.
// This time, the reference formerly owned by dcc, should be release and the
// truly last reference is now held by the test.
run_loop_.Flush();
// Check that this is the last reference.
EXPECT_EQ(inner_channel->Release(),
rtc::RefCountReleaseStatus::kDroppedLastRef);
}
// Allocate the maximum number of data channels and then one more.
// The last allocation should fail.
TEST_F(DataChannelControllerTest, MaxChannels) {

View File

@ -77,10 +77,12 @@ class SctpDataChannelTest : public ::testing::Test {
controller_(new FakeDataChannelController(&network_thread_)) {
network_thread_.Start();
inner_channel_ = controller_->CreateDataChannel("test", init_);
channel_ = webrtc::SctpDataChannel::CreateProxy(inner_channel_);
channel_ =
webrtc::SctpDataChannel::CreateProxy(inner_channel_, signaling_safety_);
}
~SctpDataChannelTest() override {
run_loop_.Flush();
signaling_safety_->SetNotAlive();
inner_channel_ = nullptr;
channel_ = nullptr;
controller_.reset();
@ -90,11 +92,17 @@ class SctpDataChannelTest : public ::testing::Test {
void SetChannelReady() {
controller_->set_transport_available(true);
inner_channel_->OnTransportChannelCreated();
if (!inner_channel_->sid_s().HasValue()) {
SetChannelSid(inner_channel_, StreamId(0));
}
StreamId sid(0);
network_thread_.BlockingCall([&]() {
RTC_DCHECK_RUN_ON(&network_thread_);
if (!inner_channel_->sid_n().HasValue()) {
inner_channel_->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid);
}
inner_channel_->OnTransportChannelCreated();
});
controller_->set_ready_to_send(true);
run_loop_.Flush();
}
// TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController
@ -105,9 +113,10 @@ class SctpDataChannelTest : public ::testing::Test {
void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel,
StreamId sid) {
RTC_DCHECK(sid.HasValue());
network_thread_.BlockingCall(
[&]() { controller_->AddSctpDataStream(sid); });
channel->SetSctpSid_s(sid);
network_thread_.BlockingCall([&]() {
channel->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid);
});
}
void AddObserver() {
@ -118,6 +127,8 @@ class SctpDataChannelTest : public ::testing::Test {
test::RunLoop run_loop_;
rtc::Thread network_thread_;
InternalDataChannelInit init_;
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_ =
PendingTaskSafetyFlag::Create();
std::unique_ptr<FakeDataChannelController> controller_;
std::unique_ptr<FakeDataChannelObserver> observer_;
rtc::scoped_refptr<SctpDataChannel> inner_channel_;
@ -143,11 +154,13 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) {
// Check the non-const part of the configuration.
EXPECT_EQ(channel_->id(), init_.id);
EXPECT_EQ(inner_channel_->sid_s(), StreamId());
network_thread_.BlockingCall(
[&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); });
SetChannelReady();
EXPECT_EQ(channel_->id(), 0);
EXPECT_EQ(inner_channel_->sid_s(), StreamId(0));
network_thread_.BlockingCall(
[&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); });
}
// Verifies that the data channel is connected to the transport after creation.
@ -155,13 +168,15 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
controller_->set_transport_available(true);
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init_);
EXPECT_TRUE(controller_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams.
EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s()));
StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_FALSE(controller_->IsStreamAdded(sid));
SetChannelSid(dc, StreamId(0));
EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s()));
sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_TRUE(controller_->IsStreamAdded(sid));
}
// Tests the state of the data channel.
@ -180,7 +195,7 @@ TEST_F(SctpDataChannelTest, StateTransition) {
channel_->Close();
// The (simulated) transport close notifications runs on the network thread
// and posts a completion notification to the signaling (current) thread.
// Allow that ooperation to complete before checking the state.
// Allow that operation to complete before checking the state.
run_loop_.Flush();
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
EXPECT_EQ(observer_->on_state_change_count(), 3u);
@ -198,6 +213,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
EXPECT_TRUE(channel_->Send(buffer));
size_t successful_send_count = 1;
run_loop_.Flush();
EXPECT_EQ(0U, channel_->buffered_amount());
EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
@ -214,6 +230,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
observer_->on_buffered_amount_change_count());
controller_->set_send_blocked(false);
run_loop_.Flush();
successful_send_count += number_of_packets;
EXPECT_EQ(0U, channel_->buffered_amount());
EXPECT_EQ(successful_send_count,
@ -334,10 +351,9 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
SetChannelReady();
InternalDataChannelInit init;
init.id = 1;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
EXPECT_EQ(DataChannelInterface::kConnecting, dc->state());
EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000);
auto dc = webrtc::SctpDataChannel::CreateProxy(
controller_->CreateDataChannel("test1", init), signaling_safety_);
EXPECT_EQ(DataChannelInterface::kOpen, dc->state());
}
// Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK
@ -349,21 +365,23 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) {
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Sends a message and verifies it's ordered.
DataBuffer buffer("some data");
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_TRUE(controller_->last_send_data_params().ordered);
// Emulates receiving an OPEN_ACK message.
rtc::CopyOnWriteBuffer payload;
WriteDataChannelOpenAckMessage(&payload);
dc->OnDataReceived(DataMessageType::kControl, payload);
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kControl, payload); });
// Sends another message and verifies it's unordered.
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
@ -376,15 +394,17 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Emulates receiving a DATA message.
DataBuffer buffer("data");
dc->OnDataReceived(DataMessageType::kText, buffer.data);
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); });
// Sends a message and verifies it's unordered.
ASSERT_TRUE(dc->Send(buffer));
ASSERT_TRUE(proxy->Send(buffer));
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
@ -437,7 +457,10 @@ TEST_F(SctpDataChannelTest, ReceiveDataWithValidId) {
AddObserver();
DataBuffer buffer("abcd");
inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
network_thread_.BlockingCall([&] {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
});
run_loop_.Flush();
EXPECT_EQ(1U, observer_->messages_received());
}
@ -452,8 +475,9 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", config);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
EXPECT_EQ(0, controller_->last_sid());
}
@ -477,9 +501,10 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) {
EXPECT_EQ(0U, channel_->bytes_received());
// Receive three buffers while data channel isn't open.
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data);
network_thread_.BlockingCall([&] {
for (int i : {0, 1, 2})
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
});
EXPECT_EQ(0U, observer_->messages_received());
EXPECT_EQ(0U, channel_->messages_received());
EXPECT_EQ(0U, channel_->bytes_received());
@ -493,9 +518,11 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) {
EXPECT_EQ(bytes_received, channel_->bytes_received());
// Receive three buffers while open.
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data);
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data);
network_thread_.BlockingCall([&] {
for (int i : {3, 4, 5})
inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
});
run_loop_.Flush();
bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size();
EXPECT_EQ(6U, observer_->messages_received());
EXPECT_EQ(6U, channel_->messages_received());
@ -513,8 +540,9 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) {
SetChannelReady();
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", config);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
EXPECT_EQ(config.id, controller_->last_sid());
EXPECT_EQ(DataMessageType::kControl,
@ -551,9 +579,8 @@ TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) {
EXPECT_TRUE(channel_->Send(packet));
}
// The sending buffer shoul be full, send returns false.
// The sending buffer should be full, `Send()` returns false.
EXPECT_FALSE(channel_->Send(packet));
EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
}
@ -577,10 +604,12 @@ TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) {
rtc::CopyOnWriteBuffer buffer(1024);
memset(buffer.MutableData(), 0, buffer.size());
// Receiving data without having an observer will overflow the buffer.
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
}
network_thread_.BlockingCall([&] {
// Receiving data without having an observer will overflow the buffer.
for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
}
});
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
EXPECT_FALSE(channel_->error().ok());
EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type());
@ -601,7 +630,8 @@ TEST_F(SctpDataChannelTest, SendEmptyData) {
// Tests that a channel can be closed without being opened or assigned an sid.
TEST_F(SctpDataChannelTest, NeverOpened) {
controller_->set_transport_available(true);
inner_channel_->OnTransportChannelCreated();
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelCreated(); });
channel_->Close();
}
@ -631,7 +661,8 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) {
// transition to the "closed" state.
RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
inner_channel_->OnTransportChannelClosed(error);
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelClosed(error); });
controller_.reset(nullptr);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
kDefaultTimeout);
@ -651,7 +682,8 @@ TEST_F(SctpDataChannelTest, TransportGotErrorCode) {
error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
error.set_sctp_cause_code(
static_cast<uint16_t>(cricket::SctpErrorCauseCode::kProtocolViolation));
inner_channel_->OnTransportChannelClosed(error);
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelClosed(error); });
controller_.reset(nullptr);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
kDefaultTimeout);

View File

@ -34,7 +34,6 @@
#include "api/video/video_timing.h"
#include "call/call.h"
#include "media/base/media_channel.h"
#include "media/base/media_channel_impl.h"
#include "modules/audio_processing/include/audio_processing_statistics.h"
#include "p2p/base/ice_transport_internal.h"
#include "p2p/base/p2p_constants.h"
@ -670,7 +669,7 @@ void LegacyStatsCollector::UpdateStats(
// to fetch stats, then applies them on the signaling thread. See if we need
// to do this synchronously or if updating the stats without blocking is safe.
std::map<std::string, std::string> transport_names_by_mid =
ExtractSessionInfo();
ExtractSessionAndDataInfo();
// TODO(tommi): All of these hop over to the worker thread to fetch
// information. We could post a task to run all of these and post
@ -681,7 +680,6 @@ void LegacyStatsCollector::UpdateStats(
ExtractBweInfo();
ExtractMediaInfo(transport_names_by_mid);
ExtractSenderInfo();
ExtractDataInfo();
UpdateTrackReports();
}
@ -856,19 +854,26 @@ StatsReport* LegacyStatsCollector::AddCandidateReport(
return report;
}
std::map<std::string, std::string> LegacyStatsCollector::ExtractSessionInfo() {
TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo");
std::map<std::string, std::string>
LegacyStatsCollector::ExtractSessionAndDataInfo() {
TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo");
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
SessionStats stats;
StatsCollection::Container data_report_collection;
auto transceivers = pc_->GetTransceiversInternal();
pc_->network_thread()->BlockingCall(
[&, sctp_transport_name = pc_->sctp_transport_name(),
sctp_mid = pc_->sctp_mid()]() mutable {
stats = ExtractSessionInfo_n(
transceivers, std::move(sctp_transport_name), std::move(sctp_mid));
StatsCollection data_reports;
ExtractDataInfo_n(&data_reports);
data_report_collection = data_reports.DetachCollection();
});
reports_.MergeCollection(std::move(data_report_collection));
ExtractSessionInfo_s(stats);
return std::move(stats.transport_names_by_mid);
@ -1292,8 +1297,8 @@ void LegacyStatsCollector::ExtractSenderInfo() {
}
}
void LegacyStatsCollector::ExtractDataInfo() {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) {
RTC_DCHECK_RUN_ON(pc_->network_thread());
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
@ -1301,7 +1306,7 @@ void LegacyStatsCollector::ExtractDataInfo() {
for (const auto& stats : data_stats) {
StatsReport::Id id(StatsReport::NewTypedIntId(
StatsReport::kStatsReportTypeDataChannel, stats.id));
StatsReport* report = reports_.ReplaceOrAddNew(id);
StatsReport* report = reports->ReplaceOrAddNew(id);
report->set_timestamp(stats_gathering_started_);
report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
// Filter out the initial id (-1).

View File

@ -165,11 +165,13 @@ class LegacyStatsCollector : public LegacyStatsCollectorInterface {
const StatsReport::Id& channel_report_id,
const cricket::ConnectionInfo& info);
void ExtractDataInfo();
void ExtractDataInfo_n(StatsCollection* reports);
// Returns the `transport_names_by_mid` member from the SessionStats as
// gathered and used to populate the stats.
std::map<std::string, std::string> ExtractSessionInfo();
// gathered and used to populate the stats. Contains one synchronous hop
// to the network thread to get this information along with querying data
// channel stats at the same time and populating `reports_`.
std::map<std::string, std::string> ExtractSessionAndDataInfo();
void ExtractBweInfo();
void ExtractMediaInfo(

View File

@ -2286,7 +2286,7 @@ bool PeerConnection::GetTransportDescription(
}
std::vector<DataChannelStats> PeerConnection::GetDataChannelStats() const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK_RUN_ON(network_thread());
return data_channel_controller_.GetDataChannelStats();
}

View File

@ -449,7 +449,6 @@ class ConstMethodCall {
TRACE_BOILERPLATE(method); \
return c_->method(); \
}
// Allows a custom implementation of a method where the otherwise proxied
// implementation can do a more efficient, yet thread-safe, job than the proxy
// can do by default or when more flexibility is needed than can be provided

View File

@ -1499,7 +1499,6 @@ void RTCStatsCollector::ProducePartialResultsOnSignalingThreadImpl(
RTC_DCHECK_RUN_ON(signaling_thread_);
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
ProduceDataChannelStats_s(timestamp, partial_report);
ProduceMediaStreamStats_s(timestamp, partial_report);
ProduceMediaStreamTrackStats_s(timestamp, partial_report);
ProduceMediaSourceStats_s(timestamp, partial_report);
@ -1519,6 +1518,8 @@ void RTCStatsCollector::ProducePartialResultsOnNetworkThread(
// `network_report_event_` is reset before this method is invoked.
network_report_ = RTCStatsReport::Create(timestamp);
ProduceDataChannelStats_n(timestamp, network_report_.get());
std::set<std::string> transport_names;
if (sctp_transport_name) {
transport_names.emplace(std::move(*sctp_transport_name));
@ -1653,10 +1654,10 @@ void RTCStatsCollector::ProduceCertificateStats_n(
}
}
void RTCStatsCollector::ProduceDataChannelStats_s(
void RTCStatsCollector::ProduceDataChannelStats_n(
Timestamp timestamp,
RTCStatsReport* report) const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
std::vector<DataChannelStats> data_stats = pc_->GetDataChannelStats();
for (const auto& stats : data_stats) {

View File

@ -186,7 +186,7 @@ class RTCStatsCollector : public rtc::RefCountInterface {
const std::map<std::string, CertificateStatsPair>& transport_cert_stats,
RTCStatsReport* report) const;
// Produces `RTCDataChannelStats`.
void ProduceDataChannelStats_s(Timestamp timestamp,
void ProduceDataChannelStats_n(Timestamp timestamp,
RTCStatsReport* report) const;
// Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`.
void ProduceIceCandidateAndPairStats_n(

View File

@ -15,7 +15,6 @@
#include <string>
#include <utility>
#include "absl/cleanup/cleanup.h"
#include "media/sctp/sctp_transport_internal.h"
#include "pc/proxy.h"
#include "rtc_base/checks.h"
@ -38,8 +37,8 @@ int GenerateUniqueId() {
// Define proxy for DataChannelInterface.
BEGIN_PROXY_MAP(DataChannel)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
PROXY_METHOD0(void, UnregisterObserver)
BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
BYPASS_PROXY_METHOD0(void, UnregisterObserver)
BYPASS_PROXY_CONSTMETHOD0(std::string, label)
BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
@ -50,20 +49,18 @@ BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
// Can't bypass the proxy since the id may change.
PROXY_CONSTMETHOD0(int, id)
PROXY_SECONDARY_CONSTMETHOD0(int, id)
BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
PROXY_CONSTMETHOD0(DataState, state)
PROXY_CONSTMETHOD0(RTCError, error)
PROXY_CONSTMETHOD0(uint32_t, messages_sent)
PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
PROXY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_METHOD0(void, Close)
// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
PROXY_METHOD1(bool, Send, const DataBuffer&)
BYPASS_PROXY_CONSTMETHOD0(DataState, state)
BYPASS_PROXY_CONSTMETHOD0(RTCError, error)
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_SECONDARY_METHOD0(void, Close)
PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
END_PROXY_MAP(DataChannel)
} // namespace
InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
@ -142,6 +139,138 @@ void SctpSidAllocator::ReleaseSid(StreamId sid) {
used_sids_.erase(sid);
}
// A DataChannelObserver implementation that offers backwards compatibility with
// implementations that aren't yet ready to be called back on the network
// thread. This implementation posts events to the signaling thread where
// events are delivered.
// In the class, and together with the `SctpDataChannel` implementation, there's
// special handling for the `state()` property whereby if that property is
// queried on the channel object while inside an event callback, we return
// the state that was active at the time the event was issued. This is to avoid
// a problem with calling the `state()` getter on the proxy, which would do
// a blocking call to the network thread, effectively flushing operations on
// the network thread that could cause the state to change and eventually return
// a misleading or arguably, wrong, state value to the callback implementation.
// As a future improvement to the ObserverAdapter, we could do the same for
// other properties that need to be read on the network thread. Eventually
// all implementations should expect to be called on the network thread though
// and the ObserverAdapter no longer be necessary.
class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
public:
explicit ObserverAdapter(
SctpDataChannel* channel,
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety)
: channel_(channel), signaling_safety_(std::move(signaling_safety)) {}
bool IsInsideCallback() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return cached_getters_ != nullptr;
}
DataChannelInterface::DataState cached_state() const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(IsInsideCallback());
return cached_getters_->state();
}
RTCError cached_error() const {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(IsInsideCallback());
return cached_getters_->error();
}
void SetDelegate(DataChannelObserver* delegate) {
RTC_DCHECK_RUN_ON(signaling_thread());
delegate_ = delegate;
safety_.reset(PendingTaskSafetyFlag::CreateDetached());
}
static void DeleteOnSignalingThread(
std::unique_ptr<ObserverAdapter> observer) {
auto* signaling_thread = observer->signaling_thread();
if (!signaling_thread->IsCurrent())
signaling_thread->PostTask([observer = std::move(observer)]() {});
}
private:
class CachedGetters {
public:
explicit CachedGetters(ObserverAdapter* adapter)
: adapter_(adapter),
cached_state_(adapter_->channel_->state()),
cached_error_(adapter_->channel_->error()) {
RTC_DCHECK_RUN_ON(adapter->network_thread());
}
~CachedGetters() {
if (!was_dropped_) {
RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
RTC_DCHECK_EQ(adapter_->cached_getters_, this);
adapter_->cached_getters_ = nullptr;
}
}
bool PrepareForCallback() {
RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
RTC_DCHECK(was_dropped_);
was_dropped_ = false;
adapter_->cached_getters_ = this;
return adapter_->delegate_ && adapter_->signaling_safety_->alive();
}
RTCError error() { return cached_error_; }
DataChannelInterface::DataState state() { return cached_state_; }
private:
ObserverAdapter* const adapter_;
bool was_dropped_ = true;
const DataChannelInterface::DataState cached_state_;
const RTCError cached_error_;
};
void OnStateChange() override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
SafeTask(safety_.flag(),
[this, cached_state = std::make_unique<CachedGetters>(this)] {
RTC_DCHECK_RUN_ON(signaling_thread());
if (cached_state->PrepareForCallback())
delegate_->OnStateChange();
}));
}
void OnMessage(const DataBuffer& buffer) override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(SafeTask(
safety_.flag(), [this, buffer = buffer,
cached_state = std::make_unique<CachedGetters>(this)] {
RTC_DCHECK_RUN_ON(signaling_thread());
if (cached_state->PrepareForCallback())
delegate_->OnMessage(buffer);
}));
}
void OnBufferedAmountChange(uint64_t sent_data_size) override {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(SafeTask(
safety_.flag(), [this, sent_data_size,
cached_state = std::make_unique<CachedGetters>(this)] {
RTC_DCHECK_RUN_ON(signaling_thread());
if (cached_state->PrepareForCallback())
delegate_->OnBufferedAmountChange(sent_data_size);
}));
}
rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; }
rtc::Thread* network_thread() const { return channel_->network_thread_; }
DataChannelObserver* delegate_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
SctpDataChannel* const channel_;
ScopedTaskSafety safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_;
CachedGetters* cached_getters_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
};
// static
rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
@ -158,10 +287,13 @@ rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
// static
rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
rtc::scoped_refptr<SctpDataChannel> channel) {
rtc::scoped_refptr<SctpDataChannel> channel,
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety) {
// Copy thread params to local variables before `std::move()`.
auto* signaling_thread = channel->signaling_thread_;
auto* network_thread = channel->network_thread_;
channel->observer_adapter_ = std::make_unique<ObserverAdapter>(
channel.get(), std::move(signaling_safety));
return DataChannelProxy::Create(signaling_thread, network_thread,
std::move(channel));
}
@ -175,7 +307,6 @@ SctpDataChannel::SctpDataChannel(
rtc::Thread* network_thread)
: signaling_thread_(signaling_thread),
network_thread_(network_thread),
id_s_(config.id),
id_n_(config.id),
internal_id_(GenerateUniqueId()),
label_(label),
@ -208,18 +339,87 @@ SctpDataChannel::SctpDataChannel(
}
SctpDataChannel::~SctpDataChannel() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (observer_adapter_)
ObserverAdapter::DeleteOnSignalingThread(std::move(observer_adapter_));
}
void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
// Note: at this point, we do not know on which thread we're being called
// from since this method bypasses the proxy. On Android in particular,
// registration methods are called from unknown threads.
// Check if we should set up an observer adapter that will make sure that
// callbacks are delivered on the signaling thread rather than directly
// on the network thread.
const auto* current_thread = rtc::Thread::Current();
// TODO(webrtc:11547): Eventually all DataChannelObserver implementations
// should be called on the network thread and IsOkToCallOnTheNetworkThread().
if (!observer->IsOkToCallOnTheNetworkThread()) {
RTC_LOG(LS_WARNING) << "DataChannelObserver - adapter needed";
auto prepare_observer = [&]() {
RTC_DCHECK(observer_adapter_) << "CreateProxy hasn't been called";
observer_adapter_->SetDelegate(observer);
return observer_adapter_.get();
};
// Instantiate the adapter in the right context and then substitute the
// observer pointer the SctpDataChannel will call back on, with the adapter.
if (signaling_thread_ == current_thread) {
observer = prepare_observer();
} else {
observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
}
}
// Now do the observer registration on the network thread.
auto register_observer = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
};
if (network_thread_ == current_thread) {
register_observer();
} else {
network_thread_->BlockingCall(std::move(register_observer));
}
}
void SctpDataChannel::UnregisterObserver() {
RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = nullptr;
// Note: As with `RegisterObserver`, the proxy is being bypassed.
const auto* current_thread = rtc::Thread::Current();
// Callers must not be invoking the unregistration from the network thread
// (assuming a multi-threaded environment where we have a dedicated network
// thread). That would indicate non-network related work happening on the
// network thread or that unregistration is being done from within a callback
// (without unwinding the stack, which is a requirement).
// The network thread is not allowed to make blocking calls to the signaling
// thread, so that would blow up if attempted. Since we support an adapter
// for observers that are not safe to call on the network thread, we do
// need to check+free it on the signaling thread.
RTC_DCHECK(current_thread != network_thread_ ||
network_thread_ == signaling_thread_);
auto unregister_observer = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
observer_ = nullptr;
};
if (current_thread == network_thread_) {
unregister_observer();
} else {
network_thread_->BlockingCall(std::move(unregister_observer));
}
auto clear_observer = [&]() {
if (observer_adapter_)
observer_adapter_->SetDelegate(nullptr);
};
if (current_thread != signaling_thread_) {
signaling_thread_->BlockingCall(std::move(clear_observer));
} else {
clear_observer();
}
}
std::string SctpDataChannel::label() const {
@ -261,8 +461,11 @@ bool SctpDataChannel::negotiated() const {
}
int SctpDataChannel::id() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return id_s_.stream_id_int();
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(tommi): Once an ID has been assigned, it won't change (can be
// considered const). We could do special handling of this and allow bypassing
// the proxy so that we can return a valid id without thread hopping.
return id_n_.stream_id_int();
}
Priority SctpDataChannel::priority() const {
@ -270,12 +473,12 @@ Priority SctpDataChannel::priority() const {
}
uint64_t SctpDataChannel::buffered_amount() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return queued_send_data_.byte_count();
}
void SctpDataChannel::Close() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ == kClosing || state_ == kClosed)
return;
SetState(kClosing);
@ -284,40 +487,70 @@ void SctpDataChannel::Close() {
}
SctpDataChannel::DataState SctpDataChannel::state() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return state_;
// Note: The proxy is bypassed for the `state()` accessor. This is to allow
// observer callbacks to query what the new state is from within a state
// update notification without having to do a blocking call to the network
// thread from within a callback. This also makes it so that the returned
// state is guaranteed to be the new state that provoked the state change
// notification, whereby a blocking call to the network thread might end up
// getting put behind other messages on the network thread and eventually
// fetch a different state value (since pending messages might cause the
// state to change in the meantime).
const auto* current_thread = rtc::Thread::Current();
if (current_thread == signaling_thread_ && observer_adapter_ &&
observer_adapter_->IsInsideCallback()) {
return observer_adapter_->cached_state();
}
auto return_state = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
return state_;
};
return current_thread == network_thread_
? return_state()
: network_thread_->BlockingCall(std::move(return_state));
}
RTCError SctpDataChannel::error() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return error_;
const auto* current_thread = rtc::Thread::Current();
if (current_thread == signaling_thread_ && observer_adapter_ &&
observer_adapter_->IsInsideCallback()) {
return observer_adapter_->cached_error();
}
auto return_error = [&] {
RTC_DCHECK_RUN_ON(network_thread_);
return error_;
};
return current_thread == network_thread_
? return_error()
: network_thread_->BlockingCall(std::move(return_error));
}
uint32_t SctpDataChannel::messages_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return messages_sent_;
}
uint64_t SctpDataChannel::bytes_sent() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return bytes_sent_;
}
uint32_t SctpDataChannel::messages_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return messages_received_;
}
uint64_t SctpDataChannel::bytes_received() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
return bytes_received_;
}
bool SctpDataChannel::Send(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
// TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
// thread. Bring buffer management etc to the network thread and keep the
// operational state management on the signaling thread.
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ != kOpen) {
return false;
@ -335,25 +568,17 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) {
return true;
}
void SctpDataChannel::SetSctpSid_s(StreamId sid) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(!id_s_.HasValue());
RTC_DCHECK(sid.HasValue());
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting);
id_s_ = sid;
}
void SctpDataChannel::SetSctpSid_n(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(!id_n_.HasValue());
RTC_DCHECK(sid.HasValue());
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting);
id_n_ = sid;
}
void SctpDataChannel::OnClosingProcedureStartedRemotely() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ != kClosing && state_ != kClosed) {
// Don't bother sending queued data since the side that initiated the
// closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
@ -369,7 +594,7 @@ void SctpDataChannel::OnClosingProcedureStartedRemotely() {
}
void SctpDataChannel::OnClosingProcedureComplete() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
// If the closing procedure is complete, we should have finished sending
// all pending data and transitioned to kClosing already.
RTC_DCHECK_EQ(state_, kClosing);
@ -378,12 +603,12 @@ void SctpDataChannel::OnClosingProcedureComplete() {
}
void SctpDataChannel::OnTransportChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
connected_to_transport_ = true;
}
void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread_);
// The SctpTransport is unusable, which could come from multiple reasons:
// - the SCTP m= section was rejected
// - the DTLS transport is closed
@ -392,7 +617,7 @@ void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
}
DataChannelStats SctpDataChannel::GetStats() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
DataChannelStats stats{internal_id_, id(), label(),
protocol(), state(), messages_sent(),
messages_received(), bytes_sent(), bytes_received()};
@ -401,25 +626,25 @@ DataChannelStats SctpDataChannel::GetStats() const {
void SctpDataChannel::OnDataReceived(DataMessageType type,
const rtc::CopyOnWriteBuffer& payload) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (type == DataMessageType::kControl) {
if (handshake_state_ != kHandshakeWaitingForAck) {
// Ignore it if we are not expecting an ACK message.
RTC_LOG(LS_WARNING)
<< "DataChannel received unexpected CONTROL message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
return;
}
if (ParseDataChannelOpenAckMessage(payload)) {
// We can send unordered as soon as we receive the ACK message.
handshake_state_ = kHandshakeReady;
RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
} else {
RTC_LOG(LS_WARNING)
<< "DataChannel failed to parse OPEN_ACK message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
}
return;
}
@ -428,7 +653,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
type == DataMessageType::kText);
RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
// We can send unordered as soon as we receive any DATA message since the
// remote side must have received the OPEN (and old clients do not send
// OPEN_ACK).
@ -459,7 +684,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
}
void SctpDataChannel::OnTransportReady() {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(bugs.webrtc.org/11547): The transport is configured inside
// `PeerConnection::SetupDataChannelTransport_n`, which results in
@ -472,6 +697,7 @@ void SctpDataChannel::OnTransportReady() {
// be on for the below `Send*` calls, which currently do a BlockingCall
// from the signaling thread to the network thread.
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_n_.HasValue());
SendQueuedControlMessages();
SendQueuedDataMessages();
@ -480,7 +706,7 @@ void SctpDataChannel::OnTransportReady() {
}
void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK_RUN_ON(network_thread_);
if (state_ == kClosed) {
return;
@ -501,13 +727,14 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
const std::string& message) {
RTC_DCHECK_RUN_ON(network_thread_);
RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
CloseAbruptlyWithError(std::move(error));
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::UpdateState() {
RTC_DCHECK_RUN_ON(signaling_thread_);
// UpdateState determines what to do from a few state variables. Include
// all conditions required for each state transition here for
// clarity. OnTransportReady(true) will send any queued data and then invoke
@ -535,7 +762,7 @@ void SctpDataChannel::UpdateState() {
DeliverQueuedReceivedData();
}
} else {
RTC_DCHECK(!id_s_.HasValue());
RTC_DCHECK(!id_n_.HasValue());
}
break;
}
@ -551,11 +778,9 @@ void SctpDataChannel::UpdateState() {
// to complete; after calling RemoveSctpDataStream,
// OnClosingProcedureComplete will end up called asynchronously
// afterwards.
if (!started_closing_procedure_ && id_s_.HasValue()) {
if (!started_closing_procedure_ && id_n_.HasValue()) {
started_closing_procedure_ = true;
network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
c->RemoveSctpDataStream(sid);
});
controller_->RemoveSctpDataStream(id_n_);
}
}
} else {
@ -572,8 +797,8 @@ void SctpDataChannel::UpdateState() {
}
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SetState(DataState state) {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == state) {
return;
}
@ -587,8 +812,8 @@ void SctpDataChannel::SetState(DataState state) {
controller_->OnChannelStateChanged(this, state_);
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::DeliverQueuedReceivedData() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (!observer_) {
return;
}
@ -601,8 +826,8 @@ void SctpDataChannel::DeliverQueuedReceivedData() {
}
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SendQueuedDataMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (queued_send_data_.Empty()) {
return;
}
@ -619,9 +844,9 @@ void SctpDataChannel::SendQueuedDataMessages() {
}
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) {
RTC_DCHECK_RUN_ON(signaling_thread_);
SendDataParams send_params;
if (!controller_) {
return false;
@ -641,7 +866,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
send_params.type =
buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
RTCError error = controller_->SendData(id_n_, send_params, buffer.data);
if (error.ok()) {
++messages_sent_;
@ -669,8 +894,8 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
return false;
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
size_t start_buffered_amount = queued_send_data_.byte_count();
if (start_buffered_amount + buffer.size() >
DataChannelInterface::MaxSendQueueSize()) {
@ -681,8 +906,8 @@ bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
return true;
}
// RTC_RUN_ON(network_thread_).
void SctpDataChannel::SendQueuedControlMessages() {
RTC_DCHECK_RUN_ON(signaling_thread_);
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@ -692,10 +917,10 @@ void SctpDataChannel::SendQueuedControlMessages() {
}
}
// RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_s_.HasValue());
RTC_DCHECK(id_n_.HasValue());
RTC_DCHECK(controller_);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@ -708,10 +933,10 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
send_params.ordered = ordered_ || is_open_message;
send_params.type = DataMessageType::kControl;
RTCError err = controller_->SendData(id_s_, send_params, buffer);
RTCError err = controller_->SendData(id_n_, send_params, buffer);
if (err.ok()) {
RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
<< id_s_.stream_id_int();
<< id_n_.stream_id_int();
if (handshake_state_ == kHandshakeShouldSendAck) {
handshake_state_ = kHandshakeReady;
@ -735,10 +960,4 @@ void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
g_unique_id = new_value;
}
SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
DataChannelInterface* channel) {
return static_cast<SctpDataChannel*>(
static_cast<DataChannelProxy*>(channel)->internal());
}
} // namespace webrtc

View File

@ -39,9 +39,7 @@ namespace webrtc {
class SctpDataChannel;
// Interface that acts as a bridge from the data channel to the transport.
// TODO(bugs.webrtc.org/11547): The transport operates on the network thread
// and ultimately all the methods in this interface need to be invoked on the
// network thread. Currently, some are called on the signaling thread.
// All methods in this interface need to be invoked on the network thread.
class SctpDataChannelControllerInterface {
public:
// Sends the data to the transport.
@ -49,11 +47,9 @@ class SctpDataChannelControllerInterface {
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) = 0;
// Adds the data channel SID to the transport for SCTP.
// Note: Must be called on the network thread.
virtual void AddSctpDataStream(StreamId sid) = 0;
// Begins the closing procedure by sending an outgoing stream reset. Still
// need to wait for callbacks to tell when this completes.
// Note: Must be called on the network thread.
virtual void RemoveSctpDataStream(StreamId sid) = 0;
// Notifies the controller of state changes.
virtual void OnChannelStateChanged(SctpDataChannel* data_channel,
@ -139,8 +135,14 @@ class SctpDataChannel : public DataChannelInterface {
// Instantiates an API proxy for a SctpDataChannel instance that will be
// handed out to external callers.
// The `signaling_safety` flag is used for the ObserverAdapter callback proxy
// which delivers callbacks on the signaling thread but must not deliver such
// callbacks after the peerconnection has been closed. The data controller
// will update the flag when closed, which will cancel any pending event
// notifications.
static rtc::scoped_refptr<DataChannelInterface> CreateProxy(
rtc::scoped_refptr<SctpDataChannel> channel);
rtc::scoped_refptr<SctpDataChannel> channel,
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety);
void RegisterObserver(DataChannelObserver* observer) override;
void UnregisterObserver() override;
@ -192,7 +194,6 @@ class SctpDataChannel : public DataChannelInterface {
// Sets the SCTP sid and adds to transport layer if not set yet. Should only
// be called once.
void SetSctpSid_s(StreamId sid);
void SetSctpSid_n(StreamId sid);
// The remote side started the closing procedure by resetting its outgoing
@ -216,10 +217,6 @@ class SctpDataChannel : public DataChannelInterface {
// stats purposes (see also `GetStats()`).
int internal_id() const { return internal_id_; }
StreamId sid_s() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return id_s_;
}
StreamId sid_n() const {
RTC_DCHECK_RUN_ON(network_thread_);
return id_n_;
@ -239,6 +236,8 @@ class SctpDataChannel : public DataChannelInterface {
~SctpDataChannel() override;
private:
class ObserverAdapter;
// The OPEN(_ACK) signaling state.
enum HandshakeState {
kHandshakeInit,
@ -248,21 +247,23 @@ class SctpDataChannel : public DataChannelInterface {
kHandshakeReady
};
void UpdateState();
void SetState(DataState state);
void UpdateState() RTC_RUN_ON(network_thread_);
void SetState(DataState state) RTC_RUN_ON(network_thread_);
void DeliverQueuedReceivedData();
void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
void SendQueuedDataMessages();
bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked);
bool QueueSendDataMessage(const DataBuffer& buffer);
void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
RTC_RUN_ON(network_thread_);
bool QueueSendDataMessage(const DataBuffer& buffer)
RTC_RUN_ON(network_thread_);
void SendQueuedControlMessages();
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
void SendQueuedControlMessages() RTC_RUN_ON(network_thread_);
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
RTC_RUN_ON(network_thread_);
rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_;
StreamId id_s_ RTC_GUARDED_BY(signaling_thread_);
StreamId id_n_ RTC_GUARDED_BY(network_thread_);
const int internal_id_;
const std::string label_;
@ -273,32 +274,28 @@ class SctpDataChannel : public DataChannelInterface {
const bool negotiated_;
const bool ordered_;
DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr;
DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting;
RTCError error_ RTC_GUARDED_BY(signaling_thread_);
uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr;
std::unique_ptr<ObserverAdapter> observer_adapter_;
DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting;
RTCError error_ RTC_GUARDED_BY(network_thread_);
uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0;
uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0;
uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0;
uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0;
rtc::WeakPtr<SctpDataChannelControllerInterface> controller_
RTC_GUARDED_BY(signaling_thread_);
HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) =
RTC_GUARDED_BY(network_thread_);
HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) =
kHandshakeInit;
bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false;
bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false;
// Did we already start the graceful SCTP closing procedure?
bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
// Control messages that always have to get sent out before any queued
// data.
PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
};
// Downcast a PeerConnectionInterface that points to a proxy object
// to its underlying SctpDataChannel object. For testing only.
SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
DataChannelInterface* channel);
} // namespace webrtc
#endif // PC_SCTP_DATA_CHANNEL_H_

View File

@ -3268,20 +3268,16 @@ void SdpOfferAnswerHandler::AllocateSctpSids() {
return;
}
absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
RTC_DCHECK_RUN_ON(network_thread());
return pc_->GetSctpSslRole_n();
});
if (!role) {
role = GuessSslRole();
}
if (role) {
// TODO(webrtc:11547): Make this call on the network thread too once
// `AllocateSctpSids` has been updated.
data_channel_controller()->AllocateSctpSids(*role);
}
absl::optional<rtc::SSLRole> guessed_role = GuessSslRole();
network_thread()->BlockingCall(
[&, data_channel_controller = data_channel_controller()] {
RTC_DCHECK_RUN_ON(network_thread());
absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
if (!role)
role = guessed_role;
if (role)
data_channel_controller->AllocateSctpSids(*role);
});
}
absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GuessSslRole() const {
@ -5117,13 +5113,13 @@ void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) {
RTC_DCHECK_RUN_ON(signaling_thread());
const bool has_sctp = pc_->sctp_mid().has_value();
if (has_sctp)
data_channel_controller()->OnTransportChannelClosed(error);
context_->network_thread()->BlockingCall([this] {
RTC_DCHECK_RUN_ON(context_->network_thread());
pc_->TeardownDataChannelTransport_n();
});
context_->network_thread()->BlockingCall(
[&, data_channel_controller = data_channel_controller()] {
RTC_DCHECK_RUN_ON(context_->network_thread());
if (has_sctp)
data_channel_controller->OnTransportChannelClosed(error);
pc_->TeardownDataChannelTransport_n();
});
if (has_sctp)
pc_->ResetSctpDataMid();

View File

@ -29,23 +29,31 @@ class FakeDataChannelController
transport_available_(false),
ready_to_send_(false),
transport_error_(false) {}
virtual ~FakeDataChannelController() {}
~FakeDataChannelController() override {
network_thread_->BlockingCall([&] {
RTC_DCHECK_RUN_ON(network_thread_);
weak_factory_.InvalidateWeakPtrs();
});
}
rtc::WeakPtr<FakeDataChannelController> weak_ptr() {
RTC_DCHECK_RUN_ON(network_thread_);
return weak_factory_.GetWeakPtr();
}
rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
absl::string_view label,
webrtc::InternalDataChannelInit init) {
rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
// Explicitly associate the weak ptr instance with the current thread to
// catch early any inappropriate referencing of it on the network thread.
RTC_CHECK(my_weak_ptr);
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
network_thread_->BlockingCall([&]() {
RTC_DCHECK_RUN_ON(network_thread_);
rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
// Explicitly associate the weak ptr instance with the current thread
// to catch early any inappropriate referencing of it on the network
// thread.
RTC_CHECK(my_weak_ptr);
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
webrtc::SctpDataChannel::Create(
std::move(my_weak_ptr), std::string(label),
@ -54,17 +62,16 @@ class FakeDataChannelController
if (transport_available_ && channel->sid_n().HasValue()) {
AddSctpDataStream(channel->sid_n());
}
if (ready_to_send_) {
network_thread_->PostTask([channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
channel->OnTransportReady();
}
});
}
return channel;
});
if (ready_to_send_) {
signaling_thread_->PostTask(
SafeTask(signaling_safety_.flag(), [channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
channel->OnTransportReady();
}
}));
}
connected_channels_.insert(channel.get());
return channel;
}
@ -72,6 +79,7 @@ class FakeDataChannelController
webrtc::RTCError SendData(webrtc::StreamId sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) override {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(ready_to_send_);
RTC_CHECK(transport_available_);
if (send_blocked_) {
@ -100,17 +108,14 @@ class FakeDataChannelController
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
known_stream_ids_.erase(sid);
signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly.
auto it = absl::c_find_if(connected_channels_, [&](const auto* c) {
return c->sid_s() == sid;
});
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}));
// Unlike the real SCTP transport, act like the closing procedure finished
// instantly.
auto it = absl::c_find_if(connected_channels_,
[&](const auto* c) { return c->sid_n() == sid; });
// This path mimics the DCC's OnChannelClosed handler since the FDCC
// (this class) doesn't have a transport that would do that.
if (it != connected_channels_.end())
(*it)->OnClosingProcedureComplete();
}
void OnChannelStateChanged(
@ -126,36 +131,40 @@ class FakeDataChannelController
// Set true to emulate the SCTP stream being blocked by congestion control.
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
if (!blocked) {
RTC_CHECK(transport_available_);
// Make a copy since `connected_channels_` may change while
// OnTransportReady is called.
auto copy = connected_channels_;
for (webrtc::SctpDataChannel* ch : copy) {
ch->OnTransportReady();
network_thread_->BlockingCall([&]() {
send_blocked_ = blocked;
if (!blocked) {
RTC_CHECK(transport_available_);
// Make a copy since `connected_channels_` may change while
// OnTransportReady is called.
auto copy = connected_channels_;
for (webrtc::SctpDataChannel* ch : copy) {
ch->OnTransportReady();
}
}
}
});
}
// Set true to emulate the transport channel creation, e.g. after
// setLocalDescription/setRemoteDescription called with data content.
void set_transport_available(bool available) {
transport_available_ = available;
network_thread_->BlockingCall([&]() { transport_available_ = available; });
}
// Set true to emulate the transport OnTransportReady signal when the
// transport becomes writable for the first time.
void set_ready_to_send(bool ready) {
RTC_CHECK(transport_available_);
ready_to_send_ = ready;
if (ready) {
std::set<webrtc::SctpDataChannel*>::iterator it;
for (it = connected_channels_.begin(); it != connected_channels_.end();
++it) {
(*it)->OnTransportReady();
network_thread_->BlockingCall([&]() {
ready_to_send_ = ready;
if (ready) {
std::set<webrtc::SctpDataChannel*>::iterator it;
for (it = connected_channels_.begin(); it != connected_channels_.end();
++it) {
(*it)->OnTransportReady();
}
}
}
});
}
void set_transport_error() { transport_error_ = true; }