/*
 *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "test/scenario/simulated_time.h"

#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <string>
#include <utility>

#include "absl/types/optional.h"
#include "rtc_base/checks.h"
#include "rtc_base/socket_address.h"

namespace webrtc {
namespace test {
namespace {
// Fixed-size layout used to copy a feedback report into and out of a packet
// payload. The first entry is stored with absolute values (|first_seq_num|,
// |first_recv_time_ms|); every following entry is stored in |feedbacks| as an
// offset relative to the first one. |count| is the total number of entries,
// including the first.
struct RawFeedbackReportPacket {
  static constexpr int MAX_FEEDBACKS = 10;
  struct Feedback {
    int16_t seq_offset;
    int32_t recv_offset_ms;
  };
  uint8_t count;
  int64_t first_seq_num;
  int64_t first_recv_time_ms;
  Feedback feedbacks[MAX_FEEDBACKS - 1];
};
}  // namespace

PacketStream::PacketStream(PacketStreamConfig config) : config_(config) {}

// Returns the sizes, in bytes, of the packets that make up all frames that are
// due at |at_time|. Each returned size includes the configured per-packet
// overhead.
std::vector<int64_t> PacketStream::PullPackets(Timestamp at_time) {
  // The first call defines when the first frame is produced.
  if (next_frame_time_.IsInfinite())
    next_frame_time_ = at_time;

  TimeDelta frame_interval = TimeDelta::seconds(1) / config_.frame_rate;
  int64_t frame_allowance = (frame_interval * target_rate_).bytes();

  if (next_frame_is_keyframe_) {
    frame_allowance *= config_.keyframe_multiplier;
    next_frame_is_keyframe_ = false;
  }

  std::vector<int64_t> packets;
  while (at_time >= next_frame_time_) {
    next_frame_time_ += frame_interval;

    // A frame is never smaller than |min_frame_size|; whatever is spent beyond
    // the allowance is carried in |budget_| and charged to later frames.
    int64_t frame_size = budget_ + frame_allowance;
    frame_size = std::max(frame_size, config_.min_frame_size.bytes());
    budget_ += frame_allowance - frame_size;

    // Split the frame into full-size packets followed by one remainder packet.
    int64_t packet_budget = frame_size;
    int64_t max_packet_size = config_.max_packet_size.bytes();
    while (packet_budget > max_packet_size) {
      packets.push_back(max_packet_size);
      packet_budget -= max_packet_size;
    }
    packets.push_back(packet_budget);
  }
  for (int64_t& packet : packets)
    packet += config_.packet_overhead.bytes();
  return packets;
}

// Stores the new target rate, capped at the configured maximum data rate.
void PacketStream::OnTargetRateUpdate(DataRate target_rate) {
  target_rate_ = std::min(target_rate, config_.max_data_rate);
}

// Decodes a feedback report from |raw_buffer|, which must hold at least
// sizeof(RawFeedbackReportPacket) bytes and describe between 1 and
// MAX_FEEDBACKS received packets.
SimpleFeedbackReportPacket FeedbackFromBuffer(
    rtc::CopyOnWriteBuffer raw_buffer) {
  RTC_CHECK_LE(sizeof(RawFeedbackReportPacket), raw_buffer.size());
  // Copy the bytes into a properly typed object instead of reinterpreting the
  // buffer storage in place.
  RawFeedbackReportPacket raw_packet;
  memcpy(&raw_packet, raw_buffer.cdata(), sizeof(raw_packet));
  RTC_CHECK_GE(raw_packet.count, 1);
  // |feedbacks| holds MAX_FEEDBACKS - 1 entries; a larger count would make the
  // loop below read past the end of the array.
  RTC_CHECK_LE(raw_packet.count, RawFeedbackReportPacket::MAX_FEEDBACKS);

  SimpleFeedbackReportPacket packet;
  packet.receive_times.emplace_back(SimpleFeedbackReportPacket::ReceiveInfo{
      raw_packet.first_seq_num, Timestamp::ms(raw_packet.first_recv_time_ms)});
  for (int i = 1; i < raw_packet.count; ++i)
    packet.receive_times.emplace_back(SimpleFeedbackReportPacket::ReceiveInfo{
        raw_packet.first_seq_num + raw_packet.feedbacks[i - 1].seq_offset,
        Timestamp::ms(raw_packet.first_recv_time_ms +
                      raw_packet.feedbacks[i - 1].recv_offset_ms)});
  return packet;
}

// Encodes |packet| into a buffer of sizeof(RawFeedbackReportPacket) bytes.
// |packet| must contain between 1 and MAX_FEEDBACKS receive times.
rtc::CopyOnWriteBuffer FeedbackToBuffer(
    const SimpleFeedbackReportPacket packet) {
  // Validate before the size is narrowed into the 8-bit |count| field.
  RTC_CHECK(!packet.receive_times.empty());
  RTC_CHECK_LE(packet.receive_times.size(),
               RawFeedbackReportPacket::MAX_FEEDBACKS);
  // Value-initialized so that unused |feedbacks| slots are not copied into the
  // buffer with indeterminate values.
  RawFeedbackReportPacket report = {};
  report.count = static_cast<uint8_t>(packet.receive_times.size());
  report.first_seq_num = packet.receive_times.front().sequence_number;
  report.first_recv_time_ms = packet.receive_times.front().receive_time.ms();

  for (int i = 1; i < report.count; ++i) {
    report.feedbacks[i - 1].seq_offset = static_cast<int16_t>(
        packet.receive_times[i].sequence_number - report.first_seq_num);
    report.feedbacks[i - 1].recv_offset_ms = static_cast<int32_t>(
        packet.receive_times[i].receive_time.ms() - report.first_recv_time_ms);
  }
  return rtc::CopyOnWriteBuffer(reinterpret_cast<uint8_t*>(&report),
                                sizeof(RawFeedbackReportPacket));
}

SimulatedSender::SimulatedSender(EmulatedNetworkNode* send_node,
                                 uint64_t send_receiver_id)
    : send_node_(send_node), send_receiver_id_(send_receiver_id) {}

SimulatedSender::~SimulatedSender() {}

// Combines the receive information in |packet| with the stored send side
// information and returns the result as a transport feedback report. Packets
// with sequence numbers that were skipped by the feedback are reported with an
// infinite receive time.
TransportPacketsFeedback SimulatedSender::PullFeedbackReport(
    SimpleFeedbackReportPacket packet,
    Timestamp at_time) {
  TransportPacketsFeedback report;
  report.prior_in_flight = data_in_flight_;
  report.feedback_time = at_time;

  for (auto& receive_info : packet.receive_times) {
    // Look up sender side information for all packets up to and including each
    // packet with feedback in the report.
    for (; next_feedback_seq_num_ <= receive_info.sequence_number;
         ++next_feedback_seq_num_) {
      PacketResult feedback;
      if (next_feedback_seq_num_ == receive_info.sequence_number) {
        feedback.receive_time = receive_info.receive_time;
      } else {
        // If we did not get any feedback for this packet, mark it as lost by
        // setting receive time to infinity. Note that this can also happen due
        // to reordering, we will never send feedback out of order. In this case
        // the packet was not really lost, but we don't have that information.
        feedback.receive_time = Timestamp::PlusInfinity();
      }

      // Looking up send side information. Only entries for packets that were
      // actually received are removed from |sent_packets_|.
      for (auto it = sent_packets_.begin(); it != sent_packets_.end(); ++it) {
        if (it->sequence_number == next_feedback_seq_num_) {
          feedback.sent_packet = *it;
          if (feedback.receive_time.IsFinite())
            sent_packets_.erase(it);
          break;
        }
      }

      data_in_flight_ -= feedback.sent_packet.size;
      report.packet_feedbacks.push_back(feedback);
    }
  }
  report.data_in_flight = data_in_flight_;
  return report;
}

// Applies pacing and congestion window based on the configuration from the
// congestion controller. This is not a complete implementation of the real
// pacer but useful for unit tests since it isn't limited to real time.
std::vector<SimulatedSender::PacketReadyToSend>
SimulatedSender::PaceAndPullSendPackets(Timestamp at_time) {
  // TODO(srte): Extract the behavior of PacedSender to a threading and time
  // independent component and use that here to allow a truthful simulation.
  if (last_update_.IsInfinite()) {
    pacing_budget_ = 0;
  } else {
    TimeDelta delta = at_time - last_update_;
    pacing_budget_ += (delta * pacer_config_.data_rate()).bytes();
  }

  std::vector<PacketReadyToSend> to_send;
  while (data_in_flight_ <= max_in_flight_ && pacing_budget_ >= 0 &&
         !packet_queue_.empty()) {
    PendingPacket pending = packet_queue_.front();
    pacing_budget_ -= pending.size;
    packet_queue_.pop_front();

    SentPacket sent;
    sent.sequence_number = next_sequence_number_++;
    sent.size = DataSize::bytes(pending.size);
    data_in_flight_ += sent.size;
    sent.data_in_flight = data_in_flight_;
    sent.pacing_info = PacedPacketInfo();
    sent.send_time = at_time;
    sent_packets_.push_back(sent);

    // The payload always has room for the sequence number, which is written at
    // the start of the packet.
    rtc::CopyOnWriteBuffer packet(
        std::max<size_t>(pending.size, sizeof(sent.sequence_number)));
    memcpy(packet.data(), &sent.sequence_number, sizeof(sent.sequence_number));
    to_send.emplace_back(PacketReadyToSend{sent, packet});
  }
  // Unused budget is not carried over to the next call; only debt is.
  pacing_budget_ = std::min<int64_t>(pacing_budget_, 0);
  last_update_ = at_time;
  return to_send;
}

// Applies the pacer configuration and congestion window from |update|, if
// present.
void SimulatedSender::Update(NetworkControlUpdate update) {
  if (update.pacer_config)
    pacer_config_ = *update.pacer_config;
  if (update.congestion_window)
    max_in_flight_ = *update.congestion_window;
}

SimulatedFeedback::SimulatedFeedback(SimulatedTimeClientConfig config,
                                     uint64_t return_receiver_id,
                                     EmulatedNetworkNode* return_node)
    : config_(config),
      return_receiver_id_(return_receiver_id),
      return_node_(return_node) {}

// Records the arrival of |packet| and, once the feedback interval has elapsed,
// sends feedback reports for all recorded packets up to and including this one
// to the stream sender via return_node_.
void SimulatedFeedback::OnPacketReceived(EmulatedIpPacket packet) {
  int64_t sequence_number;
  memcpy(&sequence_number, packet.cdata(), sizeof(sequence_number));
  receive_times_.insert({sequence_number, packet.arrival_time});
  if (last_feedback_time_.IsInfinite())
    last_feedback_time_ = packet.arrival_time;
  if (packet.arrival_time >= last_feedback_time_ + config_.feedback.interval) {
    SimpleFeedbackReportPacket report;
    for (; next_feedback_seq_num_ <= sequence_number;
         ++next_feedback_seq_num_) {
      auto it = receive_times_.find(next_feedback_seq_num_);
      if (it != receive_times_.end()) {
        report.receive_times.emplace_back(
            SimpleFeedbackReportPacket::ReceiveInfo{next_feedback_seq_num_,
                                                    it->second});
        receive_times_.erase(it);
      }
      // A single report packet holds at most MAX_FEEDBACKS entries; send it
      // and start a new one when it is full.
      if (report.receive_times.size() >=
          RawFeedbackReportPacket::MAX_FEEDBACKS) {
        return_node_->OnPacketReceived(
            EmulatedIpPacket(packet.to, packet.from, return_receiver_id_,
                             FeedbackToBuffer(report), packet.arrival_time));
        report = SimpleFeedbackReportPacket();
      }
    }
    if (!report.receive_times.empty())
      return_node_->OnPacketReceived(
          EmulatedIpPacket(packet.to, packet.from, return_receiver_id_,
                           FeedbackToBuffer(report), packet.arrival_time));
    last_feedback_time_ = packet.arrival_time;
  }
}

// Creates the congestion controller and one packet stream per entry in
// |stream_configs|, sets up the routes for the send and return links, runs an
// initial congestion controller process step at |at_time| and, if a log writer
// factory was provided, opens the packet log.
SimulatedTimeClient::SimulatedTimeClient(
    std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
    SimulatedTimeClientConfig config,
    std::vector<PacketStreamConfig> stream_configs,
    std::vector<EmulatedNetworkNode*> send_link,
    std::vector<EmulatedNetworkNode*> return_link,
    uint64_t send_receiver_id,
    uint64_t return_receiver_id,
    Timestamp at_time)
    : log_writer_factory_(std::move(log_writer_factory)),
      network_controller_factory_(log_writer_factory_.get(), config.transport),
      send_link_(send_link),
      return_link_(return_link),
      sender_(send_link.front(), send_receiver_id),
      feedback_(config, return_receiver_id, return_link.front()) {
  current_contraints_.at_time = at_time;
  current_contraints_.starting_rate = config.transport.rates.start_rate;
  current_contraints_.min_data_rate = config.transport.rates.min_rate;
  current_contraints_.max_data_rate = config.transport.rates.max_rate;
  NetworkControllerConfig initial_config;
  initial_config.constraints = current_contraints_;
  initial_config.stream_based_config.max_padding_rate =
      config.transport.rates.max_padding_rate;
  congestion_controller_ = network_controller_factory_.Create(initial_config);
  for (auto& stream_config : stream_configs)
    packet_streams_.emplace_back(new PacketStream(stream_config));
  EmulatedNetworkNode::CreateRoute(send_receiver_id, send_link, &feedback_);
  EmulatedNetworkNode::CreateRoute(return_receiver_id, return_link, this);

  CongestionProcess(at_time);
  network_controller_factory_.LogCongestionControllerStats(at_time);
  if (log_writer_factory_) {
    packet_log_ = log_writer_factory_->Create(".packets.txt");
    packet_log_->Write(
        "transport_seq packet_size send_time recv_time feed_time\n");
  }
}

// Pulls feedback reports from sender side based on the received feedback
// packet. Updates congestion controller with the resulting report.
void SimulatedTimeClient::OnPacketReceived(EmulatedIpPacket packet) {
  auto report = sender_.PullFeedbackReport(FeedbackFromBuffer(packet.data),
                                           packet.arrival_time);
  for (PacketResult& feedback : report.packet_feedbacks) {
    if (packet_log_)
      LogWriteFormat(packet_log_.get(),
                     "%" PRId64 " %" PRId64 " %.3lf %.3lf %.3lf\n",
                     feedback.sent_packet.sequence_number,
                     feedback.sent_packet.size.bytes(),
                     feedback.sent_packet.send_time.seconds<double>(),
                     feedback.receive_time.seconds<double>(),
                     packet.arrival_time.seconds<double>());
  }
  Update(congestion_controller_->OnTransportPacketsFeedback(report));
}

SimulatedTimeClient::~SimulatedTimeClient() {}

// Forwards |update| to the sender and, if it carries a new target rate, splits
// that rate evenly between the packet streams.
void SimulatedTimeClient::Update(NetworkControlUpdate update) {
  sender_.Update(update);
  if (update.target_rate) {
    // TODO(srte): Implement more realistic distribution of bandwidths between
    // streams. Either using BitrateAllocationStrategy directly or using
    // BitrateAllocation.
    double ratio_per_stream = 1.0 / packet_streams_.size();
    DataRate rate_per_stream =
        update.target_rate->target_rate * ratio_per_stream;
    target_rate_ = update.target_rate->target_rate;
    link_capacity_ = update.target_rate->network_estimate.bandwidth;
    for (auto& stream : packet_streams_)
      stream->OnTargetRateUpdate(rate_per_stream);
  }
}

// Runs a periodic process step of the congestion controller.
void SimulatedTimeClient::CongestionProcess(Timestamp at_time) {
  ProcessInterval msg;
  msg.at_time = at_time;
  Update(congestion_controller_->OnProcessInterval(msg));
}

// Queues the packets of all frames due at |at_time|, then hands every packet
// released by the sender to the first node of the send link and reports it to
// the congestion controller as sent.
void SimulatedTimeClient::PacerProcess(Timestamp at_time) {
  ProcessFrames(at_time);
  for (const auto& to_send : sender_.PaceAndPullSendPackets(at_time)) {
    sender_.send_node_->OnPacketReceived(EmulatedIpPacket(
        rtc::SocketAddress() /*from*/, rtc::SocketAddress() /*to*/,
        sender_.send_receiver_id_, to_send.data, at_time));
    Update(congestion_controller_->OnSentPacket(to_send.send_info));
  }
}

