Using tasks to process packets in FakeNetworkSocketServer.
This way we can rely on existing task scheduling and execution functionality, reducing the required functionality to support the fake socket server. This prepares for support simulated time execution of peer connection level tests. Bug: webrtc:11255 Change-Id: I7de64a099c2e355c70929ecff79b8ea3b98b70b8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165398 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30221}
This commit is contained in:
parent
10d8758251
commit
9d4bbc216b
@ -24,7 +24,7 @@ EmulatedNetworkManager::EmulatedNetworkManager(
|
||||
EndpointsContainer* endpoints_container)
|
||||
: task_queue_(task_queue),
|
||||
endpoints_container_(endpoints_container),
|
||||
socket_server_(clock, endpoints_container),
|
||||
socket_server_(endpoints_container),
|
||||
network_thread_(&socket_server_),
|
||||
sent_first_update_(false),
|
||||
start_count_(0) {
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "absl/algorithm/container.h"
|
||||
#include "rtc_base/async_invoker.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/thread.h"
|
||||
|
||||
@ -32,13 +33,12 @@ std::string ToString(const rtc::SocketAddress& addr) {
|
||||
class FakeNetworkSocket : public rtc::AsyncSocket,
|
||||
public EmulatedNetworkReceiverInterface {
|
||||
public:
|
||||
explicit FakeNetworkSocket(FakeNetworkSocketServer* scoket_manager);
|
||||
explicit FakeNetworkSocket(FakeNetworkSocketServer* scoket_manager,
|
||||
rtc::Thread* thread);
|
||||
~FakeNetworkSocket() override;
|
||||
|
||||
// Will be invoked by EmulatedEndpoint to deliver packets into this socket.
|
||||
void OnPacketReceived(EmulatedIpPacket packet) override;
|
||||
// Will fire read event for incoming packets.
|
||||
bool ProcessIo();
|
||||
|
||||
// rtc::Socket methods:
|
||||
rtc::SocketAddress GetLocalAddress() const override;
|
||||
@ -64,72 +64,62 @@ class FakeNetworkSocket : public rtc::AsyncSocket,
|
||||
int SetOption(Option opt, int value) override;
|
||||
|
||||
private:
|
||||
absl::optional<EmulatedIpPacket> PopFrontPacket();
|
||||
|
||||
FakeNetworkSocketServer* const socket_server_;
|
||||
EmulatedEndpointImpl* endpoint_;
|
||||
rtc::Thread* const thread_;
|
||||
EmulatedEndpointImpl* endpoint_ RTC_GUARDED_BY(&thread_);
|
||||
rtc::SocketAddress local_addr_ RTC_GUARDED_BY(&thread_);
|
||||
rtc::SocketAddress remote_addr_ RTC_GUARDED_BY(&thread_);
|
||||
ConnState state_ RTC_GUARDED_BY(&thread_);
|
||||
int error_ RTC_GUARDED_BY(&thread_);
|
||||
std::map<Option, int> options_map_ RTC_GUARDED_BY(&thread_);
|
||||
|
||||
rtc::SocketAddress local_addr_;
|
||||
rtc::SocketAddress remote_addr_;
|
||||
ConnState state_;
|
||||
int error_;
|
||||
std::map<Option, int> options_map_;
|
||||
|
||||
rtc::CriticalSection lock_;
|
||||
// Count of packets in the queue for which we didn't fire read event.
|
||||
// |pending_read_events_count_| can be different from |packet_queue_.size()|
|
||||
// because read events will be fired by one thread and packets in the queue
|
||||
// can be processed by another thread.
|
||||
int pending_read_events_count_;
|
||||
std::deque<EmulatedIpPacket> packet_queue_ RTC_GUARDED_BY(lock_);
|
||||
absl::optional<EmulatedIpPacket> pending_ RTC_GUARDED_BY(thread_);
|
||||
rtc::AsyncInvoker invoker_;
|
||||
};
|
||||
|
||||
FakeNetworkSocket::FakeNetworkSocket(FakeNetworkSocketServer* socket_server)
|
||||
FakeNetworkSocket::FakeNetworkSocket(FakeNetworkSocketServer* socket_server,
|
||||
rtc::Thread* thread)
|
||||
: socket_server_(socket_server),
|
||||
thread_(thread),
|
||||
state_(CS_CLOSED),
|
||||
error_(0),
|
||||
pending_read_events_count_(0) {}
|
||||
error_(0) {}
|
||||
|
||||
FakeNetworkSocket::~FakeNetworkSocket() {
|
||||
Close();
|
||||
socket_server_->Unregister(this);
|
||||
}
|
||||
|
||||
void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
packet_queue_.push_back(std::move(packet));
|
||||
pending_read_events_count_++;
|
||||
}
|
||||
auto task = [this, packet = std::move(packet)]() mutable {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
if (!endpoint_->Enabled())
|
||||
return;
|
||||
RTC_DCHECK(!pending_);
|
||||
pending_ = std::move(packet);
|
||||
// Note that we expect that this will trigger exactly one call to RecvFrom()
|
||||
// where pending_packet will be read and reset. This call is done without
|
||||
// any thread switch (see AsyncUDPSocket::OnReadEvent) so it's safe to
|
||||
// assume that SignalReadEvent() will block until the packet has been read.
|
||||
SignalReadEvent(this);
|
||||
RTC_DCHECK(!pending_);
|
||||
};
|
||||
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, std::move(task));
|
||||
socket_server_->WakeUp();
|
||||
}
|
||||
|
||||
bool FakeNetworkSocket::ProcessIo() {
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
if (pending_read_events_count_ == 0) {
|
||||
return false;
|
||||
}
|
||||
pending_read_events_count_--;
|
||||
RTC_DCHECK_GE(pending_read_events_count_, 0);
|
||||
}
|
||||
if (!endpoint_->Enabled()) {
|
||||
// If endpoint disabled then just pop and discard packet.
|
||||
PopFrontPacket();
|
||||
return true;
|
||||
}
|
||||
SignalReadEvent(this);
|
||||
return true;
|
||||
}
|
||||
|
||||
rtc::SocketAddress FakeNetworkSocket::GetLocalAddress() const {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
return local_addr_;
|
||||
}
|
||||
|
||||
rtc::SocketAddress FakeNetworkSocket::GetRemoteAddress() const {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
return remote_addr_;
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
RTC_CHECK(local_addr_.IsNil())
|
||||
<< "Socket already bound to address: " << ToString(local_addr_);
|
||||
local_addr_ = addr;
|
||||
@ -153,6 +143,7 @@ int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) {
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
RTC_CHECK(remote_addr_.IsNil())
|
||||
<< "Socket already connected to address: " << ToString(remote_addr_);
|
||||
RTC_CHECK(!local_addr_.IsNil())
|
||||
@ -163,6 +154,7 @@ int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) {
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::Send(const void* pv, size_t cb) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
RTC_CHECK(state_ == CS_CONNECTED) << "Socket cannot send: not connected";
|
||||
return SendTo(pv, cb, remote_addr_);
|
||||
}
|
||||
@ -170,6 +162,7 @@ int FakeNetworkSocket::Send(const void* pv, size_t cb) {
|
||||
int FakeNetworkSocket::SendTo(const void* pv,
|
||||
size_t cb,
|
||||
const rtc::SocketAddress& addr) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
RTC_CHECK(!local_addr_.IsNil())
|
||||
<< "Socket have to be bind to some local address";
|
||||
if (!endpoint_->Enabled()) {
|
||||
@ -192,34 +185,32 @@ int FakeNetworkSocket::RecvFrom(void* pv,
|
||||
size_t cb,
|
||||
rtc::SocketAddress* paddr,
|
||||
int64_t* timestamp) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
|
||||
if (timestamp) {
|
||||
*timestamp = -1;
|
||||
}
|
||||
absl::optional<EmulatedIpPacket> packetOpt = PopFrontPacket();
|
||||
RTC_CHECK(pending_);
|
||||
|
||||
if (!packetOpt) {
|
||||
error_ = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
EmulatedIpPacket packet = std::move(packetOpt.value());
|
||||
*paddr = packet.from;
|
||||
size_t data_read = std::min(cb, packet.size());
|
||||
memcpy(pv, packet.cdata(), data_read);
|
||||
*timestamp = packet.arrival_time.us();
|
||||
*paddr = pending_->from;
|
||||
size_t data_read = std::min(cb, pending_->size());
|
||||
memcpy(pv, pending_->cdata(), data_read);
|
||||
*timestamp = pending_->arrival_time.us();
|
||||
|
||||
// According to RECV(2) Linux Man page
|
||||
// real socket will discard data, that won't fit into provided buffer,
|
||||
// but we won't to skip such error, so we will assert here.
|
||||
RTC_CHECK(data_read == packet.size())
|
||||
RTC_CHECK(data_read == pending_->size())
|
||||
<< "Too small buffer is provided for socket read. "
|
||||
<< "Received data size: " << packet.size()
|
||||
<< "Received data size: " << pending_->size()
|
||||
<< "; Provided buffer size: " << cb;
|
||||
|
||||
pending_.reset();
|
||||
|
||||
// According to RECV(2) Linux Man page
|
||||
// real socket will return message length, not data read. In our case it is
|
||||
// actually the same value.
|
||||
return static_cast<int>(packet.size());
|
||||
return static_cast<int>(data_read);
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::Listen(int backlog) {
|
||||
@ -231,6 +222,7 @@ rtc::AsyncSocket* FakeNetworkSocket::Accept(rtc::SocketAddress* /*paddr*/) {
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::Close() {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
state_ = CS_CLOSED;
|
||||
if (!local_addr_.IsNil()) {
|
||||
endpoint_->UnbindReceiver(local_addr_.port());
|
||||
@ -241,19 +233,23 @@ int FakeNetworkSocket::Close() {
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::GetError() const {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
return error_;
|
||||
}
|
||||
|
||||
void FakeNetworkSocket::SetError(int error) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
RTC_CHECK(error == 0);
|
||||
error_ = error;
|
||||
}
|
||||
|
||||
rtc::AsyncSocket::ConnState FakeNetworkSocket::GetState() const {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
return state_;
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::GetOption(Option opt, int* value) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
auto it = options_map_.find(opt);
|
||||
if (it == options_map_.end()) {
|
||||
return -1;
|
||||
@ -263,32 +259,19 @@ int FakeNetworkSocket::GetOption(Option opt, int* value) {
|
||||
}
|
||||
|
||||
int FakeNetworkSocket::SetOption(Option opt, int value) {
|
||||
RTC_DCHECK_RUN_ON(thread_);
|
||||
options_map_[opt] = value;
|
||||
return 0;
|
||||
}
|
||||
|
||||
absl::optional<EmulatedIpPacket> FakeNetworkSocket::PopFrontPacket() {
|
||||
rtc::CritScope crit(&lock_);
|
||||
if (packet_queue_.empty()) {
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
absl::optional<EmulatedIpPacket> packet =
|
||||
absl::make_optional(std::move(packet_queue_.front()));
|
||||
packet_queue_.pop_front();
|
||||
return packet;
|
||||
}
|
||||
|
||||
FakeNetworkSocketServer::FakeNetworkSocketServer(
|
||||
Clock* clock,
|
||||
EndpointsContainer* endpoints_container)
|
||||
: clock_(clock),
|
||||
endpoints_container_(endpoints_container),
|
||||
: endpoints_container_(endpoints_container),
|
||||
wakeup_(/*manual_reset=*/false, /*initially_signaled=*/false) {}
|
||||
FakeNetworkSocketServer::~FakeNetworkSocketServer() = default;
|
||||
|
||||
void FakeNetworkSocketServer::OnMessageQueueDestroyed() {
|
||||
msg_queue_ = nullptr;
|
||||
thread_ = nullptr;
|
||||
}
|
||||
|
||||
EmulatedEndpointImpl* FakeNetworkSocketServer::GetEndpointNode(
|
||||
@ -311,7 +294,8 @@ rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family,
|
||||
RTC_DCHECK(family == AF_INET || family == AF_INET6);
|
||||
// We support only UDP sockets for now.
|
||||
RTC_DCHECK(type == SOCK_DGRAM) << "Only UDP sockets are supported";
|
||||
FakeNetworkSocket* out = new FakeNetworkSocket(this);
|
||||
RTC_DCHECK(thread_) << "must be attached to thread before creating sockets";
|
||||
FakeNetworkSocket* out = new FakeNetworkSocket(this, thread_);
|
||||
{
|
||||
rtc::CritScope crit(&lock_);
|
||||
sockets_.push_back(out);
|
||||
@ -319,28 +303,19 @@ rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family,
|
||||
return out;
|
||||
}
|
||||
|
||||
void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* msg_queue) {
|
||||
msg_queue_ = msg_queue;
|
||||
if (msg_queue_) {
|
||||
msg_queue_->SignalQueueDestroyed.connect(
|
||||
void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* thread) {
|
||||
thread_ = thread;
|
||||
if (thread_) {
|
||||
thread_->SignalQueueDestroyed.connect(
|
||||
this, &FakeNetworkSocketServer::OnMessageQueueDestroyed);
|
||||
}
|
||||
}
|
||||
|
||||
// Always returns true (if return false, it won't be invoked again...)
|
||||
bool FakeNetworkSocketServer::Wait(int cms, bool process_io) {
|
||||
RTC_DCHECK(msg_queue_ == rtc::Thread::Current());
|
||||
if (!process_io) {
|
||||
RTC_DCHECK(thread_ == rtc::Thread::Current());
|
||||
if (cms != 0)
|
||||
wakeup_.Wait(cms);
|
||||
return true;
|
||||
}
|
||||
wakeup_.Wait(cms);
|
||||
|
||||
rtc::CritScope crit(&lock_);
|
||||
for (auto* socket : sockets_) {
|
||||
while (socket->ProcessIo()) {
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -348,9 +323,6 @@ void FakeNetworkSocketServer::WakeUp() {
|
||||
wakeup_.Set();
|
||||
}
|
||||
|
||||
Timestamp FakeNetworkSocketServer::Now() const {
|
||||
return clock_->CurrentTime();
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
||||
@ -31,13 +31,9 @@ class FakeNetworkSocket;
|
||||
class FakeNetworkSocketServer : public rtc::SocketServer,
|
||||
public sigslot::has_slots<> {
|
||||
public:
|
||||
FakeNetworkSocketServer(Clock* clock,
|
||||
EndpointsContainer* endpoints_controller);
|
||||
explicit FakeNetworkSocketServer(EndpointsContainer* endpoints_controller);
|
||||
~FakeNetworkSocketServer() override;
|
||||
|
||||
EmulatedEndpointImpl* GetEndpointNode(const rtc::IPAddress& ip);
|
||||
void Unregister(FakeNetworkSocket* socket);
|
||||
void OnMessageQueueDestroyed();
|
||||
|
||||
// rtc::SocketFactory methods:
|
||||
rtc::Socket* CreateSocket(int family, int type) override;
|
||||
@ -46,17 +42,21 @@ class FakeNetworkSocketServer : public rtc::SocketServer,
|
||||
// rtc::SocketServer methods:
|
||||
// Called by the network thread when this server is installed, kicking off the
|
||||
// message handler loop.
|
||||
void SetMessageQueue(rtc::Thread* msg_queue) override;
|
||||
void SetMessageQueue(rtc::Thread* thread) override;
|
||||
bool Wait(int cms, bool process_io) override;
|
||||
void WakeUp() override;
|
||||
|
||||
private:
|
||||
Timestamp Now() const;
|
||||
protected:
|
||||
friend class FakeNetworkSocket;
|
||||
EmulatedEndpointImpl* GetEndpointNode(const rtc::IPAddress& ip);
|
||||
void Unregister(FakeNetworkSocket* socket);
|
||||
|
||||
private:
|
||||
void OnMessageQueueDestroyed();
|
||||
|
||||
Clock* const clock_;
|
||||
const EndpointsContainer* endpoints_container_;
|
||||
rtc::Event wakeup_;
|
||||
rtc::Thread* msg_queue_;
|
||||
rtc::Thread* thread_ = nullptr;
|
||||
|
||||
rtc::CriticalSection lock_;
|
||||
std::vector<FakeNetworkSocket*> sockets_ RTC_GUARDED_BY(lock_);
|
||||
|
||||
@ -44,13 +44,12 @@ class SocketReader : public sigslot::has_slots<> {
|
||||
|
||||
void OnReadEvent(rtc::AsyncSocket* socket) {
|
||||
RTC_DCHECK(socket_ == socket);
|
||||
network_thread_->PostTask(RTC_FROM_HERE, [this]() {
|
||||
int64_t timestamp;
|
||||
len_ = socket_->Recv(buf_, size_, ×tamp);
|
||||
RTC_DCHECK(network_thread_->IsCurrent());
|
||||
int64_t timestamp;
|
||||
len_ = socket_->Recv(buf_, size_, ×tamp);
|
||||
|
||||
rtc::CritScope crit(&lock_);
|
||||
received_count_++;
|
||||
});
|
||||
rtc::CritScope crit(&lock_);
|
||||
received_count_++;
|
||||
}
|
||||
|
||||
int ReceivedCount() {
|
||||
@ -201,30 +200,37 @@ TEST(NetworkEmulationManagerTest, Run) {
|
||||
EmulatedNetworkManagerInterface* nt2 =
|
||||
network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
|
||||
|
||||
rtc::Thread* t1 = nt1->network_thread();
|
||||
rtc::Thread* t2 = nt2->network_thread();
|
||||
|
||||
rtc::CopyOnWriteBuffer data("Hello");
|
||||
for (uint64_t j = 0; j < 2; j++) {
|
||||
auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
|
||||
AF_INET, SOCK_DGRAM);
|
||||
auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket(
|
||||
AF_INET, SOCK_DGRAM);
|
||||
auto* s1 = t1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
|
||||
auto* s2 = t2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
|
||||
|
||||
SocketReader r1(s1, nt1->network_thread());
|
||||
SocketReader r2(s2, nt2->network_thread());
|
||||
SocketReader r1(s1, t1);
|
||||
SocketReader r2(s2, t2);
|
||||
|
||||
rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0);
|
||||
rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0);
|
||||
|
||||
s1->Bind(a1);
|
||||
s2->Bind(a2);
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] {
|
||||
s1->Bind(a1);
|
||||
a1 = s1->GetLocalAddress();
|
||||
});
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] {
|
||||
s2->Bind(a2);
|
||||
a2 = s2->GetLocalAddress();
|
||||
});
|
||||
|
||||
s1->Connect(s2->GetLocalAddress());
|
||||
s2->Connect(s1->GetLocalAddress());
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] { s1->Connect(a2); });
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] { s2->Connect(a1); });
|
||||
|
||||
for (uint64_t i = 0; i < 1000; i++) {
|
||||
nt1->network_thread()->PostTask(
|
||||
RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); });
|
||||
nt2->network_thread()->PostTask(
|
||||
RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); });
|
||||
t1->PostTask(RTC_FROM_HERE,
|
||||
[&]() { s1->Send(data.data(), data.size()); });
|
||||
t2->PostTask(RTC_FROM_HERE,
|
||||
[&]() { s2->Send(data.data(), data.size()); });
|
||||
}
|
||||
|
||||
rtc::Event wait;
|
||||
@ -232,8 +238,8 @@ TEST(NetworkEmulationManagerTest, Run) {
|
||||
EXPECT_EQ(r1.ReceivedCount(), 1000);
|
||||
EXPECT_EQ(r2.ReceivedCount(), 1000);
|
||||
|
||||
delete s1;
|
||||
delete s2;
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] { delete s1; });
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] { delete s2; });
|
||||
}
|
||||
|
||||
const int64_t single_packet_size = data.size() + kOverheadIpv4Udp;
|
||||
@ -278,35 +284,40 @@ TEST(NetworkEmulationManagerTest, ThroughputStats) {
|
||||
EmulatedNetworkManagerInterface* nt2 =
|
||||
network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
|
||||
|
||||
rtc::Thread* t1 = nt1->network_thread();
|
||||
rtc::Thread* t2 = nt2->network_thread();
|
||||
|
||||
constexpr int64_t kUdpPayloadSize = 100;
|
||||
constexpr int64_t kSinglePacketSize = kUdpPayloadSize + kOverheadIpv4Udp;
|
||||
rtc::CopyOnWriteBuffer data(kUdpPayloadSize);
|
||||
auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
|
||||
AF_INET, SOCK_DGRAM);
|
||||
auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket(
|
||||
AF_INET, SOCK_DGRAM);
|
||||
auto* s1 = t1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
|
||||
auto* s2 = t2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
|
||||
|
||||
SocketReader r1(s1, nt1->network_thread());
|
||||
SocketReader r2(s2, nt2->network_thread());
|
||||
SocketReader r1(s1, t1);
|
||||
SocketReader r2(s2, t2);
|
||||
|
||||
rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0);
|
||||
rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0);
|
||||
|
||||
s1->Bind(a1);
|
||||
s2->Bind(a2);
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] {
|
||||
s1->Bind(a1);
|
||||
a1 = s1->GetLocalAddress();
|
||||
});
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] {
|
||||
s2->Bind(a2);
|
||||
a2 = s2->GetLocalAddress();
|
||||
});
|
||||
|
||||
s1->Connect(s2->GetLocalAddress());
|
||||
s2->Connect(s1->GetLocalAddress());
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] { s1->Connect(a2); });
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] { s2->Connect(a1); });
|
||||
|
||||
// Send 11 packets, totalizing 1 second between the first and the last.
|
||||
const int kNumPacketsSent = 11;
|
||||
const int kDelayMs = 100;
|
||||
rtc::Event wait;
|
||||
for (int i = 0; i < kNumPacketsSent; i++) {
|
||||
nt1->network_thread()->PostTask(
|
||||
RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); });
|
||||
nt2->network_thread()->PostTask(
|
||||
RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); });
|
||||
t1->PostTask(RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); });
|
||||
t2->PostTask(RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); });
|
||||
wait.Wait(kDelayMs);
|
||||
}
|
||||
|
||||
@ -325,8 +336,8 @@ TEST(NetworkEmulationManagerTest, ThroughputStats) {
|
||||
EXPECT_EQ(r1.ReceivedCount(), 11);
|
||||
EXPECT_EQ(r2.ReceivedCount(), 11);
|
||||
|
||||
delete s1;
|
||||
delete s2;
|
||||
t1->Invoke<void>(RTC_FROM_HERE, [&] { delete s1; });
|
||||
t2->Invoke<void>(RTC_FROM_HERE, [&] { delete s2; });
|
||||
}
|
||||
|
||||
// Testing that packets are delivered via all routes using a routing scheme as
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user