From 43a69b3f46059b103c35079744dc09a0e2bb2948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Wed, 16 Mar 2022 10:16:29 +0100 Subject: [PATCH] Experimentally reduce TaskQueuePacedSender's delayed task precision. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL reduces the delayed task precision of non-probes in accordance with DD go/slacked-task-queue-paced-sender. The precision is only deduced if field trial "WebRTC-SlackedTaskQueuePacedSender" is enabled though. Bug: webrtc:13824 Change-Id: I37e53b24e343f4f08059be08a3cda74f5484cc05 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/255341 Reviewed-by: Erik Språng Commit-Queue: Henrik Boström Cr-Commit-Position: refs/heads/main@{#36214} --- modules/pacing/BUILD.gn | 3 + modules/pacing/task_queue_paced_sender.cc | 26 ++- modules/pacing/task_queue_paced_sender.h | 11 ++ .../task_queue_paced_sender_unittest.cc | 174 ++++++++++++++++++ 4 files changed, 211 insertions(+), 3 deletions(-) diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 4064c12151..90e0eb7134 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -35,6 +35,7 @@ rtc_library("pacing") { "..:module_api", "../../api:function_view", "../../api:sequence_checker", + "../../api:webrtc_key_value_config", "../../api/rtc_event_log", "../../api/task_queue:task_queue", "../../api/transport:field_trial_based_config", @@ -93,6 +94,7 @@ if (rtc_include_tests) { deps = [ ":interval_budget", ":pacing", + "../../api/task_queue:task_queue", "../../api/transport:network_control", "../../api/units:data_rate", "../../api/units:time_delta", @@ -101,6 +103,7 @@ if (rtc_include_tests) { "../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_tests_utils", "../../rtc_base/experiments:alr_experiment", + "../../rtc_base/task_utils:to_queued_task", "../../system_wrappers", "../../test:explicit_key_value_config", "../../test:scoped_key_value_config", diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index 620a54135e..fb3d3cab00 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -12,6 +12,7 @@ #include #include + #include "absl/memory/memory.h" #include "rtc_base/checks.h" #include "rtc_base/event.h" @@ -21,6 +22,13 @@ namespace webrtc { +namespace { + +constexpr const char* kSlackedTaskQueuePacedSenderFieldTrial = + "WebRTC-SlackedTaskQueuePacedSender"; + +} // namespace + TaskQueuePacedSender::TaskQueuePacedSender( Clock* clock, PacingController::PacketSender* packet_sender, @@ -46,8 +54,13 @@ TaskQueuePacedSender::TaskQueuePacedSender( TimeDelta max_hold_back_window, int max_hold_back_window_in_packets) : clock_(clock), - max_hold_back_window_(max_hold_back_window), - max_hold_back_window_in_packets_(max_hold_back_window_in_packets), + allow_low_precision_( + field_trials.IsEnabled(kSlackedTaskQueuePacedSenderFieldTrial)), + max_hold_back_window_(allow_low_precision_ + ? PacingController::kMinSleepTime + : max_hold_back_window), + max_hold_back_window_in_packets_( + allow_low_precision_ ? 0 : max_hold_back_window_in_packets), pacing_controller_(clock, packet_sender, event_log, @@ -287,7 +300,14 @@ void TaskQueuePacedSender::MaybeProcessPackets( // Set a new scheduled process time and post a delayed task. next_process_time_ = next_process_time; - task_queue_.PostDelayedHighPrecisionTask( + // Prefer low precision if allowed and not probing. + TaskQueueBase::DelayPrecision precision = + allow_low_precision_ && !pacing_controller_.IsProbing() + ? TaskQueueBase::DelayPrecision::kLow + : TaskQueueBase::DelayPrecision::kHigh; + + task_queue_.PostDelayedTaskWithPrecision( + precision, [this, next_process_time]() { MaybeProcessPackets(next_process_time); }, time_to_next_process->ms()); } diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 61a625521d..61f4d20eb8 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -26,6 +26,7 @@ #include "api/units/data_size.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" +#include "api/webrtc_key_value_config.h" #include "modules/pacing/pacing_controller.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" @@ -144,6 +145,16 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { Stats GetStats() const; Clock* const clock_; + // If `kSlackedTaskQueuePacedSenderFieldTrial` is enabled, delayed tasks + // invoking MaybeProcessPackets() are scheduled using low precision instead of + // high precision, resulting in less idle wake ups and packets being sent in + // bursts if the `task_queue_` implementation supports slack. + // + // When probing, high precision is used regardless of `allow_low_precision_` + // to ensure good bandwidth estimation. + const bool allow_low_precision_; + // The holdback window prevents too frequent delayed MaybeProcessPackets() + // calls. These are only applicable if `allow_low_precision_` is false. const TimeDelta max_hold_back_window_; const int max_hold_back_window_in_packets_; diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index a22ff6d164..cb6d6156b4 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -11,15 +11,19 @@ #include "modules/pacing/task_queue_paced_sender.h" #include +#include #include #include #include #include #include +#include "api/task_queue/task_queue_base.h" #include "api/transport/network_types.h" +#include "api/units/data_rate.h" #include "modules/pacing/packet_router.h" #include "modules/utility/include/mock/mock_process_thread.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "test/gmock.h" #include "test/gtest.h" #include "test/scoped_key_value_config.h" @@ -76,6 +80,88 @@ std::vector> GeneratePadding( return padding_packets; } +class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory { + public: + explicit TaskQueueWithFakePrecisionFactory( + TaskQueueFactory* task_queue_factory) + : task_queue_factory_(task_queue_factory) {} + + std::unique_ptr CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr( + new TaskQueueWithFakePrecision( + const_cast(this), + task_queue_factory_)); + } + + int delayed_low_precision_count() const { + return delayed_low_precision_count_; + } + int delayed_high_precision_count() const { + return delayed_high_precision_count_; + } + + private: + friend class TaskQueueWithFakePrecision; + + class TaskQueueWithFakePrecision : public TaskQueueBase { + public: + TaskQueueWithFakePrecision( + TaskQueueWithFakePrecisionFactory* parent_factory, + TaskQueueFactory* task_queue_factory) + : parent_factory_(parent_factory), + task_queue_(task_queue_factory->CreateTaskQueue( + "TaskQueueWithFakePrecision", + TaskQueueFactory::Priority::NORMAL)) {} + ~TaskQueueWithFakePrecision() override {} + + void Delete() override { + // `task_queue_->Delete()` is implicitly called in the destructor due to + // TaskQueueDeleter. + delete this; + } + void PostTask(std::unique_ptr task) override { + task_queue_->PostTask( + ToQueuedTask([this, task = std::move(task)]() mutable { + RunTask(std::move(task)); + })); + } + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override { + ++parent_factory_->delayed_low_precision_count_; + task_queue_->PostDelayedTask( + ToQueuedTask([this, task = std::move(task)]() mutable { + RunTask(std::move(task)); + }), + milliseconds); + } + void PostDelayedHighPrecisionTask(std::unique_ptr task, + uint32_t milliseconds) override { + ++parent_factory_->delayed_high_precision_count_; + task_queue_->PostDelayedHighPrecisionTask( + ToQueuedTask([this, task = std::move(task)]() mutable { + RunTask(std::move(task)); + }), + milliseconds); + } + + private: + void RunTask(std::unique_ptr task) { + CurrentTaskQueueSetter set_current(this); + if (!task->Run()) + task.release(); + } + + TaskQueueWithFakePrecisionFactory* parent_factory_; + std::unique_ptr task_queue_; + }; + + TaskQueueFactory* task_queue_factory_; + std::atomic delayed_low_precision_count_ = 0u; + std::atomic delayed_high_precision_count_ = 0u; +}; + } // namespace namespace test { @@ -597,5 +683,93 @@ TEST(TaskQueuePacedSenderTest, Stats) { EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero()); } +TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) { + test::ScopedKeyValueConfig experiments( + "WebRTC-SlackedTaskQueuePacedSender/Disabled/"); + + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + TaskQueueWithFakePrecisionFactory task_queue_factory( + time_controller.GetTaskQueueFactory()); + + MockPacketRouter packet_router; + TaskQueuePacedSender pacer( + time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, + experiments, &task_queue_factory, PacingController::kMinSleepTime, + kNoPacketHoldback); + + // Send enough packets (covering one second) that pacing is triggered, i.e. + // delayed tasks being scheduled. + static constexpr size_t kPacketsToSend = 42; + static constexpr DataRate kPacingRate = + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend); + pacer.SetPacingRates(kPacingRate, DataRate::Zero()); + pacer.EnsureStarted(); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + // Expect all of them to be sent. + size_t packets_sent = 0; + EXPECT_CALL(packet_router, SendPacket) + .WillRepeatedly( + [&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { ++packets_sent; }); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + + // Expect pacing to make use of high precision. + EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0); + EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0); + + // Create probe cluster which is also high precision. + pacer.CreateProbeCluster(kPacingRate, 123); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0); + EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0); +} + +TEST(TaskQueuePacedSenderTest, LowPrecisionPacingWhenSlackIsEnabled) { + test::ScopedKeyValueConfig experiments( + "WebRTC-SlackedTaskQueuePacedSender/Enabled/"); + + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + TaskQueueWithFakePrecisionFactory task_queue_factory( + time_controller.GetTaskQueueFactory()); + + MockPacketRouter packet_router; + TaskQueuePacedSender pacer( + time_controller.GetClock(), &packet_router, /*event_log=*/nullptr, + experiments, &task_queue_factory, PacingController::kMinSleepTime, + kNoPacketHoldback); + + // Send enough packets (covering one second) that pacing is triggered, i.e. + // delayed tasks being scheduled. + static constexpr size_t kPacketsToSend = 42; + static constexpr DataRate kPacingRate = + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend); + pacer.SetPacingRates(kPacingRate, DataRate::Zero()); + pacer.EnsureStarted(); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + // Expect all of them to be sent. + size_t packets_sent = 0; + EXPECT_CALL(packet_router, SendPacket) + .WillRepeatedly( + [&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { ++packets_sent; }); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + + // Expect pacing to make use of low precision. + EXPECT_GT(task_queue_factory.delayed_low_precision_count(), 0); + EXPECT_EQ(task_queue_factory.delayed_high_precision_count(), 0); + + // Create probe cluster, which uses high precision despite regular pacing + // being low precision. + pacer.CreateProbeCluster(kPacingRate, 123); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0); +} + } // namespace test } // namespace webrtc