diff --git a/call/BUILD.gn b/call/BUILD.gn index 8d65aa23c9..38b2f4145c 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -287,6 +287,7 @@ rtc_source_set("simulated_network") { "../api/units:time_delta", "../rtc_base:checks", "../rtc_base:rtc_base_approved", + "../rtc_base:sequenced_task_checker", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/types:optional", ] diff --git a/call/simulated_network.cc b/call/simulated_network.cc index 92e945b4c4..9bb8bab360 100644 --- a/call/simulated_network.cc +++ b/call/simulated_network.cc @@ -31,12 +31,12 @@ SimulatedNetwork::~SimulatedNetwork() = default; void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) { rtc::CritScope crit(&config_lock_); - config_ = config; // Shallow copy of the struct. + config_state_.config = config; // Shallow copy of the struct. double prob_loss = config.loss_percent / 100.0; - if (config_.avg_burst_loss_length == -1) { + if (config_state_.config.avg_burst_loss_length == -1) { // Uniform loss - prob_loss_bursting_ = prob_loss; - prob_start_bursting_ = prob_loss; + config_state_.prob_loss_bursting = prob_loss; + config_state_.prob_start_bursting = prob_loss; } else { // Lose packets according to a gilbert-elliot model. int avg_burst_loss_length = config.avg_burst_loss_length; @@ -47,29 +47,27 @@ void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) { << " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1 << " or higher."; - prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length); - prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length; + config_state_.prob_loss_bursting = (1.0 - 1.0 / avg_burst_loss_length); + config_state_.prob_start_bursting = + prob_loss / (1 - prob_loss) / avg_burst_loss_length; } } void SimulatedNetwork::PauseTransmissionUntil(int64_t until_us) { rtc::CritScope crit(&config_lock_); - pause_transmission_until_us_ = until_us; + config_state_.pause_transmission_until_us = until_us; } bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) { - Config config; - { - rtc::CritScope crit(&config_lock_); - config = config_; - } + RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); + ConfigState state = GetConfigState(); - UpdateCapacityQueue(packet.send_time_us); + UpdateCapacityQueue(state, packet.send_time_us); - packet.size += config.packet_overhead; - rtc::CritScope crit(&process_lock_); - if (config.queue_length_packets > 0 && - capacity_link_.size() >= config.queue_length_packets) { + packet.size += state.config.packet_overhead; + + if (state.config.queue_length_packets > 0 && + capacity_link_.size() >= state.config.queue_length_packets) { // Too many packet on the link, drop this one. return false; } @@ -83,123 +81,114 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) { } absl::optional SimulatedNetwork::NextDeliveryTimeUs() const { - rtc::CritScope crit(&process_lock_); + RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); if (!delay_link_.empty()) return delay_link_.begin()->arrival_time_us; return absl::nullopt; } -void SimulatedNetwork::UpdateCapacityQueue(int64_t time_now_us) { - Config config; - double prob_loss_bursting; - double prob_start_bursting; - int64_t pause_transmission_until_us; - { - rtc::CritScope crit(&config_lock_); - config = config_; - prob_loss_bursting = prob_loss_bursting_; - prob_start_bursting = prob_start_bursting_; - pause_transmission_until_us = pause_transmission_until_us_.value_or(0); - } - { - rtc::CritScope crit(&process_lock_); - bool needs_sort = false; +void SimulatedNetwork::UpdateCapacityQueue(ConfigState state, + int64_t time_now_us) { + bool needs_sort = false; - // Catch for thread races. - if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us)) - return; + // Catch for thread races. + if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us)) + return; - int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us); - // Check the capacity link first. - while (!capacity_link_.empty()) { - int64_t time_until_front_exits_us = 0; - if (config.link_capacity_kbps > 0) { - int64_t remaining_bits = - capacity_link_.front().packet.size * 8 - pending_drain_bits_; - RTC_DCHECK(remaining_bits > 0); - // Division rounded up - packet not delivered until its last bit is. - time_until_front_exits_us = - (1000 * remaining_bits + config.link_capacity_kbps - 1) / - config.link_capacity_kbps; - } - - if (time_us + time_until_front_exits_us > time_now_us) { - // Packet at front will not exit yet. Will not enter here on infinite - // capacity(=0) so no special handling needed. - pending_drain_bits_ += - ((time_now_us - time_us) * config.link_capacity_kbps) / 1000; - break; - } - if (config.link_capacity_kbps > 0) { - pending_drain_bits_ += - (time_until_front_exits_us * config.link_capacity_kbps) / 1000; - } else { - // Enough to drain the whole queue. - pending_drain_bits_ = queue_size_bytes_ * 8; - } - - // Time to get this packet. - PacketInfo packet = std::move(capacity_link_.front()); - capacity_link_.pop(); - - time_us += time_until_front_exits_us; - RTC_DCHECK(time_us >= packet.packet.send_time_us); - packet.arrival_time_us = std::max(pause_transmission_until_us, time_us); - queue_size_bytes_ -= packet.packet.size; - pending_drain_bits_ -= packet.packet.size * 8; - RTC_DCHECK(pending_drain_bits_ >= 0); - - // Drop packets at an average rate of |config_.loss_percent| with - // and average loss burst length of |config_.avg_burst_loss_length|. - if ((bursting_ && random_.Rand() < prob_loss_bursting) || - (!bursting_ && random_.Rand() < prob_start_bursting)) { - bursting_ = true; - packet.arrival_time_us = PacketDeliveryInfo::kNotReceived; - } else { - bursting_ = false; - int64_t arrival_time_jitter_us = std::max( - random_.Gaussian(config.queue_delay_ms * 1000, - config.delay_standard_deviation_ms * 1000), - 0.0); - - // If reordering is not allowed then adjust arrival_time_jitter - // to make sure all packets are sent in order. - int64_t last_arrival_time_us = - delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us; - if (!config.allow_reordering && !delay_link_.empty() && - packet.arrival_time_us + arrival_time_jitter_us < - last_arrival_time_us) { - arrival_time_jitter_us = - last_arrival_time_us - packet.arrival_time_us; - } - packet.arrival_time_us += arrival_time_jitter_us; - if (packet.arrival_time_us >= last_arrival_time_us) { - last_arrival_time_us = packet.arrival_time_us; - } else { - needs_sort = true; - } - } - delay_link_.emplace_back(std::move(packet)); + int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us); + // Check the capacity link first. + while (!capacity_link_.empty()) { + int64_t time_until_front_exits_us = 0; + if (state.config.link_capacity_kbps > 0) { + int64_t remaining_bits = + capacity_link_.front().packet.size * 8 - pending_drain_bits_; + RTC_DCHECK(remaining_bits > 0); + // Division rounded up - packet not delivered until its last bit is. + time_until_front_exits_us = + (1000 * remaining_bits + state.config.link_capacity_kbps - 1) / + state.config.link_capacity_kbps; } - last_capacity_link_visit_us_ = time_now_us; - // Cannot save unused capacity for later. - pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8); - if (needs_sort) { - // Packet(s) arrived out of order, make sure list is sorted. - std::sort(delay_link_.begin(), delay_link_.end(), - [](const PacketInfo& p1, const PacketInfo& p2) { - return p1.arrival_time_us < p2.arrival_time_us; - }); + if (time_us + time_until_front_exits_us > time_now_us) { + // Packet at front will not exit yet. Will not enter here on infinite + // capacity(=0) so no special handling needed. + pending_drain_bits_ += + ((time_now_us - time_us) * state.config.link_capacity_kbps) / 1000; + break; } + if (state.config.link_capacity_kbps > 0) { + pending_drain_bits_ += + (time_until_front_exits_us * state.config.link_capacity_kbps) / 1000; + } else { + // Enough to drain the whole queue. + pending_drain_bits_ = queue_size_bytes_ * 8; + } + + // Time to get this packet. + PacketInfo packet = std::move(capacity_link_.front()); + capacity_link_.pop(); + + time_us += time_until_front_exits_us; + RTC_DCHECK(time_us >= packet.packet.send_time_us); + packet.arrival_time_us = + std::max(state.pause_transmission_until_us, time_us); + queue_size_bytes_ -= packet.packet.size; + pending_drain_bits_ -= packet.packet.size * 8; + RTC_DCHECK(pending_drain_bits_ >= 0); + + // Drop packets at an average rate of |state.config.loss_percent| with + // and average loss burst length of |state.config.avg_burst_loss_length|. + if ((bursting_ && random_.Rand() < state.prob_loss_bursting) || + (!bursting_ && random_.Rand() < state.prob_start_bursting)) { + bursting_ = true; + packet.arrival_time_us = PacketDeliveryInfo::kNotReceived; + } else { + bursting_ = false; + int64_t arrival_time_jitter_us = std::max( + random_.Gaussian(state.config.queue_delay_ms * 1000, + state.config.delay_standard_deviation_ms * 1000), + 0.0); + + // If reordering is not allowed then adjust arrival_time_jitter + // to make sure all packets are sent in order. + int64_t last_arrival_time_us = + delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us; + if (!state.config.allow_reordering && !delay_link_.empty() && + packet.arrival_time_us + arrival_time_jitter_us < + last_arrival_time_us) { + arrival_time_jitter_us = last_arrival_time_us - packet.arrival_time_us; + } + packet.arrival_time_us += arrival_time_jitter_us; + if (packet.arrival_time_us >= last_arrival_time_us) { + last_arrival_time_us = packet.arrival_time_us; + } else { + needs_sort = true; + } + } + delay_link_.emplace_back(std::move(packet)); } + last_capacity_link_visit_us_ = time_now_us; + // Cannot save unused capacity for later. + pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8); + + if (needs_sort) { + // Packet(s) arrived out of order, make sure list is sorted. + std::sort(delay_link_.begin(), delay_link_.end(), + [](const PacketInfo& p1, const PacketInfo& p2) { + return p1.arrival_time_us < p2.arrival_time_us; + }); + } +} + +SimulatedNetwork::ConfigState SimulatedNetwork::GetConfigState() const { + rtc::CritScope crit(&config_lock_); + return config_state_; } std::vector SimulatedNetwork::DequeueDeliverablePackets( int64_t receive_time_us) { - UpdateCapacityQueue(receive_time_us); - - rtc::CritScope crit(&process_lock_); + RTC_DCHECK_RUNS_SERIALIZED(&process_checker_); + UpdateCapacityQueue(GetConfigState(), receive_time_us); std::vector packets_to_deliver; // Check the extra delay queue. while (!delay_link_.empty() && diff --git a/call/simulated_network.h b/call/simulated_network.h index 4085600d2c..6adb412edf 100644 --- a/call/simulated_network.h +++ b/call/simulated_network.h @@ -18,8 +18,10 @@ #include "absl/types/optional.h" #include "api/test/simulated_network.h" #include "rtc_base/critical_section.h" +#include "rtc_base/race_checker.h" #include "rtc_base/random.h" #include "rtc_base/thread_annotations.h" +#include "rtc_base/thread_checker.h" namespace webrtc { @@ -48,39 +50,43 @@ class SimulatedNetwork : public NetworkBehaviorInterface { PacketInFlightInfo packet; int64_t arrival_time_us; }; + // Contains current configuration state. + struct ConfigState { + // Static link configuration. + Config config; + // The probability to drop the packet if we are currently dropping a + // burst of packet + double prob_loss_bursting; + // The probability to drop a burst of packets. + double prob_start_bursting; + // Used for temporary delay spikes. + int64_t pause_transmission_until_us = 0; + }; // Moves packets from capacity- to delay link. - void UpdateCapacityQueue(int64_t time_now_us); + void UpdateCapacityQueue(ConfigState state, int64_t time_now_us) + RTC_RUN_ON(&process_checker_); + ConfigState GetConfigState() const; rtc::CriticalSection config_lock_; - // |process_lock| guards the data structures involved in delay and loss + // |process_checker_| guards the data structures involved in delay and loss // processes, such as the packet queues. - rtc::CriticalSection process_lock_; - std::queue capacity_link_ RTC_GUARDED_BY(process_lock_); + rtc::RaceChecker process_checker_; + std::queue capacity_link_ RTC_GUARDED_BY(process_checker_); Random random_; - std::deque delay_link_ RTC_GUARDED_BY(process_lock_); + std::deque delay_link_ RTC_GUARDED_BY(process_checker_); - // Link configuration. - Config config_ RTC_GUARDED_BY(config_lock_); - absl::optional pause_transmission_until_us_ - RTC_GUARDED_BY(config_lock_); + ConfigState config_state_ RTC_GUARDED_BY(config_lock_); // Are we currently dropping a burst of packets? bool bursting_; - // The probability to drop the packet if we are currently dropping a - // burst of packet - double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_); - - // The probability to drop a burst of packets. - double prob_start_bursting_ RTC_GUARDED_BY(config_lock_); - - int64_t queue_size_bytes_ RTC_GUARDED_BY(process_lock_) = 0; - int64_t pending_drain_bits_ RTC_GUARDED_BY(process_lock_) = 0; + int64_t queue_size_bytes_ RTC_GUARDED_BY(process_checker_) = 0; + int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0; absl::optional last_capacity_link_visit_us_ - RTC_GUARDED_BY(process_lock_); + RTC_GUARDED_BY(process_checker_); }; } // namespace webrtc