From 425d6aaa4ce256b263e307322195bd113c55d59f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Mon, 29 Jul 2019 16:38:27 +0200 Subject: [PATCH] Add RtpPacketPacer interface for pacer control MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The PacedSender is being reworked and will need an interface so we can inject different implementations of it. This CL introduces a new RtpPacketPacer interface inside the pacing module. This interface handles the details of _how_ packets should be paced, such as pacing rates/account for audio/max queue length etc. The RtpPacketSender interface exposed from the rtp_rtcp module handles only the actual sending of packets. Some minor cleanups are included here. Bug: webrtc:10809 Change-Id: I150b1a6262306d99e3f9d5f0b4afdb16a50e5ad8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145212 Reviewed-by: Sebastian Jansson Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#28699} --- call/BUILD.gn | 1 + call/call.cc | 10 +- call/rtp_transport_controller_send.cc | 61 ++++--- call/rtp_transport_controller_send.h | 5 +- .../rtp_transport_controller_send_interface.h | 3 +- .../test/mock_rtp_transport_controller_send.h | 2 +- modules/pacing/BUILD.gn | 5 + modules/pacing/mock/mock_paced_sender.h | 4 +- modules/pacing/paced_sender.cc | 82 +++++---- modules/pacing/paced_sender.h | 97 ++++++---- modules/pacing/paced_sender_unittest.cc | 165 ++++++++++-------- modules/pacing/rtp_packet_pacer.h | 73 ++++++++ 12 files changed, 326 insertions(+), 182 deletions(-) create mode 100644 modules/pacing/rtp_packet_pacer.h diff --git a/call/BUILD.gn b/call/BUILD.gn index 8c2f455642..c6bbae868b 100644 --- a/call/BUILD.gn +++ b/call/BUILD.gn @@ -77,6 +77,7 @@ rtc_source_set("rtp_interfaces") { "../api:libjingle_peerconnection_api", "../api:rtp_headers", "../api/transport:bitrate_settings", + "../api/units:timestamp", "../logging:rtc_event_log_api", "../modules/rtp_rtcp:rtp_rtcp_format", "../rtc_base:rtc_base_approved", diff --git a/call/call.cc b/call/call.cc index 07b29d507c..22c8a0c439 100644 --- a/call/call.cc +++ b/call/call.cc @@ -256,7 +256,7 @@ class Call final : public webrtc::Call, MediaType media_type) RTC_SHARED_LOCKS_REQUIRED(receive_crit_); - void UpdateSendHistograms(int64_t first_sent_packet_ms) + void UpdateSendHistograms(Timestamp first_sent_packet) RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_); void UpdateReceiveHistograms(); void UpdateHistograms(); @@ -503,8 +503,8 @@ Call::~Call() { call_stats_->DeregisterStatsObserver(&receive_side_cc_); } - absl::optional first_sent_packet_ms = - transport_send_->GetFirstPacketTimeMs(); + absl::optional first_sent_packet_ms = + transport_send_->GetFirstPacketTime(); // Only update histograms after process threads have been shut down, so that // they won't try to concurrently update stats. if (first_sent_packet_ms) { @@ -619,9 +619,9 @@ void Call::UpdateHistograms() { (clock_->TimeInMilliseconds() - start_ms_) / 1000); } -void Call::UpdateSendHistograms(int64_t first_sent_packet_ms) { +void Call::UpdateSendHistograms(Timestamp first_sent_packet) { int64_t elapsed_sec = - (clock_->TimeInMilliseconds() - first_sent_packet_ms) / 1000; + (clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000; if (elapsed_sec < metrics::kMinRunTimeInSeconds) return; const int kMinRequiredPeriodicSamples = 5; diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index e08e0f143d..20c7627d80 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -93,7 +93,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( initial_config_.key_value_config = &trial_based_config_; RTC_DCHECK(bitrate_config.start_bitrate_bps > 0); - pacer_.SetPacingRates(bitrate_config.start_bitrate_bps, 0); + pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps), + DataRate::Zero()); process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE); process_thread_->Start(); @@ -149,6 +150,18 @@ void RtpTransportControllerSend::UpdateControlState() { observer_->OnTargetTransferRate(*update); } +RtpPacketPacer* RtpTransportControllerSend::pacer() { + // TODO(bugs.webrtc.org/10809): Return reference to the correct + // pacer implementation. + return &pacer_; +} + +const RtpPacketPacer* RtpTransportControllerSend::pacer() const { + // TODO(bugs.webrtc.org/10809): Return reference to the correct + // pacer implementation. + return &pacer_; +} + rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { return &task_queue_; } @@ -168,6 +181,8 @@ RtpTransportControllerSend::transport_feedback_observer() { } RtpPacketSender* RtpTransportControllerSend::packet_sender() { + // TODO(bugs.webrtc.org/10809): Return reference to the correct + // pacer implementation. return &pacer_; } @@ -189,7 +204,7 @@ void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) { UpdateStreamsConfig(); } void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) { - pacer_.SetQueueTimeLimit(limit_ms); + pacer()->SetQueueTimeLimit(TimeDelta::ms(limit_ms)); } void RtpTransportControllerSend::RegisterPacketFeedbackObserver( PacketFeedbackObserver* observer) { @@ -266,7 +281,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( } else { UpdateInitialConstraints(msg.constraints); } - pacer_.UpdateOutstandingData(0); + pacer()->UpdateOutstandingData(DataSize::Zero()); }); } } @@ -282,11 +297,11 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { return; network_available_ = msg.network_available; if (network_available_) { - pacer_.Resume(); + pacer()->Resume(); } else { - pacer_.Pause(); + pacer()->Pause(); } - pacer_.UpdateOutstandingData(0); + pacer()->UpdateOutstandingData(DataSize::Zero()); if (controller_) { control_handler_->SetNetworkAvailability(network_available_); @@ -305,10 +320,11 @@ RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() { return this; } int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const { - return pacer_.QueueInMs(); + return pacer()->OldestPacketWaitTime().ms(); } -int64_t RtpTransportControllerSend::GetFirstPacketTimeMs() const { - return pacer_.FirstSentPacketTimeMs(); +absl::optional RtpTransportControllerSend::GetFirstPacketTime() + const { + return pacer()->FirstSentPacketTime(); } void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { task_queue_.PostTask([this, enable]() { @@ -328,8 +344,8 @@ void RtpTransportControllerSend::OnSentPacket( PostUpdates(controller_->OnSentPacket(*packet_msg)); }); } - pacer_.UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData().bytes()); + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); } void RtpTransportControllerSend::OnReceivedPacket( @@ -400,7 +416,7 @@ void RtpTransportControllerSend::OnTransportOverheadChanged( void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender( bool account_for_audio) { - pacer_.SetAccountForAudioPackets(account_for_audio); + pacer()->SetAccountForAudioPackets(account_for_audio); } void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) { @@ -457,8 +473,8 @@ void RtpTransportControllerSend::OnTransportFeedback( PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); }); } - pacer_.UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData().bytes()); + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); } void RtpTransportControllerSend::OnRemoteNetworkEstimate( @@ -510,8 +526,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() { pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( task_queue_.Get(), kPacerQueueUpdateInterval, [this]() { RTC_DCHECK_RUN_ON(&task_queue_); - TimeDelta expected_queue_time = - TimeDelta::ms(pacer_.ExpectedQueueTimeMs()); + TimeDelta expected_queue_time = pacer()->ExpectedQueueTime(); control_handler_->SetPacerQueue(expected_queue_time); UpdateControlState(); return kPacerQueueUpdateInterval; @@ -533,7 +548,7 @@ void RtpTransportControllerSend::UpdateControllerWithTimeInterval() { ProcessInterval msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); if (add_pacing_to_cwin_) - msg.pacer_queue = DataSize::bytes(pacer_.QueueSizeBytes()); + msg.pacer_queue = pacer()->QueueSizeData(); PostUpdates(controller_->OnProcessInterval(msg)); } @@ -545,18 +560,14 @@ void RtpTransportControllerSend::UpdateStreamsConfig() { void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) { if (update.congestion_window) { - if (update.congestion_window->IsFinite()) - pacer_.SetCongestionWindow(update.congestion_window->bytes()); - else - pacer_.SetCongestionWindow(PacedSender::kNoCongestionWindow); + pacer()->SetCongestionWindow(*update.congestion_window); } if (update.pacer_config) { - pacer_.SetPacingRates(update.pacer_config->data_rate().bps(), - update.pacer_config->pad_rate().bps()); + pacer()->SetPacingRates(update.pacer_config->data_rate(), + update.pacer_config->pad_rate()); } for (const auto& probe : update.probe_cluster_configs) { - int64_t bitrate_bps = probe.target_data_rate.bps(); - pacer_.CreateProbeCluster(bitrate_bps, probe.id); + pacer()->CreateProbeCluster(probe.target_data_rate, probe.id); } if (update.target_rate) { control_handler_->SetTargetRate(*update.target_rate); diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 3a844ea3a6..bbf3e238e5 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -25,6 +25,7 @@ #include "modules/congestion_controller/rtp/control_handler.h" #include "modules/congestion_controller/rtp/transport_feedback_adapter.h" #include "modules/pacing/packet_router.h" +#include "modules/pacing/rtp_packet_pacer.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/network_route.h" @@ -95,7 +96,7 @@ class RtpTransportControllerSend final void OnNetworkAvailability(bool network_available) override; RtcpBandwidthObserver* GetBandwidthObserver() override; int64_t GetPacerQueuingDelayMs() const override; - int64_t GetFirstPacketTimeMs() const override; + absl::optional GetFirstPacketTime() const override; void EnablePeriodicAlrProbing(bool enable) override; void OnSentPacket(const rtc::SentPacket& sent_packet) override; void OnReceivedPacket(const ReceivedPacket& packet_msg) override; @@ -135,6 +136,8 @@ class RtpTransportControllerSend final RTC_RUN_ON(task_queue_); void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_); void UpdateControlState() RTC_RUN_ON(task_queue_); + RtpPacketPacer* pacer(); + const RtpPacketPacer* pacer() const; Clock* const clock_; RtcEventLog* const event_log_; diff --git a/call/rtp_transport_controller_send_interface.h b/call/rtp_transport_controller_send_interface.h index 39358d57f1..d8b6982606 100644 --- a/call/rtp_transport_controller_send_interface.h +++ b/call/rtp_transport_controller_send_interface.h @@ -23,6 +23,7 @@ #include "api/crypto/crypto_options.h" #include "api/fec_controller.h" #include "api/transport/bitrate_settings.h" +#include "api/units/timestamp.h" #include "call/rtp_config.h" #include "logging/rtc_event_log/rtc_event_log.h" #include "modules/rtp_rtcp/include/report_block_data.h" @@ -148,7 +149,7 @@ class RtpTransportControllerSendInterface { virtual void OnNetworkAvailability(bool network_available) = 0; virtual RtcpBandwidthObserver* GetBandwidthObserver() = 0; virtual int64_t GetPacerQueuingDelayMs() const = 0; - virtual int64_t GetFirstPacketTimeMs() const = 0; + virtual absl::optional GetFirstPacketTime() const = 0; virtual void EnablePeriodicAlrProbing(bool enable) = 0; virtual void OnSentPacket(const rtc::SentPacket& sent_packet) = 0; virtual void OnReceivedPacket(const ReceivedPacket& received_packet) = 0; diff --git a/call/test/mock_rtp_transport_controller_send.h b/call/test/mock_rtp_transport_controller_send.h index 74041b30a1..eb3ad5cbf8 100644 --- a/call/test/mock_rtp_transport_controller_send.h +++ b/call/test/mock_rtp_transport_controller_send.h @@ -61,7 +61,7 @@ class MockRtpTransportControllerSend MOCK_METHOD1(OnNetworkAvailability, void(bool)); MOCK_METHOD0(GetBandwidthObserver, RtcpBandwidthObserver*()); MOCK_CONST_METHOD0(GetPacerQueuingDelayMs, int64_t()); - MOCK_CONST_METHOD0(GetFirstPacketTimeMs, int64_t()); + MOCK_CONST_METHOD0(GetFirstPacketTime, absl::optional()); MOCK_METHOD1(EnablePeriodicAlrProbing, void(bool)); MOCK_METHOD1(OnSentPacket, void(const rtc::SentPacket&)); MOCK_METHOD1(SetSdpBitrateParameters, void(const BitrateConstraints&)); diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index f93d400faf..2846524619 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -23,6 +23,7 @@ rtc_static_library("pacing") { "packet_router.h", "round_robin_packet_queue.cc", "round_robin_packet_queue.h", + "rtp_packet_pacer.h", ] deps = [ @@ -32,6 +33,10 @@ rtc_static_library("pacing") { "../../api/transport:field_trial_based_config", "../../api/transport:network_control", "../../api/transport:webrtc_key_value_config", + "../../api/units:data_rate", + "../../api/units:data_size", + "../../api/units:time_delta", + "../../api/units:timestamp", "../../logging:rtc_event_bwe", "../../logging:rtc_event_log_api", "../../logging:rtc_event_pacing", diff --git a/modules/pacing/mock/mock_paced_sender.h b/modules/pacing/mock/mock_paced_sender.h index 992c21d578..34ef24afb9 100644 --- a/modules/pacing/mock/mock_paced_sender.h +++ b/modules/pacing/mock/mock_paced_sender.h @@ -30,9 +30,9 @@ class MockPacedSender : public PacedSender { int64_t capture_time_ms, size_t bytes, bool retransmission)); - MOCK_METHOD2(CreateProbeCluster, void(int, int)); + MOCK_METHOD2(CreateProbeCluster, void(DataRate, int)); MOCK_METHOD1(SetEstimatedBitrate, void(uint32_t)); - MOCK_METHOD2(SetPacingRates, void(uint32_t, uint32_t)); + MOCK_METHOD2(SetPacingRates, void(DataRate, DataRate)); MOCK_CONST_METHOD0(QueueInMs, int64_t()); MOCK_CONST_METHOD0(QueueInPackets, int()); MOCK_CONST_METHOD0(ExpectedQueueTimeMs, int64_t()); diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 18334e2a4a..609c9b4ec2 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -21,6 +21,7 @@ #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -92,12 +93,14 @@ PacedSender::PacedSender(Clock* clock, padding_budget_(0), prober_(*field_trials_), probing_send_failure_(false), - pacing_bitrate_kbps_(0), + pacing_bitrate_(DataRate::Zero()), time_last_process_us_(clock->TimeInMicroseconds()), last_send_time_us_(clock->TimeInMicroseconds()), - first_sent_packet_ms_(-1), packets_(clock->TimeInMicroseconds()), packet_counter_(0), + congestion_window_size_(DataSize::PlusInfinity()), + outstanding_data_(DataSize::Zero()), + process_thread_(nullptr), queue_time_limit(kMaxQueueLengthMs), account_for_audio_(false), legacy_packet_referencing_( @@ -113,9 +116,9 @@ PacedSender::PacedSender(Clock* clock, PacedSender::~PacedSender() {} -void PacedSender::CreateProbeCluster(int bitrate_bps, int cluster_id) { +void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { rtc::CritScope cs(&critsect_); - prober_.CreateProbeCluster(bitrate_bps, TimeMilliseconds(), cluster_id); + prober_.CreateProbeCluster(bitrate.bps(), TimeMilliseconds(), cluster_id); } void PacedSender::Pause() { @@ -148,20 +151,21 @@ void PacedSender::Resume() { process_thread_->WakeUp(this); } -void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) { +void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { rtc::CritScope cs(&critsect_); - congestion_window_bytes_ = congestion_window_bytes; + congestion_window_size_ = congestion_window_size; } -void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) { +void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { rtc::CritScope cs(&critsect_); - outstanding_bytes_ = outstanding_bytes; + outstanding_data_ = outstanding_data; } bool PacedSender::Congested() const { - if (congestion_window_bytes_ == kNoCongestionWindow) - return false; - return outstanding_bytes_ >= congestion_window_bytes_; + if (congestion_window_size_.IsFinite()) { + return outstanding_data_ >= congestion_window_size_; + } + return false; } int64_t PacedSender::TimeMilliseconds() const { @@ -183,16 +187,15 @@ void PacedSender::SetProbingEnabled(bool enabled) { prober_.SetEnabled(enabled); } -void PacedSender::SetPacingRates(uint32_t pacing_rate_bps, - uint32_t padding_rate_bps) { +void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { rtc::CritScope cs(&critsect_); - RTC_DCHECK(pacing_rate_bps > 0); - pacing_bitrate_kbps_ = pacing_rate_bps / 1000; - padding_budget_.set_target_rate_kbps(padding_rate_bps / 1000); + RTC_DCHECK_GT(pacing_rate, DataRate::Zero()); + pacing_bitrate_ = pacing_rate; + padding_budget_.set_target_rate_kbps(padding_rate.kbps()); RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" - << pacing_bitrate_kbps_ - << " padding_budget_kbps=" << padding_rate_bps / 1000; + << pacing_bitrate_.kbps() + << " padding_budget_kbps=" << padding_rate.kbps(); } void PacedSender::InsertPacket(RtpPacketSender::Priority priority, @@ -202,7 +205,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, size_t bytes, bool retransmission) { rtc::CritScope cs(&critsect_); - RTC_DCHECK(pacing_bitrate_kbps_ > 0) + RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; int64_t now_ms = TimeMilliseconds(); @@ -229,7 +232,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, void PacedSender::EnqueuePacket(std::unique_ptr packet) { rtc::CritScope cs(&critsect_); - RTC_DCHECK(pacing_bitrate_kbps_ > 0) + RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; int64_t now_ms = TimeMilliseconds(); @@ -249,11 +252,12 @@ void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { account_for_audio_ = account_for_audio; } -int64_t PacedSender::ExpectedQueueTimeMs() const { +TimeDelta PacedSender::ExpectedQueueTime() const { rtc::CritScope cs(&critsect_); - RTC_DCHECK_GT(pacing_bitrate_kbps_, 0); - return static_cast(packets_.SizeInBytes() * 8 / - pacing_bitrate_kbps_); + RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero()); + return TimeDelta::ms( + (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) / + pacing_bitrate_.bps()); } size_t PacedSender::QueueSizePackets() const { @@ -261,24 +265,25 @@ size_t PacedSender::QueueSizePackets() const { return packets_.SizeInPackets(); } -int64_t PacedSender::QueueSizeBytes() const { +DataSize PacedSender::QueueSizeData() const { rtc::CritScope cs(&critsect_); - return packets_.SizeInBytes(); + return DataSize::bytes(packets_.SizeInBytes()); } -int64_t PacedSender::FirstSentPacketTimeMs() const { +absl::optional PacedSender::FirstSentPacketTime() const { rtc::CritScope cs(&critsect_); - return first_sent_packet_ms_; + return first_sent_packet_time_; } -int64_t PacedSender::QueueInMs() const { +TimeDelta PacedSender::OldestPacketWaitTime() const { rtc::CritScope cs(&critsect_); int64_t oldest_packet = packets_.OldestEnqueueTimeMs(); - if (oldest_packet == 0) - return 0; + if (oldest_packet == 0) { + return TimeDelta::Zero(); + } - return TimeMilliseconds() - oldest_packet; + return TimeDelta::ms(TimeMilliseconds() - oldest_packet); } int64_t PacedSender::TimeUntilNextProcess() { @@ -356,7 +361,7 @@ void PacedSender::Process() { return; if (elapsed_time_ms > 0) { - int target_bitrate_kbps = pacing_bitrate_kbps_; + int target_bitrate_kbps = pacing_bitrate_.kbps(); size_t queue_size_bytes = packets_.SizeInBytes(); if (queue_size_bytes > 0) { // Assuming equal size packets and input/output rate, the average packet @@ -540,8 +545,9 @@ RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket( } void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) { - if (first_sent_packet_ms_ == -1) - first_sent_packet_ms_ = TimeMilliseconds(); + if (!first_sent_packet_time_) { + first_sent_packet_time_ = Timestamp::ms(TimeMilliseconds()); + } bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; if (!audio_packet || account_for_audio_) { // Update media bytes sent. @@ -566,14 +572,14 @@ void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { } void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) { - outstanding_bytes_ += bytes_sent; + outstanding_data_ += DataSize::bytes(bytes_sent); media_budget_.UseBudget(bytes_sent); padding_budget_.UseBudget(bytes_sent); } -void PacedSender::SetQueueTimeLimit(int limit_ms) { +void PacedSender::SetQueueTimeLimit(TimeDelta limit) { rtc::CritScope cs(&critsect_); - queue_time_limit = limit_ms; + queue_time_limit = limit.ms(); } } // namespace webrtc diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 85a49eca9c..817d87fbdf 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -27,6 +27,7 @@ #include "modules/pacing/interval_budget.h" #include "modules/pacing/packet_router.h" #include "modules/pacing/round_robin_packet_queue.h" +#include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/include/rtp_packet_sender.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/utility/include/process_thread.h" @@ -38,10 +39,10 @@ namespace webrtc { class Clock; class RtcEventLog; -class PacedSender : public Module, public RtpPacketSender { +class PacedSender : public Module, + public RtpPacketPacer, + public RtpPacketSender { public: - static constexpr int64_t kNoCongestionWindow = -1; - // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than // this value, the packet producers should wait (eg drop frames rather than // encoding them). Bitrate sent may temporarily exceed target set by @@ -61,24 +62,7 @@ class PacedSender : public Module, public RtpPacketSender { ~PacedSender() override; - virtual void CreateProbeCluster(int bitrate_bps, int cluster_id); - - // Temporarily pause all sending. - void Pause(); - - // Resume sending packets. - void Resume(); - - void SetCongestionWindow(int64_t congestion_window_bytes); - void UpdateOutstandingData(int64_t outstanding_bytes); - - // Enable bitrate probing. Enabled by default, mostly here to simplify - // testing. Must be called before any packets are being sent to have an - // effect. - void SetProbingEnabled(bool enabled); - - // Sets the pacing rates. Must be called once before packets can be sent. - void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps); + // Methods implementing RtpPacketSender. // Adds the packet information to the queue and calls TimeToSendPacket // when it's time to send. @@ -88,30 +72,71 @@ class PacedSender : public Module, public RtpPacketSender { int64_t capture_time_ms, size_t bytes, bool retransmission) override; - // Adds the packet to the queue and calls PacketRouter::SendPacket() when // it's time to send. void EnqueuePacket(std::unique_ptr packet) override; + // Methods implementing RtpPacketPacer: + + void CreateProbeCluster(DataRate bitrate, int cluster_id) override; + // TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone. + void CreateProbeCluster(int bitrate_bps, int cluster_id) { + CreateProbeCluster(DataRate::bps(bitrate_bps), cluster_id); + } + + // Temporarily pause all sending. + void Pause() override; + + // Resume sending packets. + void Resume() override; + + void SetCongestionWindow(DataSize congestion_window_size) override; + void UpdateOutstandingData(DataSize outstanding_data) override; + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override; + // TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone. + void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps) { + SetPacingRates(DataRate::bps(pacing_rate_bps), + DataRate::bps(padding_rate_bps)); + } + // Currently audio traffic is not accounted by pacer and passed through. // With the introduction of audio BWE audio traffic will be accounted for // the pacer budget calculation. The audio traffic still will be injected // at high priority. - void SetAccountForAudioPackets(bool account_for_audio); + void SetAccountForAudioPackets(bool account_for_audio) override; // Returns the time since the oldest queued packet was enqueued. - virtual int64_t QueueInMs() const; + TimeDelta OldestPacketWaitTime() const override; + // TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone. + int64_t QueueInMs() const { return OldestPacketWaitTime().ms(); } - virtual size_t QueueSizePackets() const; - virtual int64_t QueueSizeBytes() const; + size_t QueueSizePackets() const override; + DataSize QueueSizeData() const override; - // Returns the time when the first packet was sent, or -1 if no packet is - // sent. - virtual int64_t FirstSentPacketTimeMs() const; + // Returns the time when the first packet was sent; + absl::optional FirstSentPacketTime() const override; // Returns the number of milliseconds it will take to send the current // packets in the queue, given the current size and bitrate, ignoring prio. - virtual int64_t ExpectedQueueTimeMs() const; + TimeDelta ExpectedQueueTime() const override; + + void SetQueueTimeLimit(TimeDelta limit) override; + // TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone. + void SetQueueTimeLimit(int limit_ms) { + SetQueueTimeLimit(TimeDelta::ms(limit_ms)); + } + + // Below are methods specific to this implementation, such as things related + // to module processing thread specifics or methods exposed for test. + + // Enable bitrate probing. Enabled by default, mostly here to simplify + // testing. Must be called before any packets are being sent to have an + // effect. + void SetProbingEnabled(bool enabled); + + // Methods implementing Module. // Returns the number of milliseconds until the module want a worker thread // to call Process. @@ -122,7 +147,6 @@ class PacedSender : public Module, public RtpPacketSender { // Called when the prober is associated with a process thread. void ProcessThreadAttached(ProcessThread* process_thread) override; - void SetQueueTimeLimit(int limit_ms); private: int64_t UpdateTimeAndGetElapsedMs(int64_t now_us) @@ -177,18 +201,17 @@ class PacedSender : public Module, public RtpPacketSender { BitrateProber prober_ RTC_GUARDED_BY(critsect_); bool probing_send_failure_ RTC_GUARDED_BY(critsect_); - uint32_t pacing_bitrate_kbps_ RTC_GUARDED_BY(critsect_); + DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_); int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_); int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_); - int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); + absl::optional first_sent_packet_time_ RTC_GUARDED_BY(critsect_); RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_); uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); - int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) = - kNoCongestionWindow; - int64_t outstanding_bytes_ RTC_GUARDED_BY(critsect_) = 0; + DataSize congestion_window_size_ RTC_GUARDED_BY(critsect_); + DataSize outstanding_data_ RTC_GUARDED_BY(critsect_); // Lock to avoid race when attaching process thread. This can happen due to // the Call class setting network state on RtpTransportControllerSend, which @@ -196,7 +219,7 @@ class PacedSender : public Module, public RtpPacketSender { // pacer process thread. If RtpTransportControllerSend is running on a task // queue separate from the thread used by Call, this causes a race. rtc::CriticalSection process_thread_lock_; - ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr; + ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_); int64_t queue_time_limit RTC_GUARDED_BY(critsect_); bool account_for_audio_ RTC_GUARDED_BY(critsect_); diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index 01023969fb..c6c82fca02 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -270,13 +270,16 @@ class PacedSenderTest : public ::testing::TestWithParam { } void Init() { - send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0); - send_bucket_->CreateProbeCluster(kSecondClusterBps, /*cluster_id=*/1); + send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps), + /*cluster_id=*/0); + send_bucket_->CreateProbeCluster(DataRate::bps(kSecondClusterBps), + /*cluster_id=*/1); // Default to bitrate probing disabled for testing purposes. Probing tests // have to enable probing, either by creating a new PacedSender instance or // by calling SetProbingEnabled(true). send_bucket_->SetProbingEnabled(false); - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, 0); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps * kPaceMultiplier), DataRate::Zero()); clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); } @@ -381,7 +384,7 @@ class PacedSenderFieldTrialTest : public ::testing::TestWithParam { TEST_P(PacedSenderFieldTrialTest, DefaultNoPaddingInSilence) { PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(kTargetBitrateBps, 0); + pacer.SetPacingRates(DataRate::bps(kTargetBitrateBps), DataRate::Zero()); // Video packet to reset last send time and provide padding data. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -397,7 +400,7 @@ TEST_P(PacedSenderFieldTrialTest, PaddingInSilenceWithTrial) { ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) + "WebRTC-Pacer-PadInSilence/Enabled/"); PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(kTargetBitrateBps, 0); + pacer.SetPacingRates(DataRate::bps(kTargetBitrateBps), DataRate::Zero()); // Video packet to reset last send time and provide padding data. InsertPacket(&pacer, &video); if (GetParam() == PacerMode::kReferencePackets) { @@ -417,9 +420,9 @@ TEST_P(PacedSenderFieldTrialTest, PaddingInSilenceWithTrial) { TEST_P(PacedSenderFieldTrialTest, DefaultCongestionWindowAffectsAudio) { EXPECT_CALL(callback_, SendPadding).Times(0); PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(10000000, 0); - pacer.SetCongestionWindow(800); - pacer.UpdateOutstandingData(0); + pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); + pacer.SetCongestionWindow(DataSize::bytes(800)); + pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -431,7 +434,7 @@ TEST_P(PacedSenderFieldTrialTest, DefaultCongestionWindowAffectsAudio) { ProcessNext(&pacer); // Audio packet unblocked when congestion window clear. ::testing::Mock::VerifyAndClearExpectations(&callback_); - pacer.UpdateOutstandingData(0); + pacer.UpdateOutstandingData(DataSize::Zero()); EXPECT_CALL(callback_, SendPacket).Times(1); ProcessNext(&pacer); } @@ -441,9 +444,9 @@ TEST_P(PacedSenderFieldTrialTest, CongestionWindowDoesNotAffectAudioInTrial) { "WebRTC-Pacer-BlockAudio/Disabled/"); EXPECT_CALL(callback_, SendPadding).Times(0); PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(10000000, 0); - pacer.SetCongestionWindow(800); - pacer.UpdateOutstandingData(0); + pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero()); + pacer.SetCongestionWindow(DataSize::bytes(800)); + pacer.UpdateOutstandingData(DataSize::Zero()); // Video packet fills congestion window. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -456,8 +459,9 @@ TEST_P(PacedSenderFieldTrialTest, CongestionWindowDoesNotAffectAudioInTrial) { TEST_P(PacedSenderFieldTrialTest, DefaultBudgetAffectsAudio) { PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond, - 0); + pacer.SetPacingRates( + DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), + DataRate::Zero()); // Video fills budget for following process periods. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -479,8 +483,9 @@ TEST_P(PacedSenderFieldTrialTest, BudgetDoesNotAffectAudioInTrial) { "WebRTC-Pacer-BlockAudio/Disabled/"); EXPECT_CALL(callback_, SendPadding).Times(0); PacedSender pacer(&clock_, &callback_, nullptr); - pacer.SetPacingRates(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond, - 0); + pacer.SetPacingRates( + DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond), + DataRate::Zero()); // Video fills budget for following process periods. InsertPacket(&pacer, &video); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -504,7 +509,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) { const int64_t kStartMs = clock_.TimeInMilliseconds(); // No packet sent. - EXPECT_EQ(-1, send_bucket_->FirstSentPacketTimeMs()); + EXPECT_FALSE(send_bucket_->FirstSentPacketTime().has_value()); for (size_t i = 0; i < kPacketToSend; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++, @@ -512,7 +517,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) { send_bucket_->Process(); clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); } - EXPECT_EQ(kStartMs, send_bucket_->FirstSentPacketTimeMs()); + EXPECT_EQ(Timestamp::ms(kStartMs), send_bucket_->FirstSentPacketTime()); } TEST_P(PacedSenderTest, QueuePacket) { @@ -644,8 +649,9 @@ TEST_P(PacedSenderTest, Padding) { uint32_t ssrc = 12345; uint16_t sequence_number = 1234; - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); // Due to the multiplicative factor we can send 5 packets during a send // interval. (network capacity * multiplier / (8 bits per byte * @@ -680,8 +686,9 @@ TEST_P(PacedSenderTest, Padding) { } TEST_P(PacedSenderTest, NoPaddingBeforeNormalPacket) { - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); EXPECT_CALL(callback_, SendPadding).Times(0); send_bucket_->Process(); @@ -707,8 +714,9 @@ TEST_P(PacedSenderTest, VerifyPaddingUpToBitrate) { int64_t capture_time_ms = 56789; const int kTimeStep = 5; const int64_t kBitrateWindow = 100; - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); int64_t start_time = clock_.TimeInMilliseconds(); while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { @@ -730,8 +738,9 @@ TEST_P(PacedSenderTest, VerifyAverageBitrateVaryingMediaPayload) { PacedSenderPadding callback; send_bucket_.reset(new PacedSender(&clock_, &callback, nullptr)); send_bucket_->SetProbingEnabled(false); - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); int64_t start_time = clock_.TimeInMilliseconds(); size_t media_bytes = 0; @@ -895,8 +904,8 @@ TEST_P(PacedSenderTest, SendsOnlyPaddingWhenCongested) { int kPacketSize = 250; int kCongestionWindow = kPacketSize * 10; - send_bucket_->UpdateOutstandingData(0); - send_bucket_->SetCongestionWindow(kCongestionWindow); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); + send_bucket_->SetCongestionWindow(DataSize::bytes(kCongestionWindow)); int sent_data = 0; while (sent_data < kCongestionWindow) { sent_data += kPacketSize; @@ -935,10 +944,11 @@ TEST_P(PacedSenderTest, DoesNotAllowOveruseAfterCongestion) { EXPECT_CALL(callback_, SendPadding).Times(0); // The pacing rate is low enough that the budget should not allow two packets // to be sent in a row. - send_bucket_->SetPacingRates(400 * 8 * 1000 / 5, 0); + send_bucket_->SetPacingRates(DataRate::bps(400 * 8 * 1000 / 5), + DataRate::Zero()); // The congestion window is small enough to only let one packet through. - send_bucket_->SetCongestionWindow(800); - send_bucket_->UpdateOutstandingData(0); + send_bucket_->SetCongestionWindow(DataSize::bytes(800)); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); // Not yet budget limited or congested, packet is sent. Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); @@ -954,13 +964,13 @@ TEST_P(PacedSenderTest, DoesNotAllowOveruseAfterCongestion) { EXPECT_CALL(callback_, SendPacket).Times(0); clock_.AdvanceTimeMilliseconds(5); send_bucket_->Process(); - send_bucket_->UpdateOutstandingData(0); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); // Congestion removed and budget has recovered, packet is sent. Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(1); clock_.AdvanceTimeMilliseconds(5); send_bucket_->Process(); - send_bucket_->UpdateOutstandingData(0); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); // Should be blocked due to budget limitation as congestion has be removed. Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size); EXPECT_CALL(callback_, SendPacket).Times(0); @@ -976,8 +986,8 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) { int64_t kCongestionWindow = kPacketSize * kCongestionCount; int64_t kCongestionTimeMs = 1000; - send_bucket_->UpdateOutstandingData(0); - send_bucket_->SetCongestionWindow(kCongestionWindow); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); + send_bucket_->SetCongestionWindow(DataSize::bytes(kCongestionWindow)); int sent_data = 0; while (sent_data < kCongestionWindow) { sent_data += kPacketSize; @@ -1002,8 +1012,8 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) { // as many are sent int ack_count = kCongestionCount / 2; EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count); - send_bucket_->UpdateOutstandingData(kCongestionWindow - - kPacketSize * ack_count); + send_bucket_->UpdateOutstandingData( + DataSize::bytes(kCongestionWindow - kPacketSize * ack_count)); for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { clock_.AdvanceTimeMilliseconds(5); @@ -1017,7 +1027,7 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) { EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)) .Times(unacked_packets); for (int duration = 0; duration < kCongestionTimeMs; duration += 5) { - send_bucket_->UpdateOutstandingData(0); + send_bucket_->UpdateOutstandingData(DataSize::Zero()); clock_.AdvanceTimeMilliseconds(5); send_bucket_->Process(); } @@ -1030,7 +1040,7 @@ TEST_P(PacedSenderTest, Pause) { uint16_t sequence_number = 1234; int64_t capture_time_ms = clock_.TimeInMilliseconds(); - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); // Due to the multiplicative factor we can send 5 packets during a send // interval. (network capacity * multiplier / (8 bits per byte * @@ -1066,8 +1076,8 @@ TEST_P(PacedSenderTest, Pause) { } // Expect everything to be queued. - EXPECT_EQ(second_capture_time_ms - capture_time_ms, - send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::ms(second_capture_time_ms - capture_time_ms), + send_bucket_->OldestPacketWaitTime()); EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1)); @@ -1133,7 +1143,7 @@ TEST_P(PacedSenderTest, Pause) { clock_.AdvanceTimeMilliseconds(5); } - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); } TEST_P(PacedSenderTest, ResendPacket) { @@ -1151,7 +1161,7 @@ TEST_P(PacedSenderTest, ResendPacket) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; int64_t capture_time_ms = clock_.TimeInMilliseconds(); - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, sequence_number, capture_time_ms, 250, false); @@ -1160,8 +1170,8 @@ TEST_P(PacedSenderTest, ResendPacket) { sequence_number + 1, capture_time_ms + 1, 250, false); clock_.AdvanceTimeMilliseconds(9999); - EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, - send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms), + send_bucket_->OldestPacketWaitTime()); // Fails to send first packet so only one call. EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms, false, _)) @@ -1171,8 +1181,8 @@ TEST_P(PacedSenderTest, ResendPacket) { send_bucket_->Process(); // Queue remains unchanged. - EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, - send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms), + send_bucket_->OldestPacketWaitTime()); // Fails to send second packet. EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms, @@ -1185,8 +1195,8 @@ TEST_P(PacedSenderTest, ResendPacket) { send_bucket_->Process(); // Queue is reduced by 1 packet. - EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms - 1, - send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms - 1), + send_bucket_->OldestPacketWaitTime()); // Send second packet and queue becomes empty. EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1, @@ -1194,7 +1204,7 @@ TEST_P(PacedSenderTest, ResendPacket) { .WillOnce(Return(RtpPacketSendResult::kSuccess)); clock_.AdvanceTimeMilliseconds(10000); send_bucket_->Process(); - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); } TEST_P(PacedSenderTest, ExpectedQueueTimeMs) { @@ -1203,18 +1213,19 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) { const size_t kNumPackets = 60; const size_t kPacketSize = 1200; const int32_t kMaxBitrate = kPaceMultiplier * 30000; - EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); - send_bucket_->SetPacingRates(30000 * kPaceMultiplier, 0); + send_bucket_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier), + DataRate::Zero()); for (size_t i = 0; i < kNumPackets; ++i) { SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), kPacketSize); } // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second) - int64_t queue_in_ms = - static_cast(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate); - EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs()); + TimeDelta queue_time = + TimeDelta::ms(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate); + EXPECT_EQ(queue_time, send_bucket_->ExpectedQueueTime()); int64_t time_start = clock_.TimeInMilliseconds(); while (send_bucket_->QueueSizePackets() > 0) { @@ -1227,7 +1238,7 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) { } int64_t duration = clock_.TimeInMilliseconds() - time_start; - EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->ExpectedQueueTime()); // Allow for aliasing, duration should be within one pack of max time limit. EXPECT_NEAR(duration, PacedSender::kMaxQueueLengthMs, @@ -1237,16 +1248,17 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) { TEST_P(PacedSenderTest, QueueTimeGrowsOverTime) { uint32_t ssrc = 12346; uint16_t sequence_number = 1234; - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); - send_bucket_->SetPacingRates(30000 * kPaceMultiplier, 0); + send_bucket_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier), + DataRate::Zero()); SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, clock_.TimeInMilliseconds(), 1200); clock_.AdvanceTimeMilliseconds(500); - EXPECT_EQ(500, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::ms(500), send_bucket_->OldestPacketWaitTime()); send_bucket_->Process(); - EXPECT_EQ(0, send_bucket_->QueueInMs()); + EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime()); } TEST_P(PacedSenderTest, ProbingWithInsertedPackets) { @@ -1257,9 +1269,12 @@ TEST_P(PacedSenderTest, ProbingWithInsertedPackets) { PacedSenderProbing packet_sender; send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr)); - send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0); - send_bucket_->CreateProbeCluster(kSecondClusterBps, /*cluster_id=*/1); - send_bucket_->SetPacingRates(kInitialBitrateBps * kPaceMultiplier, 0); + send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps), + /*cluster_id=*/0); + send_bucket_->CreateProbeCluster(DataRate::bps(kSecondClusterBps), + /*cluster_id=*/1); + send_bucket_->SetPacingRates( + DataRate::bps(kInitialBitrateBps * kPaceMultiplier), DataRate::Zero()); for (int i = 0; i < 10; ++i) { Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, @@ -1302,8 +1317,10 @@ TEST_P(PacedSenderTest, ProbingWithPaddingSupport) { PacedSenderProbing packet_sender; send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr)); - send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0); - send_bucket_->SetPacingRates(kInitialBitrateBps * kPaceMultiplier, 0); + send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps), + /*cluster_id=*/0); + send_bucket_->SetPacingRates( + DataRate::bps(kInitialBitrateBps * kPaceMultiplier), DataRate::Zero()); for (int i = 0; i < 3; ++i) { Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, @@ -1335,7 +1352,8 @@ TEST_P(PacedSenderTest, PaddingOveruse) { const size_t kPacketSize = 1200; send_bucket_->Process(); - send_bucket_->SetPacingRates(60000 * kPaceMultiplier, 0); + send_bucket_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier), + DataRate::Zero()); SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), kPacketSize); @@ -1344,11 +1362,12 @@ TEST_P(PacedSenderTest, PaddingOveruse) { // Add 30kbit padding. When increasing budget, media budget will increase from // negative (overuse) while padding budget will increase from 0. clock_.AdvanceTimeMilliseconds(5); - send_bucket_->SetPacingRates(60000 * kPaceMultiplier, 30000); + send_bucket_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier), + DataRate::bps(30000)); SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, clock_.TimeInMilliseconds(), kPacketSize); - EXPECT_LT(5u, send_bucket_->ExpectedQueueTimeMs()); + EXPECT_LT(TimeDelta::ms(5), send_bucket_->ExpectedQueueTime()); // Don't send padding if queue is non-empty, even if padding budget > 0. EXPECT_CALL(callback_, SendPadding).Times(0); send_bucket_->Process(); @@ -1364,8 +1383,9 @@ TEST_P(PacedSenderTest, ProbeClusterId) { uint16_t sequence_number = 1234; const size_t kPacketSize = 1200; - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); send_bucket_->SetProbingEnabled(true); for (int i = 0; i < 10; ++i) { Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, @@ -1447,8 +1467,9 @@ TEST_P(PacedSenderTest, AvoidBusyLoopOnSendFailure) { uint16_t sequence_number = 1234; const size_t kPacketSize = kFirstClusterBps / (8000 / 10); - send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, - kTargetBitrateBps); + send_bucket_->SetPacingRates( + DataRate::bps(kTargetBitrateBps) * kPaceMultiplier, + DataRate::bps(kTargetBitrateBps)); send_bucket_->SetProbingEnabled(true); Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, clock_.TimeInMilliseconds(), kPacketSize); diff --git a/modules/pacing/rtp_packet_pacer.h b/modules/pacing/rtp_packet_pacer.h new file mode 100644 index 0000000000..b344705f58 --- /dev/null +++ b/modules/pacing/rtp_packet_pacer.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_RTP_PACKET_PACER_H_ +#define MODULES_PACING_RTP_PACKET_PACER_H_ + +#include + +#include "absl/types/optional.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "modules/rtp_rtcp/include/rtp_packet_sender.h" + +namespace webrtc { + +class RtpPacketPacer { + public: + virtual ~RtpPacketPacer() = default; + + virtual void CreateProbeCluster(DataRate bitrate, int cluster_id) = 0; + + // Temporarily pause all sending. + virtual void Pause() = 0; + + // Resume sending packets. + virtual void Resume() = 0; + + virtual void SetCongestionWindow(DataSize congestion_window_size) = 0; + virtual void UpdateOutstandingData(DataSize outstanding_data) = 0; + + // Sets the pacing rates. Must be called once before packets can be sent. + virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0; + + // Time since the oldest packet currently in the queue was added. + virtual TimeDelta OldestPacketWaitTime() const = 0; + + // Current number of packets curently in the pacer queue. + virtual size_t QueueSizePackets() const = 0; + + // Sum of payload + padding bytes of all packets currently in the pacer queue. + virtual DataSize QueueSizeData() const = 0; + + // Returns the time when the first packet was sent. + virtual absl::optional FirstSentPacketTime() const = 0; + + // Returns the expected number of milliseconds it will take to send the + // current packets in the queue, given the current size and bitrate, ignoring + // priority. + virtual TimeDelta ExpectedQueueTime() const = 0; + + // Set the average upper bound on pacer queuing delay. The pacer may send at + // a higher rate than what was configured via SetPacingRates() in order to + // keep ExpectedQueueTimeMs() below |limit_ms| on average. + virtual void SetQueueTimeLimit(TimeDelta limit) = 0; + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + virtual void SetAccountForAudioPackets(bool account_for_audio) = 0; +}; + +} // namespace webrtc +#endif // MODULES_PACING_RTP_PACKET_PACER_H_