Add ability to emulate degraded network in Call via field trial

This is especially useful in Chrome, allowing use to emulate network
conditions in incoming or outgoing media without the need for platform
specific tools or hacks. It also doesn't interfere with the rest of the
network traffic.

Also includes some refactorings.

Bug: webrtc:8910
Change-Id: I2656a2d4218acbe7f8ffd669de19a02275735438
Reviewed-on: https://webrtc-review.googlesource.com/33013
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22418}
This commit is contained in:
Erik Språng 2018-03-14 10:47:02 +01:00 committed by Commit Bot
parent e10675a666
commit 31a12c557d
13 changed files with 1132 additions and 484 deletions

View File

@ -143,6 +143,8 @@ rtc_static_library("call") {
"call.cc",
"callfactory.cc",
"callfactory.h",
"degraded_call.cc",
"degraded_call.h",
"flexfec_receive_stream_impl.cc",
"flexfec_receive_stream_impl.h",
]
@ -155,6 +157,7 @@ rtc_static_library("call") {
deps = [
":bitrate_allocator",
":call_interfaces",
":fake_network",
":rtp_interfaces",
":rtp_receiver",
":rtp_sender",
@ -184,6 +187,7 @@ rtc_static_library("call") {
"../rtc_base:safe_minmax",
"../rtc_base:sequenced_task_checker",
"../system_wrappers",
"../system_wrappers:field_trial_api",
"../system_wrappers:metrics_api",
"../video",
]
@ -213,6 +217,29 @@ rtc_source_set("video_stream_api") {
]
}
rtc_source_set("fake_network") {
sources = [
"fake_network_pipe.cc",
"fake_network_pipe.h",
]
deps = [
":call_interfaces",
"..:typedefs",
"..:webrtc_common",
"../api:transport_api",
"../modules:module_api",
"../modules/rtp_rtcp",
"../rtc_base:rtc_base_approved",
"../rtc_base:sequenced_task_checker",
"../system_wrappers",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
}
}
if (rtc_include_tests) {
rtc_source_set("call_tests") {
testonly = true
@ -340,4 +367,24 @@ if (rtc_include_tests) {
"//test:test_support",
]
}
rtc_test("fake_network_unittests") {
deps = [
":call_interfaces",
":fake_network",
"../modules/rtp_rtcp",
"../rtc_base:rtc_base_approved",
"../system_wrappers",
"../test:test_common",
"../test:test_main",
"//testing/gtest",
]
sources = [
"test/fake_network_pipe_unittest.cc",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
}
}
}

View File

@ -292,19 +292,24 @@ class Call : public webrtc::Call,
// single mapping from ssrc to a more abstract receive stream, with
// accessor methods for all configuration we need at this level.
struct ReceiveRtpConfig {
ReceiveRtpConfig() = default; // Needed by std::map
ReceiveRtpConfig(const std::vector<RtpExtension>& extensions,
bool use_send_side_bwe)
: extensions(extensions), use_send_side_bwe(use_send_side_bwe) {}
explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config)
: extensions(config.rtp.extensions),
use_send_side_bwe(UseSendSideBwe(config)) {}
explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config)
: extensions(config.rtp.extensions),
use_send_side_bwe(UseSendSideBwe(config)) {}
explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config)
: extensions(config.rtp_header_extensions),
use_send_side_bwe(UseSendSideBwe(config)) {}
// Registered RTP header extensions for each stream. Note that RTP header
// extensions are negotiated per track ("m= line") in the SDP, but we have
// no notion of tracks at the Call level. We therefore store the RTP header
// extensions per SSRC instead, which leads to some storage overhead.
RtpHeaderExtensionMap extensions;
const RtpHeaderExtensionMap extensions;
// Set if both RTP extension the RTCP feedback message needed for
// send side BWE are negotiated.
bool use_send_side_bwe = false;
const bool use_send_side_bwe;
};
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
RTC_GUARDED_BY(receive_crit_);
@ -641,8 +646,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
module_process_thread_.get(), config, config_.audio_state, event_log_);
{
WriteLockScoped write_lock(*receive_crit_);
receive_rtp_config_[config.rtp.remote_ssrc] =
ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
receive_rtp_config_.emplace(config.rtp.remote_ssrc, config);
audio_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
@ -791,8 +795,6 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
module_process_thread_.get(), call_stats_.get());
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
ReceiveRtpConfig receive_config(config.rtp.extensions,
UseSendSideBwe(config));
{
WriteLockScoped write_lock(*receive_crit_);
if (config.rtp.rtx_ssrc) {
@ -800,9 +802,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config;
receive_rtp_config_.emplace(config.rtp.rtx_ssrc, config);
}
receive_rtp_config_[config.rtp.remote_ssrc] = receive_config;
receive_rtp_config_.emplace(config.rtp.remote_ssrc, config);
video_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
}
@ -865,8 +867,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
receive_rtp_config_[config.remote_ssrc] =
ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config));
receive_rtp_config_.emplace(config.remote_ssrc, config);
}
// TODO(brandtr): Store config in RtcEventLog here.
@ -1312,7 +1313,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
// deregistering in the |receive_rtp_config_| map is protected by that lock.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream
// which is being torned down.
// which is being torn down.
return;
}
parsed_packet.IdentifyExtensions(it->second.extensions);

View File

@ -11,12 +11,74 @@
#include "call/callfactory.h"
#include <memory>
#include <string>
#include <utility>
#include "api/optional.h"
#include "call/call.h"
#include "call/degraded_call.h"
#include "call/fake_network_pipe.h"
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
namespace {
bool ParseConfigParam(std::string exp_name, int* field) {
std::string group = field_trial::FindFullName(exp_name);
if (group == "")
return false;
return (sscanf(group.c_str(), "%d", field) == 1);
}
rtc::Optional<webrtc::FakeNetworkPipe::Config> ParseDegradationConfig(
bool send) {
std::string exp_prefix = "WebRTCFakeNetwork";
if (send) {
exp_prefix += "Send";
} else {
exp_prefix += "Receive";
}
webrtc::FakeNetworkPipe::Config config;
bool configured = false;
configured |=
ParseConfigParam(exp_prefix + "DelayMs", &config.queue_delay_ms);
configured |= ParseConfigParam(exp_prefix + "DelayStdDevMs",
&config.delay_standard_deviation_ms);
int queue_length = 0;
if (ParseConfigParam(exp_prefix + "QueueLength", &queue_length)) {
RTC_CHECK_GE(queue_length, 0);
config.queue_length_packets = queue_length;
configured = true;
}
configured |=
ParseConfigParam(exp_prefix + "CapacityKbps", &config.link_capacity_kbps);
configured |=
ParseConfigParam(exp_prefix + "LossPercent", &config.loss_percent);
int allow_reordering = 0;
if (ParseConfigParam(exp_prefix + "AllowReordering", &allow_reordering)) {
config.allow_reordering = true;
configured = true;
}
configured |= ParseConfigParam(exp_prefix + "AvgBurstLossLength",
&config.avg_burst_loss_length);
return configured ? rtc::Optional<webrtc::FakeNetworkPipe::Config>(config)
: rtc::nullopt;
}
} // namespace
Call* CallFactory::CreateCall(const Call::Config& config) {
rtc::Optional<webrtc::FakeNetworkPipe::Config> send_degradation_config =
ParseDegradationConfig(true);
rtc::Optional<webrtc::FakeNetworkPipe::Config> receive_degradation_config =
ParseDegradationConfig(false);
if (send_degradation_config || receive_degradation_config) {
return new DegradedCall(std::unique_ptr<Call>(Call::Create(config)),
send_degradation_config,
receive_degradation_config);
}
return Call::Create(config);
}

