From 7b19036b80ef8377f8c479ae4bd0bd499362c218 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Thu, 7 Jul 2022 14:13:02 +0200 Subject: [PATCH] Migrate p2p/ to absl::AnyInvocable based TaskQueueBase interface Bug: webrtc:14245 Change-Id: Iade96b4499e45401491c3eee941fafa51fb2849b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268147 Reviewed-by: Tomas Gunnarsson Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37482} --- p2p/BUILD.gn | 6 ++--- p2p/base/fake_ice_transport.h | 16 +++++++------ p2p/base/p2p_transport_channel.cc | 31 +++++++++++++------------ p2p/base/port.cc | 3 +-- p2p/base/regathering_controller.cc | 32 +++++++++++++------------- p2p/base/stun_request.cc | 6 ++--- p2p/base/tcp_port.cc | 21 +++++++++-------- p2p/base/turn_port.cc | 34 +++++++++++++++------------- p2p/base/turn_server.cc | 4 +--- p2p/client/basic_port_allocator.cc | 36 ++++++++++++++++-------------- p2p/stunprober/stun_prober.cc | 18 +++++++-------- 11 files changed, 105 insertions(+), 102 deletions(-) diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 18264eff5d..60fe37d24a 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -139,7 +139,6 @@ rtc_library("rtc_p2p") { # Needed by pseudo_tcp, which should move to a separate target. "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../rtc_base:safe_minmax", "../rtc_base:weak_ptr", "../rtc_base/network:sent_packet", @@ -167,7 +166,7 @@ if (rtc_include_tests) { "../api:ice_transport_interface", "../api:libjingle_peerconnection_api", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", + "../api/units:time_delta", "../rtc_base", "../rtc_base:copy_on_write_buffer", ] @@ -322,7 +321,6 @@ rtc_library("p2p_server_utils") { "../api:array_view", "../api:packet_socket_factory", "../api:sequence_checker", - "../api/task_queue:to_queued_task", "../api/transport:stun_types", "../rtc_base", "../rtc_base:byte_buffer", @@ -353,8 +351,8 @@ rtc_library("libstunprober") { "../api:packet_socket_factory", "../api:sequence_checker", "../api/task_queue:pending_task_safety_flag", - "../api/task_queue:to_queued_task", "../api/transport:stun_types", + "../api/units:time_delta", "../rtc_base", "../rtc_base:async_resolver_interface", "../rtc_base:byte_buffer", diff --git a/p2p/base/fake_ice_transport.h b/p2p/base/fake_ice_transport.h index 4c636cf148..31db8296bd 100644 --- a/p2p/base/fake_ice_transport.h +++ b/p2p/base/fake_ice_transport.h @@ -21,11 +21,13 @@ #include "absl/types/optional.h" #include "api/ice_transport_interface.h" #include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/to_queued_task.h" +#include "api/units/time_delta.h" #include "p2p/base/ice_transport_internal.h" #include "rtc_base/copy_on_write_buffer.h" namespace cricket { +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; // All methods must be called on the network thread (which is either the thread // calling the constructor, or the separate thread explicitly passed to the @@ -310,12 +312,12 @@ class FakeIceTransport : public IceTransportInternal { rtc::CopyOnWriteBuffer packet(std::move(send_packet_)); if (async_) { network_thread_->PostDelayedTask( - ToQueuedTask(task_safety_.flag(), - [this, packet] { - RTC_DCHECK_RUN_ON(network_thread_); - FakeIceTransport::SendPacketInternal(packet); - }), - async_delay_ms_); + SafeTask(task_safety_.flag(), + [this, packet] { + RTC_DCHECK_RUN_ON(network_thread_); + FakeIceTransport::SendPacketInternal(packet); + }), + TimeDelta::Millis(async_delay_ms_)); } else { SendPacketInternal(packet); } diff --git a/p2p/base/p2p_transport_channel.cc b/p2p/base/p2p_transport_channel.cc index 3283294074..87338bfb90 100644 --- a/p2p/base/p2p_transport_channel.cc +++ b/p2p/base/p2p_transport_channel.cc @@ -26,8 +26,7 @@ #include "api/async_dns_resolver.h" #include "api/candidate.h" #include "api/field_trials_view.h" -#include "api/task_queue/queued_task.h" -#include "api/task_queue/to_queued_task.h" +#include "api/units/time_delta.h" #include "logging/rtc_event_log/ice_logger.h" #include "p2p/base/basic_async_resolver_factory.h" #include "p2p/base/basic_ice_controller.h" @@ -98,9 +97,10 @@ rtc::RouteEndpoint CreateRouteEndpointFromCandidate( namespace cricket { -using webrtc::RTCError; -using webrtc::RTCErrorType; -using webrtc::ToQueuedTask; +using ::webrtc::RTCError; +using ::webrtc::RTCErrorType; +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; bool IceCredentialsChanged(absl::string_view old_ufrag, absl::string_view old_pwd, @@ -309,11 +309,11 @@ bool P2PTransportChannel::MaybeSwitchSelectedConnection( // currently selected connection. So we need to re-check whether it needs // to be switched at a later time. network_thread_->PostDelayedTask( - ToQueuedTask(task_safety_, - [this, reason = result.recheck_event->reason]() { - SortConnectionsAndUpdateState(reason); - }), - result.recheck_event->recheck_delay_ms); + SafeTask(task_safety_.flag(), + [this, reason = result.recheck_event->reason]() { + SortConnectionsAndUpdateState(reason); + }), + TimeDelta::Millis(result.recheck_event->recheck_delay_ms)); } for (const auto* con : result.connections_to_forget_state_on) { @@ -1316,8 +1316,7 @@ void P2PTransportChannel::OnCandidateResolved( std::unique_ptr to_delete = std::move(p->resolver_); // Delay the actual deletion of the resolver until the lambda executes. - network_thread_->PostTask( - ToQueuedTask([delete_this = std::move(to_delete)] {})); + network_thread_->PostTask([to_delete = std::move(to_delete)] {}); resolvers_.erase(p); } @@ -1723,7 +1722,7 @@ void P2PTransportChannel::RequestSortAndStateUpdate( RTC_DCHECK_RUN_ON(network_thread_); if (!sort_dirty_) { network_thread_->PostTask( - ToQueuedTask(task_safety_, [this, reason_to_sort]() { + SafeTask(task_safety_.flag(), [this, reason_to_sort]() { SortConnectionsAndUpdateState(reason_to_sort); })); sort_dirty_ = true; @@ -1741,7 +1740,7 @@ void P2PTransportChannel::MaybeStartPinging() { << ": Have a pingable connection for the first time; " "starting to ping."; network_thread_->PostTask( - ToQueuedTask(task_safety_, [this]() { CheckAndPing(); })); + SafeTask(task_safety_.flag(), [this]() { CheckAndPing(); })); regathering_controller_->Start(); started_pinging_ = true; } @@ -2073,7 +2072,7 @@ void P2PTransportChannel::CheckAndPing() { UpdateConnectionStates(); auto result = ice_controller_->SelectConnectionToPing(last_ping_sent_ms_); - int delay = result.recheck_delay_ms; + TimeDelta delay = TimeDelta::Millis(result.recheck_delay_ms); if (result.connection.value_or(nullptr)) { Connection* conn = FromIceController(*result.connection); @@ -2082,7 +2081,7 @@ void P2PTransportChannel::CheckAndPing() { } network_thread_->PostDelayedTask( - ToQueuedTask(task_safety_, [this]() { CheckAndPing(); }), delay); + SafeTask(task_safety_.flag(), [this]() { CheckAndPing(); }), delay); } // This method is only for unit testing. diff --git a/p2p/base/port.cc b/p2p/base/port.cc index d622664642..7335b705bf 100644 --- a/p2p/base/port.cc +++ b/p2p/base/port.cc @@ -928,8 +928,7 @@ void Port::DestroyConnectionInternal(Connection* conn, bool async) { // so that the object will always be deleted, including if PostTask fails. // In such a case (only tests), deletion would happen inside of the call // to `DestroyConnection()`. - thread_->PostTask( - webrtc::ToQueuedTask([conn = absl::WrapUnique(conn)]() {})); + thread_->PostTask([conn = absl::WrapUnique(conn)]() {}); } else { delete conn; } diff --git a/p2p/base/regathering_controller.cc b/p2p/base/regathering_controller.cc index 3fe5c00cfb..572c2a616f 100644 --- a/p2p/base/regathering_controller.cc +++ b/p2p/base/regathering_controller.cc @@ -10,7 +10,8 @@ #include "p2p/base/regathering_controller.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/units/time_delta.h" namespace webrtc { @@ -60,21 +61,20 @@ void BasicRegatheringController:: pending_regathering_.reset(new ScopedTaskSafety()); thread_->PostDelayedTask( - ToQueuedTask(*pending_regathering_.get(), - [this]() { - RTC_DCHECK_RUN_ON(thread_); - // Only regather when the current session is in the CLEARED - // state (i.e., not running or stopped). It is only - // possible to enter this state when we gather continually, - // so there is an implicit check on continual gathering - // here. - if (allocator_session_ && - allocator_session_->IsCleared()) { - allocator_session_->RegatherOnFailedNetworks(); - } - ScheduleRecurringRegatheringOnFailedNetworks(); - }), - config_.regather_on_failed_networks_interval); + SafeTask(pending_regathering_->flag(), + [this]() { + RTC_DCHECK_RUN_ON(thread_); + // Only regather when the current session is in the CLEARED + // state (i.e., not running or stopped). It is only + // possible to enter this state when we gather continually, + // so there is an implicit check on continual gathering + // here. + if (allocator_session_ && allocator_session_->IsCleared()) { + allocator_session_->RegatherOnFailedNetworks(); + } + ScheduleRecurringRegatheringOnFailedNetworks(); + }), + TimeDelta::Millis(config_.regather_on_failed_networks_interval)); } } // namespace webrtc diff --git a/p2p/base/stun_request.cc b/p2p/base/stun_request.cc index 3fe02aff22..c940027f91 100644 --- a/p2p/base/stun_request.cc +++ b/p2p/base/stun_request.cc @@ -16,7 +16,7 @@ #include #include "absl/memory/memory.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "rtc_base/checks.h" #include "rtc_base/helpers.h" #include "rtc_base/logging.h" @@ -24,6 +24,7 @@ #include "rtc_base/time_utils.h" // For TimeMillis namespace cricket { +using ::webrtc::SafeTask; // RFC 5389 says SHOULD be 500ms. // For years, this was 100ms, but for networks that @@ -241,8 +242,7 @@ void StunRequest::SendInternal() { void StunRequest::SendDelayed(webrtc::TimeDelta delay) { network_thread()->PostDelayedTask( - webrtc::ToQueuedTask(task_safety_, [this]() { SendInternal(); }), - delay.ms()); + SafeTask(task_safety_.flag(), [this]() { SendInternal(); }), delay); } void StunRequest::Send(webrtc::TimeDelta delay) { diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc index 87165ac73f..bc6ccab674 100644 --- a/p2p/base/tcp_port.cc +++ b/p2p/base/tcp_port.cc @@ -74,7 +74,8 @@ #include "absl/algorithm/container.h" #include "absl/memory/memory.h" #include "absl/strings/string_view.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" +#include "api/units/time_delta.h" #include "p2p/base/p2p_constants.h" #include "rtc_base/checks.h" #include "rtc_base/ip_address.h" @@ -85,6 +86,8 @@ #include "rtc_base/third_party/sigslot/sigslot.h" namespace cricket { +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; TCPPort::TCPPort(rtc::Thread* thread, rtc::PacketSocketFactory* factory, @@ -510,13 +513,13 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) { // shutdown is intentional and reconnect is not necessary. We only reconnect // when the connection is used to Send() or Ping(). network_thread()->PostDelayedTask( - webrtc::ToQueuedTask(network_safety_, - [this]() { - if (pretending_to_be_writable_) { - Destroy(); - } - }), - reconnection_timeout()); + SafeTask(network_safety_.flag(), + [this]() { + if (pretending_to_be_writable_) { + Destroy(); + } + }), + TimeDelta::Millis(reconnection_timeout())); } else if (!pretending_to_be_writable_) { // OnClose could be called when the underneath socket times out during the // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have @@ -589,7 +592,7 @@ void TCPConnection::CreateOutgoingTcpSocket() { // of Connection::Ping(), we are still using the request. // Unwind the stack and defer the FailAndPrune. network_thread()->PostTask( - webrtc::ToQueuedTask(network_safety_, [this]() { FailAndPrune(); })); + SafeTask(network_safety_.flag(), [this]() { FailAndPrune(); })); } } diff --git a/p2p/base/turn_port.cc b/p2p/base/turn_port.cc index c1b4c102d8..5d24ceb34c 100644 --- a/p2p/base/turn_port.cc +++ b/p2p/base/turn_port.cc @@ -19,7 +19,6 @@ #include "absl/strings/match.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include "api/task_queue/to_queued_task.h" #include "api/transport/stun.h" #include "p2p/base/connection.h" #include "p2p/base/p2p_constants.h" @@ -32,6 +31,8 @@ #include "rtc_base/strings/string_builder.h" namespace cricket { +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; // TODO(juberti): Move to stun.h when relay messages have been renamed. static const int TURN_ALLOCATE_REQUEST = STUN_ALLOCATE_REQUEST; @@ -45,7 +46,8 @@ const int STUN_ATTR_TURN_LOGGING_ID = 0xff05; // TODO(juberti): Extract to turnmessage.h static const int TURN_DEFAULT_PORT = 3478; static const int TURN_CHANNEL_NUMBER_START = 0x4000; -static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes + +static constexpr TimeDelta kTurnPermissionTimeout = TimeDelta::Minutes(5); static const size_t TURN_CHANNEL_HEADER_SIZE = 4U; @@ -161,7 +163,7 @@ class TurnEntry : public sigslot::has_slots<> { BindState state() const { return state_; } // If the destruction timestamp is set, that means destruction has been - // scheduled (will occur TURN_PERMISSION_TIMEOUT after it's scheduled). + // scheduled (will occur kTurnPermissionTimeout after it's scheduled). absl::optional destruction_timestamp() { return destruction_timestamp_; } @@ -1295,12 +1297,12 @@ void TurnPort::HandleConnectionDestroyed(Connection* conn) { RTC_DCHECK(!entry->destruction_timestamp().has_value()); int64_t timestamp = rtc::TimeMillis(); entry->set_destruction_timestamp(timestamp); - thread()->PostDelayedTask(ToQueuedTask(task_safety_.flag(), - [this, entry, timestamp] { - DestroyEntryIfNotCancelled( - entry, timestamp); - }), - TURN_PERMISSION_TIMEOUT); + thread()->PostDelayedTask(SafeTask(task_safety_.flag(), + [this, entry, timestamp] { + DestroyEntryIfNotCancelled(entry, + timestamp); + }), + kTurnPermissionTimeout); } bool TurnPort::SetEntryChannelId(const rtc::SocketAddress& address, @@ -1752,10 +1754,10 @@ void TurnChannelBindRequest::OnResponse(StunMessage* response) { // threshold. The channel binding has a longer lifetime, but // this is the easiest way to keep both the channel and the // permission from expiring. - int delay = TURN_PERMISSION_TIMEOUT - 60000; - entry_->SendChannelBindRequest(delay); + TimeDelta delay = kTurnPermissionTimeout - TimeDelta::Minutes(1); + entry_->SendChannelBindRequest(delay.ms()); RTC_LOG(LS_INFO) << port_->ToString() << ": Scheduled channel bind in " - << delay << "ms."; + << delay.ms() << "ms."; } } @@ -1855,11 +1857,11 @@ void TurnEntry::OnCreatePermissionSuccess() { if (state_ != STATE_BOUND) { // Refresh the permission request about 1 minute before the permission // times out. - int delay = TURN_PERMISSION_TIMEOUT - 60000; - SendCreatePermissionRequest(delay); + TimeDelta delay = kTurnPermissionTimeout - TimeDelta::Minutes(1); + SendCreatePermissionRequest(delay.ms()); RTC_LOG(LS_INFO) << port_->ToString() - << ": Scheduled create-permission-request in " << delay - << "ms."; + << ": Scheduled create-permission-request in " + << delay.ms() << "ms."; } } diff --git a/p2p/base/turn_server.cc b/p2p/base/turn_server.cc index bf050c2319..fd6b9223a8 100644 --- a/p2p/base/turn_server.cc +++ b/p2p/base/turn_server.cc @@ -19,7 +19,6 @@ #include "absl/strings/string_view.h" #include "api/array_view.h" #include "api/packet_socket_factory.h" -#include "api/task_queue/to_queued_task.h" #include "api/transport/stun.h" #include "p2p/base/async_stun_tcp_socket.h" #include "rtc_base/byte_buffer.h" @@ -571,8 +570,7 @@ void TurnServer::DestroyInternalSocket(rtc::AsyncPacketSocket* socket) { // We must destroy the socket async to avoid invalidating the sigslot // callback list iterator inside a sigslot callback. (In other words, // deleting an object from within a callback from that object). - thread_->PostTask(webrtc::ToQueuedTask( - [socket_to_delete = std::move(socket_to_delete)] {})); + thread_->PostTask([socket_to_delete = std::move(socket_to_delete)] {}); } } diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc index 6aedf90b28..8963d4eb8f 100644 --- a/p2p/client/basic_port_allocator.cc +++ b/p2p/client/basic_port_allocator.cc @@ -21,8 +21,9 @@ #include "absl/algorithm/container.h" #include "absl/memory/memory.h" #include "absl/strings/string_view.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/field_trial_based_config.h" +#include "api/units/time_delta.h" #include "p2p/base/basic_packet_socket_factory.h" #include "p2p/base/port.h" #include "p2p/base/stun_port.h" @@ -36,10 +37,11 @@ #include "rtc_base/trace_event.h" #include "system_wrappers/include/metrics.h" -using rtc::CreateRandomId; - namespace cricket { namespace { +using ::rtc::CreateRandomId; +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; const int PHASE_UDP = 0; const int PHASE_RELAY = 1; @@ -410,8 +412,8 @@ void BasicPortAllocatorSession::StartGettingPorts() { RTC_DCHECK_RUN_ON(network_thread_); state_ = SessionState::GATHERING; - network_thread_->PostTask(webrtc::ToQueuedTask( - network_safety_, [this] { GetPortConfigurations(); })); + network_thread_->PostTask( + SafeTask(network_safety_.flag(), [this] { GetPortConfigurations(); })); RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy " << turn_port_prune_policy_; @@ -432,7 +434,7 @@ void BasicPortAllocatorSession::ClearGettingPorts() { sequences_[i]->Stop(); } network_thread_->PostTask( - webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); })); + SafeTask(network_safety_.flag(), [this] { OnConfigStop(); })); state_ = SessionState::CLEARED; } @@ -647,8 +649,8 @@ void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { void BasicPortAllocatorSession::ConfigReady( std::unique_ptr config) { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->PostTask(webrtc::ToQueuedTask( - network_safety_, [this, config = std::move(config)]() mutable { + network_thread_->PostTask(SafeTask( + network_safety_.flag(), [this, config = std::move(config)]() mutable { OnConfigReady(std::move(config)); })); } @@ -696,8 +698,8 @@ void BasicPortAllocatorSession::OnConfigStop() { void BasicPortAllocatorSession::AllocatePorts() { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->PostTask(webrtc::ToQueuedTask( - network_safety_, [this, allocation_epoch = allocation_epoch_] { + network_thread_->PostTask(SafeTask( + network_safety_.flag(), [this, allocation_epoch = allocation_epoch_] { OnAllocate(allocation_epoch); })); } @@ -873,8 +875,9 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { } } if (done_signal_needed) { - network_thread_->PostTask(webrtc::ToQueuedTask( - network_safety_, [this] { OnAllocationSequenceObjectsCreated(); })); + network_thread_->PostTask(SafeTask(network_safety_.flag(), [this] { + OnAllocationSequenceObjectsCreated(); + })); } } @@ -1391,8 +1394,8 @@ void AllocationSequence::DisableEquivalentPhases(const rtc::Network* network, void AllocationSequence::Start() { state_ = kRunning; - session_->network_thread()->PostTask(webrtc::ToQueuedTask( - safety_, [this, epoch = epoch_] { Process(epoch); })); + session_->network_thread()->PostTask( + SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); })); // Take a snapshot of the best IP, so that when DisableEquivalentPhases is // called next time, we enable all phases if the best IP has since changed. previous_best_ip_ = network_->GetBestIP(); @@ -1440,9 +1443,8 @@ void AllocationSequence::Process(int epoch) { if (state() == kRunning) { ++phase_; session_->network_thread()->PostDelayedTask( - webrtc::ToQueuedTask(safety_, - [this, epoch = epoch_] { Process(epoch); }), - session_->allocator()->step_delay()); + SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); }), + TimeDelta::Millis(session_->allocator()->step_delay())); } else { // No allocation steps needed further if all phases in AllocationSequence // are completed. Cause further Process calls in the previous epoch to be diff --git a/p2p/stunprober/stun_prober.cc b/p2p/stunprober/stun_prober.cc index 55122b9597..977ead4d72 100644 --- a/p2p/stunprober/stun_prober.cc +++ b/p2p/stunprober/stun_prober.cc @@ -17,8 +17,9 @@ #include #include "api/packet_socket_factory.h" -#include "api/task_queue/to_queued_task.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/stun.h" +#include "api/units/time_delta.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/async_resolver_interface.h" #include "rtc_base/checks.h" @@ -30,6 +31,8 @@ namespace stunprober { namespace { +using ::webrtc::SafeTask; +using ::webrtc::TimeDelta; const int THREAD_WAKE_UP_INTERVAL_MS = 5; @@ -355,8 +358,7 @@ void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) { // Deletion of AsyncResolverInterface can't be done in OnResolveResult which // handles SignalDone. - thread_->PostTask( - webrtc::ToQueuedTask([resolver] { resolver->Destroy(false); })); + thread_->PostTask([resolver] { resolver->Destroy(false); }); servers_.pop_back(); if (servers_.size()) { @@ -454,9 +456,8 @@ void StunProber::MaybeScheduleStunRequests() { if (Done()) { thread_->PostDelayedTask( - webrtc::ToQueuedTask(task_safety_.flag(), - [this] { ReportOnFinished(SUCCESS); }), - timeout_ms_); + SafeTask(task_safety_.flag(), [this] { ReportOnFinished(SUCCESS); }), + TimeDelta::Millis(timeout_ms_)); return; } if (should_send_next_request(now)) { @@ -467,9 +468,8 @@ void StunProber::MaybeScheduleStunRequests() { next_request_time_ms_ = now + interval_ms_; } thread_->PostDelayedTask( - webrtc::ToQueuedTask(task_safety_.flag(), - [this] { MaybeScheduleStunRequests(); }), - get_wake_up_interval_ms()); + SafeTask(task_safety_.flag(), [this] { MaybeScheduleStunRequests(); }), + TimeDelta::Millis(get_wake_up_interval_ms())); } bool StunProber::GetStats(StunProber::Stats* prob_stats) const {