diff --git a/api/peer_connection_interface.h b/api/peer_connection_interface.h index 1037afa081..d89b2262ac 100644 --- a/api/peer_connection_interface.h +++ b/api/peer_connection_interface.h @@ -695,6 +695,9 @@ class RTC_EXPORT PeerConnectionInterface : public rtc::RefCountInterface { PortAllocatorConfig port_allocator_config; + // The burst interval of the pacer, see TaskQueuePacedSender constructor. + absl::optional pacer_burst_interval; + // // Don't forget to update operator== if adding something. // diff --git a/call/call_config.cc b/call/call_config.cc index 23b60ce436..93f6b1aec4 100644 --- a/call/call_config.cc +++ b/call/call_config.cc @@ -31,6 +31,7 @@ RtpTransportConfig CallConfig::ExtractTransportConfig() const { network_state_predictor_factory; transportConfig.task_queue_factory = task_queue_factory; transportConfig.trials = trials; + transportConfig.pacer_burst_interval = pacer_burst_interval; return transportConfig; } diff --git a/call/call_config.h b/call/call_config.h index 3072fa452f..6df4ab7ed4 100644 --- a/call/call_config.h +++ b/call/call_config.h @@ -78,6 +78,9 @@ struct CallConfig { rtp_transport_controller_send_factory = nullptr; Metronome* metronome = nullptr; + + // The burst interval of the pacer, see TaskQueuePacedSender constructor. + absl::optional pacer_burst_interval; }; } // namespace webrtc diff --git a/call/rtp_transport_config.h b/call/rtp_transport_config.h index f2030b3672..6c94f7d911 100644 --- a/call/rtp_transport_config.h +++ b/call/rtp_transport_config.h @@ -44,6 +44,9 @@ struct RtpTransportConfig { // Key-value mapping of internal configurations to apply, // e.g. field trials. const FieldTrialsView* trials = nullptr; + + // The burst interval of the pacer, see TaskQueuePacedSender constructor. + absl::optional pacer_burst_interval; }; } // namespace webrtc diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index 3ecec98b80..f1a7f305ce 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -89,7 +89,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( NetworkControllerFactoryInterface* controller_factory, const BitrateConstraints& bitrate_config, TaskQueueFactory* task_queue_factory, - const FieldTrialsView& trials) + const FieldTrialsView& trials, + absl::optional pacer_burst_interval) : clock_(clock), event_log_(event_log), task_queue_factory_(task_queue_factory), @@ -101,7 +102,8 @@ RtpTransportControllerSend::RtpTransportControllerSend( trials, task_queue_factory, pacer_settings_.holdback_window.Get(), - pacer_settings_.holdback_packets.Get()), + pacer_settings_.holdback_packets.Get(), + pacer_burst_interval), observer_(nullptr), controller_factory_override_(controller_factory), controller_factory_fallback_( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index 88f5b2bae4..d66f104e86 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -57,7 +57,8 @@ class RtpTransportControllerSend final NetworkControllerFactoryInterface* controller_factory, const BitrateConstraints& bitrate_config, TaskQueueFactory* task_queue_factory, - const FieldTrialsView& trials); + const FieldTrialsView& trials, + absl::optional pacer_burst_interval); ~RtpTransportControllerSend() override; RtpTransportControllerSend(const RtpTransportControllerSend&) = delete; diff --git a/call/rtp_transport_controller_send_factory.h b/call/rtp_transport_controller_send_factory.h index 8cdae8cfbe..592ca91f6b 100644 --- a/call/rtp_transport_controller_send_factory.h +++ b/call/rtp_transport_controller_send_factory.h @@ -28,7 +28,7 @@ class RtpTransportControllerSendFactory return std::make_unique( clock, config.event_log, config.network_state_predictor_factory, config.network_controller_factory, config.bitrate_config, - config.task_queue_factory, *config.trials); + config.task_queue_factory, *config.trials, config.pacer_burst_interval); } virtual ~RtpTransportControllerSendFactory() {} diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 196e29b801..ab3dd3b653 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -135,7 +135,8 @@ class RtpVideoSenderTestFixture { nullptr, bitrate_config_, time_controller_.GetTaskQueueFactory(), - field_trials ? *field_trials : field_trials_), + field_trials ? *field_trials : field_trials_, + absl::nullopt), stats_proxy_(time_controller_.GetClock(), config_, VideoEncoderConfig::ContentType::kRealtimeVideo, diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index a42220b834..8879d19087 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -57,7 +57,8 @@ TaskQueuePacedSender::TaskQueuePacedSender( const FieldTrialsView& field_trials, TaskQueueFactory* task_queue_factory, TimeDelta max_hold_back_window, - int max_hold_back_window_in_packets) + int max_hold_back_window_in_packets, + absl::optional burst_interval) : clock_(clock), bursty_pacer_flags_(field_trials), slacked_pacer_flags_(field_trials), @@ -85,6 +86,12 @@ TaskQueuePacedSender::TaskQueuePacedSender( burst = slacked_burst; } } + // Burst can also be controlled via the `burst_interval` argument. + if (burst_interval.has_value() && + (!burst.has_value() || burst.value() < burst_interval.value())) { + burst = burst_interval; + } + if (burst.has_value()) { pacing_controller_.SetSendBurstInterval(burst.value()); } diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h index 18be6acef0..42aab1121d 100644 --- a/modules/pacing/task_queue_paced_sender.h +++ b/modules/pacing/task_queue_paced_sender.h @@ -39,16 +39,25 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { public: static const int kNoPacketHoldback; + // The pacer can be configured using `field_trials` or specified parameters. + // // The `hold_back_window` parameter sets a lower bound on time to sleep if // there is currently a pacer queue and packets can't immediately be // processed. Increasing this reduces thread wakeups at the expense of higher // latency. - TaskQueuePacedSender(Clock* clock, - PacingController::PacketSender* packet_sender, - const FieldTrialsView& field_trials, - TaskQueueFactory* task_queue_factory, - TimeDelta max_hold_back_window, - int max_hold_back_window_in_packets); + // + // If the `burst_interval` parameter is set, the pacer is allowed to build up + // a packet "debt" that correspond to approximately the send rate during the + // specified interval. This greatly reduced wake ups by not pacing packets + // within the allowed burst budget. + TaskQueuePacedSender( + Clock* clock, + PacingController::PacketSender* packet_sender, + const FieldTrialsView& field_trials, + TaskQueueFactory* task_queue_factory, + TimeDelta max_hold_back_window, + int max_hold_back_window_in_packets, + absl::optional burst_interval = absl::nullopt); ~TaskQueuePacedSender() override; diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index 59790d00dd..69c7b9b7ef 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -253,6 +253,53 @@ TEST_P(TaskQueuePacedSenderTest, PacesPackets) { EXPECT_NEAR((end_time - start_time).ms(), 1000.0, 50.0); } +// Same test as above, but with 0.5s of burst applied. +TEST_P(TaskQueuePacedSenderTest, PacesPacketsWithBurst) { + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + ScopedKeyValueConfig trials(GetParam()); + TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, + time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime, + TaskQueuePacedSender::kNoPacketHoldback, + // Half a second of bursting. + TimeDelta::Seconds(0.5)); + + // Insert a number of packets, covering one second. + static constexpr size_t kPacketsToSend = 42; + SequenceChecker sequence_checker; + pacer.SetPacingRates( + DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), + DataRate::Zero()); + pacer.EnsureStarted(); + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); + + // Expect all of them to be sent. + size_t packets_sent = 0; + Timestamp end_time = Timestamp::PlusInfinity(); + EXPECT_CALL(packet_router, SendPacket) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo& cluster_info) { + ++packets_sent; + if (packets_sent == kPacketsToSend) { + end_time = time_controller.GetClock()->CurrentTime(); + } + EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam())); + }); + + const Timestamp start_time = time_controller.GetClock()->CurrentTime(); + + // Packets should be sent over a period of close to 1s. Expect a little + // lower than this since initial probing is a bit quicker. + time_controller.AdvanceTime(TimeDelta::Seconds(1)); + EXPECT_EQ(packets_sent, kPacketsToSend); + ASSERT_TRUE(end_time.IsFinite()); + // Because of half a second of burst, what would normally have been paced over + // ~1 second now takes ~0.5 seconds. + EXPECT_NEAR((end_time - start_time).ms(), 500.0, 50.0); +} + TEST_P(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); MockPacketRouter packet_router; diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 9e78bac671..d5af81e35f 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -343,6 +343,7 @@ bool PeerConnectionInterface::RTCConfiguration::operator==( webrtc::VpnPreference vpn_preference; std::vector vpn_list; PortAllocatorConfig port_allocator_config; + absl::optional pacer_burst_interval; }; static_assert(sizeof(stuff_being_tested_for_equality) == sizeof(*this), "Did you add something to RTCConfiguration and forget to " @@ -409,7 +410,8 @@ bool PeerConnectionInterface::RTCConfiguration::operator==( vpn_preference == o.vpn_preference && vpn_list == o.vpn_list && port_allocator_config.min_port == o.port_allocator_config.min_port && port_allocator_config.max_port == o.port_allocator_config.max_port && - port_allocator_config.flags == o.port_allocator_config.flags; + port_allocator_config.flags == o.port_allocator_config.flags && + pacer_burst_interval == o.pacer_burst_interval; } bool PeerConnectionInterface::RTCConfiguration::operator!=( diff --git a/pc/peer_connection_factory.cc b/pc/peer_connection_factory.cc index 1e1c8185e0..27efe73e0b 100644 --- a/pc/peer_connection_factory.cc +++ b/pc/peer_connection_factory.cc @@ -242,9 +242,10 @@ PeerConnectionFactory::CreatePeerConnectionOrError( const FieldTrialsView* trials = dependencies.trials ? dependencies.trials.get() : &field_trials(); - std::unique_ptr call = - worker_thread()->BlockingCall([this, &event_log, trials] { - return CreateCall_w(event_log.get(), *trials); + std::unique_ptr call = worker_thread()->BlockingCall( + [this, &event_log, trials, + pacer_burst_interval = configuration.pacer_burst_interval] { + return CreateCall_w(event_log.get(), *trials, pacer_burst_interval); }); auto result = PeerConnection::Create(context_, options_, std::move(event_log), @@ -303,7 +304,8 @@ std::unique_ptr PeerConnectionFactory::CreateRtcEventLog_w() { std::unique_ptr PeerConnectionFactory::CreateCall_w( RtcEventLog* event_log, - const FieldTrialsView& field_trials) { + const FieldTrialsView& field_trials, + absl::optional pacer_burst_interval) { RTC_DCHECK_RUN_ON(worker_thread()); webrtc::Call::Config call_config(event_log, network_thread()); @@ -346,6 +348,7 @@ std::unique_ptr PeerConnectionFactory::CreateCall_w( call_config.rtp_transport_controller_send_factory = transport_controller_send_factory_.get(); call_config.metronome = metronome_.get(); + call_config.pacer_burst_interval = pacer_burst_interval; return std::unique_ptr( context_->call_factory()->CreateCall(call_config)); } diff --git a/pc/peer_connection_factory.h b/pc/peer_connection_factory.h index 2851954a2f..9e96a2bb8c 100644 --- a/pc/peer_connection_factory.h +++ b/pc/peer_connection_factory.h @@ -136,8 +136,10 @@ class PeerConnectionFactory : public PeerConnectionFactoryInterface { bool IsTrialEnabled(absl::string_view key) const; std::unique_ptr CreateRtcEventLog_w(); - std::unique_ptr CreateCall_w(RtcEventLog* event_log, - const FieldTrialsView& field_trials); + std::unique_ptr CreateCall_w( + RtcEventLog* event_log, + const FieldTrialsView& field_trials, + absl::optional pacer_burst_interval); rtc::scoped_refptr context_; PeerConnectionFactoryInterface::Options options_