Migrate p2p/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Iade96b4499e45401491c3eee941fafa51fb2849b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268147
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37482}
This commit is contained in:
Danil Chapovalov 2022-07-07 14:13:02 +02:00 committed by WebRTC LUCI CQ
parent 677c1ddde5
commit 7b19036b80
11 changed files with 105 additions and 102 deletions

View File

@ -139,7 +139,6 @@ rtc_library("rtc_p2p") {
# Needed by pseudo_tcp, which should move to a separate target.
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../rtc_base:safe_minmax",
"../rtc_base:weak_ptr",
"../rtc_base/network:sent_packet",
@ -167,7 +166,7 @@ if (rtc_include_tests) {
"../api:ice_transport_interface",
"../api:libjingle_peerconnection_api",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../rtc_base",
"../rtc_base:copy_on_write_buffer",
]
@ -322,7 +321,6 @@ rtc_library("p2p_server_utils") {
"../api:array_view",
"../api:packet_socket_factory",
"../api:sequence_checker",
"../api/task_queue:to_queued_task",
"../api/transport:stun_types",
"../rtc_base",
"../rtc_base:byte_buffer",
@ -353,8 +351,8 @@ rtc_library("libstunprober") {
"../api:packet_socket_factory",
"../api:sequence_checker",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport:stun_types",
"../api/units:time_delta",
"../rtc_base",
"../rtc_base:async_resolver_interface",
"../rtc_base:byte_buffer",

View File

@ -21,11 +21,13 @@
#include "absl/types/optional.h"
#include "api/ice_transport_interface.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "p2p/base/ice_transport_internal.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace cricket {
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
// All methods must be called on the network thread (which is either the thread
// calling the constructor, or the separate thread explicitly passed to the
@ -310,12 +312,12 @@ class FakeIceTransport : public IceTransportInternal {
rtc::CopyOnWriteBuffer packet(std::move(send_packet_));
if (async_) {
network_thread_->PostDelayedTask(
ToQueuedTask(task_safety_.flag(),
[this, packet] {
RTC_DCHECK_RUN_ON(network_thread_);
FakeIceTransport::SendPacketInternal(packet);
}),
async_delay_ms_);
SafeTask(task_safety_.flag(),
[this, packet] {
RTC_DCHECK_RUN_ON(network_thread_);
FakeIceTransport::SendPacketInternal(packet);
}),
TimeDelta::Millis(async_delay_ms_));
} else {
SendPacketInternal(packet);
}

View File

@ -26,8 +26,7 @@
#include "api/async_dns_resolver.h"
#include "api/candidate.h"
#include "api/field_trials_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "logging/rtc_event_log/ice_logger.h"
#include "p2p/base/basic_async_resolver_factory.h"
#include "p2p/base/basic_ice_controller.h"
@ -98,9 +97,10 @@ rtc::RouteEndpoint CreateRouteEndpointFromCandidate(
namespace cricket {
using webrtc::RTCError;
using webrtc::RTCErrorType;
using webrtc::ToQueuedTask;
using ::webrtc::RTCError;
using ::webrtc::RTCErrorType;
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
bool IceCredentialsChanged(absl::string_view old_ufrag,
absl::string_view old_pwd,
@ -309,11 +309,11 @@ bool P2PTransportChannel::MaybeSwitchSelectedConnection(
// currently selected connection. So we need to re-check whether it needs
// to be switched at a later time.
network_thread_->PostDelayedTask(
ToQueuedTask(task_safety_,
[this, reason = result.recheck_event->reason]() {
SortConnectionsAndUpdateState(reason);
}),
result.recheck_event->recheck_delay_ms);
SafeTask(task_safety_.flag(),
[this, reason = result.recheck_event->reason]() {
SortConnectionsAndUpdateState(reason);
}),
TimeDelta::Millis(result.recheck_event->recheck_delay_ms));
}
for (const auto* con : result.connections_to_forget_state_on) {
@ -1316,8 +1316,7 @@ void P2PTransportChannel::OnCandidateResolved(
std::unique_ptr<webrtc::AsyncDnsResolverInterface> to_delete =
std::move(p->resolver_);
// Delay the actual deletion of the resolver until the lambda executes.
network_thread_->PostTask(
ToQueuedTask([delete_this = std::move(to_delete)] {}));
network_thread_->PostTask([to_delete = std::move(to_delete)] {});
resolvers_.erase(p);
}
@ -1723,7 +1722,7 @@ void P2PTransportChannel::RequestSortAndStateUpdate(
RTC_DCHECK_RUN_ON(network_thread_);
if (!sort_dirty_) {
network_thread_->PostTask(
ToQueuedTask(task_safety_, [this, reason_to_sort]() {
SafeTask(task_safety_.flag(), [this, reason_to_sort]() {
SortConnectionsAndUpdateState(reason_to_sort);
}));
sort_dirty_ = true;
@ -1741,7 +1740,7 @@ void P2PTransportChannel::MaybeStartPinging() {
<< ": Have a pingable connection for the first time; "
"starting to ping.";
network_thread_->PostTask(
ToQueuedTask(task_safety_, [this]() { CheckAndPing(); }));
SafeTask(task_safety_.flag(), [this]() { CheckAndPing(); }));
regathering_controller_->Start();
started_pinging_ = true;
}
@ -2073,7 +2072,7 @@ void P2PTransportChannel::CheckAndPing() {
UpdateConnectionStates();
auto result = ice_controller_->SelectConnectionToPing(last_ping_sent_ms_);
int delay = result.recheck_delay_ms;
TimeDelta delay = TimeDelta::Millis(result.recheck_delay_ms);
if (result.connection.value_or(nullptr)) {
Connection* conn = FromIceController(*result.connection);
@ -2082,7 +2081,7 @@ void P2PTransportChannel::CheckAndPing() {
}
network_thread_->PostDelayedTask(
ToQueuedTask(task_safety_, [this]() { CheckAndPing(); }), delay);
SafeTask(task_safety_.flag(), [this]() { CheckAndPing(); }), delay);
}
// This method is only for unit testing.

View File

@ -928,8 +928,7 @@ void Port::DestroyConnectionInternal(Connection* conn, bool async) {
// so that the object will always be deleted, including if PostTask fails.
// In such a case (only tests), deletion would happen inside of the call
// to `DestroyConnection()`.
thread_->PostTask(
webrtc::ToQueuedTask([conn = absl::WrapUnique(conn)]() {}));
thread_->PostTask([conn = absl::WrapUnique(conn)]() {});
} else {
delete conn;
}

View File

@ -10,7 +10,8 @@
#include "p2p/base/regathering_controller.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/units/time_delta.h"
namespace webrtc {
@ -60,21 +61,20 @@ void BasicRegatheringController::
pending_regathering_.reset(new ScopedTaskSafety());
thread_->PostDelayedTask(
ToQueuedTask(*pending_regathering_.get(),
[this]() {
RTC_DCHECK_RUN_ON(thread_);
// Only regather when the current session is in the CLEARED
// state (i.e., not running or stopped). It is only
// possible to enter this state when we gather continually,
// so there is an implicit check on continual gathering
// here.
if (allocator_session_ &&
allocator_session_->IsCleared()) {
allocator_session_->RegatherOnFailedNetworks();
}
ScheduleRecurringRegatheringOnFailedNetworks();
}),
config_.regather_on_failed_networks_interval);
SafeTask(pending_regathering_->flag(),
[this]() {
RTC_DCHECK_RUN_ON(thread_);
// Only regather when the current session is in the CLEARED
// state (i.e., not running or stopped). It is only
// possible to enter this state when we gather continually,
// so there is an implicit check on continual gathering
// here.
if (allocator_session_ && allocator_session_->IsCleared()) {
allocator_session_->RegatherOnFailedNetworks();
}
ScheduleRecurringRegatheringOnFailedNetworks();
}),
TimeDelta::Millis(config_.regather_on_failed_networks_interval));
}
} // namespace webrtc

View File

@ -16,7 +16,7 @@
#include <vector>
#include "absl/memory/memory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "rtc_base/checks.h"
#include "rtc_base/helpers.h"
#include "rtc_base/logging.h"
@ -24,6 +24,7 @@
#include "rtc_base/time_utils.h" // For TimeMillis
namespace cricket {
using ::webrtc::SafeTask;
// RFC 5389 says SHOULD be 500ms.
// For years, this was 100ms, but for networks that
@ -241,8 +242,7 @@ void StunRequest::SendInternal() {
void StunRequest::SendDelayed(webrtc::TimeDelta delay) {
network_thread()->PostDelayedTask(
webrtc::ToQueuedTask(task_safety_, [this]() { SendInternal(); }),
delay.ms());
SafeTask(task_safety_.flag(), [this]() { SendInternal(); }), delay);
}
void StunRequest::Send(webrtc::TimeDelta delay) {

View File

@ -74,7 +74,8 @@
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/units/time_delta.h"
#include "p2p/base/p2p_constants.h"
#include "rtc_base/checks.h"
#include "rtc_base/ip_address.h"
@ -85,6 +86,8 @@
#include "rtc_base/third_party/sigslot/sigslot.h"
namespace cricket {
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
TCPPort::TCPPort(rtc::Thread* thread,
rtc::PacketSocketFactory* factory,
@ -510,13 +513,13 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) {
// shutdown is intentional and reconnect is not necessary. We only reconnect
// when the connection is used to Send() or Ping().
network_thread()->PostDelayedTask(
webrtc::ToQueuedTask(network_safety_,
[this]() {
if (pretending_to_be_writable_) {
Destroy();
}
}),
reconnection_timeout());
SafeTask(network_safety_.flag(),
[this]() {
if (pretending_to_be_writable_) {
Destroy();
}
}),
TimeDelta::Millis(reconnection_timeout()));
} else if (!pretending_to_be_writable_) {
// OnClose could be called when the underneath socket times out during the
// initial connect() (i.e. `pretending_to_be_writable_` is false) . We have
@ -589,7 +592,7 @@ void TCPConnection::CreateOutgoingTcpSocket() {
// of Connection::Ping(), we are still using the request.
// Unwind the stack and defer the FailAndPrune.
network_thread()->PostTask(
webrtc::ToQueuedTask(network_safety_, [this]() { FailAndPrune(); }));
SafeTask(network_safety_.flag(), [this]() { FailAndPrune(); }));
}
}

View File

@ -19,7 +19,6 @@
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/task_queue/to_queued_task.h"
#include "api/transport/stun.h"
#include "p2p/base/connection.h"
#include "p2p/base/p2p_constants.h"
@ -32,6 +31,8 @@
#include "rtc_base/strings/string_builder.h"
namespace cricket {
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
// TODO(juberti): Move to stun.h when relay messages have been renamed.
static const int TURN_ALLOCATE_REQUEST = STUN_ALLOCATE_REQUEST;
@ -45,7 +46,8 @@ const int STUN_ATTR_TURN_LOGGING_ID = 0xff05;
// TODO(juberti): Extract to turnmessage.h
static const int TURN_DEFAULT_PORT = 3478;
static const int TURN_CHANNEL_NUMBER_START = 0x4000;
static const int TURN_PERMISSION_TIMEOUT = 5 * 60 * 1000; // 5 minutes
static constexpr TimeDelta kTurnPermissionTimeout = TimeDelta::Minutes(5);
static const size_t TURN_CHANNEL_HEADER_SIZE = 4U;
@ -161,7 +163,7 @@ class TurnEntry : public sigslot::has_slots<> {
BindState state() const { return state_; }
// If the destruction timestamp is set, that means destruction has been
// scheduled (will occur TURN_PERMISSION_TIMEOUT after it's scheduled).
// scheduled (will occur kTurnPermissionTimeout after it's scheduled).
absl::optional<int64_t> destruction_timestamp() {
return destruction_timestamp_;
}
@ -1295,12 +1297,12 @@ void TurnPort::HandleConnectionDestroyed(Connection* conn) {
RTC_DCHECK(!entry->destruction_timestamp().has_value());
int64_t timestamp = rtc::TimeMillis();
entry->set_destruction_timestamp(timestamp);
thread()->PostDelayedTask(ToQueuedTask(task_safety_.flag(),
[this, entry, timestamp] {
DestroyEntryIfNotCancelled(
entry, timestamp);
}),
TURN_PERMISSION_TIMEOUT);
thread()->PostDelayedTask(SafeTask(task_safety_.flag(),
[this, entry, timestamp] {
DestroyEntryIfNotCancelled(entry,
timestamp);
}),
kTurnPermissionTimeout);
}
bool TurnPort::SetEntryChannelId(const rtc::SocketAddress& address,
@ -1752,10 +1754,10 @@ void TurnChannelBindRequest::OnResponse(StunMessage* response) {
// threshold. The channel binding has a longer lifetime, but
// this is the easiest way to keep both the channel and the
// permission from expiring.
int delay = TURN_PERMISSION_TIMEOUT - 60000;
entry_->SendChannelBindRequest(delay);
TimeDelta delay = kTurnPermissionTimeout - TimeDelta::Minutes(1);
entry_->SendChannelBindRequest(delay.ms());
RTC_LOG(LS_INFO) << port_->ToString() << ": Scheduled channel bind in "
<< delay << "ms.";
<< delay.ms() << "ms.";
}
}
@ -1855,11 +1857,11 @@ void TurnEntry::OnCreatePermissionSuccess() {
if (state_ != STATE_BOUND) {
// Refresh the permission request about 1 minute before the permission
// times out.
int delay = TURN_PERMISSION_TIMEOUT - 60000;
SendCreatePermissionRequest(delay);
TimeDelta delay = kTurnPermissionTimeout - TimeDelta::Minutes(1);
SendCreatePermissionRequest(delay.ms());
RTC_LOG(LS_INFO) << port_->ToString()
<< ": Scheduled create-permission-request in " << delay
<< "ms.";
<< ": Scheduled create-permission-request in "
<< delay.ms() << "ms.";
}
}

View File

@ -19,7 +19,6 @@
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "api/packet_socket_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/transport/stun.h"
#include "p2p/base/async_stun_tcp_socket.h"
#include "rtc_base/byte_buffer.h"
@ -571,8 +570,7 @@ void TurnServer::DestroyInternalSocket(rtc::AsyncPacketSocket* socket) {
// We must destroy the socket async to avoid invalidating the sigslot
// callback list iterator inside a sigslot callback. (In other words,
// deleting an object from within a callback from that object).
thread_->PostTask(webrtc::ToQueuedTask(
[socket_to_delete = std::move(socket_to_delete)] {}));
thread_->PostTask([socket_to_delete = std::move(socket_to_delete)] {});
}
}

View File

@ -21,8 +21,9 @@
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/field_trial_based_config.h"
#include "api/units/time_delta.h"
#include "p2p/base/basic_packet_socket_factory.h"
#include "p2p/base/port.h"
#include "p2p/base/stun_port.h"
@ -36,10 +37,11 @@
#include "rtc_base/trace_event.h"
#include "system_wrappers/include/metrics.h"
using rtc::CreateRandomId;
namespace cricket {
namespace {
using ::rtc::CreateRandomId;
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
const int PHASE_UDP = 0;
const int PHASE_RELAY = 1;
@ -410,8 +412,8 @@ void BasicPortAllocatorSession::StartGettingPorts() {
RTC_DCHECK_RUN_ON(network_thread_);
state_ = SessionState::GATHERING;
network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this] { GetPortConfigurations(); }));
network_thread_->PostTask(
SafeTask(network_safety_.flag(), [this] { GetPortConfigurations(); }));
RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
<< turn_port_prune_policy_;
@ -432,7 +434,7 @@ void BasicPortAllocatorSession::ClearGettingPorts() {
sequences_[i]->Stop();
}
network_thread_->PostTask(
webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); }));
SafeTask(network_safety_.flag(), [this] { OnConfigStop(); }));
state_ = SessionState::CLEARED;
}
@ -647,8 +649,8 @@ void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
void BasicPortAllocatorSession::ConfigReady(
std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this, config = std::move(config)]() mutable {
network_thread_->PostTask(SafeTask(
network_safety_.flag(), [this, config = std::move(config)]() mutable {
OnConfigReady(std::move(config));
}));
}
@ -696,8 +698,8 @@ void BasicPortAllocatorSession::OnConfigStop() {
void BasicPortAllocatorSession::AllocatePorts() {
RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this, allocation_epoch = allocation_epoch_] {
network_thread_->PostTask(SafeTask(
network_safety_.flag(), [this, allocation_epoch = allocation_epoch_] {
OnAllocate(allocation_epoch);
}));
}
@ -873,8 +875,9 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
}
}
if (done_signal_needed) {
network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this] { OnAllocationSequenceObjectsCreated(); }));
network_thread_->PostTask(SafeTask(network_safety_.flag(), [this] {
OnAllocationSequenceObjectsCreated();
}));
}
}
@ -1391,8 +1394,8 @@ void AllocationSequence::DisableEquivalentPhases(const rtc::Network* network,
void AllocationSequence::Start() {
state_ = kRunning;
session_->network_thread()->PostTask(webrtc::ToQueuedTask(
safety_, [this, epoch = epoch_] { Process(epoch); }));
session_->network_thread()->PostTask(
SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); }));
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
@ -1440,9 +1443,8 @@ void AllocationSequence::Process(int epoch) {
if (state() == kRunning) {
++phase_;
session_->network_thread()->PostDelayedTask(
webrtc::ToQueuedTask(safety_,
[this, epoch = epoch_] { Process(epoch); }),
session_->allocator()->step_delay());
SafeTask(safety_.flag(), [this, epoch = epoch_] { Process(epoch); }),
TimeDelta::Millis(session_->allocator()->step_delay()));
} else {
// No allocation steps needed further if all phases in AllocationSequence
// are completed. Cause further Process calls in the previous epoch to be

View File

@ -17,8 +17,9 @@
#include <utility>
#include "api/packet_socket_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/stun.h"
#include "api/units/time_delta.h"
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/async_resolver_interface.h"
#include "rtc_base/checks.h"
@ -30,6 +31,8 @@
namespace stunprober {
namespace {
using ::webrtc::SafeTask;
using ::webrtc::TimeDelta;
const int THREAD_WAKE_UP_INTERVAL_MS = 5;
@ -355,8 +358,7 @@ void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) {
// Deletion of AsyncResolverInterface can't be done in OnResolveResult which
// handles SignalDone.
thread_->PostTask(
webrtc::ToQueuedTask([resolver] { resolver->Destroy(false); }));
thread_->PostTask([resolver] { resolver->Destroy(false); });
servers_.pop_back();
if (servers_.size()) {
@ -454,9 +456,8 @@ void StunProber::MaybeScheduleStunRequests() {
if (Done()) {
thread_->PostDelayedTask(
webrtc::ToQueuedTask(task_safety_.flag(),
[this] { ReportOnFinished(SUCCESS); }),
timeout_ms_);
SafeTask(task_safety_.flag(), [this] { ReportOnFinished(SUCCESS); }),
TimeDelta::Millis(timeout_ms_));
return;
}
if (should_send_next_request(now)) {
@ -467,9 +468,8 @@ void StunProber::MaybeScheduleStunRequests() {
next_request_time_ms_ = now + interval_ms_;
}
thread_->PostDelayedTask(
webrtc::ToQueuedTask(task_safety_.flag(),
[this] { MaybeScheduleStunRequests(); }),
get_wake_up_interval_ms());
SafeTask(task_safety_.flag(), [this] { MaybeScheduleStunRequests(); }),
TimeDelta::Millis(get_wake_up_interval_ms()));
}
bool StunProber::GetStats(StunProber::Stats* prob_stats) const {