Adds CoDel implementation to network simulation.

Adds an implementation of the CoDel active queue management algorithm
to the network simulation. It is loosely based on CoDel pseudocode
from ACMQueue: https://queue.acm.org/appendices/codel.html

Bug: webrtc:9510
Change-Id: Ice485be35a01dafa6169d697b51b5c1b33a49ba6
Reviewed-on: https://webrtc-review.googlesource.com/c/123581
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26834}
This commit is contained in:
Sebastian Jansson 2019-02-25 10:24:46 +01:00 committed by Commit Bot
parent 418dd0b96a
commit 2b08e3188e
7 changed files with 254 additions and 3 deletions

View File

@ -63,6 +63,8 @@ struct BuiltInNetworkBehaviorConfig {
int avg_burst_loss_length = -1;
// Additional bytes to add to packet size.
int packet_overhead = 0;
// Enable CoDel active queue management.
bool codel_active_queue_management = false;
};
class NetworkBehaviorInterface {

View File

@ -286,6 +286,7 @@ rtc_source_set("simulated_network") {
"../api/units:data_rate",
"../api/units:data_size",
"../api/units:time_delta",
"../api/units:timestamp",
"../rtc_base:checks",
"../rtc_base:rtc_base_approved",
"../rtc_base:sequenced_task_checker",
@ -498,13 +499,16 @@ if (rtc_include_tests) {
sources = [
"fake_network_pipe_unittest.cc",
"simulated_network_unittest.cc",
]
deps = [
":fake_network",
":simulated_network",
"../api/units:data_rate",
"../system_wrappers",
"../test:test_support",
"//testing/gtest",
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/memory",
]
}

View File

@ -21,7 +21,62 @@
namespace webrtc {
namespace {
constexpr int64_t kDefaultProcessDelayUs = 5000;
constexpr TimeDelta kDefaultProcessDelay = TimeDelta::Millis<5>();
} // namespace
CoDelSimulation::CoDelSimulation() = default;
CoDelSimulation::~CoDelSimulation() = default;
bool CoDelSimulation::DropDequeuedPacket(Timestamp now,
Timestamp enqueing_time,
DataSize packet_size,
DataSize queue_size) {
constexpr TimeDelta kWindow = TimeDelta::Millis<100>();
constexpr TimeDelta kDelayThreshold = TimeDelta::Millis<5>();
constexpr TimeDelta kDropCountMemory = TimeDelta::Millis<1600>();
constexpr DataSize kMaxPacketSize = DataSize::Bytes<1500>();
// Compensates for process interval in simulation; not part of standard CoDel.
TimeDelta queuing_time = now - enqueing_time - kDefaultProcessDelay;
if (queue_size < kMaxPacketSize || queuing_time < kDelayThreshold) {
enter_drop_state_at_ = Timestamp::PlusInfinity();
state_ = kNormal;
return false;
}
switch (state_) {
case kNormal:
enter_drop_state_at_ = now + kWindow;
state_ = kPending;
return false;
case kPending:
if (now >= enter_drop_state_at_) {
state_ = kDropping;
// Starting the drop counter with the drops made during the most recent
// drop state period.
drop_count_ = drop_count_ - previous_drop_count_;
if (now >= last_drop_at_ + kDropCountMemory)
drop_count_ = 0;
previous_drop_count_ = drop_count_;
last_drop_at_ = now;
++drop_count_;
return true;
}
return false;
case kDropping:
TimeDelta drop_delay = kWindow / sqrt(static_cast<double>(drop_count_));
Timestamp next_drop_at = last_drop_at_ + drop_delay;
if (now >= next_drop_at) {
if (queue_size - packet_size < kMaxPacketSize)
state_ = kPending;
last_drop_at_ = next_drop_at;
++drop_count_;
return true;
}
return false;
}
}
SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config,
@ -80,7 +135,7 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
queue_size_bytes_ += packet.size;
capacity_link_.push({packet, packet.send_time_us});
if (!next_process_time_us_) {
next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs;
next_process_time_us_ = packet.send_time_us + kDefaultProcessDelay.us();
}
return true;
@ -133,6 +188,20 @@ void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
capacity_link_.pop();
time_us += time_until_front_exits_us;
if (state.config.codel_active_queue_management) {
while (!capacity_link_.empty() &&
codel_controller_.DropDequeuedPacket(
Timestamp::us(time_us),
Timestamp::us(capacity_link_.front().packet.send_time_us),
DataSize::bytes(capacity_link_.front().packet.size),
DataSize::bytes(queue_size_bytes_))) {
PacketInfo dropped = capacity_link_.front();
capacity_link_.pop();
queue_size_bytes_ -= dropped.packet.size;
dropped.arrival_time_us = PacketDeliveryInfo::kNotReceived;
delay_link_.emplace_back(dropped);
}
}
RTC_DCHECK(time_us >= packet.packet.send_time_us);
packet.arrival_time_us =
std::max(state.pause_transmission_until_us, time_us);
@ -206,7 +275,7 @@ std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
if (!delay_link_.empty()) {
next_process_time_us_ = delay_link_.front().arrival_time_us;
} else if (!capacity_link_.empty()) {
next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs;
next_process_time_us_ = receive_time_us + kDefaultProcessDelay.us();
} else {
next_process_time_us_.reset();
}

View File

@ -17,6 +17,8 @@
#include "absl/types/optional.h"
#include "api/test/simulated_network.h"
#include "api/units/data_size.h"
#include "api/units/timestamp.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/random.h"
@ -24,6 +26,29 @@
#include "rtc_base/thread_checker.h"
namespace webrtc {
// Implementation of the CoDel active queue management algorithm. Loosely based
// on CoDel pseudocode from ACMQueue. CoDel keeps queuing delays low by dropping
// packets when delay is high. For each packet ready for dequeue, call
// DropDequeuePacket with the packet parameters to update the CoDel state.
class CoDelSimulation {
public:
CoDelSimulation();
~CoDelSimulation();
// Returns true if packet should be dropped.
bool DropDequeuedPacket(Timestamp now,
Timestamp enqueing_time,
DataSize packet_size,
DataSize queue_size);
private:
enum State { kNormal, kPending, kDropping };
Timestamp enter_drop_state_at_ = Timestamp::PlusInfinity();
Timestamp last_drop_at_ = Timestamp::MinusInfinity();
int drop_count_ = 0;
int previous_drop_count_ = 0;
State state_ = State::kNormal;
};
// Class simulating a network link. This is a simple and naive solution just
// faking capacity and adding an extra transport delay in addition to the
@ -73,6 +98,7 @@ class SimulatedNetwork : public NetworkBehaviorInterface {
// |process_checker_| guards the data structures involved in delay and loss
// processes, such as the packet queues.
rtc::RaceChecker process_checker_;
CoDelSimulation codel_controller_ RTC_GUARDED_BY(process_checker_);
std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_checker_);
Random random_;

View File

@ -0,0 +1,147 @@
/*
* Copyright 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <map>
#include <set>
#include <vector>
#include <algorithm>
#include "absl/algorithm/container.h"
#include "api/units/data_rate.h"
#include "call/simulated_network.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
constexpr int kNotReceived = PacketDeliveryInfo::kNotReceived;
}
TEST(SimulatedNetworkTest, CodelDoesNothingAtCapacity) {
const TimeDelta kRuntime = TimeDelta::seconds(30);
DataRate link_capacity = DataRate::kbps(1000);
const DataSize packet_size = DataSize::bytes(1000);
SimulatedNetwork::Config config;
config.codel_active_queue_management = true;
config.queue_delay_ms = 10;
config.link_capacity_kbps = link_capacity.kbps();
SimulatedNetwork network(config);
// Need to round up here as otherwise we actually will choke.
const TimeDelta packet_inverval =
packet_size / link_capacity + TimeDelta::ms(1);
// Send at capacity and see we get no loss.
Timestamp start_time = Timestamp::ms(0);
Timestamp current_time = start_time;
Timestamp next_packet_time = start_time;
uint64_t next_id = 0;
std::set<uint64_t> pending;
while (current_time - start_time < kRuntime) {
if (current_time >= next_packet_time) {
bool success = network.EnqueuePacket(PacketInFlightInfo{
packet_size.bytes<size_t>(), current_time.us(), next_id});
EXPECT_TRUE(success);
pending.insert(next_id);
++next_id;
next_packet_time += packet_inverval;
}
Timestamp next_delivery = Timestamp::PlusInfinity();
if (network.NextDeliveryTimeUs())
next_delivery = Timestamp::us(*network.NextDeliveryTimeUs());
current_time = std::min(next_packet_time, next_delivery);
if (current_time >= next_delivery) {
for (PacketDeliveryInfo packet :
network.DequeueDeliverablePackets(current_time.us())) {
EXPECT_NE(packet.receive_time_us, kNotReceived);
pending.erase(packet.packet_id);
}
}
}
while (network.NextDeliveryTimeUs()) {
for (PacketDeliveryInfo packet :
network.DequeueDeliverablePackets(*network.NextDeliveryTimeUs())) {
EXPECT_NE(packet.receive_time_us, kNotReceived);
pending.erase(packet.packet_id);
}
}
EXPECT_EQ(pending.size(), 0u);
}
TEST(SimulatedNetworkTest, CodelLimitsDelayAndDropsPacketsOnOverload) {
const TimeDelta kRuntime = TimeDelta::seconds(30);
const TimeDelta kCheckInterval = TimeDelta::ms(2000);
DataRate link_capacity = DataRate::kbps(1000);
const DataSize rough_packet_size = DataSize::bytes(1500);
const double overload_rate = 1.5;
SimulatedNetwork::Config config;
config.codel_active_queue_management = true;
config.queue_delay_ms = 10;
config.link_capacity_kbps = link_capacity.kbps();
SimulatedNetwork network(config);
const TimeDelta packet_inverval = rough_packet_size / link_capacity;
const DataSize packet_size = overload_rate * link_capacity * packet_inverval;
// Send above capacity and see delays are still controlled at the cost of
// packet loss.
Timestamp start_time = Timestamp::ms(0);
Timestamp current_time = start_time;
Timestamp next_packet_time = start_time;
Timestamp last_check = start_time;
uint64_t next_id = 1;
std::map<uint64_t, int64_t> send_times_us;
int lost = 0;
std::vector<int64_t> delays_us;
while (current_time - start_time < kRuntime) {
if (current_time >= next_packet_time) {
bool success = network.EnqueuePacket(PacketInFlightInfo{
packet_size.bytes<size_t>(), current_time.us(), next_id});
send_times_us.insert({next_id, current_time.us()});
++next_id;
EXPECT_TRUE(success);
next_packet_time += packet_inverval;
}
Timestamp next_delivery = Timestamp::PlusInfinity();
if (network.NextDeliveryTimeUs())
next_delivery = Timestamp::us(*network.NextDeliveryTimeUs());
current_time = std::min(next_packet_time, next_delivery);
if (current_time >= next_delivery) {
for (PacketDeliveryInfo packet :
network.DequeueDeliverablePackets(current_time.us())) {
if (packet.receive_time_us == kNotReceived) {
++lost;
} else {
delays_us.push_back(packet.receive_time_us -
send_times_us[packet.packet_id]);
}
send_times_us.erase(packet.packet_id);
}
}
if (current_time > last_check + kCheckInterval) {
last_check = current_time;
TimeDelta average_delay =
TimeDelta::us(absl::c_accumulate(delays_us, 0)) / delays_us.size();
double loss_ratio = static_cast<double>(lost) / (lost + delays_us.size());
EXPECT_LT(average_delay.ms(), 200)
<< "Time " << (current_time - start_time).ms() << "\n";
EXPECT_GT(loss_ratio, 0.5 * (overload_rate - 1));
}
}
while (network.NextDeliveryTimeUs()) {
for (PacketDeliveryInfo packet :
network.DequeueDeliverablePackets(*network.NextDeliveryTimeUs())) {
send_times_us.erase(packet.packet_id);
}
}
EXPECT_EQ(send_times_us.size(), 0u);
}
} // namespace webrtc

View File

@ -25,6 +25,8 @@ SimulatedNetwork::Config CreateSimulationConfig(NetworkNodeConfig config) {
sim_config.queue_delay_ms = config.simulation.delay.ms();
sim_config.delay_standard_deviation_ms = config.simulation.delay_std_dev.ms();
sim_config.packet_overhead = config.packet_overhead.bytes<int>();
sim_config.codel_active_queue_management =
config.simulation.codel_active_queue_management;
return sim_config;
}
} // namespace

View File

@ -242,6 +242,7 @@ struct NetworkNodeConfig {
TimeDelta delay = TimeDelta::Zero();
TimeDelta delay_std_dev = TimeDelta::Zero();
double loss_rate = 0;
bool codel_active_queue_management = false;
} simulation;
DataSize packet_overhead = DataSize::Zero();
TimeDelta update_frequency = TimeDelta::ms(1);