diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc index d7d527a37b..ee9eb2258c 100644 --- a/webrtc/p2p/stunprober/stunprober.cc +++ b/webrtc/p2p/stunprober/stunprober.cc @@ -28,7 +28,7 @@ namespace stunprober { namespace { -const int thread_wake_up_interval_ms = 5; +const int THREAD_WAKE_UP_INTERVAL_MS = 5; template void IncrementCounterByAddress(std::map* counter_per_ip, const T& ip) { @@ -143,7 +143,7 @@ void StunProber::Requester::SendStunRequest() { rtc::scoped_ptr request_packet( new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)); if (!message.Write(request_packet.get())) { - prober_->End(WRITE_FAILED); + prober_->ReportOnFinished(WRITE_FAILED); return; } @@ -157,7 +157,7 @@ void StunProber::Requester::SendStunRequest() { int rv = socket_->SendTo(const_cast(request_packet->Data()), request_packet->Length(), addr, options); if (rv < 0) { - prober_->End(WRITE_FAILED); + prober_->ReportOnFinished(WRITE_FAILED); return; } @@ -207,7 +207,7 @@ void StunProber::Requester::OnStunResponseReceived( Request* request = GetRequestByAddress(addr.ipaddr()); if (!request) { // Something is wrong, finish the test. - prober_->End(GENERIC_FAILURE); + prober_->ReportOnFinished(GENERIC_FAILURE); return; } @@ -255,6 +255,17 @@ bool StunProber::Start(const std::vector& servers, int num_request_per_ip, int timeout_ms, const AsyncCallback callback) { + observer_adapter_.set_callback(callback); + return Prepare(servers, shared_socket_mode, interval_ms, num_request_per_ip, + timeout_ms, &observer_adapter_); +} + +bool StunProber::Prepare(const std::vector& servers, + bool shared_socket_mode, + int interval_ms, + int num_request_per_ip, + int timeout_ms, + StunProber::Observer* observer) { RTC_DCHECK(thread_checker_.CalledOnValidThread()); interval_ms_ = interval_ms; shared_socket_mode_ = shared_socket_mode; @@ -266,10 +277,19 @@ bool StunProber::Start(const std::vector& servers, timeout_ms_ = timeout_ms; servers_ = servers; - finished_callback_ = callback; + observer_ = observer; return ResolveServerName(servers_.back()); } +bool StunProber::Start(StunProber::Observer* observer) { + observer_ = observer; + if (total_ready_sockets_ != total_socket_required()) { + return false; + } + MaybeScheduleStunRequests(); + return true; +} + bool StunProber::ResolveServerName(const rtc::SocketAddress& addr) { rtc::AsyncResolverInterface* resolver = socket_factory_->CreateAsyncResolver(); @@ -285,7 +305,7 @@ void StunProber::OnSocketReady(rtc::AsyncPacketSocket* socket, const rtc::SocketAddress& addr) { total_ready_sockets_++; if (total_ready_sockets_ == total_socket_required()) { - MaybeScheduleStunRequests(); + ReportOnPrepared(SUCCESS); } } @@ -307,13 +327,13 @@ void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) { if (servers_.size()) { if (!ResolveServerName(servers_.back())) { - End(RESOLVE_FAILED); + ReportOnPrepared(RESOLVE_FAILED); } return; } if (all_servers_addrs_.size() == 0) { - End(RESOLVE_FAILED); + ReportOnPrepared(RESOLVE_FAILED); return; } @@ -328,7 +348,7 @@ void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) { socket_factory_->CreateUdpSocket(rtc::SocketAddress(INADDR_ANY, 0), 0, 0)); if (!socket) { - End(GENERIC_FAILURE); + ReportOnPrepared(GENERIC_FAILURE); return; } // Chrome and WebRTC behave differently in terms of the state of a socket @@ -374,25 +394,42 @@ bool StunProber::SendNextRequest() { return true; } +bool StunProber::should_send_next_request(uint32_t now) { + if (interval_ms_ < THREAD_WAKE_UP_INTERVAL_MS) { + return now >= next_request_time_ms_; + } else { + return (now + (THREAD_WAKE_UP_INTERVAL_MS / 2)) >= next_request_time_ms_; + } +} + +int StunProber::get_wake_up_interval_ms() { + if (interval_ms_ < THREAD_WAKE_UP_INTERVAL_MS) { + return 1; + } else { + return THREAD_WAKE_UP_INTERVAL_MS; + } +} + void StunProber::MaybeScheduleStunRequests() { RTC_DCHECK(thread_checker_.CalledOnValidThread()); uint32_t now = rtc::Time(); if (Done()) { invoker_.AsyncInvokeDelayed( - thread_, rtc::Bind(&StunProber::End, this, SUCCESS), timeout_ms_); + thread_, rtc::Bind(&StunProber::ReportOnFinished, this, SUCCESS), + timeout_ms_); return; } - if ((now + (thread_wake_up_interval_ms / 2)) >= next_request_time_ms_) { + if (should_send_next_request(now)) { if (!SendNextRequest()) { - End(GENERIC_FAILURE); + ReportOnFinished(GENERIC_FAILURE); return; } next_request_time_ms_ = now + interval_ms_; } invoker_.AsyncInvokeDelayed( thread_, rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), - thread_wake_up_interval_ms /* ms */); + get_wake_up_interval_ms()); } bool StunProber::GetStats(StunProber::Stats* prob_stats) const { @@ -520,14 +557,15 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { return true; } -void StunProber::End(StunProber::Status status) { - RTC_DCHECK(thread_checker_.CalledOnValidThread()); - if (!finished_callback_.empty()) { - AsyncCallback callback = finished_callback_; - finished_callback_ = AsyncCallback(); +void StunProber::ReportOnPrepared(StunProber::Status status) { + if (observer_) { + observer_->OnPrepared(this, status); + } +} - // Callback at the last since the prober might be deleted in the callback. - callback(this, status); +void StunProber::ReportOnFinished(StunProber::Status status) { + if (observer_) { + observer_->OnFinished(this, status); } } diff --git a/webrtc/p2p/stunprober/stunprober.h b/webrtc/p2p/stunprober/stunprober.h index b71d52374f..9d2ad222e5 100644 --- a/webrtc/p2p/stunprober/stunprober.h +++ b/webrtc/p2p/stunprober/stunprober.h @@ -32,6 +32,7 @@ class AsyncPacketSocket; class PacketSocketFactory; class Thread; class NetworkManager; +class AsyncResolverInterface; } // namespace rtc namespace stunprober { @@ -60,6 +61,13 @@ class StunProber : public sigslot::has_slots<> { READ_FAILED, // Reading the reply from the server failed. }; + class Observer { + public: + virtual ~Observer() = default; + virtual void OnPrepared(StunProber* prober, StunProber::Status status) = 0; + virtual void OnFinished(StunProber* prober, StunProber::Status status) = 0; + }; + struct Stats { Stats() {} @@ -98,7 +106,8 @@ class StunProber : public sigslot::has_slots<> { // many requests should be tried for each resolved IP address. In shared mode, // (the number of sockets to be created) equals to |requests_per_ip|. In // non-shared mode, (the number of sockets) equals to requests_per_ip * (the - // number of resolved IP addresses). + // number of resolved IP addresses). TODO(guoweis): Remove this once + // everything moved to Prepare() and Run(). bool Start(const std::vector& servers, bool shared_socket_mode, int stun_ta_interval_ms, @@ -106,16 +115,53 @@ class StunProber : public sigslot::has_slots<> { int timeout_ms, const AsyncCallback finish_callback); + // TODO(guoweis): The combination of Prepare() and Run() are equivalent to the + // Start() above. Remove Start() once everything is migrated. + bool Prepare(const std::vector& servers, + bool shared_socket_mode, + int stun_ta_interval_ms, + int requests_per_ip, + int timeout_ms, + StunProber::Observer* observer); + + // Start to send out the STUN probes. + bool Start(StunProber::Observer* observer); + // Method to retrieve the Stats once |finish_callback| is invoked. Returning // false when the result is inconclusive, for example, whether it's behind a // NAT or not. bool GetStats(Stats* stats) const; + int estimated_execution_time() { + return static_cast(requests_per_ip_ * all_servers_addrs_.size() * + interval_ms_); + } + private: // A requester tracks the requests and responses from a single socket to many // STUN servers. class Requester; + // TODO(guoweis): Remove this once all dependencies move away from + // AsyncCallback. + class ObserverAdapter : public Observer { + public: + void set_callback(AsyncCallback callback) { callback_ = callback; } + void OnPrepared(StunProber* stunprober, Status status) { + if (status == SUCCESS) { + stunprober->Start(this); + } else { + callback_(stunprober, status); + } + } + void OnFinished(StunProber* stunprober, Status status) { + callback_(stunprober, status); + } + + private: + AsyncCallback callback_; + }; + bool ResolveServerName(const rtc::SocketAddress& addr); void OnServerResolved(rtc::AsyncResolverInterface* resolver); @@ -131,15 +177,17 @@ class StunProber : public sigslot::has_slots<> { requests_per_ip_; } + bool should_send_next_request(uint32_t now); + int get_wake_up_interval_ms(); + bool SendNextRequest(); // Will be invoked in 1ms intervals and schedule the next request from the // |current_requester_| if the time has passed for another request. void MaybeScheduleStunRequests(); - // End the probe with the given |status|. Invokes |fininsh_callback|, which - // may destroy the class. - void End(StunProber::Status status); + void ReportOnPrepared(StunProber::Status status); + void ReportOnFinished(StunProber::Status status); Requester* CreateRequester(); @@ -172,10 +220,6 @@ class StunProber : public sigslot::has_slots<> { // Accumulate all resolved addresses. std::vector all_servers_addrs_; - // Caller-supplied callback executed when testing is completed, called by - // End(). - AsyncCallback finished_callback_; - // The set of STUN probe sockets and their state. std::vector requesters_; @@ -188,6 +232,11 @@ class StunProber : public sigslot::has_slots<> { rtc::AsyncInvoker invoker_; + Observer* observer_ = nullptr; + // TODO(guoweis): Remove this once all dependencies move away from + // AsyncCallback. + ObserverAdapter observer_adapter_; + rtc::NetworkManager::NetworkList networks_; RTC_DISALLOW_COPY_AND_ASSIGN(StunProber);