From 502db3df4e60a692687c56997c951de4440c67a7 Mon Sep 17 00:00:00 2001 From: Qingsi Wang Date: Wed, 16 May 2018 17:01:37 -0700 Subject: [PATCH] Replace MessageHandler by AsyncInvoker in P2PTransportChannel. The existing asynchronous task execution in P2PTransportChannel is implemented by posting messages to its network thread (a rtc::Thread) and consuming these messages as a MessageHandler. The readability of the implementation can be improved by using AsyncInvoker, which is exactly designed for this scenario. Bug: None Change-Id: Ibee830d0d2bc19fc1ca5b894f194d9b69c40eef4 Reviewed-on: https://webrtc-review.googlesource.com/74642 Commit-Queue: Qingsi Wang Reviewed-by: Taylor Brandstetter Cr-Commit-Position: refs/heads/master@{#23267} --- p2p/base/p2ptransportchannel.cc | 103 +++++++++++--------------------- p2p/base/p2ptransportchannel.h | 14 ++--- 2 files changed, 42 insertions(+), 75 deletions(-) diff --git a/p2p/base/p2ptransportchannel.cc b/p2p/base/p2ptransportchannel.cc index 689d2784b9..c769097f98 100644 --- a/p2p/base/p2ptransportchannel.cc +++ b/p2p/base/p2ptransportchannel.cc @@ -31,23 +31,9 @@ namespace { -// messages for queuing up work for ourselves -enum { - MSG_SORT_AND_UPDATE_STATE = 1, - MSG_CHECK_AND_PING, - MSG_REGATHER_ON_FAILED_NETWORKS, - MSG_REGATHER_ON_ALL_NETWORKS -}; - // The minimum improvement in RTT that justifies a switch. const int kMinImprovement = 10; -struct SortCandidatePairRequest : public rtc::MessageData { - explicit SortCandidatePairRequest(const std::string& reason_to_sort) - : reason_to_sort(reason_to_sort) {} - const std::string reason_to_sort; -}; - bool IsRelayRelay(const cricket::Connection* conn) { return conn->local_candidate().type() == cricket::RELAY_PORT_TYPE && conn->remote_candidate().type() == cricket::RELAY_PORT_TYPE; @@ -274,12 +260,13 @@ bool P2PTransportChannel::MaybeSwitchSelectedConnection( // threshold, the new connection is in a better receiving state than the // currently selected connection. So we need to re-check whether it needs // to be switched at a later time. - std::unique_ptr request_message( - new SortCandidatePairRequest(reason + - " (after switching dampening interval)")); - thread()->PostDelayed(RTC_FROM_HERE, - config_.receiving_switching_delay_or_default(), this, - MSG_SORT_AND_UPDATE_STATE, request_message.release()); + const std::string reason_to_sort = + reason + " (after switching dampening interval)"; + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::SortConnectionsAndUpdateState, this, + reason_to_sort), + config_.receiving_switching_delay_or_default()); } return false; } @@ -1281,10 +1268,10 @@ void P2PTransportChannel::UpdateConnectionStates() { void P2PTransportChannel::RequestSortAndStateUpdate( const std::string& reason_to_sort) { if (!sort_dirty_) { - std::unique_ptr request_message( - new SortCandidatePairRequest(reason_to_sort)); - network_thread_->Post(RTC_FROM_HERE, this, MSG_SORT_AND_UPDATE_STATE, - request_message.release()); + invoker_.AsyncInvoke( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::SortConnectionsAndUpdateState, this, + reason_to_sort)); sort_dirty_ = true; } } @@ -1301,15 +1288,18 @@ void P2PTransportChannel::MaybeStartPinging() { RTC_LOG(LS_INFO) << ToString() << ": Have a pingable connection for the first time; " "starting to ping."; - thread()->Post(RTC_FROM_HERE, this, MSG_CHECK_AND_PING); - thread()->PostDelayed( - RTC_FROM_HERE, - config_.regather_on_failed_networks_interval_or_default(), this, - MSG_REGATHER_ON_FAILED_NETWORKS); + invoker_.AsyncInvoke( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::CheckAndPing, this)); + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::RegatherOnFailedNetworks, this), + config_.regather_on_failed_networks_interval_or_default()); if (config_.regather_all_networks_interval_range) { - thread()->PostDelayed(RTC_FROM_HERE, - SampleRegatherAllNetworksInterval(), this, - MSG_REGATHER_ON_ALL_NETWORKS); + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::RegatherOnAllNetworks, this), + SampleRegatherAllNetworksInterval()); } started_pinging_ = true; } @@ -1821,33 +1811,8 @@ bool P2PTransportChannel::ReadyToSend(Connection* connection) const { PresumedWritable(connection)); } -// Handle any queued up requests -void P2PTransportChannel::OnMessage(rtc::Message *pmsg) { - switch (pmsg->message_id) { - case MSG_SORT_AND_UPDATE_STATE: { - RTC_DCHECK(pmsg->pdata); - std::unique_ptr request_message( - static_cast(pmsg->pdata)); - SortConnectionsAndUpdateState(request_message->reason_to_sort); - break; - } - case MSG_CHECK_AND_PING: - OnCheckAndPing(); - break; - case MSG_REGATHER_ON_FAILED_NETWORKS: - OnRegatherOnFailedNetworks(); - break; - case MSG_REGATHER_ON_ALL_NETWORKS: - OnRegatherOnAllNetworks(); - break; - default: - RTC_NOTREACHED(); - break; - } -} - // Handle queued up check-and-ping request -void P2PTransportChannel::OnCheckAndPing() { +void P2PTransportChannel::CheckAndPing() { // Make sure the states of the connections are up-to-date (since this affects // which ones are pingable). UpdateConnectionStates(); @@ -1870,7 +1835,9 @@ void P2PTransportChannel::OnCheckAndPing() { } } int delay = std::min(ping_interval, check_receiving_interval()); - thread()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_CHECK_AND_PING); + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::CheckAndPing, this), delay); } // A connection is considered a backup connection if the channel state @@ -2218,7 +2185,7 @@ void P2PTransportChannel::OnCandidatesRemoved( SignalCandidatesRemoved(this, candidates_to_remove); } -void P2PTransportChannel::OnRegatherOnFailedNetworks() { +void P2PTransportChannel::RegatherOnFailedNetworks() { // Only re-gather when the current session is in the CLEARED state (i.e., not // running or stopped). It is only possible to enter this state when we gather // continually, so there is an implicit check on continual gathering here. @@ -2226,19 +2193,21 @@ void P2PTransportChannel::OnRegatherOnFailedNetworks() { allocator_session()->RegatherOnFailedNetworks(); } - thread()->PostDelayed( - RTC_FROM_HERE, config_.regather_on_failed_networks_interval_or_default(), - this, MSG_REGATHER_ON_FAILED_NETWORKS); + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::RegatherOnFailedNetworks, this), + config_.regather_on_failed_networks_interval_or_default()); } -void P2PTransportChannel::OnRegatherOnAllNetworks() { +void P2PTransportChannel::RegatherOnAllNetworks() { if (!allocator_sessions_.empty() && allocator_session()->IsCleared()) { allocator_session()->RegatherOnAllNetworks(); } - thread()->PostDelayed(RTC_FROM_HERE, - SampleRegatherAllNetworksInterval(), this, - MSG_REGATHER_ON_ALL_NETWORKS); + invoker_.AsyncInvokeDelayed( + RTC_FROM_HERE, thread(), + rtc::Bind(&P2PTransportChannel::RegatherOnAllNetworks, this), + SampleRegatherAllNetworksInterval()); } void P2PTransportChannel::PruneAllPorts() { diff --git a/p2p/base/p2ptransportchannel.h b/p2p/base/p2ptransportchannel.h index 1632ac5375..5525067f39 100644 --- a/p2p/base/p2ptransportchannel.h +++ b/p2p/base/p2ptransportchannel.h @@ -36,6 +36,7 @@ #include "p2p/base/p2pconstants.h" #include "p2p/base/portallocator.h" #include "p2p/base/portinterface.h" +#include "rtc_base/asyncinvoker.h" #include "rtc_base/asyncpacketsocket.h" #include "rtc_base/constructormagic.h" #include "rtc_base/random.h" @@ -72,8 +73,7 @@ class RemoteCandidate : public Candidate { // P2PTransportChannel manages the candidates and connection process to keep // two P2P clients connected to each other. -class P2PTransportChannel : public IceTransportInternal, - public rtc::MessageHandler { +class P2PTransportChannel : public IceTransportInternal { public: P2PTransportChannel(const std::string& transport_name, int component, @@ -288,10 +288,9 @@ class P2PTransportChannel : public IceTransportInternal, void OnNominated(Connection* conn); - void OnMessage(rtc::Message* pmsg) override; - void OnCheckAndPing(); - void OnRegatherOnFailedNetworks(); - void OnRegatherOnAllNetworks(); + void CheckAndPing(); + void RegatherOnFailedNetworks(); + void RegatherOnAllNetworks(); void LogCandidatePairEvent(Connection* conn, webrtc::IceCandidatePairEventType type); @@ -412,10 +411,9 @@ class P2PTransportChannel : public IceTransportInternal, bool receiving_ = false; bool writable_ = false; + rtc::AsyncInvoker invoker_; webrtc::MetricsObserverInterface* metrics_observer_ = nullptr; - rtc::Optional network_route_; - webrtc::IceEventLog ice_event_log_; RTC_DISALLOW_COPY_AND_ASSIGN(P2PTransportChannel);