209
call/degraded_call.cc Normal file
View File

@ -0,0 +1,209 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <utility>
#include "call/degraded_call.h"
#include "rtc_base/ptr_util.h"
namespace webrtc {
DegradedCall::DegradedCall(
std::unique_ptr<Call> call,
rtc::Optional<FakeNetworkPipe::Config> send_config,
rtc::Optional<FakeNetworkPipe::Config> receive_config)
: clock_(Clock::GetRealTimeClock()),
call_(std::move(call)),
send_config_(send_config),
send_process_thread_(
send_config_ ? ProcessThread::Create("DegradedSendThread") : nullptr),
num_send_streams_(0),
receive_config_(receive_config) {
if (receive_config_) {
receive_pipe_ =
rtc::MakeUnique<webrtc::FakeNetworkPipe>(clock_, *receive_config_);
receive_pipe_->SetReceiver(call_->Receiver());
}
if (send_process_thread_) {
send_process_thread_->Start();
}
}
DegradedCall::~DegradedCall() {
if (send_pipe_) {
send_process_thread_->DeRegisterModule(send_pipe_.get());
}
if (send_process_thread_) {
send_process_thread_->Stop();
}
}
AudioSendStream* DegradedCall::CreateAudioSendStream(
const AudioSendStream::Config& config) {
return call_->CreateAudioSendStream(config);
}
void DegradedCall::DestroyAudioSendStream(AudioSendStream* send_stream) {
call_->DestroyAudioSendStream(send_stream);
}
AudioReceiveStream* DegradedCall::CreateAudioReceiveStream(
const AudioReceiveStream::Config& config) {
return call_->CreateAudioReceiveStream(config);
}
void DegradedCall::DestroyAudioReceiveStream(
AudioReceiveStream* receive_stream) {
call_->DestroyAudioReceiveStream(receive_stream);
}
VideoSendStream* DegradedCall::CreateVideoSendStream(
VideoSendStream::Config config,
VideoEncoderConfig encoder_config) {
if (send_config_ && !send_pipe_) {
send_pipe_ = rtc::MakeUnique<FakeNetworkPipe>(clock_, *send_config_,
config.send_transport);
config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
}
++num_send_streams_;
return call_->CreateVideoSendStream(std::move(config),
std::move(encoder_config));
}
VideoSendStream* DegradedCall::CreateVideoSendStream(
VideoSendStream::Config config,
VideoEncoderConfig encoder_config,
std::unique_ptr<FecController> fec_controller) {
if (send_config_ && !send_pipe_) {
send_pipe_ = rtc::MakeUnique<FakeNetworkPipe>(clock_, *send_config_,
config.send_transport);
config.send_transport = this;
send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
}
++num_send_streams_;
return call_->CreateVideoSendStream(
std::move(config), std::move(encoder_config), std::move(fec_controller));
}
void DegradedCall::DestroyVideoSendStream(VideoSendStream* send_stream) {
if (send_pipe_ && num_send_streams_ > 0) {
--num_send_streams_;
if (num_send_streams_ == 0) {
send_process_thread_->DeRegisterModule(send_pipe_.get());
send_pipe_.reset();
}
}
call_->DestroyVideoSendStream(send_stream);
}
VideoReceiveStream* DegradedCall::CreateVideoReceiveStream(
VideoReceiveStream::Config configuration) {
return call_->CreateVideoReceiveStream(std::move(configuration));
}
void DegradedCall::DestroyVideoReceiveStream(
VideoReceiveStream* receive_stream) {
call_->DestroyVideoReceiveStream(receive_stream);
}
FlexfecReceiveStream* DegradedCall::CreateFlexfecReceiveStream(
const FlexfecReceiveStream::Config& config) {
return call_->CreateFlexfecReceiveStream(config);
}
void DegradedCall::DestroyFlexfecReceiveStream(
FlexfecReceiveStream* receive_stream) {
call_->DestroyFlexfecReceiveStream(receive_stream);
}
PacketReceiver* DegradedCall::Receiver() {
if (receive_config_) {
return this;
}
return call_->Receiver();
}
RtpTransportControllerSendInterface*
DegradedCall::GetTransportControllerSend() {
return call_->GetTransportControllerSend();
}
Call::Stats DegradedCall::GetStats() const {
return call_->GetStats();
}
void DegradedCall::SetBitrateAllocationStrategy(
std::unique_ptr<rtc::BitrateAllocationStrategy>
bitrate_allocation_strategy) {
call_->SetBitrateAllocationStrategy(std::move(bitrate_allocation_strategy));
}
void DegradedCall::SignalChannelNetworkState(MediaType media,
NetworkState state) {
call_->SignalChannelNetworkState(media, state);
}
void DegradedCall::OnTransportOverheadChanged(
MediaType media,
int transport_overhead_per_packet) {
call_->OnTransportOverheadChanged(media, transport_overhead_per_packet);
}
void DegradedCall::OnSentPacket(const rtc::SentPacket& sent_packet) {
if (send_config_) {
// If we have a degraded send-transport, we have already notified call
// about the supposed network send time. Discard the actual network send
// time in order to properly fool the BWE.
return;
}
call_->OnSentPacket(sent_packet);
}
bool DegradedCall::SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) {
// A call here comes from the RTP stack (probably pacer). We intercept it and
// put it in the fake network pipe instead, but report to Call that is has
// been sent, so that the bandwidth estimator sees the delay we add.
send_pipe_->SendRtp(packet, length, options);
if (options.packet_id != -1) {
rtc::SentPacket packet_info;
packet_info.packet_id = options.packet_id;
packet_info.send_time_ms = clock_->TimeInMilliseconds();
call_->OnSentPacket(packet_info);
}
return true;
}
bool DegradedCall::SendRtcp(const uint8_t* packet, size_t length) {
send_pipe_->SendRtcp(packet, length);
return true;
}
PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket(
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
const PacketTime& packet_time) {
PacketReceiver::DeliveryStatus status =
receive_pipe_->DeliverPacket(media_type, std::move(packet), packet_time);
// This is not optimal, but there are many places where there are thread
// checks that fail if we're not using the worker thread call into this
// method. If we want to fix this we probably need a task queue to do handover
// of all overriden methods, which feels like overikill for the current use
// case.
// By just having this thread call out via the Process() method we work around
// that, with the tradeoff that a non-zero delay may become a little larger
// than anticipated at very low packet rates.
receive_pipe_->Process();
return status;
}
} // namespace webrtc

104
call/degraded_call.h Normal file
View File

