diff --git a/call/BUILD.gn b/call/BUILD.gn index b9a6dc1cad..e447d3765c 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -143,6 +143,8 @@ rtc_static_library("call") { "call.cc", "callfactory.cc", "callfactory.h", + "degraded_call.cc", + "degraded_call.h", "flexfec_receive_stream_impl.cc", "flexfec_receive_stream_impl.h", ] @@ -155,6 +157,7 @@ rtc_static_library("call") { deps = [ ":bitrate_allocator", ":call_interfaces", + ":fake_network", ":rtp_interfaces", ":rtp_receiver", ":rtp_sender", @@ -184,6 +187,7 @@ rtc_static_library("call") { "../rtc_base:safe_minmax", "../rtc_base:sequenced_task_checker", "../system_wrappers", + "../system_wrappers:field_trial_api", "../system_wrappers:metrics_api", "../video", ] @@ -213,6 +217,29 @@ rtc_source_set("video_stream_api") { ] } +rtc_source_set("fake_network") { + sources = [ + "fake_network_pipe.cc", + "fake_network_pipe.h", + ] + deps = [ + ":call_interfaces", + "..:typedefs", + "..:webrtc_common", + "../api:transport_api", + "../modules:module_api", + "../modules/rtp_rtcp", + "../rtc_base:rtc_base_approved", + "../rtc_base:sequenced_task_checker", + "../system_wrappers", + ] + + if (!build_with_chromium && is_clang) { + # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). + suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] + } +} + if (rtc_include_tests) { rtc_source_set("call_tests") { testonly = true @@ -340,4 +367,24 @@ if (rtc_include_tests) { "//test:test_support", ] } + + rtc_test("fake_network_unittests") { + deps = [ + ":call_interfaces", + ":fake_network", + "../modules/rtp_rtcp", + "../rtc_base:rtc_base_approved", + "../system_wrappers", + "../test:test_common", + "../test:test_main", + "//testing/gtest", + ] + sources = [ + "test/fake_network_pipe_unittest.cc", + ] + if (!build_with_chromium && is_clang) { + # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). + suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] + } + } } diff --git a/call/call.cc b/call/call.cc index daca232777..a398181e8c 100644 --- a/call/call.cc +++ b/call/call.cc @@ -292,19 +292,24 @@ class Call : public webrtc::Call, // single mapping from ssrc to a more abstract receive stream, with // accessor methods for all configuration we need at this level. struct ReceiveRtpConfig { - ReceiveRtpConfig() = default; // Needed by std::map - ReceiveRtpConfig(const std::vector& extensions, - bool use_send_side_bwe) - : extensions(extensions), use_send_side_bwe(use_send_side_bwe) {} + explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config) + : extensions(config.rtp.extensions), + use_send_side_bwe(UseSendSideBwe(config)) {} + explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config) + : extensions(config.rtp.extensions), + use_send_side_bwe(UseSendSideBwe(config)) {} + explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config) + : extensions(config.rtp_header_extensions), + use_send_side_bwe(UseSendSideBwe(config)) {} // Registered RTP header extensions for each stream. Note that RTP header // extensions are negotiated per track ("m= line") in the SDP, but we have // no notion of tracks at the Call level. We therefore store the RTP header // extensions per SSRC instead, which leads to some storage overhead. - RtpHeaderExtensionMap extensions; + const RtpHeaderExtensionMap extensions; // Set if both RTP extension the RTCP feedback message needed for // send side BWE are negotiated. - bool use_send_side_bwe = false; + const bool use_send_side_bwe; }; std::map receive_rtp_config_ RTC_GUARDED_BY(receive_crit_); @@ -641,8 +646,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( module_process_thread_.get(), config, config_.audio_state, event_log_); { WriteLockScoped write_lock(*receive_crit_); - receive_rtp_config_[config.rtp.remote_ssrc] = - ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); + receive_rtp_config_.emplace(config.rtp.remote_ssrc, config); audio_receive_streams_.insert(receive_stream); ConfigureSync(config.sync_group); @@ -791,8 +795,6 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( module_process_thread_.get(), call_stats_.get()); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); - ReceiveRtpConfig receive_config(config.rtp.extensions, - UseSendSideBwe(config)); { WriteLockScoped write_lock(*receive_crit_); if (config.rtp.rtx_ssrc) { @@ -800,9 +802,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( // stream. Since the transport_send_cc negotiation is per payload // type, we may get an incorrect value for the rtx stream, but // that is unlikely to matter in practice. - receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config; + receive_rtp_config_.emplace(config.rtp.rtx_ssrc, config); } - receive_rtp_config_[config.rtp.remote_ssrc] = receive_config; + receive_rtp_config_.emplace(config.rtp.remote_ssrc, config); video_receive_streams_.insert(receive_stream); ConfigureSync(config.sync_group); } @@ -865,8 +867,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == receive_rtp_config_.end()); - receive_rtp_config_[config.remote_ssrc] = - ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config)); + receive_rtp_config_.emplace(config.remote_ssrc, config); } // TODO(brandtr): Store config in RtcEventLog here. @@ -1312,7 +1313,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { // deregistering in the |receive_rtp_config_| map is protected by that lock. // So by not passing the packet on to demuxing in this case, we prevent // incoming packets to be passed on via the demuxer to a receive stream - // which is being torned down. + // which is being torn down. return; } parsed_packet.IdentifyExtensions(it->second.extensions); diff --git a/call/callfactory.cc b/call/callfactory.cc index 0f2eecb9f1..4ab5b89d6d 100644 --- a/call/callfactory.cc +++ b/call/callfactory.cc @@ -11,12 +11,74 @@ #include "call/callfactory.h" #include +#include +#include +#include "api/optional.h" #include "call/call.h" +#include "call/degraded_call.h" +#include "call/fake_network_pipe.h" +#include "system_wrappers/include/field_trial.h" namespace webrtc { +namespace { +bool ParseConfigParam(std::string exp_name, int* field) { + std::string group = field_trial::FindFullName(exp_name); + if (group == "") + return false; + + return (sscanf(group.c_str(), "%d", field) == 1); +} + +rtc::Optional ParseDegradationConfig( + bool send) { + std::string exp_prefix = "WebRTCFakeNetwork"; + if (send) { + exp_prefix += "Send"; + } else { + exp_prefix += "Receive"; + } + + webrtc::FakeNetworkPipe::Config config; + bool configured = false; + configured |= + ParseConfigParam(exp_prefix + "DelayMs", &config.queue_delay_ms); + configured |= ParseConfigParam(exp_prefix + "DelayStdDevMs", + &config.delay_standard_deviation_ms); + int queue_length = 0; + if (ParseConfigParam(exp_prefix + "QueueLength", &queue_length)) { + RTC_CHECK_GE(queue_length, 0); + config.queue_length_packets = queue_length; + configured = true; + } + configured |= + ParseConfigParam(exp_prefix + "CapacityKbps", &config.link_capacity_kbps); + configured |= + ParseConfigParam(exp_prefix + "LossPercent", &config.loss_percent); + int allow_reordering = 0; + if (ParseConfigParam(exp_prefix + "AllowReordering", &allow_reordering)) { + config.allow_reordering = true; + configured = true; + } + configured |= ParseConfigParam(exp_prefix + "AvgBurstLossLength", + &config.avg_burst_loss_length); + return configured ? rtc::Optional(config) + : rtc::nullopt; +} +} // namespace Call* CallFactory::CreateCall(const Call::Config& config) { + rtc::Optional send_degradation_config = + ParseDegradationConfig(true); + rtc::Optional receive_degradation_config = + ParseDegradationConfig(false); + + if (send_degradation_config || receive_degradation_config) { + return new DegradedCall(std::unique_ptr(Call::Create(config)), + send_degradation_config, + receive_degradation_config); + } + return Call::Create(config); } diff --git a/call/degraded_call.cc b/call/degraded_call.cc new file mode 100644 index 0000000000..356147fd8b --- /dev/null +++ b/call/degraded_call.cc @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include + +#include "call/degraded_call.h" + +#include "rtc_base/ptr_util.h" + +namespace webrtc { +DegradedCall::DegradedCall( + std::unique_ptr call, + rtc::Optional send_config, + rtc::Optional receive_config) + : clock_(Clock::GetRealTimeClock()), + call_(std::move(call)), + send_config_(send_config), + send_process_thread_( + send_config_ ? ProcessThread::Create("DegradedSendThread") : nullptr), + num_send_streams_(0), + receive_config_(receive_config) { + if (receive_config_) { + receive_pipe_ = + rtc::MakeUnique(clock_, *receive_config_); + receive_pipe_->SetReceiver(call_->Receiver()); + } + if (send_process_thread_) { + send_process_thread_->Start(); + } +} + +DegradedCall::~DegradedCall() { + if (send_pipe_) { + send_process_thread_->DeRegisterModule(send_pipe_.get()); + } + if (send_process_thread_) { + send_process_thread_->Stop(); + } +} + +AudioSendStream* DegradedCall::CreateAudioSendStream( + const AudioSendStream::Config& config) { + return call_->CreateAudioSendStream(config); +} + +void DegradedCall::DestroyAudioSendStream(AudioSendStream* send_stream) { + call_->DestroyAudioSendStream(send_stream); +} + +AudioReceiveStream* DegradedCall::CreateAudioReceiveStream( + const AudioReceiveStream::Config& config) { + return call_->CreateAudioReceiveStream(config); +} + +void DegradedCall::DestroyAudioReceiveStream( + AudioReceiveStream* receive_stream) { + call_->DestroyAudioReceiveStream(receive_stream); +} + +VideoSendStream* DegradedCall::CreateVideoSendStream( + VideoSendStream::Config config, + VideoEncoderConfig encoder_config) { + if (send_config_ && !send_pipe_) { + send_pipe_ = rtc::MakeUnique(clock_, *send_config_, + config.send_transport); + config.send_transport = this; + send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); + } + ++num_send_streams_; + return call_->CreateVideoSendStream(std::move(config), + std::move(encoder_config)); +} + +VideoSendStream* DegradedCall::CreateVideoSendStream( + VideoSendStream::Config config, + VideoEncoderConfig encoder_config, + std::unique_ptr fec_controller) { + if (send_config_ && !send_pipe_) { + send_pipe_ = rtc::MakeUnique(clock_, *send_config_, + config.send_transport); + config.send_transport = this; + send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE); + } + ++num_send_streams_; + return call_->CreateVideoSendStream( + std::move(config), std::move(encoder_config), std::move(fec_controller)); +} + +void DegradedCall::DestroyVideoSendStream(VideoSendStream* send_stream) { + if (send_pipe_ && num_send_streams_ > 0) { + --num_send_streams_; + if (num_send_streams_ == 0) { + send_process_thread_->DeRegisterModule(send_pipe_.get()); + send_pipe_.reset(); + } + } + call_->DestroyVideoSendStream(send_stream); +} + +VideoReceiveStream* DegradedCall::CreateVideoReceiveStream( + VideoReceiveStream::Config configuration) { + return call_->CreateVideoReceiveStream(std::move(configuration)); +} + +void DegradedCall::DestroyVideoReceiveStream( + VideoReceiveStream* receive_stream) { + call_->DestroyVideoReceiveStream(receive_stream); +} + +FlexfecReceiveStream* DegradedCall::CreateFlexfecReceiveStream( + const FlexfecReceiveStream::Config& config) { + return call_->CreateFlexfecReceiveStream(config); +} + +void DegradedCall::DestroyFlexfecReceiveStream( + FlexfecReceiveStream* receive_stream) { + call_->DestroyFlexfecReceiveStream(receive_stream); +} + +PacketReceiver* DegradedCall::Receiver() { + if (receive_config_) { + return this; + } + return call_->Receiver(); +} + +RtpTransportControllerSendInterface* +DegradedCall::GetTransportControllerSend() { + return call_->GetTransportControllerSend(); +} + +Call::Stats DegradedCall::GetStats() const { + return call_->GetStats(); +} + +void DegradedCall::SetBitrateAllocationStrategy( + std::unique_ptr + bitrate_allocation_strategy) { + call_->SetBitrateAllocationStrategy(std::move(bitrate_allocation_strategy)); +} + +void DegradedCall::SignalChannelNetworkState(MediaType media, + NetworkState state) { + call_->SignalChannelNetworkState(media, state); +} + +void DegradedCall::OnTransportOverheadChanged( + MediaType media, + int transport_overhead_per_packet) { + call_->OnTransportOverheadChanged(media, transport_overhead_per_packet); +} + +void DegradedCall::OnSentPacket(const rtc::SentPacket& sent_packet) { + if (send_config_) { + // If we have a degraded send-transport, we have already notified call + // about the supposed network send time. Discard the actual network send + // time in order to properly fool the BWE. + return; + } + call_->OnSentPacket(sent_packet); +} + +bool DegradedCall::SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) { + // A call here comes from the RTP stack (probably pacer). We intercept it and + // put it in the fake network pipe instead, but report to Call that is has + // been sent, so that the bandwidth estimator sees the delay we add. + send_pipe_->SendRtp(packet, length, options); + if (options.packet_id != -1) { + rtc::SentPacket packet_info; + packet_info.packet_id = options.packet_id; + packet_info.send_time_ms = clock_->TimeInMilliseconds(); + call_->OnSentPacket(packet_info); + } + return true; +} + +bool DegradedCall::SendRtcp(const uint8_t* packet, size_t length) { + send_pipe_->SendRtcp(packet, length); + return true; +} + +PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + const PacketTime& packet_time) { + PacketReceiver::DeliveryStatus status = + receive_pipe_->DeliverPacket(media_type, std::move(packet), packet_time); + // This is not optimal, but there are many places where there are thread + // checks that fail if we're not using the worker thread call into this + // method. If we want to fix this we probably need a task queue to do handover + // of all overriden methods, which feels like overikill for the current use + // case. + // By just having this thread call out via the Process() method we work around + // that, with the tradeoff that a non-zero delay may become a little larger + // than anticipated at very low packet rates. + receive_pipe_->Process(); + return status; +} + +} // namespace webrtc diff --git a/call/degraded_call.h b/call/degraded_call.h new file mode 100644 index 0000000000..5658873922 --- /dev/null +++ b/call/degraded_call.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef CALL_DEGRADED_CALL_H_ +#define CALL_DEGRADED_CALL_H_ + +#include + +#include "api/call/transport.h" +#include "api/optional.h" +#include "call/call.h" +#include "call/fake_network_pipe.h" +#include "modules/utility/include/process_thread.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { + +class DegradedCall : public Call, private Transport, private PacketReceiver { + public: + explicit DegradedCall(std::unique_ptr call, + rtc::Optional send_config, + rtc::Optional receive_config); + ~DegradedCall() override; + + // Implements Call. + AudioSendStream* CreateAudioSendStream( + const AudioSendStream::Config& config) override; + void DestroyAudioSendStream(AudioSendStream* send_stream) override; + + AudioReceiveStream* CreateAudioReceiveStream( + const AudioReceiveStream::Config& config) override; + void DestroyAudioReceiveStream(AudioReceiveStream* receive_stream) override; + + VideoSendStream* CreateVideoSendStream( + VideoSendStream::Config config, + VideoEncoderConfig encoder_config) override; + VideoSendStream* CreateVideoSendStream( + VideoSendStream::Config config, + VideoEncoderConfig encoder_config, + std::unique_ptr fec_controller) override; + void DestroyVideoSendStream(VideoSendStream* send_stream) override; + + VideoReceiveStream* CreateVideoReceiveStream( + VideoReceiveStream::Config configuration) override; + void DestroyVideoReceiveStream(VideoReceiveStream* receive_stream) override; + + FlexfecReceiveStream* CreateFlexfecReceiveStream( + const FlexfecReceiveStream::Config& config) override; + void DestroyFlexfecReceiveStream( + FlexfecReceiveStream* receive_stream) override; + + PacketReceiver* Receiver() override; + + RtpTransportControllerSendInterface* GetTransportControllerSend() override; + + Stats GetStats() const override; + + void SetBitrateAllocationStrategy( + std::unique_ptr + bitrate_allocation_strategy) override; + + void SignalChannelNetworkState(MediaType media, NetworkState state) override; + + void OnTransportOverheadChanged(MediaType media, + int transport_overhead_per_packet) override; + + void OnSentPacket(const rtc::SentPacket& sent_packet) override; + + protected: + // Implements Transport. + bool SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) override; + + bool SendRtcp(const uint8_t* packet, size_t length) override; + + // Implements PacketReceiver. + DeliveryStatus DeliverPacket(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + const PacketTime& packet_time) override; + + private: + Clock* const clock_; + const std::unique_ptr call_; + + const rtc::Optional send_config_; + const std::unique_ptr send_process_thread_; + std::unique_ptr send_pipe_; + size_t num_send_streams_; + + const rtc::Optional receive_config_; + std::unique_ptr receive_pipe_; +}; + +} // namespace webrtc + +#endif // CALL_DEGRADED_CALL_H_ diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc new file mode 100644 index 0000000000..7fc935df42 --- /dev/null +++ b/call/fake_network_pipe.cc @@ -0,0 +1,419 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include +#include +#include + +#include +#include +#include + +#include "call/call.h" +#include "call/fake_network_pipe.h" +#include "modules/rtp_rtcp/include/rtp_header_parser.h" +#include "rtc_base/logging.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { + +namespace { +constexpr int64_t kDefaultProcessIntervalMs = 5; +struct PacketArrivalTimeComparator { + bool operator()(const NetworkPacket& p1, const NetworkPacket& p2) { + return p1.arrival_time() < p2.arrival_time(); + } +}; +} // namespace + +NetworkPacket::NetworkPacket(rtc::CopyOnWriteBuffer packet, + int64_t send_time, + int64_t arrival_time, + rtc::Optional packet_options, + bool is_rtcp, + MediaType media_type, + rtc::Optional packet_time) + : packet_(std::move(packet)), + send_time_(send_time), + arrival_time_(arrival_time), + packet_options_(packet_options), + is_rtcp_(is_rtcp), + media_type_(media_type), + packet_time_(packet_time) {} + +NetworkPacket::NetworkPacket(NetworkPacket&& o) + : packet_(std::move(o.packet_)), + send_time_(o.send_time_), + arrival_time_(o.arrival_time_), + packet_options_(o.packet_options_), + is_rtcp_(o.is_rtcp_), + media_type_(o.media_type_), + packet_time_(o.packet_time_) {} + +NetworkPacket& NetworkPacket::operator=(NetworkPacket&& o) { + packet_ = std::move(o.packet_); + send_time_ = o.send_time_; + arrival_time_ = o.arrival_time_; + packet_options_ = o.packet_options_; + is_rtcp_ = o.is_rtcp_; + media_type_ = o.media_type_; + packet_time_ = o.packet_time_; + + return *this; +} + +DemuxerImpl::DemuxerImpl(const std::map& payload_type_map) + : packet_receiver_(nullptr), payload_type_map_(payload_type_map) {} + +void DemuxerImpl::SetReceiver(PacketReceiver* receiver) { + packet_receiver_ = receiver; +} + +void DemuxerImpl::DeliverPacket(const NetworkPacket* packet, + const PacketTime& packet_time) { + // No packet receiver means that this demuxer will terminate the flow of + // packets. + if (!packet_receiver_) + return; + const uint8_t* const packet_data = packet->data(); + const size_t packet_length = packet->data_length(); + MediaType media_type = MediaType::ANY; + if (!RtpHeaderParser::IsRtcp(packet_data, packet_length)) { + RTC_CHECK_GE(packet_length, 2); + const uint8_t payload_type = packet_data[1] & 0x7f; + std::map::const_iterator it = + payload_type_map_.find(payload_type); + RTC_CHECK(it != payload_type_map_.end()) + << "payload type " << static_cast(payload_type) << " unknown."; + media_type = it->second; + } + packet_receiver_->DeliverPacket( + media_type, rtc::CopyOnWriteBuffer(packet_data, packet_length), + packet_time); +} + +FakeNetworkPipe::FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config) + : FakeNetworkPipe(clock, config, nullptr, 1) {} + +FakeNetworkPipe::FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + std::unique_ptr demuxer) + : FakeNetworkPipe(clock, config, std::move(demuxer), 1) {} + +FakeNetworkPipe::FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + std::unique_ptr demuxer, + uint64_t seed) + : clock_(clock), + demuxer_(std::move(demuxer)), + receiver_(nullptr), + transport_(nullptr), + random_(seed), + config_(), + dropped_packets_(0), + sent_packets_(0), + total_packet_delay_(0), + bursting_(false), + next_process_time_(clock_->TimeInMilliseconds()), + last_log_time_(clock_->TimeInMilliseconds()) { + SetConfig(config); +} + +FakeNetworkPipe::FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + Transport* transport) + : clock_(clock), + receiver_(nullptr), + transport_(transport), + random_(1), + config_(), + dropped_packets_(0), + sent_packets_(0), + total_packet_delay_(0), + bursting_(false), + next_process_time_(clock_->TimeInMilliseconds()), + last_log_time_(clock_->TimeInMilliseconds()) { + SetConfig(config); +} + +FakeNetworkPipe::~FakeNetworkPipe() = default; + +void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) { + rtc::CritScope crit(&config_lock_); + if (demuxer_) + demuxer_->SetReceiver(receiver); + receiver_ = receiver; +} + +bool FakeNetworkPipe::SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) { + RTC_DCHECK(HasTransport()); + EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), options, false, + MediaType::ANY, rtc::nullopt); + return true; +} + +bool FakeNetworkPipe::SendRtcp(const uint8_t* packet, size_t length) { + RTC_DCHECK(HasTransport()); + EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), rtc::nullopt, true, + MediaType::ANY, rtc::nullopt); + return true; +} + +PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + const PacketTime& packet_time) { + return EnqueuePacket(std::move(packet), rtc::nullopt, false, media_type, + packet_time) + ? PacketReceiver::DELIVERY_OK + : PacketReceiver::DELIVERY_PACKET_ERROR; +} + +void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) { + rtc::CritScope crit(&config_lock_); + config_ = config; // Shallow copy of the struct. + double prob_loss = config.loss_percent / 100.0; + if (config_.avg_burst_loss_length == -1) { + // Uniform loss + prob_loss_bursting_ = prob_loss; + prob_start_bursting_ = prob_loss; + } else { + // Lose packets according to a gilbert-elliot model. + int avg_burst_loss_length = config.avg_burst_loss_length; + int min_avg_burst_loss_length = std::ceil(prob_loss / (1 - prob_loss)); + + RTC_CHECK_GT(avg_burst_loss_length, min_avg_burst_loss_length) + << "For a total packet loss of " << config.loss_percent << "%% then" + << " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1 + << " or higher."; + + prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length); + prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length; + } +} + +void FakeNetworkPipe::SendPacket(const uint8_t* data, size_t data_length) { + RTC_DCHECK(HasDemuxer()); + EnqueuePacket(rtc::CopyOnWriteBuffer(data, data_length), rtc::nullopt, false, + MediaType::ANY, rtc::nullopt); +} + +bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet, + rtc::Optional options, + bool is_rtcp, + MediaType media_type, + rtc::Optional packet_time) { + Config config; + { + rtc::CritScope crit(&config_lock_); + config = config_; + } + rtc::CritScope crit(&process_lock_); + if (config.queue_length_packets > 0 && + capacity_link_.size() >= config.queue_length_packets) { + // Too many packet on the link, drop this one. + ++dropped_packets_; + return false; + } + + int64_t time_now = clock_->TimeInMilliseconds(); + + // Delay introduced by the link capacity. + int64_t capacity_delay_ms = 0; + if (config.link_capacity_kbps > 0) { + const int bytes_per_millisecond = config.link_capacity_kbps / 8; + // To round to the closest millisecond we add half a milliseconds worth of + // bytes to the delay calculation. + capacity_delay_ms = (packet.size() + capacity_delay_error_bytes_ + + bytes_per_millisecond / 2) / + bytes_per_millisecond; + capacity_delay_error_bytes_ += + packet.size() - capacity_delay_ms * bytes_per_millisecond; + } + int64_t network_start_time = time_now; + + // Check if there already are packets on the link and change network start + // time forward if there is. + if (!capacity_link_.empty() && + network_start_time < capacity_link_.back().arrival_time()) + network_start_time = capacity_link_.back().arrival_time(); + + int64_t arrival_time = network_start_time + capacity_delay_ms; + capacity_link_.emplace(std::move(packet), time_now, arrival_time, options, + is_rtcp, media_type, packet_time); + return true; +} + +float FakeNetworkPipe::PercentageLoss() { + rtc::CritScope crit(&process_lock_); + if (sent_packets_ == 0) + return 0; + + return static_cast(dropped_packets_) / + (sent_packets_ + dropped_packets_); +} + +int FakeNetworkPipe::AverageDelay() { + rtc::CritScope crit(&process_lock_); + if (sent_packets_ == 0) + return 0; + + return static_cast(total_packet_delay_ / + static_cast(sent_packets_)); +} + +size_t FakeNetworkPipe::DroppedPackets() { + rtc::CritScope crit(&process_lock_); + return dropped_packets_; +} + +size_t FakeNetworkPipe::SentPackets() { + rtc::CritScope crit(&process_lock_); + return sent_packets_; +} + +void FakeNetworkPipe::Process() { + int64_t time_now = clock_->TimeInMilliseconds(); + std::queue packets_to_deliver; + Config config; + double prob_loss_bursting; + double prob_start_bursting; + { + rtc::CritScope crit(&config_lock_); + config = config_; + prob_loss_bursting = prob_loss_bursting_; + prob_start_bursting = prob_start_bursting_; + } + { + rtc::CritScope crit(&process_lock_); + if (time_now - last_log_time_ > 5000) { + int64_t queueing_delay_ms = 0; + if (!capacity_link_.empty()) { + queueing_delay_ms = time_now - capacity_link_.front().send_time(); + } + RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_ms << " ms."; + last_log_time_ = time_now; + } + + // Check the capacity link first. + if (!capacity_link_.empty()) { + int64_t last_arrival_time = + delay_link_.empty() ? -1 : delay_link_.back().arrival_time(); + bool needs_sort = false; + while (!capacity_link_.empty() && + time_now >= capacity_link_.front().arrival_time()) { + // Time to get this packet. + NetworkPacket packet = std::move(capacity_link_.front()); + capacity_link_.pop(); + + // Drop packets at an average rate of |config_.loss_percent| with + // and average loss burst length of |config_.avg_burst_loss_length|. + if ((bursting_ && random_.Rand() < prob_loss_bursting) || + (!bursting_ && random_.Rand() < prob_start_bursting)) { + bursting_ = true; + continue; + } else { + bursting_ = false; + } + + int arrival_time_jitter = random_.Gaussian( + config.queue_delay_ms, config.delay_standard_deviation_ms); + + // If reordering is not allowed then adjust arrival_time_jitter + // to make sure all packets are sent in order. + if (!config.allow_reordering && !delay_link_.empty() && + packet.arrival_time() + arrival_time_jitter < last_arrival_time) { + arrival_time_jitter = last_arrival_time - packet.arrival_time(); + } + packet.IncrementArrivalTime(arrival_time_jitter); + if (packet.arrival_time() >= last_arrival_time) { + last_arrival_time = packet.arrival_time(); + } else { + needs_sort = true; + } + delay_link_.emplace_back(std::move(packet)); + } + + if (needs_sort) { + // Packet(s) arrived out of order, make sure list is sorted. + std::sort(delay_link_.begin(), delay_link_.end(), + PacketArrivalTimeComparator()); + } + } + + // Check the extra delay queue. + while (!delay_link_.empty() && + time_now >= delay_link_.front().arrival_time()) { + // Deliver this packet. + NetworkPacket packet(std::move(delay_link_.front())); + delay_link_.pop_front(); + // |time_now| might be later than when the packet should have arrived, due + // to NetworkProcess being called too late. For stats, use the time it + // should have been on the link. + total_packet_delay_ += packet.arrival_time() - packet.send_time(); + packets_to_deliver.push(std::move(packet)); + } + sent_packets_ += packets_to_deliver.size(); + } + + rtc::CritScope crit(&config_lock_); + while (!packets_to_deliver.empty()) { + NetworkPacket packet = std::move(packets_to_deliver.front()); + packets_to_deliver.pop(); + DeliverPacket(&packet); + } + + next_process_time_ = !delay_link_.empty() + ? delay_link_.begin()->arrival_time() + : time_now + kDefaultProcessIntervalMs; +} + +void FakeNetworkPipe::DeliverPacket(NetworkPacket* packet) { + if (demuxer_) { + demuxer_->DeliverPacket(packet, PacketTime()); + } else if (transport_) { + if (packet->is_rtcp()) { + transport_->SendRtcp(packet->data(), packet->data_length()); + } else { + transport_->SendRtp(packet->data(), packet->data_length(), + packet->packet_options()); + } + } else if (receiver_) { + PacketTime packet_time = packet->packet_time(); + if (packet_time.timestamp != -1) { + int64_t queue_time = packet->arrival_time() - packet->send_time(); + RTC_CHECK(queue_time >= 0); + packet_time.timestamp += (queue_time * 1000); + } + receiver_->DeliverPacket(packet->media_type(), + std::move(*packet->raw_packet()), packet_time); + } +} + +int64_t FakeNetworkPipe::TimeUntilNextProcess() { + rtc::CritScope crit(&process_lock_); + return std::max(next_process_time_ - clock_->TimeInMilliseconds(), + 0); +} + +bool FakeNetworkPipe::HasTransport() const { + rtc::CritScope crit(&config_lock_); + return transport_ != nullptr; +} + +bool FakeNetworkPipe::HasDemuxer() const { + rtc::CritScope crit(&config_lock_); + return demuxer_ != nullptr; +} + +} // namespace webrtc diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h new file mode 100644 index 0000000000..b4cf70b760 --- /dev/null +++ b/call/fake_network_pipe.h @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef CALL_FAKE_NETWORK_PIPE_H_ +#define CALL_FAKE_NETWORK_PIPE_H_ + +#include +#include +#include +#include +#include + +#include "api/call/transport.h" +#include "call/call.h" +#include "common_types.h" // NOLINT(build/include) +#include "modules/include/module.h" +#include "rtc_base/constructormagic.h" +#include "rtc_base/criticalsection.h" +#include "rtc_base/random.h" +#include "rtc_base/thread_annotations.h" +#include "typedefs.h" // NOLINT(build/include) + +namespace webrtc { + +class Clock; +class PacketReceiver; +enum class MediaType; + +class NetworkPacket { + public: + NetworkPacket(rtc::CopyOnWriteBuffer packet, + int64_t send_time, + int64_t arrival_time, + rtc::Optional packet_options, + bool is_rtcp, + MediaType media_type_, + rtc::Optional packet_time_); + // Disallow copy constructor (no deep copies of |data_|). + NetworkPacket(const NetworkPacket&) = delete; + // Allow move constructor/assignment, so that we can use in stl containers. + NetworkPacket(NetworkPacket&&); + NetworkPacket& operator=(NetworkPacket&&); + + const uint8_t* data() const { return packet_.data(); } + size_t data_length() const { return packet_.size(); } + rtc::CopyOnWriteBuffer* raw_packet() { return &packet_; } + int64_t send_time() const { return send_time_; } + int64_t arrival_time() const { return arrival_time_; } + void IncrementArrivalTime(int64_t extra_delay) { + arrival_time_ += extra_delay; + } + PacketOptions packet_options() const { + return packet_options_.value_or(PacketOptions()); + } + bool is_rtcp() const { return is_rtcp_; } + MediaType media_type() const { return media_type_; } + PacketTime packet_time() const { return packet_time_.value_or(PacketTime()); } + + private: + rtc::CopyOnWriteBuffer packet_; + // The time the packet was sent out on the network. + int64_t send_time_; + // The time the packet should arrive at the receiver. + int64_t arrival_time_; + // If using a Transport for outgoing degradation, populate with + // PacketOptions (transport-wide sequence number) for RTP. + rtc::Optional packet_options_; + bool is_rtcp_; + // If using a PacketReceiver for incoming degradation, populate with + // appropriate MediaType and PacketTime. This type/timing will be kept and + // forwarded. The PacketTime might be altered to reflect time spent in fake + // network pipe. + MediaType media_type_; + rtc::Optional packet_time_; +}; + +class Demuxer { + public: + virtual ~Demuxer() = default; + virtual void SetReceiver(PacketReceiver* receiver) = 0; + virtual void DeliverPacket(const NetworkPacket* packet, + const PacketTime& packet_time) = 0; +}; + +// This class doesn't have any internal thread safety, so caller must make sure +// SetReceiver and and DeliverPacket aren't called in a racy manner. +class DemuxerImpl final : public Demuxer { + public: + explicit DemuxerImpl(const std::map& payload_type_map); + + void SetReceiver(PacketReceiver* receiver) override; + void DeliverPacket(const NetworkPacket* packet, + const PacketTime& packet_time) override; + + private: + PacketReceiver* packet_receiver_; + const std::map payload_type_map_; + RTC_DISALLOW_COPY_AND_ASSIGN(DemuxerImpl); +}; + +// Class faking a network link. This is a simple and naive solution just faking +// capacity and adding an extra transport delay in addition to the capacity +// introduced delay. + +class FakeNetworkPipe : public Transport, public PacketReceiver, public Module { + public: + struct Config { + Config() {} + // Queue length in number of packets. + size_t queue_length_packets = 0; + // Delay in addition to capacity induced delay. + int queue_delay_ms = 0; + // Standard deviation of the extra delay. + int delay_standard_deviation_ms = 0; + // Link capacity in kbps. + int link_capacity_kbps = 0; + // Random packet loss. + int loss_percent = 0; + // If packets are allowed to be reordered. + bool allow_reordering = false; + // The average length of a burst of lost packets. + int avg_burst_loss_length = -1; + }; + + // Use this constructor if you plan to insert packets using DeliverPacket(). + FakeNetworkPipe(Clock* clock, const FakeNetworkPipe::Config& config); + + // Use these constructors if you plan to insert packets using SendPacket(). + FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + std::unique_ptr demuxer); + FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + std::unique_ptr demuxer, + uint64_t seed); + + // Use this constructor if you plan to insert packets using SendRt[c?]p(). + FakeNetworkPipe(Clock* clock, + const FakeNetworkPipe::Config& config, + Transport* transport); + + virtual ~FakeNetworkPipe(); + + // Sets a new configuration. This won't affect packets already in the pipe. + void SetConfig(const FakeNetworkPipe::Config& config); + + // Sends a new packet to the link. When/if packets are delivered, they will + // be passed to the receiver instance given in SetReceiver(). This method + // should only be used if a Demuxer was provided in the constructor. + virtual void SendPacket(const uint8_t* packet, size_t packet_length); + + // Must not be called in parallel with SendPacket or Process. + void SetReceiver(PacketReceiver* receiver); + + // Implements Transport interface. When/if packets are delivered, they will + // be passed to the transport instance given in SetReceiverTransport(). These + // methods should only be called if a Transport instance was provided in the + // constructor. + bool SendRtp(const uint8_t* packet, + size_t length, + const PacketOptions& options) override; + bool SendRtcp(const uint8_t* packet, size_t length) override; + + // Implements the PacketReceiver interface. When/if packets are delivered, + // they will be passed directly to the receiver instance given in + // SetReceiver(), without passing through a Demuxer. The receive time in + // PacketTime will be increased by the amount of time the packet spent in the + // fake network pipe. + PacketReceiver::DeliveryStatus DeliverPacket( + MediaType media_type, + rtc::CopyOnWriteBuffer packet, + const PacketTime& packet_time) override; + + // Processes the network queues and trigger PacketReceiver::IncomingPacket for + // packets ready to be delivered. + void Process() override; + int64_t TimeUntilNextProcess() override; + + // Get statistics. + float PercentageLoss(); + int AverageDelay(); + size_t DroppedPackets(); + size_t SentPackets(); + + private: + // Returns true if enqueued, or false if packet was dropped. + bool EnqueuePacket(rtc::CopyOnWriteBuffer packet, + rtc::Optional options, + bool is_rtcp, + MediaType media_type, + rtc::Optional packet_time); + void DeliverPacket(NetworkPacket* packet) + RTC_EXCLUSIVE_LOCKS_REQUIRED(config_lock_); + bool HasTransport() const; + bool HasDemuxer() const; + + Clock* const clock_; + // |config_lock| guards the mostly constant things like the callbacks. + rtc::CriticalSection config_lock_; + const std::unique_ptr demuxer_ RTC_GUARDED_BY(config_lock_); + PacketReceiver* receiver_ RTC_GUARDED_BY(config_lock_); + Transport* const transport_ RTC_GUARDED_BY(config_lock_); + + // |process_lock| guards the data structures involved in delay and loss + // processes, such as the packet queues. + rtc::CriticalSection process_lock_; + std::queue capacity_link_ RTC_GUARDED_BY(process_lock_); + Random random_; + + std::deque delay_link_; + + // Link configuration. + Config config_ RTC_GUARDED_BY(config_lock_); + + // Statistics. + size_t dropped_packets_ RTC_GUARDED_BY(process_lock_); + size_t sent_packets_ RTC_GUARDED_BY(process_lock_); + int64_t total_packet_delay_ RTC_GUARDED_BY(process_lock_); + + // Are we currently dropping a burst of packets? + bool bursting_; + + // The probability to drop the packet if we are currently dropping a + // burst of packet + double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_); + + // The probability to drop a burst of packets. + double prob_start_bursting_ RTC_GUARDED_BY(config_lock_); + + int64_t next_process_time_; + + int64_t last_log_time_; + + int64_t capacity_delay_error_bytes_ = 0; + + RTC_DISALLOW_COPY_AND_ASSIGN(FakeNetworkPipe); +}; + +} // namespace webrtc + +#endif // CALL_FAKE_NETWORK_PIPE_H_ diff --git a/test/fake_network_pipe_unittest.cc b/call/test/fake_network_pipe_unittest.cc similarity index 93% rename from test/fake_network_pipe_unittest.cc rename to call/test/fake_network_pipe_unittest.cc index a2cf80f27c..e63b7ed762 100644 --- a/test/fake_network_pipe_unittest.cc +++ b/call/test/fake_network_pipe_unittest.cc @@ -8,12 +8,13 @@ * be found in the AUTHORS file in the root of the source tree. */ +#include "call/fake_network_pipe.h" + #include #include "call/call.h" #include "modules/rtp_rtcp/include/rtp_header_parser.h" #include "system_wrappers/include/clock.h" -#include "test/fake_network_pipe.h" #include "test/gmock.h" #include "test/gtest.h" @@ -95,8 +96,8 @@ TEST_F(FakeNetworkPipeTest, CapacityTest) { SendPackets(pipe.get(), kNumPackets, kPacketSize); // Time to get one packet through the link. - const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps, - kPacketSize); + const int kPacketTimeMs = + PacketTimeMs(config.link_capacity_kbps, kPacketSize); // Time haven't increased yet, so we souldn't get any packets. EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0); @@ -133,8 +134,8 @@ TEST_F(FakeNetworkPipeTest, ExtraDelayTest) { SendPackets(pipe.get(), kNumPackets, kPacketSize); // Time to get one packet through the link. - const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps, - kPacketSize); + const int kPacketTimeMs = + PacketTimeMs(config.link_capacity_kbps, kPacketSize); // Increase more than kPacketTimeMs, but not more than the extra delay. fake_clock_.AdvanceTimeMilliseconds(kPacketTimeMs); @@ -163,8 +164,8 @@ TEST_F(FakeNetworkPipeTest, QueueLengthTest) { &fake_clock_, config, std::unique_ptr(demuxer))); const int kPacketSize = 1000; - const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps, - kPacketSize); + const int kPacketTimeMs = + PacketTimeMs(config.link_capacity_kbps, kPacketSize); // Send three packets and verify only 2 are delivered. SendPackets(pipe.get(), 3, kPacketSize); @@ -187,8 +188,8 @@ TEST_F(FakeNetworkPipeTest, StatisticsTest) { &fake_clock_, config, std::unique_ptr(demuxer))); const int kPacketSize = 1000; - const int kPacketTimeMs = PacketTimeMs(config.link_capacity_kbps, - kPacketSize); + const int kPacketTimeMs = + PacketTimeMs(config.link_capacity_kbps, kPacketSize); // Send three packets and verify only 2 are delivered. SendPackets(pipe.get(), 3, kPacketSize); @@ -201,9 +202,9 @@ TEST_F(FakeNetworkPipeTest, StatisticsTest) { // Packet 1: kPacketTimeMs + config.queue_delay_ms, // packet 2: 2 * kPacketTimeMs + config.queue_delay_ms => 170 ms average. EXPECT_EQ(pipe->AverageDelay(), 170); - EXPECT_EQ(pipe->sent_packets(), 2u); - EXPECT_EQ(pipe->dropped_packets(), 1u); - EXPECT_EQ(pipe->PercentageLoss(), 1/3.f); + EXPECT_EQ(pipe->SentPackets(), 2u); + EXPECT_EQ(pipe->DroppedPackets(), 1u); + EXPECT_EQ(pipe->PercentageLoss(), 1 / 3.f); } // Change the link capacity half-way through the test and verify that the @@ -259,7 +260,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithEmptyPipeTest) { } // Check that all the packets were sent. - EXPECT_EQ(static_cast(2 * kNumPackets), pipe->sent_packets()); + EXPECT_EQ(static_cast(2 * kNumPackets), pipe->SentPackets()); fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0); pipe->Process(); @@ -313,7 +314,7 @@ TEST_F(FakeNetworkPipeTest, ChangingCapacityWithPacketsInPipeTest) { } // Check that all the packets were sent. - EXPECT_EQ(static_cast(2 * kNumPackets), pipe->sent_packets()); + EXPECT_EQ(static_cast(2 * kNumPackets), pipe->SentPackets()); fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess()); EXPECT_CALL(*demuxer, DeliverPacket(_, _)).Times(0); pipe->Process(); @@ -427,17 +428,18 @@ TEST(DemuxerImplTest, Demuxing) { MockReceiver mock_receiver; demuxer.SetReceiver(&mock_receiver); - std::vector data(kPacketSize); + rtc::CopyOnWriteBuffer data(kPacketSize); data[1] = kVideoPayloadType; std::unique_ptr packet( - new NetworkPacket(&data[0], kPacketSize, kTimeNow, kArrivalTime)); + new NetworkPacket(data, kTimeNow, kArrivalTime, rtc::nullopt, false, + MediaType::ANY, rtc::nullopt)); EXPECT_CALL(mock_receiver, DeliverPacket(MediaType::VIDEO, _, _)) .WillOnce(Return(PacketReceiver::DELIVERY_OK)); demuxer.DeliverPacket(packet.get(), PacketTime()); data[1] = kAudioPayloadType; - packet.reset( - new NetworkPacket(&data[0], kPacketSize, kTimeNow, kArrivalTime)); + packet.reset(new NetworkPacket(data, kTimeNow, kArrivalTime, rtc::nullopt, + false, MediaType::ANY, rtc::nullopt)); EXPECT_CALL(mock_receiver, DeliverPacket(MediaType::AUDIO, _, _)) .WillOnce(Return(PacketReceiver::DELIVERY_OK)); demuxer.DeliverPacket(packet.get(), PacketTime()); diff --git a/test/BUILD.gn b/test/BUILD.gn index 275ca65a36..6f0605bb3a 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -336,7 +336,6 @@ if (rtc_include_tests) { "../system_wrappers", ] sources = [ - "fake_network_pipe_unittest.cc", "frame_generator_unittest.cc", "rtp_file_reader_unittest.cc", "rtp_file_writer_unittest.cc", @@ -490,8 +489,6 @@ rtc_source_set("direct_transport") { sources = [ "direct_transport.cc", "direct_transport.h", - "fake_network_pipe.cc", - "fake_network_pipe.h", ] if (!build_with_chromium && is_clang) { # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). @@ -501,7 +498,6 @@ rtc_source_set("direct_transport") { "..:webrtc_common", "../:typedefs", "../api:transport_api", - "../call", "../call:call_interfaces", "../modules/rtp_rtcp", "../rtc_base:rtc_base_approved", @@ -510,6 +506,7 @@ rtc_source_set("direct_transport") { ] public_deps = [ ":single_threaded_task_queue", + "../call:fake_network", ] } diff --git a/test/direct_transport.h b/test/direct_transport.h index 85cd892ec2..2a59096889 100644 --- a/test/direct_transport.h +++ b/test/direct_transport.h @@ -16,9 +16,9 @@ #include "api/call/transport.h" #include "call/call.h" +#include "call/fake_network_pipe.h" #include "rtc_base/sequenced_task_checker.h" #include "rtc_base/thread_annotations.h" -#include "test/fake_network_pipe.h" #include "test/single_threaded_task_queue.h" namespace webrtc { diff --git a/test/fake_network_pipe.cc b/test/fake_network_pipe.cc deleted file mode 100644 index 1e209c7fb4..0000000000 --- a/test/fake_network_pipe.cc +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "test/fake_network_pipe.h" - -#include -#include -#include - -#include -#include - -#include "call/call.h" -#include "modules/rtp_rtcp/include/rtp_header_parser.h" -#include "rtc_base/logging.h" -#include "system_wrappers/include/clock.h" - -namespace webrtc { - -namespace { -constexpr int64_t kDefaultProcessIntervalMs = 5; -} - -DemuxerImpl::DemuxerImpl(const std::map& payload_type_map) - : packet_receiver_(nullptr), payload_type_map_(payload_type_map) {} - -void DemuxerImpl::SetReceiver(PacketReceiver* receiver) { - packet_receiver_ = receiver; -} - -void DemuxerImpl::DeliverPacket(const NetworkPacket* packet, - const PacketTime& packet_time) { - // No packet receiver means that this demuxer will terminate the flow of - // packets. - if (!packet_receiver_) - return; - const uint8_t* const packet_data = packet->data(); - const size_t packet_length = packet->data_length(); - MediaType media_type = MediaType::ANY; - if (!RtpHeaderParser::IsRtcp(packet_data, packet_length)) { - RTC_CHECK_GE(packet_length, 2); - const uint8_t payload_type = packet_data[1] & 0x7f; - std::map::const_iterator it = - payload_type_map_.find(payload_type); - RTC_CHECK(it != payload_type_map_.end()) - << "payload type " << static_cast(payload_type) << " unknown."; - media_type = it->second; - } - packet_receiver_->DeliverPacket( - media_type, rtc::CopyOnWriteBuffer(packet_data, packet_length), - packet_time); -} - -FakeNetworkPipe::FakeNetworkPipe(Clock* clock, - const FakeNetworkPipe::Config& config, - std::unique_ptr demuxer) - : FakeNetworkPipe(clock, config, std::move(demuxer), 1) {} - -FakeNetworkPipe::FakeNetworkPipe(Clock* clock, - const FakeNetworkPipe::Config& config, - std::unique_ptr demuxer, - uint64_t seed) - : clock_(clock), - demuxer_(std::move(demuxer)), - random_(seed), - config_(), - dropped_packets_(0), - sent_packets_(0), - total_packet_delay_(0), - bursting_(false), - next_process_time_(clock_->TimeInMilliseconds()), - last_log_time_(clock_->TimeInMilliseconds()) { - SetConfig(config); -} - -FakeNetworkPipe::~FakeNetworkPipe() { - while (!capacity_link_.empty()) { - delete capacity_link_.front(); - capacity_link_.pop(); - } - while (!delay_link_.empty()) { - delete *delay_link_.begin(); - delay_link_.erase(delay_link_.begin()); - } -} - -void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) { - RTC_CHECK(demuxer_); - demuxer_->SetReceiver(receiver); -} - -void FakeNetworkPipe::SetConfig(const FakeNetworkPipe::Config& config) { - rtc::CritScope crit(&lock_); - config_ = config; // Shallow copy of the struct. - double prob_loss = config.loss_percent / 100.0; - if (config_.avg_burst_loss_length == -1) { - // Uniform loss - prob_loss_bursting_ = prob_loss; - prob_start_bursting_ = prob_loss; - } else { - // Lose packets according to a gilbert-elliot model. - int avg_burst_loss_length = config.avg_burst_loss_length; - int min_avg_burst_loss_length = std::ceil(prob_loss / (1 - prob_loss)); - - RTC_CHECK_GT(avg_burst_loss_length, min_avg_burst_loss_length) - << "For a total packet loss of " << config.loss_percent << "%% then" - << " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1 - << " or higher."; - - prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length); - prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length; - } -} - -void FakeNetworkPipe::SendPacket(const uint8_t* data, size_t data_length) { - RTC_CHECK(demuxer_); - rtc::CritScope crit(&lock_); - if (config_.queue_length_packets > 0 && - capacity_link_.size() >= config_.queue_length_packets) { - // Too many packet on the link, drop this one. - ++dropped_packets_; - return; - } - - int64_t time_now = clock_->TimeInMilliseconds(); - - // Delay introduced by the link capacity. - int64_t capacity_delay_ms = 0; - if (config_.link_capacity_kbps > 0) { - const int bytes_per_millisecond = config_.link_capacity_kbps / 8; - // To round to the closest millisecond we add half a milliseconds worth of - // bytes to the delay calculation. - capacity_delay_ms = (data_length + capacity_delay_error_bytes_ + - bytes_per_millisecond / 2) / - bytes_per_millisecond; - capacity_delay_error_bytes_ += - data_length - capacity_delay_ms * bytes_per_millisecond; - } - int64_t network_start_time = time_now; - - // Check if there already are packets on the link and change network start - // time forward if there is. - if (!capacity_link_.empty() && - network_start_time < capacity_link_.back()->arrival_time()) - network_start_time = capacity_link_.back()->arrival_time(); - - int64_t arrival_time = network_start_time + capacity_delay_ms; - NetworkPacket* packet = new NetworkPacket(data, data_length, time_now, - arrival_time); - capacity_link_.push(packet); -} - -float FakeNetworkPipe::PercentageLoss() { - rtc::CritScope crit(&lock_); - if (sent_packets_ == 0) - return 0; - - return static_cast(dropped_packets_) / - (sent_packets_ + dropped_packets_); -} - -int FakeNetworkPipe::AverageDelay() { - rtc::CritScope crit(&lock_); - if (sent_packets_ == 0) - return 0; - - return static_cast(total_packet_delay_ / - static_cast(sent_packets_)); -} - -void FakeNetworkPipe::Process() { - int64_t time_now = clock_->TimeInMilliseconds(); - std::queue packets_to_deliver; - { - rtc::CritScope crit(&lock_); - if (time_now - last_log_time_ > 5000) { - int64_t queueing_delay_ms = 0; - if (!capacity_link_.empty()) { - queueing_delay_ms = time_now - capacity_link_.front()->send_time(); - } - RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_ms << " ms."; - last_log_time_ = time_now; - } - // Check the capacity link first. - while (!capacity_link_.empty() && - time_now >= capacity_link_.front()->arrival_time()) { - // Time to get this packet. - NetworkPacket* packet = capacity_link_.front(); - capacity_link_.pop(); - - // Drop packets at an average rate of |config_.loss_percent| with - // and average loss burst length of |config_.avg_burst_loss_length|. - if ((bursting_ && random_.Rand() < prob_loss_bursting_) || - (!bursting_ && random_.Rand() < prob_start_bursting_)) { - bursting_ = true; - delete packet; - continue; - } else { - bursting_ = false; - } - - int arrival_time_jitter = random_.Gaussian( - config_.queue_delay_ms, config_.delay_standard_deviation_ms); - - // If reordering is not allowed then adjust arrival_time_jitter - // to make sure all packets are sent in order. - if (!config_.allow_reordering && !delay_link_.empty() && - packet->arrival_time() + arrival_time_jitter < - (*delay_link_.rbegin())->arrival_time()) { - arrival_time_jitter = - (*delay_link_.rbegin())->arrival_time() - packet->arrival_time(); - } - packet->IncrementArrivalTime(arrival_time_jitter); - delay_link_.insert(packet); - } - - // Check the extra delay queue. - while (!delay_link_.empty() && - time_now >= (*delay_link_.begin())->arrival_time()) { - // Deliver this packet. - NetworkPacket* packet = *delay_link_.begin(); - packets_to_deliver.push(packet); - delay_link_.erase(delay_link_.begin()); - // |time_now| might be later than when the packet should have arrived, due - // to NetworkProcess being called too late. For stats, use the time it - // should have been on the link. - total_packet_delay_ += packet->arrival_time() - packet->send_time(); - } - sent_packets_ += packets_to_deliver.size(); - } - while (!packets_to_deliver.empty()) { - NetworkPacket* packet = packets_to_deliver.front(); - packets_to_deliver.pop(); - demuxer_->DeliverPacket(packet, PacketTime()); - delete packet; - } - - next_process_time_ = !delay_link_.empty() - ? (*delay_link_.begin())->arrival_time() - : time_now + kDefaultProcessIntervalMs; -} - -int64_t FakeNetworkPipe::TimeUntilNextProcess() const { - rtc::CritScope crit(&lock_); - return std::max(next_process_time_ - clock_->TimeInMilliseconds(), - 0); -} - -} // namespace webrtc diff --git a/test/fake_network_pipe.h b/test/fake_network_pipe.h deleted file mode 100644 index 1cfaa382f4..0000000000 --- a/test/fake_network_pipe.h +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef TEST_FAKE_NETWORK_PIPE_H_ -#define TEST_FAKE_NETWORK_PIPE_H_ - -#include -#include -#include -#include -#include - -#include "common_types.h" // NOLINT(build/include) -#include "rtc_base/constructormagic.h" -#include "rtc_base/criticalsection.h" -#include "rtc_base/random.h" -#include "typedefs.h" // NOLINT(build/include) - -namespace webrtc { - -class Clock; -class PacketReceiver; -enum class MediaType; - -class NetworkPacket { - public: - NetworkPacket(const uint8_t* data, - size_t length, - int64_t send_time, - int64_t arrival_time) - : data_(new uint8_t[length]), - data_length_(length), - send_time_(send_time), - arrival_time_(arrival_time) { - memcpy(data_.get(), data, length); - } - - uint8_t* data() const { return data_.get(); } - size_t data_length() const { return data_length_; } - int64_t send_time() const { return send_time_; } - int64_t arrival_time() const { return arrival_time_; } - void IncrementArrivalTime(int64_t extra_delay) { - arrival_time_ += extra_delay; - } - - private: - // The packet data. - std::unique_ptr data_; - // Length of data_. - size_t data_length_; - // The time the packet was sent out on the network. - const int64_t send_time_; - // The time the packet should arrive at the receiver. - int64_t arrival_time_; -}; - -class Demuxer { - public: - virtual ~Demuxer() = default; - virtual void SetReceiver(PacketReceiver* receiver) = 0; - virtual void DeliverPacket(const NetworkPacket* packet, - const PacketTime& packet_time) = 0; -}; - -class DemuxerImpl final : public Demuxer { - public: - explicit DemuxerImpl(const std::map& payload_type_map); - - void SetReceiver(PacketReceiver* receiver) override; - void DeliverPacket(const NetworkPacket* packet, - const PacketTime& packet_time) override; - - private: - PacketReceiver* packet_receiver_; - const std::map payload_type_map_; - RTC_DISALLOW_COPY_AND_ASSIGN(DemuxerImpl); -}; - -// Class faking a network link. This is a simple and naive solution just faking -// capacity and adding an extra transport delay in addition to the capacity -// introduced delay. - -class FakeNetworkPipe { - public: - struct Config { - Config() {} - // Queue length in number of packets. - size_t queue_length_packets = 0; - // Delay in addition to capacity induced delay. - int queue_delay_ms = 0; - // Standard deviation of the extra delay. - int delay_standard_deviation_ms = 0; - // Link capacity in kbps. - int link_capacity_kbps = 0; - // Random packet loss. - int loss_percent = 0; - // If packets are allowed to be reordered. - bool allow_reordering = false; - // The average length of a burst of lost packets. - int avg_burst_loss_length = -1; - }; - - FakeNetworkPipe(Clock* clock, - const FakeNetworkPipe::Config& config, - std::unique_ptr demuxer); - FakeNetworkPipe(Clock* clock, - const FakeNetworkPipe::Config& config, - std::unique_ptr demuxer, - uint64_t seed); - virtual ~FakeNetworkPipe(); - - // Sets a new configuration. This won't affect packets already in the pipe. - void SetConfig(const FakeNetworkPipe::Config& config); - - // Sends a new packet to the link. - virtual void SendPacket(const uint8_t* packet, size_t packet_length); - - // Must not be called in parallel with SendPacket or Process. - void SetReceiver(PacketReceiver* receiver); - - // Processes the network queues and trigger PacketReceiver::IncomingPacket for - // packets ready to be delivered. - virtual void Process(); - int64_t TimeUntilNextProcess() const; - - // Get statistics. - float PercentageLoss(); - int AverageDelay(); - size_t dropped_packets() { return dropped_packets_; } - size_t sent_packets() { return sent_packets_; } - - protected: - Clock* const clock_; - rtc::CriticalSection lock_; - const std::unique_ptr demuxer_; - std::queue capacity_link_; - Random random_; - - // Since we need to access both the packet with the earliest and latest - // arrival time we need to use a multiset to keep all packets sorted, - // hence, we cannot use a priority queue. - struct PacketArrivalTimeComparator { - bool operator()(const NetworkPacket* p1, const NetworkPacket* p2) { - return p1->arrival_time() < p2->arrival_time(); - } - }; - std::multiset delay_link_; - - // Link configuration. - Config config_; - - // Statistics. - size_t dropped_packets_; - size_t sent_packets_; - int64_t total_packet_delay_; - - // Are we currently dropping a burst of packets? - bool bursting_; - - // The probability to drop the packet if we are currently dropping a - // burst of packet - double prob_loss_bursting_; - - // The probability to drop a burst of packets. - double prob_start_bursting_; - - int64_t next_process_time_; - - int64_t last_log_time_; - - int64_t capacity_delay_error_bytes_ = 0; - - RTC_DISALLOW_COPY_AND_ASSIGN(FakeNetworkPipe); -}; - -} // namespace webrtc - -#endif // TEST_FAKE_NETWORK_PIPE_H_ diff --git a/test/layer_filtering_transport.h b/test/layer_filtering_transport.h index 40d73dfc49..b63dc29fab 100644 --- a/test/layer_filtering_transport.h +++ b/test/layer_filtering_transport.h @@ -10,13 +10,13 @@ #ifndef TEST_LAYER_FILTERING_TRANSPORT_H_ #define TEST_LAYER_FILTERING_TRANSPORT_H_ -#include "call/call.h" -#include "test/direct_transport.h" -#include "test/fake_network_pipe.h" -#include "test/single_threaded_task_queue.h" - #include +#include "call/call.h" +#include "call/fake_network_pipe.h" +#include "test/direct_transport.h" +#include "test/single_threaded_task_queue.h" + namespace webrtc { namespace test {