From 34f6d1c06d9a4b4b5ce68fa10a1e592d708d10df Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Fri, 19 Aug 2022 18:16:59 +0200 Subject: [PATCH] Migrate cricket::Port asynchronous calls to TaskQueueBase interface Bug: webrtc:9702 Change-Id: I13e05ced190ca64a217961d74ee92dd9c15ed8ce Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/271641 Commit-Queue: Danil Chapovalov Reviewed-by: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#37849} --- p2p/BUILD.gn | 1 - p2p/base/port.cc | 43 +++++++++++++++-------- p2p/base/port.h | 21 +++++------- p2p/base/tcp_port.cc | 1 - p2p/base/turn_port.cc | 79 +++++++++++++++++++------------------------ p2p/base/turn_port.h | 15 +++----- 6 files changed, 75 insertions(+), 85 deletions(-) diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 7c78774b14..bd7a3318de 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -117,7 +117,6 @@ rtc_library("rtc_p2p") { "../rtc_base:checks", "../rtc_base:event_tracer", "../rtc_base:ip_address", - "../rtc_base:location", "../rtc_base:logging", "../rtc_base:macromagic", "../rtc_base:net_helpers", diff --git a/p2p/base/port.cc b/p2p/base/port.cc index ab4d447baa..6792939009 100644 --- a/p2p/base/port.cc +++ b/p2p/base/port.cc @@ -37,8 +37,14 @@ #include "rtc_base/third_party/base64/base64.h" #include "rtc_base/trace_event.h" +namespace cricket { namespace { +using ::webrtc::RTCError; +using ::webrtc::RTCErrorType; +using ::webrtc::TaskQueueBase; +using ::webrtc::TimeDelta; + rtc::PacketInfoProtocolType ConvertProtocolTypeToPacketInfoProtocolType( cricket::ProtocolType type) { switch (type) { @@ -61,11 +67,6 @@ const int kPortTimeoutDelay = cricket::STUN_TOTAL_TIMEOUT + 5000; } // namespace -namespace cricket { - -using webrtc::RTCError; -using webrtc::RTCErrorType; - // TODO(ronghuawu): Use "local", "srflx", "prflx" and "relay". But this requires // the signaling part be updated correspondingly as well. const char LOCAL_PORT_TYPE[] = "local"; @@ -105,7 +106,7 @@ std::string Port::ComputeFoundation(absl::string_view type, return rtc::ToString(rtc::ComputeCrc32(sb.Release())); } -Port::Port(rtc::Thread* thread, +Port::Port(TaskQueueBase* thread, absl::string_view type, rtc::PacketSocketFactory* factory, const rtc::Network* network, @@ -134,7 +135,7 @@ Port::Port(rtc::Thread* thread, Construct(); } -Port::Port(rtc::Thread* thread, +Port::Port(TaskQueueBase* thread, absl::string_view type, rtc::PacketSocketFactory* factory, const rtc::Network* network, @@ -177,8 +178,7 @@ void Port::Construct() { network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged); network_cost_ = network_->GetCost(field_trials()); - thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, - MSG_DESTROY_IF_DEAD); + PostDestroyIfDead(/*delayed=*/true); RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost " << network_cost_; } @@ -822,19 +822,33 @@ void Port::KeepAliveUntilPruned() { void Port::Prune() { state_ = State::PRUNED; - thread_->Post(RTC_FROM_HERE, this, MSG_DESTROY_IF_DEAD); + PostDestroyIfDead(/*delayed=*/false); } // Call to stop any currently pending operations from running. void Port::CancelPendingTasks() { TRACE_EVENT0("webrtc", "Port::CancelPendingTasks"); RTC_DCHECK_RUN_ON(thread_); - thread_->Clear(this); + weak_factory_.InvalidateWeakPtrs(); } -void Port::OnMessage(rtc::Message* pmsg) { +void Port::PostDestroyIfDead(bool delayed) { + rtc::WeakPtr weak_ptr = NewWeakPtr(); + auto task = [weak_ptr = std::move(weak_ptr)] { + if (weak_ptr) { + weak_ptr->DestroyIfDead(); + } + }; + if (delayed) { + thread_->PostDelayedTask(std::move(task), + TimeDelta::Millis(timeout_delay_)); + } else { + thread_->PostTask(std::move(task)); + } +} + +void Port::DestroyIfDead() { RTC_DCHECK_RUN_ON(thread_); - RTC_DCHECK(pmsg->message_id == MSG_DESTROY_IF_DEAD); bool dead = (state_ == State::INIT || state_ == State::PRUNED) && connections_.empty() && @@ -908,8 +922,7 @@ bool Port::OnConnectionDestroyed(Connection* conn) { // not cause the Port to be destroyed. if (connections_.empty()) { last_time_all_connections_removed_ = rtc::TimeMillis(); - thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, - MSG_DESTROY_IF_DEAD); + PostDestroyIfDead(/*delayed=*/true); } return true; diff --git a/p2p/base/port.h b/p2p/base/port.h index ceb435d70d..a1377a827b 100644 --- a/p2p/base/port.h +++ b/p2p/base/port.h @@ -24,6 +24,7 @@ #include "api/field_trials_view.h" #include "api/packet_socket_factory.h" #include "api/rtc_error.h" +#include "api/task_queue/task_queue_base.h" #include "api/transport/field_trial_based_config.h" #include "api/transport/stun.h" #include "logging/rtc_event_log/events/rtc_event_ice_candidate_pair.h" @@ -46,7 +47,6 @@ #include "rtc_base/socket_address.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/third_party/sigslot/sigslot.h" -#include "rtc_base/thread.h" #include "rtc_base/weak_ptr.h" namespace cricket { @@ -176,9 +176,7 @@ typedef std::set ServerAddresses; // Represents a local communication mechanism that can be used to create // connections to similar mechanisms of the other client. Subclasses of this // one add support for specific mechanisms like local UDP ports. -class Port : public PortInterface, - public rtc::MessageHandler, - public sigslot::has_slots<> { +class Port : public PortInterface, public sigslot::has_slots<> { public: // INIT: The state when a port is just created. // KEEP_ALIVE_UNTIL_PRUNED: A port should not be destroyed even if no @@ -186,14 +184,14 @@ class Port : public PortInterface, // PRUNED: It will be destroyed if no connection is using it for a period of // 30 seconds. enum class State { INIT, KEEP_ALIVE_UNTIL_PRUNED, PRUNED }; - Port(rtc::Thread* thread, + Port(webrtc::TaskQueueBase* thread, absl::string_view type, rtc::PacketSocketFactory* factory, const rtc::Network* network, absl::string_view username_fragment, absl::string_view password, const webrtc::FieldTrialsView* field_trials = nullptr); - Port(rtc::Thread* thread, + Port(webrtc::TaskQueueBase* thread, absl::string_view type, rtc::PacketSocketFactory* factory, const rtc::Network* network, @@ -232,7 +230,7 @@ class Port : public PortInterface, void CancelPendingTasks(); // The thread on which this port performs its I/O. - rtc::Thread* thread() { return thread_; } + webrtc::TaskQueueBase* thread() { return thread_; } // The factory used to create the sockets of this port. rtc::PacketSocketFactory* socket_factory() const { return factory_; } @@ -346,8 +344,6 @@ class Port : public PortInterface, // Called if the port has no connections and is no longer useful. void Destroy(); - void OnMessage(rtc::Message* pmsg) override; - // Debugging description of this port std::string ToString() const override; uint16_t min_port() { return min_port_; } @@ -396,8 +392,6 @@ class Port : public PortInterface, const rtc::SocketAddress& base_address); protected: - enum { MSG_DESTROY_IF_DEAD = 0, MSG_FIRST_AVAILABLE }; - virtual void UpdateNetworkCost(); void set_type(absl::string_view type) { type_ = std::string(type); } @@ -470,6 +464,9 @@ class Port : public PortInterface, private: void Construct(); + void PostDestroyIfDead(bool delayed); + void DestroyIfDead(); + // Called internally when deleting a connection object. // Returns true if the connection object was removed from the `connections_` // list and the state updated accordingly. If the connection was not found @@ -485,7 +482,7 @@ class Port : public PortInterface, void OnNetworkTypeChanged(const rtc::Network* network); - rtc::Thread* const thread_; + webrtc::TaskQueueBase* const thread_; rtc::PacketSocketFactory* const factory_; std::string type_; bool send_retransmit_count_attribute_; diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc index 84d0a68252..fbda2999f9 100644 --- a/p2p/base/tcp_port.cc +++ b/p2p/base/tcp_port.cc @@ -79,7 +79,6 @@ #include "p2p/base/p2p_constants.h" #include "rtc_base/checks.h" #include "rtc_base/ip_address.h" -#include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/net_helper.h" #include "rtc_base/rate_tracker.h" diff --git a/p2p/base/turn_port.cc b/p2p/base/turn_port.cc index 67b82c0358..1c9f3f34ea 100644 --- a/p2p/base/turn_port.cc +++ b/p2p/base/turn_port.cc @@ -31,7 +31,9 @@ #include "rtc_base/strings/string_builder.h" namespace cricket { + using ::webrtc::SafeTask; +using ::webrtc::TaskQueueBase; using ::webrtc::TimeDelta; // TODO(juberti): Move to stun.h when relay messages have been renamed. @@ -210,7 +212,7 @@ class TurnEntry : public sigslot::has_slots<> { std::string remote_ufrag_; }; -TurnPort::TurnPort(rtc::Thread* thread, +TurnPort::TurnPort(TaskQueueBase* thread, rtc::PacketSocketFactory* factory, const rtc::Network* network, rtc::AsyncPacketSocket* socket, @@ -250,7 +252,7 @@ TurnPort::TurnPort(rtc::Thread* thread, allocate_mismatch_retries_(0), turn_customizer_(customizer) {} -TurnPort::TurnPort(rtc::Thread* thread, +TurnPort::TurnPort(TaskQueueBase* thread, rtc::PacketSocketFactory* factory, const rtc::Network* network, uint16_t min_port, @@ -894,7 +896,8 @@ void TurnPort::OnAllocateError(int error_code, absl::string_view reason) { // We will send SignalPortError asynchronously as this can be sent during // port initialization. This way it will not be blocking other port // creation. - thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATE_ERROR); + thread()->PostTask( + SafeTask(task_safety_.flag(), [this] { SignalPortError(this); })); std::string address = GetLocalAddress().HostAsSensitiveURIString(); int port = GetLocalAddress().port(); if (server_address_.proto == PROTO_TCP && @@ -911,7 +914,8 @@ void TurnPort::OnRefreshError() { // Need to clear the requests asynchronously because otherwise, the refresh // request may be deleted twice: once at the end of the message processing // and the other in HandleRefreshError(). - thread()->Post(RTC_FROM_HERE, this, MSG_REFRESH_ERROR); + thread()->PostTask( + SafeTask(task_safety_.flag(), [this] { HandleRefreshError(); })); } void TurnPort::HandleRefreshError() { @@ -967,39 +971,21 @@ bool TurnPort::AllowedTurnPort(int port, return false; } -void TurnPort::OnMessage(rtc::Message* message) { - switch (message->message_id) { - case MSG_ALLOCATE_ERROR: - SignalPortError(this); - break; - case MSG_ALLOCATE_MISMATCH: - OnAllocateMismatch(); - break; - case MSG_REFRESH_ERROR: - HandleRefreshError(); - break; - case MSG_TRY_ALTERNATE_SERVER: - if (server_address().proto == PROTO_UDP) { - // Send another allocate request to alternate server, with the received - // realm and nonce values. - SendRequest(new TurnAllocateRequest(this), 0); - } else { - // Since it's TCP, we have to delete the connected socket and reconnect - // with the alternate server. PrepareAddress will send stun binding once - // the new socket is connected. - RTC_DCHECK(server_address().proto == PROTO_TCP || - server_address().proto == PROTO_TLS); - RTC_DCHECK(!SharedSocket()); - delete socket_; - socket_ = NULL; - PrepareAddress(); - } - break; - case MSG_ALLOCATION_RELEASED: - Close(); - break; - default: - Port::OnMessage(message); +void TurnPort::TryAlternateServer() { + if (server_address().proto == PROTO_UDP) { + // Send another allocate request to alternate server, with the received + // realm and nonce values. + SendRequest(new TurnAllocateRequest(this), 0); + } else { + // Since it's TCP, we have to delete the connected socket and reconnect + // with the alternate server. PrepareAddress will send stun binding once + // the new socket is connected. + RTC_DCHECK(server_address().proto == PROTO_TCP || + server_address().proto == PROTO_TLS); + RTC_DCHECK(!SharedSocket()); + delete socket_; + socket_ = nullptr; + PrepareAddress(); } } @@ -1449,12 +1435,13 @@ void TurnAllocateRequest::OnErrorResponse(StunMessage* response) { case STUN_ERROR_TRY_ALTERNATE: OnTryAlternate(response, error_code); break; - case STUN_ERROR_ALLOCATION_MISMATCH: + case STUN_ERROR_ALLOCATION_MISMATCH: { // We must handle this error async because trying to delete the socket in // OnErrorResponse will cause a deadlock on the socket. - port_->thread()->Post(RTC_FROM_HERE, port_, - TurnPort::MSG_ALLOCATE_MISMATCH); - break; + TurnPort* port = port_; + port->thread()->PostTask(SafeTask( + port->task_safety_.flag(), [port] { port->OnAllocateMismatch(); })); + } break; default: RTC_LOG(LS_WARNING) << port_->ToString() << ": Received TURN allocate error response, id=" @@ -1551,8 +1538,9 @@ void TurnAllocateRequest::OnTryAlternate(StunMessage* response, int code) { // For TCP, we can't close the original Tcp socket during handling a 300 as // we're still inside that socket's event handler. Doing so will cause // deadlock. - port_->thread()->Post(RTC_FROM_HERE, port_, - TurnPort::MSG_TRY_ALTERNATE_SERVER); + TurnPort* port = port_; + port->thread()->PostTask(SafeTask(port->task_safety_.flag(), + [port] { port->TryAlternateServer(); })); } TurnRefreshRequest::TurnRefreshRequest(TurnPort* port, int lifetime /*= -1*/) @@ -1602,8 +1590,9 @@ void TurnRefreshRequest::OnResponse(StunMessage* response) { } else { // If we scheduled a refresh with lifetime 0, we're releasing this // allocation; see TurnPort::Release. - port_->thread()->Post(RTC_FROM_HERE, port_, - TurnPort::MSG_ALLOCATION_RELEASED); + TurnPort* port = port_; + port->thread()->PostTask( + SafeTask(port->task_safety_.flag(), [port] { port->Close(); })); } port_->SignalTurnRefreshResult(port_, TURN_SUCCESS_RESULT_CODE); diff --git a/p2p/base/turn_port.h b/p2p/base/turn_port.h index e33f75e947..eaf796a2f0 100644 --- a/p2p/base/turn_port.h +++ b/p2p/base/turn_port.h @@ -24,6 +24,7 @@ #include "absl/strings/string_view.h" #include "api/async_dns_resolver.h" #include "api/task_queue/pending_task_safety_flag.h" +#include "api/task_queue/task_queue_base.h" #include "p2p/base/port.h" #include "p2p/client/basic_port_allocator.h" #include "rtc_base/async_packet_socket.h" @@ -204,7 +205,7 @@ class TurnPort : public Port { void CloseForTest() { Close(); } protected: - TurnPort(rtc::Thread* thread, + TurnPort(webrtc::TaskQueueBase* thread, rtc::PacketSocketFactory* factory, const rtc::Network* network, rtc::AsyncPacketSocket* socket, @@ -219,7 +220,7 @@ class TurnPort : public Port { rtc::SSLCertificateVerifier* tls_cert_verifier = nullptr, const webrtc::FieldTrialsView* field_trials = nullptr); - TurnPort(rtc::Thread* thread, + TurnPort(webrtc::TaskQueueBase* thread, rtc::PacketSocketFactory* factory, const rtc::Network* network, uint16_t min_port, @@ -249,21 +250,13 @@ class TurnPort : public Port { void Close(); private: - enum { - MSG_ALLOCATE_ERROR = MSG_FIRST_AVAILABLE, - MSG_ALLOCATE_MISMATCH, - MSG_TRY_ALTERNATE_SERVER, - MSG_REFRESH_ERROR, - MSG_ALLOCATION_RELEASED - }; - typedef std::list EntryList; typedef std::map SocketOptionsMap; typedef std::set AttemptedServerSet; static bool AllowedTurnPort(int port, const webrtc::FieldTrialsView* field_trials); - void OnMessage(rtc::Message* pmsg) override; + void TryAlternateServer(); bool CreateTurnClientSocket();