@ -0,0 +1,104 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef CALL_DEGRADED_CALL_H_
#define CALL_DEGRADED_CALL_H_
#include <memory>
#include "api/call/transport.h"
#include "api/optional.h"
#include "call/call.h"
#include "call/fake_network_pipe.h"
#include "modules/utility/include/process_thread.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
class DegradedCall : public Call, private Transport, private PacketReceiver {
public:
explicit DegradedCall(std::unique_ptr<Call> call,
rtc::Optional<FakeNetworkPipe::Config> send_config,
rtc::Optional<FakeNetworkPipe::Config> receive_config);
~DegradedCall() override;
// Implements Call.
AudioSendStream* CreateAudioSendStream(
const AudioSendStream::Config& config) override;
void DestroyAudioSendStream(AudioSendStream* send_stream) override;
AudioReceiveStream* CreateAudioReceiveStream(
const AudioReceiveStream::Config& config) override;
void DestroyAudioReceiveStream(AudioReceiveStream* receive_stream) override;
VideoSendStream* CreateVideoSendStream(
VideoSendStream::Config config,
VideoEncoderConfig encoder_config) override;
VideoSendStream* CreateVideoSendStream(
VideoSendStream::Config config,
VideoEncoderConfig encoder_config,
std::unique_ptr<FecController> fec_controller) override;
void DestroyVideoSendStream(VideoSendStream* send_stream) override;
VideoReceiveStream* CreateVideoReceiveStream(
VideoReceiveStream::Config configuration) override;
void DestroyVideoReceiveStream(VideoReceiveStream* receive_stream) override;
FlexfecReceiveStream* CreateFlexfecReceiveStream(
const FlexfecReceiveStream::Config& config) override;
void DestroyFlexfecReceiveStream(
FlexfecReceiveStream* receive_stream) override;
PacketReceiver* Receiver() override;
RtpTransportControllerSendInterface* GetTransportControllerSend() override;
Stats GetStats() const override;
void SetBitrateAllocationStrategy(
std::unique_ptr<rtc::BitrateAllocationStrategy>
bitrate_allocation_strategy) override;
void SignalChannelNetworkState(MediaType media, NetworkState state) override;
void OnTransportOverheadChanged(MediaType media,
int transport_overhead_per_packet) override;
void OnSentPacket(const rtc::SentPacket& sent_packet) override;
protected:
// Implements Transport.
bool SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) override;
bool SendRtcp(const uint8_t* packet, size_t length) override;
// Implements PacketReceiver.
DeliveryStatus DeliverPacket(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
const PacketTime& packet_time) override;
private:
Clock* const clock_;
const std::unique_ptr<Call> call_;
const rtc::Optional<FakeNetworkPipe::Config> send_config_;
const std::unique_ptr<ProcessThread> send_process_thread_;
std::unique_ptr<FakeNetworkPipe> send_pipe_;
size_t num_send_streams_;
const rtc::Optional<FakeNetworkPipe::Config> receive_config_;
std::unique_ptr<FakeNetworkPipe> receive_pipe_;
};
} // namespace webrtc
#endif // CALL_DEGRADED_CALL_H_

419
call/fake_network_pipe.cc Normal file
View File

