diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h index 4f74918ff9..35ef8e4f0d 100644 --- a/api/data_channel_interface.h +++ b/api/data_channel_interface.h @@ -100,6 +100,17 @@ class DataChannelObserver { // The data channel's buffered_amount has changed. virtual void OnBufferedAmountChange(uint64_t sent_data_size) {} + // Override this to get callbacks directly on the network thread. + // An implementation that does that must not block the network thread + // but rather only use the callback to trigger asynchronous processing + // elsewhere as a result of the notification. + // The default return value, `false`, means that notifications will be + // delivered on the signaling thread associated with the peerconnection + // instance. + // TODO(webrtc:11547): Eventually all DataChannelObserver implementations + // should be called on the network thread and this method removed. + virtual bool IsOkToCallOnTheNetworkThread() { return false; } + protected: virtual ~DataChannelObserver() = default; }; diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 36e8be1411..20d5fe98b0 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -29,8 +29,15 @@ DataChannelController::~DataChannelController() { } bool DataChannelController::HasDataChannelsForTest() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return !sctp_data_channels_.empty(); + auto has_channels = [&] { + RTC_DCHECK_RUN_ON(network_thread()); + return !sctp_data_channels_n_.empty(); + }; + + if (network_thread()->IsCurrent()) + return has_channels(); + + return network_thread()->BlockingCall(std::move(has_channels)); } bool DataChannelController::HasUsedDataChannels() const { @@ -66,11 +73,15 @@ void DataChannelController::RemoveSctpDataStream(StreamId sid) { void DataChannelController::OnChannelStateChanged( SctpDataChannel* channel, DataChannelInterface::DataState state) { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); if (state == DataChannelInterface::DataState::kClosed) OnSctpDataChannelClosed(channel); - pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), + [this, channel_id = channel->internal_id(), state = state] { + pc_->OnSctpDataChannelStateChanged(channel_id, state); + })); } void DataChannelController::OnDataReceived( @@ -82,27 +93,22 @@ void DataChannelController::OnDataReceived( if (HandleOpenMessage_n(channel_id, type, buffer)) return; - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] { - RTC_DCHECK_RUN_ON(signaling_thread()); - // TODO(bugs.webrtc.org/11547): The data being received should be - // delivered on the network thread. - auto it = FindChannel(StreamId(channel_id)); - if (it != sctp_data_channels_.end()) - (*it)->OnDataReceived(type, buffer); - })); + auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { + return c->sid_n().stream_id_int() == channel_id; + }); + + if (it != sctp_data_channels_n_.end()) + (*it)->OnDataReceived(type, buffer); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), [this, channel_id] { - RTC_DCHECK_RUN_ON(signaling_thread()); - // TODO(bugs.webrtc.org/11547): Should run on the network thread. - auto it = FindChannel(StreamId(channel_id)); - if (it != sctp_data_channels_.end()) - (*it)->OnClosingProcedureStartedRemotely(); - })); + auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { + return c->sid_n().stream_id_int() == channel_id; + }); + + if (it != sctp_data_channels_n_.end()) + (*it)->OnClosingProcedureStartedRemotely(); } void DataChannelController::OnChannelClosed(int channel_id) { @@ -112,48 +118,44 @@ void DataChannelController::OnChannelClosed(int channel_id) { auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c->sid_n() == sid; }); - if (it != sctp_data_channels_n_.end()) + if (it != sctp_data_channels_n_.end()) { + rtc::scoped_refptr channel = std::move(*it); sctp_data_channels_n_.erase(it); - - signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] { - RTC_DCHECK_RUN_ON(signaling_thread()); - auto it = FindChannel(sid); - // Remove the channel from our list, close it and free up resources. - if (it != sctp_data_channels_.end()) { - rtc::scoped_refptr channel = std::move(*it); - // Note: this causes OnSctpDataChannelClosed() to not do anything - // when called from within `OnClosingProcedureComplete`. - sctp_data_channels_.erase(it); - - channel->OnClosingProcedureComplete(); - } - })); + channel->OnClosingProcedureComplete(); + } } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - auto copy = sctp_data_channels_; - for (const auto& channel : copy) + auto copy = sctp_data_channels_n_; + for (const auto& channel : copy) { + if (channel->sid_n().HasValue()) { channel->OnTransportReady(); - })); + } else { + // This happens for role==SSL_SERVER channels when we get notified by + // the transport *before* the SDP code calls `AllocateSctpSids` to + // trigger assignment of sids. In this case OnTransportReady() will be + // called from within `AllocateSctpSids` below. + RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel."; + } + } } void DataChannelController::OnTransportClosed(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), [this, error] { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportChannelClosed(error); - })); + // This loop will close all data channels and trigger a callback to + // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so + // we create a local copy while we do the fan-out. + auto copy = sctp_data_channels_n_; + for (const auto& channel : copy) + channel->OnTransportChannelClosed(error); } void DataChannelController::SetupDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); // There's a new data channel transport. This needs to be signaled to the - // `sctp_data_channels_` so that they can reopen and reconnect. This is + // `sctp_data_channels_n_` so that they can reopen and reconnect. This is // necessary when bundling is applied. NotifyDataChannelsOfTransportCreated(); } @@ -165,11 +167,12 @@ void DataChannelController::PrepareForShutdown() { void DataChannelController::TeardownDataChannelTransport_n() { RTC_DCHECK_RUN_ON(network_thread()); - if (data_channel_transport()) { - data_channel_transport()->SetDataSink(nullptr); + if (data_channel_transport_) { + data_channel_transport_->SetDataSink(nullptr); + set_data_channel_transport(nullptr); } - set_data_channel_transport(nullptr); sctp_data_channels_n_.clear(); + weak_factory_.InvalidateWeakPtrs(); } void DataChannelController::OnTransportChanged( @@ -185,7 +188,7 @@ void DataChannelController::OnTransportChanged( new_data_channel_transport->SetDataSink(this); // There's a new data channel transport. This needs to be signaled to the - // `sctp_data_channels_` so that they can reopen and reconnect. This is + // `sctp_data_channels_n_` so that they can reopen and reconnect. This is // necessary when bundling is applied. NotifyDataChannelsOfTransportCreated(); } @@ -194,10 +197,10 @@ void DataChannelController::OnTransportChanged( std::vector DataChannelController::GetDataChannelStats() const { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); std::vector stats; - stats.reserve(sctp_data_channels_.size()); - for (const auto& channel : sctp_data_channels_) + stats.reserve(sctp_data_channels_n_.size()); + for (const auto& channel : sctp_data_channels_n_) stats.push_back(channel->GetStats()); return stats; } @@ -219,28 +222,38 @@ bool DataChannelController::HandleOpenMessage_n( << channel_id; } else { config.open_handshake_role = InternalDataChannelInit::kAcker; - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), - [this, label = std::move(label), config = std::move(config)] { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnDataChannelOpenMessage(label, config); - })); + auto channel_or_error = CreateDataChannel(label, config); + if (channel_or_error.ok()) { + signaling_thread()->PostTask(SafeTask( + signaling_safety_.flag(), + [this, channel = channel_or_error.MoveValue(), + ready_to_send = data_channel_transport_->IsReadyToSend()] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnDataChannelOpenMessage(std::move(channel), ready_to_send); + })); + } else { + RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." + << ToString(channel_or_error.error().type()); + } } return true; } void DataChannelController::OnDataChannelOpenMessage( - const std::string& label, - const InternalDataChannelInit& config) { - auto channel_or_error = InternalCreateDataChannelWithProxy(label, config); - if (!channel_or_error.ok()) { - RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." - << ToString(channel_or_error.error().type()); - return; - } + rtc::scoped_refptr channel, + bool ready_to_send) { + has_used_data_channels_ = true; + auto proxy = SctpDataChannel::CreateProxy(channel); - pc_->Observer()->OnDataChannel(channel_or_error.MoveValue()); + pc_->Observer()->OnDataChannel(proxy); pc_->NoteDataAddedEvent(); + + if (ready_to_send) { + network_thread()->PostTask([channel = std::move(channel)] { + if (channel->state() != DataChannelInterface::DataState::kClosed) + channel->OnTransportReady(); + }); + } } // RTC_RUN_ON(network_thread()) @@ -269,6 +282,31 @@ RTCError DataChannelController::ReserveOrAllocateSid( return RTCError::OK(); } +// RTC_RUN_ON(network_thread()) +RTCErrorOr> +DataChannelController::CreateDataChannel(const std::string& label, + InternalDataChannelInit& config) { + StreamId sid(config.id); + RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role); + if (!err.ok()) + return err; + + // In case `sid` has changed. Update `config` accordingly. + config.id = sid.stream_id_int(); + + rtc::scoped_refptr channel = SctpDataChannel::Create( + weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr, + config, signaling_thread(), network_thread()); + RTC_DCHECK(channel); + sctp_data_channels_n_.push_back(channel); + + // If we have an id already, notify the transport. + if (sid.HasValue()) + AddSctpDataStream(sid); + + return channel; +} + RTCErrorOr> DataChannelController::InternalCreateDataChannelWithProxy( const std::string& label, @@ -283,29 +321,25 @@ DataChannelController::InternalCreateDataChannelWithProxy( bool ready_to_send = false; InternalDataChannelInit new_config = config; StreamId sid(new_config.id); - auto weak_ptr = weak_factory_.GetWeakPtr(); - RTC_DCHECK(weak_ptr); // Associate with current thread. auto ret = network_thread()->BlockingCall( [&]() -> RTCErrorOr> { RTC_DCHECK_RUN_ON(network_thread()); - RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role); - if (!err.ok()) - return err; - - // In case `sid` has changed. Update `new_config` accordingly. - new_config.id = sid.stream_id_int(); + auto channel = CreateDataChannel(label, new_config); + if (!channel.ok()) + return channel; ready_to_send = data_channel_transport_ && data_channel_transport_->IsReadyToSend(); - - rtc::scoped_refptr channel(SctpDataChannel::Create( - std::move(weak_ptr), label, data_channel_transport_ != nullptr, - new_config, signaling_thread(), network_thread())); - RTC_DCHECK(channel); - sctp_data_channels_n_.push_back(channel); - - // If we have an id already, notify the transport. - if (sid.HasValue()) - AddSctpDataStream(sid); + if (ready_to_send) { + // If the transport is ready to send because the initial channel + // ready signal may have been sent before the DataChannel creation. + // This has to be done async because the upper layer objects (e.g. + // Chrome glue and WebKit) are not wired up properly until after + // `InternalCreateDataChannelWithProxy` returns. + network_thread()->PostTask([channel = channel.value()] { + if (channel->state() != DataChannelInterface::DataState::kClosed) + channel->OnTransportReady(); + }); + } return channel; }); @@ -313,114 +347,71 @@ DataChannelController::InternalCreateDataChannelWithProxy( if (!ret.ok()) return ret.MoveError(); - if (ready_to_send) { - // Checks if the transport is ready to send because the initial channel - // ready signal may have been sent before the DataChannel creation. - // This has to be done async because the upper layer objects (e.g. - // Chrome glue and WebKit) are not wired up properly until after this - // function returns. - signaling_thread()->PostTask( - SafeTask(signaling_safety_.flag(), [channel = ret.value()] { - if (channel->state() != DataChannelInterface::DataState::kClosed) - channel->OnTransportReady(); - })); - } - - sctp_data_channels_.push_back(ret.value()); has_used_data_channels_ = true; return SctpDataChannel::CreateProxy(ret.MoveValue()); } void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); + + const bool ready_to_send = + data_channel_transport_ && data_channel_transport_->IsReadyToSend(); std::vector> channels_to_update; std::vector> channels_to_close; - - network_thread()->BlockingCall([&] { - RTC_DCHECK_RUN_ON(network_thread()); - for (auto it = sctp_data_channels_n_.begin(); - it != sctp_data_channels_n_.end();) { - if (!(*it)->sid_n().HasValue()) { - StreamId sid = sid_allocator_.AllocateSid(role); - if (sid.HasValue()) { - (*it)->SetSctpSid_n(sid); - AddSctpDataStream(sid); - channels_to_update.push_back(std::make_pair((*it).get(), sid)); - } else { - channels_to_close.push_back(std::move(*it)); - it = sctp_data_channels_n_.erase(it); - continue; + for (auto it = sctp_data_channels_n_.begin(); + it != sctp_data_channels_n_.end();) { + if (!(*it)->sid_n().HasValue()) { + StreamId sid = sid_allocator_.AllocateSid(role); + if (sid.HasValue()) { + (*it)->SetSctpSid_n(sid); + AddSctpDataStream(sid); + if (ready_to_send) { + RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send."; + (*it)->OnTransportReady(); } + channels_to_update.push_back(std::make_pair((*it).get(), sid)); + } else { + channels_to_close.push_back(std::move(*it)); + it = sctp_data_channels_n_.erase(it); + continue; } - ++it; } - }); + ++it; + } // Since closing modifies the list of channels, we have to do the actual // closing outside the loop. for (const auto& channel : channels_to_close) { channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID"); - // The channel should now have been removed from sctp_data_channels_. - RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) { - return c.get() == channel.get(); - }) == sctp_data_channels_.end()); - } - - for (auto& pair : channels_to_update) { - auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) { - return c.get() == pair.first; - }); - RTC_DCHECK(it != sctp_data_channels_.end()); - (*it)->SetSctpSid_s(pair.second); } } void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { - RTC_DCHECK_RUN_ON(signaling_thread()); - - network_thread()->BlockingCall([&] { - RTC_DCHECK_RUN_ON(network_thread()); - // After the closing procedure is done, it's safe to use this ID for - // another data channel. - if (channel->sid_n().HasValue()) { - sid_allocator_.ReleaseSid(channel->sid_n()); - } - - auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { - return c.get() == channel; - }); - - if (it != sctp_data_channels_n_.end()) - sctp_data_channels_n_.erase(it); - }); - - for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end(); - ++it) { - if (it->get() == channel) { - // Since this method is triggered by a signal from the DataChannel, - // we can't free it directly here; we need to free it asynchronously. - rtc::scoped_refptr release = std::move(*it); - sctp_data_channels_.erase(it); - signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), - [release = std::move(release)] {})); - return; - } + RTC_DCHECK_RUN_ON(network_thread()); + // After the closing procedure is done, it's safe to use this ID for + // another data channel. + if (channel->sid_n().HasValue()) { + sid_allocator_.ReleaseSid(channel->sid_n()); } + auto it = absl::c_find_if(sctp_data_channels_n_, + [&](const auto& c) { return c.get() == channel; }); + if (it != sctp_data_channels_n_.end()) + sctp_data_channels_n_.erase(it); } void DataChannelController::OnTransportChannelClosed(RTCError error) { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); // Use a temporary copy of the SCTP DataChannel list because the // DataChannel may callback to us and try to modify the list. // TODO(tommi): `OnTransportChannelClosed` is called from // `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before // `TeardownDataChannelTransport_n` is called (but on the network thread) from - // the same function. Once `sctp_data_channels_` moves to the network thread, - // we can get rid of this function (OnTransportChannelClosed) and run this - // loop from within the TeardownDataChannelTransport_n callback. + // the same function. We can now get rid of this function + // (OnTransportChannelClosed) and run this loop from within the + // TeardownDataChannelTransport_n callback. std::vector> temp_sctp_dcs; - temp_sctp_dcs.swap(sctp_data_channels_); + temp_sctp_dcs.swap(sctp_data_channels_n_); for (const auto& channel : temp_sctp_dcs) { channel->OnTransportChannelClosed(error); } @@ -444,16 +435,10 @@ RTCError DataChannelController::DataChannelSendData( StreamId sid, const SendDataParams& params, const rtc::CopyOnWriteBuffer& payload) { - // TODO(bugs.webrtc.org/11547): Expect method to be called on the network - // thread instead. Remove the BlockingCall() below and move associated state - // to the network thread. - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(data_channel_transport()); - - return network_thread()->BlockingCall([this, sid, params, payload] { - return data_channel_transport()->SendData(sid.stream_id_int(), params, - payload); - }); + return data_channel_transport()->SendData(sid.stream_id_int(), params, + payload); } void DataChannelController::NotifyDataChannelsOfTransportCreated() { @@ -463,22 +448,8 @@ void DataChannelController::NotifyDataChannelsOfTransportCreated() { for (const auto& channel : sctp_data_channels_n_) { if (channel->sid_n().HasValue()) AddSctpDataStream(channel->sid_n()); + channel->OnTransportChannelCreated(); } - - signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - })); -} - -std::vector>::iterator -DataChannelController::FindChannel(StreamId stream_id) { - RTC_DCHECK_RUN_ON(signaling_thread()); - return absl::c_find_if(sctp_data_channels_, [&](const auto& c) { - return c->sid_s() == stream_id; - }); } rtc::Thread* DataChannelController::network_thread() const { diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index fa6c13e6ec..074b1feadb 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -107,6 +107,11 @@ class DataChannelController : public SctpDataChannelControllerInterface, rtc::Thread* signaling_thread() const; private: + // Creates a new SctpDataChannel object on the network thread. + RTCErrorOr> CreateDataChannel( + const std::string& label, + InternalDataChannelInit& config) RTC_RUN_ON(network_thread()); + // Parses and handles open messages. Returns true if the message is an open // message and should be considered to be handled, false otherwise. bool HandleOpenMessage_n(int channel_id, @@ -114,8 +119,8 @@ class DataChannelController : public SctpDataChannelControllerInterface, const rtc::CopyOnWriteBuffer& buffer) RTC_RUN_ON(network_thread()); // Called when a valid data channel OPEN message is received. - void OnDataChannelOpenMessage(const std::string& label, - const InternalDataChannelInit& config) + void OnDataChannelOpenMessage(rtc::scoped_refptr channel, + bool ready_to_send) RTC_RUN_ON(signaling_thread()); // Accepts a `StreamId` which may be pre-negotiated or unassigned. For @@ -139,9 +144,6 @@ class DataChannelController : public SctpDataChannelControllerInterface, // (calls OnTransportChannelCreated on the signaling thread). void NotifyDataChannelsOfTransportCreated(); - std::vector>::iterator FindChannel( - StreamId stream_id); - // Plugin transport used for data channels. Pointer may be accessed and // checked from any thread, but the object may only be touched on the // network thread. @@ -149,21 +151,16 @@ class DataChannelController : public SctpDataChannelControllerInterface, // thread. DataChannelTransportInterface* data_channel_transport_ = nullptr; SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread()); - std::vector> sctp_data_channels_ - RTC_GUARDED_BY(signaling_thread()); - // TODO(bugs.webrtc.org/11547): This vector will eventually take over from - // `sctp_data_channels_`. While we're migrating away from thread hops - // between the signaling and network threads, we need both, so this is - // a temporary situation. std::vector> sctp_data_channels_n_ RTC_GUARDED_BY(network_thread()); bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false; // Owning PeerConnection. PeerConnectionInternal* const pc_; - // The weak pointers must be dereferenced and invalidated on the signalling + // The weak pointers must be dereferenced and invalidated on the network // thread only. - rtc::WeakPtrFactory weak_factory_{this}; + rtc::WeakPtrFactory weak_factory_ + RTC_GUARDED_BY(network_thread()){this}; ScopedTaskSafety signaling_safety_; }; diff --git a/pc/data_channel_controller_unittest.cc b/pc/data_channel_controller_unittest.cc index 9b66faccd4..fd3deae0d7 100644 --- a/pc/data_channel_controller_unittest.cc +++ b/pc/data_channel_controller_unittest.cc @@ -131,44 +131,6 @@ TEST_F(DataChannelControllerTest, CloseAfterControllerDestroyed) { channel->Close(); } -TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) { - DataChannelControllerForTest dcc(pc_.get()); - auto ret = dcc.InternalCreateDataChannelWithProxy( - "label", InternalDataChannelInit(DataChannelInit())); - ASSERT_TRUE(ret.ok()); - auto channel = ret.MoveValue(); - SctpDataChannel* inner_channel = - DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting( - channel.get()); - // Grab a reference for testing purposes. - inner_channel->AddRef(); - - channel = nullptr; // dcc still holds a reference to `channel`. - EXPECT_TRUE(dcc.HasDataChannelsForTest()); - - // Trigger a Close() for the channel. This will send events back to dcc, - // eventually reaching `OnSctpDataChannelClosed` where dcc removes - // the channel from the internal list of data channels, but does not release - // the reference synchronously since that reference might be the last one. - inner_channel->Close(); - // Now there should be no tracked data channels. - EXPECT_FALSE(dcc.HasDataChannelsForTest()); - // But there should be an async operation queued that still holds a reference. - // That means that the test reference, must not be the last one. - ASSERT_NE(inner_channel->Release(), - rtc::RefCountReleaseStatus::kDroppedLastRef); - // Grab a reference again (using the pointer is safe since the object still - // exists and we control the single-threaded environment manually). - inner_channel->AddRef(); - // Now run the queued up async operations on the signaling (current) thread. - // This time, the reference formerly owned by dcc, should be release and the - // truly last reference is now held by the test. - run_loop_.Flush(); - // Check that this is the last reference. - EXPECT_EQ(inner_channel->Release(), - rtc::RefCountReleaseStatus::kDroppedLastRef); -} - // Allocate the maximum number of data channels and then one more. // The last allocation should fail. TEST_F(DataChannelControllerTest, MaxChannels) { diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index fe75d385f3..2582561282 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -40,18 +40,19 @@ static constexpr int kDefaultTimeout = 10000; class FakeDataChannelObserver : public DataChannelObserver { public: - FakeDataChannelObserver() - : messages_received_(0), - on_state_change_count_(0), - on_buffered_amount_change_count_(0) {} + FakeDataChannelObserver() { + // This implementation relies on the SctpDataChannel::ObserverAdapter + // implementation to post events to the signaling thread. + RTC_DCHECK(!IsOkToCallOnTheNetworkThread()); + } - void OnStateChange() { ++on_state_change_count_; } + void OnStateChange() override { ++on_state_change_count_; } - void OnBufferedAmountChange(uint64_t previous_amount) { + void OnBufferedAmountChange(uint64_t previous_amount) override { ++on_buffered_amount_change_count_; } - void OnMessage(const DataBuffer& buffer) { ++messages_received_; } + void OnMessage(const DataBuffer& buffer) override { ++messages_received_; } size_t messages_received() const { return messages_received_; } @@ -68,9 +69,9 @@ class FakeDataChannelObserver : public DataChannelObserver { } private: - size_t messages_received_; - size_t on_state_change_count_; - size_t on_buffered_amount_change_count_; + size_t messages_received_ = 0u; + size_t on_state_change_count_ = 0u; + size_t on_buffered_amount_change_count_ = 0u; }; class SctpDataChannelTest : public ::testing::Test { @@ -93,11 +94,17 @@ class SctpDataChannelTest : public ::testing::Test { void SetChannelReady() { controller_->set_transport_available(true); - inner_channel_->OnTransportChannelCreated(); - if (!inner_channel_->sid_s().HasValue()) { - SetChannelSid(inner_channel_, StreamId(0)); - } + StreamId sid(0); + network_thread_.BlockingCall([&]() { + RTC_DCHECK_RUN_ON(&network_thread_); + if (!inner_channel_->sid_n().HasValue()) { + inner_channel_->SetSctpSid_n(sid); + controller_->AddSctpDataStream(sid); + } + inner_channel_->OnTransportChannelCreated(); + }); controller_->set_ready_to_send(true); + run_loop_.Flush(); } // TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController @@ -108,9 +115,10 @@ class SctpDataChannelTest : public ::testing::Test { void SetChannelSid(const rtc::scoped_refptr& channel, StreamId sid) { RTC_DCHECK(sid.HasValue()); - network_thread_.BlockingCall( - [&]() { controller_->AddSctpDataStream(sid); }); - channel->SetSctpSid_s(sid); + network_thread_.BlockingCall([&]() { + channel->SetSctpSid_n(sid); + controller_->AddSctpDataStream(sid); + }); } void AddObserver() { @@ -146,11 +154,13 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) { // Check the non-const part of the configuration. EXPECT_EQ(channel_->id(), init_.id); - EXPECT_EQ(inner_channel_->sid_s(), StreamId()); + network_thread_.BlockingCall( + [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); }); SetChannelReady(); EXPECT_EQ(channel_->id(), 0); - EXPECT_EQ(inner_channel_->sid_s(), StreamId(0)); + network_thread_.BlockingCall( + [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); }); } // Verifies that the data channel is connected to the transport after creation. @@ -158,13 +168,15 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) { controller_->set_transport_available(true); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init_); - EXPECT_TRUE(controller_->IsConnected(dc.get())); + // The sid is not set yet, so it should not have added the streams. - EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s())); + StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); + EXPECT_FALSE(controller_->IsStreamAdded(sid)); SetChannelSid(dc, StreamId(0)); - EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s())); + sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); + EXPECT_TRUE(controller_->IsStreamAdded(sid)); } // Tests the state of the data channel. @@ -183,7 +195,7 @@ TEST_F(SctpDataChannelTest, StateTransition) { channel_->Close(); // The (simulated) transport close notifications runs on the network thread // and posts a completion notification to the signaling (current) thread. - // Allow that ooperation to complete before checking the state. + // Allow that operation to complete before checking the state. run_loop_.Flush(); EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); EXPECT_EQ(observer_->on_state_change_count(), 3u); @@ -201,6 +213,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { EXPECT_TRUE(channel_->Send(buffer)); size_t successful_send_count = 1; + run_loop_.Flush(); EXPECT_EQ(0U, channel_->buffered_amount()); EXPECT_EQ(successful_send_count, observer_->on_buffered_amount_change_count()); @@ -217,6 +230,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { observer_->on_buffered_amount_change_count()); controller_->set_send_blocked(false); + run_loop_.Flush(); successful_send_count += number_of_packets; EXPECT_EQ(0U, channel_->buffered_amount()); EXPECT_EQ(successful_send_count, @@ -337,10 +351,9 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { SetChannelReady(); InternalDataChannelInit init; init.id = 1; - rtc::scoped_refptr dc = - controller_->CreateDataChannel("test1", init); - EXPECT_EQ(DataChannelInterface::kConnecting, dc->state()); - EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000); + auto dc = webrtc::SctpDataChannel::CreateProxy( + controller_->CreateDataChannel("test1", init)); + EXPECT_EQ(DataChannelInterface::kOpen, dc->state()); } // Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK @@ -352,21 +365,23 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) { init.ordered = false; rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); // Sends a message and verifies it's ordered. DataBuffer buffer("some data"); - ASSERT_TRUE(dc->Send(buffer)); + ASSERT_TRUE(proxy->Send(buffer)); EXPECT_TRUE(controller_->last_send_data_params().ordered); // Emulates receiving an OPEN_ACK message. rtc::CopyOnWriteBuffer payload; WriteDataChannelOpenAckMessage(&payload); - dc->OnDataReceived(DataMessageType::kControl, payload); + network_thread_.BlockingCall( + [&] { dc->OnDataReceived(DataMessageType::kControl, payload); }); // Sends another message and verifies it's unordered. - ASSERT_TRUE(dc->Send(buffer)); + ASSERT_TRUE(proxy->Send(buffer)); EXPECT_FALSE(controller_->last_send_data_params().ordered); } @@ -379,15 +394,17 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { init.ordered = false; rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", init); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); // Emulates receiving a DATA message. DataBuffer buffer("data"); - dc->OnDataReceived(DataMessageType::kText, buffer.data); + network_thread_.BlockingCall( + [&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); }); // Sends a message and verifies it's unordered. - ASSERT_TRUE(dc->Send(buffer)); + ASSERT_TRUE(proxy->Send(buffer)); EXPECT_FALSE(controller_->last_send_data_params().ordered); } @@ -440,7 +457,10 @@ TEST_F(SctpDataChannelTest, ReceiveDataWithValidId) { AddObserver(); DataBuffer buffer("abcd"); - inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data); + network_thread_.BlockingCall([&] { + inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data); + }); + run_loop_.Flush(); EXPECT_EQ(1U, observer_->messages_received()); } @@ -455,8 +475,9 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) { SetChannelReady(); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", config); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); EXPECT_EQ(0, controller_->last_sid()); } @@ -480,9 +501,10 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) { EXPECT_EQ(0U, channel_->bytes_received()); // Receive three buffers while data channel isn't open. - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data); - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data); - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data); + network_thread_.BlockingCall([&] { + for (int i : {0, 1, 2}) + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data); + }); EXPECT_EQ(0U, observer_->messages_received()); EXPECT_EQ(0U, channel_->messages_received()); EXPECT_EQ(0U, channel_->bytes_received()); @@ -496,9 +518,11 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesReceived) { EXPECT_EQ(bytes_received, channel_->bytes_received()); // Receive three buffers while open. - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data); - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data); - inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data); + network_thread_.BlockingCall([&] { + for (int i : {3, 4, 5}) + inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data); + }); + run_loop_.Flush(); bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size(); EXPECT_EQ(6U, observer_->messages_received()); EXPECT_EQ(6U, channel_->messages_received()); @@ -516,8 +540,9 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) { SetChannelReady(); rtc::scoped_refptr dc = controller_->CreateDataChannel("test1", config); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc); - EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); EXPECT_EQ(config.id, controller_->last_sid()); EXPECT_EQ(DataMessageType::kControl, @@ -554,9 +579,8 @@ TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) { EXPECT_TRUE(channel_->Send(packet)); } - // The sending buffer shoul be full, send returns false. + // The sending buffer should be full, `Send()` returns false. EXPECT_FALSE(channel_->Send(packet)); - EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state()); } @@ -580,10 +604,12 @@ TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) { rtc::CopyOnWriteBuffer buffer(1024); memset(buffer.MutableData(), 0, buffer.size()); - // Receiving data without having an observer will overflow the buffer. - for (size_t i = 0; i < 16 * 1024 + 1; ++i) { - inner_channel_->OnDataReceived(DataMessageType::kText, buffer); - } + network_thread_.BlockingCall([&] { + // Receiving data without having an observer will overflow the buffer. + for (size_t i = 0; i < 16 * 1024 + 1; ++i) { + inner_channel_->OnDataReceived(DataMessageType::kText, buffer); + } + }); EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); EXPECT_FALSE(channel_->error().ok()); EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type()); @@ -604,7 +630,8 @@ TEST_F(SctpDataChannelTest, SendEmptyData) { // Tests that a channel can be closed without being opened or assigned an sid. TEST_F(SctpDataChannelTest, NeverOpened) { controller_->set_transport_available(true); - inner_channel_->OnTransportChannelCreated(); + network_thread_.BlockingCall( + [&] { inner_channel_->OnTransportChannelCreated(); }); channel_->Close(); } @@ -634,7 +661,8 @@ TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { // transition to the "closed" state. RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, ""); error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); - inner_channel_->OnTransportChannelClosed(error); + network_thread_.BlockingCall( + [&] { inner_channel_->OnTransportChannelClosed(error); }); controller_.reset(nullptr); EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), kDefaultTimeout); @@ -654,7 +682,8 @@ TEST_F(SctpDataChannelTest, TransportGotErrorCode) { error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); error.set_sctp_cause_code( static_cast(cricket::SctpErrorCauseCode::kProtocolViolation)); - inner_channel_->OnTransportChannelClosed(error); + network_thread_.BlockingCall( + [&] { inner_channel_->OnTransportChannelClosed(error); }); controller_.reset(nullptr); EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), kDefaultTimeout); diff --git a/pc/legacy_stats_collector.cc b/pc/legacy_stats_collector.cc index 6829e359b8..7ce85e411c 100644 --- a/pc/legacy_stats_collector.cc +++ b/pc/legacy_stats_collector.cc @@ -670,7 +670,7 @@ void LegacyStatsCollector::UpdateStats( // to fetch stats, then applies them on the signaling thread. See if we need // to do this synchronously or if updating the stats without blocking is safe. std::map transport_names_by_mid = - ExtractSessionInfo(); + ExtractSessionAndDataInfo(); // TODO(tommi): All of these hop over to the worker thread to fetch // information. We could post a task to run all of these and post @@ -681,7 +681,6 @@ void LegacyStatsCollector::UpdateStats( ExtractBweInfo(); ExtractMediaInfo(transport_names_by_mid); ExtractSenderInfo(); - ExtractDataInfo(); UpdateTrackReports(); } @@ -856,19 +855,26 @@ StatsReport* LegacyStatsCollector::AddCandidateReport( return report; } -std::map LegacyStatsCollector::ExtractSessionInfo() { - TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo"); +std::map +LegacyStatsCollector::ExtractSessionAndDataInfo() { + TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo"); RTC_DCHECK_RUN_ON(pc_->signaling_thread()); SessionStats stats; + StatsCollection::Container data_report_collection; auto transceivers = pc_->GetTransceiversInternal(); pc_->network_thread()->BlockingCall( [&, sctp_transport_name = pc_->sctp_transport_name(), sctp_mid = pc_->sctp_mid()]() mutable { stats = ExtractSessionInfo_n( transceivers, std::move(sctp_transport_name), std::move(sctp_mid)); + StatsCollection data_reports; + ExtractDataInfo_n(&data_reports); + data_report_collection = data_reports.DetachCollection(); }); + reports_.MergeCollection(std::move(data_report_collection)); + ExtractSessionInfo_s(stats); return std::move(stats.transport_names_by_mid); @@ -1292,8 +1298,8 @@ void LegacyStatsCollector::ExtractSenderInfo() { } } -void LegacyStatsCollector::ExtractDataInfo() { - RTC_DCHECK_RUN_ON(pc_->signaling_thread()); +void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) { + RTC_DCHECK_RUN_ON(pc_->network_thread()); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; @@ -1301,7 +1307,7 @@ void LegacyStatsCollector::ExtractDataInfo() { for (const auto& stats : data_stats) { StatsReport::Id id(StatsReport::NewTypedIntId( StatsReport::kStatsReportTypeDataChannel, stats.id)); - StatsReport* report = reports_.ReplaceOrAddNew(id); + StatsReport* report = reports->ReplaceOrAddNew(id); report->set_timestamp(stats_gathering_started_); report->AddString(StatsReport::kStatsValueNameLabel, stats.label); // Filter out the initial id (-1). diff --git a/pc/legacy_stats_collector.h b/pc/legacy_stats_collector.h index cedd36c853..e905b39d48 100644 --- a/pc/legacy_stats_collector.h +++ b/pc/legacy_stats_collector.h @@ -165,11 +165,13 @@ class LegacyStatsCollector : public LegacyStatsCollectorInterface { const StatsReport::Id& channel_report_id, const cricket::ConnectionInfo& info); - void ExtractDataInfo(); + void ExtractDataInfo_n(StatsCollection* reports); // Returns the `transport_names_by_mid` member from the SessionStats as - // gathered and used to populate the stats. - std::map ExtractSessionInfo(); + // gathered and used to populate the stats. Contains one synchronous hop + // to the network thread to get this information along with querying data + // channel stats at the same time and populating `reports_`. + std::map ExtractSessionAndDataInfo(); void ExtractBweInfo(); void ExtractMediaInfo( diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index fdbd32bbce..82c5914a52 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2286,7 +2286,7 @@ bool PeerConnection::GetTransportDescription( } std::vector PeerConnection::GetDataChannelStats() const { - RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK_RUN_ON(network_thread()); return data_channel_controller_.GetDataChannelStats(); } diff --git a/pc/proxy.h b/pc/proxy.h index 2be115fdf3..ebe60c0293 100644 --- a/pc/proxy.h +++ b/pc/proxy.h @@ -450,6 +450,24 @@ class ConstMethodCall { return c_->method(); \ } +// Allows a custom implementation of a method where the otherwise proxied +// implementation can do a more efficient, yet thread-safe, job than the proxy +// can do by default or when more flexibility is needed than can be provided +// by a proxy. +// Note that calls to these methods should be expected to be made from unknown +// threads. +#define BYPASS_PROXY_METHOD0(r, method) \ + r method() override { \ + TRACE_BOILERPLATE(method); \ + return c_->method(); \ + } + +// The 1 argument version of `BYPASS_PROXY_METHOD0`. +#define BYPASS_PROXY_METHOD1(r, method, t1) \ + r method(t1 a1) override { \ + TRACE_BOILERPLATE(method); \ + return c_->method(std::move(a1)); \ + } } // namespace webrtc #endif // PC_PROXY_H_ diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index 429e9d0268..574e1cc6ec 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -1499,7 +1499,6 @@ void RTCStatsCollector::ProducePartialResultsOnSignalingThreadImpl( RTC_DCHECK_RUN_ON(signaling_thread_); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; - ProduceDataChannelStats_s(timestamp, partial_report); ProduceMediaStreamStats_s(timestamp, partial_report); ProduceMediaStreamTrackStats_s(timestamp, partial_report); ProduceMediaSourceStats_s(timestamp, partial_report); @@ -1519,6 +1518,8 @@ void RTCStatsCollector::ProducePartialResultsOnNetworkThread( // `network_report_event_` is reset before this method is invoked. network_report_ = RTCStatsReport::Create(timestamp); + ProduceDataChannelStats_n(timestamp, network_report_.get()); + std::set transport_names; if (sctp_transport_name) { transport_names.emplace(std::move(*sctp_transport_name)); @@ -1653,10 +1654,10 @@ void RTCStatsCollector::ProduceCertificateStats_n( } } -void RTCStatsCollector::ProduceDataChannelStats_s( +void RTCStatsCollector::ProduceDataChannelStats_n( Timestamp timestamp, RTCStatsReport* report) const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::vector data_stats = pc_->GetDataChannelStats(); for (const auto& stats : data_stats) { diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h index ac0453fb7c..34962bf5f7 100644 --- a/pc/rtc_stats_collector.h +++ b/pc/rtc_stats_collector.h @@ -186,7 +186,7 @@ class RTCStatsCollector : public rtc::RefCountInterface { const std::map& transport_cert_stats, RTCStatsReport* report) const; // Produces `RTCDataChannelStats`. - void ProduceDataChannelStats_s(Timestamp timestamp, + void ProduceDataChannelStats_n(Timestamp timestamp, RTCStatsReport* report) const; // Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`. void ProduceIceCandidateAndPairStats_n( diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc index 623a153067..892eca9aa7 100644 --- a/pc/sctp_data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -38,8 +38,8 @@ int GenerateUniqueId() { // Define proxy for DataChannelInterface. BEGIN_PROXY_MAP(DataChannel) PROXY_PRIMARY_THREAD_DESTRUCTOR() -PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) -PROXY_METHOD0(void, UnregisterObserver) +BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*) +BYPASS_PROXY_METHOD0(void, UnregisterObserver) BYPASS_PROXY_CONSTMETHOD0(std::string, label) BYPASS_PROXY_CONSTMETHOD0(bool, reliable) BYPASS_PROXY_CONSTMETHOD0(bool, ordered) @@ -50,20 +50,18 @@ BYPASS_PROXY_CONSTMETHOD0(absl::optional, maxPacketLifeTime) BYPASS_PROXY_CONSTMETHOD0(std::string, protocol) BYPASS_PROXY_CONSTMETHOD0(bool, negotiated) // Can't bypass the proxy since the id may change. -PROXY_CONSTMETHOD0(int, id) +PROXY_SECONDARY_CONSTMETHOD0(int, id) BYPASS_PROXY_CONSTMETHOD0(Priority, priority) -PROXY_CONSTMETHOD0(DataState, state) -PROXY_CONSTMETHOD0(RTCError, error) -PROXY_CONSTMETHOD0(uint32_t, messages_sent) -PROXY_CONSTMETHOD0(uint64_t, bytes_sent) -PROXY_CONSTMETHOD0(uint32_t, messages_received) -PROXY_CONSTMETHOD0(uint64_t, bytes_received) -PROXY_CONSTMETHOD0(uint64_t, buffered_amount) -PROXY_METHOD0(void, Close) -// TODO(bugs.webrtc.org/11547): Change to run on the network thread. -PROXY_METHOD1(bool, Send, const DataBuffer&) +BYPASS_PROXY_CONSTMETHOD0(DataState, state) +PROXY_SECONDARY_CONSTMETHOD0(RTCError, error) +PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent) +PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent) +PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received) +PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received) +PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount) +PROXY_SECONDARY_METHOD0(void, Close) +PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&) END_PROXY_MAP(DataChannel) - } // namespace InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base) @@ -142,6 +140,78 @@ void SctpSidAllocator::ReleaseSid(StreamId sid) { used_sids_.erase(sid); } +// A DataChannelObserver implementation that offers backwards compatibility with +// implementations that aren't yet ready to be called back on the network +// thread. This implementation posts events to the signaling thread where +// events are delivered. +// In the class, and together with the `SctpDataChannel` implementation, there's +// special handling for the `state()` property whereby if that property is +// queried on the channel object while inside an event callback, we return +// the state that was active at the time the event was issued. This is to avoid +// a problem with calling the `state()` getter on the proxy, which would do +// a blocking call to the network thread, effectively flushing operations on +// the network thread that could cause the state to change and eventually return +// a misleading or arguably, wrong, state value to the callback implementation. +// As a future improvement to the ObserverAdapter, we could do the same for +// other properties that need to be read on the network thread. Eventually +// all implementations should expect to be called on the network thread though +// and the ObserverAdapter no longer be necessary. +class SctpDataChannel::ObserverAdapter : public DataChannelObserver { + public: + explicit ObserverAdapter(DataChannelObserver* delegate, + SctpDataChannel* channel) + : delegate_(delegate), channel_(channel) {} + + bool IsInsideStateNotification() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return inside_state_change_; + } + + DataChannelInterface::DataState cached_state() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK(IsInsideStateNotification()); + return cached_state_; + } + + private: + void OnStateChange() override { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + SafeTask(safety_.flag(), [this, new_state = channel_->state()] { + RTC_DCHECK_RUN_ON(signaling_thread()); + cached_state_ = new_state; + inside_state_change_ = true; + delegate_->OnStateChange(); + inside_state_change_ = false; + })); + } + + void OnMessage(const DataBuffer& buffer) override { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + SafeTask(safety_.flag(), + [this, buffer = buffer] { delegate_->OnMessage(buffer); })); + } + + void OnBufferedAmountChange(uint64_t sent_data_size) override { + RTC_DCHECK_RUN_ON(network_thread()); + signaling_thread()->PostTask( + SafeTask(safety_.flag(), [this, sent_data_size] { + delegate_->OnBufferedAmountChange(sent_data_size); + })); + } + + rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; } + rtc::Thread* network_thread() const { return channel_->network_thread_; } + + DataChannelObserver* const delegate_; + SctpDataChannel* const channel_; + ScopedTaskSafety safety_; + bool inside_state_change_ RTC_GUARDED_BY(signaling_thread()) = false; + DataChannelInterface::DataState cached_state_ + RTC_GUARDED_BY(signaling_thread()) = DataChannelInterface::kConnecting; +}; + // static rtc::scoped_refptr SctpDataChannel::Create( rtc::WeakPtr controller, @@ -175,7 +245,6 @@ SctpDataChannel::SctpDataChannel( rtc::Thread* network_thread) : signaling_thread_(signaling_thread), network_thread_(network_thread), - id_s_(config.id), id_n_(config.id), internal_id_(GenerateUniqueId()), label_(label), @@ -207,19 +276,81 @@ SctpDataChannel::SctpDataChannel( } } -SctpDataChannel::~SctpDataChannel() { - RTC_DCHECK_RUN_ON(signaling_thread_); -} +SctpDataChannel::~SctpDataChannel() {} void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) { - RTC_DCHECK_RUN_ON(signaling_thread_); - observer_ = observer; - DeliverQueuedReceivedData(); + // Note: at this point, we do not know on which thread we're being called + // from since this method bypasses the proxy. On Android in particular, + // registration methods are called from unknown threads. + + // Check if we should set up an observer adapter that will make sure that + // callbacks are delivered on the signaling thread rather than directly + // on the network thread. + const auto* current_thread = rtc::Thread::Current(); + // TODO(webrtc:11547): Eventually all DataChannelObserver implementations + // should be called on the network thread and IsOkToCallOnTheNetworkThread(). + if (!observer->IsOkToCallOnTheNetworkThread()) { + auto prepare_observer = [&]() { + RTC_DCHECK(!observer_adapter_); + observer_adapter_ = std::make_unique(observer, this); + return observer_adapter_.get(); + }; + // Instantiate the adapter in the right context and then substitute the + // observer pointer the SctpDataChannel will call back on, with the adapter. + if (signaling_thread_ == current_thread) { + observer = prepare_observer(); + } else { + observer = signaling_thread_->BlockingCall(std::move(prepare_observer)); + } + } + + // Now do the observer registration on the network thread. + auto register_observer = [&] { + RTC_DCHECK_RUN_ON(network_thread_); + observer_ = observer; + DeliverQueuedReceivedData(); + }; + + if (network_thread_ == current_thread) { + register_observer(); + } else { + network_thread_->BlockingCall(std::move(register_observer)); + } } void SctpDataChannel::UnregisterObserver() { - RTC_DCHECK_RUN_ON(signaling_thread_); - observer_ = nullptr; + // Note: As with `RegisterObserver`, the proxy is being bypassed. + const auto* current_thread = rtc::Thread::Current(); + // Callers must not be invoking the unregistration from the network thread + // (assuming a multi-threaded environment where we have a dedicated network + // thread). That would indicate non-network related work happening on the + // network thread or that unregistration is being done from within a callback + // (without unwinding the stack, which is a requirement). + // The network thread is not allowed to make blocking calls to the signaling + // thread, so that would blow up if attempted. Since we support an adapter + // for observers that are not safe to call on the network thread, we do + // need to check+free it on the signaling thread. + RTC_DCHECK(current_thread != network_thread_ || + network_thread_ == signaling_thread_); + + auto unregister_observer = [&] { + RTC_DCHECK_RUN_ON(network_thread_); + observer_ = nullptr; + }; + + if (current_thread == network_thread_) { + unregister_observer(); + } else { + network_thread_->BlockingCall(std::move(unregister_observer)); + } + + auto clear_observer = [&]() { observer_adapter_.reset(); }; + + if (current_thread != signaling_thread_) { + signaling_thread_->BlockingCall(std::move(clear_observer)); + } else { + clear_observer(); + } } std::string SctpDataChannel::label() const { @@ -261,8 +392,11 @@ bool SctpDataChannel::negotiated() const { } int SctpDataChannel::id() const { - RTC_DCHECK_RUN_ON(signaling_thread_); - return id_s_.stream_id_int(); + RTC_DCHECK_RUN_ON(network_thread_); + // TODO(tommi): Once an ID has been assigned, it won't change (can be + // considered const). We could do special handling of this and allow bypassing + // the proxy so that we can return a valid id without thread hopping. + return id_n_.stream_id_int(); } Priority SctpDataChannel::priority() const { @@ -270,12 +404,12 @@ Priority SctpDataChannel::priority() const { } uint64_t SctpDataChannel::buffered_amount() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return queued_send_data_.byte_count(); } void SctpDataChannel::Close() { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); if (state_ == kClosing || state_ == kClosed) return; SetState(kClosing); @@ -284,40 +418,58 @@ void SctpDataChannel::Close() { } SctpDataChannel::DataState SctpDataChannel::state() const { - RTC_DCHECK_RUN_ON(signaling_thread_); - return state_; + // Note: The proxy is bypassed for the `state()` accessor. This is to allow + // observer callbacks to query what the new state is from within a state + // update notification without having to do a blocking call to the network + // thread from within a callback. This also makes it so that the returned + // state is guaranteed to be the new state that provoked the state change + // notification, whereby a blocking call to the network thread might end up + // getting put behind other messages on the network thread and eventually + // fetch a different state value (since pending messages might cause the + // state to change in the meantime). + const auto* current_thread = rtc::Thread::Current(); + if (current_thread == signaling_thread_) { + if (observer_adapter_ && observer_adapter_->IsInsideStateNotification()) + return observer_adapter_->cached_state(); + } + + auto return_state = [&] { + RTC_DCHECK_RUN_ON(network_thread_); + return state_; + }; + + return current_thread == network_thread_ + ? return_state() + : network_thread_->BlockingCall(std::move(return_state)); } RTCError SctpDataChannel::error() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return error_; } uint32_t SctpDataChannel::messages_sent() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return messages_sent_; } uint64_t SctpDataChannel::bytes_sent() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return bytes_sent_; } uint32_t SctpDataChannel::messages_received() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return messages_received_; } uint64_t SctpDataChannel::bytes_received() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); return bytes_received_; } bool SctpDataChannel::Send(const DataBuffer& buffer) { - RTC_DCHECK_RUN_ON(signaling_thread_); - // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network - // thread. Bring buffer management etc to the network thread and keep the - // operational state management on the signaling thread. + RTC_DCHECK_RUN_ON(network_thread_); if (state_ != kOpen) { return false; @@ -335,25 +487,17 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) { return true; } -void SctpDataChannel::SetSctpSid_s(StreamId sid) { - RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(!id_s_.HasValue()); - RTC_DCHECK(sid.HasValue()); - RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); - RTC_DCHECK_EQ(state_, kConnecting); - - id_s_ = sid; -} - void SctpDataChannel::SetSctpSid_n(StreamId sid) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK(!id_n_.HasValue()); RTC_DCHECK(sid.HasValue()); + RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); + RTC_DCHECK_EQ(state_, kConnecting); id_n_ = sid; } void SctpDataChannel::OnClosingProcedureStartedRemotely() { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); if (state_ != kClosing && state_ != kClosed) { // Don't bother sending queued data since the side that initiated the // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy @@ -369,7 +513,7 @@ void SctpDataChannel::OnClosingProcedureStartedRemotely() { } void SctpDataChannel::OnClosingProcedureComplete() { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); // If the closing procedure is complete, we should have finished sending // all pending data and transitioned to kClosing already. RTC_DCHECK_EQ(state_, kClosing); @@ -378,12 +522,12 @@ void SctpDataChannel::OnClosingProcedureComplete() { } void SctpDataChannel::OnTransportChannelCreated() { - RTC_DCHECK_RUN_ON(signaling_thread_); - + RTC_DCHECK_RUN_ON(network_thread_); connected_to_transport_ = true; } void SctpDataChannel::OnTransportChannelClosed(RTCError error) { + RTC_DCHECK_RUN_ON(network_thread_); // The SctpTransport is unusable, which could come from multiple reasons: // - the SCTP m= section was rejected // - the DTLS transport is closed @@ -392,7 +536,7 @@ void SctpDataChannel::OnTransportChannelClosed(RTCError error) { } DataChannelStats SctpDataChannel::GetStats() const { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); DataChannelStats stats{internal_id_, id(), label(), protocol(), state(), messages_sent(), messages_received(), bytes_sent(), bytes_received()}; @@ -401,25 +545,25 @@ DataChannelStats SctpDataChannel::GetStats() const { void SctpDataChannel::OnDataReceived(DataMessageType type, const rtc::CopyOnWriteBuffer& payload) { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); if (type == DataMessageType::kControl) { if (handshake_state_ != kHandshakeWaitingForAck) { // Ignore it if we are not expecting an ACK message. RTC_LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, sid = " - << id_s_.stream_id_int(); + << id_n_.stream_id_int(); return; } if (ParseDataChannelOpenAckMessage(payload)) { // We can send unordered as soon as we receive the ACK message. handshake_state_ = kHandshakeReady; RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " - << id_s_.stream_id_int(); + << id_n_.stream_id_int(); } else { RTC_LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = " - << id_s_.stream_id_int(); + << id_n_.stream_id_int(); } return; } @@ -428,7 +572,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type, type == DataMessageType::kText); RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " - << id_s_.stream_id_int(); + << id_n_.stream_id_int(); // We can send unordered as soon as we receive any DATA message since the // remote side must have received the OPEN (and old clients do not send // OPEN_ACK). @@ -459,7 +603,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type, } void SctpDataChannel::OnTransportReady() { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); // TODO(bugs.webrtc.org/11547): The transport is configured inside // `PeerConnection::SetupDataChannelTransport_n`, which results in @@ -472,6 +616,7 @@ void SctpDataChannel::OnTransportReady() { // be on for the below `Send*` calls, which currently do a BlockingCall // from the signaling thread to the network thread. RTC_DCHECK(connected_to_transport_); + RTC_DCHECK(id_n_.HasValue()); SendQueuedControlMessages(); SendQueuedDataMessages(); @@ -480,7 +625,7 @@ void SctpDataChannel::OnTransportReady() { } void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { - RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_RUN_ON(network_thread_); if (state_ == kClosed) { return; @@ -501,13 +646,14 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { void SctpDataChannel::CloseAbruptlyWithDataChannelFailure( const std::string& message) { + RTC_DCHECK_RUN_ON(network_thread_); RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message); error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE); CloseAbruptlyWithError(std::move(error)); } +// RTC_RUN_ON(network_thread_). void SctpDataChannel::UpdateState() { - RTC_DCHECK_RUN_ON(signaling_thread_); // UpdateState determines what to do from a few state variables. Include // all conditions required for each state transition here for // clarity. OnTransportReady(true) will send any queued data and then invoke @@ -535,7 +681,7 @@ void SctpDataChannel::UpdateState() { DeliverQueuedReceivedData(); } } else { - RTC_DCHECK(!id_s_.HasValue()); + RTC_DCHECK(!id_n_.HasValue()); } break; } @@ -551,11 +697,9 @@ void SctpDataChannel::UpdateState() { // to complete; after calling RemoveSctpDataStream, // OnClosingProcedureComplete will end up called asynchronously // afterwards. - if (!started_closing_procedure_ && id_s_.HasValue()) { + if (!started_closing_procedure_ && id_n_.HasValue()) { started_closing_procedure_ = true; - network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] { - c->RemoveSctpDataStream(sid); - }); + controller_->RemoveSctpDataStream(id_n_); } } } else { @@ -572,8 +716,8 @@ void SctpDataChannel::UpdateState() { } } +// RTC_RUN_ON(network_thread_). void SctpDataChannel::SetState(DataState state) { - RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == state) { return; } @@ -587,8 +731,8 @@ void SctpDataChannel::SetState(DataState state) { controller_->OnChannelStateChanged(this, state_); } +// RTC_RUN_ON(network_thread_). void SctpDataChannel::DeliverQueuedReceivedData() { - RTC_DCHECK_RUN_ON(signaling_thread_); if (!observer_) { return; } @@ -601,8 +745,8 @@ void SctpDataChannel::DeliverQueuedReceivedData() { } } +// RTC_RUN_ON(network_thread_). void SctpDataChannel::SendQueuedDataMessages() { - RTC_DCHECK_RUN_ON(signaling_thread_); if (queued_send_data_.Empty()) { return; } @@ -619,9 +763,9 @@ void SctpDataChannel::SendQueuedDataMessages() { } } +// RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) { - RTC_DCHECK_RUN_ON(signaling_thread_); SendDataParams send_params; if (!controller_) { return false; @@ -641,7 +785,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, send_params.type = buffer.binary ? DataMessageType::kBinary : DataMessageType::kText; - RTCError error = controller_->SendData(id_s_, send_params, buffer.data); + RTCError error = controller_->SendData(id_n_, send_params, buffer.data); if (error.ok()) { ++messages_sent_; @@ -669,8 +813,8 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, return false; } +// RTC_RUN_ON(network_thread_). bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - RTC_DCHECK_RUN_ON(signaling_thread_); size_t start_buffered_amount = queued_send_data_.byte_count(); if (start_buffered_amount + buffer.size() > DataChannelInterface::MaxSendQueueSize()) { @@ -681,8 +825,8 @@ bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) { return true; } +// RTC_RUN_ON(network_thread_). void SctpDataChannel::SendQueuedControlMessages() { - RTC_DCHECK_RUN_ON(signaling_thread_); PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -692,10 +836,10 @@ void SctpDataChannel::SendQueuedControlMessages() { } } +// RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { - RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(connected_to_transport_); - RTC_DCHECK(id_s_.HasValue()); + RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(controller_); bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; @@ -708,10 +852,10 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { send_params.ordered = ordered_ || is_open_message; send_params.type = DataMessageType::kControl; - RTCError err = controller_->SendData(id_s_, send_params, buffer); + RTCError err = controller_->SendData(id_n_, send_params, buffer); if (err.ok()) { RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel " - << id_s_.stream_id_int(); + << id_n_.stream_id_int(); if (handshake_state_ == kHandshakeShouldSendAck) { handshake_state_ = kHandshakeReady; diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h index 588b0cbf63..fcf1ffe6ed 100644 --- a/pc/sctp_data_channel.h +++ b/pc/sctp_data_channel.h @@ -192,7 +192,6 @@ class SctpDataChannel : public DataChannelInterface { // Sets the SCTP sid and adds to transport layer if not set yet. Should only // be called once. - void SetSctpSid_s(StreamId sid); void SetSctpSid_n(StreamId sid); // The remote side started the closing procedure by resetting its outgoing @@ -216,10 +215,6 @@ class SctpDataChannel : public DataChannelInterface { // stats purposes (see also `GetStats()`). int internal_id() const { return internal_id_; } - StreamId sid_s() const { - RTC_DCHECK_RUN_ON(signaling_thread_); - return id_s_; - } StreamId sid_n() const { RTC_DCHECK_RUN_ON(network_thread_); return id_n_; @@ -239,6 +234,8 @@ class SctpDataChannel : public DataChannelInterface { ~SctpDataChannel() override; private: + class ObserverAdapter; + // The OPEN(_ACK) signaling state. enum HandshakeState { kHandshakeInit, @@ -248,21 +245,23 @@ class SctpDataChannel : public DataChannelInterface { kHandshakeReady }; - void UpdateState(); - void SetState(DataState state); + void UpdateState() RTC_RUN_ON(network_thread_); + void SetState(DataState state) RTC_RUN_ON(network_thread_); - void DeliverQueuedReceivedData(); + void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_); - void SendQueuedDataMessages(); - bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked); - bool QueueSendDataMessage(const DataBuffer& buffer); + void SendQueuedDataMessages() RTC_RUN_ON(network_thread_); + bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) + RTC_RUN_ON(network_thread_); + bool QueueSendDataMessage(const DataBuffer& buffer) + RTC_RUN_ON(network_thread_); - void SendQueuedControlMessages(); - bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer); + void SendQueuedControlMessages() RTC_RUN_ON(network_thread_); + bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) + RTC_RUN_ON(network_thread_); rtc::Thread* const signaling_thread_; rtc::Thread* const network_thread_; - StreamId id_s_ RTC_GUARDED_BY(signaling_thread_); StreamId id_n_ RTC_GUARDED_BY(network_thread_); const int internal_id_; const std::string label_; @@ -273,25 +272,26 @@ class SctpDataChannel : public DataChannelInterface { const bool negotiated_; const bool ordered_; - DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr; - DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting; - RTCError error_ RTC_GUARDED_BY(signaling_thread_); - uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; - uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0; - uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0; - uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0; + DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr; + std::unique_ptr observer_adapter_; + DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting; + RTCError error_ RTC_GUARDED_BY(network_thread_); + uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0; + uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0; + uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0; + uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0; rtc::WeakPtr controller_ - RTC_GUARDED_BY(signaling_thread_); - HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) = + RTC_GUARDED_BY(network_thread_); + HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) = kHandshakeInit; - bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false; + bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false; // Did we already start the graceful SCTP closing procedure? - bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false; + bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false; // Control messages that always have to get sent out before any queued // data. - PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_); - PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_); - PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_); + PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_); + PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_); + PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_); }; // Downcast a PeerConnectionInterface that points to a proxy object diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index c382d611ae..90a6cd2b92 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -3268,20 +3268,16 @@ void SdpOfferAnswerHandler::AllocateSctpSids() { return; } - absl::optional role = network_thread()->BlockingCall([this] { - RTC_DCHECK_RUN_ON(network_thread()); - return pc_->GetSctpSslRole_n(); - }); - - if (!role) { - role = GuessSslRole(); - } - - if (role) { - // TODO(webrtc:11547): Make this call on the network thread too once - // `AllocateSctpSids` has been updated. - data_channel_controller()->AllocateSctpSids(*role); - } + absl::optional guessed_role = GuessSslRole(); + network_thread()->BlockingCall( + [&, data_channel_controller = data_channel_controller()] { + RTC_DCHECK_RUN_ON(network_thread()); + absl::optional role = pc_->GetSctpSslRole_n(); + if (!role) + role = guessed_role; + if (role) + data_channel_controller->AllocateSctpSids(*role); + }); } absl::optional SdpOfferAnswerHandler::GuessSslRole() const { @@ -5117,13 +5113,13 @@ void SdpOfferAnswerHandler::DestroyDataChannelTransport(RTCError error) { RTC_DCHECK_RUN_ON(signaling_thread()); const bool has_sctp = pc_->sctp_mid().has_value(); - if (has_sctp) - data_channel_controller()->OnTransportChannelClosed(error); - - context_->network_thread()->BlockingCall([this] { - RTC_DCHECK_RUN_ON(context_->network_thread()); - pc_->TeardownDataChannelTransport_n(); - }); + context_->network_thread()->BlockingCall( + [&, data_channel_controller = data_channel_controller()] { + RTC_DCHECK_RUN_ON(context_->network_thread()); + if (has_sctp) + data_channel_controller->OnTransportChannelClosed(error); + pc_->TeardownDataChannelTransport_n(); + }); if (has_sctp) pc_->ResetSctpDataMid(); diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h index 26ecc31378..d1b41fcf88 100644 --- a/pc/test/fake_data_channel_controller.h +++ b/pc/test/fake_data_channel_controller.h @@ -29,23 +29,31 @@ class FakeDataChannelController transport_available_(false), ready_to_send_(false), transport_error_(false) {} - virtual ~FakeDataChannelController() {} + + ~FakeDataChannelController() override { + network_thread_->BlockingCall([&] { + RTC_DCHECK_RUN_ON(network_thread_); + weak_factory_.InvalidateWeakPtrs(); + }); + } rtc::WeakPtr weak_ptr() { + RTC_DCHECK_RUN_ON(network_thread_); return weak_factory_.GetWeakPtr(); } rtc::scoped_refptr CreateDataChannel( absl::string_view label, webrtc::InternalDataChannelInit init) { - rtc::WeakPtr my_weak_ptr = weak_ptr(); - // Explicitly associate the weak ptr instance with the current thread to - // catch early any inappropriate referencing of it on the network thread. - RTC_CHECK(my_weak_ptr); - rtc::scoped_refptr channel = network_thread_->BlockingCall([&]() { RTC_DCHECK_RUN_ON(network_thread_); + rtc::WeakPtr my_weak_ptr = weak_ptr(); + // Explicitly associate the weak ptr instance with the current thread + // to catch early any inappropriate referencing of it on the network + // thread. + RTC_CHECK(my_weak_ptr); + rtc::scoped_refptr channel = webrtc::SctpDataChannel::Create( std::move(my_weak_ptr), std::string(label), @@ -54,17 +62,16 @@ class FakeDataChannelController if (transport_available_ && channel->sid_n().HasValue()) { AddSctpDataStream(channel->sid_n()); } + if (ready_to_send_) { + network_thread_->PostTask([channel = channel] { + if (channel->state() != + webrtc::DataChannelInterface::DataState::kClosed) { + channel->OnTransportReady(); + } + }); + } return channel; }); - if (ready_to_send_) { - signaling_thread_->PostTask( - SafeTask(signaling_safety_.flag(), [channel = channel] { - if (channel->state() != - webrtc::DataChannelInterface::DataState::kClosed) { - channel->OnTransportReady(); - } - })); - } connected_channels_.insert(channel.get()); return channel; } @@ -72,6 +79,7 @@ class FakeDataChannelController webrtc::RTCError SendData(webrtc::StreamId sid, const webrtc::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload) override { + RTC_DCHECK_RUN_ON(network_thread_); RTC_CHECK(ready_to_send_); RTC_CHECK(transport_available_); if (send_blocked_) { @@ -100,17 +108,14 @@ class FakeDataChannelController RTC_DCHECK_RUN_ON(network_thread_); RTC_CHECK(sid.HasValue()); known_stream_ids_.erase(sid); - signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] { - // Unlike the real SCTP transport, act like the closing procedure finished - // instantly. - auto it = absl::c_find_if(connected_channels_, [&](const auto* c) { - return c->sid_s() == sid; - }); - // This path mimics the DCC's OnChannelClosed handler since the FDCC - // (this class) doesn't have a transport that would do that. - if (it != connected_channels_.end()) - (*it)->OnClosingProcedureComplete(); - })); + // Unlike the real SCTP transport, act like the closing procedure finished + // instantly. + auto it = absl::c_find_if(connected_channels_, + [&](const auto* c) { return c->sid_n() == sid; }); + // This path mimics the DCC's OnChannelClosed handler since the FDCC + // (this class) doesn't have a transport that would do that. + if (it != connected_channels_.end()) + (*it)->OnClosingProcedureComplete(); } void OnChannelStateChanged( @@ -126,36 +131,40 @@ class FakeDataChannelController // Set true to emulate the SCTP stream being blocked by congestion control. void set_send_blocked(bool blocked) { - send_blocked_ = blocked; - if (!blocked) { - RTC_CHECK(transport_available_); - // Make a copy since `connected_channels_` may change while - // OnTransportReady is called. - auto copy = connected_channels_; - for (webrtc::SctpDataChannel* ch : copy) { - ch->OnTransportReady(); + network_thread_->BlockingCall([&]() { + send_blocked_ = blocked; + if (!blocked) { + RTC_CHECK(transport_available_); + // Make a copy since `connected_channels_` may change while + // OnTransportReady is called. + auto copy = connected_channels_; + for (webrtc::SctpDataChannel* ch : copy) { + ch->OnTransportReady(); + } } - } + }); } // Set true to emulate the transport channel creation, e.g. after // setLocalDescription/setRemoteDescription called with data content. void set_transport_available(bool available) { - transport_available_ = available; + network_thread_->BlockingCall([&]() { transport_available_ = available; }); } // Set true to emulate the transport OnTransportReady signal when the // transport becomes writable for the first time. void set_ready_to_send(bool ready) { RTC_CHECK(transport_available_); - ready_to_send_ = ready; - if (ready) { - std::set::iterator it; - for (it = connected_channels_.begin(); it != connected_channels_.end(); - ++it) { - (*it)->OnTransportReady(); + network_thread_->BlockingCall([&]() { + ready_to_send_ = ready; + if (ready) { + std::set::iterator it; + for (it = connected_channels_.begin(); it != connected_channels_.end(); + ++it) { + (*it)->OnTransportReady(); + } } - } + }); } void set_transport_error() { transport_error_ = true; }