From da8a45fdaa7035a07026c95efdb4b882adc52cc4 Mon Sep 17 00:00:00 2001 From: Markus Handell Date: Mon, 7 Jun 2021 13:41:15 +0200 Subject: [PATCH] AllocationSequence: migrate from rtc::Message to TaskQueue. AllocationSequence uses legacy rtc::Thread message handling. In order to cancel callbacks it uses rtc::Thread::Clear() which uses locks and necessitates looping through all currently queued (unbounded) messages in the thread. In particular, these Clear calls are common during negotiation and the probability of having a lot of queued messages is high due to a long-running network thread function invoked on the network thread. Fix this by migrating AllocationSequence to task queues. Bug: webrtc:12840, webrtc:9702 Change-Id: I42bbdb59fb2c88b50e866326ba15134dcc6ce691 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221369 Commit-Queue: Markus Handell Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/master@{#34241} --- p2p/client/basic_port_allocator.cc | 36 ++++++++++++++---------------- p2p/client/basic_port_allocator.h | 14 +++++++----- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc index 49d4958d59..9d7a4f9005 100644 --- a/p2p/client/basic_port_allocator.cc +++ b/p2p/client/basic_port_allocator.cc @@ -39,10 +39,6 @@ using rtc::CreateRandomId; namespace cricket { namespace { -enum { - MSG_ALLOCATION_PHASE, -}; - const int PHASE_UDP = 0; const int PHASE_RELAY = 1; const int PHASE_TCP = 2; @@ -1261,10 +1257,6 @@ void AllocationSequence::OnNetworkFailed() { Stop(); } -AllocationSequence::~AllocationSequence() { - session_->network_thread()->Clear(this); -} - void AllocationSequence::DisableEquivalentPhases(rtc::Network* network, PortConfiguration* config, uint32_t* flags) { @@ -1339,7 +1331,9 @@ void AllocationSequence::DisableEquivalentPhases(rtc::Network* network, void AllocationSequence::Start() { state_ = kRunning; - session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE); + + session_->network_thread()->PostTask(webrtc::ToQueuedTask( + safety_, [this, epoch = epoch_] { Process(epoch); })); // Take a snapshot of the best IP, so that when DisableEquivalentPhases is // called next time, we enable all phases if the best IP has since changed. previous_best_ip_ = network_->GetBestIP(); @@ -1349,16 +1343,18 @@ void AllocationSequence::Stop() { // If the port is completed, don't set it to stopped. if (state_ == kRunning) { state_ = kStopped; - session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); + // Cause further Process calls in the previous epoch to be ignored. + ++epoch_; } } -void AllocationSequence::OnMessage(rtc::Message* msg) { +void AllocationSequence::Process(int epoch) { RTC_DCHECK(rtc::Thread::Current() == session_->network_thread()); - RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE); - const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"}; + if (epoch != epoch_) + return; + // Perform all of the phases in the current step. RTC_LOG(LS_INFO) << network_->ToString() << ": Allocation Phase=" << PHASE_NAMES[phase_]; @@ -1384,13 +1380,15 @@ void AllocationSequence::OnMessage(rtc::Message* msg) { if (state() == kRunning) { ++phase_; - session_->network_thread()->PostDelayed(RTC_FROM_HERE, - session_->allocator()->step_delay(), - this, MSG_ALLOCATION_PHASE); + session_->network_thread()->PostDelayedTask( + webrtc::ToQueuedTask(safety_, + [this, epoch = epoch_] { Process(epoch); }), + session_->allocator()->step_delay()); } else { - // If all phases in AllocationSequence are completed, no allocation - // steps needed further. Canceling pending signal. - session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); + // No allocation steps needed further if all phases in AllocationSequence + // are completed. Cause further Process calls in the previous epoch to be + // ignored. + ++epoch_; port_allocation_complete_callback_(); } } diff --git a/p2p/client/basic_port_allocator.h b/p2p/client/basic_port_allocator.h index ede9395a94..77aceb1e9c 100644 --- a/p2p/client/basic_port_allocator.h +++ b/p2p/client/basic_port_allocator.h @@ -327,8 +327,8 @@ class TurnPort; // Performs the allocation of ports, in a sequenced (timed) manner, for a given // network and IP address. -class AllocationSequence : public rtc::MessageHandler, - public sigslot::has_slots<> { +// This class is thread-compatible. +class AllocationSequence : public sigslot::has_slots<> { public: enum State { kInit, // Initial state. @@ -350,7 +350,6 @@ class AllocationSequence : public rtc::MessageHandler, PortConfiguration* config, uint32_t flags, std::function port_allocation_complete_callback); - ~AllocationSequence() override; void Init(); void Clear(); void OnNetworkFailed(); @@ -372,9 +371,6 @@ class AllocationSequence : public rtc::MessageHandler, void Start(); void Stop(); - // MessageHandler - void OnMessage(rtc::Message* msg) override; - protected: // For testing. void CreateTurnPort(const RelayServerConfig& config); @@ -382,6 +378,7 @@ class AllocationSequence : public rtc::MessageHandler, private: typedef std::vector ProtocolList; + void Process(int epoch); bool IsFlagSet(uint32_t flag) { return ((flags_ & flag) != 0); } void CreateUDPPorts(); void CreateTCPPorts(); @@ -411,6 +408,11 @@ class AllocationSequence : public rtc::MessageHandler, std::vector relay_ports_; int phase_; std::function port_allocation_complete_callback_; + // This counter is sampled and passed together with tasks when tasks are + // posted. If the sampled counter doesn't match |epoch_| on reception, the + // posted task is ignored. + int epoch_ = 0; + webrtc::ScopedTaskSafety safety_; }; } // namespace cricket