@ -0,0 +1,419 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <assert.h>
#include <math.h>
#include <string.h>
#include <algorithm>
#include <cmath>
#include <utility>
#include "call/call.h"
#include "call/fake_network_pipe.h"
#include "modules/rtp_rtcp/include/rtp_header_parser.h"
#include "rtc_base/logging.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
namespace {
constexpr int64_t kDefaultProcessIntervalMs = 5;
struct PacketArrivalTimeComparator {
bool operator()(const NetworkPacket& p1, const NetworkPacket& p2) {
return p1.arrival_time() < p2.arrival_time();
}
};
} // namespace
NetworkPacket::NetworkPacket(rtc::CopyOnWriteBuffer packet,
int64_t send_time,
int64_t arrival_time,
rtc::Optional<PacketOptions> packet_options,
bool is_rtcp,
MediaType media_type,
rtc::Optional<PacketTime> packet_time)
: packet_(std::move(packet)),
send_time_(send_time),
arrival_time_(arrival_time),
packet_options_(packet_options),
is_rtcp_(is_rtcp),
media_type_(media_type),
packet_time_(packet_time) {}
NetworkPacket::NetworkPacket(NetworkPacket&& o)
: packet_(std::move(o.packet_)),
send_time_(o.send_time_),
arrival_time_(o.arrival_time_),
packet_options_(o.packet_options_),
is_rtcp_(o.is_rtcp_),
media_type_(o.media_type_),
packet_time_(o.packet_time_) {}
NetworkPacket& NetworkPacket::operator=(NetworkPacket&& o) {
packet_ = std::move(o.packet_);
send_time_ = o.send_time_;
arrival_time_ = o.arrival_time_;
packet_options_ = o.packet_options_;
is_rtcp_ = o.is_rtcp_;
media_type_ = o.media_type_;
packet_time_ = o.packet_time_;
return *this;
}
DemuxerImpl::DemuxerImpl(const std::map<uint8_t, MediaType>& payload_type_map)
: packet_receiver_(nullptr), payload_type_map_(payload_type_map) {}
void DemuxerImpl::SetReceiver(PacketReceiver* receiver) {
packet_receiver_ = receiver;
}
void DemuxerImpl::DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) {
// No packet receiver means that this demuxer will terminate the flow of
// packets.
if (!packet_receiver_)
return;
const uint8_t* const packet_data = packet->data();
const size_t packet_length = packet->data_length();
MediaType media_type = MediaType::ANY;
if (!RtpHeaderParser::IsRtcp(packet_data, packet_length)) {
RTC_CHECK_GE(packet_length, 2);
const uint8_t payload_type = packet_data[1] & 0x7f;
std::map<uint8_t, MediaType>::const_iterator it =
payload_type_map_.find(payload_type);
RTC_CHECK(it != payload_type_map_.end())
<< "payload type " << static_cast<int>(payload_type) << " unknown.";
media_type = it->second;
}
packet_receiver_->DeliverPacket(
media_type, rtc::CopyOnWriteBuffer(packet_data, packet_length),
packet_time);
}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config)
: FakeNetworkPipe(clock, config, nullptr, 1) {}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer)
: FakeNetworkPipe(clock, config, std::move(demuxer), 1) {}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer,
uint64_t seed)
: clock_(clock),
demuxer_(std::move(demuxer)),
receiver_(nullptr),
transport_(nullptr),
random_(seed),
config_(),
dropped_packets_(0),
sent_packets_(0),
total_packet_delay_(0),
bursting_(false),
next_process_time_(clock_->TimeInMilliseconds()),
last_log_time_(clock_->TimeInMilliseconds()) {
SetConfig(config);
}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
Transport* transport)
: clock_(clock),
receiver_(nullptr),
transport_(transport),
random_(1),
config_(),
dropped_packets_(0),
sent_packets_(0),
total_packet_delay_(0),
bursting_(false),
next_process_time_(clock_->TimeInMilliseconds()),
last_log_time_(clock_->TimeInMilliseconds()) {
SetConfig(config);
}
FakeNetworkPipe::~FakeNetworkPipe() = default;
void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) {
rtc::CritScope crit(&config_lock_);
if (demuxer_)
demuxer_->SetReceiver(receiver);
receiver_ = receiver;
}
bool FakeNetworkPipe::SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) {
RTC_DCHECK(HasTransport());
EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), options, false,
MediaType::ANY, rtc::nullopt);
return true;
}
bool FakeNetworkPipe::SendRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK(HasTransport());
EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), rtc::nullopt, true,
MediaType::ANY, rtc::nullopt);
return true;
}
PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket(
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
const PacketTime& packet_time) {
return EnqueuePacket(std::move(packet), rtc::nullopt, false, media_type,
packet_time)
? PacketReceiver::DELIVERY_OK
: PacketReceiver::DELIVERY_PACKET_ERROR;
}
void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) {
rtc::CritScope crit(&config_lock_);
config_ = config; // Shallow copy of the struct.
double prob_loss = config.loss_percent / 100.0;
if (config_.avg_burst_loss_length == -1) {
// Uniform loss
prob_loss_bursting_ = prob_loss;
prob_start_bursting_ = prob_loss;
} else {
// Lose packets according to a gilbert-elliot model.
int avg_burst_loss_length = config.avg_burst_loss_length;
int min_avg_burst_loss_length = std::ceil(prob_loss / (1 - prob_loss));
RTC_CHECK_GT(avg_burst_loss_length, min_avg_burst_loss_length)
<< "For a total packet loss of " << config.loss_percent << "%% then"
<< " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1
<< " or higher.";
prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length);
prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length;
}
}
void FakeNetworkPipe::SendPacket(const uint8_t* data, size_t data_length) {
RTC_DCHECK(HasDemuxer());
EnqueuePacket(rtc::CopyOnWriteBuffer(data, data_length), rtc::nullopt, false,
MediaType::ANY, rtc::nullopt);
}
bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
rtc::Optional<PacketOptions> options,
bool is_rtcp,
MediaType media_type,
rtc::Optional<PacketTime> packet_time) {
Config config;
{
rtc::CritScope crit(&config_lock_);
config = config_;
}
rtc::CritScope crit(&process_lock_);
if (config.queue_length_packets > 0 &&
capacity_link_.size() >= config.queue_length_packets) {
// Too many packet on the link, drop this one.
++dropped_packets_;
return false;
}
int64_t time_now = clock_->TimeInMilliseconds();
// Delay introduced by the link capacity.
int64_t capacity_delay_ms = 0;
if (config.link_capacity_kbps > 0) {
const int bytes_per_millisecond = config.link_capacity_kbps / 8;
// To round to the closest millisecond we add half a milliseconds worth of
// bytes to the delay calculation.
capacity_delay_ms = (packet.size() + capacity_delay_error_bytes_ +
bytes_per_millisecond / 2) /
bytes_per_millisecond;
capacity_delay_error_bytes_ +=
packet.size() - capacity_delay_ms * bytes_per_millisecond;
}
int64_t network_start_time = time_now;
// Check if there already are packets on the link and change network start
// time forward if there is.
if (!capacity_link_.empty() &&
network_start_time < capacity_link_.back().arrival_time())
network_start_time = capacity_link_.back().arrival_time();
int64_t arrival_time = network_start_time + capacity_delay_ms;
capacity_link_.emplace(std::move(packet), time_now, arrival_time, options,
is_rtcp, media_type, packet_time);
return true;
}
float FakeNetworkPipe::PercentageLoss() {
rtc::CritScope crit(&process_lock_);
if (sent_packets_ == 0)
return 0;
return static_cast<float>(dropped_packets_) /
(sent_packets_ + dropped_packets_);
}
int FakeNetworkPipe::AverageDelay() {
rtc::CritScope crit(&process_lock_);
if (sent_packets_ == 0)
return 0;
return static_cast<int>(total_packet_delay_ /
static_cast<int64_t>(sent_packets_));
}
size_t FakeNetworkPipe::DroppedPackets() {
rtc::CritScope crit(&process_lock_);
return dropped_packets_;
}
size_t FakeNetworkPipe::SentPackets() {
rtc::CritScope crit(&process_lock_);
return sent_packets_;
}
void FakeNetworkPipe::Process() {
int64_t time_now = clock_->TimeInMilliseconds();
std::queue<NetworkPacket> packets_to_deliver;
Config config;
double prob_loss_bursting;
double prob_start_bursting;
{
rtc::CritScope crit(&config_lock_);
config = config_;
prob_loss_bursting = prob_loss_bursting_;
prob_start_bursting = prob_start_bursting_;
}
{
rtc::CritScope crit(&process_lock_);
if (time_now - last_log_time_ > 5000) {
int64_t queueing_delay_ms = 0;
if (!capacity_link_.empty()) {
queueing_delay_ms = time_now - capacity_link_.front().send_time();
}
RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_ms << " ms.";
last_log_time_ = time_now;
}
// Check the capacity link first.
if (!capacity_link_.empty()) {
int64_t last_arrival_time =
delay_link_.empty() ? -1 : delay_link_.back().arrival_time();
bool needs_sort = false;
while (!capacity_link_.empty() &&
time_now >= capacity_link_.front().arrival_time()) {
// Time to get this packet.
NetworkPacket packet = std::move(capacity_link_.front());
capacity_link_.pop();
// Drop packets at an average rate of |config_.loss_percent| with
// and average loss burst length of |config_.avg_burst_loss_length|.
if ((bursting_ && random_.Rand<double>() < prob_loss_bursting) ||
(!bursting_ && random_.Rand<double>() < prob_start_bursting)) {
bursting_ = true;
continue;
} else {
bursting_ = false;
}
int arrival_time_jitter = random_.Gaussian(
config.queue_delay_ms, config.delay_standard_deviation_ms);
// If reordering is not allowed then adjust arrival_time_jitter
// to make sure all packets are sent in order.
if (!config.allow_reordering && !delay_link_.empty() &&
packet.arrival_time() + arrival_time_jitter < last_arrival_time) {
arrival_time_jitter = last_arrival_time - packet.arrival_time();
}
packet.IncrementArrivalTime(arrival_time_jitter);
if (packet.arrival_time() >= last_arrival_time) {
last_arrival_time = packet.arrival_time();
} else {
needs_sort = true;
}
delay_link_.emplace_back(std::move(packet));
}
if (needs_sort) {
// Packet(s) arrived out of order, make sure list is sorted.
std::sort(delay_link_.begin(), delay_link_.end(),
PacketArrivalTimeComparator());
}
}
// Check the extra delay queue.
while (!delay_link_.empty() &&
time_now >= delay_link_.front().arrival_time()) {
// Deliver this packet.
NetworkPacket packet(std::move(delay_link_.front()));
delay_link_.pop_front();
// |time_now| might be later than when the packet should have arrived, due
// to NetworkProcess being called too late. For stats, use the time it
// should have been on the link.
total_packet_delay_ += packet.arrival_time() - packet.send_time();
packets_to_deliver.push(std::move(packet));
}
sent_packets_ += packets_to_deliver.size();
}
rtc::CritScope crit(&config_lock_);
while (!packets_to_deliver.empty()) {
NetworkPacket packet = std::move(packets_to_deliver.front());
packets_to_deliver.pop();
DeliverPacket(&packet);
}
next_process_time_ = !delay_link_.empty()
? delay_link_.begin()->arrival_time()
: time_now + kDefaultProcessIntervalMs;
}
void FakeNetworkPipe::DeliverPacket(NetworkPacket* packet) {
if (demuxer_) {
demuxer_->DeliverPacket(packet, PacketTime());
} else if (transport_) {
if (packet->is_rtcp()) {
transport_->SendRtcp(packet->data(), packet->data_length());
} else {
transport_->SendRtp(packet->data(), packet->data_length(),
packet->packet_options());
}
} else if (receiver_) {
PacketTime packet_time = packet->packet_time();
if (packet_time.timestamp != -1) {
int64_t queue_time = packet->arrival_time() - packet->send_time();
RTC_CHECK(queue_time >= 0);
packet_time.timestamp += (queue_time * 1000);
}
receiver_->DeliverPacket(packet->media_type(),
std::move(*packet->raw_packet()), packet_time);
}
}
int64_t FakeNetworkPipe::TimeUntilNextProcess() {
rtc::CritScope crit(&process_lock_);
return std::max<int64_t>(next_process_time_ - clock_->TimeInMilliseconds(),
0);
}
bool FakeNetworkPipe::HasTransport() const {
rtc::CritScope crit(&config_lock_);
return transport_ != nullptr;
}
bool FakeNetworkPipe::HasDemuxer() const {
rtc::CritScope crit(&config_lock_);
return demuxer_ != nullptr;
}
} // namespace webrtc

