diff --git a/api/BUILD.gn b/api/BUILD.gn index 31dd8983ce..45d3bb5980 100644 --- a/api/BUILD.gn +++ b/api/BUILD.gn @@ -434,6 +434,9 @@ rtc_source_set("network_emulation_manager_api") { deps = [ ":simulated_network_api", "../rtc_base", + "units:data_rate", + "units:data_size", + "units:timestamp", ] } diff --git a/api/test/network_emulation_manager.h b/api/test/network_emulation_manager.h index b409bce6d5..404a8c07cb 100644 --- a/api/test/network_emulation_manager.h +++ b/api/test/network_emulation_manager.h @@ -15,6 +15,9 @@ #include #include "api/test/simulated_network.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/timestamp.h" #include "rtc_base/network.h" #include "rtc_base/thread.h" @@ -51,14 +54,51 @@ struct EmulatedEndpointConfig { bool start_as_enabled = true; }; +struct EmulatedNetworkStats { + int64_t packets_sent = 0; + DataSize bytes_sent = DataSize::Zero(); + // Total amount of packets received with or without destination. + int64_t packets_received = 0; + // Total amount of bytes in received packets. + DataSize bytes_received = DataSize::Zero(); + // Total amount of packets that were received, but no destination was found. + int64_t packets_dropped = 0; + // Total amount of bytes in dropped packets. + DataSize bytes_dropped = DataSize::Zero(); + + DataSize first_received_packet_size = DataSize::Zero(); + DataSize first_sent_packet_size = DataSize::Zero(); + + Timestamp first_packet_sent_time = Timestamp::PlusInfinity(); + Timestamp last_packet_sent_time = Timestamp::PlusInfinity(); + Timestamp first_packet_received_time = Timestamp::PlusInfinity(); + Timestamp last_packet_received_time = Timestamp::PlusInfinity(); + + DataRate AverageSendRate() const { + RTC_DCHECK_GE(packets_sent, 2); + return (bytes_sent - first_sent_packet_size) / + (last_packet_sent_time - first_packet_sent_time); + } + DataRate AverageReceiveRate() const { + RTC_DCHECK_GE(packets_received, 2); + return (bytes_received - first_received_packet_size) / + (last_packet_received_time - first_packet_received_time); + } +}; + // Provide interface to obtain all required objects to inject network emulation -// layer into PeerConnection. +// layer into PeerConnection. Also contains information about network interfaces +// accessible by PeerConnection. class EmulatedNetworkManagerInterface { public: virtual ~EmulatedNetworkManagerInterface() = default; virtual rtc::Thread* network_thread() = 0; virtual rtc::NetworkManager* network_manager() = 0; + + // Returns summarized network stats for endpoints for this manager. + virtual void GetStats( + std::function stats_callback) const = 0; }; // Provides an API for creating and configuring emulated network layer. diff --git a/test/scenario/network/emulated_network_manager.cc b/test/scenario/network/emulated_network_manager.cc index b6f349b82a..5ee9c1654f 100644 --- a/test/scenario/network/emulated_network_manager.cc +++ b/test/scenario/network/emulated_network_manager.cc @@ -20,8 +20,10 @@ namespace test { EmulatedNetworkManager::EmulatedNetworkManager( Clock* clock, + TaskQueueForTest* task_queue, EndpointsContainer* endpoints_container) - : endpoints_container_(endpoints_container), + : task_queue_(task_queue), + endpoints_container_(endpoints_container), socket_server_(clock, endpoints_container), network_thread_(&socket_server_), sent_first_update_(false), @@ -77,6 +79,13 @@ void EmulatedNetworkManager::StopUpdating() { } } +void EmulatedNetworkManager::GetStats( + std::function stats_callback) const { + task_queue_->PostTask([stats_callback, this]() { + stats_callback(endpoints_container_->GetStats()); + }); +} + void EmulatedNetworkManager::UpdateNetworksOnce() { RTC_DCHECK_RUN_ON(&network_thread_); diff --git a/test/scenario/network/emulated_network_manager.h b/test/scenario/network/emulated_network_manager.h index 7fb831bf08..7f941a46ab 100644 --- a/test/scenario/network/emulated_network_manager.h +++ b/test/scenario/network/emulated_network_manager.h @@ -32,7 +32,9 @@ class EmulatedNetworkManager : public rtc::NetworkManagerBase, public sigslot::has_slots<>, public EmulatedNetworkManagerInterface { public: - EmulatedNetworkManager(Clock* clock, EndpointsContainer* endpoints_container); + EmulatedNetworkManager(Clock* clock, + TaskQueueForTest* task_queue, + EndpointsContainer* endpoints_container); void EnableEndpoint(EmulatedEndpoint* endpoint); void DisableEndpoint(EmulatedEndpoint* endpoint); @@ -45,11 +47,14 @@ class EmulatedNetworkManager : public rtc::NetworkManagerBase, // EmulatedNetworkManagerInterface API rtc::Thread* network_thread() override { return &network_thread_; } rtc::NetworkManager* network_manager() override { return this; } + void GetStats( + std::function stats_callback) const override; private: void UpdateNetworksOnce(); void MaybeSignalNetworksChanged(); + TaskQueueForTest* const task_queue_; EndpointsContainer* const endpoints_container_; FakeNetworkSocketServer socket_server_; rtc::Thread network_thread_; diff --git a/test/scenario/network/network_emulation.cc b/test/scenario/network/network_emulation.cc index 323e79cc5c..194cf9853a 100644 --- a/test/scenario/network/network_emulation.cc +++ b/test/scenario/network/network_emulation.cc @@ -14,6 +14,7 @@ #include #include "absl/memory/memory.h" +#include "api/units/data_size.h" #include "rtc_base/bind.h" #include "rtc_base/logging.h" @@ -198,7 +199,10 @@ void EmulatedEndpoint::SendPacket(const rtc::SocketAddress& from, rtc::CopyOnWriteBuffer packet) { RTC_CHECK(from.ipaddr() == peer_local_addr_); struct Closure { - void operator()() { endpoint->router_.OnPacketReceived(std::move(packet)); } + void operator()() { + endpoint->UpdateSendStats(packet); + endpoint->router_.OnPacketReceived(std::move(packet)); + } EmulatedEndpoint* endpoint; EmulatedIpPacket packet; }; @@ -258,18 +262,22 @@ rtc::IPAddress EmulatedEndpoint::GetPeerLocalAddress() const { } void EmulatedEndpoint::OnPacketReceived(EmulatedIpPacket packet) { + RTC_DCHECK_RUN_ON(task_queue_); RTC_CHECK(packet.to.ipaddr() == peer_local_addr_) << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: " << packet.to.ipaddr().ToString() << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString(); rtc::CritScope crit(&receiver_lock_); + UpdateReceiveStats(packet); auto it = port_to_receiver_.find(packet.to.port()); if (it == port_to_receiver_.end()) { // It can happen, that remote peer closed connection, but there still some // packets, that are going to it. It can happen during peer connection close // process: one peer closed connection, second still sending data. - RTC_LOG(INFO) << "No receiver registered in " << id_ << " on port " - << packet.to.port(); + RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_ + << " on port " << packet.to.port(); + stats_.packets_dropped++; + stats_.bytes_dropped += DataSize::bytes(packet.size()); return; } // Endpoint assumes frequent calls to bind and unbind methods, so it holds @@ -295,6 +303,35 @@ bool EmulatedEndpoint::Enabled() const { return is_enabled_; } +EmulatedNetworkStats EmulatedEndpoint::stats() { + RTC_DCHECK_RUN_ON(task_queue_); + return stats_; +} + +void EmulatedEndpoint::UpdateSendStats(const EmulatedIpPacket& packet) { + RTC_DCHECK_RUN_ON(task_queue_); + Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds()); + if (stats_.first_packet_sent_time.IsInfinite()) { + stats_.first_packet_sent_time = current_time; + stats_.first_sent_packet_size = DataSize::bytes(packet.size()); + } + stats_.last_packet_sent_time = current_time; + stats_.packets_sent++; + stats_.bytes_sent += DataSize::bytes(packet.size()); +} + +void EmulatedEndpoint::UpdateReceiveStats(const EmulatedIpPacket& packet) { + RTC_DCHECK_RUN_ON(task_queue_); + Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds()); + if (stats_.first_packet_received_time.IsInfinite()) { + stats_.first_packet_received_time = current_time; + stats_.first_received_packet_size = DataSize::bytes(packet.size()); + } + stats_.last_packet_received_time = current_time; + stats_.packets_received++; + stats_.bytes_received += DataSize::bytes(packet.size()); +} + EndpointsContainer::EndpointsContainer( const std::vector& endpoints) : endpoints_(endpoints) {} @@ -331,4 +368,39 @@ EndpointsContainer::GetEnabledNetworks() const { return networks; } +EmulatedNetworkStats EndpointsContainer::GetStats() const { + EmulatedNetworkStats stats; + for (auto* endpoint : endpoints_) { + EmulatedNetworkStats endpoint_stats = endpoint->stats(); + stats.packets_sent += endpoint_stats.packets_sent; + stats.bytes_sent += endpoint_stats.bytes_sent; + stats.packets_received += endpoint_stats.packets_received; + stats.bytes_received += endpoint_stats.bytes_received; + stats.packets_dropped += endpoint_stats.packets_dropped; + stats.bytes_dropped += endpoint_stats.bytes_dropped; + if (stats.first_packet_received_time > + endpoint_stats.first_packet_received_time) { + stats.first_packet_received_time = + endpoint_stats.first_packet_received_time; + stats.first_received_packet_size = + endpoint_stats.first_received_packet_size; + } + if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) { + stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time; + stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size; + } + if (stats.last_packet_received_time.IsInfinite() || + stats.last_packet_received_time < + endpoint_stats.last_packet_received_time) { + stats.last_packet_received_time = + endpoint_stats.last_packet_received_time; + } + if (stats.last_packet_sent_time.IsInfinite() || + stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) { + stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time; + } + } + return stats; +} + } // namespace webrtc diff --git a/test/scenario/network/network_emulation.h b/test/scenario/network/network_emulation.h index 84fd7ae6e3..b3fe5ebf43 100644 --- a/test/scenario/network/network_emulation.h +++ b/test/scenario/network/network_emulation.h @@ -19,6 +19,7 @@ #include #include "absl/types/optional.h" +#include "api/test/network_emulation_manager.h" #include "api/test/simulated_network.h" #include "api/units/timestamp.h" #include "rtc_base/copy_on_write_buffer.h" @@ -193,9 +194,13 @@ class EmulatedEndpoint : public EmulatedNetworkReceiverInterface { const rtc::Network& network() const { return *network_.get(); } + EmulatedNetworkStats stats(); + private: static constexpr uint16_t kFirstEphemeralPort = 49152; uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_); + void UpdateSendStats(const EmulatedIpPacket& packet); + void UpdateReceiveStats(const EmulatedIpPacket& packet); rtc::CriticalSection receiver_lock_; rtc::ThreadChecker enabled_state_checker_; @@ -212,6 +217,8 @@ class EmulatedEndpoint : public EmulatedNetworkReceiverInterface { uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_); std::map port_to_receiver_ RTC_GUARDED_BY(receiver_lock_); + + EmulatedNetworkStats stats_ RTC_GUARDED_BY(task_queue_); }; class EmulatedRoute { @@ -236,6 +243,7 @@ class EndpointsContainer { // Returns list of networks for enabled endpoints. Caller takes ownership of // returned rtc::Network objects. std::vector> GetEnabledNetworks() const; + EmulatedNetworkStats GetStats() const; private: const std::vector endpoints_; diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc index 1efc5b4682..d04805f4bb 100644 --- a/test/scenario/network/network_emulation_manager.cc +++ b/test/scenario/network/network_emulation_manager.cc @@ -200,7 +200,7 @@ NetworkEmulationManagerImpl::CreateEmulatedNetworkManagerInterface( const std::vector& endpoints) { auto endpoints_container = absl::make_unique(endpoints); auto network_manager = absl::make_unique( - clock_, endpoints_container.get()); + clock_, &task_queue_, endpoints_container.get()); for (auto* endpoint : endpoints) { // Associate endpoint with network manager. bool insertion_result = diff --git a/test/scenario/network/network_emulation_unittest.cc b/test/scenario/network/network_emulation_unittest.cc index 2ed4e6b9f1..6abd40dd2f 100644 --- a/test/scenario/network/network_emulation_unittest.cc +++ b/test/scenario/network/network_emulation_unittest.cc @@ -28,6 +28,7 @@ namespace test { namespace { constexpr int kNetworkPacketWaitTimeoutMs = 100; +constexpr int kStatsWaitTimeoutMs = 1000; class SocketReader : public sigslot::has_slots<> { public: @@ -195,6 +196,7 @@ TEST(NetworkEmulationManagerTest, Run) { EmulatedNetworkManagerInterface* nt2 = network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint}); + rtc::CopyOnWriteBuffer data("Hello"); for (uint64_t j = 0; j < 2; j++) { auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket( AF_INET, SOCK_DGRAM); @@ -213,7 +215,6 @@ TEST(NetworkEmulationManagerTest, Run) { s1->Connect(s2->GetLocalAddress()); s2->Connect(s1->GetLocalAddress()); - rtc::CopyOnWriteBuffer data("Hello"); for (uint64_t i = 0; i < 1000; i++) { s1->Send(data.data(), data.size()); s2->Send(data.data(), data.size()); @@ -221,12 +222,96 @@ TEST(NetworkEmulationManagerTest, Run) { rtc::Event wait; wait.Wait(1000); - ASSERT_EQ(r1.ReceivedCount(), 1000); - ASSERT_EQ(r2.ReceivedCount(), 1000); + EXPECT_EQ(r1.ReceivedCount(), 1000); + EXPECT_EQ(r2.ReceivedCount(), 1000); delete s1; delete s2; } + + int64_t single_packet_size = data.size(); + std::atomic received_stats_count{0}; + nt1->GetStats([&](EmulatedNetworkStats st) { + EXPECT_EQ(st.packets_sent, 2000l); + EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l); + EXPECT_EQ(st.packets_received, 2000l); + EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l); + EXPECT_EQ(st.packets_dropped, 0l); + EXPECT_EQ(st.bytes_dropped.bytes(), 0l); + received_stats_count++; + }); + nt2->GetStats([&](EmulatedNetworkStats st) { + EXPECT_EQ(st.packets_sent, 2000l); + EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l); + EXPECT_EQ(st.packets_received, 2000l); + EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l); + EXPECT_EQ(st.packets_dropped, 0l); + EXPECT_EQ(st.bytes_dropped.bytes(), 0l); + received_stats_count++; + }); + ASSERT_EQ_WAIT(received_stats_count.load(), 2, kStatsWaitTimeoutMs); +} + +TEST(NetworkEmulationManagerTest, ThoughputStats) { + NetworkEmulationManagerImpl network_manager; + + EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode( + absl::make_unique(BuiltInNetworkBehaviorConfig())); + EmulatedEndpoint* alice_endpoint = + network_manager.CreateEndpoint(EmulatedEndpointConfig()); + EmulatedEndpoint* bob_endpoint = + network_manager.CreateEndpoint(EmulatedEndpointConfig()); + network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint); + network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint); + + EmulatedNetworkManagerInterface* nt1 = + network_manager.CreateEmulatedNetworkManagerInterface({alice_endpoint}); + EmulatedNetworkManagerInterface* nt2 = + network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint}); + + int64_t single_packet_size = 100; + rtc::CopyOnWriteBuffer data(single_packet_size); + auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket( + AF_INET, SOCK_DGRAM); + auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket( + AF_INET, SOCK_DGRAM); + + SocketReader r1(s1); + SocketReader r2(s2); + + rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0); + rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0); + + s1->Bind(a1); + s2->Bind(a2); + + s1->Connect(s2->GetLocalAddress()); + s2->Connect(s1->GetLocalAddress()); + + // Send 10 packets for 1 + rtc::Event wait; + for (uint64_t i = 0; i < 11; i++) { + s1->Send(data.data(), data.size()); + s2->Send(data.data(), data.size()); + wait.Wait(100); + } + EXPECT_EQ(r1.ReceivedCount(), 11); + EXPECT_EQ(r2.ReceivedCount(), 11); + + delete s1; + delete s2; + + std::atomic received_stats_count{0}; + nt1->GetStats([&](EmulatedNetworkStats st) { + EXPECT_EQ(st.packets_sent, 11l); + EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 11l); + EXPECT_NEAR(st.AverageSendRate().bps(), DataRate::bytes_per_sec(1000).bps(), + 1000); + received_stats_count++; + }); + ASSERT_EQ_WAIT(received_stats_count.load(), 1, kStatsWaitTimeoutMs); } // Testing that packets are delivered via all routes using a routing scheme as