From 4314a494cf68666d93b62d366a300fd53825c964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Tue, 26 Nov 2019 17:48:49 +0100 Subject: [PATCH] Implements a task-queue based PacedSender, wires it up for field trials MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: webrtc:10809 Change-Id: Ia181c16559f4598f32dd399c24802d0a289e250b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150942 Reviewed-by: Stefan Holmer Reviewed-by: Ilya Nikolaevskiy Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#29946} --- call/rtp_transport_controller_send.cc | 55 ++-- call/rtp_transport_controller_send.h | 10 +- modules/pacing/BUILD.gn | 8 + modules/pacing/pacing_controller.cc | 4 + modules/pacing/pacing_controller.h | 5 + modules/pacing/task_queue_paced_sender.cc | 254 ++++++++++++++++++ modules/pacing/task_queue_paced_sender.h | 162 +++++++++++ .../task_queue_paced_sender_unittest.cc | 176 ++++++++++++ 8 files changed, 656 insertions(+), 18 deletions(-) create mode 100644 modules/pacing/task_queue_paced_sender.cc create mode 100644 modules/pacing/task_queue_paced_sender.h create mode 100644 modules/pacing/task_queue_paced_sender_unittest.cc diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index c7ccc927ed..a44b534033 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -72,9 +72,24 @@ RtpTransportControllerSend::RtpTransportControllerSend( const WebRtcKeyValueConfig* trials) : clock_(clock), event_log_(event_log), + field_trials_(trials ? trials : &fallback_field_trials_), bitrate_configurator_(bitrate_config), process_thread_(std::move(process_thread)), - pacer_(clock, &packet_router_, event_log, trials, process_thread_.get()), + use_task_queue_pacer_(IsEnabled(field_trials_, "WebRTC-TaskQueuePacer")), + process_thread_pacer_(use_task_queue_pacer_ + ? nullptr + : new PacedSender(clock, + &packet_router_, + event_log, + field_trials_, + process_thread_.get())), + task_queue_pacer_(use_task_queue_pacer_ + ? new TaskQueuePacedSender(clock, + &packet_router_, + event_log, + field_trials_, + task_queue_factory) + : nullptr), observer_(nullptr), controller_factory_override_(controller_factory), controller_factory_fallback_( @@ -82,11 +97,12 @@ RtpTransportControllerSend::RtpTransportControllerSend( process_interval_(controller_factory_fallback_->GetProcessInterval()), last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())), reset_feedback_on_route_change_( - !IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")), + !IsEnabled(field_trials_, "WebRTC-Bwe-NoFeedbackReset")), send_side_bwe_with_overhead_( - IsEnabled(trials, "WebRTC-SendSideBwe-WithOverhead")), + IsEnabled(field_trials_, "WebRTC-SendSideBwe-WithOverhead")), add_pacing_to_cwin_( - IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")), + IsEnabled(field_trials_, + "WebRTC-AddPacingToCongestionWindowPushback")), transport_overhead_bytes_per_packet_(0), network_available_(false), retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs), @@ -95,17 +111,21 @@ RtpTransportControllerSend::RtpTransportControllerSend( TaskQueueFactory::Priority::NORMAL)) { initial_config_.constraints = ConvertConstraints(bitrate_config, clock_); initial_config_.event_log = event_log; - initial_config_.key_value_config = &trial_based_config_; + initial_config_.key_value_config = field_trials_; RTC_DCHECK(bitrate_config.start_bitrate_bps > 0); pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps), DataRate::Zero()); - process_thread_->Start(); + if (!use_task_queue_pacer_) { + process_thread_->Start(); + } } RtpTransportControllerSend::~RtpTransportControllerSend() { - process_thread_->Stop(); + if (!use_task_queue_pacer_) { + process_thread_->Stop(); + } } RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( @@ -153,15 +173,17 @@ void RtpTransportControllerSend::UpdateControlState() { } RtpPacketPacer* RtpTransportControllerSend::pacer() { - // TODO(bugs.webrtc.org/10809): Return reference to the correct - // pacer implementation. - return &pacer_; + if (use_task_queue_pacer_) { + return task_queue_pacer_.get(); + } + return process_thread_pacer_.get(); } const RtpPacketPacer* RtpTransportControllerSend::pacer() const { - // TODO(bugs.webrtc.org/10809): Return reference to the correct - // pacer implementation. - return &pacer_; + if (use_task_queue_pacer_) { + return task_queue_pacer_.get(); + } + return process_thread_pacer_.get(); } rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { @@ -183,9 +205,10 @@ RtpTransportControllerSend::transport_feedback_observer() { } RtpPacketSender* RtpTransportControllerSend::packet_sender() { - // TODO(bugs.webrtc.org/10809): Return reference to the correct - // pacer implementation. - return &pacer_; + if (use_task_queue_pacer_) { + return task_queue_pacer_.get(); + } + return process_thread_pacer_.get(); } void RtpTransportControllerSend::SetAllocatedSendBitrateLimits( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 2cadaa3d8f..32c762bd8d 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -24,8 +24,10 @@ #include "call/rtp_video_sender.h" #include "modules/congestion_controller/rtp/control_handler.h" #include "modules/congestion_controller/rtp/transport_feedback_adapter.h" +#include "modules/pacing/paced_sender.h" #include "modules/pacing/packet_router.h" #include "modules/pacing/rtp_packet_pacer.h" +#include "modules/pacing/task_queue_paced_sender.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/network_route.h" @@ -137,13 +139,17 @@ class RtpTransportControllerSend final Clock* const clock_; RtcEventLog* const event_log_; - const FieldTrialBasedConfig trial_based_config_; + // TODO(sprang): Remove fallback field-trials. + const FieldTrialBasedConfig fallback_field_trials_; + const WebRtcKeyValueConfig* field_trials_; PacketRouter packet_router_; std::vector> video_rtp_senders_; RtpBitrateConfigurator bitrate_configurator_; std::map network_routes_; const std::unique_ptr process_thread_; - PacedSender pacer_; + const bool use_task_queue_pacer_; + std::unique_ptr process_thread_pacer_; + std::unique_ptr task_queue_pacer_; TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 5166cabd34..d59d2b93a4 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -26,6 +26,8 @@ rtc_library("pacing") { "round_robin_packet_queue.cc", "round_robin_packet_queue.h", "rtp_packet_pacer.h", + "task_queue_paced_sender.cc", + "task_queue_paced_sender.h", ] deps = [ @@ -33,6 +35,7 @@ rtc_library("pacing") { "..:module_api", "../../api:function_view", "../../api/rtc_event_log", + "../../api/task_queue:task_queue", "../../api/transport:field_trial_based_config", "../../api/transport:network_control", "../../api/transport:webrtc_key_value_config", @@ -44,7 +47,10 @@ rtc_library("pacing") { "../../logging:rtc_event_pacing", "../../rtc_base:checks", "../../rtc_base:rtc_base_approved", + "../../rtc_base:rtc_task_queue", "../../rtc_base/experiments:field_trial_parser", + "../../rtc_base/synchronization:sequence_checker", + "../../rtc_base/task_utils:to_queued_task", "../../system_wrappers", "../../system_wrappers:metrics", "../remote_bitrate_estimator", @@ -78,6 +84,7 @@ if (rtc_include_tests) { "paced_sender_unittest.cc", "pacing_controller_unittest.cc", "packet_router_unittest.cc", + "task_queue_paced_sender_unittest.cc", ] deps = [ ":interval_budget", @@ -93,6 +100,7 @@ if (rtc_include_tests) { "../../system_wrappers:field_trial", "../../test:field_trial", "../../test:test_support", + "../../test/time_controller:time_controller", "../rtp_rtcp", "../rtp_rtcp:mock_rtp_rtcp", "../rtp_rtcp:rtp_rtcp_format", diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 8be62090d6..0d0d1ae5dd 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -241,6 +241,10 @@ DataSize PacingController::QueueSizeData() const { return packet_queue_.Size(); } +DataSize PacingController::CurrentBufferLevel() const { + return std::max(media_debt_, padding_debt_); +} + absl::optional PacingController::FirstSentPacketTime() const { return first_sent_packet_time_; } diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 6a05eac438..f39887d2dc 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -111,9 +111,14 @@ class PacingController { // Returns the time since the oldest queued packet was enqueued. TimeDelta OldestPacketWaitTime() const; + // Number of packets in the pacer queue. size_t QueueSizePackets() const; + // Totals size of packets in the pacer queue. DataSize QueueSizeData() const; + // Current buffer level, i.e. max of media and padding debt. + DataSize CurrentBufferLevel() const; + // Returns the time when the first packet was sent; absl::optional FirstSentPacketTime() const; diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc new file mode 100644 index 0000000000..e1745db9d5 --- /dev/null +++ b/modules/pacing/task_queue_paced_sender.cc @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/task_queue_paced_sender.h" + +#include +#include +#include "absl/memory/memory.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" + +namespace webrtc { +namespace { +// If no calls to MaybeProcessPackets() happen, make sure we update stats +// at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't +// completely drained. +constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis<33>(); +// Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart, +// for performance reasons. +constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis<1>(); +} // namespace + +TaskQueuePacedSender::TaskQueuePacedSender( + Clock* clock, + PacketRouter* packet_router, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials, + TaskQueueFactory* task_queue_factory) + : clock_(clock), + packet_router_(packet_router), + pacing_controller_(clock, + static_cast(this), + event_log, + field_trials, + PacingController::ProcessMode::kDynamic), + next_process_time_(Timestamp::MinusInfinity()), + stats_update_scheduled_(false), + last_stats_time_(Timestamp::MinusInfinity()), + is_shutdown_(false), + task_queue_(task_queue_factory->CreateTaskQueue( + "TaskQueuePacedSender", + TaskQueueFactory::Priority::NORMAL)) {} + +TaskQueuePacedSender::~TaskQueuePacedSender() { + // Post an immediate task to mark the queue as shutting down. + // The rtc::TaskQueue destructor will wait for pending tasks to + // complete before continuing. + task_queue_.PostTask([&]() { + RTC_DCHECK_RUN_ON(&task_queue_); + is_shutdown_ = true; + }); +} + +void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate, + int cluster_id) { + task_queue_.PostTask([this, bitrate, cluster_id]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.CreateProbeCluster(bitrate, cluster_id); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::Pause() { + task_queue_.PostTask([this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.Pause(); + }); +} + +void TaskQueuePacedSender::Resume() { + task_queue_.PostTask([this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.Resume(); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::SetCongestionWindow( + DataSize congestion_window_size) { + task_queue_.PostTask([this, congestion_window_size]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.SetCongestionWindow(congestion_window_size); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) { + if (task_queue_.IsCurrent()) { + RTC_DCHECK_RUN_ON(&task_queue_); + // Fast path since this can be called once per sent packet while on the + // task queue. + pacing_controller_.UpdateOutstandingData(outstanding_data); + return; + } + + task_queue_.PostTask([this, outstanding_data]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.UpdateOutstandingData(outstanding_data); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate, + DataRate padding_rate) { + task_queue_.PostTask([this, pacing_rate, padding_rate]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.SetPacingRates(pacing_rate, padding_rate); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::EnqueuePackets( + std::vector> packets) { + task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable { + RTC_DCHECK_RUN_ON(&task_queue_); + for (auto& packet : packets_) { + pacing_controller_.EnqueuePacket(std::move(packet)); + } + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) { + task_queue_.PostTask([this, account_for_audio]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.SetAccountForAudioPackets(account_for_audio); + }); +} + +void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) { + task_queue_.PostTask([this, limit]() { + RTC_DCHECK_RUN_ON(&task_queue_); + pacing_controller_.SetQueueTimeLimit(limit); + MaybeProcessPackets(Timestamp::MinusInfinity()); + }); +} + +TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const { + return GetStats().expected_queue_time; +} + +DataSize TaskQueuePacedSender::QueueSizeData() const { + return GetStats().queue_size; +} + +absl::optional TaskQueuePacedSender::FirstSentPacketTime() const { + return GetStats().first_sent_packet_time; +} + +TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { + return GetStats().oldest_packet_wait_time; +} + +void TaskQueuePacedSender::MaybeProcessPackets( + Timestamp scheduled_process_time) { + RTC_DCHECK_RUN_ON(&task_queue_); + + if (is_shutdown_) { + return; + } + + const Timestamp now = clock_->CurrentTime(); + // Run ProcessPackets() only if this is the schedules task, or if there is + // no scheduled task and we need to process immediately. + if ((scheduled_process_time.IsFinite() && + scheduled_process_time == next_process_time_) || + (next_process_time_.IsInfinite() && + pacing_controller_.NextSendTime() <= now)) { + pacing_controller_.ProcessPackets(); + next_process_time_ = Timestamp::MinusInfinity(); + } + + Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime, + pacing_controller_.NextSendTime()); + TimeDelta sleep_time = next_process_time - now; + if (next_process_time_.IsMinusInfinity() || + next_process_time <= + next_process_time_ - PacingController::kMinSleepTime) { + next_process_time_ = next_process_time; + + task_queue_.PostDelayedTask( + [this, next_process_time]() { MaybeProcessPackets(next_process_time); }, + sleep_time.ms()); + } + + MaybeUpdateStats(false); +} + +std::vector> +TaskQueuePacedSender::GeneratePadding(DataSize size) { + return packet_router_->GeneratePadding(size.bytes()); +} + +void TaskQueuePacedSender::SendRtpPacket( + std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + packet_router_->SendPacket(std::move(packet), cluster_info); +} + +void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) { + if (is_shutdown_) { + return; + } + + Timestamp now = clock_->CurrentTime(); + if (!is_scheduled_call && + now - last_stats_time_ < kMinTimeBetweenStatsUpdates) { + // Too frequent unscheduled stats update, return early. + return; + } + + rtc::CritScope cs(&stats_crit_); + current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime(); + current_stats_.first_sent_packet_time = + pacing_controller_.FirstSentPacketTime(); + current_stats_.oldest_packet_wait_time = + pacing_controller_.OldestPacketWaitTime(); + current_stats_.queue_size = pacing_controller_.QueueSizeData(); + last_stats_time_ = now; + + bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 && + pacing_controller_.CurrentBufferLevel().IsZero(); + + // If there's anything interesting to get from the pacer and this is a + // scheduled call (no scheduled call in flight), post a new scheduled stats + // update. + if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) { + task_queue_.PostDelayedTask( + [this]() { + RTC_DCHECK_RUN_ON(&task_queue_); + MaybeUpdateStats(true); + }, + kMaxTimeBetweenStatsUpdates.ms()); + stats_update_scheduled_ = true; + } else { + stats_update_scheduled_ = false; + } +} + +TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const { + rtc::CritScope cs(&stats_crit_); + return current_stats_; +} + +} // namespace webrtc diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h new file mode 100644 index 0000000000..719886a931 --- /dev/null +++ b/modules/pacing/task_queue_paced_sender.h @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ +#define MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ + +#include +#include + +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/task_queue/task_queue_factory.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "modules/include/module.h" +#include "modules/pacing/pacing_controller.h" +#include "modules/pacing/packet_router.h" +#include "modules/pacing/rtp_packet_pacer.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { +class Clock; +class RtcEventLog; + +class TaskQueuePacedSender : public RtpPacketPacer, + public RtpPacketSender, + private PacingController::PacketSender { + public: + TaskQueuePacedSender(Clock* clock, + PacketRouter* packet_router, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials, + TaskQueueFactory* task_queue_factory); + + ~TaskQueuePacedSender() override; + + // Methods implementing RtpPacketSender. + + // Adds the packet to the queue and calls PacketRouter::SendPacket() when + // it's time to send. + void EnqueuePackets( + std::vector> packets) override; + + // Methods implementing RtpPacketPacer: + + void CreateProbeCluster(DataRate bitrate, int cluster_id) override; + + // Temporarily pause all sending. + void Pause() override; + + // Resume sending packets. + void Resume() override; + + void SetCongestionWindow(DataSize congestion_window_size) override; + void UpdateOutstandingData(DataSize outstanding_data) override; + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override; + + // Currently audio traffic is not accounted for by pacer and passed through. + // With the introduction of audio BWE, audio traffic will be accounted for + // in the pacer budget calculation. The audio traffic will still be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio) override; + + // Returns the time since the oldest queued packet was enqueued. + TimeDelta OldestPacketWaitTime() const override; + + // Returns total size of all packets in the pacer queue. + DataSize QueueSizeData() const override; + + // Returns the time when the first packet was sent; + absl::optional FirstSentPacketTime() const override; + + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + TimeDelta ExpectedQueueTime() const override; + + // Set the max desired queuing delay, pacer will override the pacing rate + // specified by SetPacingRates() if needed to achieve this goal. + void SetQueueTimeLimit(TimeDelta limit) override; + + private: + struct Stats { + Stats() + : oldest_packet_wait_time(TimeDelta::Zero()), + queue_size(DataSize::Zero()), + expected_queue_time(TimeDelta::Zero()) {} + TimeDelta oldest_packet_wait_time; + DataSize queue_size; + TimeDelta expected_queue_time; + absl::optional first_sent_packet_time; + }; + + // Check if it is time to send packets, or schedule a delayed task if not. + // Use Timestamp::MinusInfinity() to indicate that this call has _not_ + // been scheduled by the pacing controller. If this is the case, check if + // can execute immediately otherwise schedule a delay task that calls this + // method again with desired (finite) scheduled process time. + void MaybeProcessPackets(Timestamp scheduled_process_time); + + // Methods implementing PacedSenderController:PacketSender. + + void SendRtpPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) override + RTC_RUN_ON(task_queue_); + + std::vector> GeneratePadding( + DataSize size) override RTC_RUN_ON(task_queue_); + + void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_); + Stats GetStats() const; + + Clock* const clock_; + PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_); + PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); + + // We want only one (valid) delayed process task in flight at a time. + // If the value of |next_process_time_| is finite, it is an id for a + // delayed task that will call MaybeProcessPackets() with that time + // as parameter. + // Timestamp::MinusInfinity() indicates no valid pending task. + Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_); + + // Since we don't want to support synchronous calls that wait for a + // task execution, we poll the stats at some interval and update + // |current_stats_|, which can in turn be polled at any time. + + // True iff there is delayed task in flight that that will call + // UdpateStats(). + bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_); + // Last time stats were updated. + Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_); + + // Indicates if this task queue is shutting down. If so, don't allow + // posting any more delayed tasks as that can cause the task queue to + // never drain. + bool is_shutdown_ RTC_GUARDED_BY(task_queue_); + + rtc::CriticalSection stats_crit_; + Stats current_stats_ RTC_GUARDED_BY(stats_crit_); + + rtc::TaskQueue task_queue_; +}; +} // namespace webrtc +#endif // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc new file mode 100644 index 0000000000..390523f093 --- /dev/null +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/task_queue_paced_sender.h" + +#include +#include +#include +#include +#include + +#include "modules/pacing/packet_router.h" +#include "modules/utility/include/mock/mock_process_thread.h" +#include "test/field_trial.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/time_controller/simulated_time_controller.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SaveArg; + +namespace webrtc { +namespace { +constexpr uint32_t kAudioSsrc = 12345; +constexpr uint32_t kVideoSsrc = 234565; +constexpr uint32_t kVideoRtxSsrc = 34567; +constexpr uint32_t kFlexFecSsrc = 45678; +constexpr size_t kDefaultPacketSize = 1234; + +class MockPacketRouter : public PacketRouter { + public: + MOCK_METHOD2(SendPacket, + void(std::unique_ptr packet, + const PacedPacketInfo& cluster_info)); + MOCK_METHOD1( + GeneratePadding, + std::vector>(size_t target_size_bytes)); +}; +} // namespace + +namespace test { + +class TaskQueuePacedSenderTest : public ::testing::Test { + public: + TaskQueuePacedSenderTest() + : time_controller_(Timestamp::ms(1234)), + pacer_(time_controller_.GetClock(), + &packet_router_, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller_.GetTaskQueueFactory()) {} + + protected: + std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { + auto packet = std::make_unique(nullptr); + packet->set_packet_type(type); + switch (type) { + case RtpPacketToSend::Type::kAudio: + packet->SetSsrc(kAudioSsrc); + break; + case RtpPacketToSend::Type::kVideo: + packet->SetSsrc(kVideoSsrc); + break; + case RtpPacketToSend::Type::kRetransmission: + case RtpPacketToSend::Type::kPadding: + packet->SetSsrc(kVideoRtxSsrc); + break; + case RtpPacketToSend::Type::kForwardErrorCorrection: + packet->SetSsrc(kFlexFecSsrc); + break; + } + + packet->SetPayloadSize(kDefaultPacketSize); + return packet; + } + + std::vector> GeneratePackets( + RtpPacketToSend::Type type, + size_t num_packets) { + std::vector> packets; + for (size_t i = 0; i < num_packets; ++i) { + packets.push_back(BuildRtpPacket(type)); + } + return packets; + } + + Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); } + + GlobalSimulatedTimeController time_controller_; + MockPacketRouter packet_router_; + TaskQueuePacedSender pacer_; +}; + +TEST_F(TaskQueuePacedSenderTest, PacesPackets) { + // Insert a number of packets, covering one second. + static constexpr size_t kPacketsToSend = 42; + pacer_.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend), + DataRate::Zero()); + pacer_.EnqueuePackets( + GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsToSend)); + + // Expect all of them to be sent. + size_t packets_sent = 0; + Timestamp end_time = Timestamp::PlusInfinity(); + EXPECT_CALL(packet_router_, SendPacket) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + ++packets_sent; + if (packets_sent == kPacketsToSend) { + end_time = time_controller_.GetClock()->CurrentTime(); + } + }); + + const Timestamp start_time = time_controller_.GetClock()->CurrentTime(); + + // Packets should be sent over a period of close to 1s. Expect a little lower + // than this since initial probing is a bit quicker. + time_controller_.Sleep(TimeDelta::seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + ASSERT_TRUE(end_time.IsFinite()); + EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); +} + +TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { + // Insert a number of packets to be sent 200ms apart. + const size_t kPacketsPerSecond = 5; + const DataRate kPacingRate = + DataRate::bps(kDefaultPacketSize * 8 * kPacketsPerSecond); + pacer_.SetPacingRates(kPacingRate, DataRate::Zero()); + + // Send some initial packets to be rid of any probes. + EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond); + pacer_.EnqueuePackets( + GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsPerSecond)); + time_controller_.Sleep(TimeDelta::seconds(1)); + + // Insert three packets, and record send time of each of them. + // After the second packet is sent, double the send rate so we can + // check the third packets is sent after half the wait time. + Timestamp first_packet_time = Timestamp::MinusInfinity(); + Timestamp second_packet_time = Timestamp::MinusInfinity(); + Timestamp third_packet_time = Timestamp::MinusInfinity(); + + EXPECT_CALL(packet_router_, SendPacket) + .Times(3) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + if (first_packet_time.IsInfinite()) { + first_packet_time = CurrentTime(); + } else if (second_packet_time.IsInfinite()) { + second_packet_time = CurrentTime(); + pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero()); + } else { + third_packet_time = CurrentTime(); + } + }); + + pacer_.EnqueuePackets(GeneratePackets(RtpPacketToSend::Type::kVideo, 3)); + time_controller_.Sleep(TimeDelta::ms(500)); + ASSERT_TRUE(third_packet_time.IsFinite()); + EXPECT_NEAR((second_packet_time - first_packet_time).ms(), 200.0, + 1.0); + EXPECT_NEAR((third_packet_time - second_packet_time).ms(), 100.0, + 1.0); +} + +} // namespace test +} // namespace webrtc