Introduce network emulation layer stats API.

Bug: webrtc:10138
Change-Id: I32133cd14c7a1933dcbeaa37d4c9ce6748274ebe
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131383
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27588}
This commit is contained in:
Artem Titov 2019-04-12 12:17:19 +02:00 committed by Commit Bot
parent 80bea1eeaa
commit 806299e09b
8 changed files with 232 additions and 10 deletions

View File

@ -434,6 +434,9 @@ rtc_source_set("network_emulation_manager_api") {
deps = [
":simulated_network_api",
"../rtc_base",
"units:data_rate",
"units:data_size",
"units:timestamp",
]
}

View File

@ -15,6 +15,9 @@
#include <vector>
#include "api/test/simulated_network.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/timestamp.h"
#include "rtc_base/network.h"
#include "rtc_base/thread.h"
@ -51,14 +54,51 @@ struct EmulatedEndpointConfig {
bool start_as_enabled = true;
};
struct EmulatedNetworkStats {
int64_t packets_sent = 0;
DataSize bytes_sent = DataSize::Zero();
// Total amount of packets received with or without destination.
int64_t packets_received = 0;
// Total amount of bytes in received packets.
DataSize bytes_received = DataSize::Zero();
// Total amount of packets that were received, but no destination was found.
int64_t packets_dropped = 0;
// Total amount of bytes in dropped packets.
DataSize bytes_dropped = DataSize::Zero();
DataSize first_received_packet_size = DataSize::Zero();
DataSize first_sent_packet_size = DataSize::Zero();
Timestamp first_packet_sent_time = Timestamp::PlusInfinity();
Timestamp last_packet_sent_time = Timestamp::PlusInfinity();
Timestamp first_packet_received_time = Timestamp::PlusInfinity();
Timestamp last_packet_received_time = Timestamp::PlusInfinity();
DataRate AverageSendRate() const {
RTC_DCHECK_GE(packets_sent, 2);
return (bytes_sent - first_sent_packet_size) /
(last_packet_sent_time - first_packet_sent_time);
}
DataRate AverageReceiveRate() const {
RTC_DCHECK_GE(packets_received, 2);
return (bytes_received - first_received_packet_size) /
(last_packet_received_time - first_packet_received_time);
}
};
// Provide interface to obtain all required objects to inject network emulation
// layer into PeerConnection.
// layer into PeerConnection. Also contains information about network interfaces
// accessible by PeerConnection.
class EmulatedNetworkManagerInterface {
public:
virtual ~EmulatedNetworkManagerInterface() = default;
virtual rtc::Thread* network_thread() = 0;
virtual rtc::NetworkManager* network_manager() = 0;
// Returns summarized network stats for endpoints for this manager.
virtual void GetStats(
std::function<void(EmulatedNetworkStats)> stats_callback) const = 0;
};
// Provides an API for creating and configuring emulated network layer.

View File

