Remove legacy PacedSender.

The new TaskQueuePacedSender has been default-on in code since M97, and
there are no further usages of it that I can find. Let's clean this up!

The PacingController and associated tests will be cleaned up in a
follow-up cl.

Bug: webrtc:10809
Change-Id: I0cb888602939add953415977ee79ff0b3878fea5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/258025
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36890}
This commit is contained in:
Erik Språng 2022-05-13 15:55:29 +02:00 committed by WebRTC LUCI CQ
parent 6a979149e4
commit f3f3a61167
15 changed files with 48 additions and 614 deletions

View File

@ -47,8 +47,7 @@ std::unique_ptr<CallFactoryInterface> CreateTimeControllerBasedCallFactory(
return Call::Create(config, time_controller_->GetClock(), module_thread_,
config.rtp_transport_controller_send_factory->Create(
transportConfig, time_controller_->GetClock(),
time_controller_->CreateProcessThread("Pacer")));
transportConfig, time_controller_->GetClock()));
}
private:

View File

@ -522,8 +522,7 @@ Call* Call::Create(const Call::Config& config,
return new internal::Call(
clock, config,
transport_controller_factory_.Create(transportConfig, clock,
std::move(pacer_thread)),
transport_controller_factory_.Create(transportConfig, clock),
std::move(call_thread), config.task_queue_factory);
}

View File

