diff --git a/BUILD.gn b/BUILD.gn index d9c9d39686..8243281217 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -487,6 +487,7 @@ if (rtc_include_tests) { "rtc_base:sigslot_unittest", "rtc_base:weak_ptr_unittests", "rtc_base/experiments:experiments_unittests", + "test/scenario/network:network_emulation_unittests", ] if (rtc_enable_protobuf) { diff --git a/test/DEPS b/test/DEPS index f2bded5f3c..c7ae24f8ad 100644 --- a/test/DEPS +++ b/test/DEPS @@ -49,4 +49,9 @@ specific_include_rules = { "+pc", "+p2p", ], + ".*network_emulation_pc_unittest\.cc": [ + "+pc/peer_connection_wrapper.h", + "+pc/test/mock_peer_connection_observers.h", + "+p2p/client/basic_port_allocator.h", + ], } diff --git a/test/scenario/network/BUILD.gn b/test/scenario/network/BUILD.gn index 7df99ec9cf..c73a88967f 100644 --- a/test/scenario/network/BUILD.gn +++ b/test/scenario/network/BUILD.gn @@ -9,15 +9,91 @@ import("../../../webrtc.gni") rtc_source_set("emulated_network") { + testonly = true sources = [ + "fake_network_socket.cc", + "fake_network_socket.h", + "fake_network_socket_server.cc", + "fake_network_socket_server.h", "network_emulation.cc", "network_emulation.h", + "network_emulation_manager.cc", + "network_emulation_manager.h", ] deps = [ "../../../api:simulated_network_api", + "../../../api/units:data_rate", + "../../../api/units:data_size", + "../../../api/units:time_delta", "../../../api/units:timestamp", "../../../rtc_base:rtc_base", + "../../../rtc_base:rtc_task_queue_api", + "../../../rtc_base:safe_minmax", + "../../../rtc_base/task_utils:repeating_task", + "../../../rtc_base/third_party/sigslot:sigslot", + "../../../system_wrappers:system_wrappers", "//third_party/abseil-cpp/absl/memory:memory", "//third_party/abseil-cpp/absl/types:optional", ] } + +rtc_source_set("network_emulation_unittest") { + testonly = true + sources = [ + "network_emulation_unittest.cc", + ] + deps = [ + ":emulated_network", + "../../../api:simulated_network_api", + "../../../call:simulated_network", + "../../../rtc_base:logging", + "../../../rtc_base:rtc_event", + "../../../test:test_support", + "//third_party/abseil-cpp/absl/memory:memory", + ] +} + +rtc_source_set("network_emulation_pc_unittest") { + testonly = true + sources = [ + "network_emulation_pc_unittest.cc", + ] + deps = [ + ":emulated_network", + "../../../api:callfactory_api", + "../../../api:libjingle_peerconnection_api", + "../../../api:scoped_refptr", + "../../../api:simulated_network_api", + "../../../api/audio_codecs:builtin_audio_decoder_factory", + "../../../api/audio_codecs:builtin_audio_encoder_factory", + "../../../api/video_codecs:builtin_video_decoder_factory", + "../../../api/video_codecs:builtin_video_encoder_factory", + "../../../call:simulated_network", + "../../../logging:rtc_event_log_impl_base", + "../../../media:rtc_audio_video", + "../../../modules/audio_device:audio_device_impl", + "../../../p2p:rtc_p2p", + "../../../pc:pc_test_utils", + "../../../pc:peerconnection_wrapper", + "../../../rtc_base:gunit_helpers", + "../../../rtc_base:logging", + "../../../rtc_base:rtc_base", + "../../../rtc_base:rtc_base_tests_utils", + "../../../rtc_base:rtc_event", + "../../../test:test_support", + "//third_party/abseil-cpp/absl/memory:memory", + ] + + if (!build_with_chromium && is_clang) { + # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). + suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] + } +} + +rtc_source_set("network_emulation_unittests") { + testonly = true + deps = [ + ":network_emulation_pc_unittest", + ":network_emulation_unittest", + ] +} diff --git a/test/scenario/network/fake_network_socket.cc b/test/scenario/network/fake_network_socket.cc new file mode 100644 index 0000000000..5d4ad3f514 --- /dev/null +++ b/test/scenario/network/fake_network_socket.cc @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/scenario/network/fake_network_socket.h" + +#include +#include +#include +#include + +#include "rtc_base/logging.h" +#include "rtc_base/thread.h" + +namespace webrtc { +namespace test { +namespace { + +std::string ToString(const rtc::SocketAddress& addr) { + return addr.HostAsURIString() + ":" + std::to_string(addr.port()); +} + +} // namespace + +FakeNetworkSocket::FakeNetworkSocket(SocketManager* socket_manager) + : socket_manager_(socket_manager), + state_(CS_CLOSED), + error_(0), + pending_read_events_count_(0) {} +FakeNetworkSocket::~FakeNetworkSocket() { + Close(); + socket_manager_->Unregister(this); +} + +void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) { + { + rtc::CritScope crit(&lock_); + packet_queue_.push_back(std::move(packet)); + pending_read_events_count_++; + } + socket_manager_->WakeUp(); +} + +bool FakeNetworkSocket::ProcessIo() { + { + rtc::CritScope crit(&lock_); + if (pending_read_events_count_ == 0) { + return false; + } + pending_read_events_count_--; + RTC_DCHECK_GE(pending_read_events_count_, 0); + } + SignalReadEvent(this); + return true; +} + +rtc::SocketAddress FakeNetworkSocket::GetLocalAddress() const { + return local_addr_; +} + +rtc::SocketAddress FakeNetworkSocket::GetRemoteAddress() const { + return remote_addr_; +} + +int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) { + RTC_CHECK(local_addr_.IsNil()) + << "Socket already bound to address: " << ToString(local_addr_); + local_addr_ = addr; + endpoint_ = socket_manager_->GetEndpointNode(local_addr_.ipaddr()); + if (!endpoint_) { + local_addr_.Clear(); + RTC_LOG(INFO) << "No endpoint for address: " << ToString(addr); + error_ = EADDRNOTAVAIL; + return 2; + } + absl::optional port = + endpoint_->BindReceiver(local_addr_.port(), this); + if (!port) { + local_addr_.Clear(); + RTC_LOG(INFO) << "Cannot bind to in-use address: " << ToString(addr); + error_ = EADDRINUSE; + return 1; + } + local_addr_.SetPort(port.value()); + return 0; +} + +int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) { + RTC_CHECK(remote_addr_.IsNil()) + << "Socket already connected to address: " << ToString(remote_addr_); + RTC_CHECK(!local_addr_.IsNil()) + << "Socket have to be bind to some local address"; + remote_addr_ = addr; + state_ = CS_CONNECTED; + return 0; +} + +int FakeNetworkSocket::Send(const void* pv, size_t cb) { + RTC_CHECK(state_ == CS_CONNECTED) << "Socket cannot send: not connected"; + return SendTo(pv, cb, remote_addr_); +} + +int FakeNetworkSocket::SendTo(const void* pv, + size_t cb, + const rtc::SocketAddress& addr) { + RTC_CHECK(!local_addr_.IsNil()) + << "Socket have to be bind to some local address"; + rtc::CopyOnWriteBuffer packet(static_cast(pv), cb); + endpoint_->SendPacket(local_addr_, addr, packet); + return cb; +} + +int FakeNetworkSocket::Recv(void* pv, size_t cb, int64_t* timestamp) { + rtc::SocketAddress paddr; + return RecvFrom(pv, cb, &paddr, timestamp); +} + +// Reads 1 packet from internal queue. Reads up to |cb| bytes into |pv| +// and returns the length of received packet. +int FakeNetworkSocket::RecvFrom(void* pv, + size_t cb, + rtc::SocketAddress* paddr, + int64_t* timestamp) { + if (timestamp) { + *timestamp = -1; + } + absl::optional packetOpt = PopFrontPacket(); + + if (!packetOpt) { + error_ = EAGAIN; + return -1; + } + + EmulatedIpPacket packet = std::move(packetOpt.value()); + *paddr = packet.from; + size_t data_read = std::min(cb, packet.size()); + memcpy(pv, packet.cdata(), data_read); + *timestamp = packet.arrival_time.us(); + + // According to RECV(2) Linux Man page + // real socket will discard data, that won't fit into provided buffer, + // but we won't to skip such error, so we will assert here. + RTC_CHECK(data_read == packet.size()) + << "Too small buffer is provided for socket read. " + << "Received data size: " << packet.size() + << "; Provided buffer size: " << cb; + + // According to RECV(2) Linux Man page + // real socket will return message length, not data read. In our case it is + // actually the same value. + return static_cast(packet.size()); +} + +int FakeNetworkSocket::Listen(int backlog) { + RTC_CHECK(false) << "Listen() isn't valid for SOCK_DGRAM"; +} + +rtc::AsyncSocket* FakeNetworkSocket::Accept(rtc::SocketAddress* /*paddr*/) { + RTC_CHECK(false) << "Accept() isn't valid for SOCK_DGRAM"; +} + +int FakeNetworkSocket::Close() { + state_ = CS_CLOSED; + if (!local_addr_.IsNil()) { + endpoint_->UnbindReceiver(local_addr_.port()); + } + local_addr_.Clear(); + remote_addr_.Clear(); + return 0; +} + +int FakeNetworkSocket::GetError() const { + RTC_CHECK(error_ == 0); + return error_; +} + +void FakeNetworkSocket::SetError(int error) { + RTC_CHECK(error == 0); + error_ = error; +} + +rtc::AsyncSocket::ConnState FakeNetworkSocket::GetState() const { + return state_; +} + +int FakeNetworkSocket::GetOption(Option opt, int* value) { + auto it = options_map_.find(opt); + if (it == options_map_.end()) { + return -1; + } + *value = it->second; + return 0; +} + +int FakeNetworkSocket::SetOption(Option opt, int value) { + options_map_[opt] = value; + return 0; +} + +absl::optional FakeNetworkSocket::PopFrontPacket() { + rtc::CritScope crit(&lock_); + if (packet_queue_.empty()) { + return absl::nullopt; + } + + absl::optional packet = + absl::make_optional(std::move(packet_queue_.front())); + packet_queue_.pop_front(); + return packet; +} + +} // namespace test +} // namespace webrtc diff --git a/test/scenario/network/fake_network_socket.h b/test/scenario/network/fake_network_socket.h new file mode 100644 index 0000000000..fcd9d27d66 --- /dev/null +++ b/test/scenario/network/fake_network_socket.h @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_ +#define TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_ + +#include +#include +#include + +#include "rtc_base/async_socket.h" +#include "rtc_base/copy_on_write_buffer.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/socket_address.h" +#include "test/scenario/network/network_emulation.h" + +namespace webrtc { +namespace test { + +class SocketIoProcessor { + public: + virtual ~SocketIoProcessor() = default; + + // Process single IO operation. + virtual bool ProcessIo() = 0; +}; + +class SocketManager { + public: + virtual ~SocketManager() = default; + + virtual void WakeUp() = 0; + virtual void Unregister(SocketIoProcessor* io_processor) = 0; + // Provides endpoints by IP address. + virtual EndpointNode* GetEndpointNode(const rtc::IPAddress& ip) = 0; +}; + +// Represents a socket, which will operate with emulated network. +class FakeNetworkSocket : public rtc::AsyncSocket, + public EmulatedNetworkReceiverInterface, + public SocketIoProcessor { + public: + explicit FakeNetworkSocket(SocketManager* scoket_manager); + ~FakeNetworkSocket() override; + + // Will be invoked by EndpointNode to deliver packets into this socket. + void OnPacketReceived(EmulatedIpPacket packet) override; + // Will fire read event for incoming packets. + bool ProcessIo() override; + + // rtc::Socket methods: + rtc::SocketAddress GetLocalAddress() const override; + rtc::SocketAddress GetRemoteAddress() const override; + int Bind(const rtc::SocketAddress& addr) override; + int Connect(const rtc::SocketAddress& addr) override; + int Close() override; + int Send(const void* pv, size_t cb) override; + int SendTo(const void* pv, + size_t cb, + const rtc::SocketAddress& addr) override; + int Recv(void* pv, size_t cb, int64_t* timestamp) override; + int RecvFrom(void* pv, + size_t cb, + rtc::SocketAddress* paddr, + int64_t* timestamp) override; + int Listen(int backlog) override; + rtc::AsyncSocket* Accept(rtc::SocketAddress* paddr) override; + int GetError() const override; + void SetError(int error) override; + ConnState GetState() const override; + int GetOption(Option opt, int* value) override; + int SetOption(Option opt, int value) override; + + private: + absl::optional PopFrontPacket(); + + SocketManager* const socket_manager_; + EndpointNode* endpoint_; + + rtc::SocketAddress local_addr_; + rtc::SocketAddress remote_addr_; + ConnState state_; + int error_; + std::map options_map_; + + rtc::CriticalSection lock_; + // Count of packets in the queue for which we didn't fire read event. + // |pending_read_events_count_| can be different from |packet_queue_.size()| + // because read events will be fired by one thread and packets in the queue + // can be processed by another thread. + int pending_read_events_count_; + std::deque packet_queue_ RTC_GUARDED_BY(lock_); +}; + +} // namespace test +} // namespace webrtc + +#endif // TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_ diff --git a/test/scenario/network/fake_network_socket_server.cc b/test/scenario/network/fake_network_socket_server.cc new file mode 100644 index 0000000000..b7d1fc4984 --- /dev/null +++ b/test/scenario/network/fake_network_socket_server.cc @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/scenario/network/fake_network_socket_server.h" + +#include + +namespace webrtc { +namespace test { + +FakeNetworkSocketServer::FakeNetworkSocketServer( + Clock* clock, + std::vector endpoints) + : clock_(clock), + endpoints_(std::move(endpoints)), + wakeup_(/*manual_reset=*/false, /*initially_signaled=*/false) {} +FakeNetworkSocketServer::~FakeNetworkSocketServer() = default; + +void FakeNetworkSocketServer::OnMessageQueueDestroyed() { + msg_queue_ = nullptr; +} + +EndpointNode* FakeNetworkSocketServer::GetEndpointNode( + const rtc::IPAddress& ip) { + for (auto* endpoint : endpoints_) { + rtc::IPAddress peerLocalAddress = endpoint->GetPeerLocalAddress(); + if (peerLocalAddress == ip) { + return endpoint; + } + } + RTC_CHECK(false) << "No network found for address" << ip.ToString(); +} + +void FakeNetworkSocketServer::Unregister(SocketIoProcessor* io_processor) { + rtc::CritScope crit(&lock_); + io_processors_.erase(io_processor); +} + +rtc::Socket* FakeNetworkSocketServer::CreateSocket(int /*family*/, + int /*type*/) { + RTC_CHECK(false) << "Only async sockets are supported"; +} + +rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family, + int type) { + RTC_DCHECK(family == AF_INET || family == AF_INET6); + // We support only UDP sockets for now. + RTC_DCHECK(type == SOCK_DGRAM) << "Only UDP sockets are supported"; + FakeNetworkSocket* out = new FakeNetworkSocket(this); + { + rtc::CritScope crit(&lock_); + io_processors_.insert(out); + } + return out; +} + +void FakeNetworkSocketServer::SetMessageQueue(rtc::MessageQueue* msg_queue) { + msg_queue_ = msg_queue; + if (msg_queue_) { + msg_queue_->SignalQueueDestroyed.connect( + this, &FakeNetworkSocketServer::OnMessageQueueDestroyed); + } +} + +// Always returns true (if return false, it won't be invoked again...) +bool FakeNetworkSocketServer::Wait(int cms, bool process_io) { + RTC_DCHECK(msg_queue_ == rtc::Thread::Current()); + if (!process_io) { + wakeup_.Wait(cms); + return true; + } + wakeup_.Wait(cms); + + rtc::CritScope crit(&lock_); + for (auto* io_processor : io_processors_) { + while (io_processor->ProcessIo()) { + } + } + return true; +} + +void FakeNetworkSocketServer::WakeUp() { + wakeup_.Set(); +} + +Timestamp FakeNetworkSocketServer::Now() const { + return Timestamp::us(clock_->TimeInMicroseconds()); +} + +} // namespace test +} // namespace webrtc diff --git a/test/scenario/network/fake_network_socket_server.h b/test/scenario/network/fake_network_socket_server.h new file mode 100644 index 0000000000..f0f94969c8 --- /dev/null +++ b/test/scenario/network/fake_network_socket_server.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_ +#define TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_ + +#include +#include + +#include "api/units/timestamp.h" +#include "rtc_base/async_socket.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/event.h" +#include "rtc_base/message_queue.h" +#include "rtc_base/socket.h" +#include "rtc_base/socket_address.h" +#include "rtc_base/socket_server.h" +#include "rtc_base/third_party/sigslot/sigslot.h" +#include "system_wrappers/include/clock.h" +#include "test/scenario/network/fake_network_socket.h" + +namespace webrtc { +namespace test { + +// FakeNetworkSocketServer must outlive any sockets it creates. +class FakeNetworkSocketServer : public rtc::SocketServer, + public sigslot::has_slots<>, + public SocketManager { + public: + FakeNetworkSocketServer(Clock* clock, std::vector endpoints); + ~FakeNetworkSocketServer() override; + + EndpointNode* GetEndpointNode(const rtc::IPAddress& ip) override; + void Unregister(SocketIoProcessor* io_processor) override; + void OnMessageQueueDestroyed(); + + // rtc::SocketFactory methods: + rtc::Socket* CreateSocket(int family, int type) override; + rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override; + + // rtc::SocketServer methods: + // Called by the network thread when this server is installed, kicking off the + // message handler loop. + void SetMessageQueue(rtc::MessageQueue* msg_queue) override; + bool Wait(int cms, bool process_io) override; + void WakeUp() override; + + private: + Timestamp Now() const; + + Clock* const clock_; + const std::vector endpoints_; + rtc::Event wakeup_; + rtc::MessageQueue* msg_queue_; + + rtc::CriticalSection lock_; + std::set io_processors_ RTC_GUARDED_BY(lock_); +}; + +} // namespace test +} // namespace webrtc + +#endif // TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_ diff --git a/test/scenario/network/network_emulation.cc b/test/scenario/network/network_emulation.cc index 9ff24e76ea..9fd49254f8 100644 --- a/test/scenario/network/network_emulation.cc +++ b/test/scenario/network/network_emulation.cc @@ -10,9 +10,11 @@ #include "test/scenario/network/network_emulation.h" +#include #include #include "absl/memory/memory.h" +#include "rtc_base/bind.h" #include "rtc_base/logging.h" namespace webrtc { @@ -28,10 +30,9 @@ EmulatedIpPacket::EmulatedIpPacket(const rtc::SocketAddress& from, dest_endpoint_id(dest_endpoint_id), data(data), arrival_time(arrival_time) {} - EmulatedIpPacket::~EmulatedIpPacket() = default; - EmulatedIpPacket::EmulatedIpPacket(EmulatedIpPacket&&) = default; +EmulatedIpPacket& EmulatedIpPacket::operator=(EmulatedIpPacket&&) = default; void EmulatedNetworkNode::CreateRoute( uint64_t receiver_id, @@ -57,8 +58,9 @@ EmulatedNetworkNode::~EmulatedNetworkNode() = default; void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) { rtc::CritScope crit(&lock_); - if (routing_.find(packet.dest_endpoint_id) == routing_.end()) + if (routing_.find(packet.dest_endpoint_id) == routing_.end()) { return; + } uint64_t packet_id = next_packet_id_++; bool sent = network_behavior_->EnqueuePacket( PacketInFlightInfo(packet.size(), packet.arrival_time.us(), packet_id)); @@ -119,7 +121,7 @@ void EmulatedNetworkNode::SetReceiver( .insert(std::pair( dest_endpoint_id, receiver)) .second) - << "Such routing already exists"; + << "Routing for endpoint " << dest_endpoint_id << " already exists"; } void EmulatedNetworkNode::RemoveReceiver(uint64_t dest_endpoint_id) { @@ -127,5 +129,111 @@ void EmulatedNetworkNode::RemoveReceiver(uint64_t dest_endpoint_id) { routing_.erase(dest_endpoint_id); } +EndpointNode::EndpointNode(uint64_t id, rtc::IPAddress ip, Clock* clock) + : id_(id), + peer_local_addr_(ip), + send_node_(nullptr), + clock_(clock), + next_port_(kFirstEphemeralPort), + connected_endpoint_id_(absl::nullopt) {} +EndpointNode::~EndpointNode() = default; + +uint64_t EndpointNode::GetId() const { + return id_; +} + +void EndpointNode::SetSendNode(EmulatedNetworkNode* send_node) { + send_node_ = send_node; +} + +void EndpointNode::SendPacket(const rtc::SocketAddress& from, + const rtc::SocketAddress& to, + rtc::CopyOnWriteBuffer packet) { + RTC_CHECK(from.ipaddr() == peer_local_addr_); + RTC_CHECK(connected_endpoint_id_); + RTC_CHECK(send_node_); + send_node_->OnPacketReceived(EmulatedIpPacket( + from, to, connected_endpoint_id_.value(), std::move(packet), + Timestamp::us(clock_->TimeInMicroseconds()))); +} + +absl::optional EndpointNode::BindReceiver( + uint16_t desired_port, + EmulatedNetworkReceiverInterface* receiver) { + rtc::CritScope crit(&receiver_lock_); + uint16_t port = desired_port; + if (port == 0) { + // Because client can specify its own port, next_port_ can be already in + // use, so we need to find next available port. + int ports_pool_size = + std::numeric_limits::max() - kFirstEphemeralPort + 1; + for (int i = 0; i < ports_pool_size; ++i) { + uint16_t next_port = NextPort(); + if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) { + port = next_port; + break; + } + } + } + RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint " + << id_; + bool result = port_to_receiver_.insert({port, receiver}).second; + if (!result) { + RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port + << " in endpoint " << id_; + return absl::nullopt; + } + RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port " + << port; + return port; +} + +uint16_t EndpointNode::NextPort() { + uint16_t out = next_port_; + if (next_port_ == std::numeric_limits::max()) { + next_port_ = kFirstEphemeralPort; + } else { + next_port_++; + } + return out; +} + +void EndpointNode::UnbindReceiver(uint16_t port) { + rtc::CritScope crit(&receiver_lock_); + port_to_receiver_.erase(port); +} + +rtc::IPAddress EndpointNode::GetPeerLocalAddress() const { + return peer_local_addr_; +} + +void EndpointNode::OnPacketReceived(EmulatedIpPacket packet) { + RTC_CHECK(packet.dest_endpoint_id == id_) + << "Routing error: wrong destination endpoint. Destination id: " + << packet.dest_endpoint_id << "; Receiver id: " << id_; + rtc::CritScope crit(&receiver_lock_); + auto it = port_to_receiver_.find(packet.to.port()); + if (it == port_to_receiver_.end()) { + // It can happen, that remote peer closed connection, but there still some + // packets, that are going to it. It can happen during peer connection close + // process: one peer closed connection, second still sending data. + RTC_LOG(INFO) << "No receiver registered in " << id_ << " on port " + << packet.to.port(); + return; + } + // Endpoint assumes frequent calls to bind and unbind methods, so it holds + // lock during packet processing to ensure that receiver won't be deleted + // before call to OnPacketReceived. + it->second->OnPacketReceived(std::move(packet)); +} + +EmulatedNetworkNode* EndpointNode::GetSendNode() const { + return send_node_; +} + +void EndpointNode::SetConnectedEndpointId(uint64_t endpoint_id) { + connected_endpoint_id_ = endpoint_id; +} + } // namespace test } // namespace webrtc diff --git a/test/scenario/network/network_emulation.h b/test/scenario/network/network_emulation.h index ba8798484e..d133337f0f 100644 --- a/test/scenario/network/network_emulation.h +++ b/test/scenario/network/network_emulation.h @@ -23,8 +23,10 @@ #include "api/units/timestamp.h" #include "rtc_base/async_socket.h" #include "rtc_base/copy_on_write_buffer.h" +#include "rtc_base/critical_section.h" #include "rtc_base/socket_address.h" #include "rtc_base/thread.h" +#include "system_wrappers/include/clock.h" namespace webrtc { namespace test { @@ -36,7 +38,6 @@ struct EmulatedIpPacket { uint64_t dest_endpoint_id, rtc::CopyOnWriteBuffer data, Timestamp arrival_time); - ~EmulatedIpPacket(); // This object is not copyable or assignable. EmulatedIpPacket(const EmulatedIpPacket&) = delete; @@ -107,6 +108,72 @@ class EmulatedNetworkNode : public EmulatedNetworkReceiverInterface { uint64_t next_packet_id_ RTC_GUARDED_BY(lock_) = 1; }; +// Represents single network interface on the device. +// It will be used as sender from socket side to send data to the network and +// will act as packet receiver from emulated network side to receive packets +// from other EmulatedNetworkNodes. +class EndpointNode : public EmulatedNetworkReceiverInterface { + public: + EndpointNode(uint64_t id, rtc::IPAddress, Clock* clock); + ~EndpointNode() override; + + uint64_t GetId() const; + + // Set network node, that will be used to send packets to the network. + void SetSendNode(EmulatedNetworkNode* send_node); + // Send packet into network. + // |from| will be used to set source address for the packet in destination + // socket. + // |to| will be used for routing verification and picking right socket by port + // on destination endpoint. + void SendPacket(const rtc::SocketAddress& from, + const rtc::SocketAddress& to, + rtc::CopyOnWriteBuffer packet); + + // Binds receiver to this endpoint to send and receive data. + // |desired_port| is a port that should be used. If it is equal to 0, + // endpoint will pick the first available port starting from + // |kFirstEphemeralPort|. + // + // Returns the port, that should be used (it will be equals to desired, if + // |desired_port| != 0 and is free or will be the one, selected by endpoint) + // or absl::nullopt if desired_port in used. Also fails if there are no more + // free ports to bind to. + absl::optional BindReceiver( + uint16_t desired_port, + EmulatedNetworkReceiverInterface* receiver); + void UnbindReceiver(uint16_t port); + + rtc::IPAddress GetPeerLocalAddress() const; + + // Will be called to deliver packet into endpoint from network node. + void OnPacketReceived(EmulatedIpPacket packet) override; + + protected: + friend class NetworkEmulationManager; + + EmulatedNetworkNode* GetSendNode() const; + void SetConnectedEndpointId(uint64_t endpoint_id); + + private: + static constexpr uint16_t kFirstEphemeralPort = 49152; + uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_); + + rtc::CriticalSection receiver_lock_; + + uint64_t id_; + // Peer's local IP address for this endpoint network interface. + const rtc::IPAddress peer_local_addr_; + EmulatedNetworkNode* send_node_; + Clock* const clock_; + + uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_); + std::map port_to_receiver_ + RTC_GUARDED_BY(receiver_lock_); + + absl::optional connected_endpoint_id_; +}; + } // namespace test } // namespace webrtc diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc new file mode 100644 index 0000000000..629f448783 --- /dev/null +++ b/test/scenario/network/network_emulation_manager.cc @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/scenario/network/network_emulation_manager.h" + +#include +#include + +#include "absl/memory/memory.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" + +namespace webrtc { +namespace test { +namespace { + +constexpr int64_t kPacketProcessingIntervalMs = 1; + +} // namespace + +NetworkEmulationManager::NetworkEmulationManager(webrtc::Clock* clock) + : clock_(clock), + next_node_id_(1), + task_queue_("network_emulation_manager") {} +NetworkEmulationManager::~NetworkEmulationManager() { + Stop(); +} + +EmulatedNetworkNode* NetworkEmulationManager::CreateEmulatedNode( + std::unique_ptr network_behavior) { + auto node = + absl::make_unique(std::move(network_behavior)); + EmulatedNetworkNode* out = node.get(); + + struct Closure { + void operator()() { manager->network_nodes_.push_back(std::move(node)); } + NetworkEmulationManager* manager; + std::unique_ptr node; + }; + task_queue_.PostTask(Closure{this, std::move(node)}); + return out; +} + +EndpointNode* NetworkEmulationManager::CreateEndpoint(rtc::IPAddress ip) { + auto node = absl::make_unique(next_node_id_++, ip, clock_); + EndpointNode* out = node.get(); + endpoints_.push_back(std::move(node)); + return out; +} + +void NetworkEmulationManager::CreateRoute( + EndpointNode* from, + std::vector via_nodes, + EndpointNode* to) { + // Because endpoint has no send node by default at least one should be + // provided here. + RTC_CHECK(!via_nodes.empty()); + + from->SetSendNode(via_nodes[0]); + EmulatedNetworkNode* cur_node = via_nodes[0]; + for (size_t i = 1; i < via_nodes.size(); ++i) { + cur_node->SetReceiver(to->GetId(), via_nodes[i]); + cur_node = via_nodes[i]; + } + cur_node->SetReceiver(to->GetId(), to); + from->SetConnectedEndpointId(to->GetId()); +} + +void NetworkEmulationManager::ClearRoute( + EndpointNode* from, + std::vector via_nodes, + EndpointNode* to) { + // Remove receiver from intermediate nodes. + for (auto* node : via_nodes) { + node->RemoveReceiver(to->GetId()); + } + // Detach endpoint from current send node. + if (from->GetSendNode()) { + from->GetSendNode()->RemoveReceiver(to->GetId()); + from->SetSendNode(nullptr); + } +} + +rtc::Thread* NetworkEmulationManager::CreateNetworkThread( + std::vector endpoints) { + FakeNetworkSocketServer* socket_server = CreateSocketServer(endpoints); + std::unique_ptr network_thread = + absl::make_unique(socket_server); + network_thread->SetName("network_thread" + std::to_string(threads_.size()), + nullptr); + network_thread->Start(); + rtc::Thread* out = network_thread.get(); + threads_.push_back(std::move(network_thread)); + return out; +} + +void NetworkEmulationManager::Start() { + process_task_handle_ = RepeatingTaskHandle::Start(&task_queue_, [this] { + ProcessNetworkPackets(); + return TimeDelta::ms(kPacketProcessingIntervalMs); + }); +} + +void NetworkEmulationManager::Stop() { + process_task_handle_.PostStop(); +} + +FakeNetworkSocketServer* NetworkEmulationManager::CreateSocketServer( + std::vector endpoints) { + auto socket_server = + absl::make_unique(clock_, endpoints); + FakeNetworkSocketServer* out = socket_server.get(); + socket_servers_.push_back(std::move(socket_server)); + return out; +} + +void NetworkEmulationManager::ProcessNetworkPackets() { + Timestamp current_time = Now(); + for (auto& node : network_nodes_) { + node->Process(current_time); + } +} + +Timestamp NetworkEmulationManager::Now() const { + return Timestamp::us(clock_->TimeInMicroseconds()); +} + +} // namespace test +} // namespace webrtc diff --git a/test/scenario/network/network_emulation_manager.h b/test/scenario/network/network_emulation_manager.h new file mode 100644 index 0000000000..f06fb75a24 --- /dev/null +++ b/test/scenario/network/network_emulation_manager.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_ +#define TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_ + +#include +#include +#include + +#include "api/test/simulated_network.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/repeating_task.h" +#include "rtc_base/thread.h" +#include "test/scenario/network/fake_network_socket_server.h" +#include "test/scenario/network/network_emulation.h" + +namespace webrtc { +namespace test { + +class NetworkEmulationManager { + public: + explicit NetworkEmulationManager(Clock* clock); + ~NetworkEmulationManager(); + + EmulatedNetworkNode* CreateEmulatedNode( + std::unique_ptr network_behavior); + + // TODO(titovartem) add method without IP address, where manager + // will provided some unique generated address. + EndpointNode* CreateEndpoint(rtc::IPAddress ip); + + void CreateRoute(EndpointNode* from, + std::vector via_nodes, + EndpointNode* to); + void ClearRoute(EndpointNode* from, + std::vector via_nodes, + EndpointNode* to); + + rtc::Thread* CreateNetworkThread(std::vector endpoints); + + void Start(); + void Stop(); + + private: + FakeNetworkSocketServer* CreateSocketServer( + std::vector endpoints); + void ProcessNetworkPackets(); + Timestamp Now() const; + + Clock* const clock_; + int next_node_id_; + + RepeatingTaskHandle process_task_handle_; + + // All objects can be added to the manager only when it is idle. + std::vector> endpoints_; + std::vector> network_nodes_; + std::vector> socket_servers_; + std::vector> threads_; + + // Must be the last field, so it will be deleted first, because tasks + // in the TaskQueue can access other fields of the instance of this class. + rtc::TaskQueue task_queue_; +}; + +} // namespace test +} // namespace webrtc + +#endif // TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_ diff --git a/test/scenario/network/network_emulation_pc_unittest.cc b/test/scenario/network/network_emulation_pc_unittest.cc new file mode 100644 index 0000000000..105cb56bbd --- /dev/null +++ b/test/scenario/network/network_emulation_pc_unittest.cc @@ -0,0 +1,203 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include + +#include "absl/memory/memory.h" +#include "api/audio_codecs/builtin_audio_decoder_factory.h" +#include "api/audio_codecs/builtin_audio_encoder_factory.h" +#include "api/call/call_factory_interface.h" +#include "api/peer_connection_interface.h" +#include "api/scoped_refptr.h" +#include "api/video_codecs/builtin_video_decoder_factory.h" +#include "api/video_codecs/builtin_video_encoder_factory.h" +#include "call/simulated_network.h" +#include "logging/rtc_event_log/rtc_event_log_factory.h" +#include "media/engine/webrtc_media_engine.h" +#include "modules/audio_device/include/test_audio_device.h" +#include "p2p/client/basic_port_allocator.h" +#include "pc/peer_connection_wrapper.h" +#include "pc/test/mock_peer_connection_observers.h" +#include "rtc_base/async_invoker.h" +#include "rtc_base/fake_network.h" +#include "rtc_base/gunit.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/scenario/network/network_emulation.h" +#include "test/scenario/network/network_emulation_manager.h" + +namespace webrtc { +namespace test { +namespace { + +constexpr int kDefaultTimeoutMs = 1000; +constexpr int kMaxAptitude = 32000; +constexpr int kSamplingFrequency = 48000; +constexpr char kSignalThreadName[] = "signaling_thread"; + +bool AddIceCandidates(PeerConnectionWrapper* peer, + std::vector candidates) { + bool success = true; + for (const auto candidate : candidates) { + if (!peer->pc()->AddIceCandidate(candidate)) { + success = false; + } + } + return success; +} + +rtc::scoped_refptr CreatePeerConnectionFactory( + rtc::Thread* signaling_thread, + rtc::Thread* network_thread) { + PeerConnectionFactoryDependencies pcf_deps; + pcf_deps.call_factory = webrtc::CreateCallFactory(); + pcf_deps.event_log_factory = webrtc::CreateRtcEventLogFactory(); + pcf_deps.network_thread = network_thread; + pcf_deps.signaling_thread = signaling_thread; + pcf_deps.media_engine = cricket::WebRtcMediaEngineFactory::Create( + TestAudioDeviceModule::CreateTestAudioDeviceModule( + TestAudioDeviceModule::CreatePulsedNoiseCapturer(kMaxAptitude, + kSamplingFrequency), + TestAudioDeviceModule::CreateDiscardRenderer(kSamplingFrequency)), + webrtc::CreateBuiltinAudioEncoderFactory(), + webrtc::CreateBuiltinAudioDecoderFactory(), + webrtc::CreateBuiltinVideoEncoderFactory(), + webrtc::CreateBuiltinVideoDecoderFactory(), /*audio_mixer=*/nullptr, + webrtc::AudioProcessingBuilder().Create()); + return CreateModularPeerConnectionFactory(std::move(pcf_deps)); +} + +rtc::scoped_refptr CreatePeerConnection( + const rtc::scoped_refptr& pcf, + PeerConnectionObserver* observer, + rtc::NetworkManager* network_manager) { + PeerConnectionDependencies pc_deps(observer); + auto port_allocator = + absl::make_unique(network_manager); + + // This test does not support TCP + int flags = cricket::PORTALLOCATOR_DISABLE_TCP; + port_allocator->set_flags(port_allocator->flags() | flags); + + pc_deps.allocator = std::move(port_allocator); + PeerConnectionInterface::RTCConfiguration rtc_configuration; + rtc_configuration.sdp_semantics = SdpSemantics::kUnifiedPlan; + + return pcf->CreatePeerConnection(rtc_configuration, std::move(pc_deps)); +} + +} // namespace + +TEST(NetworkEmulationManagerPCTest, Run) { + std::unique_ptr signaling_thread = rtc::Thread::Create(); + signaling_thread->SetName(kSignalThreadName, nullptr); + signaling_thread->Start(); + + // Setup emulated network + NetworkEmulationManager network_manager(Clock::GetRealTimeClock()); + + EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + rtc::IPAddress alice_ip(1); + EndpointNode* alice_endpoint = network_manager.CreateEndpoint(alice_ip); + rtc::IPAddress bob_ip(2); + EndpointNode* bob_endpoint = network_manager.CreateEndpoint(bob_ip); + network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint); + network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint); + + rtc::Thread* alice_network_thread = + network_manager.CreateNetworkThread({alice_endpoint}); + rtc::Thread* bob_network_thread = + network_manager.CreateNetworkThread({bob_endpoint}); + + // Setup peer connections. + rtc::scoped_refptr alice_pcf; + rtc::scoped_refptr alice_pc; + std::unique_ptr alice_observer = + absl::make_unique(); + std::unique_ptr alice_network_manager = + absl::make_unique(); + alice_network_manager->AddInterface(rtc::SocketAddress(alice_ip, 0)); + + rtc::scoped_refptr bob_pcf; + rtc::scoped_refptr bob_pc; + std::unique_ptr bob_observer = + absl::make_unique(); + std::unique_ptr bob_network_manager = + absl::make_unique(); + bob_network_manager->AddInterface(rtc::SocketAddress(bob_ip, 0)); + + signaling_thread->Invoke(RTC_FROM_HERE, [&]() { + alice_pcf = CreatePeerConnectionFactory(signaling_thread.get(), + alice_network_thread); + alice_pc = CreatePeerConnection(alice_pcf, alice_observer.get(), + alice_network_manager.get()); + + bob_pcf = + CreatePeerConnectionFactory(signaling_thread.get(), bob_network_thread); + bob_pc = CreatePeerConnection(bob_pcf, bob_observer.get(), + bob_network_manager.get()); + }); + + std::unique_ptr alice = + absl::make_unique(alice_pcf, alice_pc, + std::move(alice_observer)); + std::unique_ptr bob = + absl::make_unique(bob_pcf, bob_pc, + std::move(bob_observer)); + + network_manager.Start(); + + signaling_thread->Invoke(RTC_FROM_HERE, [&]() { + rtc::scoped_refptr channel = + alice->CreateDataChannel("data"); + + // Connect peers. + ASSERT_TRUE(alice->ExchangeOfferAnswerWith(bob.get())); + // Do the SDP negotiation, and also exchange ice candidates. + ASSERT_TRUE_WAIT( + alice->signaling_state() == PeerConnectionInterface::kStable, + kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(alice->IsIceGatheringDone(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(bob->IsIceGatheringDone(), kDefaultTimeoutMs); + + // Connect an ICE candidate pairs. + ASSERT_TRUE( + AddIceCandidates(bob.get(), alice->observer()->GetAllCandidates())); + ASSERT_TRUE( + AddIceCandidates(alice.get(), bob->observer()->GetAllCandidates())); + // This means that ICE and DTLS are connected. + ASSERT_TRUE_WAIT(bob->IsIceConnected(), kDefaultTimeoutMs); + ASSERT_TRUE_WAIT(alice->IsIceConnected(), kDefaultTimeoutMs); + + ASSERT_TRUE_WAIT(bob->observer()->last_datachannel_ != nullptr, + kDefaultTimeoutMs); + MockDataChannelObserver observer(bob->observer()->last_datachannel_); + channel->Send(DataBuffer("Test data")); + ASSERT_TRUE_WAIT(observer.received_message_count() == 1, kDefaultTimeoutMs); + ASSERT_EQ("Test data", observer.last_message()); + + // Close peer connections + alice->pc()->Close(); + bob->pc()->Close(); + + // Delete peers. + alice.reset(); + bob.reset(); + }); + + network_manager.Stop(); +} + +} // namespace test +} // namespace webrtc diff --git a/test/scenario/network/network_emulation_unittest.cc b/test/scenario/network/network_emulation_unittest.cc new file mode 100644 index 0000000000..d04a99f99e --- /dev/null +++ b/test/scenario/network/network_emulation_unittest.cc @@ -0,0 +1,114 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include + +#include "absl/memory/memory.h" +#include "api/test/simulated_network.h" +#include "call/simulated_network.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/scenario/network/network_emulation.h" +#include "test/scenario/network/network_emulation_manager.h" + +namespace webrtc { +namespace test { + +class SocketReader : public sigslot::has_slots<> { + public: + explicit SocketReader(rtc::AsyncSocket* socket) : socket_(socket) { + socket_->SignalReadEvent.connect(this, &SocketReader::OnReadEvent); + size_ = 128 * 1024; + buf_ = new char[size_]; + } + ~SocketReader() override { delete[] buf_; } + + void OnReadEvent(rtc::AsyncSocket* socket) { + RTC_DCHECK(socket_ == socket); + int64_t timestamp; + len_ = socket_->Recv(buf_, size_, ×tamp); + { + rtc::CritScope crit(&lock_); + received_count_++; + } + } + + int ReceivedCount() { + rtc::CritScope crit(&lock_); + return received_count_; + } + + private: + rtc::AsyncSocket* socket_; + char* buf_; + size_t size_; + int len_; + + rtc::CriticalSection lock_; + int received_count_ RTC_GUARDED_BY(lock_) = 0; +}; + +TEST(NetworkEmulationManagerTest, Run) { + NetworkEmulationManager network_manager(Clock::GetRealTimeClock()); + + EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + EndpointNode* alice_endpoint = + network_manager.CreateEndpoint(rtc::IPAddress(1)); + EndpointNode* bob_endpoint = + network_manager.CreateEndpoint(rtc::IPAddress(2)); + network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint); + network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint); + + auto* nt1 = network_manager.CreateNetworkThread({alice_endpoint}); + auto* nt2 = network_manager.CreateNetworkThread({bob_endpoint}); + + network_manager.Start(); + + for (uint64_t j = 0; j < 2; j++) { + auto* s1 = nt1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); + auto* s2 = nt2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); + + SocketReader r1(s1); + SocketReader r2(s2); + + rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0); + rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0); + + s1->Bind(a1); + s2->Bind(a2); + + s1->Connect(s1->GetLocalAddress()); + s2->Connect(s2->GetLocalAddress()); + + rtc::CopyOnWriteBuffer data("Hello"); + for (uint64_t i = 0; i < 1000; i++) { + s1->Send(data.data(), data.size()); + s2->Send(data.data(), data.size()); + } + + rtc::Event wait; + wait.Wait(1000); + ASSERT_EQ(r1.ReceivedCount(), 1000); + ASSERT_EQ(r2.ReceivedCount(), 1000); + + delete s1; + delete s2; + } + + network_manager.Stop(); +} + +} // namespace test +} // namespace webrtc