@ -20,8 +20,10 @@ namespace test {
EmulatedNetworkManager::EmulatedNetworkManager(
Clock* clock,
TaskQueueForTest* task_queue,
EndpointsContainer* endpoints_container)
: endpoints_container_(endpoints_container),
: task_queue_(task_queue),
endpoints_container_(endpoints_container),
socket_server_(clock, endpoints_container),
network_thread_(&socket_server_),
sent_first_update_(false),
@ -77,6 +79,13 @@ void EmulatedNetworkManager::StopUpdating() {
}
}
void EmulatedNetworkManager::GetStats(
std::function<void(EmulatedNetworkStats)> stats_callback) const {
task_queue_->PostTask([stats_callback, this]() {
stats_callback(endpoints_container_->GetStats());
});
}
void EmulatedNetworkManager::UpdateNetworksOnce() {
RTC_DCHECK_RUN_ON(&network_thread_);

View File

@ -32,7 +32,9 @@ class EmulatedNetworkManager : public rtc::NetworkManagerBase,
public sigslot::has_slots<>,
public EmulatedNetworkManagerInterface {
public:
EmulatedNetworkManager(Clock* clock, EndpointsContainer* endpoints_container);
EmulatedNetworkManager(Clock* clock,
TaskQueueForTest* task_queue,
EndpointsContainer* endpoints_container);
void EnableEndpoint(EmulatedEndpoint* endpoint);
void DisableEndpoint(EmulatedEndpoint* endpoint);
@ -45,11 +47,14 @@ class EmulatedNetworkManager : public rtc::NetworkManagerBase,
// EmulatedNetworkManagerInterface API
rtc::Thread* network_thread() override { return &network_thread_; }
rtc::NetworkManager* network_manager() override { return this; }
void GetStats(
std::function<void(EmulatedNetworkStats)> stats_callback) const override;
private:
void UpdateNetworksOnce();
void MaybeSignalNetworksChanged();
TaskQueueForTest* const task_queue_;
EndpointsContainer* const endpoints_container_;
FakeNetworkSocketServer socket_server_;
rtc::Thread network_thread_;

View File

@ -14,6 +14,7 @@
#include <memory>
#include "absl/memory/memory.h"
#include "api/units/data_size.h"
#include "rtc_base/bind.h"
#include "rtc_base/logging.h"
@ -198,7 +199,10 @@ void EmulatedEndpoint::SendPacket(const rtc::SocketAddress& from,
rtc::CopyOnWriteBuffer packet) {
RTC_CHECK(from.ipaddr() == peer_local_addr_);
struct Closure {
void operator()() { endpoint->router_.OnPacketReceived(std::move(packet)); }
void operator()() {
endpoint->UpdateSendStats(packet);
endpoint->router_.OnPacketReceived(std::move(packet));
}
EmulatedEndpoint* endpoint;
EmulatedIpPacket packet;
};
@ -258,18 +262,22 @@ rtc::IPAddress EmulatedEndpoint::GetPeerLocalAddress() const {
}
void EmulatedEndpoint::OnPacketReceived(EmulatedIpPacket packet) {
RTC_DCHECK_RUN_ON(task_queue_);
RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
<< "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
<< packet.to.ipaddr().ToString()
<< "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
rtc::CritScope crit(&receiver_lock_);
UpdateReceiveStats(packet);
auto it = port_to_receiver_.find(packet.to.port());
if (it == port_to_receiver_.end()) {
// It can happen, that remote peer closed connection, but there still some
// packets, that are going to it. It can happen during peer connection close
// process: one peer closed connection, second still sending data.
RTC_LOG(INFO) << "No receiver registered in " << id_ << " on port "
<< packet.to.port();
RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
<< " on port " << packet.to.port();
stats_.packets_dropped++;
stats_.bytes_dropped += DataSize::bytes(packet.size());
return;
}
// Endpoint assumes frequent calls to bind and unbind methods, so it holds
@ -295,6 +303,35 @@ bool EmulatedEndpoint::Enabled() const {
return is_enabled_;
}
EmulatedNetworkStats EmulatedEndpoint::stats() {
RTC_DCHECK_RUN_ON(task_queue_);
return stats_;
}
void EmulatedEndpoint::UpdateSendStats(const EmulatedIpPacket& packet) {
RTC_DCHECK_RUN_ON(task_queue_);
Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
if (stats_.first_packet_sent_time.IsInfinite()) {
stats_.first_packet_sent_time = current_time;
stats_.first_sent_packet_size = DataSize::bytes(packet.size());
}
stats_.last_packet_sent_time = current_time;
stats_.packets_sent++;
stats_.bytes_sent += DataSize::bytes(packet.size());
}
void EmulatedEndpoint::UpdateReceiveStats(const EmulatedIpPacket& packet) {
RTC_DCHECK_RUN_ON(task_queue_);
Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
if (stats_.first_packet_received_time.IsInfinite()) {
stats_.first_packet_received_time = current_time;
stats_.first_received_packet_size = DataSize::bytes(packet.size());
}
stats_.last_packet_received_time = current_time;
stats_.packets_received++;
stats_.bytes_received += DataSize::bytes(packet.size());
}
EndpointsContainer::EndpointsContainer(
const std::vector<EmulatedEndpoint*>& endpoints)
: endpoints_(endpoints) {}
@ -331,4 +368,39 @@ EndpointsContainer::GetEnabledNetworks() const {
return networks;
}
EmulatedNetworkStats EndpointsContainer::GetStats() const {
EmulatedNetworkStats stats;
for (auto* endpoint : endpoints_) {
EmulatedNetworkStats endpoint_stats = endpoint->stats();
stats.packets_sent += endpoint_stats.packets_sent;
stats.bytes_sent += endpoint_stats.bytes_sent;
stats.packets_received += endpoint_stats.packets_received;
stats.bytes_received += endpoint_stats.bytes_received;
stats.packets_dropped += endpoint_stats.packets_dropped;
stats.bytes_dropped += endpoint_stats.bytes_dropped;
if (stats.first_packet_received_time >
endpoint_stats.first_packet_received_time) {
stats.first_packet_received_time =
endpoint_stats.first_packet_received_time;
stats.first_received_packet_size =
endpoint_stats.first_received_packet_size;
}
if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
}
if (stats.last_packet_received_time.IsInfinite() ||
stats.last_packet_received_time <
endpoint_stats.last_packet_received_time) {
stats.last_packet_received_time =
endpoint_stats.last_packet_received_time;
}
if (stats.last_packet_sent_time.IsInfinite() ||
stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
}
}
return stats;
}
} // namespace webrtc

View File

@ -19,6 +19,7 @@
#include <vector>
#include "absl/types/optional.h"
#include "api/test/network_emulation_manager.h"
#include "api/test/simulated_network.h"
#include "api/units/timestamp.h"
#include "rtc_base/copy_on_write_buffer.h"
@ -193,9 +194,13 @@ class EmulatedEndpoint : public EmulatedNetworkReceiverInterface {
const rtc::Network& network() const { return *network_.get(); }
EmulatedNetworkStats stats();
private:
static constexpr uint16_t kFirstEphemeralPort = 49152;
uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_);
void UpdateSendStats(const EmulatedIpPacket& packet);
void UpdateReceiveStats(const EmulatedIpPacket& packet);
rtc::CriticalSection receiver_lock_;
rtc::ThreadChecker enabled_state_checker_;
@ -212,6 +217,8 @@ class EmulatedEndpoint : public EmulatedNetworkReceiverInterface {
uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_);
std::map<uint16_t, EmulatedNetworkReceiverInterface*> port_to_receiver_
RTC_GUARDED_BY(receiver_lock_);
EmulatedNetworkStats stats_ RTC_GUARDED_BY(task_queue_);
};
class EmulatedRoute {
@ -236,6 +243,7 @@ class EndpointsContainer {
// Returns list of networks for enabled endpoints. Caller takes ownership of
// returned rtc::Network objects.
std::vector<std::unique_ptr<rtc::Network>> GetEnabledNetworks() const;
EmulatedNetworkStats GetStats() const;
private:
const std::vector<EmulatedEndpoint*> endpoints_;

View File

@ -200,7 +200,7 @@ NetworkEmulationManagerImpl::CreateEmulatedNetworkManagerInterface(
const std::vector<EmulatedEndpoint*>& endpoints) {
auto endpoints_container = absl::make_unique<EndpointsContainer>(endpoints);
auto network_manager = absl::make_unique<EmulatedNetworkManager>(
clock_, endpoints_container.get());
clock_, &task_queue_, endpoints_container.get());
for (auto* endpoint : endpoints) {
// Associate endpoint with network manager.
bool insertion_result =

View File

@ -28,6 +28,7 @@ namespace test {
namespace {
constexpr int kNetworkPacketWaitTimeoutMs = 100;
constexpr int kStatsWaitTimeoutMs = 1000;
class SocketReader : public sigslot::has_slots<> {
public:
@ -195,6 +196,7 @@ TEST(NetworkEmulationManagerTest, Run) {
EmulatedNetworkManagerInterface* nt2 =
network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
rtc::CopyOnWriteBuffer data("Hello");
for (uint64_t j = 0; j < 2; j++) {
auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
AF_INET, SOCK_DGRAM);
@ -213,7 +215,6 @@ TEST(NetworkEmulationManagerTest, Run) {
s1->Connect(s2->GetLocalAddress());
s2->Connect(s1->GetLocalAddress());
rtc::CopyOnWriteBuffer data("Hello");
for (uint64_t i = 0; i < 1000; i++) {
s1->Send(data.data(), data.size());
s2->Send(data.data(), data.size());
@ -221,12 +222,96 @@ TEST(NetworkEmulationManagerTest, Run) {
rtc::Event wait;
wait.Wait(1000);
ASSERT_EQ(r1.ReceivedCount(), 1000);
ASSERT_EQ(r2.ReceivedCount(), 1000);
EXPECT_EQ(r1.ReceivedCount(), 1000);
EXPECT_EQ(r2.ReceivedCount(), 1000);
delete s1;
delete s2;
}
int64_t single_packet_size = data.size();
std::atomic<int> received_stats_count{0};
nt1->GetStats([&](EmulatedNetworkStats st) {
EXPECT_EQ(st.packets_sent, 2000l);
EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
EXPECT_EQ(st.packets_received, 2000l);
EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
EXPECT_EQ(st.packets_dropped, 0l);
EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
received_stats_count++;
});
nt2->GetStats([&](EmulatedNetworkStats st) {
EXPECT_EQ(st.packets_sent, 2000l);
EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
EXPECT_EQ(st.packets_received, 2000l);
EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
EXPECT_EQ(st.packets_dropped, 0l);
EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
received_stats_count++;
});
ASSERT_EQ_WAIT(received_stats_count.load(), 2, kStatsWaitTimeoutMs);
}
TEST(NetworkEmulationManagerTest, ThoughputStats) {
NetworkEmulationManagerImpl network_manager;
EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode(
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode(
absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
EmulatedEndpoint* alice_endpoint =
network_manager.CreateEndpoint(EmulatedEndpointConfig());
EmulatedEndpoint* bob_endpoint =
network_manager.CreateEndpoint(EmulatedEndpointConfig());
network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint);
network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint);
EmulatedNetworkManagerInterface* nt1 =
network_manager.CreateEmulatedNetworkManagerInterface({alice_endpoint});
EmulatedNetworkManagerInterface* nt2 =
network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
int64_t single_packet_size = 100;
rtc::CopyOnWriteBuffer data(single_packet_size);
auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
AF_INET, SOCK_DGRAM);
auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket(
AF_INET, SOCK_DGRAM);
SocketReader r1(s1);
SocketReader r2(s2);
rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0);
rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0);
s1->Bind(a1);
s2->Bind(a2);
s1->Connect(s2->GetLocalAddress());
s2->Connect(s1->GetLocalAddress());
// Send 10 packets for 1
rtc::Event wait;
for (uint64_t i = 0; i < 11; i++) {
s1->Send(data.data(), data.size());
s2->Send(data.data(), data.size());
wait.Wait(100);
}
EXPECT_EQ(r1.ReceivedCount(), 11);
EXPECT_EQ(r2.ReceivedCount(), 11);
delete s1;
delete s2;
std::atomic<int> received_stats_count{0};
nt1->GetStats([&](EmulatedNetworkStats st) {
EXPECT_EQ(st.packets_sent, 11l);
EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 11l);
EXPECT_NEAR(st.AverageSendRate().bps(), DataRate::bytes_per_sec(1000).bps(),
1000);
received_stats_count++;
});
ASSERT_EQ_WAIT(received_stats_count.load(), 1, kStatsWaitTimeoutMs);
}
// Testing that packets are delivered via all routes using a routing scheme as