@ -163,8 +163,7 @@ Call* CallFactory::CreateCall(const Call::Config& config) {
SharedModuleThread::Create(
ProcessThread::Create("ModuleProcessThread"), nullptr),
config.rtp_transport_controller_send_factory->Create(
transportConfig, Clock::GetRealTimeClock(),
ProcessThread::Create("PacerThread")))),
transportConfig, Clock::GetRealTimeClock()))),
send_degradation_configs, receive_degradation_configs);
}
@ -178,8 +177,7 @@ Call* CallFactory::CreateCall(const Call::Config& config) {
return Call::Create(config, Clock::GetRealTimeClock(), module_thread_,
config.rtp_transport_controller_send_factory->Create(
transportConfig, Clock::GetRealTimeClock(),
ProcessThread::Create("PacerThread")));
transportConfig, Clock::GetRealTimeClock()));
}
std::unique_ptr<CallFactoryInterface> CreateCallFactory() {

View File

@ -75,10 +75,9 @@ bool IsRelayed(const rtc::NetworkRoute& route) {
RtpTransportControllerSend::PacerSettings::PacerSettings(
const FieldTrialsView& trials)
: tq_disabled("Disabled"),
holdback_window("holdback_window", TimeDelta::Millis(5)),
: holdback_window("holdback_window", TimeDelta::Millis(5)),
holdback_packets("holdback_packets", 3) {
ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
ParseFieldTrial({&holdback_window, &holdback_packets},
trials.Lookup("WebRTC-TaskQueuePacer"));
}
@ -88,30 +87,19 @@ RtpTransportControllerSend::RtpTransportControllerSend(
NetworkStatePredictorFactoryInterface* predictor_factory,
NetworkControllerFactoryInterface* controller_factory,
const BitrateConstraints& bitrate_config,
std::unique_ptr<ProcessThread> process_thread,
TaskQueueFactory* task_queue_factory,
const FieldTrialsView& trials)
: clock_(clock),
event_log_(event_log),
bitrate_configurator_(bitrate_config),
pacer_started_(false),
process_thread_(std::move(process_thread)),
pacer_settings_(trials),
process_thread_pacer_(pacer_settings_.use_task_queue_pacer()
? nullptr
: new PacedSender(clock,
&packet_router_,
trials,
process_thread_.get())),
task_queue_pacer_(
pacer_settings_.use_task_queue_pacer()
? new TaskQueuePacedSender(clock,
pacer_(clock,
&packet_router_,
trials,
task_queue_factory,
pacer_settings_.holdback_window.Get(),
pacer_settings_.holdback_packets.Get())
: nullptr),
pacer_settings_.holdback_packets.Get()),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
@ -141,8 +129,8 @@ RtpTransportControllerSend::RtpTransportControllerSend(
initial_config_.key_value_config = &trials;
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer()->SetPacingRates(
DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());
pacer_.SetPacingRates(DataRate::BitsPerSec(bitrate_config.start_bitrate_bps),
DataRate::Zero());
if (absl::StartsWith(trials.Lookup("WebRTC-LazyPacerStart"), "Disabled")) {
EnsureStarted();
@ -151,7 +139,6 @@ RtpTransportControllerSend::RtpTransportControllerSend(
RtpTransportControllerSend::~RtpTransportControllerSend() {
RTC_DCHECK(video_rtp_senders_.empty());
process_thread_->Stop();
}
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
@ -207,24 +194,10 @@ void RtpTransportControllerSend::UpdateCongestedState() {
congestion_window_size_;
if (congested != is_congested_) {
is_congested_ = congested;
pacer()->SetCongested(congested);
pacer_.SetCongested(congested);
}
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
return &task_queue_;
}
@ -244,10 +217,7 @@ RtpTransportControllerSend::transport_feedback_observer() {
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
return &pacer_;
}
void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
@ -264,7 +234,7 @@ void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
pacer()->SetQueueTimeLimit(TimeDelta::Millis(limit_ms));
pacer_.SetQueueTimeLimit(TimeDelta::Millis(limit_ms));
}
StreamFeedbackProvider*
RtpTransportControllerSend::GetStreamFeedbackProvider() {
@ -371,7 +341,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
UpdateInitialConstraints(msg.constraints);
}
is_congested_ = false;
pacer()->SetCongested(false);
pacer_.SetCongested(false);
});
}
}
@ -388,12 +358,12 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
return;
network_available_ = msg.network_available;
if (network_available_) {
pacer()->Resume();
pacer_.Resume();
} else {
pacer()->Pause();
pacer_.Pause();
}
is_congested_ = false;
pacer()->SetCongested(false);
pacer_.SetCongested(false);
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
@ -412,11 +382,11 @@ RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
return this;
}
int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
return pacer()->OldestPacketWaitTime().ms();
return pacer_.OldestPacketWaitTime().ms();
}
absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
const {
return pacer()->FirstSentPacketTime();
return pacer_.FirstSentPacketTime();
}
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
task_queue_.PostTask([this, enable]() {
@ -505,7 +475,7 @@ void RtpTransportControllerSend::OnTransportOverheadChanged(
return;
}
pacer()->SetTransportOverhead(
pacer_.SetTransportOverhead(
DataSize::Bytes(transport_overhead_bytes_per_packet));
// TODO(holmer): Call AudioRtpSenders when they have been moved to
@ -518,21 +488,17 @@ void RtpTransportControllerSend::OnTransportOverheadChanged(
void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender(
bool account_for_audio) {
pacer()->SetAccountForAudioPackets(account_for_audio);
pacer_.SetAccountForAudioPackets(account_for_audio);
}
void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
pacer()->SetIncludeOverhead();
pacer_.SetIncludeOverhead();
}
void RtpTransportControllerSend::EnsureStarted() {
if (!pacer_started_) {
pacer_started_ = true;
if (pacer_settings_.use_task_queue_pacer()) {
task_queue_pacer_->EnsureStarted();
} else {
process_thread_->Start();
}
pacer_.EnsureStarted();
}
}
@ -652,7 +618,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time = pacer()->ExpectedQueueTime();
TimeDelta expected_queue_time = pacer_.ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState();
return kPacerQueueUpdateInterval;
@ -674,7 +640,7 @@ void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
ProcessInterval msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
if (add_pacing_to_cwin_)
msg.pacer_queue = pacer()->QueueSizeData();
msg.pacer_queue = pacer_.QueueSizeData();
PostUpdates(controller_->OnProcessInterval(msg));
}
@ -690,11 +656,11 @@ void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
UpdateCongestedState();
}
if (update.pacer_config) {
pacer()->SetPacingRates(update.pacer_config->data_rate(),
pacer_.SetPacingRates(update.pacer_config->data_rate(),
update.pacer_config->pad_rate());
}
for (const auto& probe : update.probe_cluster_configs) {
pacer()->CreateProbeCluster(probe.target_data_rate, probe.id);
pacer_.CreateProbeCluster(probe.target_data_rate, probe.id);
}
if (update.target_rate) {
control_handler_->SetTargetRate(*update.target_rate);

View File

@ -27,11 +27,9 @@
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
#include "modules/pacing/paced_sender.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/pacing/task_queue_paced_sender.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/network_route.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h"
@ -57,7 +55,6 @@ class RtpTransportControllerSend final
NetworkStatePredictorFactoryInterface* predictor_factory,
NetworkControllerFactoryInterface* controller_factory,
const BitrateConstraints& bitrate_config,
std::unique_ptr<ProcessThread> process_thread,
TaskQueueFactory* task_queue_factory,
const FieldTrialsView& trials);
~RtpTransportControllerSend() override;
@ -134,9 +131,6 @@ class RtpTransportControllerSend final
struct PacerSettings {
explicit PacerSettings(const FieldTrialsView& trials);
bool use_task_queue_pacer() const { return !tq_disabled.Get(); }
FieldTrialFlag tq_disabled; // Kill-switch not normally used.
FieldTrialParameter<TimeDelta> holdback_window;
FieldTrialParameter<int> holdback_packets;
};
@ -159,8 +153,6 @@ class RtpTransportControllerSend final
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_);
void UpdateControlState() RTC_RUN_ON(task_queue_);
void UpdateCongestedState() RTC_RUN_ON(task_queue_);
RtpPacketPacer* pacer();
const RtpPacketPacer* pacer() const;
Clock* const clock_;
RtcEventLog* const event_log_;
@ -171,10 +163,8 @@ class RtpTransportControllerSend final
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
bool pacer_started_;
const std::unique_ptr<ProcessThread> process_thread_;
const PacerSettings pacer_settings_;
std::unique_ptr<PacedSender> process_thread_pacer_;
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
TaskQueuePacedSender pacer_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
TransportFeedbackDemuxer feedback_demuxer_;

View File

@ -23,13 +23,12 @@ class RtpTransportControllerSendFactory
public:
std::unique_ptr<RtpTransportControllerSendInterface> Create(
const RtpTransportConfig& config,
Clock* clock,
std::unique_ptr<ProcessThread> process_thread) override {
Clock* clock) override {
RTC_CHECK(config.trials);
return std::make_unique<RtpTransportControllerSend>(
clock, config.event_log, config.network_state_predictor_factory,
config.network_controller_factory, config.bitrate_config,
std::move(process_thread), config.task_queue_factory, *config.trials);
config.task_queue_factory, *config.trials);
}
virtual ~RtpTransportControllerSendFactory() {}

