From a30439bbe68d2e6596399f0f447719308ff22184 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Thu, 7 Jul 2022 10:08:49 +0200 Subject: [PATCH] Migrate pc/ to absl::AnyInvocable based TaskQueueBase interface Bug: webrtc:14245 Change-Id: I9043aa507421a93f0d7ba7406e237f727999b696 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268121 Commit-Queue: Danil Chapovalov Reviewed-by: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#37478} --- pc/BUILD.gn | 24 ++++---- pc/audio_rtp_receiver.cc | 10 ++-- pc/channel.cc | 8 +-- pc/channel_unittest.cc | 9 ++- pc/connection_context.cc | 5 +- pc/data_channel_controller.cc | 91 +++++++++++++---------------- pc/dtmf_sender.cc | 21 +++---- pc/dtmf_sender.h | 8 +-- pc/peer_connection.cc | 47 ++++++++------- pc/proxy.h | 26 ++++----- pc/rtc_stats_collector.cc | 32 ++-------- pc/rtp_transceiver.cc | 5 +- pc/sctp_data_channel.cc | 15 +++-- pc/test/integration_test_helpers.cc | 3 +- pc/test/integration_test_helpers.h | 24 ++++---- 15 files changed, 140 insertions(+), 188 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 632a072ca2..866f213e2b 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -78,7 +78,6 @@ rtc_source_set("channel") { "../api:sequence_checker", "../api/crypto:options", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/units:timestamp", "../call:rtp_interfaces", "../call:rtp_receiver", @@ -798,7 +797,6 @@ rtc_source_set("peerconnection") { "../api/rtc_event_log", "../api/task_queue", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport:bitrate_settings", "../api/transport:datagram_transport_interface", "../api/transport:enums", @@ -868,7 +866,6 @@ rtc_library("sctp_data_channel") { "../api:priority", "../api:rtc_error", "../api:scoped_refptr", - "../api/task_queue:to_queued_task", "../api/transport:datagram_transport_interface", "../media:rtc_data_sctp_transport_internal", "../media:rtc_media_base", @@ -883,7 +880,10 @@ rtc_library("sctp_data_channel") { "../rtc_base/system:unused", "../rtc_base/third_party/sigslot:sigslot", ] - absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] + absl_deps = [ + "//third_party/abseil-cpp/absl/cleanup", + "//third_party/abseil-cpp/absl/types:optional", + ] } rtc_library("data_channel_utils") { @@ -916,7 +916,6 @@ rtc_library("connection_context") { "../api:scoped_refptr", "../api:sequence_checker", "../api/neteq:neteq_api", - "../api/task_queue:to_queued_task", "../api/transport:field_trial_based_config", "../api/transport:sctp_transport_factory_interface", "../media:rtc_data_sctp_transport_factory", @@ -948,7 +947,6 @@ rtc_source_set("data_channel_controller") { "../api:rtc_error", "../api:scoped_refptr", "../api:sequence_checker", - "../api/task_queue:to_queued_task", "../api/transport:datagram_transport_interface", "../media:rtc_media_base", "../rtc_base", @@ -1053,6 +1051,7 @@ rtc_source_set("rtc_stats_collector") { "../rtc_base/third_party/sigslot:sigslot", ] absl_deps = [ + "//third_party/abseil-cpp/absl/functional:bind_front", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", ] @@ -1213,7 +1212,6 @@ rtc_source_set("peer_connection") { "../api/crypto:options", "../api/rtc_event_log", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport:bitrate_settings", "../api/transport:datagram_transport_interface", "../api/transport:enums", @@ -1577,7 +1575,6 @@ rtc_library("rtp_transceiver") { "../api:sequence_checker", "../api/task_queue", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/video:video_bitrate_allocator_factory", "../media:rtc_media_base", "../rtc_base:checks", @@ -1709,7 +1706,6 @@ rtc_library("audio_rtp_receiver") { "../api:sequence_checker", "../api/crypto:frame_decryptor_interface", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport/rtp:rtp_source", "../media:rtc_media_base", "../rtc_base", @@ -1950,15 +1946,14 @@ rtc_library("dtmf_sender") { "../api:libjingle_peerconnection_api", "../api:scoped_refptr", "../api:sequence_checker", + "../api/task_queue", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", + "../api/units:time_delta", "../rtc_base:checks", "../rtc_base:location", "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:refcount", - "../rtc_base:rtc_base", - "../rtc_base:threading", "../rtc_base/third_party/sigslot", ] absl_deps = [ @@ -2098,7 +2093,6 @@ if (rtc_include_tests && !build_with_chromium) { "../api:sequence_checker", "../api/task_queue:pending_task_safety_flag", "../api/task_queue:task_queue", - "../api/task_queue:to_queued_task", "../api/transport:datagram_transport_interface", "../api/transport:enums", "../api/video:builtin_video_bitrate_allocator_factory", @@ -2135,7 +2129,10 @@ if (rtc_include_tests && !build_with_chromium) { "../test:scoped_key_value_config", "../test:test_main", "../test:test_support", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", @@ -2566,7 +2563,6 @@ if (rtc_include_tests && !build_with_chromium) { "../api/task_queue", "../api/task_queue:default_task_queue_factory", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport:field_trial_based_config", "../api/transport/rtp:rtp_source", "../api/units:time_delta", diff --git a/pc/audio_rtp_receiver.cc b/pc/audio_rtp_receiver.cc index be2d81e080..6ed163a196 100644 --- a/pc/audio_rtp_receiver.cc +++ b/pc/audio_rtp_receiver.cc @@ -17,7 +17,6 @@ #include #include "api/sequence_checker.h" -#include "api/task_queue/to_queued_task.h" #include "pc/audio_track.h" #include "pc/media_stream_track_proxy.h" #include "rtc_base/checks.h" @@ -78,11 +77,10 @@ void AudioRtpReceiver::OnChanged() { if (cached_track_enabled_ == enabled) return; cached_track_enabled_ = enabled; - worker_thread_->PostTask( - ToQueuedTask(worker_thread_safety_, [this, enabled]() { - RTC_DCHECK_RUN_ON(worker_thread_); - Reconfigure(enabled); - })); + worker_thread_->PostTask(SafeTask(worker_thread_safety_, [this, enabled]() { + RTC_DCHECK_RUN_ON(worker_thread_); + Reconfigure(enabled); + })); } // RTC_RUN_ON(worker_thread_) diff --git a/pc/channel.cc b/pc/channel.cc index 01cd432139..077731493e 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -20,7 +20,6 @@ #include "api/rtp_parameters.h" #include "api/sequence_checker.h" #include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/to_queued_task.h" #include "api/units/timestamp.h" #include "media/base/codec.h" #include "media/base/rid_description.h" @@ -43,7 +42,6 @@ using ::rtc::StringFormat; using ::rtc::UniqueRandomIdGenerator; using ::webrtc::PendingTaskSafetyFlag; using ::webrtc::SdpType; -using ::webrtc::ToQueuedTask; // Finds a stream based on target's Primary SSRC or RIDs. // This struct is used in BaseChannel::UpdateLocalStreams_w. @@ -197,7 +195,7 @@ bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) { if (rtp_transport_) { DisconnectFromRtpTransport_n(); // Clear the cached header extensions on the worker. - worker_thread_->PostTask(ToQueuedTask(alive_, [this] { + worker_thread_->PostTask(SafeTask(alive_, [this] { RTC_DCHECK_RUN_ON(worker_thread()); rtp_header_extensions_.clear(); })); @@ -237,7 +235,7 @@ void BaseChannel::Enable(bool enable) { enabled_s_ = enable; - worker_thread_->PostTask(ToQueuedTask(alive_, [this, enable] { + worker_thread_->PostTask(SafeTask(alive_, [this, enable] { RTC_DCHECK_RUN_ON(worker_thread()); // Sanity check to make sure that enabled_ and enabled_s_ // stay in sync. @@ -552,7 +550,7 @@ void BaseChannel::ChannelWritable_n() { // We only have to do this PostTask once, when first transitioning to // writable. if (!was_ever_writable_n_) { - worker_thread_->PostTask(ToQueuedTask(alive_, [this] { + worker_thread_->PostTask(SafeTask(alive_, [this] { RTC_DCHECK_RUN_ON(worker_thread()); was_ever_writable_ = true; UpdateMediaSendRecvState_w(); diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc index 4e718d3e3a..40beff8f9c 100644 --- a/pc/channel_unittest.cc +++ b/pc/channel_unittest.cc @@ -16,11 +16,11 @@ #include #include +#include "absl/functional/any_invocable.h" #include "api/array_view.h" #include "api/audio_options.h" #include "api/rtp_parameters.h" #include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/to_queued_task.h" #include "media/base/codec.h" #include "media/base/fake_media_engine.h" #include "media/base/fake_rtp.h" @@ -419,7 +419,7 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { } void SendRtp(typename T::MediaChannel* media_channel, rtc::Buffer data) { - network_thread_->PostTask(webrtc::ToQueuedTask( + network_thread_->PostTask(webrtc::SafeTask( network_thread_safety_, [media_channel, data = std::move(data)]() { media_channel->SendRtp(data.data(), data.size(), rtc::PacketOptions()); @@ -503,11 +503,10 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> { // destroyed before this object goes out of scope. class ScopedCallThread { public: - template - explicit ScopedCallThread(FunctorT&& functor) + explicit ScopedCallThread(absl::AnyInvocable functor) : thread_(rtc::Thread::Create()) { thread_->Start(); - thread_->PostTask(std::forward(functor)); + thread_->PostTask(std::move(functor)); } ~ScopedCallThread() { thread_->Stop(); } diff --git a/pc/connection_context.cc b/pc/connection_context.cc index de0597b0b8..13c598ae87 100644 --- a/pc/connection_context.cc +++ b/pc/connection_context.cc @@ -14,7 +14,6 @@ #include #include -#include "api/task_queue/to_queued_task.h" #include "api/transport/field_trial_based_config.h" #include "media/base/media_engine.h" #include "media/sctp/sctp_transport_factory.h" @@ -120,7 +119,7 @@ ConnectionContext::ConnectionContext( // network_thread_->IsCurrent() == true means signaling_thread_ is // network_thread_. In this case, no further action is required as // signaling_thread_ can already invoke network_thread_. - network_thread_->PostTask(ToQueuedTask( + network_thread_->PostTask( [thread = network_thread_, worker_thread = worker_thread_.get()] { thread->DisallowBlockingCalls(); thread->DisallowAllInvokes(); @@ -128,7 +127,7 @@ ConnectionContext::ConnectionContext( // In this case, worker_thread_ == network_thread_ thread->AllowInvokesToThread(thread); } - })); + }); } rtc::InitRandom(rtc::Time32()); diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index b1461cde9a..b655b530a5 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -14,7 +14,6 @@ #include "api/peer_connection_interface.h" #include "api/rtc_error.h" -#include "api/task_queue/to_queued_task.h" #include "pc/peer_connection_internal.h" #include "pc/sctp_utils.h" #include "rtc_base/location.h" @@ -114,7 +113,7 @@ void DataChannelController::OnDataReceived( params.sid = channel_id; params.type = type; signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] { + [self = weak_factory_.GetWeakPtr(), params, buffer] { if (self) { RTC_DCHECK_RUN_ON(self->signaling_thread()); // TODO(bugs.webrtc.org/11547): The data being received should be @@ -129,53 +128,49 @@ void DataChannelController::OnDataReceived( self->SignalDataChannelTransportReceivedData_s(params, buffer); } } - })); + }); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosing_s(channel_id); - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->SignalDataChannelTransportChannelClosing_s(channel_id); + } + }); } void DataChannelController::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->SignalDataChannelTransportChannelClosed_s(channel_id); - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->SignalDataChannelTransportChannelClosed_s(channel_id); + } + }); } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->data_channel_transport_ready_to_send_ = true; - self->SignalDataChannelTransportWritable_s( - self->data_channel_transport_ready_to_send_); - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->data_channel_transport_ready_to_send_ = true; + self->SignalDataChannelTransportWritable_s( + self->data_channel_transport_ready_to_send_); + } + }); } void DataChannelController::OnTransportClosed(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr(), error] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->OnTransportChannelClosed(error); - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->OnTransportChannelClosed(error); + } + }); } void DataChannelController::SetupDataChannelTransport_n() { @@ -345,13 +340,12 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { // we can't free it directly here; we need to free it asynchronously. sctp_data_channels_to_free_.push_back(*it); sctp_data_channels_.erase(it); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - self->sctp_data_channels_to_free_.clear(); - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + self->sctp_data_channels_to_free_.clear(); + } + }); return; } } @@ -413,15 +407,14 @@ bool DataChannelController::DataChannelSendData( void DataChannelController::NotifyDataChannelsOfTransportCreated() { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask([self = weak_factory_.GetWeakPtr()] { - if (self) { - RTC_DCHECK_RUN_ON(self->signaling_thread()); - for (const auto& channel : self->sctp_data_channels_) { - channel->OnTransportChannelCreated(); - } - } - })); + signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] { + if (self) { + RTC_DCHECK_RUN_ON(self->signaling_thread()); + for (const auto& channel : self->sctp_data_channels_) { + channel->OnTransportChannelCreated(); + } + } + }); } rtc::Thread* DataChannelController::network_thread() const { diff --git a/pc/dtmf_sender.cc b/pc/dtmf_sender.cc index 40d9e38848..91d642cc3d 100644 --- a/pc/dtmf_sender.cc +++ b/pc/dtmf_sender.cc @@ -13,10 +13,11 @@ #include #include -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" -#include "rtc_base/thread.h" namespace webrtc { @@ -57,7 +58,7 @@ bool GetDtmfCode(char tone, int* code) { } rtc::scoped_refptr DtmfSender::Create( - rtc::Thread* signaling_thread, + TaskQueueBase* signaling_thread, DtmfProviderInterface* provider) { if (!signaling_thread) { return nullptr; @@ -65,7 +66,7 @@ rtc::scoped_refptr DtmfSender::Create( return rtc::make_ref_counted(signaling_thread, provider); } -DtmfSender::DtmfSender(rtc::Thread* signaling_thread, +DtmfSender::DtmfSender(TaskQueueBase* signaling_thread, DtmfProviderInterface* provider) : observer_(nullptr), signaling_thread_(signaling_thread), @@ -165,12 +166,12 @@ int DtmfSender::comma_delay() const { void DtmfSender::QueueInsertDtmf(const rtc::Location& posted_from, uint32_t delay_ms) { signaling_thread_->PostDelayedHighPrecisionTask( - ToQueuedTask(safety_flag_, - [this] { - RTC_DCHECK_RUN_ON(signaling_thread_); - DoInsertDtmf(); - }), - delay_ms); + SafeTask(safety_flag_, + [this] { + RTC_DCHECK_RUN_ON(signaling_thread_); + DoInsertDtmf(); + }), + TimeDelta::Millis(delay_ms)); } void DtmfSender::DoInsertDtmf() { diff --git a/pc/dtmf_sender.h b/pc/dtmf_sender.h index 06cd3a2605..eb3bf5fa7b 100644 --- a/pc/dtmf_sender.h +++ b/pc/dtmf_sender.h @@ -19,11 +19,11 @@ #include "api/scoped_refptr.h" #include "api/sequence_checker.h" #include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" #include "pc/proxy.h" #include "rtc_base/location.h" #include "rtc_base/ref_count.h" #include "rtc_base/third_party/sigslot/sigslot.h" -#include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h" // DtmfSender is the native implementation of the RTCDTMFSender defined by @@ -53,7 +53,7 @@ class DtmfProviderInterface { class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> { public: - static rtc::scoped_refptr Create(rtc::Thread* signaling_thread, + static rtc::scoped_refptr Create(TaskQueueBase* signaling_thread, DtmfProviderInterface* provider); // Implements DtmfSenderInterface. @@ -70,7 +70,7 @@ class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> { int comma_delay() const override; protected: - DtmfSender(rtc::Thread* signaling_thread, DtmfProviderInterface* provider); + DtmfSender(TaskQueueBase* signaling_thread, DtmfProviderInterface* provider); virtual ~DtmfSender(); DtmfSender(const DtmfSender&) = delete; @@ -90,7 +90,7 @@ class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> { void StopSending() RTC_RUN_ON(signaling_thread_); DtmfSenderObserverInterface* observer_ RTC_GUARDED_BY(signaling_thread_); - rtc::Thread* signaling_thread_; + TaskQueueBase* const signaling_thread_; DtmfProviderInterface* provider_ RTC_GUARDED_BY(signaling_thread_); std::string tones_ RTC_GUARDED_BY(signaling_thread_); int duration_ RTC_GUARDED_BY(signaling_thread_); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 9251ba9cd1..f2727a4fdf 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -25,7 +25,6 @@ #include "api/jsep_ice_candidate.h" #include "api/rtp_parameters.h" #include "api/rtp_transceiver_direction.h" -#include "api/task_queue/to_queued_task.h" #include "api/uma_metrics.h" #include "api/video/video_codec_constants.h" #include "call/audio_state.h" @@ -726,7 +725,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( ReportTransportStats(); } signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + SafeTask(signaling_thread_safety_.flag(), [this, s]() { RTC_DCHECK_RUN_ON(signaling_thread()); OnTransportControllerConnectionState(s); })); @@ -735,7 +734,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( [this](PeerConnectionInterface::PeerConnectionState s) { RTC_DCHECK_RUN_ON(network_thread()); signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + SafeTask(signaling_thread_safety_.flag(), [this, s]() { RTC_DCHECK_RUN_ON(signaling_thread()); SetConnectionState(s); })); @@ -744,7 +743,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( [this](PeerConnectionInterface::IceConnectionState s) { RTC_DCHECK_RUN_ON(network_thread()); signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + SafeTask(signaling_thread_safety_.flag(), [this, s]() { RTC_DCHECK_RUN_ON(signaling_thread()); SetStandardizedIceConnectionState(s); })); @@ -753,7 +752,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( [this](cricket::IceGatheringState s) { RTC_DCHECK_RUN_ON(network_thread()); signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { + SafeTask(signaling_thread_safety_.flag(), [this, s]() { RTC_DCHECK_RUN_ON(signaling_thread()); OnTransportControllerGatheringState(s); })); @@ -763,17 +762,17 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( const std::vector& candidates) { RTC_DCHECK_RUN_ON(network_thread()); signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), - [this, t = transport, c = candidates]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesGathered(t, c); - })); + SafeTask(signaling_thread_safety_.flag(), + [this, t = transport, c = candidates]() { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesGathered(t, c); + })); }); transport_controller_->SubscribeIceCandidateError( [this](const cricket::IceCandidateErrorEvent& event) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask(ToQueuedTask( - signaling_thread_safety_.flag(), [this, event = event]() { + signaling_thread()->PostTask( + SafeTask(signaling_thread_safety_.flag(), [this, event = event]() { RTC_DCHECK_RUN_ON(signaling_thread()); OnTransportControllerCandidateError(event); })); @@ -782,7 +781,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( [this](const std::vector& c) { RTC_DCHECK_RUN_ON(network_thread()); signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() { + SafeTask(signaling_thread_safety_.flag(), [this, c = c]() { RTC_DCHECK_RUN_ON(signaling_thread()); OnTransportControllerCandidatesRemoved(c); })); @@ -790,8 +789,8 @@ JsepTransportController* PeerConnection::InitializeTransportController_n( transport_controller_->SubscribeIceCandidatePairChanged( [this](const cricket::CandidatePairChangeEvent& event) { RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask(ToQueuedTask( - signaling_thread_safety_.flag(), [this, event = event]() { + signaling_thread()->PostTask( + SafeTask(signaling_thread_safety_.flag(), [this, event = event]() { RTC_DCHECK_RUN_ON(signaling_thread()); OnTransportControllerCandidateChanged(event); })); @@ -2497,11 +2496,11 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) { transport_controller_->GetDtlsTransport(mid); if (dtls_transport) { signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), - [this, name = dtls_transport->transport_name()] { - RTC_DCHECK_RUN_ON(signaling_thread()); - sctp_transport_name_s_ = std::move(name); - })); + SafeTask(signaling_thread_safety_.flag(), + [this, name = dtls_transport->transport_name()] { + RTC_DCHECK_RUN_ON(signaling_thread()); + sctp_transport_name_s_ = std::move(name); + })); } // Note: setting the data sink and checking initial state must be done last, @@ -2662,14 +2661,14 @@ void PeerConnection::AddRemoteCandidate(const std::string& mid, const cricket::Candidate& candidate) { RTC_DCHECK_RUN_ON(signaling_thread()); - network_thread()->PostTask(ToQueuedTask( + network_thread()->PostTask(SafeTask( network_thread_safety_, [this, mid = mid, candidate = candidate] { RTC_DCHECK_RUN_ON(network_thread()); std::vector candidates = {candidate}; RTCError error = transport_controller_->AddRemoteCandidates(mid, candidates); if (error.ok()) { - signaling_thread()->PostTask(ToQueuedTask( + signaling_thread()->PostTask(SafeTask( signaling_thread_safety_.flag(), [this, candidate = std::move(candidate)] { ReportRemoteIceCandidateAdded(candidate); @@ -2916,7 +2915,7 @@ bool PeerConnection::OnTransportChanged( if (mid == sctp_mid_n_) { data_channel_controller_.OnTransportChanged(data_channel_transport); if (dtls_transport) { - signaling_thread()->PostTask(ToQueuedTask( + signaling_thread()->PostTask(SafeTask( signaling_thread_safety_.flag(), [this, name = std::string(dtls_transport->internal()->transport_name())] { @@ -2942,7 +2941,7 @@ void PeerConnection::StartSctpTransport(int local_port, if (!sctp_mid_s_) return; - network_thread()->PostTask(ToQueuedTask( + network_thread()->PostTask(SafeTask( network_thread_safety_, [this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] { rtc::scoped_refptr sctp_transport = diff --git a/pc/proxy.h b/pc/proxy.h index 2f3865d5bc..89a7f51150 100644 --- a/pc/proxy.h +++ b/pc/proxy.h @@ -123,7 +123,7 @@ class ReturnType { }; template -class MethodCall : public QueuedTask { +class MethodCall { public: typedef R (C::*Method)(Args...); MethodCall(C* c, Method m, Args&&... args) @@ -135,19 +135,16 @@ class MethodCall : public QueuedTask { if (t->IsCurrent()) { Invoke(std::index_sequence_for()); } else { - t->PostTask(std::unique_ptr(this)); + t->PostTask([this] { + Invoke(std::index_sequence_for()); + event_.Set(); + }); event_.Wait(rtc::Event::kForever); } return r_.moved_result(); } private: - bool Run() override { - Invoke(std::index_sequence_for()); - event_.Set(); - return false; - } - template void Invoke(std::index_sequence) { r_.Invoke(c_, m_, std::move(std::get(args_))...); @@ -161,7 +158,7 @@ class MethodCall : public QueuedTask { }; template -class ConstMethodCall : public QueuedTask { +class ConstMethodCall { public: typedef R (C::*Method)(Args...) const; ConstMethodCall(const C* c, Method m, Args&&... args) @@ -173,19 +170,16 @@ class ConstMethodCall : public QueuedTask { if (t->IsCurrent()) { Invoke(std::index_sequence_for()); } else { - t->PostTask(std::unique_ptr(this)); + t->PostTask([this] { + Invoke(std::index_sequence_for()); + event_.Set(); + }); event_.Wait(rtc::Event::kForever); } return r_.moved_result(); } private: - bool Run() override { - Invoke(std::index_sequence_for()); - event_.Set(); - return false; - } - template void Invoke(std::index_sequence) { r_.Invoke(c_, m_, std::move(std::get(args_))...); diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc index f54198df44..478f714ddb 100644 --- a/pc/rtc_stats_collector.cc +++ b/pc/rtc_stats_collector.cc @@ -21,6 +21,7 @@ #include #include +#include "absl/functional/bind_front.h" #include "absl/strings/string_view.h" #include "api/array_view.h" #include "api/candidate.h" @@ -1304,33 +1305,10 @@ void RTCStatsCollector::GetStatsReportInternal( // We have a fresh cached report to deliver. Deliver asynchronously, since // the caller may not be expecting a synchronous callback, and it avoids // reentrancy problems. - std::vector requests; - requests.swap(requests_); - - // Task subclass to take ownership of the requests. - // TODO(nisse): Delete when we can use C++14, and do lambda capture with - // std::move. - class DeliveryTask : public QueuedTask { - public: - DeliveryTask(rtc::scoped_refptr collector, - rtc::scoped_refptr cached_report, - std::vector requests) - : collector_(collector), - cached_report_(cached_report), - requests_(std::move(requests)) {} - bool Run() override { - collector_->DeliverCachedReport(cached_report_, std::move(requests_)); - return true; - } - - private: - rtc::scoped_refptr collector_; - rtc::scoped_refptr cached_report_; - std::vector requests_; - }; - signaling_thread_->PostTask(std::make_unique( - rtc::scoped_refptr(this), cached_report_, - std::move(requests))); + signaling_thread_->PostTask( + absl::bind_front(&RTCStatsCollector::DeliverCachedReport, + rtc::scoped_refptr(this), + cached_report_, std::move(requests_))); } else if (!num_pending_partial_reports_) { // Only start gathering stats if we're not already gathering stats. In the // case of already gathering stats, `callback_` will be invoked when there diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc index 387d3449f3..44a96d4c61 100644 --- a/pc/rtp_transceiver.cc +++ b/pc/rtp_transceiver.cc @@ -21,7 +21,6 @@ #include "api/peer_connection_interface.h" #include "api/rtp_parameters.h" #include "api/sequence_checker.h" -#include "api/task_queue/to_queued_task.h" #include "media/base/codec.h" #include "media/base/media_constants.h" #include "media/base/media_engine.h" @@ -287,8 +286,8 @@ void RtpTransceiver::SetChannel( channel_->SetRtpTransport(transport_lookup(channel_->mid())); channel_->SetFirstPacketReceivedCallback( [thread = thread_, flag = signaling_thread_safety_, this]() mutable { - thread->PostTask(ToQueuedTask(std::move(flag), - [this]() { OnFirstPacketReceived(); })); + thread->PostTask( + SafeTask(std::move(flag), [this]() { OnFirstPacketReceived(); })); }); }); PushNewMediaChannelAndDeleteChannel(nullptr); diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc index 08882c910c..a5e0d76b5b 100644 --- a/pc/sctp_data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -15,7 +15,7 @@ #include #include -#include "api/task_queue/to_queued_task.h" +#include "absl/cleanup/cleanup.h" #include "media/sctp/sctp_transport_internal.h" #include "pc/proxy.h" #include "pc/sctp_utils.h" @@ -221,13 +221,12 @@ bool SctpDataChannel::Init() { RTC_DCHECK(!controller_detached_); if (controller_->ReadyToSendData()) { AddRef(); - rtc::Thread::Current()->PostTask(ToQueuedTask( - [this] { - RTC_DCHECK_RUN_ON(signaling_thread_); - if (state_ != kClosed) - OnTransportReady(true); - }, - [this] { Release(); })); + absl::Cleanup release = [this] { Release(); }; + rtc::Thread::Current()->PostTask([this, release = std::move(release)] { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (state_ != kClosed) + OnTransportReady(true); + }); } return true; diff --git a/pc/test/integration_test_helpers.cc b/pc/test/integration_test_helpers.cc index 3f07f361fc..9bfd9fdcbd 100644 --- a/pc/test/integration_test_helpers.cc +++ b/pc/test/integration_test_helpers.cc @@ -64,8 +64,7 @@ TaskQueueMetronome::TaskQueueMetronome(TaskQueueFactory* factory, tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] { MutexLock lock(&mutex_); for (auto* listener : listeners_) { - listener->OnTickTaskQueue()->PostTask( - ToQueuedTask([listener] { listener->OnTick(); })); + listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); }); } return tick_period_; }); diff --git a/pc/test/integration_test_helpers.h b/pc/test/integration_test_helpers.h index 3b53ece9b3..f68b96ed17 100644 --- a/pc/test/integration_test_helpers.h +++ b/pc/test/integration_test_helpers.h @@ -54,9 +54,9 @@ #include "api/task_queue/default_task_queue_factory.h" #include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_factory.h" -#include "api/task_queue/to_queued_task.h" #include "api/transport/field_trial_based_config.h" #include "api/uma_metrics.h" +#include "api/units/time_delta.h" #include "api/video/video_rotation.h" #include "api/video_codecs/sdp_video_format.h" #include "api/video_codecs/video_decoder_factory.h" @@ -1007,11 +1007,11 @@ class PeerConnectionIntegrationWrapper : public webrtc::PeerConnectionObserver, RelaySdpMessageIfReceiverExists(type, msg); } else { rtc::Thread::Current()->PostDelayedTask( - ToQueuedTask(task_safety_.flag(), - [this, type, msg] { - RelaySdpMessageIfReceiverExists(type, msg); - }), - signaling_delay_ms_); + SafeTask(task_safety_.flag(), + [this, type, msg] { + RelaySdpMessageIfReceiverExists(type, msg); + }), + TimeDelta::Millis(signaling_delay_ms_)); } } @@ -1030,12 +1030,12 @@ class PeerConnectionIntegrationWrapper : public webrtc::PeerConnectionObserver, RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index, msg); } else { rtc::Thread::Current()->PostDelayedTask( - ToQueuedTask(task_safety_.flag(), - [this, sdp_mid, sdp_mline_index, msg] { - RelayIceMessageIfReceiverExists(sdp_mid, - sdp_mline_index, msg); - }), - signaling_delay_ms_); + SafeTask(task_safety_.flag(), + [this, sdp_mid, sdp_mline_index, msg] { + RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index, + msg); + }), + TimeDelta::Millis(signaling_delay_ms_)); } }