// Appends the packets produced by every stream at |at_time| to the sender's
// packet queue.
void SimulatedTimeClient::ProcessFrames(Timestamp at_time) {
  for (auto& stream : packet_streams_) {
    for (int64_t packet_size : stream->PullPackets(at_time)) {
      sender_.packet_queue_.push_back(
          SimulatedSender::PendingPacket{packet_size});
    }
  }
}

// Notifies the congestion controller of a route change that keeps the current
// constraints, with their timestamp updated to |at_time|.
void SimulatedTimeClient::TriggerFakeReroute(Timestamp at_time) {
  NetworkRouteChange msg;
  msg.at_time = at_time;
  msg.constraints = current_contraints_;
  msg.constraints.at_time = at_time;
  Update(congestion_controller_->OnNetworkRouteChange(msg));
}

TimeDelta SimulatedTimeClient::GetNetworkControllerProcessInterval() const {
  return network_controller_factory_.GetProcessInterval();
}

DataRate SimulatedTimeClient::link_capacity() const {
  return link_capacity_;
}

double SimulatedTimeClient::target_rate_kbps() const {
  return target_rate_.kbps<double>();
}

DataRate SimulatedTimeClient::padding_rate() const {
  return sender_.pacer_config_.pad_rate();
}
}  // namespace test
}  // namespace webrtc