View File

@ -23,8 +23,7 @@ class RtpTransportControllerSendFactoryInterface {
public:
virtual std::unique_ptr<RtpTransportControllerSendInterface> Create(
const RtpTransportConfig& config,
Clock* clock,
std::unique_ptr<ProcessThread> process_thread) = 0;
Clock* clock) = 0;
virtual ~RtpTransportControllerSendFactoryInterface() {}
};

View File

@ -127,13 +127,11 @@ class RtpVideoSenderTestFixture {
payload_type)),
send_delay_stats_(time_controller_.GetClock()),
bitrate_config_(GetBitrateConfig()),
transport_controller_(
time_controller_.GetClock(),
transport_controller_(time_controller_.GetClock(),
&event_log_,
nullptr,
nullptr,
bitrate_config_,
time_controller_.CreateProcessThread("PacerThread"),
time_controller_.GetTaskQueueFactory(),
field_trials ? *field_trials : field_trials_),
stats_proxy_(time_controller_.GetClock(),

View File

@ -14,6 +14,7 @@
#include <vector>
#include "api/units/data_rate.h"
#include "modules/pacing/pacing_controller.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/numerics/safe_minmax.h"
@ -62,7 +63,8 @@ absl::optional<TargetTransferRate> CongestionControlHandler::GetUpdate() {
if (!network_available_) {
pause_encoding = true;
} else if (!disable_pacer_emergency_stop_ &&
pacer_expected_queue_ms_ > PacedSender::kMaxQueueLengthMs) {
pacer_expected_queue_ms_ >
PacingController::kMaxExpectedQueueLength.ms()) {
pause_encoding = true;
}
if (pause_encoding)

View File

@ -18,7 +18,6 @@
#include "api/transport/network_types.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "modules/pacing/paced_sender.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {

View File

@ -17,8 +17,6 @@ rtc_library("pacing") {
sources = [
"bitrate_prober.cc",
"bitrate_prober.h",
"paced_sender.cc",
"paced_sender.h",
"pacing_controller.cc",
"pacing_controller.h",
"packet_router.cc",
@ -92,7 +90,6 @@ if (rtc_include_tests) {
sources = [
"bitrate_prober_unittest.cc",
"interval_budget_unittest.cc",
"paced_sender_unittest.cc",
"pacing_controller_unittest.cc",
"packet_router_unittest.cc",
"prioritized_packet_queue_unittest.cc",

View File

@ -1,201 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/paced_sender.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
PacedSender::PacedSender(Clock* clock,
PacketRouter* packet_router,
const FieldTrialsView& field_trials,
ProcessThread* process_thread)
: process_mode_(
absl::StartsWith(field_trials.Lookup("WebRTC-Pacer-DynamicProcess"),
"Enabled")
? PacingController::ProcessMode::kDynamic
: PacingController::ProcessMode::kPeriodic),
pacing_controller_(clock, packet_router, field_trials, process_mode_),
clock_(clock),
process_thread_(process_thread) {
if (process_thread_)
process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}
PacedSender::~PacedSender() {
if (process_thread_) {
process_thread_->DeRegisterModule(&module_proxy_);
}
}
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
MutexLock lock(&mutex_);
return pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
}
void PacedSender::Pause() {
{
MutexLock lock(&mutex_);
pacing_controller_.Pause();
}
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new (longer) estimate for when to call Process().
if (process_thread_) {
process_thread_->WakeUp(&module_proxy_);
}
}
void PacedSender::Resume() {
{
MutexLock lock(&mutex_);
pacing_controller_.Resume();
}
// Tell the process thread to call our TimeUntilNextProcess() method to
// refresh the estimate for when to call Process().
if (process_thread_) {
process_thread_->WakeUp(&module_proxy_);
}
}
void PacedSender::SetCongested(bool congested) {
{
MutexLock lock(&mutex_);
pacing_controller_.SetCongested(congested);
}
MaybeWakupProcessThread();
}
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
{
MutexLock lock(&mutex_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
}
MaybeWakupProcessThread();
}
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
{
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacedSender::EnqueuePackets");
MutexLock lock(&mutex_);
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacedSender::EnqueuePackets::Loop", "sequence_number",
packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp());
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
}
MaybeWakupProcessThread();
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
MutexLock lock(&mutex_);
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
}
void PacedSender::SetIncludeOverhead() {
MutexLock lock(&mutex_);
pacing_controller_.SetIncludeOverhead();
}
void PacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
MutexLock lock(&mutex_);
pacing_controller_.SetTransportOverhead(overhead_per_packet);
}
TimeDelta PacedSender::ExpectedQueueTime() const {
MutexLock lock(&mutex_);
return pacing_controller_.ExpectedQueueTime();
}
DataSize PacedSender::QueueSizeData() const {
MutexLock lock(&mutex_);
return pacing_controller_.QueueSizeData();
}
absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
MutexLock lock(&mutex_);
return pacing_controller_.FirstSentPacketTime();
}
TimeDelta PacedSender::OldestPacketWaitTime() const {
MutexLock lock(&mutex_);
Timestamp oldest_packet = pacing_controller_.OldestPacketEnqueueTime();
if (oldest_packet.IsInfinite())
return TimeDelta::Zero();
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet)
return TimeDelta::Zero();
return current - oldest_packet;
}
int64_t PacedSender::TimeUntilNextProcess() {
MutexLock lock(&mutex_);
Timestamp next_send_time = pacing_controller_.NextSendTime();
TimeDelta sleep_time =
std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime());
if (process_mode_ == PacingController::ProcessMode::kDynamic) {
return std::max(sleep_time, PacingController::kMinSleepTime).ms();
}
return sleep_time.ms();
}
void PacedSender::Process() {
MutexLock lock(&mutex_);
pacing_controller_.ProcessPackets();
}
void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread;
RTC_DCHECK(!process_thread || process_thread == process_thread_);
}
void PacedSender::MaybeWakupProcessThread() {
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new time for when to call Process().
if (process_thread_ &&
process_mode_ == PacingController::ProcessMode::kDynamic) {
process_thread_->WakeUp(&module_proxy_);
}
}
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
{
MutexLock lock(&mutex_);
pacing_controller_.SetQueueTimeLimit(limit);
}
MaybeWakupProcessThread();
}
} // namespace webrtc

