Extracting the simulation part of FakeNetworkPipe

This CL extracts the part of FakeNetworkPipe responsible for simulating
network behavior into the SimulatedNetwork class, which implements the
new FakeNetworkInterface.

This prepares for an upcoming CL where the network simulation can
be injected in FakeNetworkPipe, allowing custom simulation models to be
used.

Bug: None
Change-Id: I9b5fa0dd9ff1fd8ccd5a7ce2d9ea3a5b11c5215e
Reviewed-on: https://webrtc-review.googlesource.com/64405
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23146}
This commit is contained in:
Sebastian Jansson 2018-05-07 14:49:39 +02:00 committed by Commit Bot
parent 9ad845d2ab
commit 7ee2e25afb
2 changed files with 230 additions and 97 deletions

View File

@ -19,6 +19,7 @@
#include "call/call.h"
#include "call/fake_network_pipe.h"
#include "rtc_base/logging.h"
#include "rtc_base/ptr_util.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@ -26,11 +27,6 @@ namespace webrtc {
namespace {
constexpr int64_t kDefaultProcessIntervalMs = 5;
constexpr int64_t kLogIntervalMs = 5000;
struct PacketArrivalTimeComparator {
bool operator()(const NetworkPacket& p1, const NetworkPacket& p2) {
return p1.arrival_time() < p2.arrival_time();
}
};
} // namespace
NetworkPacket::NetworkPacket(rtc::CopyOnWriteBuffer packet,
@ -83,37 +79,29 @@ FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
PacketReceiver* receiver,
uint64_t seed)
: clock_(clock),
network_simulation_(rtc::MakeUnique<SimulatedNetwork>(config, seed)),
receiver_(receiver),
transport_(nullptr),
random_(seed),
clock_offset_ms_(0),
config_(),
dropped_packets_(0),
sent_packets_(0),
total_packet_delay_us_(0),
bursting_(false),
next_process_time_us_(clock_->TimeInMicroseconds()),
last_log_time_us_(clock_->TimeInMicroseconds()) {
SetConfig(config);
}
last_log_time_us_(clock_->TimeInMicroseconds()) {}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
Transport* transport)
: clock_(clock),
network_simulation_(rtc::MakeUnique<SimulatedNetwork>(config, 1)),
receiver_(nullptr),
transport_(transport),
random_(1),
clock_offset_ms_(0),
config_(),
dropped_packets_(0),
sent_packets_(0),
total_packet_delay_us_(0),
bursting_(false),
next_process_time_us_(clock_->TimeInMicroseconds()),
last_log_time_us_(clock_->TimeInMicroseconds()) {
SetConfig(config);
}
last_log_time_us_(clock_->TimeInMicroseconds()) {}
FakeNetworkPipe::~FakeNetworkPipe() = default;
@ -153,7 +141,17 @@ void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) {
clock_offset_ms_ = offset_ms;
}
SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config,
uint64_t random_seed)
: random_(random_seed), bursting_(false) {
SetConfig(config);
}
void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) {
network_simulation_->SetConfig(config);
}
void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) {
rtc::CritScope crit(&config_lock_);
config_ = config; // Shallow copy of the struct.
double prob_loss = config.loss_percent / 100.0;
@ -176,11 +174,7 @@ void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) {
}
}
bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
rtc::Optional<PacketOptions> options,
bool is_rtcp,
MediaType media_type,
rtc::Optional<PacketTime> packet_time) {
bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
Config config;
{
rtc::CritScope crit(&config_lock_);
@ -190,12 +184,9 @@ bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
if (config.queue_length_packets > 0 &&
capacity_link_.size() >= config.queue_length_packets) {
// Too many packet on the link, drop this one.
++dropped_packets_;
return false;
}
int64_t time_now_us = clock_->TimeInMicroseconds();
// Delay introduced by the link capacity.
int64_t capacity_delay_ms = 0;
if (config.link_capacity_kbps > 0) {
@ -203,26 +194,57 @@ bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
const int64_t bytes_per_millisecond = config.link_capacity_kbps / 8;
// To round to the closest millisecond we add half a milliseconds worth of
// bytes to the delay calculation.
capacity_delay_ms = (packet.size() + capacity_delay_error_bytes_ +
capacity_delay_ms = (packet.size + capacity_delay_error_bytes_ +
bytes_per_millisecond / 2) /
bytes_per_millisecond;
capacity_delay_error_bytes_ +=
packet.size() - capacity_delay_ms * bytes_per_millisecond;
packet.size - capacity_delay_ms * bytes_per_millisecond;
}
int64_t network_start_time_us = time_now_us;
int64_t network_start_time_us = packet.send_time_us;
// Check if there already are packets on the link and change network start
// time forward if there is.
if (!capacity_link_.empty() &&
network_start_time_us < capacity_link_.back().arrival_time())
network_start_time_us = capacity_link_.back().arrival_time();
network_start_time_us < capacity_link_.back().arrival_time_us)
network_start_time_us = capacity_link_.back().arrival_time_us;
int64_t arrival_time_us = network_start_time_us + capacity_delay_ms * 1000;
capacity_link_.emplace(std::move(packet), time_now_us, arrival_time_us,
options, is_rtcp, media_type, packet_time);
capacity_link_.push({packet, arrival_time_us});
return true;
}
rtc::Optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
if (!delay_link_.empty())
return delay_link_.begin()->arrival_time_us;
return rtc::nullopt;
}
FakeNetworkPipe::StoredPacket::StoredPacket(NetworkPacket&& packet)
: packet(std::move(packet)) {}
bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
rtc::Optional<PacketOptions> options,
bool is_rtcp,
MediaType media_type,
rtc::Optional<PacketTime> packet_time) {
int64_t time_now_us = clock_->TimeInMicroseconds();
rtc::CritScope crit(&process_lock_);
size_t packet_size = packet.size();
NetworkPacket net_packet(std::move(packet), time_now_us, time_now_us, options,
is_rtcp, media_type, packet_time);
packets_in_flight_.emplace_back(StoredPacket(std::move(net_packet)));
int64_t packet_id = reinterpret_cast<uint64_t>(&packets_in_flight_.back());
bool sent = network_simulation_->EnqueuePacket(
PacketInFlightInfo(packet_size, time_now_us, packet_id));
if (!sent) {
packets_in_flight_.pop_back();
++dropped_packets_;
}
return sent;
}
float FakeNetworkPipe::PercentageLoss() {
rtc::CritScope crit(&process_lock_);
if (sent_packets_ == 0)
@ -251,9 +273,9 @@ size_t FakeNetworkPipe::SentPackets() {
return sent_packets_;
}
void FakeNetworkPipe::Process() {
int64_t time_now_us = clock_->TimeInMicroseconds();
std::queue<NetworkPacket> packets_to_deliver;
std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
int64_t receive_time_us) {
int64_t time_now_us = receive_time_us;
Config config;
double prob_loss_bursting;
double prob_start_bursting;
@ -265,24 +287,15 @@ void FakeNetworkPipe::Process() {
}
{
rtc::CritScope crit(&process_lock_);
if (time_now_us - last_log_time_us_ > kLogIntervalMs * 1000) {
int64_t queueing_delay_us = 0;
if (!capacity_link_.empty()) {
queueing_delay_us = time_now_us - capacity_link_.front().send_time();
}
RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_us << " us.";
last_log_time_us_ = time_now_us;
}
// Check the capacity link first.
if (!capacity_link_.empty()) {
int64_t last_arrival_time_us =
delay_link_.empty() ? -1 : delay_link_.back().arrival_time();
delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
bool needs_sort = false;
while (!capacity_link_.empty() &&
time_now_us >= capacity_link_.front().arrival_time()) {
time_now_us >= capacity_link_.front().arrival_time_us) {
// Time to get this packet.
NetworkPacket packet = std::move(capacity_link_.front());
PacketInfo packet = std::move(capacity_link_.front());
capacity_link_.pop();
// Drop packets at an average rate of |config_.loss_percent| with
@ -303,13 +316,14 @@ void FakeNetworkPipe::Process() {
// If reordering is not allowed then adjust arrival_time_jitter
// to make sure all packets are sent in order.
if (!config.allow_reordering && !delay_link_.empty() &&
packet.arrival_time() + arrival_time_jitter_us <
packet.arrival_time_us + arrival_time_jitter_us <
last_arrival_time_us) {
arrival_time_jitter_us = last_arrival_time_us - packet.arrival_time();
arrival_time_jitter_us =
last_arrival_time_us - packet.arrival_time_us;
}
packet.IncrementArrivalTime(arrival_time_jitter_us);
if (packet.arrival_time() >= last_arrival_time_us) {
last_arrival_time_us = packet.arrival_time();
packet.arrival_time_us += arrival_time_jitter_us;
if (packet.arrival_time_us >= last_arrival_time_us) {
last_arrival_time_us = packet.arrival_time_us;
} else {
needs_sort = true;
}
@ -319,21 +333,76 @@ void FakeNetworkPipe::Process() {
if (needs_sort) {
// Packet(s) arrived out of order, make sure list is sorted.
std::sort(delay_link_.begin(), delay_link_.end(),
PacketArrivalTimeComparator());
[](const PacketInfo& p1, const PacketInfo& p2) {
return p1.arrival_time_us < p2.arrival_time_us;
});
}
}
std::vector<PacketDeliveryInfo> packets_to_deliver;
// Check the extra delay queue.
while (!delay_link_.empty() &&
time_now_us >= delay_link_.front().arrival_time()) {
// Deliver this packet.
NetworkPacket packet(std::move(delay_link_.front()));
time_now_us >= delay_link_.front().arrival_time_us) {
PacketInfo packet_info = delay_link_.front();
packets_to_deliver.emplace_back(
PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us));
delay_link_.pop_front();
// |time_now| might be later than when the packet should have arrived, due
// to NetworkProcess being called too late. For stats, use the time it
// should have been on the link.
total_packet_delay_us_ += packet.arrival_time() - packet.send_time();
packets_to_deliver.push(std::move(packet));
}
return packets_to_deliver;
}
}
void FakeNetworkPipe::Process() {
int64_t time_now_us = clock_->TimeInMicroseconds();
std::queue<NetworkPacket> packets_to_deliver;
{
rtc::CritScope crit(&process_lock_);
if (time_now_us - last_log_time_us_ > kLogIntervalMs * 1000) {
int64_t queueing_delay_us = 0;
if (!packets_in_flight_.empty())
queueing_delay_us =
time_now_us - packets_in_flight_.front().packet.send_time();
RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_us / 1000
<< " ms.";
last_log_time_us_ = time_now_us;
}
std::vector<PacketDeliveryInfo> delivery_infos =
network_simulation_->DequeueDeliverablePackets(time_now_us);
for (auto& delivery_info : delivery_infos) {
// In the common case where no reordering happens, find will return early
// as the first packet will be a match.
auto packet_it =
std::find_if(packets_in_flight_.begin(), packets_in_flight_.end(),
[&delivery_info](StoredPacket& packet_ref) {
return reinterpret_cast<uint64_t>(&packet_ref) ==
delivery_info.packet_id;
});
// Check that the packet is in the deque of packets in flight.
RTC_CHECK(packet_it != packets_in_flight_.end());
// Check that the packet is not already removed.
RTC_DCHECK(!packet_it->removed);
NetworkPacket packet = std::move(packet_it->packet);
packet_it->removed = true;
// Cleanup of removed packets at the beginning of the deque.
while (!packets_in_flight_.empty() &&
packets_in_flight_.front().removed) {
packets_in_flight_.pop_front();
}
if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
int64_t added_delay_us =
delivery_info.receive_time_us - packet.send_time();
packet.IncrementArrivalTime(added_delay_us);
packets_to_deliver.emplace(std::move(packet));
// |time_now_us| might be later than when the packet should have
// arrived, due to NetworkProcess being called too late. For stats, use
// the time it should have been on the link.
total_packet_delay_us_ += added_delay_us;
}
}
sent_packets_ += packets_to_deliver.size();
}
@ -344,9 +413,10 @@ void FakeNetworkPipe::Process() {
packets_to_deliver.pop();
DeliverPacket(&packet);
}
next_process_time_us_ = !delay_link_.empty()
? delay_link_.begin()->arrival_time()
rtc::Optional<int64_t> delivery_us =
network_simulation_->NextDeliveryTimeUs();
next_process_time_us_ = delivery_us
? *delivery_us
: time_now_us + kDefaultProcessIntervalMs * 1000;
}
@ -399,11 +469,6 @@ void FakeNetworkPipe::ResetStats() {
total_packet_delay_us_ = 0;
}
int FakeNetworkPipe::GetConfigCapacityKbps() const {
rtc::CritScope crit(&config_lock_);
return config_.link_capacity_kbps;
}
void FakeNetworkPipe::AddToPacketDropCount() {
rtc::CritScope crit(&process_lock_);
++dropped_packets_;
@ -423,10 +488,6 @@ int64_t FakeNetworkPipe::GetTimeInMicroseconds() const {
return clock_->TimeInMicroseconds();
}
bool FakeNetworkPipe::IsRandomLoss(double prob_loss) {
return random_.Rand<double>() < prob_loss;
}
bool FakeNetworkPipe::ShouldProcess(int64_t time_now_us) const {
return time_now_us >= next_process_time_us_;
}

View File

@ -15,7 +15,9 @@
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <vector>
#include "api/call/transport.h"
#include "call/call.h"
@ -82,12 +84,38 @@ class NetworkPacket {
rtc::Optional<PacketTime> packet_time_;
};
struct PacketInFlightInfo {
PacketInFlightInfo(size_t size, int64_t send_time_us, uint64_t packet_id)
: size(size), send_time_us(send_time_us), packet_id(packet_id) {}
// Class faking a network link. This is a simple and naive solution just faking
// capacity and adding an extra transport delay in addition to the capacity
// introduced delay.
size_t size;
int64_t send_time_us;
// Unique identifier for the packet in relation to other packets in flight.
uint64_t packet_id;
};
class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
struct PacketDeliveryInfo {
static constexpr int kNotReceived = -1;
PacketDeliveryInfo(PacketInFlightInfo source, int64_t receive_time_us)
: receive_time_us(receive_time_us), packet_id(source.packet_id) {}
int64_t receive_time_us;
uint64_t packet_id;
};
class NetworkSimulationInterface {
public:
virtual bool EnqueuePacket(PacketInFlightInfo packet_info) = 0;
// Retrieves all packets that should be delivered by the given receive time.
virtual std::vector<PacketDeliveryInfo> DequeueDeliverablePackets(
int64_t receive_time_us) = 0;
virtual rtc::Optional<int64_t> NextDeliveryTimeUs() const = 0;
virtual ~NetworkSimulationInterface() = default;
};
// Class simulating a network link. This is a simple and naive solution just
// faking capacity and adding an extra transport delay in addition to the
// capacity introduced delay.
class SimulatedNetwork : public NetworkSimulationInterface {
public:
struct Config {
Config() {}
@ -106,6 +134,53 @@ class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
// The average length of a burst of lost packets.
int avg_burst_loss_length = -1;
};
explicit SimulatedNetwork(Config config, uint64_t random_seed = 1);
// Sets a new configuration. This won't affect packets already in the pipe.
void SetConfig(const Config& config);
// NetworkSimulationInterface
bool EnqueuePacket(PacketInFlightInfo packet) override;
std::vector<PacketDeliveryInfo> DequeueDeliverablePackets(
int64_t receive_time_us) override;
rtc::Optional<int64_t> NextDeliveryTimeUs() const override;
private:
struct PacketInfo {
PacketInFlightInfo packet;
int64_t arrival_time_us;
};
rtc::CriticalSection config_lock_;
// |process_lock| guards the data structures involved in delay and loss
// processes, such as the packet queues.
rtc::CriticalSection process_lock_;
std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_lock_);
Random random_;
std::deque<PacketInfo> delay_link_;
// Link configuration.
Config config_ RTC_GUARDED_BY(config_lock_);
// Are we currently dropping a burst of packets?
bool bursting_;
// The probability to drop the packet if we are currently dropping a
// burst of packet
double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_);
// The probability to drop a burst of packets.
double prob_start_bursting_ RTC_GUARDED_BY(config_lock_);
int64_t capacity_delay_error_bytes_ = 0;
};
// Class faking a network link, internally is uses an implementation of a
// SimulatedNetworkInterface to simulate network behavior.
class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
public:
using Config = SimulatedNetwork::Config;
// Use these constructors if you plan to insert packets using DeliverPacket().
FakeNetworkPipe(Clock* clock, const FakeNetworkPipe::Config& config);
@ -165,16 +240,24 @@ class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
protected:
void DeliverPacketWithLock(NetworkPacket* packet);
int GetConfigCapacityKbps() const;
void AddToPacketDropCount();
void AddToPacketSentCount(int count);
void AddToTotalDelay(int delay_us);
int64_t GetTimeInMicroseconds() const;
bool IsRandomLoss(double prob_loss);
bool ShouldProcess(int64_t time_now_us) const;
void SetTimeToNextProcess(int64_t skip_us);
private:
struct StoredPacket {
NetworkPacket packet;
bool removed = false;
explicit StoredPacket(NetworkPacket&& packet);
StoredPacket(StoredPacket&&) = default;
StoredPacket(const StoredPacket&) = delete;
StoredPacket& operator=(const StoredPacket&) = delete;
StoredPacket() = delete;
};
// Returns true if enqueued, or false if packet was dropped.
virtual bool EnqueuePacket(rtc::CopyOnWriteBuffer packet,
rtc::Optional<PacketOptions> options,
@ -189,43 +272,32 @@ class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
Clock* const clock_;
// |config_lock| guards the mostly constant things like the callbacks.
rtc::CriticalSection config_lock_;
const std::unique_ptr<SimulatedNetwork> network_simulation_;
PacketReceiver* receiver_ RTC_GUARDED_BY(config_lock_);
Transport* const transport_ RTC_GUARDED_BY(config_lock_);
// |process_lock| guards the data structures involved in delay and loss
// processes, such as the packet queues.
rtc::CriticalSection process_lock_;
std::queue<NetworkPacket> capacity_link_ RTC_GUARDED_BY(process_lock_);
Random random_;
std::deque<NetworkPacket> delay_link_;
// Packets are added at the back of the deque, this makes the deque ordered
// by increasing send time. The common case when removing packets from the
// deque is removing early packets, which will be close to the front of the
// deque. This makes finding the packets in the deque efficient in the common
// case.
std::deque<StoredPacket> packets_in_flight_ RTC_GUARDED_BY(process_lock_);
int64_t clock_offset_ms_ RTC_GUARDED_BY(config_lock_);
// Link configuration.
Config config_ RTC_GUARDED_BY(config_lock_);
// Statistics.
size_t dropped_packets_ RTC_GUARDED_BY(process_lock_);
size_t sent_packets_ RTC_GUARDED_BY(process_lock_);
int64_t total_packet_delay_us_ RTC_GUARDED_BY(process_lock_);
// Are we currently dropping a burst of packets?
bool bursting_;
// The probability to drop the packet if we are currently dropping a
// burst of packet
double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_);
// The probability to drop a burst of packets.
double prob_start_bursting_ RTC_GUARDED_BY(config_lock_);
int64_t next_process_time_us_;
int64_t last_log_time_us_;
int64_t capacity_delay_error_bytes_ = 0;
RTC_DISALLOW_COPY_AND_ASSIGN(FakeNetworkPipe);
};