diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 56c5e55ca1..9baf164a60 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -91,13 +91,16 @@ RtpTransportControllerSend::RtpTransportControllerSend( event_log, trials, process_thread_.get())), - task_queue_pacer_(use_task_queue_pacer_ - ? new TaskQueuePacedSender(clock, - &packet_router_, - event_log, - trials, - task_queue_factory) - : nullptr), + task_queue_pacer_( + use_task_queue_pacer_ + ? new TaskQueuePacedSender( + clock, + &packet_router_, + event_log, + trials, + task_queue_factory, + /*hold_back_window = */ PacingController::kMinSleepTime) + : nullptr), observer_(nullptr), controller_factory_override_(controller_factory), controller_factory_fallback_( diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index f21e63733f..b1f6e896a7 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -193,6 +193,10 @@ bool PacingController::Congested() const { return false; } +bool PacingController::IsProbing() const { + return prober_.is_probing(); +} + Timestamp PacingController::CurrentTime() const { Timestamp time = clock_->CurrentTime(); if (time < last_timestamp_) { diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 27f1614b08..20d2539e45 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -146,6 +146,8 @@ class PacingController { bool Congested() const; + bool IsProbing() const; + private: void EnqueuePacketInternal(std::unique_ptr packet, int priority); diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index a4ce9fe9d6..d460d60048 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -34,8 +34,10 @@ TaskQueuePacedSender::TaskQueuePacedSender( PacketRouter* packet_router, RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials, - TaskQueueFactory* task_queue_factory) + TaskQueueFactory* task_queue_factory, + TimeDelta hold_back_window) : clock_(clock), + hold_back_window_(hold_back_window), packet_router_(packet_router), pacing_controller_(clock, static_cast(this), @@ -200,8 +202,10 @@ void TaskQueuePacedSender::MaybeProcessPackets( next_process_time = pacing_controller_.NextSendTime(); } - next_process_time = - std::max(now + PacingController::kMinSleepTime, next_process_time); + const TimeDelta min_sleep = pacing_controller_.IsProbing() + ? PacingController::kMinSleepTime + : hold_back_window_; + next_process_time = std::max(now + min_sleep, next_process_time); TimeDelta sleep_time = next_process_time - now; if (next_process_time_.IsMinusInfinity() || diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 8b47f5ee3d..3241d3fb63 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -42,11 +42,18 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender, private PacingController::PacketSender { public: - TaskQueuePacedSender(Clock* clock, - PacketRouter* packet_router, - RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials, - TaskQueueFactory* task_queue_factory); + // The |hold_back_window| parameter sets a lower bound on time to sleep if + // there is currently a pacer queue and packets can't immediately be + // processed. Increasing this reduces thread wakeups at the expense of higher + // latency. + // TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window. + TaskQueuePacedSender( + Clock* clock, + PacketRouter* packet_router, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials, + TaskQueueFactory* task_queue_factory, + TimeDelta hold_back_window = PacingController::kMinSleepTime); ~TaskQueuePacedSender() override; @@ -131,6 +138,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, Stats GetStats() const; Clock* const clock_; + const TimeDelta hold_back_window_; PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_); PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index ba2aad21ff..e93f776f38 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -24,6 +24,7 @@ #include "test/time_controller/simulated_time_controller.h" using ::testing::_; +using ::testing::AtLeast; using ::testing::Return; using ::testing::SaveArg; @@ -48,17 +49,6 @@ class MockPacketRouter : public PacketRouter { namespace test { -class TaskQueuePacedSenderTest : public ::testing::Test { - public: - TaskQueuePacedSenderTest() - : time_controller_(Timestamp::Millis(1234)), - pacer_(time_controller_.GetClock(), - &packet_router_, - /*event_log=*/nullptr, - /*field_trials=*/nullptr, - time_controller_.GetTaskQueueFactory()) {} - - protected: std::unique_ptr BuildRtpPacket(RtpPacketMediaType type) { auto packet = std::make_unique(nullptr); packet->set_packet_type(type); @@ -92,109 +82,193 @@ class TaskQueuePacedSenderTest : public ::testing::Test { return packets; } - Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); } + TEST(TaskQueuePacedSenderTest, PacesPackets) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); - GlobalSimulatedTimeController time_controller_; - MockPacketRouter packet_router_; - TaskQueuePacedSender pacer_; -}; + // Insert a number of packets, covering one second. + static constexpr size_t kPacketsToSend = 42; + pacer.SetPacingRates( + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), + DataRate::Zero()); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); -TEST_F(TaskQueuePacedSenderTest, PacesPackets) { - // Insert a number of packets, covering one second. - static constexpr size_t kPacketsToSend = 42; - pacer_.SetPacingRates( - DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), - DataRate::Zero()); - pacer_.EnqueuePackets( - GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + // Expect all of them to be sent. + size_t packets_sent = 0; + Timestamp end_time = Timestamp::PlusInfinity(); + EXPECT_CALL(packet_router, SendPacket) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + ++packets_sent; + if (packets_sent == kPacketsToSend) { + end_time = time_controller.GetClock()->CurrentTime(); + } + }); - // Expect all of them to be sent. - size_t packets_sent = 0; - Timestamp end_time = Timestamp::PlusInfinity(); - EXPECT_CALL(packet_router_, SendPacket) - .WillRepeatedly([&](std::unique_ptr packet, - const PacedPacketInfo& cluster_info) { - ++packets_sent; - if (packets_sent == kPacketsToSend) { - end_time = time_controller_.GetClock()->CurrentTime(); - } - }); + const Timestamp start_time = time_controller.GetClock()->CurrentTime(); - const Timestamp start_time = time_controller_.GetClock()->CurrentTime(); + // Packets should be sent over a period of close to 1s. Expect a little + // lower than this since initial probing is a bit quicker. + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + ASSERT_TRUE(end_time.IsFinite()); + EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); + } - // Packets should be sent over a period of close to 1s. Expect a little lower - // than this since initial probing is a bit quicker. - time_controller_.AdvanceTime(TimeDelta::Seconds(1)); - EXPECT_EQ(packets_sent, kPacketsToSend); - ASSERT_TRUE(end_time.IsFinite()); - EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); -} + TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); -TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { - // Insert a number of packets to be sent 200ms apart. - const size_t kPacketsPerSecond = 5; - const DataRate kPacingRate = - DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); - pacer_.SetPacingRates(kPacingRate, DataRate::Zero()); + // Insert a number of packets to be sent 200ms apart. + const size_t kPacketsPerSecond = 5; + const DataRate kPacingRate = + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); + pacer.SetPacingRates(kPacingRate, DataRate::Zero()); - // Send some initial packets to be rid of any probes. - EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond); - pacer_.EnqueuePackets( - GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); - time_controller_.AdvanceTime(TimeDelta::Seconds(1)); + // Send some initial packets to be rid of any probes. + EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); + time_controller.AdvanceTime(TimeDelta::Seconds(1)); - // Insert three packets, and record send time of each of them. - // After the second packet is sent, double the send rate so we can - // check the third packets is sent after half the wait time. - Timestamp first_packet_time = Timestamp::MinusInfinity(); - Timestamp second_packet_time = Timestamp::MinusInfinity(); - Timestamp third_packet_time = Timestamp::MinusInfinity(); + // Insert three packets, and record send time of each of them. + // After the second packet is sent, double the send rate so we can + // check the third packets is sent after half the wait time. + Timestamp first_packet_time = Timestamp::MinusInfinity(); + Timestamp second_packet_time = Timestamp::MinusInfinity(); + Timestamp third_packet_time = Timestamp::MinusInfinity(); - EXPECT_CALL(packet_router_, SendPacket) - .Times(3) - .WillRepeatedly([&](std::unique_ptr packet, - const PacedPacketInfo& cluster_info) { - if (first_packet_time.IsInfinite()) { - first_packet_time = CurrentTime(); - } else if (second_packet_time.IsInfinite()) { - second_packet_time = CurrentTime(); - pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero()); - } else { - third_packet_time = CurrentTime(); - } - }); + EXPECT_CALL(packet_router, SendPacket) + .Times(3) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + if (first_packet_time.IsInfinite()) { + first_packet_time = time_controller.GetClock()->CurrentTime(); + } else if (second_packet_time.IsInfinite()) { + second_packet_time = time_controller.GetClock()->CurrentTime(); + pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); + } else { + third_packet_time = time_controller.GetClock()->CurrentTime(); + } + }); - pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); - time_controller_.AdvanceTime(TimeDelta::Millis(500)); - ASSERT_TRUE(third_packet_time.IsFinite()); - EXPECT_NEAR((second_packet_time - first_packet_time).ms(), 200.0, - 1.0); - EXPECT_NEAR((third_packet_time - second_packet_time).ms(), 100.0, - 1.0); -} + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); + time_controller.AdvanceTime(TimeDelta::Millis(500)); + ASSERT_TRUE(third_packet_time.IsFinite()); + EXPECT_NEAR((second_packet_time - first_packet_time).ms(), 200.0, + 1.0); + EXPECT_NEAR((third_packet_time - second_packet_time).ms(), 100.0, + 1.0); + } -TEST_F(TaskQueuePacedSenderTest, SendsAudioImmediately) { - const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); - const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); - const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; + TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); - pacer_.SetPacingRates(kPacingDataRate, DataRate::Zero()); + const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; - // Add some initial video packets, only one should be sent. - EXPECT_CALL(packet_router_, SendPacket); - pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); - time_controller_.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router_); + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); - // Advance time, but still before next packet should be sent. - time_controller_.AdvanceTime(kPacketPacingTime / 2); + // Add some initial video packets, only one should be sent. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); - // Insert an audio packet, it should be sent immediately. - EXPECT_CALL(packet_router_, SendPacket); - pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); - time_controller_.AdvanceTime(TimeDelta::Zero()); - ::testing::Mock::VerifyAndClearExpectations(&packet_router_); -} + // Advance time, but still before next packet should be sent. + time_controller.AdvanceTime(kPacketPacingTime / 2); + + // Insert an audio packet, it should be sent immediately. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + } + + TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + + // Add 10 packets. The first should be sent immediately since the buffers + // are clear. + EXPECT_CALL(packet_router, SendPacket); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + + // Advance time to 1ms before the coalescing window ends. No packets should + // be sent. + EXPECT_CALL(packet_router, SendPacket).Times(0); + time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); + + // Advance time to where coalescing window ends. All packets that should + // have been sent up til now will be sent. + EXPECT_CALL(packet_router, SendPacket).Times(5); + time_controller.AdvanceTime(TimeDelta::Millis(1)); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + } + + TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { + const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, + time_controller.GetTaskQueueFactory(), + kCoalescingWindow); + + // Set rates so one packet adds one ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + + pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); + + // Add 10 packets. The first should be sent immediately since the buffers + // are clear. This will also trigger the probe to start. + EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); + pacer.CreateProbeCluster(kPacingDataRate * 2, 17); + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); + time_controller.AdvanceTime(TimeDelta::Zero()); + ::testing::Mock::VerifyAndClearExpectations(&packet_router); + + // Advance time to 1ms before the coalescing window ends. Packets should be + // flying. + EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); + time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); + } } // namespace test } // namespace webrtc