View File

@ -1,150 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACED_SENDER_H_
#define MODULES_PACING_PACED_SENDER_H_
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <vector>
#include "absl/types/optional.h"
#include "api/field_trials_view.h"
#include "api/function_view.h"
#include "api/transport/field_trial_based_config.h"
#include "api/transport/network_types.h"
#include "modules/include/module.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
class Clock;
class PacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
// Expected max pacer delay in ms. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
// UpdateBitrate() so that this limit will be upheld.
static const int64_t kMaxQueueLengthMs;
// Pacing-rate relative to our target send rate.
// Multiplicative factor that is applied to the target bitrate to calculate
// the number of bytes that can be transmitted per interval.
// Increasing this factor will result in lower delays in cases of bitrate
// overshoots from the encoder.
static const float kDefaultPaceMultiplier;
// TODO(bugs.webrtc.org/10937): Make the `process_thread` argument be non
// optional once all callers have been updated.
PacedSender(Clock* clock,
PacketRouter* packet_router,
const FieldTrialsView& field_trials,
ProcessThread* process_thread = nullptr);
~PacedSender() override;
// Methods implementing RtpPacketSender.
// Adds the packet to the queue and calls PacketRouter::SendPacket() when
// it's time to send.
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packet) override;
// Methods implementing RtpPacketPacer:
void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
// Temporarily pause all sending.
void Pause() override;
// Resume sending packets.
void Resume() override;
void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio) override;
void SetIncludeOverhead() override;
void SetTransportOverhead(DataSize overhead_per_packet) override;
// Returns the time since the oldest queued packet was enqueued.
TimeDelta OldestPacketWaitTime() const override;
DataSize QueueSizeData() const override;
// Returns the time when the first packet was sent;
absl::optional<Timestamp> FirstSentPacketTime() const override;
// Returns the number of milliseconds it will take to send the current
// packets in the queue, given the current size and bitrate, ignoring prio.
TimeDelta ExpectedQueueTime() const override;
void SetQueueTimeLimit(TimeDelta limit) override;
// Below are methods specific to this implementation, such as things related
// to module processing thread specifics or methods exposed for test.
private:
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
int64_t TimeUntilNextProcess();
// Called when the prober is associated with a process thread.
void ProcessThreadAttached(ProcessThread* process_thread);
// Process any pending packets in the queue(s).
void Process();
// In dynamic process mode, refreshes the next process time.
void MaybeWakupProcessThread();
// Private implementation of Module to not expose those implementation details
// publicly and control when the class is registered/deregistered.
class ModuleProxy : public Module {
public:
explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {}
private:
int64_t TimeUntilNextProcess() override {
return delegate_->TimeUntilNextProcess();
}
void Process() override { return delegate_->Process(); }
void ProcessThreadAttached(ProcessThread* process_thread) override {
return delegate_->ProcessThreadAttached(process_thread);
}
PacedSender* const delegate_;
} module_proxy_{this};
mutable Mutex mutex_;
const PacingController::ProcessMode process_mode_;
PacingController pacing_controller_ RTC_GUARDED_BY(mutex_);
Clock* const clock_;
ProcessThread* const process_thread_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACED_SENDER_H_

