From e2ab77ba57bff5db8eaa7a8442fa6b2f43914b69 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Mon, 21 Jun 2021 18:20:55 +0000 Subject: [PATCH] Reland "Port: migrate to TaskQueue." This reverts commit a4aabb921353125f6d3a2caa2ceb9cda7e971f22. Reason for revert: downstream tests fixed. TBR=hta@webrtc.org Original change's description: > Revert "Port: migrate to TaskQueue." > > This reverts commit 06540166ca97028454adea48cec9bf109b771ddc. > > Reason for revert: breaks downstream test. > > Original change's description: > > Port: migrate to TaskQueue. > > > > Port uses legacy rtc::Thread message handling. In order > > to cancel callbacks it uses rtc::Thread::Clear() which uses locks and > > necessitates looping through all currently queued (unbounded) messages > > in the thread. In particular, these Clear calls are common during > > negotiation and the probability of having a lot of queued messages is > > high due to a long-running network thread function invoked on the > > network thread. > > > > Fix this by migrating Port to task queues. > > > > > > Bug: webrtc:12840, webrtc:9702 > > Change-Id: I6c6fb83323899b56091f0857a1c2d15d19199002 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221370 > > Reviewed-by: Harald Alvestrand > > Commit-Queue: Markus Handell > > Cr-Commit-Position: refs/heads/master@{#34338} > > TBR=hta@webrtc.org,handellm@webrtc.org,webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com > > Change-Id: I014ef9267d224c10595cfa1c12899eabe0093306 > No-Presubmit: true > No-Tree-Checks: true > No-Try: true > Bug: webrtc:12840, webrtc:9702 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/223062 > Reviewed-by: Markus Handell > Commit-Queue: Markus Handell > Cr-Commit-Position: refs/heads/master@{#34339} # Not skipping CQ checks because this is a reland. Bug: webrtc:12840, webrtc:9702 Change-Id: I4d2e086b686da8d5272d67293406300a07edef81 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/223260 Reviewed-by: Markus Handell Commit-Queue: Markus Handell Cr-Commit-Position: refs/heads/master@{#34345} --- p2p/base/port.cc | 26 +++++++++++--------------- p2p/base/port.h | 13 ++++--------- p2p/base/turn_port.cc | 2 +- p2p/base/turn_port.h | 5 +++-- 4 files changed, 19 insertions(+), 27 deletions(-) diff --git a/p2p/base/port.cc b/p2p/base/port.cc index a03a0d6a66..9b2adaf484 100644 --- a/p2p/base/port.cc +++ b/p2p/base/port.cc @@ -32,6 +32,7 @@ #include "rtc_base/string_encode.h" #include "rtc_base/string_utils.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/base64/base64.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/field_trial.h" @@ -173,15 +174,13 @@ void Port::Construct() { network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged); network_cost_ = network_->GetCost(); - thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, - MSG_DESTROY_IF_DEAD); + ScheduleDelayedDestructionIfDead(); RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost " << network_cost_; } Port::~Port() { RTC_DCHECK_RUN_ON(thread_); - CancelPendingTasks(); // Delete all of the remaining connections. We copy the list up front // because each deletion will cause it to be modified. @@ -822,19 +821,11 @@ void Port::KeepAliveUntilPruned() { void Port::Prune() { state_ = State::PRUNED; - thread_->Post(RTC_FROM_HERE, this, MSG_DESTROY_IF_DEAD); + thread_->PostTask(webrtc::ToQueuedTask(safety_, [this] { DestroyIfDead(); })); } -// Call to stop any currently pending operations from running. -void Port::CancelPendingTasks() { - TRACE_EVENT0("webrtc", "Port::CancelPendingTasks"); +void Port::DestroyIfDead() { RTC_DCHECK_RUN_ON(thread_); - thread_->Clear(this); -} - -void Port::OnMessage(rtc::Message* pmsg) { - RTC_DCHECK_RUN_ON(thread_); - RTC_DCHECK(pmsg->message_id == MSG_DESTROY_IF_DEAD); bool dead = (state_ == State::INIT || state_ == State::PRUNED) && connections_.empty() && @@ -858,6 +849,12 @@ void Port::OnNetworkTypeChanged(const rtc::Network* network) { UpdateNetworkCost(); } +void Port::ScheduleDelayedDestructionIfDead() { + thread_->PostDelayedTask( + webrtc::ToQueuedTask(safety_, [this] { DestroyIfDead(); }), + timeout_delay_); +} + std::string Port::ToString() const { rtc::StringBuilder ss; ss << "Port[" << rtc::ToHex(reinterpret_cast(this)) << ":" @@ -908,8 +905,7 @@ void Port::OnConnectionDestroyed(Connection* conn) { // not cause the Port to be destroyed. if (connections_.empty()) { last_time_all_connections_removed_ = rtc::TimeMillis(); - thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, - MSG_DESTROY_IF_DEAD); + ScheduleDelayedDestructionIfDead(); } } diff --git a/p2p/base/port.h b/p2p/base/port.h index 2c18f1adeb..9a0073a5da 100644 --- a/p2p/base/port.h +++ b/p2p/base/port.h @@ -41,6 +41,7 @@ #include "rtc_base/rate_tracker.h" #include "rtc_base/socket_address.h" #include "rtc_base/system/rtc_export.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread.h" #include "rtc_base/weak_ptr.h" @@ -171,7 +172,6 @@ typedef std::set ServerAddresses; // connections to similar mechanisms of the other client. Subclasses of this // one add support for specific mechanisms like local UDP ports. class Port : public PortInterface, - public rtc::MessageHandler, public sigslot::has_slots<> { public: // INIT: The state when a port is just created. @@ -220,9 +220,6 @@ class Port : public PortInterface, // Allows a port to be destroyed if no connection is using it. void Prune(); - // Call to stop any currently pending operations from running. - void CancelPendingTasks(); - // The thread on which this port performs its I/O. rtc::Thread* thread() { return thread_; } @@ -328,8 +325,6 @@ class Port : public PortInterface, // Called if the port has no connections and is no longer useful. void Destroy(); - void OnMessage(rtc::Message* pmsg) override; - // Debugging description of this port std::string ToString() const override; uint16_t min_port() { return min_port_; } @@ -380,8 +375,6 @@ class Port : public PortInterface, const rtc::SocketAddress& base_address); protected: - enum { MSG_DESTROY_IF_DEAD = 0, MSG_FIRST_AVAILABLE }; - virtual void UpdateNetworkCost(); void set_type(const std::string& type) { type_ = type; } @@ -448,8 +441,9 @@ class Port : public PortInterface, void Construct(); // Called when one of our connections deletes itself. void OnConnectionDestroyed(Connection* conn); - void OnNetworkTypeChanged(const rtc::Network* network); + void ScheduleDelayedDestructionIfDead(); + void DestroyIfDead(); rtc::Thread* const thread_; rtc::PacketSocketFactory* const factory_; @@ -499,6 +493,7 @@ class Port : public PortInterface, friend class Connection; webrtc::CallbackList port_destroyed_callback_list_; + webrtc::ScopedTaskSafety safety_; }; } // namespace cricket diff --git a/p2p/base/turn_port.cc b/p2p/base/turn_port.cc index 33925d43e7..a018caafa7 100644 --- a/p2p/base/turn_port.cc +++ b/p2p/base/turn_port.cc @@ -990,7 +990,7 @@ void TurnPort::OnMessage(rtc::Message* message) { Close(); break; default: - Port::OnMessage(message); + RTC_NOTREACHED(); } } diff --git a/p2p/base/turn_port.h b/p2p/base/turn_port.h index 55dbda5ece..8ed7cefa8e 100644 --- a/p2p/base/turn_port.h +++ b/p2p/base/turn_port.h @@ -25,6 +25,7 @@ #include "p2p/client/basic_port_allocator.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/async_resolver_interface.h" +#include "rtc_base/message_handler.h" #include "rtc_base/ssl_certificate.h" #include "rtc_base/task_utils/pending_task_safety_flag.h" @@ -41,7 +42,7 @@ extern const char TURN_PORT_TYPE[]; class TurnAllocateRequest; class TurnEntry; -class TurnPort : public Port { +class TurnPort : public Port, public rtc::MessageHandler { public: enum PortState { STATE_CONNECTING, // Initial state, cannot send any packets. @@ -298,7 +299,7 @@ class TurnPort : public Port { private: enum { - MSG_ALLOCATE_ERROR = MSG_FIRST_AVAILABLE, + MSG_ALLOCATE_ERROR, MSG_ALLOCATE_MISMATCH, MSG_TRY_ALTERNATE_SERVER, MSG_REFRESH_ERROR,