From 236e36c25f76c0093469257e1ecf7c42ac816d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niels=20M=C3=B6ller?= Date: Tue, 23 Mar 2021 09:23:10 +0100 Subject: [PATCH] Delete AsyncInvoker usage in DataChannelController Tasks access this via WeakPtrFactory. Bug: webrtc:12339 Change-Id: I0aaeffd4bed59a6abfadf995286644c24c1fd716 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/212721 Reviewed-by: Harald Alvestrand Commit-Queue: Niels Moller Cr-Commit-Position: refs/heads/master@{#33560} --- pc/data_channel_controller.cc | 101 +++++++++++++++++++--------------- pc/data_channel_controller.h | 3 +- 2 files changed, 59 insertions(+), 45 deletions(-) diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 9f0a490a09..df35d0c8d3 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -22,6 +22,7 @@ #include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/string_encode.h" +#include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { @@ -137,59 +138,69 @@ void DataChannelController::OnDataReceived( cricket::ReceiveDataParams params; params.sid = channel_id; params.type = ToCricketDataMessageType(type); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { - RTC_DCHECK_RUN_ON(signaling_thread()); - // TODO(bugs.webrtc.org/11547): The data being received should be - // delivered on the network thread. The way HandleOpenMessage_s works - // right now is that it's called for all types of buffers and operates - // as a selector function. Change this so that it's only called for - // buffers that it should be able to handle. Once we do that, we can - // deliver all other buffers on the network thread (change - // SignalDataChannelTransportReceivedData_s to - // SignalDataChannelTransportReceivedData_n). - if (!HandleOpenMessage_s(params, buffer)) { - SignalDataChannelTransportReceivedData_s(params, buffer); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + // TODO(bugs.webrtc.org/11547): The data being received should be + // delivered on the network thread. The way HandleOpenMessage_s works + // right now is that it's called for all types of buffers and operates + // as a selector function. Change this so that it's only called for + // buffers that it should be able to handle. Once we do that, we can + // deliver all other buffers on the network thread (change + // SignalDataChannelTransportReceivedData_s to + // SignalDataChannelTransportReceivedData_n). + if (!self->HandleOpenMessage_s(params, buffer)) { + self->SignalDataChannelTransportReceivedData_s(params, buffer); + } } - }); + })); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, channel_id] { - RTC_DCHECK_RUN_ON(signaling_thread()); - SignalDataChannelTransportChannelClosing_s(channel_id); - }); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->SignalDataChannelTransportChannelClosing_s(channel_id); + } + })); } void DataChannelController::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this, channel_id] { - RTC_DCHECK_RUN_ON(signaling_thread()); - SignalDataChannelTransportChannelClosed_s(channel_id); - }); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->SignalDataChannelTransportChannelClosed_s(channel_id); + } + })); } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - data_channel_transport_ready_to_send_ = true; - SignalDataChannelTransportWritable_s( - data_channel_transport_ready_to_send_); - }); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->data_channel_transport_ready_to_send_ = true; + self->SignalDataChannelTransportWritable_s( + self->data_channel_transport_ready_to_send_); + } + })); } void DataChannelController::OnTransportClosed() { RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportChannelClosed(); - }); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->OnTransportChannelClosed(); + } + })); } void DataChannelController::SetupDataChannelTransport_n() { @@ -392,12 +403,12 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { sctp_data_channels_to_free_.push_back(*it); sctp_data_channels_.erase(it); signaling_thread()->PostTask( - RTC_FROM_HERE, [self = weak_factory_.GetWeakPtr()] { + ToQueuedTask([self = weak_factory_.GetWeakPtr()] { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); self->sctp_data_channels_to_free_.clear(); } - }); + })); return; } } @@ -598,13 +609,15 @@ bool DataChannelController::DataChannelSendData( void DataChannelController::NotifyDataChannelsOfTransportCreated() { RTC_DCHECK_RUN_ON(network_thread()); - data_channel_transport_invoker_.AsyncInvoke( - RTC_FROM_HERE, signaling_thread(), [this] { - RTC_DCHECK_RUN_ON(signaling_thread()); - for (const auto& channel : sctp_data_channels_) { - channel->OnTransportChannelCreated(); + signaling_thread()->PostTask( + ToQueuedTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + for (const auto& channel : self->sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } } - }); + })); } rtc::Thread* DataChannelController::network_thread() const { diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index 40f4e4c989..f6d4409f55 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -29,7 +29,6 @@ #include "pc/data_channel_utils.h" #include "pc/rtp_data_channel.h" #include "pc/sctp_data_channel.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/checks.h" #include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/ssl_stream_adapter.h" @@ -245,6 +244,8 @@ class DataChannelController : public RtpDataChannelProviderInterface, // Owning PeerConnection. PeerConnection* const pc_; + // The weak pointers must be dereferenced and invalidated on the signalling + // thread only. rtc::WeakPtrFactory weak_factory_{this}; };