View File

@ -1,160 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/paced_sender.h"
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/mock/mock_process_thread.h"
#include "system_wrappers/include/clock.h"
#include "test/gmock.h"
#include "test/gtest.h"
using ::testing::_;
using ::testing::Return;
using ::testing::SaveArg;
namespace webrtc {
namespace {
constexpr uint32_t kAudioSsrc = 12345;
constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 234;
// Mock callback implementing the raw api.
class MockCallback : public PacketRouter {
public:
MOCK_METHOD(void,
SendPacket,
(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info),
(override));
MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
GeneratePadding,
(DataSize target_size),
(override));
};
class ProcessModeTrials : public FieldTrialsView {
public:
explicit ProcessModeTrials(bool dynamic_process) : mode_(dynamic_process) {}
std::string Lookup(absl::string_view key) const override {
if (key == "WebRTC-Pacer-DynamicProcess") {
return mode_ ? "Enabled" : "Disabled";
}
return "";
}
private:
const bool mode_;
};
} // namespace
namespace test {
class PacedSenderTest
: public ::testing::TestWithParam<PacingController::ProcessMode> {
public:
PacedSenderTest()
: clock_(0),
paced_module_(nullptr),
trials_(GetParam() == PacingController::ProcessMode::kDynamic) {}
void SetUp() override {
EXPECT_CALL(process_thread_, RegisterModule)
.WillOnce(SaveArg<0>(&paced_module_));
pacer_ = std::make_unique<PacedSender>(&clock_, &callback_, trials_,
&process_thread_);
EXPECT_CALL(process_thread_, WakeUp).WillRepeatedly([&](Module* module) {
clock_.AdvanceTimeMilliseconds(module->TimeUntilNextProcess());
});
EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1);
}
protected:
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
switch (type) {
case RtpPacketMediaType::kAudio:
packet->SetSsrc(kAudioSsrc);
break;
case RtpPacketMediaType::kVideo:
packet->SetSsrc(kVideoSsrc);
break;
case RtpPacketMediaType::kRetransmission:
case RtpPacketMediaType::kPadding:
packet->SetSsrc(kVideoRtxSsrc);
break;
case RtpPacketMediaType::kForwardErrorCorrection:
packet->SetSsrc(kFlexFecSsrc);
break;
}
packet->SetPayloadSize(kDefaultPacketSize);
return packet;
}
SimulatedClock clock_;
MockCallback callback_;
MockProcessThread process_thread_;
Module* paced_module_;
ProcessModeTrials trials_;
std::unique_ptr<PacedSender> pacer_;
};
TEST_P(PacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
pacer_->SetPacingRates(
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero());
std::vector<std::unique_ptr<RtpPacketToSend>> packets;
for (size_t i = 0; i < kPacketsToSend; ++i) {
packets.emplace_back(BuildRtpPacket(RtpPacketMediaType::kVideo));
}
pacer_->EnqueuePackets(std::move(packets));
// Expect all of them to be sent.
size_t packets_sent = 0;
EXPECT_CALL(callback_, SendPacket)
.WillRepeatedly(
[&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) { ++packets_sent; });
const Timestamp start_time = clock_.CurrentTime();
while (packets_sent < kPacketsToSend) {
clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
paced_module_->Process();
}
// Packets should be sent over a period of close to 1s. Expect a little lower
// than this since initial probing is a bit quicker.
TimeDelta duration = clock_.CurrentTime() - start_time;
EXPECT_GT(duration, TimeDelta::Millis(900));
}
INSTANTIATE_TEST_SUITE_P(
WithAndWithoutDynamicProcess,
PacedSenderTest,
::testing::Values(PacingController::ProcessMode::kPeriodic,
PacingController::ProcessMode::kDynamic));
} // namespace test
} // namespace webrtc

View File

@ -24,7 +24,7 @@
#include "api/video_codecs/video_codec.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "call/video_send_stream.h"
#include "modules/pacing/paced_sender.h"
#include "modules/pacing/pacing_controller.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/alr_experiment.h"
@ -194,8 +194,7 @@ uint32_t GetInitialEncoderMaxBitrate(int initial_encoder_max_bitrate) {
PacingConfig::PacingConfig(const FieldTrialsView& field_trials)
: pacing_factor("factor", kStrictPacingMultiplier),
max_pacing_delay("max_delay",
TimeDelta::Millis(PacedSender::kMaxQueueLengthMs)) {
max_pacing_delay("max_delay", PacingController::kMaxExpectedQueueLength) {
ParseFieldTrial({&pacing_factor, &max_pacing_delay},
field_trials.Lookup("WebRTC-Video-Pacing"));
}