Replace MessageHandler by AsyncInvoker in P2PTransportChannel.

The existing asynchronous task execution in P2PTransportChannel is
implemented by posting messages to its network thread (a rtc::Thread)
and consuming these messages as a MessageHandler. The readability of
the implementation can be improved by using AsyncInvoker, which is
exactly designed for this scenario.

Bug: None
Change-Id: Ibee830d0d2bc19fc1ca5b894f194d9b69c40eef4
Reviewed-on: https://webrtc-review.googlesource.com/74642
Commit-Queue: Qingsi Wang <qingsi@google.com>
Reviewed-by: Taylor Brandstetter <deadbeef@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23267}
This commit is contained in:
Qingsi Wang 2018-05-16 17:01:37 -07:00 committed by Commit Bot
parent 10a0e516bf
commit 502db3df4e
2 changed files with 42 additions and 75 deletions

View File

@ -31,23 +31,9 @@
namespace {
// messages for queuing up work for ourselves
enum {
MSG_SORT_AND_UPDATE_STATE = 1,
MSG_CHECK_AND_PING,
MSG_REGATHER_ON_FAILED_NETWORKS,
MSG_REGATHER_ON_ALL_NETWORKS
};
// The minimum improvement in RTT that justifies a switch.
const int kMinImprovement = 10;
struct SortCandidatePairRequest : public rtc::MessageData {
explicit SortCandidatePairRequest(const std::string& reason_to_sort)
: reason_to_sort(reason_to_sort) {}
const std::string reason_to_sort;
};
bool IsRelayRelay(const cricket::Connection* conn) {
return conn->local_candidate().type() == cricket::RELAY_PORT_TYPE &&
conn->remote_candidate().type() == cricket::RELAY_PORT_TYPE;
@ -274,12 +260,13 @@ bool P2PTransportChannel::MaybeSwitchSelectedConnection(
// threshold, the new connection is in a better receiving state than the
// currently selected connection. So we need to re-check whether it needs
// to be switched at a later time.
std::unique_ptr<SortCandidatePairRequest> request_message(
new SortCandidatePairRequest(reason +
" (after switching dampening interval)"));
thread()->PostDelayed(RTC_FROM_HERE,
config_.receiving_switching_delay_or_default(), this,
MSG_SORT_AND_UPDATE_STATE, request_message.release());
const std::string reason_to_sort =
reason + " (after switching dampening interval)";
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::SortConnectionsAndUpdateState, this,
reason_to_sort),
config_.receiving_switching_delay_or_default());
}
return false;
}
@ -1281,10 +1268,10 @@ void P2PTransportChannel::UpdateConnectionStates() {
void P2PTransportChannel::RequestSortAndStateUpdate(
const std::string& reason_to_sort) {
if (!sort_dirty_) {
std::unique_ptr<SortCandidatePairRequest> request_message(
new SortCandidatePairRequest(reason_to_sort));
network_thread_->Post(RTC_FROM_HERE, this, MSG_SORT_AND_UPDATE_STATE,
request_message.release());
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::SortConnectionsAndUpdateState, this,
reason_to_sort));
sort_dirty_ = true;
}
}
@ -1301,15 +1288,18 @@ void P2PTransportChannel::MaybeStartPinging() {
RTC_LOG(LS_INFO) << ToString()
<< ": Have a pingable connection for the first time; "
"starting to ping.";
thread()->Post(RTC_FROM_HERE, this, MSG_CHECK_AND_PING);
thread()->PostDelayed(
RTC_FROM_HERE,
config_.regather_on_failed_networks_interval_or_default(), this,
MSG_REGATHER_ON_FAILED_NETWORKS);
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::CheckAndPing, this));
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::RegatherOnFailedNetworks, this),
config_.regather_on_failed_networks_interval_or_default());
if (config_.regather_all_networks_interval_range) {
thread()->PostDelayed(RTC_FROM_HERE,
SampleRegatherAllNetworksInterval(), this,
MSG_REGATHER_ON_ALL_NETWORKS);
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::RegatherOnAllNetworks, this),
SampleRegatherAllNetworksInterval());
}
started_pinging_ = true;
}
@ -1821,33 +1811,8 @@ bool P2PTransportChannel::ReadyToSend(Connection* connection) const {
PresumedWritable(connection));
}
// Handle any queued up requests
void P2PTransportChannel::OnMessage(rtc::Message *pmsg) {
switch (pmsg->message_id) {
case MSG_SORT_AND_UPDATE_STATE: {
RTC_DCHECK(pmsg->pdata);
std::unique_ptr<SortCandidatePairRequest> request_message(
static_cast<SortCandidatePairRequest*>(pmsg->pdata));
SortConnectionsAndUpdateState(request_message->reason_to_sort);
break;
}
case MSG_CHECK_AND_PING:
OnCheckAndPing();
break;
case MSG_REGATHER_ON_FAILED_NETWORKS:
OnRegatherOnFailedNetworks();
break;
case MSG_REGATHER_ON_ALL_NETWORKS:
OnRegatherOnAllNetworks();
break;
default:
RTC_NOTREACHED();
break;
}
}
// Handle queued up check-and-ping request
void P2PTransportChannel::OnCheckAndPing() {
void P2PTransportChannel::CheckAndPing() {
// Make sure the states of the connections are up-to-date (since this affects
// which ones are pingable).
UpdateConnectionStates();
@ -1870,7 +1835,9 @@ void P2PTransportChannel::OnCheckAndPing() {
}
}
int delay = std::min(ping_interval, check_receiving_interval());
thread()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_CHECK_AND_PING);
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::CheckAndPing, this), delay);
}
// A connection is considered a backup connection if the channel state
@ -2218,7 +2185,7 @@ void P2PTransportChannel::OnCandidatesRemoved(
SignalCandidatesRemoved(this, candidates_to_remove);
}
void P2PTransportChannel::OnRegatherOnFailedNetworks() {
void P2PTransportChannel::RegatherOnFailedNetworks() {
// Only re-gather when the current session is in the CLEARED state (i.e., not
// running or stopped). It is only possible to enter this state when we gather
// continually, so there is an implicit check on continual gathering here.
@ -2226,19 +2193,21 @@ void P2PTransportChannel::OnRegatherOnFailedNetworks() {
allocator_session()->RegatherOnFailedNetworks();
}
thread()->PostDelayed(
RTC_FROM_HERE, config_.regather_on_failed_networks_interval_or_default(),
this, MSG_REGATHER_ON_FAILED_NETWORKS);
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::RegatherOnFailedNetworks, this),
config_.regather_on_failed_networks_interval_or_default());
}
void P2PTransportChannel::OnRegatherOnAllNetworks() {
void P2PTransportChannel::RegatherOnAllNetworks() {
if (!allocator_sessions_.empty() && allocator_session()->IsCleared()) {
allocator_session()->RegatherOnAllNetworks();
}
thread()->PostDelayed(RTC_FROM_HERE,
SampleRegatherAllNetworksInterval(), this,
MSG_REGATHER_ON_ALL_NETWORKS);
invoker_.AsyncInvokeDelayed<void>(
RTC_FROM_HERE, thread(),
rtc::Bind(&P2PTransportChannel::RegatherOnAllNetworks, this),
SampleRegatherAllNetworksInterval());
}
void P2PTransportChannel::PruneAllPorts() {

View File

@ -36,6 +36,7 @@
#include "p2p/base/p2pconstants.h"
#include "p2p/base/portallocator.h"
#include "p2p/base/portinterface.h"
#include "rtc_base/asyncinvoker.h"
#include "rtc_base/asyncpacketsocket.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/random.h"
@ -72,8 +73,7 @@ class RemoteCandidate : public Candidate {
// P2PTransportChannel manages the candidates and connection process to keep
// two P2P clients connected to each other.
class P2PTransportChannel : public IceTransportInternal,
public rtc::MessageHandler {
class P2PTransportChannel : public IceTransportInternal {
public:
P2PTransportChannel(const std::string& transport_name,
int component,
@ -288,10 +288,9 @@ class P2PTransportChannel : public IceTransportInternal,
void OnNominated(Connection* conn);
void OnMessage(rtc::Message* pmsg) override;
void OnCheckAndPing();
void OnRegatherOnFailedNetworks();
void OnRegatherOnAllNetworks();
void CheckAndPing();
void RegatherOnFailedNetworks();
void RegatherOnAllNetworks();
void LogCandidatePairEvent(Connection* conn,
webrtc::IceCandidatePairEventType type);
@ -412,10 +411,9 @@ class P2PTransportChannel : public IceTransportInternal,
bool receiving_ = false;
bool writable_ = false;
rtc::AsyncInvoker invoker_;
webrtc::MetricsObserverInterface* metrics_observer_ = nullptr;
rtc::Optional<rtc::NetworkRoute> network_route_;
webrtc::IceEventLog ice_event_log_;
RTC_DISALLOW_COPY_AND_ASSIGN(P2PTransportChannel);