248
call/fake_network_pipe.h Normal file
View File

@ -0,0 +1,248 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef CALL_FAKE_NETWORK_PIPE_H_
#define CALL_FAKE_NETWORK_PIPE_H_
#include <deque>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include "api/call/transport.h"
#include "call/call.h"
#include "common_types.h" // NOLINT(build/include)
#include "modules/include/module.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/random.h"
#include "rtc_base/thread_annotations.h"
#include "typedefs.h" // NOLINT(build/include)
namespace webrtc {
class Clock;
class PacketReceiver;
enum class MediaType;
class NetworkPacket {
public:
NetworkPacket(rtc::CopyOnWriteBuffer packet,
int64_t send_time,
int64_t arrival_time,
rtc::Optional<PacketOptions> packet_options,
bool is_rtcp,
MediaType media_type_,
rtc::Optional<PacketTime> packet_time_);
// Disallow copy constructor (no deep copies of |data_|).
NetworkPacket(const NetworkPacket&) = delete;
// Allow move constructor/assignment, so that we can use in stl containers.
NetworkPacket(NetworkPacket&&);
NetworkPacket& operator=(NetworkPacket&&);
const uint8_t* data() const { return packet_.data(); }
size_t data_length() const { return packet_.size(); }
rtc::CopyOnWriteBuffer* raw_packet() { return &packet_; }
int64_t send_time() const { return send_time_; }
int64_t arrival_time() const { return arrival_time_; }
void IncrementArrivalTime(int64_t extra_delay) {
arrival_time_ += extra_delay;
}
PacketOptions packet_options() const {
return packet_options_.value_or(PacketOptions());
}
bool is_rtcp() const { return is_rtcp_; }
MediaType media_type() const { return media_type_; }
PacketTime packet_time() const { return packet_time_.value_or(PacketTime()); }
private:
rtc::CopyOnWriteBuffer packet_;
// The time the packet was sent out on the network.
int64_t send_time_;
// The time the packet should arrive at the receiver.
int64_t arrival_time_;
// If using a Transport for outgoing degradation, populate with
// PacketOptions (transport-wide sequence number) for RTP.
rtc::Optional<PacketOptions> packet_options_;
bool is_rtcp_;
// If using a PacketReceiver for incoming degradation, populate with
// appropriate MediaType and PacketTime. This type/timing will be kept and
// forwarded. The PacketTime might be altered to reflect time spent in fake
// network pipe.
MediaType media_type_;
rtc::Optional<PacketTime> packet_time_;
};
class Demuxer {
public:
virtual ~Demuxer() = default;
virtual void SetReceiver(PacketReceiver* receiver) = 0;
virtual void DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) = 0;
};
// This class doesn't have any internal thread safety, so caller must make sure
// SetReceiver and and DeliverPacket aren't called in a racy manner.
class DemuxerImpl final : public Demuxer {
public:
explicit DemuxerImpl(const std::map<uint8_t, MediaType>& payload_type_map);
void SetReceiver(PacketReceiver* receiver) override;
void DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) override;
private:
PacketReceiver* packet_receiver_;
const std::map<uint8_t, MediaType> payload_type_map_;
RTC_DISALLOW_COPY_AND_ASSIGN(DemuxerImpl);
};
// Class faking a network link. This is a simple and naive solution just faking
// capacity and adding an extra transport delay in addition to the capacity
// introduced delay.
class FakeNetworkPipe : public Transport, public PacketReceiver, public Module {
public:
struct Config {
Config() {}
// Queue length in number of packets.
size_t queue_length_packets = 0;
// Delay in addition to capacity induced delay.
int queue_delay_ms = 0;
// Standard deviation of the extra delay.
int delay_standard_deviation_ms = 0;
// Link capacity in kbps.
int link_capacity_kbps = 0;
// Random packet loss.
int loss_percent = 0;
// If packets are allowed to be reordered.
bool allow_reordering = false;
// The average length of a burst of lost packets.
int avg_burst_loss_length = -1;
};
// Use this constructor if you plan to insert packets using DeliverPacket().
FakeNetworkPipe(Clock* clock, const FakeNetworkPipe::Config& config);
// Use these constructors if you plan to insert packets using SendPacket().
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer);
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer,
uint64_t seed);
// Use this constructor if you plan to insert packets using SendRt[c?]p().
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
Transport* transport);
virtual ~FakeNetworkPipe();
// Sets a new configuration. This won't affect packets already in the pipe.
void SetConfig(const FakeNetworkPipe::Config& config);
// Sends a new packet to the link. When/if packets are delivered, they will
// be passed to the receiver instance given in SetReceiver(). This method
// should only be used if a Demuxer was provided in the constructor.
virtual void SendPacket(const uint8_t* packet, size_t packet_length);
// Must not be called in parallel with SendPacket or Process.
void SetReceiver(PacketReceiver* receiver);
// Implements Transport interface. When/if packets are delivered, they will
// be passed to the transport instance given in SetReceiverTransport(). These
// methods should only be called if a Transport instance was provided in the
// constructor.
bool SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) override;
bool SendRtcp(const uint8_t* packet, size_t length) override;
// Implements the PacketReceiver interface. When/if packets are delivered,
// they will be passed directly to the receiver instance given in
// SetReceiver(), without passing through a Demuxer. The receive time in
// PacketTime will be increased by the amount of time the packet spent in the
// fake network pipe.
PacketReceiver::DeliveryStatus DeliverPacket(
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
const PacketTime& packet_time) override;
// Processes the network queues and trigger PacketReceiver::IncomingPacket for
// packets ready to be delivered.
void Process() override;
int64_t TimeUntilNextProcess() override;
// Get statistics.
float PercentageLoss();
int AverageDelay();
size_t DroppedPackets();
size_t SentPackets();
private:
// Returns true if enqueued, or false if packet was dropped.
bool EnqueuePacket(rtc::CopyOnWriteBuffer packet,
rtc::Optional<PacketOptions> options,
bool is_rtcp,
MediaType media_type,
rtc::Optional<PacketTime> packet_time);
void DeliverPacket(NetworkPacket* packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(config_lock_);
bool HasTransport() const;
bool HasDemuxer() const;
Clock* const clock_;
// |config_lock| guards the mostly constant things like the callbacks.
rtc::CriticalSection config_lock_;
const std::unique_ptr<Demuxer> demuxer_ RTC_GUARDED_BY(config_lock_);
PacketReceiver* receiver_ RTC_GUARDED_BY(config_lock_);
Transport* const transport_ RTC_GUARDED_BY(config_lock_);
// |process_lock| guards the data structures involved in delay and loss
// processes, such as the packet queues.
rtc::CriticalSection process_lock_;
std::queue<NetworkPacket> capacity_link_ RTC_GUARDED_BY(process_lock_);
Random random_;
std::deque<NetworkPacket> delay_link_;
// Link configuration.
Config config_ RTC_GUARDED_BY(config_lock_);
// Statistics.
size_t dropped_packets_ RTC_GUARDED_BY(process_lock_);
size_t sent_packets_ RTC_GUARDED_BY(process_lock_);
int64_t total_packet_delay_ RTC_GUARDED_BY(process_lock_);
// Are we currently dropping a burst of packets?
bool bursting_;
// The probability to drop the packet if we are currently dropping a
// burst of packet
double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_);
// The probability to drop a burst of packets.
double prob_start_bursting_ RTC_GUARDED_BY(config_lock_);
int64_t next_process_time_;
int64_t last_log_time_;
int64_t capacity_delay_error_bytes_ = 0;
RTC_DISALLOW_COPY_AND_ASSIGN(FakeNetworkPipe);
};
} // namespace webrtc
#endif // CALL_FAKE_NETWORK_PIPE_H_

