diff --git a/test/network/emulated_network_manager.cc b/test/network/emulated_network_manager.cc index 9457244ccc..b4ee7d330d 100644 --- a/test/network/emulated_network_manager.cc +++ b/test/network/emulated_network_manager.cc @@ -24,7 +24,7 @@ EmulatedNetworkManager::EmulatedNetworkManager( EndpointsContainer* endpoints_container) : task_queue_(task_queue), endpoints_container_(endpoints_container), - socket_server_(clock, endpoints_container), + socket_server_(endpoints_container), network_thread_(&socket_server_), sent_first_update_(false), start_count_(0) { diff --git a/test/network/fake_network_socket_server.cc b/test/network/fake_network_socket_server.cc index c1263c407e..3e9c0ef211 100644 --- a/test/network/fake_network_socket_server.cc +++ b/test/network/fake_network_socket_server.cc @@ -16,6 +16,7 @@ #include #include "absl/algorithm/container.h" +#include "rtc_base/async_invoker.h" #include "rtc_base/logging.h" #include "rtc_base/thread.h" @@ -32,13 +33,12 @@ std::string ToString(const rtc::SocketAddress& addr) { class FakeNetworkSocket : public rtc::AsyncSocket, public EmulatedNetworkReceiverInterface { public: - explicit FakeNetworkSocket(FakeNetworkSocketServer* scoket_manager); + explicit FakeNetworkSocket(FakeNetworkSocketServer* scoket_manager, + rtc::Thread* thread); ~FakeNetworkSocket() override; // Will be invoked by EmulatedEndpoint to deliver packets into this socket. void OnPacketReceived(EmulatedIpPacket packet) override; - // Will fire read event for incoming packets. - bool ProcessIo(); // rtc::Socket methods: rtc::SocketAddress GetLocalAddress() const override; @@ -64,72 +64,62 @@ class FakeNetworkSocket : public rtc::AsyncSocket, int SetOption(Option opt, int value) override; private: - absl::optional PopFrontPacket(); - FakeNetworkSocketServer* const socket_server_; - EmulatedEndpointImpl* endpoint_; + rtc::Thread* const thread_; + EmulatedEndpointImpl* endpoint_ RTC_GUARDED_BY(&thread_); + rtc::SocketAddress local_addr_ RTC_GUARDED_BY(&thread_); + rtc::SocketAddress remote_addr_ RTC_GUARDED_BY(&thread_); + ConnState state_ RTC_GUARDED_BY(&thread_); + int error_ RTC_GUARDED_BY(&thread_); + std::map options_map_ RTC_GUARDED_BY(&thread_); - rtc::SocketAddress local_addr_; - rtc::SocketAddress remote_addr_; - ConnState state_; - int error_; - std::map options_map_; - - rtc::CriticalSection lock_; - // Count of packets in the queue for which we didn't fire read event. - // |pending_read_events_count_| can be different from |packet_queue_.size()| - // because read events will be fired by one thread and packets in the queue - // can be processed by another thread. - int pending_read_events_count_; - std::deque packet_queue_ RTC_GUARDED_BY(lock_); + absl::optional pending_ RTC_GUARDED_BY(thread_); + rtc::AsyncInvoker invoker_; }; -FakeNetworkSocket::FakeNetworkSocket(FakeNetworkSocketServer* socket_server) +FakeNetworkSocket::FakeNetworkSocket(FakeNetworkSocketServer* socket_server, + rtc::Thread* thread) : socket_server_(socket_server), + thread_(thread), state_(CS_CLOSED), - error_(0), - pending_read_events_count_(0) {} + error_(0) {} + FakeNetworkSocket::~FakeNetworkSocket() { Close(); socket_server_->Unregister(this); } void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) { - { - rtc::CritScope crit(&lock_); - packet_queue_.push_back(std::move(packet)); - pending_read_events_count_++; - } + auto task = [this, packet = std::move(packet)]() mutable { + RTC_DCHECK_RUN_ON(thread_); + if (!endpoint_->Enabled()) + return; + RTC_DCHECK(!pending_); + pending_ = std::move(packet); + // Note that we expect that this will trigger exactly one call to RecvFrom() + // where pending_packet will be read and reset. This call is done without + // any thread switch (see AsyncUDPSocket::OnReadEvent) so it's safe to + // assume that SignalReadEvent() will block until the packet has been read. + SignalReadEvent(this); + RTC_DCHECK(!pending_); + }; + invoker_.AsyncInvoke(RTC_FROM_HERE, thread_, std::move(task)); socket_server_->WakeUp(); } -bool FakeNetworkSocket::ProcessIo() { - { - rtc::CritScope crit(&lock_); - if (pending_read_events_count_ == 0) { - return false; - } - pending_read_events_count_--; - RTC_DCHECK_GE(pending_read_events_count_, 0); - } - if (!endpoint_->Enabled()) { - // If endpoint disabled then just pop and discard packet. - PopFrontPacket(); - return true; - } - SignalReadEvent(this); - return true; -} rtc::SocketAddress FakeNetworkSocket::GetLocalAddress() const { + RTC_DCHECK_RUN_ON(thread_); return local_addr_; } rtc::SocketAddress FakeNetworkSocket::GetRemoteAddress() const { + RTC_DCHECK_RUN_ON(thread_); return remote_addr_; } int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) { + RTC_DCHECK_RUN_ON(thread_); RTC_CHECK(local_addr_.IsNil()) << "Socket already bound to address: " << ToString(local_addr_); local_addr_ = addr; @@ -153,6 +143,7 @@ int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) { } int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) { + RTC_DCHECK_RUN_ON(thread_); RTC_CHECK(remote_addr_.IsNil()) << "Socket already connected to address: " << ToString(remote_addr_); RTC_CHECK(!local_addr_.IsNil()) @@ -163,6 +154,7 @@ int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) { } int FakeNetworkSocket::Send(const void* pv, size_t cb) { + RTC_DCHECK_RUN_ON(thread_); RTC_CHECK(state_ == CS_CONNECTED) << "Socket cannot send: not connected"; return SendTo(pv, cb, remote_addr_); } @@ -170,6 +162,7 @@ int FakeNetworkSocket::Send(const void* pv, size_t cb) { int FakeNetworkSocket::SendTo(const void* pv, size_t cb, const rtc::SocketAddress& addr) { + RTC_DCHECK_RUN_ON(thread_); RTC_CHECK(!local_addr_.IsNil()) << "Socket have to be bind to some local address"; if (!endpoint_->Enabled()) { @@ -192,34 +185,32 @@ int FakeNetworkSocket::RecvFrom(void* pv, size_t cb, rtc::SocketAddress* paddr, int64_t* timestamp) { + RTC_DCHECK_RUN_ON(thread_); + if (timestamp) { *timestamp = -1; } - absl::optional packetOpt = PopFrontPacket(); + RTC_CHECK(pending_); - if (!packetOpt) { - error_ = EAGAIN; - return -1; - } - - EmulatedIpPacket packet = std::move(packetOpt.value()); - *paddr = packet.from; - size_t data_read = std::min(cb, packet.size()); - memcpy(pv, packet.cdata(), data_read); - *timestamp = packet.arrival_time.us(); + *paddr = pending_->from; + size_t data_read = std::min(cb, pending_->size()); + memcpy(pv, pending_->cdata(), data_read); + *timestamp = pending_->arrival_time.us(); // According to RECV(2) Linux Man page // real socket will discard data, that won't fit into provided buffer, // but we won't to skip such error, so we will assert here. - RTC_CHECK(data_read == packet.size()) + RTC_CHECK(data_read == pending_->size()) << "Too small buffer is provided for socket read. " - << "Received data size: " << packet.size() + << "Received data size: " << pending_->size() << "; Provided buffer size: " << cb; + pending_.reset(); + // According to RECV(2) Linux Man page // real socket will return message length, not data read. In our case it is // actually the same value. - return static_cast(packet.size()); + return static_cast(data_read); } int FakeNetworkSocket::Listen(int backlog) { @@ -231,6 +222,7 @@ rtc::AsyncSocket* FakeNetworkSocket::Accept(rtc::SocketAddress* /*paddr*/) { } int FakeNetworkSocket::Close() { + RTC_DCHECK_RUN_ON(thread_); state_ = CS_CLOSED; if (!local_addr_.IsNil()) { endpoint_->UnbindReceiver(local_addr_.port()); @@ -241,19 +233,23 @@ int FakeNetworkSocket::Close() { } int FakeNetworkSocket::GetError() const { + RTC_DCHECK_RUN_ON(thread_); return error_; } void FakeNetworkSocket::SetError(int error) { + RTC_DCHECK_RUN_ON(thread_); RTC_CHECK(error == 0); error_ = error; } rtc::AsyncSocket::ConnState FakeNetworkSocket::GetState() const { + RTC_DCHECK_RUN_ON(thread_); return state_; } int FakeNetworkSocket::GetOption(Option opt, int* value) { + RTC_DCHECK_RUN_ON(thread_); auto it = options_map_.find(opt); if (it == options_map_.end()) { return -1; @@ -263,32 +259,19 @@ int FakeNetworkSocket::GetOption(Option opt, int* value) { } int FakeNetworkSocket::SetOption(Option opt, int value) { + RTC_DCHECK_RUN_ON(thread_); options_map_[opt] = value; return 0; } -absl::optional FakeNetworkSocket::PopFrontPacket() { - rtc::CritScope crit(&lock_); - if (packet_queue_.empty()) { - return absl::nullopt; - } - - absl::optional packet = - absl::make_optional(std::move(packet_queue_.front())); - packet_queue_.pop_front(); - return packet; -} - FakeNetworkSocketServer::FakeNetworkSocketServer( - Clock* clock, EndpointsContainer* endpoints_container) - : clock_(clock), - endpoints_container_(endpoints_container), + : endpoints_container_(endpoints_container), wakeup_(/*manual_reset=*/false, /*initially_signaled=*/false) {} FakeNetworkSocketServer::~FakeNetworkSocketServer() = default; void FakeNetworkSocketServer::OnMessageQueueDestroyed() { - msg_queue_ = nullptr; + thread_ = nullptr; } EmulatedEndpointImpl* FakeNetworkSocketServer::GetEndpointNode( @@ -311,7 +294,8 @@ rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family, RTC_DCHECK(family == AF_INET || family == AF_INET6); // We support only UDP sockets for now. RTC_DCHECK(type == SOCK_DGRAM) << "Only UDP sockets are supported"; - FakeNetworkSocket* out = new FakeNetworkSocket(this); + RTC_DCHECK(thread_) << "must be attached to thread before creating sockets"; + FakeNetworkSocket* out = new FakeNetworkSocket(this, thread_); { rtc::CritScope crit(&lock_); sockets_.push_back(out); @@ -319,28 +303,19 @@ rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family, return out; } -void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* msg_queue) { - msg_queue_ = msg_queue; - if (msg_queue_) { - msg_queue_->SignalQueueDestroyed.connect( +void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* thread) { + thread_ = thread; + if (thread_) { + thread_->SignalQueueDestroyed.connect( this, &FakeNetworkSocketServer::OnMessageQueueDestroyed); } } // Always returns true (if return false, it won't be invoked again...) bool FakeNetworkSocketServer::Wait(int cms, bool process_io) { - RTC_DCHECK(msg_queue_ == rtc::Thread::Current()); - if (!process_io) { + RTC_DCHECK(thread_ == rtc::Thread::Current()); + if (cms != 0) wakeup_.Wait(cms); - return true; - } - wakeup_.Wait(cms); - - rtc::CritScope crit(&lock_); - for (auto* socket : sockets_) { - while (socket->ProcessIo()) { - } - } return true; } @@ -348,9 +323,6 @@ void FakeNetworkSocketServer::WakeUp() { wakeup_.Set(); } -Timestamp FakeNetworkSocketServer::Now() const { - return clock_->CurrentTime(); -} } // namespace test } // namespace webrtc diff --git a/test/network/fake_network_socket_server.h b/test/network/fake_network_socket_server.h index da25de3b76..3a007588e3 100644 --- a/test/network/fake_network_socket_server.h +++ b/test/network/fake_network_socket_server.h @@ -31,13 +31,9 @@ class FakeNetworkSocket; class FakeNetworkSocketServer : public rtc::SocketServer, public sigslot::has_slots<> { public: - FakeNetworkSocketServer(Clock* clock, - EndpointsContainer* endpoints_controller); + explicit FakeNetworkSocketServer(EndpointsContainer* endpoints_controller); ~FakeNetworkSocketServer() override; - EmulatedEndpointImpl* GetEndpointNode(const rtc::IPAddress& ip); - void Unregister(FakeNetworkSocket* socket); - void OnMessageQueueDestroyed(); // rtc::SocketFactory methods: rtc::Socket* CreateSocket(int family, int type) override; @@ -46,17 +42,21 @@ class FakeNetworkSocketServer : public rtc::SocketServer, // rtc::SocketServer methods: // Called by the network thread when this server is installed, kicking off the // message handler loop. - void SetMessageQueue(rtc::Thread* msg_queue) override; + void SetMessageQueue(rtc::Thread* thread) override; bool Wait(int cms, bool process_io) override; void WakeUp() override; - private: - Timestamp Now() const; + protected: + friend class FakeNetworkSocket; + EmulatedEndpointImpl* GetEndpointNode(const rtc::IPAddress& ip); + void Unregister(FakeNetworkSocket* socket); + + private: + void OnMessageQueueDestroyed(); - Clock* const clock_; const EndpointsContainer* endpoints_container_; rtc::Event wakeup_; - rtc::Thread* msg_queue_; + rtc::Thread* thread_ = nullptr; rtc::CriticalSection lock_; std::vector sockets_ RTC_GUARDED_BY(lock_); diff --git a/test/network/network_emulation_unittest.cc b/test/network/network_emulation_unittest.cc index 30d02be453..9a8a9823e2 100644 --- a/test/network/network_emulation_unittest.cc +++ b/test/network/network_emulation_unittest.cc @@ -44,13 +44,12 @@ class SocketReader : public sigslot::has_slots<> { void OnReadEvent(rtc::AsyncSocket* socket) { RTC_DCHECK(socket_ == socket); - network_thread_->PostTask(RTC_FROM_HERE, [this]() { - int64_t timestamp; - len_ = socket_->Recv(buf_, size_, ×tamp); + RTC_DCHECK(network_thread_->IsCurrent()); + int64_t timestamp; + len_ = socket_->Recv(buf_, size_, ×tamp); - rtc::CritScope crit(&lock_); - received_count_++; - }); + rtc::CritScope crit(&lock_); + received_count_++; } int ReceivedCount() { @@ -201,30 +200,37 @@ TEST(NetworkEmulationManagerTest, Run) { EmulatedNetworkManagerInterface* nt2 = network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint}); + rtc::Thread* t1 = nt1->network_thread(); + rtc::Thread* t2 = nt2->network_thread(); + rtc::CopyOnWriteBuffer data("Hello"); for (uint64_t j = 0; j < 2; j++) { - auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket( - AF_INET, SOCK_DGRAM); - auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket( - AF_INET, SOCK_DGRAM); + auto* s1 = t1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); + auto* s2 = t2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); - SocketReader r1(s1, nt1->network_thread()); - SocketReader r2(s2, nt2->network_thread()); + SocketReader r1(s1, t1); + SocketReader r2(s2, t2); rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0); rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0); - s1->Bind(a1); - s2->Bind(a2); + t1->Invoke(RTC_FROM_HERE, [&] { + s1->Bind(a1); + a1 = s1->GetLocalAddress(); + }); + t2->Invoke(RTC_FROM_HERE, [&] { + s2->Bind(a2); + a2 = s2->GetLocalAddress(); + }); - s1->Connect(s2->GetLocalAddress()); - s2->Connect(s1->GetLocalAddress()); + t1->Invoke(RTC_FROM_HERE, [&] { s1->Connect(a2); }); + t2->Invoke(RTC_FROM_HERE, [&] { s2->Connect(a1); }); for (uint64_t i = 0; i < 1000; i++) { - nt1->network_thread()->PostTask( - RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); }); - nt2->network_thread()->PostTask( - RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); }); + t1->PostTask(RTC_FROM_HERE, + [&]() { s1->Send(data.data(), data.size()); }); + t2->PostTask(RTC_FROM_HERE, + [&]() { s2->Send(data.data(), data.size()); }); } rtc::Event wait; @@ -232,8 +238,8 @@ TEST(NetworkEmulationManagerTest, Run) { EXPECT_EQ(r1.ReceivedCount(), 1000); EXPECT_EQ(r2.ReceivedCount(), 1000); - delete s1; - delete s2; + t1->Invoke(RTC_FROM_HERE, [&] { delete s1; }); + t2->Invoke(RTC_FROM_HERE, [&] { delete s2; }); } const int64_t single_packet_size = data.size() + kOverheadIpv4Udp; @@ -278,35 +284,40 @@ TEST(NetworkEmulationManagerTest, ThroughputStats) { EmulatedNetworkManagerInterface* nt2 = network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint}); + rtc::Thread* t1 = nt1->network_thread(); + rtc::Thread* t2 = nt2->network_thread(); + constexpr int64_t kUdpPayloadSize = 100; constexpr int64_t kSinglePacketSize = kUdpPayloadSize + kOverheadIpv4Udp; rtc::CopyOnWriteBuffer data(kUdpPayloadSize); - auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket( - AF_INET, SOCK_DGRAM); - auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket( - AF_INET, SOCK_DGRAM); + auto* s1 = t1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); + auto* s2 = t2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM); - SocketReader r1(s1, nt1->network_thread()); - SocketReader r2(s2, nt2->network_thread()); + SocketReader r1(s1, t1); + SocketReader r2(s2, t2); rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0); rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0); - s1->Bind(a1); - s2->Bind(a2); + t1->Invoke(RTC_FROM_HERE, [&] { + s1->Bind(a1); + a1 = s1->GetLocalAddress(); + }); + t2->Invoke(RTC_FROM_HERE, [&] { + s2->Bind(a2); + a2 = s2->GetLocalAddress(); + }); - s1->Connect(s2->GetLocalAddress()); - s2->Connect(s1->GetLocalAddress()); + t1->Invoke(RTC_FROM_HERE, [&] { s1->Connect(a2); }); + t2->Invoke(RTC_FROM_HERE, [&] { s2->Connect(a1); }); // Send 11 packets, totalizing 1 second between the first and the last. const int kNumPacketsSent = 11; const int kDelayMs = 100; rtc::Event wait; for (int i = 0; i < kNumPacketsSent; i++) { - nt1->network_thread()->PostTask( - RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); }); - nt2->network_thread()->PostTask( - RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); }); + t1->PostTask(RTC_FROM_HERE, [&]() { s1->Send(data.data(), data.size()); }); + t2->PostTask(RTC_FROM_HERE, [&]() { s2->Send(data.data(), data.size()); }); wait.Wait(kDelayMs); } @@ -325,8 +336,8 @@ TEST(NetworkEmulationManagerTest, ThroughputStats) { EXPECT_EQ(r1.ReceivedCount(), 11); EXPECT_EQ(r2.ReceivedCount(), 11); - delete s1; - delete s2; + t1->Invoke(RTC_FROM_HERE, [&] { delete s1; }); + t2->Invoke(RTC_FROM_HERE, [&] { delete s2; }); } // Testing that packets are delivered via all routes using a routing scheme as