webrtc_m130/test/network/network_emulation.cc
Artem Titov 14b46a77b2 Provide per destination statistic for network outgoing stats
Network emulation layer provides per source split for incoming stats for
endpoint. Do the same for outgoing stats per destination.

Bug: webrtc:11756
Change-Id: I2369ae8906546c27133273b1be17ce74c253c6e8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/180500
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Reviewed-by: Andrey Logvin <landrey@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31820}
2020-07-31 11:52:13 +00:00

587 lines
20 KiB
C++

/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/network/network_emulation.h"
#include <algorithm>
#include <limits>
#include <memory>
#include "api/units/data_size.h"
#include "rtc_base/bind.h"
#include "rtc_base/logging.h"
namespace webrtc {
DataRate EmulatedNetworkOutgoingStatsImpl::AverageSendRate() const {
RTC_DCHECK_GE(packets_sent_, 2);
RTC_DCHECK(first_packet_sent_time_.IsFinite());
RTC_DCHECK(last_packet_sent_time_.IsFinite());
return (bytes_sent_ - first_sent_packet_size_) /
(last_packet_sent_time_ - first_packet_sent_time_);
}
DataRate EmulatedNetworkIncomingStatsImpl::AverageReceiveRate() const {
RTC_DCHECK_GE(packets_received_, 2);
RTC_DCHECK(first_packet_received_time_.IsFinite());
RTC_DCHECK(last_packet_received_time_.IsFinite());
return (bytes_received_ - first_received_packet_size_) /
(last_packet_received_time_ - first_packet_received_time_);
}
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkOutgoingStats>>
EmulatedNetworkStatsImpl::OutgoingStatsPerDestination() const {
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkOutgoingStats>> out;
for (const auto& entry : outgoing_stats_per_destination_) {
out.emplace(entry.first, std::make_unique<EmulatedNetworkOutgoingStatsImpl>(
*entry.second));
}
return out;
}
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkIncomingStats>>
EmulatedNetworkStatsImpl::IncomingStatsPerSource() const {
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkIncomingStats>> out;
for (const auto& entry : incoming_stats_per_source_) {
out.emplace(entry.first, std::make_unique<EmulatedNetworkIncomingStatsImpl>(
*entry.second));
}
return out;
}
std::unique_ptr<EmulatedNetworkOutgoingStats>
EmulatedNetworkStatsImpl::GetOverallOutgoingStats() const {
EmulatedNetworkOutgoingStatsBuilder builder;
for (const auto& entry : outgoing_stats_per_destination_) {
builder.AddOutgoingStats(*entry.second);
}
return builder.Build();
}
std::unique_ptr<EmulatedNetworkIncomingStats>
EmulatedNetworkStatsImpl::GetOverallIncomingStats() const {
EmulatedNetworkIncomingStatsBuilder builder;
for (const auto& entry : incoming_stats_per_source_) {
builder.AddIncomingStats(*entry.second);
}
return builder.Build();
}
EmulatedNetworkOutgoingStatsBuilder::EmulatedNetworkOutgoingStatsBuilder() {
sequence_checker_.Detach();
}
void EmulatedNetworkOutgoingStatsBuilder::OnPacketSent(Timestamp sent_time,
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_CHECK_GE(packet_size, DataSize::Zero());
if (first_packet_sent_time_.IsInfinite()) {
first_packet_sent_time_ = sent_time;
first_sent_packet_size_ = packet_size;
}
last_packet_sent_time_ = sent_time;
packets_sent_++;
bytes_sent_ += packet_size;
}
void EmulatedNetworkOutgoingStatsBuilder::AddOutgoingStats(
const EmulatedNetworkOutgoingStats& stats) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
packets_sent_ += stats.PacketsSent();
bytes_sent_ += stats.BytesSent();
if (first_packet_sent_time_ > stats.FirstPacketSentTime()) {
first_packet_sent_time_ = stats.FirstPacketSentTime();
first_sent_packet_size_ = stats.FirstSentPacketSize();
}
if (last_packet_sent_time_ < stats.LastPacketSentTime()) {
last_packet_sent_time_ = stats.LastPacketSentTime();
}
}
std::unique_ptr<EmulatedNetworkOutgoingStats>
EmulatedNetworkOutgoingStatsBuilder::Build() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return std::make_unique<EmulatedNetworkOutgoingStatsImpl>(
packets_sent_, bytes_sent_, first_sent_packet_size_,
first_packet_sent_time_, last_packet_sent_time_);
}
EmulatedNetworkIncomingStatsBuilder::EmulatedNetworkIncomingStatsBuilder() {
sequence_checker_.Detach();
}
void EmulatedNetworkIncomingStatsBuilder::OnPacketDropped(
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
packets_dropped_++;
bytes_dropped_ += packet_size;
}
void EmulatedNetworkIncomingStatsBuilder::OnPacketReceived(
Timestamp received_time,
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_CHECK_GE(packet_size, DataSize::Zero());
if (first_packet_received_time_.IsInfinite()) {
first_packet_received_time_ = received_time;
first_received_packet_size_ = packet_size;
}
last_packet_received_time_ = received_time;
packets_received_++;
bytes_received_ += packet_size;
}
void EmulatedNetworkIncomingStatsBuilder::AddIncomingStats(
const EmulatedNetworkIncomingStats& stats) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
packets_received_ += stats.PacketsReceived();
bytes_received_ += stats.BytesReceived();
packets_dropped_ += stats.PacketsDropped();
bytes_dropped_ += stats.BytesDropped();
if (first_packet_received_time_ > stats.FirstPacketReceivedTime()) {
first_packet_received_time_ = stats.FirstPacketReceivedTime();
first_received_packet_size_ = stats.FirstReceivedPacketSize();
}
if (last_packet_received_time_ < stats.LastPacketReceivedTime()) {
last_packet_received_time_ = stats.LastPacketReceivedTime();
}
}
std::unique_ptr<EmulatedNetworkIncomingStats>
EmulatedNetworkIncomingStatsBuilder::Build() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return std::make_unique<EmulatedNetworkIncomingStatsImpl>(
packets_received_, bytes_received_, packets_dropped_, bytes_dropped_,
first_received_packet_size_, first_packet_received_time_,
last_packet_received_time_);
}
EmulatedNetworkStatsBuilder::EmulatedNetworkStatsBuilder() {
sequence_checker_.Detach();
}
EmulatedNetworkStatsBuilder::EmulatedNetworkStatsBuilder(
rtc::IPAddress local_ip) {
local_addresses_.push_back(local_ip);
sequence_checker_.Detach();
}
void EmulatedNetworkStatsBuilder::OnPacketSent(Timestamp sent_time,
rtc::IPAddress destination_ip,
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
outgoing_stats_per_destination_[destination_ip].OnPacketSent(sent_time,
packet_size);
}
void EmulatedNetworkStatsBuilder::OnPacketDropped(rtc::IPAddress source_ip,
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
incoming_stats_per_source_[source_ip].OnPacketDropped(packet_size);
}
void EmulatedNetworkStatsBuilder::OnPacketReceived(Timestamp received_time,
rtc::IPAddress source_ip,
DataSize packet_size) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
incoming_stats_per_source_[source_ip].OnPacketReceived(received_time,
packet_size);
}
void EmulatedNetworkStatsBuilder::AddEmulatedNetworkStats(
const EmulatedNetworkStats& stats) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// Append IPs from other endpoints stats to the builder.
for (const rtc::IPAddress& addr : stats.LocalAddresses()) {
local_addresses_.push_back(addr);
}
// Add outgoing stats from other endpoints to the builder.
const std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkOutgoingStats>>
outgoing_stats_per_destination = stats.OutgoingStatsPerDestination();
for (const auto& entry : outgoing_stats_per_destination) {
outgoing_stats_per_destination_[entry.first].AddOutgoingStats(
*entry.second);
}
// Add incoming stats from other endpoints to the builder.
const std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkIncomingStats>>
incoming_stats_per_source = stats.IncomingStatsPerSource();
for (const auto& entry : incoming_stats_per_source) {
incoming_stats_per_source_[entry.first].AddIncomingStats(*entry.second);
}
}
std::unique_ptr<EmulatedNetworkStats> EmulatedNetworkStatsBuilder::Build()
const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkOutgoingStats>>
outgoing_stats;
for (const auto& entry : outgoing_stats_per_destination_) {
outgoing_stats.emplace(entry.first, entry.second.Build());
}
std::map<rtc::IPAddress, std::unique_ptr<EmulatedNetworkIncomingStats>>
incoming_stats;
for (const auto& entry : incoming_stats_per_source_) {
incoming_stats.emplace(entry.first, entry.second.Build());
}
return std::make_unique<EmulatedNetworkStatsImpl>(
local_addresses_, std::move(outgoing_stats), std::move(incoming_stats));
}
void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
RTC_DCHECK_RUN_ON(task_queue_);
uint64_t packet_id = next_packet_id_++;
bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
if (sent) {
packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
}
if (process_task_.Running())
return;
absl::optional<int64_t> next_time_us =
network_behavior_->NextDeliveryTimeUs();
if (!next_time_us)
return;
Timestamp current_time = clock_->CurrentTime();
process_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_->Get(),
std::max(TimeDelta::Zero(),
Timestamp::Micros(*next_time_us) - current_time),
[this]() {
RTC_DCHECK_RUN_ON(task_queue_);
Timestamp current_time = clock_->CurrentTime();
Process(current_time);
absl::optional<int64_t> next_time_us =
network_behavior_->NextDeliveryTimeUs();
if (!next_time_us) {
process_task_.Stop();
return TimeDelta::Zero(); // This is ignored.
}
RTC_DCHECK_GE(*next_time_us, current_time.us());
return Timestamp::Micros(*next_time_us) - current_time;
});
});
}
void LinkEmulation::Process(Timestamp at_time) {
std::vector<PacketDeliveryInfo> delivery_infos =
network_behavior_->DequeueDeliverablePackets(at_time.us());
for (PacketDeliveryInfo& delivery_info : delivery_infos) {
StoredPacket* packet = nullptr;
for (auto& stored_packet : packets_) {
if (stored_packet.id == delivery_info.packet_id) {
packet = &stored_packet;
break;
}
}
RTC_CHECK(packet);
RTC_DCHECK(!packet->removed);
packet->removed = true;
if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
packet->packet.arrival_time =
Timestamp::Micros(delivery_info.receive_time_us);
receiver_->OnPacketReceived(std::move(packet->packet));
}
while (!packets_.empty() && packets_.front().removed) {
packets_.pop_front();
}
}
}
NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
: task_queue_(task_queue) {}
void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
RTC_DCHECK_RUN_ON(task_queue_);
if (watcher_) {
watcher_(packet);
}
if (filter_) {
if (!filter_(packet))
return;
}
auto receiver_it = routing_.find(packet.to.ipaddr());
if (receiver_it == routing_.end()) {
return;
}
RTC_CHECK(receiver_it != routing_.end());
receiver_it->second->OnPacketReceived(std::move(packet));
}
void NetworkRouterNode::SetReceiver(
const rtc::IPAddress& dest_ip,
EmulatedNetworkReceiverInterface* receiver) {
task_queue_->PostTask([=] {
RTC_DCHECK_RUN_ON(task_queue_);
EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
<< "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
routing_[dest_ip] = receiver;
});
}
void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
RTC_DCHECK_RUN_ON(task_queue_);
routing_.erase(dest_ip);
}
void NetworkRouterNode::SetWatcher(
std::function<void(const EmulatedIpPacket&)> watcher) {
task_queue_->PostTask([=] {
RTC_DCHECK_RUN_ON(task_queue_);
watcher_ = watcher;
});
}
void NetworkRouterNode::SetFilter(
std::function<bool(const EmulatedIpPacket&)> filter) {
task_queue_->PostTask([=] {
RTC_DCHECK_RUN_ON(task_queue_);
filter_ = filter;
});
}
EmulatedNetworkNode::EmulatedNetworkNode(
Clock* clock,
rtc::TaskQueue* task_queue,
std::unique_ptr<NetworkBehaviorInterface> network_behavior)
: router_(task_queue),
link_(clock, task_queue, std::move(network_behavior), &router_) {}
void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
link_.OnPacketReceived(std::move(packet));
}
void EmulatedNetworkNode::CreateRoute(
const rtc::IPAddress& receiver_ip,
std::vector<EmulatedNetworkNode*> nodes,
EmulatedNetworkReceiverInterface* receiver) {
RTC_CHECK(!nodes.empty());
for (size_t i = 0; i + 1 < nodes.size(); ++i)
nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
nodes.back()->router()->SetReceiver(receiver_ip, receiver);
}
void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
std::vector<EmulatedNetworkNode*> nodes) {
for (EmulatedNetworkNode* node : nodes)
node->router()->RemoveReceiver(receiver_ip);
}
EmulatedNetworkNode::~EmulatedNetworkNode() = default;
EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id,
const rtc::IPAddress& ip,
bool is_enabled,
rtc::AdapterType type,
rtc::TaskQueue* task_queue,
Clock* clock)
: id_(id),
peer_local_addr_(ip),
is_enabled_(is_enabled),
type_(type),
clock_(clock),
task_queue_(task_queue),
router_(task_queue_),
next_port_(kFirstEphemeralPort),
stats_builder_(peer_local_addr_) {
constexpr int kIPv4NetworkPrefixLength = 24;
constexpr int kIPv6NetworkPrefixLength = 64;
int prefix_length = 0;
if (ip.family() == AF_INET) {
prefix_length = kIPv4NetworkPrefixLength;
} else if (ip.family() == AF_INET6) {
prefix_length = kIPv6NetworkPrefixLength;
}
rtc::IPAddress prefix = TruncateIP(ip, prefix_length);
network_ = std::make_unique<rtc::Network>(
ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix,
prefix_length, type_);
network_->AddIP(ip);
enabled_state_checker_.Detach();
}
EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
uint64_t EmulatedEndpointImpl::GetId() const {
return id_;
}
void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
const rtc::SocketAddress& to,
rtc::CopyOnWriteBuffer packet_data,
uint16_t application_overhead) {
RTC_CHECK(from.ipaddr() == peer_local_addr_);
EmulatedIpPacket packet(from, to, std::move(packet_data),
clock_->CurrentTime(), application_overhead);
task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
RTC_DCHECK_RUN_ON(task_queue_);
stats_builder_.OnPacketSent(clock_->CurrentTime(), packet.to.ipaddr(),
DataSize::Bytes(packet.ip_packet_size()));
router_.OnPacketReceived(std::move(packet));
});
}
absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver) {
rtc::CritScope crit(&receiver_lock_);
uint16_t port = desired_port;
if (port == 0) {
// Because client can specify its own port, next_port_ can be already in
// use, so we need to find next available port.
int ports_pool_size =
std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
for (int i = 0; i < ports_pool_size; ++i) {
uint16_t next_port = NextPort();
if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
port = next_port;
break;
}
}
}
RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
<< id_;
bool result = port_to_receiver_.insert({port, receiver}).second;
if (!result) {
RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
<< " in endpoint " << id_;
return absl::nullopt;
}
RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
<< port;
return port;
}
uint16_t EmulatedEndpointImpl::NextPort() {
uint16_t out = next_port_;
if (next_port_ == std::numeric_limits<uint16_t>::max()) {
next_port_ = kFirstEphemeralPort;
} else {
next_port_++;
}
return out;
}
void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
rtc::CritScope crit(&receiver_lock_);
port_to_receiver_.erase(port);
}
rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
return peer_local_addr_;
}
void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
RTC_DCHECK_RUN_ON(task_queue_);
RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
<< "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
<< packet.to.ipaddr().ToString()
<< "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
rtc::CritScope crit(&receiver_lock_);
stats_builder_.OnPacketReceived(clock_->CurrentTime(), packet.from.ipaddr(),
DataSize::Bytes(packet.ip_packet_size()));
auto it = port_to_receiver_.find(packet.to.port());
if (it == port_to_receiver_.end()) {
// It can happen, that remote peer closed connection, but there still some
// packets, that are going to it. It can happen during peer connection close
// process: one peer closed connection, second still sending data.
RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
<< " on port " << packet.to.port();
stats_builder_.OnPacketDropped(packet.from.ipaddr(),
DataSize::Bytes(packet.ip_packet_size()));
return;
}
// Endpoint assumes frequent calls to bind and unbind methods, so it holds
// lock during packet processing to ensure that receiver won't be deleted
// before call to OnPacketReceived.
it->second->OnPacketReceived(std::move(packet));
}
void EmulatedEndpointImpl::Enable() {
RTC_DCHECK_RUN_ON(&enabled_state_checker_);
RTC_CHECK(!is_enabled_);
is_enabled_ = true;
}
void EmulatedEndpointImpl::Disable() {
RTC_DCHECK_RUN_ON(&enabled_state_checker_);
RTC_CHECK(is_enabled_);
is_enabled_ = false;
}
bool EmulatedEndpointImpl::Enabled() const {
RTC_DCHECK_RUN_ON(&enabled_state_checker_);
return is_enabled_;
}
std::unique_ptr<EmulatedNetworkStats> EmulatedEndpointImpl::stats() const {
RTC_DCHECK_RUN_ON(task_queue_);
return stats_builder_.Build();
}
EndpointsContainer::EndpointsContainer(
const std::vector<EmulatedEndpointImpl*>& endpoints)
: endpoints_(endpoints) {}
EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
const rtc::IPAddress& local_ip) const {
for (auto* endpoint : endpoints_) {
rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
if (peer_local_address == local_ip) {
return endpoint;
}
}
RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
}
bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
for (auto* e : endpoints_) {
if (e->GetId() == endpoint->GetId()) {
return true;
}
}
return false;
}
std::vector<std::unique_ptr<rtc::Network>>
EndpointsContainer::GetEnabledNetworks() const {
std::vector<std::unique_ptr<rtc::Network>> networks;
for (auto* endpoint : endpoints_) {
if (endpoint->Enabled()) {
networks.emplace_back(
std::make_unique<rtc::Network>(endpoint->network()));
}
}
return networks;
}
std::vector<EmulatedEndpoint*> EndpointsContainer::endpoints() const {
return std::vector<EmulatedEndpoint*>(endpoints_.begin(), endpoints_.end());
}
std::unique_ptr<EmulatedNetworkStats> EndpointsContainer::GetStats() const {
EmulatedNetworkStatsBuilder stats_builder;
for (auto* endpoint : endpoints_) {
stats_builder.AddEmulatedNetworkStats(*endpoint->stats());
}
return stats_builder.Build();
}
} // namespace webrtc