View File

@ -8,12 +8,13 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include "call/fake_network_pipe.h"
#include <memory>
#include "call/call.h"
#include "modules/rtp_rtcp/include/rtp_header_parser.h"
#include "system_wrappers/include/clock.h"
#include "test/fake_network_pipe.h"
#include "test/gmock.h"
#include "test/gtest.h"
@ -95,8 +96,8 @@ TEST_F(FakeNetworkPipeTest, CapacityTest) {
SendPackets(pipe.get(), kNumPackets, kPacketSize);
// Time to get one packet through the link.
const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps,
kPacketSize);
const int kPacketTimeMs =
PacketTimeMs(config.link_capacity_kbps, kPacketSize);
// Time haven't increased yet, so we souldn't get any packets.
EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0);
@ -133,8 +134,8 @@ TEST_F(FakeNetworkPipeTest, ExtraDelayTest) {
SendPackets(pipe.get(), kNumPackets, kPacketSize);
// Time to get one packet through the link.
const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps,
kPacketSize);
const int kPacketTimeMs =
PacketTimeMs(config.link_capacity_kbps, kPacketSize);
// Increase more than kPacketTimeMs, but not more than the extra delay.
fake_clock_.AdvanceTimeMilliseconds(kPacketTimeMs);
@ -163,8 +164,8 @@ TEST_F(FakeNetworkPipeTest, QueueLengthTest) {
&fake_clock_, config, std::unique_ptr<Demuxer>(demuxer)));
const int kPacketSize = 1000;
const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps,
kPacketSize);
const int kPacketTimeMs =
PacketTimeMs(config.link_capacity_kbps, kPacketSize);
// Send three packets and verify only 2 are delivered.
SendPackets(pipe.get(), 3, kPacketSize);
@ -187,8 +188,8 @@ TEST_F(FakeNetworkPipeTest, StatisticsTest) {
&fake_clock_, config, std::unique_ptr<Demuxer>(demuxer)));
const int kPacketSize = 1000;
const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps,
kPacketSize);
const int kPacketTimeMs =
PacketTimeMs(config.link_capacity_kbps, kPacketSize);
// Send three packets and verify only 2 are delivered.
SendPackets(pipe.get(), 3, kPacketSize);
@ -201,8 +202,8 @@ TEST_F(FakeNetworkPipeTest, StatisticsTest) {
// Packet 1: kPacketTimeMs + config.queue_delay_ms,
// packet 2: 2 * kPacketTimeMs + config.queue_delay_ms => 170 ms average.
EXPECT_EQ(pipe->AverageDelay(), 170);
EXPECT_EQ(pipe->sent_packets(), 2u);
EXPECT_EQ(pipe->dropped_packets(), 1u);
EXPECT_EQ(pipe->SentPackets(), 2u);
EXPECT_EQ(pipe->DroppedPackets(), 1u);
EXPECT_EQ(pipe->PercentageLoss(), 1 / 3.f);
}
@ -259,7 +260,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithEmptyPipeTest) {
}
// Check that all the packets were sent.
EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->sent_packets());
EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->SentPackets());
fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess());
EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0);
pipe->Process();
@ -313,7 +314,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) {
}
// Check that all the packets were sent.
EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->sent_packets());
EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->SentPackets());
fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess());
EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0);
pipe->Process();
@ -427,17 +428,18 @@ TEST(DemuxerImplTest, Demuxing) {
MockReceiver mock_receiver;
demuxer.SetReceiver(&mock_receiver);
std::vector<uint8_t> data(kPacketSize);
rtc::CopyOnWriteBuffer data(kPacketSize);
data[1] = kVideoPayloadType;
std::unique_ptr<NetworkPacket> packet(
new NetworkPacket(&data[0], kPacketSize, kTimeNow, kArrivalTime));
new NetworkPacket(data, kTimeNow, kArrivalTime, rtc::nullopt, false,
MediaType::ANY, rtc::nullopt));
EXPECT_CALL(mock_receiver, DeliverPacket(MediaType::VIDEO, _, _))
.WillOnce(Return(PacketReceiver::DELIVERY_OK));
demuxer.DeliverPacket(packet.get(), PacketTime());
data[1] = kAudioPayloadType;
packet.reset(
new NetworkPacket(&data[0], kPacketSize, kTimeNow, kArrivalTime));
packet.reset(new NetworkPacket(data, kTimeNow, kArrivalTime, rtc::nullopt,
false, MediaType::ANY, rtc::nullopt));
EXPECT_CALL(mock_receiver, DeliverPacket(MediaType::AUDIO, _, _))
.WillOnce(Return(PacketReceiver::DELIVERY_OK));
demuxer.DeliverPacket(packet.get(), PacketTime());

View File

