From 7f16fcda0fd5bb625584b71311dd37b54c096136 Mon Sep 17 00:00:00 2001 From: Andrey Logvin Date: Wed, 5 Apr 2023 08:53:13 +0000 Subject: [PATCH] Revert "[DataChannel] Send and receive packets on the network thread." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9. Reason for revert: Speculative revert, may be breaking downstream project Original change's description: > [DataChannel] Send and receive packets on the network thread. > > This updates sctp channels, including work that happens between the > data channel controller and the transport, to run on the network > thread. Previously all network traffic related to data channels was > routed through the signaling thread before going to either the network > thread or the caller's thread (e.g. js thread in chrome). Now the > calls can go straight from the network thread to the JS thread with > enabling a special flag on the observer (see below) and similarly > calls to send data, involve 2 threads instead of 3. > > * Custom data channel observer adapter implementation that > maintains compatibility with existing observer implementations in > that notifications are delivered on the signaling thread. > The adapter can be explicitly disabled for implementations that > want to optimize the callback path and promise to not block the > network thread. > * Remove the signaling thread copy of data channels in the controller. > * Remove several PostTask operations that were needed to keep things > in sync (but the need has gone away). > * Update tests for the controller to consistently call > TeardownDataChannelTransport_n to match with production. > * Update stats collectors (current and legacy) to fetch the data > channel stats on the network thread where they're maintained. > * Remove the AsyncChannelCloseTeardown test since the async teardown > step has gone away. > * Remove `sid_s` in the channel code since we only need the network > state now. > * For the custom observer support (with and without data adapter) and > maintain compatibility with existing implementations, added a new > proxy macro that allows an implementation to selectively provide > its own implementation without being proxied. This is used for > registering/unregistering a data channel observer. > * Update the data channel proxy to map most methods to the network > thread, avoiding the interim jump to the signaling thread. > * Update a plethora of thread checkers from signaling to network. > > Bug: webrtc:11547 > Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142 > Commit-Queue: Tomas Gunnarsson > Reviewed-by: Henrik Boström > Cr-Commit-Position: refs/heads/main@{#39760} Bug: webrtc:11547 Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad No-Presubmit: true No-Tree-Checks: true No-Try: true Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341 Auto-Submit: Andrey Logvin Owners-Override: Andrey Logvin Bot-Commit: rubber-stamper@appspot.gserviceaccount.com Commit-Queue: Mirko Bonadei Reviewed-by: Mirko Bonadei Cr-Commit-Position: refs/heads/main@{#39764} --- api/data_channel_interface.h | 11 - pc/data_channel_controller.cc | 351 +++++++++++++------------ pc/data_channel_controller.h | 23 +- pc/data_channel_controller_unittest.cc | 38 +++ pc/data_channel_unittest.cc | 133 ++++------ pc/legacy_stats_collector.cc | 20 +- pc/legacy_stats_collector.h | 8 +- pc/peer_connection.cc | 2 +- pc/proxy.h | 18 -- pc/rtc_stats_collector.cc | 7 +- pc/rtc_stats_collector.h | 2 +- pc/sctp_data_channel.cc | 298 ++++++--------------- pc/sctp_data_channel.h | 56 ++-- pc/sdp_offer_answer.cc | 38 +-- pc/test/fake_data_channel_controller.h | 95 +++---- 15 files changed, 477 insertions(+), 623 deletions(-) diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h index 35ef8e4f0d..4f74918ff9 100644 --- a/api/data_channel_interface.h +++ b/api/data_channel_interface.h @@ -100,17 +100,6 @@ class DataChannelObserver { // The data channel's buffered_amount has changed. virtual void OnBufferedAmountChange(uint64_t sent_data_size) {} - // Override this to get callbacks directly on the network thread. - // An implementation that does that must not block the network thread - // but rather only use the callback to trigger asynchronous processing - // elsewhere as a result of the notification. - // The default return value, `false`, means that notifications will be - // delivered on the signaling thread associated with the peerconnection - // instance. - // TODO(webrtc:11547): Eventually all DataChannelObserver implementations - // should be called on the network thread and this method removed. - virtual bool IsOkToCallOnTheNetworkThread() { return false; } - protected: virtual ~DataChannelObserver() = default; }; diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 20d5fe98b0..36e8be1411 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -29,15 +29,8 @@ DataChannelController::~DataChannelController() { } bool DataChannelController::HasDataChannelsForTest() const { - auto has_channels = [&] { - RTC_DCHECK_RUN_ON(network_thread()); - return !sctp_data_channels_n_.empty(); - }; - - if (network_thread()->IsCurrent()) - return has_channels(); - - return network_thread()->BlockingCall(std::move(has_channels)); + RTC_DCHECK_RUN_ON(signaling_thread()); + return !sctp_data_channels_.empty(); } bool DataChannelController::HasUsedDataChannels() const { @@ -73,15 +66,11 @@ void DataChannelController::RemoveSctpDataStream(StreamId sid) { void DataChannelController::OnChannelStateChanged( SctpDataChannel* channel, DataChannelInterface::DataState state) { - RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK_RUN_ON(signaling_thread()); if (state == DataChannelInterface::DataState::kClosed) OnSctpDataChannelClosed(channel); - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), - [this, channel_id = channel->internal_id(), state = state] { - pc_->OnSctpDataChannelStateChanged(channel_id, state); - })); + pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state); } void DataChannelController::OnDataReceived( @@ -93,22 +82,27 @@ void DataChannelController::OnDataReceived( if (HandleOpenMessage_n(channel_id, type, buffer)) return; - auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { - return c->sid_n().stream_id_int() == channel_id; - }); - - if (it != sctp_data_channels_n_.end()) - (*it)->OnDataReceived(type, buffer); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] { + RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(bugs.webrtc.org/11547): The data being received should be + // delivered on the network thread. + auto it = FindChannel(StreamId(channel_id)); + if (it != sctp_data_channels_.end()) + (*it)->OnDataReceived(type, buffer); + })); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { - return c->sid_n().stream_id_int() == channel_id; - }); - - if (it != sctp_data_channels_n_.end()) - (*it)->OnClosingProcedureStartedRemotely(); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, channel_id] { + RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(bugs.webrtc.org/11547): Should run on the network thread. + auto it = FindChannel(StreamId(channel_id)); + if (it != sctp_data_channels_.end()) + (*it)->OnClosingProcedureStartedRemotely(); + })); } void DataChannelController::OnChannelClosed(int channel_id) { @@ -118,44 +112,48 @@ void DataChannelController::OnChannelClosed(int channel_id) { auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c->sid_n() == sid; }); - if (it != sctp_data_channels_n_.end()) { - rtc::scoped_refptr channel = std::move(*it); + if (it != sctp_data_channels_n_.end()) sctp_data_channels_n_.erase(it); - channel->OnClosingProcedureComplete(); - } + + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] { + RTC_DCHECK_RUN_ON(signaling_thread()); + auto it = FindChannel(sid); + // Remove the channel from our list, close it and free up resources. + if (it != sctp_data_channels_.end()) { + rtc::scoped_refptr channel = std::move(*it); + // Note: this causes OnSctpDataChannelClosed() to not do anything + // when called from within `OnClosingProcedureComplete`. + sctp_data_channels_.erase(it); + + channel->OnClosingProcedureComplete(); + } + })); } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); - auto copy = sctp_data_channels_n_; - for (const auto& channel : copy) { - if (channel->sid_n().HasValue()) { + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + auto copy = sctp_data_channels_; + for (const auto& channel : copy) channel->OnTransportReady(); - } else { - // This happens for role==SSL_SERVER channels when we get notified by - // the transport *before* the SDP code calls `AllocateSctpSids` to - // trigger assignment of sids. In this case OnTransportReady() will be - // called from within `AllocateSctpSids` below. - RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel."; - } - } + })); } void DataChannelController::OnTransportClosed(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); - // This loop will close all data channels and trigger a callback to - // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so - // we create a local copy while we do the fan-out. - auto copy = sctp_data_channels_n_; - for (const auto& channel : copy) - channel->OnTransportChannelClosed(error); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, error] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportChannelClosed(error); + })); } void DataChannelController::SetupDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); // There's a new data channel transport. This needs to be signaled to the - // `sctp_data_channels_n_` so that they can reopen and reconnect. This is + // `sctp_data_channels_` so that they can reopen and reconnect. This is // necessary when bundling is applied. NotifyDataChannelsOfTransportCreated(); } @@ -167,12 +165,11 @@ void DataChannelController::PrepareForShutdown() { void DataChannelController::TeardownDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); - if (data_channel_transport_) { - data_channel_transport_->SetDataSink(nullptr); - set_data_channel_transport(nullptr); + if (data_channel_transport()) { + data_channel_transport()->SetDataSink(nullptr); } + set_data_channel_transport(nullptr); sctp_data_channels_n_.clear(); - weak_factory_.InvalidateWeakPtrs(); } void DataChannelController::OnTransportChanged( @@ -188,7 +185,7 @@ void DataChannelController::OnTransportChanged( new_data_channel_transport->SetDataSink(this); // There's a new data channel transport. This needs to be signaled to the - // `sctp_data_channels_n_` so that they can reopen and reconnect. This is + // `sctp_data_channels_` so that they can reopen and reconnect. This is // necessary when bundling is applied. NotifyDataChannelsOfTransportCreated(); } @@ -197,10 +194,10 @@ void DataChannelController::OnTransportChanged( std::vector DataChannelController::GetDataChannelStats() const { - RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK_RUN_ON(signaling_thread()); std::vector stats; - stats.reserve(sctp_data_channels_n_.size()); - for (const auto& channel : sctp_data_channels_n_) + stats.reserve(sctp_data_channels_.size()); + for (const auto& channel : sctp_data_channels_) stats.push_back(channel->GetStats()); return stats; } @@ -222,38 +219,28 @@ bool DataChannelController::HandleOpenMessage_n( << channel_id; } else { config.open_handshake_role = InternalDataChannelInit::kAcker; - auto channel_or_error = CreateDataChannel(label, config); - if (channel_or_error.ok()) { - signaling_thread()->PostTask(SafeTask( - signaling_safety_.flag(), - [this, channel = channel_or_error.MoveValue(), - ready_to_send = data_channel_transport_->IsReadyToSend()] { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnDataChannelOpenMessage(std::move(channel), ready_to_send); - })); - } else { - RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." - << ToString(channel_or_error.error().type()); - } + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), + [this, label = std::move(label), config = std::move(config)] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnDataChannelOpenMessage(label, config); + })); } return true; } void DataChannelController::OnDataChannelOpenMessage( - rtc::scoped_refptr channel, - bool ready_to_send) { - has_used_data_channels_ = true; - auto proxy = SctpDataChannel::CreateProxy(channel); - - pc_->Observer()->OnDataChannel(proxy); - pc_->NoteDataAddedEvent(); - - if (ready_to_send) { - network_thread()->PostTask([channel = std::move(channel)] { - if (channel->state() != DataChannelInterface::DataState::kClosed) - channel->OnTransportReady(); - }); + const std::string& label, + const InternalDataChannelInit& config) { + auto channel_or_error = InternalCreateDataChannelWithProxy(label, config); + if (!channel_or_error.ok()) { + RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." + << ToString(channel_or_error.error().type()); + return; } + + pc_->Observer()->OnDataChannel(channel_or_error.MoveValue()); + pc_->NoteDataAddedEvent(); } // RTC_RUN_ON(network_thread()) @@ -282,31 +269,6 @@ RTCError DataChannelController::ReserveOrAllocateSid( return RTCError::OK(); } -// RTC_RUN_ON(network_thread()) -RTCErrorOr> -DataChannelController::CreateDataChannel(const std::string& label, - InternalDataChannelInit& config) { - StreamId sid(config.id); - RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role); - if (!err.ok()) - return err; - - // In case `sid` has changed. Update `config` accordingly. - config.id = sid.stream_id_int(); - - rtc::scoped_refptr channel = SctpDataChannel::Create( - weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr, - config, signaling_thread(), network_thread()); - RTC_DCHECK(channel); - sctp_data_channels_n_.push_back(channel); - - // If we have an id already, notify the transport. - if (sid.HasValue()) - AddSctpDataStream(sid); - - return channel; -} - RTCErrorOr> DataChannelController::InternalCreateDataChannelWithProxy( const std::string& label, @@ -321,25 +283,29 @@ DataChannelController::InternalCreateDataChannelWithProxy( bool ready_to_send = false; InternalDataChannelInit new_config = config; StreamId sid(new_config.id); + auto weak_ptr = weak_factory_.GetWeakPtr(); + RTC_DCHECK(weak_ptr); // Associate with current thread. auto ret = network_thread()->BlockingCall( [&]() -> RTCErrorOr> { RTC_DCHECK_RUN_ON(network_thread()); - auto channel = CreateDataChannel(label, new_config); - if (!channel.ok()) - return channel; + RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role); + if (!err.ok()) + return err; + + // In case `sid` has changed. Update `new_config` accordingly. + new_config.id = sid.stream_id_int(); ready_to_send = data_channel_transport_ && data_channel_transport_->IsReadyToSend(); - if (ready_to_send) { - // If the transport is ready to send because the initial channel - // ready signal may have been sent before the DataChannel creation. - // This has to be done async because the upper layer objects (e.g. - // Chrome glue and WebKit) are not wired up properly until after - // `InternalCreateDataChannelWithProxy` returns. - network_thread()->PostTask([channel = channel.value()] { - if (channel->state() != DataChannelInterface::DataState::kClosed) - channel->OnTransportReady(); - }); - } + + rtc::scoped_refptr channel(SctpDataChannel::Create( + std::move(weak_ptr), label, data_channel_transport_ != nullptr, + new_config, signaling_thread(), network_thread())); + RTC_DCHECK(channel); + sctp_data_channels_n_.push_back(channel); + + // If we have an id already, notify the transport. + if (sid.HasValue()) + AddSctpDataStream(sid); return channel; }); @@ -347,71 +313,114 @@ DataChannelController::InternalCreateDataChannelWithProxy( if (!ret.ok()) return ret.MoveError(); + if (ready_to_send) { + // Checks if the transport is ready to send because the initial channel + // ready signal may have been sent before the DataChannel creation. + // This has to be done async because the upper layer objects (e.g. + // Chrome glue and WebKit) are not wired up properly until after this + // function returns. + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [channel = ret.value()] { + if (channel->state() != DataChannelInterface::DataState::kClosed) + channel->OnTransportReady(); + })); + } + + sctp_data_channels_.push_back(ret.value()); has_used_data_channels_ = true; return SctpDataChannel::CreateProxy(ret.MoveValue()); } void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { - RTC_DCHECK_RUN_ON(network_thread()); - - const bool ready_to_send = - data_channel_transport_ && data_channel_transport_->IsReadyToSend(); + RTC_DCHECK_RUN_ON(signaling_thread()); std::vector> channels_to_update; std::vector> channels_to_close; - for (auto it = sctp_data_channels_n_.begin(); - it != sctp_data_channels_n_.end();) { - if (!(*it)->sid_n().HasValue()) { - StreamId sid = sid_allocator_.AllocateSid(role); - if (sid.HasValue()) { - (*it)->SetSctpSid_n(sid); - AddSctpDataStream(sid); - if (ready_to_send) { - RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send."; - (*it)->OnTransportReady(); + + network_thread()->BlockingCall([&] { + RTC_DCHECK_RUN_ON(network_thread()); + for (auto it = sctp_data_channels_n_.begin(); + it != sctp_data_channels_n_.end();) { + if (!(*it)->sid_n().HasValue()) { + StreamId sid = sid_allocator_.AllocateSid(role); + if (sid.HasValue()) { + (*it)->SetSctpSid_n(sid); + AddSctpDataStream(sid); + channels_to_update.push_back(std::make_pair((*it).get(), sid)); + } else { + channels_to_close.push_back(std::move(*it)); + it = sctp_data_channels_n_.erase(it); + continue; } - channels_to_update.push_back(std::make_pair((*it).get(), sid)); - } else { - channels_to_close.push_back(std::move(*it)); - it = sctp_data_channels_n_.erase(it); - continue; } + ++it; } - ++it; - } + }); // Since closing modifies the list of channels, we have to do the actual // closing outside the loop. for (const auto& channel : channels_to_close) { channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID"); + // The channel should now have been removed from sctp_data_channels_. + RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) { + return c.get() == channel.get(); + }) == sctp_data_channels_.end()); + } + + for (auto& pair : channels_to_update) { + auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) { + return c.get() == pair.first; + }); + RTC_DCHECK(it != sctp_data_channels_.end()); + (*it)->SetSctpSid_s(pair.second); } } void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { - RTC_DCHECK_RUN_ON(network_thread()); - // After the closing procedure is done, it's safe to use this ID for - // another data channel. - if (channel->sid_n().HasValue()) { - sid_allocator_.ReleaseSid(channel->sid_n()); + RTC_DCHECK_RUN_ON(signaling_thread()); + + network_thread()->BlockingCall([&] { + RTC_DCHECK_RUN_ON(network_thread()); + // After the closing procedure is done, it's safe to use this ID for + // another data channel. + if (channel->sid_n().HasValue()) { + sid_allocator_.ReleaseSid(channel->sid_n()); + } + + auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { + return c.get() == channel; + }); + + if (it != sctp_data_channels_n_.end()) + sctp_data_channels_n_.erase(it); + }); + + for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end(); + ++it) { + if (it->get() == channel) { + // Since this method is triggered by a signal from the DataChannel, + // we can't free it directly here; we need to free it asynchronously. + rtc::scoped_refptr release = std::move(*it); + sctp_data_channels_.erase(it); + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), + [release = std::move(release)] {})); + return; + } } - auto it = absl::c_find_if(sctp_data_channels_n_, - [&](const auto& c) { return c.get() == channel; }); - if (it != sctp_data_channels_n_.end()) - sctp_data_channels_n_.erase(it); } void DataChannelController::OnTransportChannelClosed(RTCError error) { - RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK_RUN_ON(signaling_thread()); // Use a temporary copy of the SCTP DataChannel list because the // DataChannel may callback to us and try to modify the list. // TODO(tommi): `OnTransportChannelClosed` is called from // `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before // `TeardownDataChannelTransport_n` is called (but on the network thread) from - // the same function. We can now get rid of this function - // (OnTransportChannelClosed) and run this loop from within the - // TeardownDataChannelTransport_n callback. + // the same function. Once `sctp_data_channels_` moves to the network thread, + // we can get rid of this function (OnTransportChannelClosed) and run this + // loop from within the TeardownDataChannelTransport_n callback. std::vector> temp_sctp_dcs; - temp_sctp_dcs.swap(sctp_data_channels_n_); + temp_sctp_dcs.swap(sctp_data_channels_); for (const auto& channel : temp_sctp_dcs) { channel->OnTransportChannelClosed(error); } @@ -435,10 +444,16 @@ RTCError DataChannelController::DataChannelSendData( StreamId sid, const SendDataParams& params, const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(network_thread()); + // TODO(bugs.webrtc.org/11547): Expect method to be called on the network + // thread instead. Remove the BlockingCall() below and move associated state + // to the network thread. + RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK(data_channel_transport()); - return data_channel_transport()->SendData(sid.stream_id_int(), params, - payload); + + return network_thread()->BlockingCall([this, sid, params, payload] { + return data_channel_transport()->SendData(sid.stream_id_int(), params, + payload); + }); } void DataChannelController::NotifyDataChannelsOfTransportCreated() { @@ -448,8 +463,22 @@ void DataChannelController::NotifyDataChannelsOfTransportCreated() { for (const auto& channel : sctp_data_channels_n_) { if (channel->sid_n().HasValue()) AddSctpDataStream(channel->sid_n()); - channel->OnTransportChannelCreated(); } + + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + for (const auto& channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + })); +} + +std::vector>::iterator +DataChannelController::FindChannel(StreamId stream_id) { + RTC_DCHECK_RUN_ON(signaling_thread()); + return absl::c_find_if(sctp_data_channels_, [&](const auto& c) { + return c->sid_s() == stream_id; + }); } rtc::Thread* DataChannelController::network_thread() const { diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index 074b1feadb..fa6c13e6ec 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -107,11 +107,6 @@ class DataChannelController : public SctpDataChannelControllerInterface, rtc::Thread* signaling_thread() const; private: - // Creates a new SctpDataChannel object on the network thread. - RTCErrorOr> CreateDataChannel( - const std::string& label, - InternalDataChannelInit& config) RTC_RUN_ON(network_thread()); - // Parses and handles open messages. Returns true if the message is an open // message and should be considered to be handled, false otherwise. bool HandleOpenMessage_n(int channel_id, @@ -119,8 +114,8 @@ class DataChannelController : public SctpDataChannelControllerInterface, const rtc::CopyOnWriteBuffer& buffer) RTC_RUN_ON(network_thread()); // Called when a valid data channel OPEN message is received. - void OnDataChannelOpenMessage(rtc::scoped_refptr channel, - bool ready_to_send) + void OnDataChannelOpenMessage(const std::string& label, + const InternalDataChannelInit& config) RTC_RUN_ON(signaling_thread()); // Accepts a `StreamId` which may be pre-negotiated or unassigned. For @@ -144,6 +139,9 @@ class DataChannelController : public SctpDataChannelControllerInterface, // (calls OnTransportChannelCreated on the signaling thread). void NotifyDataChannelsOfTransportCreated(); + std::vector>::iterator FindChannel( + StreamId stream_id); + // Plugin transport used for data channels. Pointer may be accessed and // checked from any thread, but the object may only be touched on the // network thread. @@ -151,16 +149,21 @@ class DataChannelController : public SctpDataChannelControllerInterface, // thread. DataChannelTransportInterface* data_channel_transport_ = nullptr; SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread()); + std::vector> sctp_data_channels_ + RTC_GUARDED_BY(signaling_thread()); + // TODO(bugs.webrtc.org/11547): This vector will eventually take over from + // `sctp_data_channels_`. While we're migrating away from thread hops + // between the signaling and network threads, we need both, so this is + // a temporary situation. std::vector> sctp_data_channels_n_ RTC_GUARDED_BY(network_thread()); bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false; // Owning PeerConnection. PeerConnectionInternal* const pc_; - // The weak pointers must be dereferenced and invalidated on the network + // The weak pointers must be dereferenced and invalidated on the signalling // thread only. - rtc::WeakPtrFactory weak_factory_ - RTC_GUARDED_BY(network_thread()){this}; + rtc::WeakPtrFactory weak_factory_{this}; ScopedTaskSafety signaling_safety_; }; diff --git a/pc/data_channel_controller_unittest.cc b/pc/data_channel_controller_unittest.cc index fd3deae0d7..9b66faccd4 100644 --- a/pc/data_channel_controller_unittest.cc +++ b/pc/data_channel_controller_unittest.cc @@ -131,6 +131,44 @@ TEST_F(DataChannelControllerTest, CloseAfterControllerDestroyed) { channel->Close(); } +TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) { + DataChannelControllerForTest dcc(pc_.get()); + auto ret = dcc.InternalCreateDataChannelWithProxy( + "label", InternalDataChannelInit(DataChannelInit())); + ASSERT_TRUE(ret.ok()); + auto channel = ret.MoveValue(); + SctpDataChannel* inner_channel = + DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting( + channel.get()); + // Grab a reference for testing purposes. + inner_channel->AddRef(); + + channel = nullptr; // dcc still holds a reference to `channel`. + EXPECT_TRUE(dcc.HasDataChannelsForTest()); + + // Trigger a Close() for the channel. This will send events back to dcc, + // eventually reaching `OnSctpDataChannelClosed` where dcc removes + // the channel from the internal list of data channels, but does not release + // the reference synchronously since that reference might be the last one. + inner_channel->Close(); + // Now there should be no tracked data channels. + EXPECT_FALSE(dcc.HasDataChannelsForTest()); + // But there should be an async operation queued that still holds a reference. + // That means that the test reference, must not be the last one. + ASSERT_NE(inner_channel->Release(), + rtc::RefCountReleaseStatus::kDroppedLastRef); + // Grab a reference again (using the pointer is safe since the object still + // exists and we control the single-threaded environment manually). + inner_channel->AddRef(); + // Now run the queued up async operations on the signaling (current) thread. + // This time, the reference formerly owned by dcc, should be release and the + // truly last reference is now held by the test. + run_loop_.Flush(); + // Check that this is the last reference. + EXPECT_EQ(inner_channel->Release(), + rtc::RefCountReleaseStatus::kDroppedLastRef); +} + // Allocate the maximum number of data channels and then one more. // The last allocation should fail. TEST_F(DataChannelControllerTest, MaxChannels) { diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index 2582561282..fe75d385f3 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -40,19 +40,18 @@ static constexpr int kDefaultTimeout = 10000; class FakeDataChannelObserver : public DataChannelObserver { public: - FakeDataChannelObserver() { - // This implementation relies on the SctpDataChannel::ObserverAdapter - // implementation to post events to the signaling thread. - RTC_DCHECK(!IsOkToCallOnTheNetworkThread()); - } + FakeDataChannelObserver() + : messages_received_(0), + on_state_change_count_(0), + on_buffered_amount_change_count_(0) {} - void OnStateChange() override { ++on_state_change_count_; } + void OnStateChange() { ++on_state_change_count_; } - void OnBufferedAmountChange(uint64_t previous_amount) override { + void OnBufferedAmountChange(uint64_t previous_amount) { ++on_buffered_amount_change_count_; } - void OnMessage(const DataBuffer& buffer) override { ++messages_received_; } + void OnMessage(const DataBuffer& buffer) { ++messages_received_; } size_t messages_received() const { return messages_received_; } @@ -69,9 +68,9 @@ class FakeDataChannelObserver : public DataChannelObserver { } private: - size_t messages_received_ = 0u; - size_t on_state_change_count_ = 0u; - size_t on_buffered_amount_change_count_ = 0u; + size_t messages_received_; + size_t on_state_change_count_; + size_t on_buffered_amount_change_count_; }; class SctpDataChannelTest : public ::testing::Test { @@ -94,17 +93,11 @@ class SctpDataChannelTest : public ::testing::Test { void SetChannelReady() { controller_->set_transport_available(true); - StreamId sid(0); - network_thread_.BlockingCall([&]() { - RTC_DCHECK_RUN_ON(&network_thread_); - if (!inner_channel_->sid_n().HasValue()) { - inner_channel_->SetSctpSid_n(sid); - controller_->AddSctpDataStream(sid); - } - inner_channel_->OnTransportChannelCreated(); - }); + inner_channel_->OnTransportChannelCreated(); + if (!inner_channel_->sid_s().HasValue()) { + SetChannelSid(inner_channel_, StreamId(0)); + } controller_->set_ready_to_send(true); - run_loop_.Flush(); } // TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController @@ -115,10 +108,9 @@ class SctpDataChannelTest : public ::testing::Test { void SetChannelSid(const rtc::scoped_refptr& channel, StreamId sid) { RTC_DCHECK(sid.HasValue()); - network_thread_.BlockingCall([&]() { - channel->SetSctpSid_n(sid); - controller_->AddSctpDataStream(sid); - }); + network_thread_.BlockingCall( + [&]() { controller_->AddSctpDataStream(sid); }); + channel->SetSctpSid_s(sid); } void AddObserver() { @@ -154,13 +146,11 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) { // Check the non-const part of the configuration. EXPECT_EQ(channel_->id(), init_.id); - network_thread_.BlockingCall( - [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); }); + EXPECT_EQ(inner_channel_->sid_s(), StreamId()); SetChannelReady(); EXPECT_EQ(channel_->id(), 0); - network_thread_.BlockingCall( - [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); }); + EXPECT_EQ(inner_channel_->sid_s(), StreamId(0)); } // Verifies that the data channel is connected to the transport after creation. @@ -168,15 +158,13 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) { controller_->set_transport_available(true); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init_); - EXPECT_TRUE(controller_->IsConnected(dc.get())); + EXPECT_TRUE(controller_->IsConnected(dc.get())); // The sid is not set yet, so it should not have added the streams. - StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); - EXPECT_FALSE(controller_->IsStreamAdded(sid)); + EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s())); SetChannelSid(dc, StreamId(0)); - sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); - EXPECT_TRUE(controller_->IsStreamAdded(sid)); + EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s())); } // Tests the state of the data channel. @@ -195,7 +183,7 @@ TEST_F(SctpDataChannelTest, StateTransition) { channel_->Close(); // The (simulated) transport close notifications runs on the network thread // and posts a completion notification to the signaling (current) thread. - // Allow that operation to complete before checking the state. + // Allow that ooperation to complete before checking the state. run_loop_.Flush(); EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); EXPECT_EQ(observer_->on_state_change_count(), 3u); @@ -213,7 +201,6 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { EXPECT_TRUE(channel_->Send(buffer)); size_t successful_send_count = 1; - run_loop_.Flush(); EXPECT_EQ(0U, channel_->buffered_amount()); EXPECT_EQ(successful_send_count, observer_->on_buffered_amount_change_count()); @@ -230,7 +217,6 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { observer_->on_buffered_amount_change_count()); controller_->set_send_blocked(false); - run_loop_.Flush(); successful_send_count += number_of_packets; EXPECT_EQ(0U, channel_->buffered_amount()); EXPECT_EQ(successful_send_count, @@ -351,9 +337,10 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { SetChannelReady(); InternalDataChannelInit init; init.id = 1; - auto dc = webrtc::SctpDataChannel::CreateProxy( - controller_->CreateDataChannel("test1", init)); - EXPECT_EQ(DataChannelInterface::kOpen, dc->state()); + rtc::scoped_refptr dc = + controller_->CreateDataChannel("test1", init); + EXPECT_EQ(DataChannelInterface::kConnecting, dc->state()); + EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000); } // Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK @@ -365,23 +352,21 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) { init.ordered = false; rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init); - auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); // Sends a message and verifies it's ordered. DataBuffer buffer("some data"); - ASSERT_TRUE(proxy->Send(buffer)); + ASSERT_TRUE(dc->Send(buffer)); EXPECT_TRUE(controller_->last_send_data_params().ordered); // Emulates receiving an OPEN_ACK message. rtc::CopyOnWriteBuffer payload; WriteDataChannelOpenAckMessage(&payload); - network_thread_.BlockingCall( - [&] { dc->OnDataReceived(DataMessageType::kControl, payload); }); + dc->OnDataReceived(DataMessageType::kControl, payload); // Sends another message and verifies it's unordered. - ASSERT_TRUE(proxy->Send(buffer)); + ASSERT_TRUE(dc->Send(buffer)); EXPECT_FALSE(controller_->last_send_data_params().ordered); } @@ -394,17 +379,15 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { init.ordered = false; rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init); - auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); // Emulates receiving a DATA message. DataBuffer buffer("data"); - network_thread_.BlockingCall( - [&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); }); + dc->OnDataReceived(DataMessageType::kText, buffer.data); // Sends a message and verifies it's unordered. - ASSERT_TRUE(proxy->Send(buffer)); + ASSERT_TRUE(dc->Send(buffer)); EXPECT_FALSE(controller_->last_send_data_params().ordered); } @@ -457,10 +440,7 @@ TEST_F(SctpDataChannelTest, ReceiveDataWithValidId) { AddObserver(); DataBuffer buffer("abcd"); - network_thread_.BlockingCall([&] { - inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data); - }); - run_loop_.Flush(); + inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data); EXPECT_EQ(1U, observer_->messages_received()); } @@ -475,9 +455,8 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) { SetChannelReady(); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", config); - auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); EXPECT_EQ(0, controller_->last_sid()); } @@ -501,10 +480,9 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) { EXPECT_EQ(0U, channel_->bytes_received()); // Receive three buffers while data channel isn't open. - network_thread_.BlockingCall([&] { - for (int i : {0, 1, 2}) - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data); - }); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data); EXPECT_EQ(0U, observer_->messages_received()); EXPECT_EQ(0U, channel_->messages_received()); EXPECT_EQ(0U, channel_->bytes_received()); @@ -518,11 +496,9 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) { EXPECT_EQ(bytes_received, channel_->bytes_received()); // Receive three buffers while open. - network_thread_.BlockingCall([&] { - for (int i : {3, 4, 5}) - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data); - }); - run_loop_.Flush(); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data); + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data); bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size(); EXPECT_EQ(6U, observer_->messages_received()); EXPECT_EQ(6U, channel_->messages_received()); @@ -540,9 +516,8 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) { SetChannelReady(); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", config); - auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); EXPECT_EQ(config.id, controller_->last_sid()); EXPECT_EQ(DataMessageType::kControl, @@ -579,8 +554,9 @@ TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) { EXPECT_TRUE(channel_->Send(packet)); } - // The sending buffer should be full, `Send()` returns false. + // The sending buffer shoul be full, send returns false. EXPECT_FALSE(channel_->Send(packet)); + EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state()); } @@ -604,12 +580,10 @@ TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) { rtc::CopyOnWriteBuffer buffer(1024); memset(buffer.MutableData(), 0, buffer.size()); - network_thread_.BlockingCall([&] { - // Receiving data without having an observer will overflow the buffer. - for (size_t i = 0; i < 16 * 1024 + 1; ++i) { - inner_channel_->OnDataReceived(DataMessageType::kText, buffer); - } - }); + // Receiving data without having an observer will overflow the buffer. + for (size_t i = 0; i < 16 * 1024 + 1; ++i) { + inner_channel_->OnDataReceived(DataMessageType::kText, buffer); + } EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); EXPECT_FALSE(channel_->error().ok()); EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type()); @@ -630,8 +604,7 @@ TEST_F(SctpDataChannelTest, SendEmptyData) { // Tests that a channel can be closed without being opened or assigned an sid. TEST_F(SctpDataChannelTest, NeverOpened) { controller_->set_transport_available(true); - network_thread_.BlockingCall( - [&] { inner_channel_->OnTransportChannelCreated(); }); + inner_channel_->OnTransportChannelCreated(); channel_->Close(); } @@ -661,8 +634,7 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { // transition to the "closed" state. RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, ""); error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); - network_thread_.BlockingCall( - [&] { inner_channel_->OnTransportChannelClosed(error); }); + inner_channel_->OnTransportChannelClosed(error); controller_.reset(nullptr); EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), kDefaultTimeout); @@ -682,8 +654,7 @@ TEST_F(SctpDataChannelTest, TransportGotErrorCode) { error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); error.set_sctp_cause_code( static_cast(cricket::SctpErrorCauseCode::kProtocolViolation)); - network_thread_.BlockingCall( - [&] { inner_channel_->OnTransportChannelClosed(error); }); + inner_channel_->OnTransportChannelClosed(error); controller_.reset(nullptr); EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), kDefaultTimeout); diff --git a/pc/legacy_stats_collector.cc b/pc/legacy_stats_collector.cc index 7ce85e411c..6829e359b8 100644 --- a/pc/legacy_stats_collector.cc +++ b/pc/legacy_stats_collector.cc @@ -670,7 +670,7 @@ void LegacyStatsCollector::UpdateStats( // to fetch stats, then applies them on the signaling thread. See if we need // to do this synchronously or if updating the stats without blocking is safe. std::map transport_names_by_mid = - ExtractSessionAndDataInfo(); + ExtractSessionInfo(); // TODO(tommi): All of these hop over to the worker thread to fetch // information. We could post a task to run all of these and post @@ -681,6 +681,7 @@ void LegacyStatsCollector::UpdateStats( ExtractBweInfo(); ExtractMediaInfo(transport_names_by_mid); ExtractSenderInfo(); + ExtractDataInfo(); UpdateTrackReports(); } @@ -855,26 +856,19 @@ StatsReport* LegacyStatsCollector::AddCandidateReport( return report; } -std::map -LegacyStatsCollector::ExtractSessionAndDataInfo() { - TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo"); +std::map LegacyStatsCollector::ExtractSessionInfo() { + TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo"); RTC_DCHECK_RUN_ON(pc_->signaling_thread()); SessionStats stats; - StatsCollection::Container data_report_collection; auto transceivers = pc_->GetTransceiversInternal(); pc_->network_thread()->BlockingCall( [&, sctp_transport_name = pc_->sctp_transport_name(), sctp_mid = pc_->sctp_mid()]() mutable { stats = ExtractSessionInfo_n( transceivers, std::move(sctp_transport_name), std::move(sctp_mid)); - StatsCollection data_reports; - ExtractDataInfo_n(&data_reports); - data_report_collection = data_reports.DetachCollection(); }); - reports_.MergeCollection(std::move(data_report_collection)); - ExtractSessionInfo_s(stats); return std::move(stats.transport_names_by_mid); @@ -1298,8 +1292,8 @@ void LegacyStatsCollector::ExtractSenderInfo() { } } -void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) { - RTC_DCHECK_RUN_ON(pc_->network_thread()); +void LegacyStatsCollector::ExtractDataInfo() { + RTC_DCHECK_RUN_ON(pc_->signaling_thread()); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; @@ -1307,7 +1301,7 @@ void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) { for (const auto& stats : data_stats) { StatsReport::Id id(StatsReport::NewTypedIntId( StatsReport::kStatsReportTypeDataChannel, stats.id)); - StatsReport* report = reports->ReplaceOrAddNew(id); + StatsReport* report = reports_.ReplaceOrAddNew(id); report->set_timestamp(stats_gathering_started_); report->AddString(StatsReport::kStatsValueNameLabel, stats.label); // Filter out the initial id (-1). diff --git a/pc/legacy_stats_collector.h b/pc/legacy_stats_collector.h index e905b39d48..cedd36c853 100644 --- a/pc/legacy_stats_collector.h +++ b/pc/legacy_stats_collector.h @@ -165,13 +165,11 @@ class LegacyStatsCollector : public LegacyStatsCollectorInterface { const StatsReport::Id& channel_report_id, const cricket::ConnectionInfo& info); - void ExtractDataInfo_n(StatsCollection* reports); + void ExtractDataInfo(); // Returns the `transport_names_by_mid` member from the SessionStats as - // gathered and used to populate the stats. Contains one synchronous hop - // to the network thread to get this information along with querying data - // channel stats at the same time and populating `reports_`. - std::map ExtractSessionAndDataInfo(); + // gathered and used to populate the stats. + std::map ExtractSessionInfo(); void ExtractBweInfo(); void ExtractMediaInfo( diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 82c5914a52..fdbd32bbce 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2286,7 +2286,7 @@ bool PeerConnection::GetTransportDescription( } std::vector PeerConnection::GetDataChannelStats() const { - RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK_RUN_ON(signaling_thread()); return data_channel_controller_.GetDataChannelStats(); } diff --git a/pc/proxy.h b/pc/proxy.h index ebe60c0293..2be115fdf3 100644 --- a/pc/proxy.h +++ b/pc/proxy.h @@ -450,24 +450,6 @@ class ConstMethodCall { return c_->method(); \ } -// Allows a custom implementation of a method where the otherwise proxied -// implementation can do a more efficient, yet thread-safe, job than the proxy -// can do by default or when more flexibility is needed than can be provided -// by a proxy. -// Note that calls to these methods should be expected to be made from unknown -// threads. -#define BYPASS_PROXY_METHOD0(r, method) \ - r method() override { \ - TRACE_BOILERPLATE(method); \ - return c_->method(); \ - } - -// The 1 argument version of `BYPASS_PROXY_METHOD0`. -#define BYPASS_PROXY_METHOD1(r, method, t1) \ - r method(t1 a1) override { \ - TRACE_BOILERPLATE(method); \ - return c_->method(std::move(a1)); \ - } } // namespace webrtc #endif // PC_PROXY_H_ diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index 574e1cc6ec..429e9d0268 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -1499,6 +1499,7 @@ void RTCStatsCollector::ProducePartialResultsOnSignalingThreadImpl( RTC_DCHECK_RUN_ON(signaling_thread_); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; + ProduceDataChannelStats_s(timestamp, partial_report); ProduceMediaStreamStats_s(timestamp, partial_report); ProduceMediaStreamTrackStats_s(timestamp, partial_report); ProduceMediaSourceStats_s(timestamp, partial_report); @@ -1518,8 +1519,6 @@ void RTCStatsCollector::ProducePartialResultsOnNetworkThread( // `network_report_event_` is reset before this method is invoked. network_report_ = RTCStatsReport::Create(timestamp); - ProduceDataChannelStats_n(timestamp, network_report_.get()); - std::set transport_names; if (sctp_transport_name) { transport_names.emplace(std::move(*sctp_transport_name)); @@ -1654,10 +1653,10 @@ void RTCStatsCollector::ProduceCertificateStats_n( } } -void RTCStatsCollector::ProduceDataChannelStats_n( +void RTCStatsCollector::ProduceDataChannelStats_s( Timestamp timestamp, RTCStatsReport* report) const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::vector data_stats = pc_->GetDataChannelStats(); for (const auto& stats : data_stats) { diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h index 34962bf5f7..ac0453fb7c 100644 --- a/pc/rtc_stats_collector.h +++ b/pc/rtc_stats_collector.h @@ -186,7 +186,7 @@ class RTCStatsCollector : public rtc::RefCountInterface { const std::map& transport_cert_stats, RTCStatsReport* report) const; // Produces `RTCDataChannelStats`. - void ProduceDataChannelStats_n(Timestamp timestamp, + void ProduceDataChannelStats_s(Timestamp timestamp, RTCStatsReport* report) const; // Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`. void ProduceIceCandidateAndPairStats_n( diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc index 892eca9aa7..623a153067 100644 --- a/pc/sctp_data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -38,8 +38,8 @@ int GenerateUniqueId() { // Define proxy for DataChannelInterface. BEGIN_PROXY_MAP(DataChannel) PROXY_PRIMARY_THREAD_DESTRUCTOR() -BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) -BYPASS_PROXY_METHOD0(void, UnregisterObserver) +PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) +PROXY_METHOD0(void, UnregisterObserver) BYPASS_PROXY_CONSTMETHOD0(std::string, label) BYPASS_PROXY_CONSTMETHOD0(bool, reliable) BYPASS_PROXY_CONSTMETHOD0(bool, ordered) @@ -50,18 +50,20 @@ BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxPacketLifeTime) BYPASS_PROXY_CONSTMETHOD0(std::string, protocol) BYPASS_PROXY_CONSTMETHOD0(bool, negotiated) // Can't bypass the proxy since the id may change. -PROXY_SECONDARY_CONSTMETHOD0(int, id) +PROXY_CONSTMETHOD0(int, id) BYPASS_PROXY_CONSTMETHOD0(Priority, priority) -BYPASS_PROXY_CONSTMETHOD0(DataState, state) -PROXY_SECONDARY_CONSTMETHOD0(RTCError, error) -PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent) -PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent) -PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received) -PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received) -PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount) -PROXY_SECONDARY_METHOD0(void, Close) -PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&) +PROXY_CONSTMETHOD0(DataState, state) +PROXY_CONSTMETHOD0(RTCError, error) +PROXY_CONSTMETHOD0(uint32_t, messages_sent) +PROXY_CONSTMETHOD0(uint64_t, bytes_sent) +PROXY_CONSTMETHOD0(uint32_t, messages_received) +PROXY_CONSTMETHOD0(uint64_t, bytes_received) +PROXY_CONSTMETHOD0(uint64_t, buffered_amount) +PROXY_METHOD0(void, Close) +// TODO(bugs.webrtc.org/11547): Change to run on the network thread. +PROXY_METHOD1(bool, Send, const DataBuffer&) END_PROXY_MAP(DataChannel) + } // namespace InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base) @@ -140,78 +142,6 @@ void SctpSidAllocator::ReleaseSid(StreamId sid) { used_sids_.erase(sid); } -// A DataChannelObserver implementation that offers backwards compatibility with -// implementations that aren't yet ready to be called back on the network -// thread. This implementation posts events to the signaling thread where -// events are delivered. -// In the class, and together with the `SctpDataChannel` implementation, there's -// special handling for the `state()` property whereby if that property is -// queried on the channel object while inside an event callback, we return -// the state that was active at the time the event was issued. This is to avoid -// a problem with calling the `state()` getter on the proxy, which would do -// a blocking call to the network thread, effectively flushing operations on -// the network thread that could cause the state to change and eventually return -// a misleading or arguably, wrong, state value to the callback implementation. -// As a future improvement to the ObserverAdapter, we could do the same for -// other properties that need to be read on the network thread. Eventually -// all implementations should expect to be called on the network thread though -// and the ObserverAdapter no longer be necessary. -class SctpDataChannel::ObserverAdapter : public DataChannelObserver { - public: - explicit ObserverAdapter(DataChannelObserver* delegate, - SctpDataChannel* channel) - : delegate_(delegate), channel_(channel) {} - - bool IsInsideStateNotification() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return inside_state_change_; - } - - DataChannelInterface::DataState cached_state() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - RTC_DCHECK(IsInsideStateNotification()); - return cached_state_; - } - - private: - void OnStateChange() override { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - SafeTask(safety_.flag(), [this, new_state = channel_->state()] { - RTC_DCHECK_RUN_ON(signaling_thread()); - cached_state_ = new_state; - inside_state_change_ = true; - delegate_->OnStateChange(); - inside_state_change_ = false; - })); - } - - void OnMessage(const DataBuffer& buffer) override { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - SafeTask(safety_.flag(), - [this, buffer = buffer] { delegate_->OnMessage(buffer); })); - } - - void OnBufferedAmountChange(uint64_t sent_data_size) override { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - SafeTask(safety_.flag(), [this, sent_data_size] { - delegate_->OnBufferedAmountChange(sent_data_size); - })); - } - - rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; } - rtc::Thread* network_thread() const { return channel_->network_thread_; } - - DataChannelObserver* const delegate_; - SctpDataChannel* const channel_; - ScopedTaskSafety safety_; - bool inside_state_change_ RTC_GUARDED_BY(signaling_thread()) = false; - DataChannelInterface::DataState cached_state_ - RTC_GUARDED_BY(signaling_thread()) = DataChannelInterface::kConnecting; -}; - // static rtc::scoped_refptr SctpDataChannel::Create( rtc::WeakPtr controller, @@ -245,6 +175,7 @@ SctpDataChannel::SctpDataChannel( rtc::Thread* network_thread) : signaling_thread_(signaling_thread), network_thread_(network_thread), + id_s_(config.id), id_n_(config.id), internal_id_(GenerateUniqueId()), label_(label), @@ -276,81 +207,19 @@ SctpDataChannel::SctpDataChannel( } } -SctpDataChannel::~SctpDataChannel() {} +SctpDataChannel::~SctpDataChannel() { + RTC_DCHECK_RUN_ON(signaling_thread_); +} void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) { - // Note: at this point, we do not know on which thread we're being called - // from since this method bypasses the proxy. On Android in particular, - // registration methods are called from unknown threads. - - // Check if we should set up an observer adapter that will make sure that - // callbacks are delivered on the signaling thread rather than directly - // on the network thread. - const auto* current_thread = rtc::Thread::Current(); - // TODO(webrtc:11547): Eventually all DataChannelObserver implementations - // should be called on the network thread and IsOkToCallOnTheNetworkThread(). - if (!observer->IsOkToCallOnTheNetworkThread()) { - auto prepare_observer = [&]() { - RTC_DCHECK(!observer_adapter_); - observer_adapter_ = std::make_unique(observer, this); - return observer_adapter_.get(); - }; - // Instantiate the adapter in the right context and then substitute the - // observer pointer the SctpDataChannel will call back on, with the adapter. - if (signaling_thread_ == current_thread) { - observer = prepare_observer(); - } else { - observer = signaling_thread_->BlockingCall(std::move(prepare_observer)); - } - } - - // Now do the observer registration on the network thread. - auto register_observer = [&] { - RTC_DCHECK_RUN_ON(network_thread_); - observer_ = observer; - DeliverQueuedReceivedData(); - }; - - if (network_thread_ == current_thread) { - register_observer(); - } else { - network_thread_->BlockingCall(std::move(register_observer)); - } + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = observer; + DeliverQueuedReceivedData(); } void SctpDataChannel::UnregisterObserver() { - // Note: As with `RegisterObserver`, the proxy is being bypassed. - const auto* current_thread = rtc::Thread::Current(); - // Callers must not be invoking the unregistration from the network thread - // (assuming a multi-threaded environment where we have a dedicated network - // thread). That would indicate non-network related work happening on the - // network thread or that unregistration is being done from within a callback - // (without unwinding the stack, which is a requirement). - // The network thread is not allowed to make blocking calls to the signaling - // thread, so that would blow up if attempted. Since we support an adapter - // for observers that are not safe to call on the network thread, we do - // need to check+free it on the signaling thread. - RTC_DCHECK(current_thread != network_thread_ || - network_thread_ == signaling_thread_); - - auto unregister_observer = [&] { - RTC_DCHECK_RUN_ON(network_thread_); - observer_ = nullptr; - }; - - if (current_thread == network_thread_) { - unregister_observer(); - } else { - network_thread_->BlockingCall(std::move(unregister_observer)); - } - - auto clear_observer = [&]() { observer_adapter_.reset(); }; - - if (current_thread != signaling_thread_) { - signaling_thread_->BlockingCall(std::move(clear_observer)); - } else { - clear_observer(); - } + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = nullptr; } std::string SctpDataChannel::label() const { @@ -392,11 +261,8 @@ bool SctpDataChannel::negotiated() const { } int SctpDataChannel::id() const { - RTC_DCHECK_RUN_ON(network_thread_); - // TODO(tommi): Once an ID has been assigned, it won't change (can be - // considered const). We could do special handling of this and allow bypassing - // the proxy so that we can return a valid id without thread hopping. - return id_n_.stream_id_int(); + RTC_DCHECK_RUN_ON(signaling_thread_); + return id_s_.stream_id_int(); } Priority SctpDataChannel::priority() const { @@ -404,12 +270,12 @@ Priority SctpDataChannel::priority() const { } uint64_t SctpDataChannel::buffered_amount() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return queued_send_data_.byte_count(); } void SctpDataChannel::Close() { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == kClosing || state_ == kClosed) return; SetState(kClosing); @@ -418,58 +284,40 @@ void SctpDataChannel::Close() { } SctpDataChannel::DataState SctpDataChannel::state() const { - // Note: The proxy is bypassed for the `state()` accessor. This is to allow - // observer callbacks to query what the new state is from within a state - // update notification without having to do a blocking call to the network - // thread from within a callback. This also makes it so that the returned - // state is guaranteed to be the new state that provoked the state change - // notification, whereby a blocking call to the network thread might end up - // getting put behind other messages on the network thread and eventually - // fetch a different state value (since pending messages might cause the - // state to change in the meantime). - const auto* current_thread = rtc::Thread::Current(); - if (current_thread == signaling_thread_) { - if (observer_adapter_ && observer_adapter_->IsInsideStateNotification()) - return observer_adapter_->cached_state(); - } - - auto return_state = [&] { - RTC_DCHECK_RUN_ON(network_thread_); - return state_; - }; - - return current_thread == network_thread_ - ? return_state() - : network_thread_->BlockingCall(std::move(return_state)); + RTC_DCHECK_RUN_ON(signaling_thread_); + return state_; } RTCError SctpDataChannel::error() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return error_; } uint32_t SctpDataChannel::messages_sent() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return messages_sent_; } uint64_t SctpDataChannel::bytes_sent() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return bytes_sent_; } uint32_t SctpDataChannel::messages_received() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return messages_received_; } uint64_t SctpDataChannel::bytes_received() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); return bytes_received_; } bool SctpDataChannel::Send(const DataBuffer& buffer) { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); + // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network + // thread. Bring buffer management etc to the network thread and keep the + // operational state management on the signaling thread. if (state_ != kOpen) { return false; @@ -487,17 +335,25 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) { return true; } +void SctpDataChannel::SetSctpSid_s(StreamId sid) { + RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(!id_s_.HasValue()); + RTC_DCHECK(sid.HasValue()); + RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); + RTC_DCHECK_EQ(state_, kConnecting); + + id_s_ = sid; +} + void SctpDataChannel::SetSctpSid_n(StreamId sid) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK(!id_n_.HasValue()); RTC_DCHECK(sid.HasValue()); - RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); - RTC_DCHECK_EQ(state_, kConnecting); id_n_ = sid; } void SctpDataChannel::OnClosingProcedureStartedRemotely() { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ != kClosing && state_ != kClosed) { // Don't bother sending queued data since the side that initiated the // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy @@ -513,7 +369,7 @@ void SctpDataChannel::OnClosingProcedureStartedRemotely() { } void SctpDataChannel::OnClosingProcedureComplete() { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); // If the closing procedure is complete, we should have finished sending // all pending data and transitioned to kClosing already. RTC_DCHECK_EQ(state_, kClosing); @@ -522,12 +378,12 @@ void SctpDataChannel::OnClosingProcedureComplete() { } void SctpDataChannel::OnTransportChannelCreated() { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); + connected_to_transport_ = true; } void SctpDataChannel::OnTransportChannelClosed(RTCError error) { - RTC_DCHECK_RUN_ON(network_thread_); // The SctpTransport is unusable, which could come from multiple reasons: // - the SCTP m= section was rejected // - the DTLS transport is closed @@ -536,7 +392,7 @@ void SctpDataChannel::OnTransportChannelClosed(RTCError error) { } DataChannelStats SctpDataChannel::GetStats() const { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); DataChannelStats stats{internal_id_, id(), label(), protocol(), state(), messages_sent(), messages_received(), bytes_sent(), bytes_received()}; @@ -545,25 +401,25 @@ DataChannelStats SctpDataChannel::GetStats() const { void SctpDataChannel::OnDataReceived(DataMessageType type, const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); if (type == DataMessageType::kControl) { if (handshake_state_ != kHandshakeWaitingForAck) { // Ignore it if we are not expecting an ACK message. RTC_LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, sid = " - << id_n_.stream_id_int(); + << id_s_.stream_id_int(); return; } if (ParseDataChannelOpenAckMessage(payload)) { // We can send unordered as soon as we receive the ACK message. handshake_state_ = kHandshakeReady; RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " - << id_n_.stream_id_int(); + << id_s_.stream_id_int(); } else { RTC_LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = " - << id_n_.stream_id_int(); + << id_s_.stream_id_int(); } return; } @@ -572,7 +428,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type, type == DataMessageType::kText); RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " - << id_n_.stream_id_int(); + << id_s_.stream_id_int(); // We can send unordered as soon as we receive any DATA message since the // remote side must have received the OPEN (and old clients do not send // OPEN_ACK). @@ -603,7 +459,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type, } void SctpDataChannel::OnTransportReady() { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); // TODO(bugs.webrtc.org/11547): The transport is configured inside // `PeerConnection::SetupDataChannelTransport_n`, which results in @@ -616,7 +472,6 @@ void SctpDataChannel::OnTransportReady() { // be on for the below `Send*` calls, which currently do a BlockingCall // from the signaling thread to the network thread. RTC_DCHECK(connected_to_transport_); - RTC_DCHECK(id_n_.HasValue()); SendQueuedControlMessages(); SendQueuedDataMessages(); @@ -625,7 +480,7 @@ void SctpDataChannel::OnTransportReady() { } void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { - RTC_DCHECK_RUN_ON(network_thread_); + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == kClosed) { return; @@ -646,14 +501,13 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { void SctpDataChannel::CloseAbruptlyWithDataChannelFailure( const std::string& message) { - RTC_DCHECK_RUN_ON(network_thread_); RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message); error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE); CloseAbruptlyWithError(std::move(error)); } -// RTC_RUN_ON(network_thread_). void SctpDataChannel::UpdateState() { + RTC_DCHECK_RUN_ON(signaling_thread_); // UpdateState determines what to do from a few state variables. Include // all conditions required for each state transition here for // clarity. OnTransportReady(true) will send any queued data and then invoke @@ -681,7 +535,7 @@ void SctpDataChannel::UpdateState() { DeliverQueuedReceivedData(); } } else { - RTC_DCHECK(!id_n_.HasValue()); + RTC_DCHECK(!id_s_.HasValue()); } break; } @@ -697,9 +551,11 @@ void SctpDataChannel::UpdateState() { // to complete; after calling RemoveSctpDataStream, // OnClosingProcedureComplete will end up called asynchronously // afterwards. - if (!started_closing_procedure_ && id_n_.HasValue()) { + if (!started_closing_procedure_ && id_s_.HasValue()) { started_closing_procedure_ = true; - controller_->RemoveSctpDataStream(id_n_); + network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] { + c->RemoveSctpDataStream(sid); + }); } } } else { @@ -716,8 +572,8 @@ void SctpDataChannel::UpdateState() { } } -// RTC_RUN_ON(network_thread_). void SctpDataChannel::SetState(DataState state) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == state) { return; } @@ -731,8 +587,8 @@ void SctpDataChannel::SetState(DataState state) { controller_->OnChannelStateChanged(this, state_); } -// RTC_RUN_ON(network_thread_). void SctpDataChannel::DeliverQueuedReceivedData() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (!observer_) { return; } @@ -745,8 +601,8 @@ void SctpDataChannel::DeliverQueuedReceivedData() { } } -// RTC_RUN_ON(network_thread_). void SctpDataChannel::SendQueuedDataMessages() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (queued_send_data_.Empty()) { return; } @@ -763,9 +619,9 @@ void SctpDataChannel::SendQueuedDataMessages() { } } -// RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) { + RTC_DCHECK_RUN_ON(signaling_thread_); SendDataParams send_params; if (!controller_) { return false; @@ -785,7 +641,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, send_params.type = buffer.binary ? DataMessageType::kBinary : DataMessageType::kText; - RTCError error = controller_->SendData(id_n_, send_params, buffer.data); + RTCError error = controller_->SendData(id_s_, send_params, buffer.data); if (error.ok()) { ++messages_sent_; @@ -813,8 +669,8 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, return false; } -// RTC_RUN_ON(network_thread_). bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); size_t start_buffered_amount = queued_send_data_.byte_count(); if (start_buffered_amount + buffer.size() > DataChannelInterface::MaxSendQueueSize()) { @@ -825,8 +681,8 @@ bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { return true; } -// RTC_RUN_ON(network_thread_). void SctpDataChannel::SendQueuedControlMessages() { + RTC_DCHECK_RUN_ON(signaling_thread_); PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -836,10 +692,10 @@ void SctpDataChannel::SendQueuedControlMessages() { } } -// RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(connected_to_transport_); - RTC_DCHECK(id_n_.HasValue()); + RTC_DCHECK(id_s_.HasValue()); RTC_DCHECK(controller_); bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; @@ -852,10 +708,10 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { send_params.ordered = ordered_ || is_open_message; send_params.type = DataMessageType::kControl; - RTCError err = controller_->SendData(id_n_, send_params, buffer); + RTCError err = controller_->SendData(id_s_, send_params, buffer); if (err.ok()) { RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel " - << id_n_.stream_id_int(); + << id_s_.stream_id_int(); if (handshake_state_ == kHandshakeShouldSendAck) { handshake_state_ = kHandshakeReady; diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h index fcf1ffe6ed..588b0cbf63 100644 --- a/pc/sctp_data_channel.h +++ b/pc/sctp_data_channel.h @@ -192,6 +192,7 @@ class SctpDataChannel : public DataChannelInterface { // Sets the SCTP sid and adds to transport layer if not set yet. Should only // be called once. + void SetSctpSid_s(StreamId sid); void SetSctpSid_n(StreamId sid); // The remote side started the closing procedure by resetting its outgoing @@ -215,6 +216,10 @@ class SctpDataChannel : public DataChannelInterface { // stats purposes (see also `GetStats()`). int internal_id() const { return internal_id_; } + StreamId sid_s() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return id_s_; + } StreamId sid_n() const { RTC_DCHECK_RUN_ON(network_thread_); return id_n_; @@ -234,8 +239,6 @@ class SctpDataChannel : public DataChannelInterface { ~SctpDataChannel() override; private: - class ObserverAdapter; - // The OPEN(_ACK) signaling state. enum HandshakeState { kHandshakeInit, @@ -245,23 +248,21 @@ class SctpDataChannel : public DataChannelInterface { kHandshakeReady }; - void UpdateState() RTC_RUN_ON(network_thread_); - void SetState(DataState state) RTC_RUN_ON(network_thread_); + void UpdateState(); + void SetState(DataState state); - void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_); + void DeliverQueuedReceivedData(); - void SendQueuedDataMessages() RTC_RUN_ON(network_thread_); - bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) - RTC_RUN_ON(network_thread_); - bool QueueSendDataMessage(const DataBuffer& buffer) - RTC_RUN_ON(network_thread_); + void SendQueuedDataMessages(); + bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked); + bool QueueSendDataMessage(const DataBuffer& buffer); - void SendQueuedControlMessages() RTC_RUN_ON(network_thread_); - bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) - RTC_RUN_ON(network_thread_); + void SendQueuedControlMessages(); + bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer); rtc::Thread* const signaling_thread_; rtc::Thread* const network_thread_; + StreamId id_s_ RTC_GUARDED_BY(signaling_thread_); StreamId id_n_ RTC_GUARDED_BY(network_thread_); const int internal_id_; const std::string label_; @@ -272,26 +273,25 @@ class SctpDataChannel : public DataChannelInterface { const bool negotiated_; const bool ordered_; - DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr; - std::unique_ptr observer_adapter_; - DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting; - RTCError error_ RTC_GUARDED_BY(network_thread_); - uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0; - uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0; - uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0; - uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0; + DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr; + DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting; + RTCError error_ RTC_GUARDED_BY(signaling_thread_); + uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0; + uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0; rtc::WeakPtr controller_ - RTC_GUARDED_BY(network_thread_); - HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) = + RTC_GUARDED_BY(signaling_thread_); + HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) = kHandshakeInit; - bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false; + bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false; // Did we already start the graceful SCTP closing procedure? - bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false; + bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false; // Control messages that always have to get sent out before any queued // data. - PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_); - PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_); - PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_); + PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_); + PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_); + PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_); }; // Downcast a PeerConnectionInterface that points to a proxy object diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 90a6cd2b92..c382d611ae 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -3268,16 +3268,20 @@ void SdpOfferAnswerHandler::AllocateSctpSids() { return; } - absl::optional guessed_role = GuessSslRole(); - network_thread()->BlockingCall( - [&, data_channel_controller = data_channel_controller()] { - RTC_DCHECK_RUN_ON(network_thread()); - absl::optional role = pc_->GetSctpSslRole_n(); - if (!role) - role = guessed_role; - if (role) - data_channel_controller->AllocateSctpSids(*role); - }); + absl::optional role = network_thread()->BlockingCall([this] { + RTC_DCHECK_RUN_ON(network_thread()); + return pc_->GetSctpSslRole_n(); + }); + + if (!role) { + role = GuessSslRole(); + } + + if (role) { + // TODO(webrtc:11547): Make this call on the network thread too once + // `AllocateSctpSids` has been updated. + data_channel_controller()->AllocateSctpSids(*role); + } } absl::optional SdpOfferAnswerHandler::GuessSslRole() const { @@ -5113,13 +5117,13 @@ void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) { RTC_DCHECK_RUN_ON(signaling_thread()); const bool has_sctp = pc_->sctp_mid().has_value(); - context_->network_thread()->BlockingCall( - [&, data_channel_controller = data_channel_controller()] { - RTC_DCHECK_RUN_ON(context_->network_thread()); - if (has_sctp) - data_channel_controller->OnTransportChannelClosed(error); - pc_->TeardownDataChannelTransport_n(); - }); + if (has_sctp) + data_channel_controller()->OnTransportChannelClosed(error); + + context_->network_thread()->BlockingCall([this] { + RTC_DCHECK_RUN_ON(context_->network_thread()); + pc_->TeardownDataChannelTransport_n(); + }); if (has_sctp) pc_->ResetSctpDataMid(); diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h index d1b41fcf88..26ecc31378 100644 --- a/pc/test/fake_data_channel_controller.h +++ b/pc/test/fake_data_channel_controller.h @@ -29,31 +29,23 @@ class FakeDataChannelController transport_available_(false), ready_to_send_(false), transport_error_(false) {} - - ~FakeDataChannelController() override { - network_thread_->BlockingCall([&] { - RTC_DCHECK_RUN_ON(network_thread_); - weak_factory_.InvalidateWeakPtrs(); - }); - } + virtual ~FakeDataChannelController() {} rtc::WeakPtr weak_ptr() { - RTC_DCHECK_RUN_ON(network_thread_); return weak_factory_.GetWeakPtr(); } rtc::scoped_refptr CreateDataChannel( absl::string_view label, webrtc::InternalDataChannelInit init) { + rtc::WeakPtr my_weak_ptr = weak_ptr(); + // Explicitly associate the weak ptr instance with the current thread to + // catch early any inappropriate referencing of it on the network thread. + RTC_CHECK(my_weak_ptr); + rtc::scoped_refptr channel = network_thread_->BlockingCall([&]() { RTC_DCHECK_RUN_ON(network_thread_); - rtc::WeakPtr my_weak_ptr = weak_ptr(); - // Explicitly associate the weak ptr instance with the current thread - // to catch early any inappropriate referencing of it on the network - // thread. - RTC_CHECK(my_weak_ptr); - rtc::scoped_refptr channel = webrtc::SctpDataChannel::Create( std::move(my_weak_ptr), std::string(label), @@ -62,16 +54,17 @@ class FakeDataChannelController if (transport_available_ && channel->sid_n().HasValue()) { AddSctpDataStream(channel->sid_n()); } - if (ready_to_send_) { - network_thread_->PostTask([channel = channel] { - if (channel->state() != - webrtc::DataChannelInterface::DataState::kClosed) { - channel->OnTransportReady(); - } - }); - } return channel; }); + if (ready_to_send_) { + signaling_thread_->PostTask( + SafeTask(signaling_safety_.flag(), [channel = channel] { + if (channel->state() != + webrtc::DataChannelInterface::DataState::kClosed) { + channel->OnTransportReady(); + } + })); + } connected_channels_.insert(channel.get()); return channel; } @@ -79,7 +72,6 @@ class FakeDataChannelController webrtc::RTCError SendData(webrtc::StreamId sid, const webrtc::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload) override { - RTC_DCHECK_RUN_ON(network_thread_); RTC_CHECK(ready_to_send_); RTC_CHECK(transport_available_); if (send_blocked_) { @@ -108,14 +100,17 @@ class FakeDataChannelController RTC_DCHECK_RUN_ON(network_thread_); RTC_CHECK(sid.HasValue()); known_stream_ids_.erase(sid); - // Unlike the real SCTP transport, act like the closing procedure finished - // instantly. - auto it = absl::c_find_if(connected_channels_, - [&](const auto* c) { return c->sid_n() == sid; }); - // This path mimics the DCC's OnChannelClosed handler since the FDCC - // (this class) doesn't have a transport that would do that. - if (it != connected_channels_.end()) - (*it)->OnClosingProcedureComplete(); + signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] { + // Unlike the real SCTP transport, act like the closing procedure finished + // instantly. + auto it = absl::c_find_if(connected_channels_, [&](const auto* c) { + return c->sid_s() == sid; + }); + // This path mimics the DCC's OnChannelClosed handler since the FDCC + // (this class) doesn't have a transport that would do that. + if (it != connected_channels_.end()) + (*it)->OnClosingProcedureComplete(); + })); } void OnChannelStateChanged( @@ -131,40 +126,36 @@ class FakeDataChannelController // Set true to emulate the SCTP stream being blocked by congestion control. void set_send_blocked(bool blocked) { - network_thread_->BlockingCall([&]() { - send_blocked_ = blocked; - if (!blocked) { - RTC_CHECK(transport_available_); - // Make a copy since `connected_channels_` may change while - // OnTransportReady is called. - auto copy = connected_channels_; - for (webrtc::SctpDataChannel* ch : copy) { - ch->OnTransportReady(); - } + send_blocked_ = blocked; + if (!blocked) { + RTC_CHECK(transport_available_); + // Make a copy since `connected_channels_` may change while + // OnTransportReady is called. + auto copy = connected_channels_; + for (webrtc::SctpDataChannel* ch : copy) { + ch->OnTransportReady(); } - }); + } } // Set true to emulate the transport channel creation, e.g. after // setLocalDescription/setRemoteDescription called with data content. void set_transport_available(bool available) { - network_thread_->BlockingCall([&]() { transport_available_ = available; }); + transport_available_ = available; } // Set true to emulate the transport OnTransportReady signal when the // transport becomes writable for the first time. void set_ready_to_send(bool ready) { RTC_CHECK(transport_available_); - network_thread_->BlockingCall([&]() { - ready_to_send_ = ready; - if (ready) { - std::set::iterator it; - for (it = connected_channels_.begin(); it != connected_channels_.end(); - ++it) { - (*it)->OnTransportReady(); - } + ready_to_send_ = ready; + if (ready) { + std::set::iterator it; + for (it = connected_channels_.begin(); it != connected_channels_.end(); + ++it) { + (*it)->OnTransportReady(); } - }); + } } void set_transport_error() { transport_error_ = true; }