Reduces locking in SimulatedNetwork class.
Bug: webrtc:9883 Change-Id: I07c78fd2dbba5e7481194b0d1aabfb56809ff6fc Reviewed-on: https://webrtc-review.googlesource.com/c/120612 Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org> Commit-Queue: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#26491}
This commit is contained in:
parent
8102a1a8ea
commit
eceea31cc2
@ -287,6 +287,7 @@ rtc_source_set("simulated_network") {
|
||||
"../api/units:time_delta",
|
||||
"../rtc_base:checks",
|
||||
"../rtc_base:rtc_base_approved",
|
||||
"../rtc_base:sequenced_task_checker",
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/types:optional",
|
||||
]
|
||||
|
||||
@ -31,12 +31,12 @@ SimulatedNetwork::~SimulatedNetwork() = default;
|
||||
|
||||
void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) {
|
||||
rtc::CritScope crit(&config_lock_);
|
||||
config_ = config; // Shallow copy of the struct.
|
||||
config_state_.config = config; // Shallow copy of the struct.
|
||||
double prob_loss = config.loss_percent / 100.0;
|
||||
if (config_.avg_burst_loss_length == -1) {
|
||||
if (config_state_.config.avg_burst_loss_length == -1) {
|
||||
// Uniform loss
|
||||
prob_loss_bursting_ = prob_loss;
|
||||
prob_start_bursting_ = prob_loss;
|
||||
config_state_.prob_loss_bursting = prob_loss;
|
||||
config_state_.prob_start_bursting = prob_loss;
|
||||
} else {
|
||||
// Lose packets according to a gilbert-elliot model.
|
||||
int avg_burst_loss_length = config.avg_burst_loss_length;
|
||||
@ -47,29 +47,27 @@ void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) {
|
||||
<< " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1
|
||||
<< " or higher.";
|
||||
|
||||
prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length);
|
||||
prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length;
|
||||
config_state_.prob_loss_bursting = (1.0 - 1.0 / avg_burst_loss_length);
|
||||
config_state_.prob_start_bursting =
|
||||
prob_loss / (1 - prob_loss) / avg_burst_loss_length;
|
||||
}
|
||||
}
|
||||
|
||||
void SimulatedNetwork::PauseTransmissionUntil(int64_t until_us) {
|
||||
rtc::CritScope crit(&config_lock_);
|
||||
pause_transmission_until_us_ = until_us;
|
||||
config_state_.pause_transmission_until_us = until_us;
|
||||
}
|
||||
|
||||
bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
|
||||
Config config;
|
||||
{
|
||||
rtc::CritScope crit(&config_lock_);
|
||||
config = config_;
|
||||
}
|
||||
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
|
||||
ConfigState state = GetConfigState();
|
||||
|
||||
UpdateCapacityQueue(packet.send_time_us);
|
||||
UpdateCapacityQueue(state, packet.send_time_us);
|
||||
|
||||
packet.size += config.packet_overhead;
|
||||
rtc::CritScope crit(&process_lock_);
|
||||
if (config.queue_length_packets > 0 &&
|
||||
capacity_link_.size() >= config.queue_length_packets) {
|
||||
packet.size += state.config.packet_overhead;
|
||||
|
||||
if (state.config.queue_length_packets > 0 &&
|
||||
capacity_link_.size() >= state.config.queue_length_packets) {
|
||||
// Too many packet on the link, drop this one.
|
||||
return false;
|
||||
}
|
||||
@ -83,123 +81,114 @@ bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
|
||||
}
|
||||
|
||||
absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
|
||||
rtc::CritScope crit(&process_lock_);
|
||||
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
|
||||
if (!delay_link_.empty())
|
||||
return delay_link_.begin()->arrival_time_us;
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
void SimulatedNetwork::UpdateCapacityQueue(int64_t time_now_us) {
|
||||
Config config;
|
||||
double prob_loss_bursting;
|
||||
double prob_start_bursting;
|
||||
int64_t pause_transmission_until_us;
|
||||
{
|
||||
rtc::CritScope crit(&config_lock_);
|
||||
config = config_;
|
||||
prob_loss_bursting = prob_loss_bursting_;
|
||||
prob_start_bursting = prob_start_bursting_;
|
||||
pause_transmission_until_us = pause_transmission_until_us_.value_or(0);
|
||||
}
|
||||
{
|
||||
rtc::CritScope crit(&process_lock_);
|
||||
bool needs_sort = false;
|
||||
void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
|
||||
int64_t time_now_us) {
|
||||
bool needs_sort = false;
|
||||
|
||||
// Catch for thread races.
|
||||
if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us))
|
||||
return;
|
||||
// Catch for thread races.
|
||||
if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us))
|
||||
return;
|
||||
|
||||
int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us);
|
||||
// Check the capacity link first.
|
||||
while (!capacity_link_.empty()) {
|
||||
int64_t time_until_front_exits_us = 0;
|
||||
if (config.link_capacity_kbps > 0) {
|
||||
int64_t remaining_bits =
|
||||
capacity_link_.front().packet.size * 8 - pending_drain_bits_;
|
||||
RTC_DCHECK(remaining_bits > 0);
|
||||
// Division rounded up - packet not delivered until its last bit is.
|
||||
time_until_front_exits_us =
|
||||
(1000 * remaining_bits + config.link_capacity_kbps - 1) /
|
||||
config.link_capacity_kbps;
|
||||
}
|
||||
|
||||
if (time_us + time_until_front_exits_us > time_now_us) {
|
||||
// Packet at front will not exit yet. Will not enter here on infinite
|
||||
// capacity(=0) so no special handling needed.
|
||||
pending_drain_bits_ +=
|
||||
((time_now_us - time_us) * config.link_capacity_kbps) / 1000;
|
||||
break;
|
||||
}
|
||||
if (config.link_capacity_kbps > 0) {
|
||||
pending_drain_bits_ +=
|
||||
(time_until_front_exits_us * config.link_capacity_kbps) / 1000;
|
||||
} else {
|
||||
// Enough to drain the whole queue.
|
||||
pending_drain_bits_ = queue_size_bytes_ * 8;
|
||||
}
|
||||
|
||||
// Time to get this packet.
|
||||
PacketInfo packet = std::move(capacity_link_.front());
|
||||
capacity_link_.pop();
|
||||
|
||||
time_us += time_until_front_exits_us;
|
||||
RTC_DCHECK(time_us >= packet.packet.send_time_us);
|
||||
packet.arrival_time_us = std::max(pause_transmission_until_us, time_us);
|
||||
queue_size_bytes_ -= packet.packet.size;
|
||||
pending_drain_bits_ -= packet.packet.size * 8;
|
||||
RTC_DCHECK(pending_drain_bits_ >= 0);
|
||||
|
||||
// Drop packets at an average rate of |config_.loss_percent| with
|
||||
// and average loss burst length of |config_.avg_burst_loss_length|.
|
||||
if ((bursting_ && random_.Rand<double>() < prob_loss_bursting) ||
|
||||
(!bursting_ && random_.Rand<double>() < prob_start_bursting)) {
|
||||
bursting_ = true;
|
||||
packet.arrival_time_us = PacketDeliveryInfo::kNotReceived;
|
||||
} else {
|
||||
bursting_ = false;
|
||||
int64_t arrival_time_jitter_us = std::max(
|
||||
random_.Gaussian(config.queue_delay_ms * 1000,
|
||||
config.delay_standard_deviation_ms * 1000),
|
||||
0.0);
|
||||
|
||||
// If reordering is not allowed then adjust arrival_time_jitter
|
||||
// to make sure all packets are sent in order.
|
||||
int64_t last_arrival_time_us =
|
||||
delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
|
||||
if (!config.allow_reordering && !delay_link_.empty() &&
|
||||
packet.arrival_time_us + arrival_time_jitter_us <
|
||||
last_arrival_time_us) {
|
||||
arrival_time_jitter_us =
|
||||
last_arrival_time_us - packet.arrival_time_us;
|
||||
}
|
||||
packet.arrival_time_us += arrival_time_jitter_us;
|
||||
if (packet.arrival_time_us >= last_arrival_time_us) {
|
||||
last_arrival_time_us = packet.arrival_time_us;
|
||||
} else {
|
||||
needs_sort = true;
|
||||
}
|
||||
}
|
||||
delay_link_.emplace_back(std::move(packet));
|
||||
int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us);
|
||||
// Check the capacity link first.
|
||||
while (!capacity_link_.empty()) {
|
||||
int64_t time_until_front_exits_us = 0;
|
||||
if (state.config.link_capacity_kbps > 0) {
|
||||
int64_t remaining_bits =
|
||||
capacity_link_.front().packet.size * 8 - pending_drain_bits_;
|
||||
RTC_DCHECK(remaining_bits > 0);
|
||||
// Division rounded up - packet not delivered until its last bit is.
|
||||
time_until_front_exits_us =
|
||||
(1000 * remaining_bits + state.config.link_capacity_kbps - 1) /
|
||||
state.config.link_capacity_kbps;
|
||||
}
|
||||
last_capacity_link_visit_us_ = time_now_us;
|
||||
// Cannot save unused capacity for later.
|
||||
pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8);
|
||||
|
||||
if (needs_sort) {
|
||||
// Packet(s) arrived out of order, make sure list is sorted.
|
||||
std::sort(delay_link_.begin(), delay_link_.end(),
|
||||
[](const PacketInfo& p1, const PacketInfo& p2) {
|
||||
return p1.arrival_time_us < p2.arrival_time_us;
|
||||
});
|
||||
if (time_us + time_until_front_exits_us > time_now_us) {
|
||||
// Packet at front will not exit yet. Will not enter here on infinite
|
||||
// capacity(=0) so no special handling needed.
|
||||
pending_drain_bits_ +=
|
||||
((time_now_us - time_us) * state.config.link_capacity_kbps) / 1000;
|
||||
break;
|
||||
}
|
||||
if (state.config.link_capacity_kbps > 0) {
|
||||
pending_drain_bits_ +=
|
||||
(time_until_front_exits_us * state.config.link_capacity_kbps) / 1000;
|
||||
} else {
|
||||
// Enough to drain the whole queue.
|
||||
pending_drain_bits_ = queue_size_bytes_ * 8;
|
||||
}
|
||||
|
||||
// Time to get this packet.
|
||||
PacketInfo packet = std::move(capacity_link_.front());
|
||||
capacity_link_.pop();
|
||||
|
||||
time_us += time_until_front_exits_us;
|
||||
RTC_DCHECK(time_us >= packet.packet.send_time_us);
|
||||
packet.arrival_time_us =
|
||||
std::max(state.pause_transmission_until_us, time_us);
|
||||
queue_size_bytes_ -= packet.packet.size;
|
||||
pending_drain_bits_ -= packet.packet.size * 8;
|
||||
RTC_DCHECK(pending_drain_bits_ >= 0);
|
||||
|
||||
// Drop packets at an average rate of |state.config.loss_percent| with
|
||||
// and average loss burst length of |state.config.avg_burst_loss_length|.
|
||||
if ((bursting_ && random_.Rand<double>() < state.prob_loss_bursting) ||
|
||||
(!bursting_ && random_.Rand<double>() < state.prob_start_bursting)) {
|
||||
bursting_ = true;
|
||||
packet.arrival_time_us = PacketDeliveryInfo::kNotReceived;
|
||||
} else {
|
||||
bursting_ = false;
|
||||
int64_t arrival_time_jitter_us = std::max(
|
||||
random_.Gaussian(state.config.queue_delay_ms * 1000,
|
||||
state.config.delay_standard_deviation_ms * 1000),
|
||||
0.0);
|
||||
|
||||
// If reordering is not allowed then adjust arrival_time_jitter
|
||||
// to make sure all packets are sent in order.
|
||||
int64_t last_arrival_time_us =
|
||||
delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
|
||||
if (!state.config.allow_reordering && !delay_link_.empty() &&
|
||||
packet.arrival_time_us + arrival_time_jitter_us <
|
||||
last_arrival_time_us) {
|
||||
arrival_time_jitter_us = last_arrival_time_us - packet.arrival_time_us;
|
||||
}
|
||||
packet.arrival_time_us += arrival_time_jitter_us;
|
||||
if (packet.arrival_time_us >= last_arrival_time_us) {
|
||||
last_arrival_time_us = packet.arrival_time_us;
|
||||
} else {
|
||||
needs_sort = true;
|
||||
}
|
||||
}
|
||||
delay_link_.emplace_back(std::move(packet));
|
||||
}
|
||||
last_capacity_link_visit_us_ = time_now_us;
|
||||
// Cannot save unused capacity for later.
|
||||
pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8);
|
||||
|
||||
if (needs_sort) {
|
||||
// Packet(s) arrived out of order, make sure list is sorted.
|
||||
std::sort(delay_link_.begin(), delay_link_.end(),
|
||||
[](const PacketInfo& p1, const PacketInfo& p2) {
|
||||
return p1.arrival_time_us < p2.arrival_time_us;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
SimulatedNetwork::ConfigState SimulatedNetwork::GetConfigState() const {
|
||||
rtc::CritScope crit(&config_lock_);
|
||||
return config_state_;
|
||||
}
|
||||
|
||||
std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
|
||||
int64_t receive_time_us) {
|
||||
UpdateCapacityQueue(receive_time_us);
|
||||
|
||||
rtc::CritScope crit(&process_lock_);
|
||||
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
|
||||
UpdateCapacityQueue(GetConfigState(), receive_time_us);
|
||||
std::vector<PacketDeliveryInfo> packets_to_deliver;
|
||||
// Check the extra delay queue.
|
||||
while (!delay_link_.empty() &&
|
||||
|
||||
@ -18,8 +18,10 @@
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/test/simulated_network.h"
|
||||
#include "rtc_base/critical_section.h"
|
||||
#include "rtc_base/race_checker.h"
|
||||
#include "rtc_base/random.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
#include "rtc_base/thread_checker.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
@ -48,39 +50,43 @@ class SimulatedNetwork : public NetworkBehaviorInterface {
|
||||
PacketInFlightInfo packet;
|
||||
int64_t arrival_time_us;
|
||||
};
|
||||
// Contains current configuration state.
|
||||
struct ConfigState {
|
||||
// Static link configuration.
|
||||
Config config;
|
||||
// The probability to drop the packet if we are currently dropping a
|
||||
// burst of packet
|
||||
double prob_loss_bursting;
|
||||
// The probability to drop a burst of packets.
|
||||
double prob_start_bursting;
|
||||
// Used for temporary delay spikes.
|
||||
int64_t pause_transmission_until_us = 0;
|
||||
};
|
||||
|
||||
// Moves packets from capacity- to delay link.
|
||||
void UpdateCapacityQueue(int64_t time_now_us);
|
||||
void UpdateCapacityQueue(ConfigState state, int64_t time_now_us)
|
||||
RTC_RUN_ON(&process_checker_);
|
||||
ConfigState GetConfigState() const;
|
||||
|
||||
rtc::CriticalSection config_lock_;
|
||||
|
||||
// |process_lock| guards the data structures involved in delay and loss
|
||||
// |process_checker_| guards the data structures involved in delay and loss
|
||||
// processes, such as the packet queues.
|
||||
rtc::CriticalSection process_lock_;
|
||||
std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_lock_);
|
||||
rtc::RaceChecker process_checker_;
|
||||
std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_checker_);
|
||||
Random random_;
|
||||
|
||||
std::deque<PacketInfo> delay_link_ RTC_GUARDED_BY(process_lock_);
|
||||
std::deque<PacketInfo> delay_link_ RTC_GUARDED_BY(process_checker_);
|
||||
|
||||
// Link configuration.
|
||||
Config config_ RTC_GUARDED_BY(config_lock_);
|
||||
absl::optional<int64_t> pause_transmission_until_us_
|
||||
RTC_GUARDED_BY(config_lock_);
|
||||
ConfigState config_state_ RTC_GUARDED_BY(config_lock_);
|
||||
|
||||
// Are we currently dropping a burst of packets?
|
||||
bool bursting_;
|
||||
|
||||
// The probability to drop the packet if we are currently dropping a
|
||||
// burst of packet
|
||||
double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_);
|
||||
|
||||
// The probability to drop a burst of packets.
|
||||
double prob_start_bursting_ RTC_GUARDED_BY(config_lock_);
|
||||
|
||||
int64_t queue_size_bytes_ RTC_GUARDED_BY(process_lock_) = 0;
|
||||
int64_t pending_drain_bits_ RTC_GUARDED_BY(process_lock_) = 0;
|
||||
int64_t queue_size_bytes_ RTC_GUARDED_BY(process_checker_) = 0;
|
||||
int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0;
|
||||
absl::optional<int64_t> last_capacity_link_visit_us_
|
||||
RTC_GUARDED_BY(process_lock_);
|
||||
RTC_GUARDED_BY(process_checker_);
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user