From 2b08e3188e181ecb2bac5cdd4edd160f0018c90e Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Mon, 25 Feb 2019 10:24:46 +0100 Subject: [PATCH] Adds CoDel implementation to network simulation. Adds an implementation of the CoDel active queue management algorithm to the network simulation. It is loosely based on CoDel pseudocode from ACMQueue: https://queue.acm.org/appendices/codel.html Bug: webrtc:9510 Change-Id: Ice485be35a01dafa6169d697b51b5c1b33a49ba6 Reviewed-on: https://webrtc-review.googlesource.com/c/123581 Commit-Queue: Sebastian Jansson Reviewed-by: Per Kjellander Reviewed-by: Christoffer Rodbro Cr-Commit-Position: refs/heads/master@{#26834} --- api/test/simulated_network.h | 2 + call/BUILD.gn | 4 + call/simulated_network.cc | 75 ++++++++++++++- call/simulated_network.h | 26 +++++ call/simulated_network_unittest.cc | 147 +++++++++++++++++++++++++++++ test/scenario/network_node.cc | 2 + test/scenario/scenario_config.h | 1 + 7 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 call/simulated_network_unittest.cc diff --git a/api/test/simulated_network.h b/api/test/simulated_network.h index 5961724371..c5273c32fe 100644 --- a/api/test/simulated_network.h +++ b/api/test/simulated_network.h @@ -63,6 +63,8 @@ struct BuiltInNetworkBehaviorConfig { int avg_burst_loss_length = -1; // Additional bytes to add to packet size. int packet_overhead = 0; + // Enable CoDel active queue management. + bool codel_active_queue_management = false; }; class NetworkBehaviorInterface { diff --git a/call/BUILD.gn b/call/BUILD.gn index 97cf71aa39..210931cf86 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -286,6 +286,7 @@ rtc_source_set("simulated_network") { "../api/units:data_rate", "../api/units:data_size", "../api/units:time_delta", + "../api/units:timestamp", "../rtc_base:checks", "../rtc_base:rtc_base_approved", "../rtc_base:sequenced_task_checker", @@ -498,13 +499,16 @@ if (rtc_include_tests) { sources = [ "fake_network_pipe_unittest.cc", + "simulated_network_unittest.cc", ] deps = [ ":fake_network", ":simulated_network", + "../api/units:data_rate", "../system_wrappers", "../test:test_support", "//testing/gtest", + "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/memory", ] } diff --git a/call/simulated_network.cc b/call/simulated_network.cc index c80255f388..a8c962af21 100644 --- a/call/simulated_network.cc +++ b/call/simulated_network.cc @@ -21,7 +21,62 @@ namespace webrtc { namespace { -constexpr int64_t kDefaultProcessDelayUs = 5000; +constexpr TimeDelta kDefaultProcessDelay = TimeDelta::Millis<5>(); +} // namespace + +CoDelSimulation::CoDelSimulation() = default; +CoDelSimulation::~CoDelSimulation() = default; + +bool CoDelSimulation::DropDequeuedPacket(Timestamp now, + Timestamp enqueing_time, + DataSize packet_size, + DataSize queue_size) { + constexpr TimeDelta kWindow = TimeDelta::Millis<100>(); + constexpr TimeDelta kDelayThreshold = TimeDelta::Millis<5>(); + constexpr TimeDelta kDropCountMemory = TimeDelta::Millis<1600>(); + constexpr DataSize kMaxPacketSize = DataSize::Bytes<1500>(); + + // Compensates for process interval in simulation; not part of standard CoDel. + TimeDelta queuing_time = now - enqueing_time - kDefaultProcessDelay; + + if (queue_size < kMaxPacketSize || queuing_time < kDelayThreshold) { + enter_drop_state_at_ = Timestamp::PlusInfinity(); + state_ = kNormal; + return false; + } + switch (state_) { + case kNormal: + enter_drop_state_at_ = now + kWindow; + state_ = kPending; + return false; + + case kPending: + if (now >= enter_drop_state_at_) { + state_ = kDropping; + // Starting the drop counter with the drops made during the most recent + // drop state period. + drop_count_ = drop_count_ - previous_drop_count_; + if (now >= last_drop_at_ + kDropCountMemory) + drop_count_ = 0; + previous_drop_count_ = drop_count_; + last_drop_at_ = now; + ++drop_count_; + return true; + } + return false; + + case kDropping: + TimeDelta drop_delay = kWindow / sqrt(static_cast(drop_count_)); + Timestamp next_drop_at = last_drop_at_ + drop_delay; + if (now >= next_drop_at) { + if (queue_size - packet_size < kMaxPacketSize) + state_ = kPending; + last_drop_at_ = next_drop_at; + ++drop_count_; + return true; + } + return false; + } } SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config, @@ -80,7 +135,7 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) { queue_size_bytes_ += packet.size; capacity_link_.push({packet, packet.send_time_us}); if (!next_process_time_us_) { - next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs; + next_process_time_us_ = packet.send_time_us + kDefaultProcessDelay.us(); } return true; @@ -133,6 +188,20 @@ void SimulatedNetwork::UpdateCapacityQueue(ConfigState state, capacity_link_.pop(); time_us += time_until_front_exits_us; + if (state.config.codel_active_queue_management) { + while (!capacity_link_.empty() && + codel_controller_.DropDequeuedPacket( + Timestamp::us(time_us), + Timestamp::us(capacity_link_.front().packet.send_time_us), + DataSize::bytes(capacity_link_.front().packet.size), + DataSize::bytes(queue_size_bytes_))) { + PacketInfo dropped = capacity_link_.front(); + capacity_link_.pop(); + queue_size_bytes_ -= dropped.packet.size; + dropped.arrival_time_us = PacketDeliveryInfo::kNotReceived; + delay_link_.emplace_back(dropped); + } + } RTC_DCHECK(time_us >= packet.packet.send_time_us); packet.arrival_time_us = std::max(state.pause_transmission_until_us, time_us); @@ -206,7 +275,7 @@ std::vector SimulatedNetwork::DequeueDeliverablePackets( if (!delay_link_.empty()) { next_process_time_us_ = delay_link_.front().arrival_time_us; } else if (!capacity_link_.empty()) { - next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs; + next_process_time_us_ = receive_time_us + kDefaultProcessDelay.us(); } else { next_process_time_us_.reset(); } diff --git a/call/simulated_network.h b/call/simulated_network.h index 632eb5dfed..71060634de 100644 --- a/call/simulated_network.h +++ b/call/simulated_network.h @@ -17,6 +17,8 @@ #include "absl/types/optional.h" #include "api/test/simulated_network.h" +#include "api/units/data_size.h" +#include "api/units/timestamp.h" #include "rtc_base/critical_section.h" #include "rtc_base/race_checker.h" #include "rtc_base/random.h" @@ -24,6 +26,29 @@ #include "rtc_base/thread_checker.h" namespace webrtc { +// Implementation of the CoDel active queue management algorithm. Loosely based +// on CoDel pseudocode from ACMQueue. CoDel keeps queuing delays low by dropping +// packets when delay is high. For each packet ready for dequeue, call +// DropDequeuePacket with the packet parameters to update the CoDel state. +class CoDelSimulation { + public: + CoDelSimulation(); + ~CoDelSimulation(); + + // Returns true if packet should be dropped. + bool DropDequeuedPacket(Timestamp now, + Timestamp enqueing_time, + DataSize packet_size, + DataSize queue_size); + + private: + enum State { kNormal, kPending, kDropping }; + Timestamp enter_drop_state_at_ = Timestamp::PlusInfinity(); + Timestamp last_drop_at_ = Timestamp::MinusInfinity(); + int drop_count_ = 0; + int previous_drop_count_ = 0; + State state_ = State::kNormal; +}; // Class simulating a network link. This is a simple and naive solution just // faking capacity and adding an extra transport delay in addition to the @@ -73,6 +98,7 @@ class SimulatedNetwork : public NetworkBehaviorInterface { // |process_checker_| guards the data structures involved in delay and loss // processes, such as the packet queues. rtc::RaceChecker process_checker_; + CoDelSimulation codel_controller_ RTC_GUARDED_BY(process_checker_); std::queue capacity_link_ RTC_GUARDED_BY(process_checker_); Random random_; diff --git a/call/simulated_network_unittest.cc b/call/simulated_network_unittest.cc new file mode 100644 index 0000000000..5c92e57a05 --- /dev/null +++ b/call/simulated_network_unittest.cc @@ -0,0 +1,147 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include +#include +#include + +#include +#include "absl/algorithm/container.h" +#include "api/units/data_rate.h" +#include "call/simulated_network.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { +constexpr int kNotReceived = PacketDeliveryInfo::kNotReceived; +} + +TEST(SimulatedNetworkTest, CodelDoesNothingAtCapacity) { + const TimeDelta kRuntime = TimeDelta::seconds(30); + + DataRate link_capacity = DataRate::kbps(1000); + const DataSize packet_size = DataSize::bytes(1000); + + SimulatedNetwork::Config config; + config.codel_active_queue_management = true; + config.queue_delay_ms = 10; + config.link_capacity_kbps = link_capacity.kbps(); + SimulatedNetwork network(config); + + // Need to round up here as otherwise we actually will choke. + const TimeDelta packet_inverval = + packet_size / link_capacity + TimeDelta::ms(1); + + // Send at capacity and see we get no loss. + Timestamp start_time = Timestamp::ms(0); + Timestamp current_time = start_time; + Timestamp next_packet_time = start_time; + uint64_t next_id = 0; + std::set pending; + while (current_time - start_time < kRuntime) { + if (current_time >= next_packet_time) { + bool success = network.EnqueuePacket(PacketInFlightInfo{ + packet_size.bytes(), current_time.us(), next_id}); + EXPECT_TRUE(success); + pending.insert(next_id); + ++next_id; + next_packet_time += packet_inverval; + } + Timestamp next_delivery = Timestamp::PlusInfinity(); + if (network.NextDeliveryTimeUs()) + next_delivery = Timestamp::us(*network.NextDeliveryTimeUs()); + current_time = std::min(next_packet_time, next_delivery); + if (current_time >= next_delivery) { + for (PacketDeliveryInfo packet : + network.DequeueDeliverablePackets(current_time.us())) { + EXPECT_NE(packet.receive_time_us, kNotReceived); + pending.erase(packet.packet_id); + } + } + } + while (network.NextDeliveryTimeUs()) { + for (PacketDeliveryInfo packet : + network.DequeueDeliverablePackets(*network.NextDeliveryTimeUs())) { + EXPECT_NE(packet.receive_time_us, kNotReceived); + pending.erase(packet.packet_id); + } + } + EXPECT_EQ(pending.size(), 0u); +} + +TEST(SimulatedNetworkTest, CodelLimitsDelayAndDropsPacketsOnOverload) { + const TimeDelta kRuntime = TimeDelta::seconds(30); + const TimeDelta kCheckInterval = TimeDelta::ms(2000); + + DataRate link_capacity = DataRate::kbps(1000); + const DataSize rough_packet_size = DataSize::bytes(1500); + const double overload_rate = 1.5; + + SimulatedNetwork::Config config; + config.codel_active_queue_management = true; + config.queue_delay_ms = 10; + config.link_capacity_kbps = link_capacity.kbps(); + SimulatedNetwork network(config); + + const TimeDelta packet_inverval = rough_packet_size / link_capacity; + const DataSize packet_size = overload_rate * link_capacity * packet_inverval; + // Send above capacity and see delays are still controlled at the cost of + // packet loss. + Timestamp start_time = Timestamp::ms(0); + Timestamp current_time = start_time; + Timestamp next_packet_time = start_time; + Timestamp last_check = start_time; + uint64_t next_id = 1; + std::map send_times_us; + int lost = 0; + std::vector delays_us; + while (current_time - start_time < kRuntime) { + if (current_time >= next_packet_time) { + bool success = network.EnqueuePacket(PacketInFlightInfo{ + packet_size.bytes(), current_time.us(), next_id}); + send_times_us.insert({next_id, current_time.us()}); + ++next_id; + EXPECT_TRUE(success); + next_packet_time += packet_inverval; + } + Timestamp next_delivery = Timestamp::PlusInfinity(); + if (network.NextDeliveryTimeUs()) + next_delivery = Timestamp::us(*network.NextDeliveryTimeUs()); + current_time = std::min(next_packet_time, next_delivery); + if (current_time >= next_delivery) { + for (PacketDeliveryInfo packet : + network.DequeueDeliverablePackets(current_time.us())) { + if (packet.receive_time_us == kNotReceived) { + ++lost; + } else { + delays_us.push_back(packet.receive_time_us - + send_times_us[packet.packet_id]); + } + send_times_us.erase(packet.packet_id); + } + } + if (current_time > last_check + kCheckInterval) { + last_check = current_time; + TimeDelta average_delay = + TimeDelta::us(absl::c_accumulate(delays_us, 0)) / delays_us.size(); + double loss_ratio = static_cast(lost) / (lost + delays_us.size()); + EXPECT_LT(average_delay.ms(), 200) + << "Time " << (current_time - start_time).ms() << "\n"; + EXPECT_GT(loss_ratio, 0.5 * (overload_rate - 1)); + } + } + while (network.NextDeliveryTimeUs()) { + for (PacketDeliveryInfo packet : + network.DequeueDeliverablePackets(*network.NextDeliveryTimeUs())) { + send_times_us.erase(packet.packet_id); + } + } + EXPECT_EQ(send_times_us.size(), 0u); +} +} // namespace webrtc diff --git a/test/scenario/network_node.cc b/test/scenario/network_node.cc index 1f16105c64..a206a5f6ef 100644 --- a/test/scenario/network_node.cc +++ b/test/scenario/network_node.cc @@ -25,6 +25,8 @@ SimulatedNetwork::Config CreateSimulationConfig(NetworkNodeConfig config) { sim_config.queue_delay_ms = config.simulation.delay.ms(); sim_config.delay_standard_deviation_ms = config.simulation.delay_std_dev.ms(); sim_config.packet_overhead = config.packet_overhead.bytes(); + sim_config.codel_active_queue_management = + config.simulation.codel_active_queue_management; return sim_config; } } // namespace diff --git a/test/scenario/scenario_config.h b/test/scenario/scenario_config.h index 68fe6a07e1..5532dba809 100644 --- a/test/scenario/scenario_config.h +++ b/test/scenario/scenario_config.h @@ -242,6 +242,7 @@ struct NetworkNodeConfig { TimeDelta delay = TimeDelta::Zero(); TimeDelta delay_std_dev = TimeDelta::Zero(); double loss_rate = 0; + bool codel_active_queue_management = false; } simulation; DataSize packet_overhead = DataSize::Zero(); TimeDelta update_frequency = TimeDelta::ms(1);