From 13759bac93e027f2c9934de9477feece126bdfda Mon Sep 17 00:00:00 2001 From: Tommi Date: Mon, 6 Mar 2023 12:51:39 +0100 Subject: [PATCH] Add safety flag to DataChannelController for PostTask. Bug: none Change-Id: If70c1cd9fb4e6fefefa96df819deac052655c49a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/296140 Reviewed-by: Markus Handell Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#39485} --- pc/BUILD.gn | 1 + pc/data_channel_controller.cc | 100 +++++++++++++++------------------- pc/data_channel_controller.h | 2 + 3 files changed, 46 insertions(+), 57 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index b071145c94..2bcf9d3e8d 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -956,6 +956,7 @@ rtc_source_set("data_channel_controller") { "../api:rtc_error", "../api:scoped_refptr", "../api:sequence_checker", + "../api/task_queue:pending_task_safety_flag", "../api/transport:datagram_transport_interface", "../media:media_channel", "../media:rtc_media_base", diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index 6338c55b5b..d0b39fc385 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -119,58 +119,50 @@ void DataChannelController::OnDataReceived( return; signaling_thread()->PostTask( - [self = weak_factory_.GetWeakPtr(), params, buffer] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - // TODO(bugs.webrtc.org/11547): The data being received should be - // delivered on the network thread (change - // SignalDataChannelTransportReceivedData_s to - // SignalDataChannelTransportReceivedData_n). - self->SignalDataChannelTransportReceivedData_s(params, buffer); - } - }); + SafeTask(signaling_safety_.flag(), [this, params, buffer] { + RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(bugs.webrtc.org/11547): The data being received should be + // delivered on the network thread (change + // SignalDataChannelTransportReceivedData_s to + // SignalDataChannelTransportReceivedData_n). + SignalDataChannelTransportReceivedData_s(params, buffer); + })); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosing_s(channel_id); - } - }); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, channel_id] { + RTC_DCHECK_RUN_ON(signaling_thread()); + SignalDataChannelTransportChannelClosing_s(channel_id); + })); } void DataChannelController::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosed_s(channel_id); - } - }); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, channel_id] { + RTC_DCHECK_RUN_ON(signaling_thread()); + SignalDataChannelTransportChannelClosed_s(channel_id); + })); } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->data_channel_transport_ready_to_send_ = true; - self->SignalDataChannelTransportWritable_s( - self->data_channel_transport_ready_to_send_); - } - }); + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + data_channel_transport_ready_to_send_ = true; + SignalDataChannelTransportWritable_s(data_channel_transport_ready_to_send_); + })); } void DataChannelController::OnTransportClosed(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->OnTransportChannelClosed(error); - } - }); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), [this, error] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportChannelClosed(error); + })); } void DataChannelController::SetupDataChannelTransport_n() { @@ -236,14 +228,12 @@ bool DataChannelController::HandleOpenMessage_n( << params.sid; } else { config.open_handshake_role = InternalDataChannelInit::kAcker; - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), - label = std::move(label), - config = std::move(config)] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->OnDataChannelOpenMessage(label, config); - } - }); + signaling_thread()->PostTask( + SafeTask(signaling_safety_.flag(), + [this, label = std::move(label), config = std::move(config)] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnDataChannelOpenMessage(label, config); + })); } return true; } @@ -345,12 +335,10 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { // we can't free it directly here; we need to free it asynchronously. sctp_data_channels_to_free_.push_back(*it); sctp_data_channels_.erase(it); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->sctp_data_channels_to_free_.clear(); - } - }); + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + sctp_data_channels_to_free_.clear(); + })); return; } } @@ -411,14 +399,12 @@ bool DataChannelController::DataChannelSendData( void DataChannelController::NotifyDataChannelsOfTransportCreated() { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - for (const auto& channel : self->sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } + signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] { + RTC_DCHECK_RUN_ON(signaling_thread()); + for (const auto& channel : sctp_data_channels_) { + channel->OnTransportChannelCreated(); } - }); + })); } rtc::Thread* DataChannelController::network_thread() const { diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index aa3f54fb8d..691f2cd419 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -18,6 +18,7 @@ #include "api/rtc_error.h" #include "api/scoped_refptr.h" #include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/data_channel_transport_interface.h" #include "media/base/media_channel.h" #include "pc/data_channel_utils.h" @@ -172,6 +173,7 @@ class DataChannelController : public SctpDataChannelControllerInterface, // The weak pointers must be dereferenced and invalidated on the signalling // thread only. rtc::WeakPtrFactory weak_factory_{this}; + ScopedTaskSafety signaling_safety_; }; } // namespace webrtc