From 37931c4b851919e55b17ef571118d696317d0aa3 Mon Sep 17 00:00:00 2001 From: Guo-wei Shieh Date: Fri, 15 May 2015 10:26:52 -0700 Subject: [PATCH] Stunprober interface, its implementation and a command line driver. Chrome will only see stunprober.h and stunprobercontext.h and link with libstunprober.a. It has support for shared and non-shared mode. In shared mode, a socket will be used to ping all resolved IPs once. In non-shared mode, each ping will get a new socket. The thread scheduling will try to run MaybeScheduleStunRequest every 1 ms. When the time is up for next ping, it'll send it out. BUG=4576 R=pthatcher@webrtc.org Review URL: https://webrtc-codereview.appspot.com/51729004 Cr-Commit-Position: refs/heads/master@{#9194} --- webrtc/p2p/p2p.gyp | 28 ++ webrtc/p2p/p2p_tests.gypi | 1 + webrtc/p2p/stunprober/main.cc | 188 ++++++++ webrtc/p2p/stunprober/stunprober.cc | 409 ++++++++++++++++++ webrtc/p2p/stunprober/stunprober.h | 329 ++++++++++++++ .../p2p/stunprober/stunprober_dependencies.h | 182 ++++++++ webrtc/p2p/stunprober/stunprober_unittest.cc | 208 +++++++++ webrtc/webrtc_tests.gypi | 2 + 8 files changed, 1347 insertions(+) create mode 100644 webrtc/p2p/stunprober/main.cc create mode 100644 webrtc/p2p/stunprober/stunprober.cc create mode 100644 webrtc/p2p/stunprober/stunprober.h create mode 100644 webrtc/p2p/stunprober/stunprober_dependencies.h create mode 100644 webrtc/p2p/stunprober/stunprober_unittest.cc diff --git a/webrtc/p2p/p2p.gyp b/webrtc/p2p/p2p.gyp index 556a5d16b2..ac7a4641de 100644 --- a/webrtc/p2p/p2p.gyp +++ b/webrtc/p2p/p2p.gyp @@ -108,6 +108,34 @@ ], }], ], + }, + { + 'target_name': 'libstunprober', + 'type': 'static_library', + 'dependencies': [ + '<(webrtc_root)/base/base.gyp:rtc_base', + '<(webrtc_root)/common.gyp:webrtc_common', + ], + 'cflags_cc!': [ + '-Wnon-virtual-dtor', + ], + 'sources': [ + 'stunprober/stunprober.cc', + ], + }, + { + 'target_name': 'stun_prober', + 'type': 'executable', + 'dependencies': [ + 'libstunprober', + 'rtc_p2p' + ], + 'cflags_cc!': [ + '-Wnon-virtual-dtor', + ], + 'sources': [ + 'stunprober/main.cc', + ], }], } diff --git a/webrtc/p2p/p2p_tests.gypi b/webrtc/p2p/p2p_tests.gypi index 3387532ece..c685a95412 100644 --- a/webrtc/p2p/p2p_tests.gypi +++ b/webrtc/p2p/p2p_tests.gypi @@ -34,6 +34,7 @@ 'client/connectivitychecker_unittest.cc', 'client/fakeportallocator.h', 'client/portallocator_unittest.cc', + 'stunprober/stunprober_unittest.cc', ], }, }, diff --git a/webrtc/p2p/stunprober/main.cc b/webrtc/p2p/stunprober/main.cc new file mode 100644 index 0000000000..31b0443fbc --- /dev/null +++ b/webrtc/p2p/stunprober/main.cc @@ -0,0 +1,188 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include +#include + +#include +#include + +#include "webrtc/base/checks.h" +#include "webrtc/base/flags.h" +#include "webrtc/base/helpers.h" +#include "webrtc/base/nethelpers.h" +#include "webrtc/base/logging.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/ssladapter.h" +#include "webrtc/base/stringutils.h" +#include "webrtc/base/thread.h" +#include "webrtc/base/timeutils.h" +#include "webrtc/p2p/stunprober/stunprober.h" +#include "webrtc/p2p/stunprober/stunprober_dependencies.h" + +using stunprober::HostNameResolverInterface; +using stunprober::TaskRunner; +using stunprober::SocketFactory; +using stunprober::StunProber; +using stunprober::AsyncCallback; +using stunprober::ClientSocketInterface; +using stunprober::ServerSocketInterface; +using stunprober::SocketFactory; +using stunprober::TaskRunner; + +DEFINE_bool(help, false, "Prints this message"); +DEFINE_int(interval, 10, "Interval of consecutive stun pings in milliseconds"); +DEFINE_bool(shared_socket, false, "Share socket mode for different remote IPs"); +DEFINE_int(pings_per_ip, + 10, + "Number of consecutive stun pings to send for each IP"); +DEFINE_int(timeout, + 1000, + "Milliseconds of wait after the last ping sent before exiting"); +DEFINE_int(port, 3478, "STUN server port"); +DEFINE_string(server, "stun.voxgratia.org", "STUN server address"); + +namespace { + +class HostNameResolver : public HostNameResolverInterface, + public sigslot::has_slots<> { + public: + HostNameResolver() { resolver_ = new rtc::AsyncResolver(); } + virtual ~HostNameResolver() { + // rtc::AsyncResolver inherits from SignalThread which requires explicit + // Release(). + resolver_->Release(); + } + + void Resolve(const rtc::SocketAddress& addr, + std::vector* addresses, + AsyncCallback callback) override { + DCHECK(callback_.empty()); + addr_ = addr; + callback_ = callback; + result_ = addresses; + resolver_->SignalDone.connect(this, &HostNameResolver::OnResolveResult); + resolver_->Start(addr); + } + + void OnResolveResult(rtc::AsyncResolverInterface* resolver) { + DCHECK(resolver); + int rv = resolver_->GetError(); + LOG(LS_INFO) << "ResolveResult for " << addr_.ToString() << " : " << rv; + if (rv == 0 && result_) { + *result_ = resolver_->addresses(); + + for (auto& ip : *result_) { + LOG(LS_INFO) << "\t" << ip.ToString(); + } + } + if (!callback_.empty()) { + // Need to be the last statement as the object could be deleted by the + // callback_ in the failure case. + callback_(rv); + } + } + + private: + AsyncCallback callback_; + rtc::SocketAddress addr_; + std::vector* result_; + + // Not using smart ptr here as this requires specific release pattern. + rtc::AsyncResolver* resolver_; +}; + +std::string HistogramName(bool behind_nat, + bool is_src_port_shared, + int interval_ms, + std::string suffix) { + char output[1000]; + rtc::sprintfn(output, sizeof(output), "NetConnectivity6.%s.%s.%dms.%s", + behind_nat ? "NAT" : "NoNAT", + is_src_port_shared ? "SrcPortShared" : "SrcPortUnique", + interval_ms, suffix.c_str()); + return std::string(output); +} + +void PrintStats(StunProber* prober) { + StunProber::Stats stats; + if (!prober->GetStats(&stats)) { + LOG(LS_WARNING) << "Results are inconclusive."; + return; + } + + LOG(LS_INFO) << "Requests sent: " << stats.num_request_sent; + LOG(LS_INFO) << "Responses received: " << stats.num_response_received; + LOG(LS_INFO) << "Target interval (ns): " << stats.target_request_interval_ns; + LOG(LS_INFO) << "Actual interval (ns): " << stats.actual_request_interval_ns; + LOG(LS_INFO) << "Behind NAT: " << stats.behind_nat; + if (stats.behind_nat) { + LOG(LS_INFO) << "NAT is symmetrical: " << (stats.srflx_ips.size() > 1); + } + LOG(LS_INFO) << "Host IP: " << stats.host_ip; + LOG(LS_INFO) << "Server-reflexive ips: "; + for (auto& ip : stats.srflx_ips) { + LOG(LS_INFO) << "\t" << ip; + } + + std::string histogram_name = HistogramName( + stats.behind_nat, FLAG_shared_socket, FLAG_interval, "SuccessPercent"); + + LOG(LS_INFO) << "Histogram '" << histogram_name.c_str() + << "' = " << stats.success_percent; + + histogram_name = HistogramName(stats.behind_nat, FLAG_shared_socket, + FLAG_interval, "ResponseLatency"); + + LOG(LS_INFO) << "Histogram '" << histogram_name.c_str() + << "' = " << stats.average_rtt_ms << " ms"; +} + +void StopTrial(rtc::Thread* thread, StunProber* prober, int result) { + thread->Quit(); + if (prober) { + LOG(LS_INFO) << "Result: " << result; + if (result == StunProber::SUCCESS) { + PrintStats(prober); + } + } +} + +} // namespace + +int main(int argc, char** argv) { + rtc::FlagList::SetFlagsFromCommandLine(&argc, argv, true); + if (FLAG_help) { + rtc::FlagList::Print(nullptr, false); + return 0; + } + + // Abort if the user specifies a port that is outside the allowed + // range [1, 65535]. + if ((FLAG_port < 1) || (FLAG_port > 65535)) { + printf("Error: %i is not a valid port.\n", FLAG_port); + return -1; + } + + rtc::InitializeSSL(); + rtc::InitRandom(rtc::Time()); + rtc::Thread* thread = rtc::ThreadManager::Instance()->WrapCurrentThread(); + StunProber* prober = new StunProber(new HostNameResolver(), + new SocketFactory(), new TaskRunner()); + auto finish_callback = + [thread, prober](int result) { StopTrial(thread, prober, result); }; + prober->Start(FLAG_server, FLAG_port, FLAG_shared_socket, FLAG_interval, + FLAG_pings_per_ip, FLAG_timeout, + AsyncCallback(finish_callback)); + thread->Run(); + delete prober; + return 0; +} diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc new file mode 100644 index 0000000000..6f16ba4627 --- /dev/null +++ b/webrtc/p2p/stunprober/stunprober.cc @@ -0,0 +1,409 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include +#include + +#include "webrtc/base/bind.h" +#include "webrtc/base/checks.h" +#include "webrtc/base/helpers.h" +#include "webrtc/base/timeutils.h" +#include "webrtc/p2p/base/stun.h" +#include "webrtc/p2p/stunprober/stunprober.h" + +namespace stunprober { + +StunProber::Requester::Requester(StunProber* prober, + ServerSocketInterface* socket, + const std::vector server_ips, + uint16 port) + : prober_(prober), + socket_(socket), + response_packet_(new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)), + server_ips_(server_ips), + port_(port), + thread_checker_(prober->thread_checker_) { +} + +StunProber::Requester::~Requester() { + if (socket_) { + socket_->Close(); + } + for (auto req : requests_) { + if (req) { + delete req; + } + } +} + +void StunProber::Requester::SendStunRequest() { + DCHECK(thread_checker_.CalledOnValidThread()); + requests_.push_back(new Request()); + Request& request = *(requests_.back()); + cricket::StunMessage message; + + // Random transaction ID, STUN_BINDING_REQUEST + message.SetTransactionID( + rtc::CreateRandomString(cricket::kStunTransactionIdLength)); + message.SetType(cricket::STUN_BINDING_REQUEST); + + rtc::scoped_ptr request_packet( + new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)); + if (!message.Write(request_packet.get())) { + prober_->End(WRITE_FAILED, 0); + return; + } + + auto addr = rtc::SocketAddress(server_ips_[num_request_sent_], port_); + request.server_addr = addr.ipaddr(); + + int rv = 0; + + // Only bind to the interface at the first request. + if (num_request_sent_ == 0) { + rtc::IPAddress local_addr; + rv = prober_->GetLocalAddress(&local_addr); + if (rv != 0) { + prober_->End(GENERIC_FAILURE, rv); + return; + } + rv = socket_->Bind(rtc::SocketAddress(local_addr, 0)); + if (rv < 0) { + prober_->End(GENERIC_FAILURE, rv); + return; + } + } + + // The write must succeed immediately. Otherwise, the calculating of the STUN + // request timing could become too complicated. Callback is ignored by passing + // empty AsyncCallback. + rv = socket_->SendTo(addr, const_cast(request_packet->Data()), + request_packet->Length(), AsyncCallback()); + if (rv < 0) { + prober_->End(WRITE_FAILED, rv); + return; + } + + request.sent_time_ns = rtc::Time(); + + // Post a read waiting for response. For share mode, the subsequent read will + // be posted inside OnStunResponseReceived. + if (num_request_sent_ == 0) { + ReadStunResponse(); + } + + num_request_sent_++; + DCHECK(static_cast(num_request_sent_) <= server_ips_.size()); +} + +void StunProber::Requester::ReadStunResponse() { + DCHECK(thread_checker_.CalledOnValidThread()); + if (!socket_) { + return; + } + + int rv = socket_->RecvFrom( + response_packet_->ReserveWriteBuffer(kMaxUdpBufferSize), + kMaxUdpBufferSize, &addr_, + [this](int result) { this->OnStunResponseReceived(result); }); + if (rv != SocketInterface::IO_PENDING) { + OnStunResponseReceived(rv); + } +} + +void StunProber::Requester::Request::ProcessResponse( + rtc::ByteBuffer* message, + int buf_len, + const rtc::IPAddress& local_addr) { + int64 now = rtc::Time(); + + cricket::StunMessage stun_response; + if (!stun_response.Read(message)) { + // Invalid or incomplete STUN packet. + received_time_ns = 0; + return; + } + + // Get external address of the socket. + const cricket::StunAddressAttribute* addr_attr = + stun_response.GetAddress(cricket::STUN_ATTR_MAPPED_ADDRESS); + if (addr_attr == nullptr) { + // Addresses not available to detect whether or not behind a NAT. + return; + } + + if (addr_attr->family() != cricket::STUN_ADDRESS_IPV4 && + addr_attr->family() != cricket::STUN_ADDRESS_IPV6) { + return; + } + + received_time_ns = now; + + srflx_ip = addr_attr->ipaddr().ToString(); + + // Calculate behind_nat. + behind_nat = (srflx_ip.compare(local_addr.ToString()) != 0); +} + +void StunProber::Requester::OnStunResponseReceived(int result) { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(socket_); + + if (result < 0) { + // Something is wrong, finish the test. + prober_->End(READ_FAILED, result); + return; + } + + Request* request = GetRequestByAddress(addr_.ipaddr()); + if (!request) { + // Something is wrong, finish the test. + prober_->End(GENERIC_FAILURE, result); + return; + } + + num_response_received_++; + + // Resize will set the end_ to indicate that there are data available in this + // ByteBuffer. + response_packet_->Resize(result); + request->ProcessResponse(response_packet_.get(), result, + prober_->local_addr_); + + if (static_cast(num_response_received_) < server_ips_.size()) { + // Post another read response. + ReadStunResponse(); + } +} + +StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress( + const rtc::IPAddress& ipaddr) { + DCHECK(thread_checker_.CalledOnValidThread()); + for (auto request : requests_) { + if (request->server_addr == ipaddr) { + return request; + } + } + + return nullptr; +} + +StunProber::StunProber(HostNameResolverInterface* host_name_resolver, + SocketFactoryInterface* socket_factory, + TaskRunnerInterface* task_runner) + : interval_ms_(0), + socket_factory_(socket_factory), + resolver_(host_name_resolver), + task_runner_(task_runner) { +} + +StunProber::~StunProber() { + for (auto req : requesters_) { + if (req) { + delete req; + } + } +} + +bool StunProber::Start(const std::string& server, + uint16 port, + bool shared_socket_mode, + int interval_ms, + int num_request_per_ip, + int timeout_ms, + const AsyncCallback callback) { + DCHECK(thread_checker_.CalledOnValidThread()); + interval_ms_ = interval_ms; + shared_socket_mode_ = shared_socket_mode; + + requests_per_ip_ = num_request_per_ip; + if (requests_per_ip_ == 0) { + return false; + } + + timeout_ms_ = timeout_ms; + server_ = rtc::SocketAddress(server, port); + finished_callback_ = callback; + resolver_->Resolve(server_, &server_ips_, + [this](int result) { this->OnServerResolved(result); }); + return true; +} + +void StunProber::OnServerResolved(int result) { + DCHECK(thread_checker_.CalledOnValidThread()); + if (result != 0 || server_ips_.size() == 0) { + End(RESOLVE_FAILED, result); + return; + } + + // Dedupe. + std::set addrs(server_ips_.begin(), server_ips_.end()); + server_ips_.assign(addrs.begin(), addrs.end()); + + rtc::IPAddress addr; + if (GetLocalAddress(&addr) != 0) { + End(GENERIC_FAILURE, result); + return; + } + + MaybeScheduleStunRequests(); +} + +int StunProber::GetLocalAddress(rtc::IPAddress* addr) { + DCHECK(thread_checker_.CalledOnValidThread()); + if (local_addr_.family() == AF_UNSPEC) { + rtc::SocketAddress sock_addr; + rtc::scoped_ptr socket( + socket_factory_->CreateClientSocket()); + int rv = + socket->Connect(rtc::SocketAddress(server_ips_[0], server_.port())); + if (rv != SUCCESS) { + End(GENERIC_FAILURE, rv); + return rv; + } + rv = socket->GetLocalAddress(&sock_addr); + if (rv != SUCCESS) { + End(GENERIC_FAILURE, rv); + return rv; + } + local_addr_ = sock_addr.ipaddr(); + socket->Close(); + } + *addr = local_addr_; + return 0; +} + +StunProber::Requester* StunProber::CreateRequester() { + DCHECK(thread_checker_.CalledOnValidThread()); + rtc::scoped_ptr socket( + socket_factory_->CreateServerSocket(kMaxUdpBufferSize, + kMaxUdpBufferSize)); + if (!socket) { + return nullptr; + } + if (shared_socket_mode_) { + return new Requester(this, socket.release(), server_ips_, server_.port()); + } else { + std::vector server_ip; + server_ip.push_back(server_ips_[(num_request_sent_ % server_ips_.size())]); + return new Requester(this, socket.release(), server_ip, server_.port()); + } +} + +bool StunProber::SendNextRequest() { + if (!current_requester_ || current_requester_->Done()) { + current_requester_ = CreateRequester(); + requesters_.push_back(current_requester_); + } + if (!current_requester_) { + return false; + } + current_requester_->SendStunRequest(); + num_request_sent_++; + return true; +} + +void StunProber::MaybeScheduleStunRequests() { + DCHECK(thread_checker_.CalledOnValidThread()); + uint32 now = rtc::Time(); + + if (Done()) { + task_runner_->PostTask(rtc::Bind(&StunProber::End, this, SUCCESS, 0), + timeout_ms_); + return; + } + if (now >= next_request_time_ms_) { + if (!SendNextRequest()) { + End(GENERIC_FAILURE, 0); + return; + } + next_request_time_ms_ = now + interval_ms_; + } + task_runner_->PostTask( + rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), 1 /* ms */); +} + +bool StunProber::GetStats(StunProber::Stats* prob_stats) { + // No need to be on the same thread. + if (!prob_stats) { + return false; + } + + StunProber::Stats stats; + + int num_sent = 0, num_received = 0; + int rtt_sum = 0; + bool behind_nat_set = false; + int64 first_sent_time = 0; + int64 last_sent_time = 0; + + for (auto* requester : requesters_) { + for (auto request : requester->requests()) { + if (request->sent_time_ns <= 0) { + continue; + } + num_sent++; + + if (!first_sent_time) { + first_sent_time = request->sent_time_ns; + } + last_sent_time = request->sent_time_ns; + + if (request->received_time_ns < request->sent_time_ns) { + continue; + } + num_received++; + rtt_sum += request->rtt(); + if (!behind_nat_set) { + stats.behind_nat = request->behind_nat; + behind_nat_set = true; + } else if (stats.behind_nat != request->behind_nat) { + // Detect the inconsistency in NAT presence. + return false; + } + stats.srflx_ips.insert(request->srflx_ip); + } + } + + stats.host_ip = local_addr_.ToString(); + stats.num_request_sent = num_sent; + stats.num_response_received = num_received; + stats.target_request_interval_ns = interval_ms_ * 1000; + + if (num_sent) { + stats.success_percent = static_cast(100 * num_received / num_sent); + } + + if (num_sent > 1) { + stats.actual_request_interval_ns = + (100 * (last_sent_time - first_sent_time)) / (num_sent - 1); + } + + if (num_received) { + stats.average_rtt_ms = static_cast((rtt_sum / num_received)); + } + + *prob_stats = stats; + return true; +} + +void StunProber::End(StunProber::Status status, int result) { + DCHECK(thread_checker_.CalledOnValidThread()); + if (!finished_callback_.empty()) { + AsyncCallback callback = finished_callback_; + finished_callback_ = AsyncCallback(); + + // Callback at the last since the prober might be deleted in the callback. + callback(status); + } +} + +} // namespace stunprober diff --git a/webrtc/p2p/stunprober/stunprober.h b/webrtc/p2p/stunprober/stunprober.h new file mode 100644 index 0000000000..a932cb0cce --- /dev/null +++ b/webrtc/p2p/stunprober/stunprober.h @@ -0,0 +1,329 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_P2P_STUNPROBER_STUNPROBER_H_ +#define WEBRTC_P2P_STUNPROBER_STUNPROBER_H_ + +#include +#include +#include + +#include "webrtc/base/basictypes.h" +#include "webrtc/base/bytebuffer.h" +#include "webrtc/base/callback.h" +#include "webrtc/base/ipaddress.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/socketaddress.h" +#include "webrtc/base/thread_checker.h" +#include "webrtc/typedefs.h" + +namespace stunprober { + +static const int kMaxUdpBufferSize = 1200; + +typedef rtc::Callback1 AsyncCallback; + +class HostNameResolverInterface { + public: + HostNameResolverInterface() {} + virtual void Resolve(const rtc::SocketAddress& addr, + std::vector* addresses, + AsyncCallback callback) = 0; + + virtual ~HostNameResolverInterface() {} + + private: + DISALLOW_COPY_AND_ASSIGN(HostNameResolverInterface); +}; + +// Chrome has client and server socket. Client socket supports Connect but not +// Bind. Server is opposite. +class SocketInterface { + public: + enum { + IO_PENDING = -1, + FAILED = -2, + }; + SocketInterface() {} + virtual int GetLocalAddress(rtc::SocketAddress* local_address) = 0; + virtual void Close() = 0; + virtual ~SocketInterface() {} + + private: + DISALLOW_COPY_AND_ASSIGN(SocketInterface); +}; + +class ClientSocketInterface : public SocketInterface { + public: + ClientSocketInterface() {} + // Even though we have SendTo and RecvFrom, if Connect is not called first, + // getsockname will only return 0.0.0.0. + virtual int Connect(const rtc::SocketAddress& addr) = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(ClientSocketInterface); +}; + +class ServerSocketInterface : public SocketInterface { + public: + ServerSocketInterface() {} + virtual int Bind(const rtc::SocketAddress& addr) = 0; + + virtual int SendTo(const rtc::SocketAddress& addr, + char* buf, + size_t buf_len, + AsyncCallback callback) = 0; + + // If the returned value is positive, it means that buf has been + // sent. Otherwise, it should return IO_PENDING. Callback will be invoked + // after the data is successfully read into buf. + virtual int RecvFrom(char* buf, + size_t buf_len, + rtc::SocketAddress* addr, + AsyncCallback callback) = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(ServerSocketInterface); +}; + +class SocketFactoryInterface { + public: + SocketFactoryInterface() {} + virtual ClientSocketInterface* CreateClientSocket() = 0; + virtual ServerSocketInterface* CreateServerSocket( + size_t send_buffer_size, + size_t receive_buffer_size) = 0; + virtual ~SocketFactoryInterface() {} + + private: + DISALLOW_COPY_AND_ASSIGN(SocketFactoryInterface); +}; + +class TaskRunnerInterface { + public: + TaskRunnerInterface() {} + virtual void PostTask(rtc::Callback0, uint32_t delay_ms) = 0; + virtual ~TaskRunnerInterface() {} + + private: + DISALLOW_COPY_AND_ASSIGN(TaskRunnerInterface); +}; + +class StunProber { + public: + enum Status { // Used in UMA_HISTOGRAM_ENUMERATION. + SUCCESS, // Successfully received bytes from the server. + GENERIC_FAILURE, // Generic failure. + RESOLVE_FAILED, // Host resolution failed. + WRITE_FAILED, // Sending a message to the server failed. + READ_FAILED, // Reading the reply from the server failed. + }; + + struct Stats { + Stats() {} + int num_request_sent = 0; + int num_response_received = 0; + bool behind_nat = false; + int average_rtt_ms = -1; + int success_percent = 0; + int target_request_interval_ns = 0; + int actual_request_interval_ns = 0; + std::string host_ip; + + // If the srflx_ips has more than 1 element, the NAT is symmetric. + std::set srflx_ips; + + bool symmetric_nat() { return srflx_ips.size() > 1; } + }; + + // StunProber is not thread safe. It's task_runner's responsibility to ensure + // all calls happen sequentially. + StunProber(HostNameResolverInterface* host_name_resolver, + SocketFactoryInterface* socket_factory, + TaskRunnerInterface* task_runner); + virtual ~StunProber(); + + // Begin performing the probe test against the |server| with |port|. If + // |shared_socket_mode| is false, each request will be done with a new socket. + // Otherwise, a unique socket will be used for a single round of requests + // against all resolved IPs. No single socket will be used against a given IP + // more than once. The interval of requests will be as close to the requested + // inter-probe interval |stun_ta_interval_ms| as possible. After sending out + // the last scheduled request, the probe will wait |timeout_ms| for request + // responses and then call |finish_callback|. |requests_per_ip| indicates how + // many requests should be tried for each resolved IP address. In shared mode, + // (the number of sockets to be created) equals to |requests_per_ip|. In + // non-shared mode, (the number of sockets) equals to requests_per_ip * (the + // number of resolved IP addresses). + bool Start(const std::string& server, + uint16 port, + bool shared_socket_mode, + int stun_ta_interval_ms, + int requests_per_ip, + int timeout_ms, + const AsyncCallback finish_callback); + + // Method to retrieve the Stats once |finish_callback| is invoked. Returning + // false when the result is inconclusive, for example, whether it's behind a + // NAT or not. + bool GetStats(Stats* stats); + + private: + // A requester tracks the requests and responses from a single socket to many + // STUN servers + class Requester { + public: + // Each Request maps to a request and response. + struct Request { + // Actual time the STUN bind request was sent. + int64 sent_time_ns = 0; + // Time the response was received. + int64 received_time_ns = 0; + + // See whether the observed address returned matches the + // local address as in StunProber.local_addr_. + bool behind_nat = false; + + // Server reflexive address from STUN response for this given request. + std::string srflx_ip; + + rtc::IPAddress server_addr; + + int64 rtt() { return received_time_ns - sent_time_ns; } + void ProcessResponse(rtc::ByteBuffer* message, + int buf_len, + const rtc::IPAddress& local_addr); + }; + + // StunProber provides |server_ips| for Requester to probe. For shared + // socket mode, it'll be all the resolved IP addresses. For non-shared mode, + // it'll just be a single address. + Requester(StunProber* prober, + ServerSocketInterface* socket, + const std::vector server_ips, + uint16 port); + virtual ~Requester(); + + // There is no callback for SendStunRequest as the underneath socket send is + // expected to be completed immediately. Otherwise, it'll skip this request + // and move to the next one. + void SendStunRequest(); + + void ReadStunResponse(); + + // |result| is the positive return value from RecvFrom when data is + // available. + void OnStunResponseReceived(int result); + + const std::vector& requests() { return requests_; } + + // Whether this Requester has completed all requests. + bool Done() { + return static_cast(num_request_sent_) == server_ips_.size(); + } + + private: + Request* GetRequestByAddress(const rtc::IPAddress& ip); + + StunProber* prober_; + + // The socket for this session. + rtc::scoped_ptr socket_; + + // Temporary SocketAddress and buffer for RecvFrom. + rtc::SocketAddress addr_; + rtc::scoped_ptr response_packet_; + + std::vector requests_; + std::vector server_ips_; + int16 num_request_sent_ = 0; + int16 num_response_received_ = 0; + uint16 port_ = 0; + + rtc::ThreadChecker& thread_checker_; + + DISALLOW_COPY_AND_ASSIGN(Requester); + }; + + private: + void OnServerResolved(int result); + + bool Done() { + return num_request_sent_ >= requests_per_ip_ * server_ips_.size(); + } + + bool SendNextRequest(); + + // Will be invoked in 1ms intervals and schedule the next request from the + // |current_requester_| if the time has passed for another request. + void MaybeScheduleStunRequests(); + + // End the probe with the given |status|. Invokes |fininsh_callback|, which + // may destroy the class. + void End(StunProber::Status status, int result); + + // Create a socket, connect to the first resolved server, and return the + // result of getsockname(). All Requesters will bind to this name. We do this + // because if a socket is not bound nor connected, getsockname will return + // 0.0.0.0. We can't connect to a single STUN server IP either as that will + // fail subsequent requests in shared mode. + int GetLocalAddress(rtc::IPAddress* addr); + + Requester* CreateRequester(); + + Requester* current_requester_ = nullptr; + + // The time when the next request should go out. + uint64 next_request_time_ms_ = 0; + + // Total requests sent so far. + uint32 num_request_sent_ = 0; + + bool shared_socket_mode_ = false; + + // How many requests should be done against each resolved IP. + uint32 requests_per_ip_ = 0; + + // Milliseconds to pause between each STUN request. + int interval_ms_; + + // Timeout period after the last request is sent. + int timeout_ms_; + + // STUN server name to be resolved. + rtc::SocketAddress server_; + + // The local address that each probing socket will be bound to. + rtc::IPAddress local_addr_; + + // Owned pointers. + rtc::scoped_ptr socket_factory_; + rtc::scoped_ptr resolver_; + rtc::scoped_ptr task_runner_; + + // Addresses filled out by HostNameResolver after host resolution is + // completed. + std::vector server_ips_; + + // Caller-supplied callback executed when testing is completed, called by + // End(). + AsyncCallback finished_callback_; + + // The set of STUN probe sockets and their state. + std::vector requesters_; + + rtc::ThreadChecker thread_checker_; + + DISALLOW_COPY_AND_ASSIGN(StunProber); +}; + +} // namespace stunprober + +#endif // WEBRTC_P2P_STUNPROBER_STUNPROBER_H_ diff --git a/webrtc/p2p/stunprober/stunprober_dependencies.h b/webrtc/p2p/stunprober/stunprober_dependencies.h new file mode 100644 index 0000000000..813a2f931c --- /dev/null +++ b/webrtc/p2p/stunprober/stunprober_dependencies.h @@ -0,0 +1,182 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef WEBRTC_P2P_STUNPROBER_STUNPROBER_DEPENDENCIES_H_ +#define WEBRTC_P2P_STUNPROBER_STUNPROBER_DEPENDENCIES_H_ + +#include "webrtc/base/checks.h" +#include "webrtc/base/helpers.h" +#include "webrtc/base/logging.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/thread.h" +#include "webrtc/base/timeutils.h" +#include "webrtc/p2p/stunprober/stunprober.h" + +// Common classes used by both the command line driver and the unit tests. +namespace stunprober { + +class Socket : public ClientSocketInterface, + public ServerSocketInterface, + public sigslot::has_slots<> { + public: + explicit Socket(rtc::AsyncSocket* socket) : socket_(socket) { + socket_->SignalReadEvent.connect(this, &Socket::OnReadEvent); + socket_->SignalWriteEvent.connect(this, &Socket::OnWriteEvent); + } + + int Connect(const rtc::SocketAddress& addr) override { + return MapResult(socket_->Connect(addr)); + } + + int Bind(const rtc::SocketAddress& addr) override { + return MapResult(socket_->Bind(addr)); + } + + int SendTo(const rtc::SocketAddress& addr, + char* buf, + size_t buf_len, + AsyncCallback callback) override { + write_ = NetworkWrite(addr, buf, buf_len, callback); + return MapResult(socket_->SendTo(buf, buf_len, addr)); + } + + int RecvFrom(char* buf, + size_t buf_len, + rtc::SocketAddress* addr, + AsyncCallback callback) override { + read_ = NetworkRead(buf, buf_len, addr, callback); + return MapResult(socket_->RecvFrom(buf, buf_len, addr)); + } + + int GetLocalAddress(rtc::SocketAddress* local_address) override { + *local_address = socket_->GetLocalAddress(); + return 0; + } + + void Close() override { socket_->Close(); } + + virtual ~Socket() {} + + protected: + int MapResult(int rv) { + if (rv >= 0) { + return rv; + } + int err = socket_->GetError(); + if (err == EWOULDBLOCK || err == EAGAIN) { + return IO_PENDING; + } else { + return FAILED; + } + } + + void OnReadEvent(rtc::AsyncSocket* socket) { + DCHECK(socket_ == socket); + NetworkRead read = read_; + read_ = NetworkRead(); + if (!read.callback.empty()) { + read.callback(socket_->RecvFrom(read.buf, read.buf_len, read.addr)); + } + } + + void OnWriteEvent(rtc::AsyncSocket* socket) { + DCHECK(socket_ == socket); + NetworkWrite write = write_; + write_ = NetworkWrite(); + if (!write.callback.empty()) { + write.callback(socket_->SendTo(write.buf, write.buf_len, write.addr)); + } + } + + struct NetworkWrite { + NetworkWrite() : buf(nullptr), buf_len(0) {} + NetworkWrite(const rtc::SocketAddress& addr, + char* buf, + size_t buf_len, + AsyncCallback callback) + : buf(buf), buf_len(buf_len), addr(addr), callback(callback) {} + char* buf; + size_t buf_len; + rtc::SocketAddress addr; + AsyncCallback callback; + }; + + NetworkWrite write_; + + struct NetworkRead { + NetworkRead() : buf(nullptr), buf_len(0) {} + NetworkRead(char* buf, + size_t buf_len, + rtc::SocketAddress* addr, + AsyncCallback callback) + : buf(buf), buf_len(buf_len), addr(addr), callback(callback) {} + + char* buf; + size_t buf_len; + rtc::SocketAddress* addr; + AsyncCallback callback; + }; + + NetworkRead read_; + + rtc::scoped_ptr socket_; +}; + +class SocketFactory : public SocketFactoryInterface { + public: + ClientSocketInterface* CreateClientSocket() override { + return new Socket( + rtc::Thread::Current()->socketserver()->CreateAsyncSocket(SOCK_DGRAM)); + } + ServerSocketInterface* CreateServerSocket(size_t send_buffer_size, + size_t recv_buffer_size) override { + rtc::scoped_ptr socket( + rtc::Thread::Current()->socketserver()->CreateAsyncSocket(SOCK_DGRAM)); + + if (socket) { + socket->SetOption(rtc::AsyncSocket::OPT_SNDBUF, + static_cast(send_buffer_size)); + socket->SetOption(rtc::AsyncSocket::OPT_RCVBUF, + static_cast(recv_buffer_size)); + return new Socket(socket.release()); + } else { + return nullptr; + } + } +}; + +class TaskRunner : public TaskRunnerInterface, public rtc::MessageHandler { + protected: + class CallbackMessageData : public rtc::MessageData { + public: + explicit CallbackMessageData(rtc::Callback0 callback) + : callback_(callback) {} + rtc::Callback0 callback_; + }; + + public: + void PostTask(rtc::Callback0 callback, uint32_t delay_ms) { + if (delay_ms == 0) { + rtc::Thread::Current()->Post(this, 0, new CallbackMessageData(callback)); + } else { + rtc::Thread::Current()->PostDelayed(delay_ms, this, 0, + new CallbackMessageData(callback)); + } + } + + void OnMessage(rtc::Message* msg) { + rtc::scoped_ptr callback( + reinterpret_cast(msg->pdata)); + callback->callback_(); + } +}; + +} // namespace stunprober +#endif // WEBRTC_P2P_STUNPROBER_STUNPROBER_DEPENDENCIES_H_ diff --git a/webrtc/p2p/stunprober/stunprober_unittest.cc b/webrtc/p2p/stunprober/stunprober_unittest.cc new file mode 100644 index 0000000000..5a7058584c --- /dev/null +++ b/webrtc/p2p/stunprober/stunprober_unittest.cc @@ -0,0 +1,208 @@ +/* + * Copyright 2015 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "webrtc/base/basictypes.h" +#include "webrtc/base/bind.h" +#include "webrtc/base/checks.h" +#include "webrtc/base/gunit.h" +#include "webrtc/base/physicalsocketserver.h" +#include "webrtc/base/scoped_ptr.h" +#include "webrtc/base/ssladapter.h" +#include "webrtc/base/virtualsocketserver.h" + +#include "webrtc/p2p/base/teststunserver.h" +#include "webrtc/p2p/stunprober/stunprober.h" +#include "webrtc/p2p/stunprober/stunprober_dependencies.h" + +using stunprober::HostNameResolverInterface; +using stunprober::TaskRunner; +using stunprober::SocketFactory; +using stunprober::StunProber; +using stunprober::AsyncCallback; +using stunprober::ClientSocketInterface; +using stunprober::ServerSocketInterface; +using stunprober::SocketFactory; +using stunprober::TaskRunner; + +namespace stunprober { + +namespace { + +const rtc::SocketAddress kLocalAddr("192.168.0.1", 0); +const rtc::SocketAddress kStunAddr1("1.1.1.1", 3478); +const rtc::SocketAddress kStunAddr2("1.1.1.2", 3478); +const rtc::SocketAddress kStunMappedAddr1("77.77.77.77", 0); +const rtc::SocketAddress kStunMappedAddr2("88.77.77.77", 0); + +class TestSocketServer : public rtc::VirtualSocketServer { + public: + using rtc::VirtualSocketServer::CreateAsyncSocket; + explicit TestSocketServer(SocketServer* ss) : rtc::VirtualSocketServer(ss) {} + void SetLocalAddress(const rtc::SocketAddress& addr) { addr_ = addr; } + + // CreateAsyncSocket is used by StunProber to create both client and server + // sockets. The first socket is used to retrieve local address which will be + // used later for Bind(). + rtc::AsyncSocket* CreateAsyncSocket(int type) override { + rtc::VirtualSocket* socket = static_cast( + rtc::VirtualSocketServer::CreateAsyncSocket(type)); + if (!local_addr_set_) { + // Only the first socket can SetLocalAddress. For others, Bind will fail + // if local address is set. + socket->SetLocalAddress(addr_); + local_addr_set_ = true; + } else { + sockets_.push_back(socket); + } + return socket; + } + + size_t num_socket() { return sockets_.size(); } + + private: + bool local_addr_set_ = false; + std::vector sockets_; + rtc::SocketAddress addr_; +}; + +class FakeHostNameResolver : public HostNameResolverInterface { + public: + FakeHostNameResolver() {} + void set_result(int ret) { ret_ = ret; } + void set_addresses(const std::vector& addresses) { + server_ips_ = addresses; + } + const std::vector& get_addresses() { return server_ips_; } + void add_address(const rtc::IPAddress& ip) { server_ips_.push_back(ip); } + void Resolve(const rtc::SocketAddress& addr, + std::vector* addresses, + stunprober::AsyncCallback callback) override { + *addresses = server_ips_; + callback(ret_); + } + + private: + int ret_ = 0; + std::vector server_ips_; +}; + +} // namespace + +class StunProberTest : public testing::Test { + public: + StunProberTest() + : main_(rtc::Thread::Current()), + pss_(new rtc::PhysicalSocketServer), + ss_(new TestSocketServer(pss_.get())), + ss_scope_(ss_.get()), + result_(StunProber::SUCCESS), + stun_server_1_(cricket::TestStunServer::Create(rtc::Thread::Current(), + kStunAddr1)), + stun_server_2_(cricket::TestStunServer::Create(rtc::Thread::Current(), + kStunAddr2)) { + stun_server_1_->set_fake_stun_addr(kStunMappedAddr1); + stun_server_2_->set_fake_stun_addr(kStunMappedAddr2); + rtc::InitializeSSL(); + } + + void set_expected_result(int result) { result_ = result; } + + void StartProbing(HostNameResolverInterface* resolver, + SocketFactoryInterface* socket_factory, + const std::string& server, + uint16 port, + bool shared_socket, + uint16 interval, + uint16 pings_per_ip) { + prober.reset(new StunProber(resolver, socket_factory, new TaskRunner())); + prober->Start(server, port, shared_socket, interval, pings_per_ip, + 100 /* timeout_ms */, + [this](int result) { this->StopCallback(result); }); + } + + void RunProber(bool shared_mode) { + const int pings_per_ip = 3; + const uint16 port = kStunAddr1.port(); + rtc::SocketAddress addr("stun.l.google.com", port); + + // Set up the resolver for 2 stun server addresses. + rtc::scoped_ptr resolver(new FakeHostNameResolver()); + resolver->add_address(kStunAddr1.ipaddr()); + resolver->add_address(kStunAddr2.ipaddr()); + + rtc::scoped_ptr socket_factory(new SocketFactory()); + + // Set local address in socketserver so getsockname will return kLocalAddr + // instead of 0.0.0.0 for the first socket. + ss_->SetLocalAddress(kLocalAddr); + + // Set up the expected results for verification. + std::set srflx_addresses; + srflx_addresses.insert(kStunMappedAddr1.ipaddr().ToString()); + srflx_addresses.insert(kStunMappedAddr2.ipaddr().ToString()); + const uint32 total_pings = + static_cast(pings_per_ip * resolver->get_addresses().size()); + size_t total_sockets = shared_mode ? pings_per_ip : total_pings; + + StartProbing(resolver.release(), socket_factory.release(), addr.hostname(), + addr.port(), shared_mode, 3, pings_per_ip); + + WAIT(stopped_, 1000); + + StunProber::Stats stats; + EXPECT_EQ(ss_->num_socket(), total_sockets); + EXPECT_TRUE(prober->GetStats(&stats)); + EXPECT_EQ(stats.success_percent, 100); + EXPECT_TRUE(stats.behind_nat); + EXPECT_EQ(stats.host_ip, kLocalAddr.ipaddr().ToString()); + EXPECT_EQ(stats.srflx_ips, srflx_addresses); + EXPECT_EQ(static_cast(stats.num_request_sent), total_pings); + EXPECT_EQ(static_cast(stats.num_response_received), total_pings); + } + + private: + void StopCallback(int result) { + EXPECT_EQ(result, result_); + stopped_ = true; + } + + rtc::Thread* main_; + rtc::scoped_ptr pss_; + rtc::scoped_ptr ss_; + rtc::SocketServerScope ss_scope_; + rtc::scoped_ptr prober; + int result_ = 0; + bool stopped_ = false; + rtc::scoped_ptr stun_server_1_; + rtc::scoped_ptr stun_server_2_; +}; + +TEST_F(StunProberTest, DNSFailure) { + rtc::SocketAddress addr("stun.l.google.com", 19302); + rtc::scoped_ptr resolver(new FakeHostNameResolver()); + rtc::scoped_ptr socket_factory(new SocketFactory()); + + set_expected_result(StunProber::RESOLVE_FAILED); + + // None 0 value is treated as failure. + resolver->set_result(1); + StartProbing(resolver.release(), socket_factory.release(), addr.hostname(), + addr.port(), false, 10, 30); +} + +TEST_F(StunProberTest, NonSharedMode) { + RunProber(false); +} + +TEST_F(StunProberTest, SharedMode) { + RunProber(true); +} + +} // namespace stunprober diff --git a/webrtc/webrtc_tests.gypi b/webrtc/webrtc_tests.gypi index 08f3ae8ff2..cdb1ed6a96 100644 --- a/webrtc/webrtc_tests.gypi +++ b/webrtc/webrtc_tests.gypi @@ -17,12 +17,14 @@ 'libjingle/xmllite/xmllite.gyp:rtc_xmllite', 'libjingle/xmpp/xmpp.gyp:rtc_xmpp', 'p2p/p2p.gyp:rtc_p2p', + 'p2p/p2p.gyp:libstunprober', 'rtc_p2p_unittest', 'rtc_sound_tests', 'rtc_xmllite_unittest', 'rtc_xmpp_unittest', 'sound/sound.gyp:rtc_sound', '<(DEPTH)/testing/gtest.gyp:gtest', + '<(DEPTH)/testing/gmock.gyp:gmock', ], 'conditions': [ ['OS=="android"', {