@ -336,7 +336,6 @@ if (rtc_include_tests) {
"../system_wrappers",
]
sources = [
"fake_network_pipe_unittest.cc",
"frame_generator_unittest.cc",
"rtp_file_reader_unittest.cc",
"rtp_file_writer_unittest.cc",
@ -490,8 +489,6 @@ rtc_source_set("direct_transport") {
sources = [
"direct_transport.cc",
"direct_transport.h",
"fake_network_pipe.cc",
"fake_network_pipe.h",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
@ -501,7 +498,6 @@ rtc_source_set("direct_transport") {
"..:webrtc_common",
"../:typedefs",
"../api:transport_api",
"../call",
"../call:call_interfaces",
"../modules/rtp_rtcp",
"../rtc_base:rtc_base_approved",
@ -510,6 +506,7 @@ rtc_source_set("direct_transport") {
]
public_deps = [
":single_threaded_task_queue",
"../call:fake_network",
]
}

View File

@ -16,9 +16,9 @@
#include "api/call/transport.h"
#include "call/call.h"
#include "call/fake_network_pipe.h"
#include "rtc_base/sequenced_task_checker.h"
#include "rtc_base/thread_annotations.h"
#include "test/fake_network_pipe.h"
#include "test/single_threaded_task_queue.h"
namespace webrtc {

View File

@ -1,256 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/fake_network_pipe.h"
#include <assert.h>
#include <math.h>
#include <string.h>
#include <algorithm>
#include <cmath>
#include "call/call.h"
#include "modules/rtp_rtcp/include/rtp_header_parser.h"
#include "rtc_base/logging.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
namespace {
constexpr int64_t kDefaultProcessIntervalMs = 5;
}
DemuxerImpl::DemuxerImpl(const std::map<uint8_t, MediaType>& payload_type_map)
: packet_receiver_(nullptr), payload_type_map_(payload_type_map) {}
void DemuxerImpl::SetReceiver(PacketReceiver* receiver) {
packet_receiver_ = receiver;
}
void DemuxerImpl::DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) {
// No packet receiver means that this demuxer will terminate the flow of
// packets.
if (!packet_receiver_)
return;
const uint8_t* const packet_data = packet->data();
const size_t packet_length = packet->data_length();
MediaType media_type = MediaType::ANY;
if (!RtpHeaderParser::IsRtcp(packet_data, packet_length)) {
RTC_CHECK_GE(packet_length, 2);
const uint8_t payload_type = packet_data[1] & 0x7f;
std::map<uint8_t, MediaType>::const_iterator it =
payload_type_map_.find(payload_type);
RTC_CHECK(it != payload_type_map_.end())
<< "payload type " << static_cast<int>(payload_type) << " unknown.";
media_type = it->second;
}
packet_receiver_->DeliverPacket(
media_type, rtc::CopyOnWriteBuffer(packet_data, packet_length),
packet_time);
}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer)
: FakeNetworkPipe(clock, config, std::move(demuxer), 1) {}
FakeNetworkPipe::FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer,
uint64_t seed)
: clock_(clock),
demuxer_(std::move(demuxer)),
random_(seed),
config_(),
dropped_packets_(0),
sent_packets_(0),
total_packet_delay_(0),
bursting_(false),
next_process_time_(clock_->TimeInMilliseconds()),
last_log_time_(clock_->TimeInMilliseconds()) {
SetConfig(config);
}
FakeNetworkPipe::~FakeNetworkPipe() {
while (!capacity_link_.empty()) {
delete capacity_link_.front();
capacity_link_.pop();
}
while (!delay_link_.empty()) {
delete *delay_link_.begin();
delay_link_.erase(delay_link_.begin());
}
}
void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) {
RTC_CHECK(demuxer_);
demuxer_->SetReceiver(receiver);
}
void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) {
rtc::CritScope crit(&lock_);
config_ = config; // Shallow copy of the struct.
double prob_loss = config.loss_percent / 100.0;
if (config_.avg_burst_loss_length == -1) {
// Uniform loss
prob_loss_bursting_ = prob_loss;
prob_start_bursting_ = prob_loss;
} else {
// Lose packets according to a gilbert-elliot model.
int avg_burst_loss_length = config.avg_burst_loss_length;
int min_avg_burst_loss_length = std::ceil(prob_loss / (1 - prob_loss));
RTC_CHECK_GT(avg_burst_loss_length, min_avg_burst_loss_length)
<< "For a total packet loss of " << config.loss_percent << "%% then"
<< " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1
<< " or higher.";
prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length);
prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length;
}
}
void FakeNetworkPipe::SendPacket(const uint8_t* data, size_t data_length) {
RTC_CHECK(demuxer_);
rtc::CritScope crit(&lock_);
if (config_.queue_length_packets > 0 &&
capacity_link_.size() >= config_.queue_length_packets) {
// Too many packet on the link, drop this one.
++dropped_packets_;
return;
}
int64_t time_now = clock_->TimeInMilliseconds();
// Delay introduced by the link capacity.
int64_t capacity_delay_ms = 0;
if (config_.link_capacity_kbps > 0) {
const int bytes_per_millisecond = config_.link_capacity_kbps / 8;
// To round to the closest millisecond we add half a milliseconds worth of
// bytes to the delay calculation.
capacity_delay_ms = (data_length + capacity_delay_error_bytes_ +
bytes_per_millisecond / 2) /
bytes_per_millisecond;
capacity_delay_error_bytes_ +=
data_length - capacity_delay_ms * bytes_per_millisecond;
}
int64_t network_start_time = time_now;
// Check if there already are packets on the link and change network start
// time forward if there is.
if (!capacity_link_.empty() &&
network_start_time < capacity_link_.back()->arrival_time())
network_start_time = capacity_link_.back()->arrival_time();
int64_t arrival_time = network_start_time + capacity_delay_ms;
NetworkPacket* packet = new NetworkPacket(data, data_length, time_now,
arrival_time);
capacity_link_.push(packet);
}
float FakeNetworkPipe::PercentageLoss() {
rtc::CritScope crit(&lock_);
if (sent_packets_ == 0)
return 0;
return static_cast<float>(dropped_packets_) /
(sent_packets_ + dropped_packets_);
}
int FakeNetworkPipe::AverageDelay() {
rtc::CritScope crit(&lock_);
if (sent_packets_ == 0)
return 0;
return static_cast<int>(total_packet_delay_ /
static_cast<int64_t>(sent_packets_));
}
void FakeNetworkPipe::Process() {
int64_t time_now = clock_->TimeInMilliseconds();
std::queue<NetworkPacket*> packets_to_deliver;
{
rtc::CritScope crit(&lock_);
if (time_now - last_log_time_ > 5000) {
int64_t queueing_delay_ms = 0;
if (!capacity_link_.empty()) {
queueing_delay_ms = time_now - capacity_link_.front()->send_time();
}
RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_ms << " ms.";
last_log_time_ = time_now;
}
// Check the capacity link first.
while (!capacity_link_.empty() &&
time_now >= capacity_link_.front()->arrival_time()) {
// Time to get this packet.
NetworkPacket* packet = capacity_link_.front();
capacity_link_.pop();
// Drop packets at an average rate of |config_.loss_percent| with
// and average loss burst length of |config_.avg_burst_loss_length|.
if ((bursting_ && random_.Rand<double>() < prob_loss_bursting_) ||
(!bursting_ && random_.Rand<double>() < prob_start_bursting_)) {
bursting_ = true;
delete packet;
continue;
} else {
bursting_ = false;
}
int arrival_time_jitter = random_.Gaussian(
config_.queue_delay_ms, config_.delay_standard_deviation_ms);
// If reordering is not allowed then adjust arrival_time_jitter
// to make sure all packets are sent in order.
if (!config_.allow_reordering && !delay_link_.empty() &&
packet->arrival_time() + arrival_time_jitter <
(*delay_link_.rbegin())->arrival_time()) {
arrival_time_jitter =
(*delay_link_.rbegin())->arrival_time() - packet->arrival_time();
}
packet->IncrementArrivalTime(arrival_time_jitter);
delay_link_.insert(packet);
}
// Check the extra delay queue.
while (!delay_link_.empty() &&
time_now >= (*delay_link_.begin())->arrival_time()) {
// Deliver this packet.
NetworkPacket* packet = *delay_link_.begin();
packets_to_deliver.push(packet);
delay_link_.erase(delay_link_.begin());
// |time_now| might be later than when the packet should have arrived, due
// to NetworkProcess being called too late. For stats, use the time it
// should have been on the link.
total_packet_delay_ += packet->arrival_time() - packet->send_time();
}
sent_packets_ += packets_to_deliver.size();
}
while (!packets_to_deliver.empty()) {
NetworkPacket* packet = packets_to_deliver.front();
packets_to_deliver.pop();
demuxer_->DeliverPacket(packet, PacketTime());
delete packet;
}
next_process_time_ = !delay_link_.empty()
? (*delay_link_.begin())->arrival_time()
: time_now + kDefaultProcessIntervalMs;
}
int64_t FakeNetworkPipe::TimeUntilNextProcess() const {
rtc::CritScope crit(&lock_);
return std::max<int64_t>(next_process_time_ - clock_->TimeInMilliseconds(),
0);
}
} // namespace webrtc

View File

@ -1,185 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_FAKE_NETWORK_PIPE_H_
#define TEST_FAKE_NETWORK_PIPE_H_
#include <string.h>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include "common_types.h" // NOLINT(build/include)
#include "rtc_base/constructormagic.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/random.h"
#include "typedefs.h" // NOLINT(build/include)
namespace webrtc {
class Clock;
class PacketReceiver;
enum class MediaType;
class NetworkPacket {
public:
NetworkPacket(const uint8_t* data,
size_t length,
int64_t send_time,
int64_t arrival_time)
: data_(new uint8_t[length]),
data_length_(length),
send_time_(send_time),
arrival_time_(arrival_time) {
memcpy(data_.get(), data, length);
}
uint8_t* data() const { return data_.get(); }
size_t data_length() const { return data_length_; }
int64_t send_time() const { return send_time_; }
int64_t arrival_time() const { return arrival_time_; }
void IncrementArrivalTime(int64_t extra_delay) {
arrival_time_ += extra_delay;
}
private:
// The packet data.
std::unique_ptr<uint8_t[]> data_;
// Length of data_.
size_t data_length_;
// The time the packet was sent out on the network.
const int64_t send_time_;
// The time the packet should arrive at the receiver.
int64_t arrival_time_;
};
class Demuxer {
public:
virtual ~Demuxer() = default;
virtual void SetReceiver(PacketReceiver* receiver) = 0;
virtual void DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) = 0;
};
class DemuxerImpl final : public Demuxer {
public:
explicit DemuxerImpl(const std::map<uint8_t, MediaType>& payload_type_map);
void SetReceiver(PacketReceiver* receiver) override;
void DeliverPacket(const NetworkPacket* packet,
const PacketTime& packet_time) override;
private:
PacketReceiver* packet_receiver_;
const std::map<uint8_t, MediaType> payload_type_map_;
RTC_DISALLOW_COPY_AND_ASSIGN(DemuxerImpl);
};
// Class faking a network link. This is a simple and naive solution just faking
// capacity and adding an extra transport delay in addition to the capacity
// introduced delay.
class FakeNetworkPipe {
public:
struct Config {
Config() {}
// Queue length in number of packets.
size_t queue_length_packets = 0;
// Delay in addition to capacity induced delay.
int queue_delay_ms = 0;
// Standard deviation of the extra delay.
int delay_standard_deviation_ms = 0;
// Link capacity in kbps.
int link_capacity_kbps = 0;
// Random packet loss.
int loss_percent = 0;
// If packets are allowed to be reordered.
bool allow_reordering = false;
// The average length of a burst of lost packets.
int avg_burst_loss_length = -1;
};
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer);
FakeNetworkPipe(Clock* clock,
const FakeNetworkPipe::Config& config,
std::unique_ptr<Demuxer> demuxer,
uint64_t seed);
virtual ~FakeNetworkPipe();
// Sets a new configuration. This won't affect packets already in the pipe.
void SetConfig(const FakeNetworkPipe::Config& config);
// Sends a new packet to the link.
virtual void SendPacket(const uint8_t* packet, size_t packet_length);
// Must not be called in parallel with SendPacket or Process.
void SetReceiver(PacketReceiver* receiver);
// Processes the network queues and trigger PacketReceiver::IncomingPacket for
// packets ready to be delivered.
virtual void Process();
int64_t TimeUntilNextProcess() const;
// Get statistics.
float PercentageLoss();
int AverageDelay();
size_t dropped_packets() { return dropped_packets_; }
size_t sent_packets() { return sent_packets_; }
protected:
Clock* const clock_;
rtc::CriticalSection lock_;
const std::unique_ptr<Demuxer> demuxer_;
std::queue<NetworkPacket*> capacity_link_;
Random random_;
// Since we need to access both the packet with the earliest and latest
// arrival time we need to use a multiset to keep all packets sorted,
// hence, we cannot use a priority queue.
struct PacketArrivalTimeComparator {
bool operator()(const NetworkPacket* p1, const NetworkPacket* p2) {
return p1->arrival_time() < p2->arrival_time();
}
};
std::multiset<NetworkPacket*, PacketArrivalTimeComparator> delay_link_;
// Link configuration.
Config config_;
// Statistics.
size_t dropped_packets_;
size_t sent_packets_;
int64_t total_packet_delay_;
// Are we currently dropping a burst of packets?
bool bursting_;
// The probability to drop the packet if we are currently dropping a
// burst of packet
double prob_loss_bursting_;
// The probability to drop a burst of packets.
double prob_start_bursting_;
int64_t next_process_time_;
int64_t last_log_time_;
int64_t capacity_delay_error_bytes_ = 0;
RTC_DISALLOW_COPY_AND_ASSIGN(FakeNetworkPipe);
};
} // namespace webrtc
#endif // TEST_FAKE_NETWORK_PIPE_H_

View File

@ -10,13 +10,13 @@
#ifndef TEST_LAYER_FILTERING_TRANSPORT_H_
#define TEST_LAYER_FILTERING_TRANSPORT_H_
#include "call/call.h"
#include "test/direct_transport.h"
#include "test/fake_network_pipe.h"
#include "test/single_threaded_task_queue.h"
#include <map>
#include "call/call.h"
#include "call/fake_network_pipe.h"
#include "test/direct_transport.h"
#include "test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {