Add RtpPacketPacer interface for pacer control

The PacedSender is being reworked and will need an interface so we can
inject different implementations of it.

This CL introduces a new RtpPacketPacer interface inside the pacing
module. This interface handles the details of _how_ packets should be
paced, such as pacing rates/account for audio/max queue length etc.

The RtpPacketSender interface exposed from the rtp_rtcp module handles
only the actual sending of packets.

Some minor cleanups are included here.

Bug: webrtc:10809
Change-Id: I150b1a6262306d99e3f9d5f0b4afdb16a50e5ad8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145212
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28699}
This commit is contained in:
Erik Språng 2019-07-29 16:38:27 +02:00 committed by Commit Bot
parent 96ea8c00e7
commit 425d6aaa4c
12 changed files with 326 additions and 182 deletions

View File

@ -77,6 +77,7 @@ rtc_source_set("rtp_interfaces") {
"../api:libjingle_peerconnection_api",
"../api:rtp_headers",
"../api/transport:bitrate_settings",
"../api/units:timestamp",
"../logging:rtc_event_log_api",
"../modules/rtp_rtcp:rtp_rtcp_format",
"../rtc_base:rtc_base_approved",

View File

@ -256,7 +256,7 @@ class Call final : public webrtc::Call,
MediaType media_type)
RTC_SHARED_LOCKS_REQUIRED(receive_crit_);
void UpdateSendHistograms(int64_t first_sent_packet_ms)
void UpdateSendHistograms(Timestamp first_sent_packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
void UpdateReceiveHistograms();
void UpdateHistograms();
@ -503,8 +503,8 @@ Call::~Call() {
call_stats_->DeregisterStatsObserver(&receive_side_cc_);
}
absl::optional<int64_t> first_sent_packet_ms =
transport_send_->GetFirstPacketTimeMs();
absl::optional<Timestamp> first_sent_packet_ms =
transport_send_->GetFirstPacketTime();
// Only update histograms after process threads have been shut down, so that
// they won't try to concurrently update stats.
if (first_sent_packet_ms) {
@ -619,9 +619,9 @@ void Call::UpdateHistograms() {
(clock_->TimeInMilliseconds() - start_ms_) / 1000);
}
void Call::UpdateSendHistograms(int64_t first_sent_packet_ms) {
void Call::UpdateSendHistograms(Timestamp first_sent_packet) {
int64_t elapsed_sec =
(clock_->TimeInMilliseconds() - first_sent_packet_ms) / 1000;
(clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000;
if (elapsed_sec < metrics::kMinRunTimeInSeconds)
return;
const int kMinRequiredPeriodicSamples = 5;

View File

@ -93,7 +93,8 @@ RtpTransportControllerSend::RtpTransportControllerSend(
initial_config_.key_value_config = &trial_based_config_;
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer_.SetPacingRates(bitrate_config.start_bitrate_bps, 0);
pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps),
DataRate::Zero());
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->Start();
@ -149,6 +150,18 @@ void RtpTransportControllerSend::UpdateControlState() {
observer_->OnTargetTransferRate(*update);
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
}
const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
return &task_queue_;
}
@ -168,6 +181,8 @@ RtpTransportControllerSend::transport_feedback_observer() {
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
}
@ -189,7 +204,7 @@ void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
pacer_.SetQueueTimeLimit(limit_ms);
pacer()->SetQueueTimeLimit(TimeDelta::ms(limit_ms));
}
void RtpTransportControllerSend::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
@ -266,7 +281,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
} else {
UpdateInitialConstraints(msg.constraints);
}
pacer_.UpdateOutstandingData(0);
pacer()->UpdateOutstandingData(DataSize::Zero());
});
}
}
@ -282,11 +297,11 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
return;
network_available_ = msg.network_available;
if (network_available_) {
pacer_.Resume();
pacer()->Resume();
} else {
pacer_.Pause();
pacer()->Pause();
}
pacer_.UpdateOutstandingData(0);
pacer()->UpdateOutstandingData(DataSize::Zero());
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
@ -305,10 +320,11 @@ RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
return this;
}
int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
return pacer_.QueueInMs();
return pacer()->OldestPacketWaitTime().ms();
}
int64_t RtpTransportControllerSend::GetFirstPacketTimeMs() const {
return pacer_.FirstSentPacketTimeMs();
absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
const {
return pacer()->FirstSentPacketTime();
}
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
task_queue_.PostTask([this, enable]() {
@ -328,8 +344,8 @@ void RtpTransportControllerSend::OnSentPacket(
PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
pacer_.UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData().bytes());
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
}
void RtpTransportControllerSend::OnReceivedPacket(
@ -400,7 +416,7 @@ void RtpTransportControllerSend::OnTransportOverheadChanged(
void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender(
bool account_for_audio) {
pacer_.SetAccountForAudioPackets(account_for_audio);
pacer()->SetAccountForAudioPackets(account_for_audio);
}
void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
@ -457,8 +473,8 @@ void RtpTransportControllerSend::OnTransportFeedback(
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
});
}
pacer_.UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData().bytes());
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
}
void RtpTransportControllerSend::OnRemoteNetworkEstimate(
@ -510,8 +526,7 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time =
TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
TimeDelta expected_queue_time = pacer()->ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState();
return kPacerQueueUpdateInterval;
@ -533,7 +548,7 @@ void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
if (add_pacing_to_cwin_)
msg.pacer_queue = DataSize::bytes(pacer_.QueueSizeBytes());
msg.pacer_queue = pacer()->QueueSizeData();
PostUpdates(controller_->OnProcessInterval(msg));
}
@ -545,18 +560,14 @@ void RtpTransportControllerSend::UpdateStreamsConfig() {
void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
if (update.congestion_window) {
if (update.congestion_window->IsFinite())
pacer_.SetCongestionWindow(update.congestion_window->bytes());
else
pacer_.SetCongestionWindow(PacedSender::kNoCongestionWindow);
pacer()->SetCongestionWindow(*update.congestion_window);
}
if (update.pacer_config) {
pacer_.SetPacingRates(update.pacer_config->data_rate().bps(),
update.pacer_config->pad_rate().bps());
pacer()->SetPacingRates(update.pacer_config->data_rate(),
update.pacer_config->pad_rate());
}
for (const auto& probe : update.probe_cluster_configs) {
int64_t bitrate_bps = probe.target_data_rate.bps();
pacer_.CreateProbeCluster(bitrate_bps, probe.id);
pacer()->CreateProbeCluster(probe.target_data_rate, probe.id);
}
if (update.target_rate) {
control_handler_->SetTargetRate(*update.target_rate);

View File

@ -25,6 +25,7 @@
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/network_route.h"
@ -95,7 +96,7 @@ class RtpTransportControllerSend final
void OnNetworkAvailability(bool network_available) override;
RtcpBandwidthObserver* GetBandwidthObserver() override;
int64_t GetPacerQueuingDelayMs() const override;
int64_t GetFirstPacketTimeMs() const override;
absl::optional<Timestamp> GetFirstPacketTime() const override;
void EnablePeriodicAlrProbing(bool enable) override;
void OnSentPacket(const rtc::SentPacket& sent_packet) override;
void OnReceivedPacket(const ReceivedPacket& packet_msg) override;
@ -135,6 +136,8 @@ class RtpTransportControllerSend final
RTC_RUN_ON(task_queue_);
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_);
void UpdateControlState() RTC_RUN_ON(task_queue_);
RtpPacketPacer* pacer();
const RtpPacketPacer* pacer() const;
Clock* const clock_;
RtcEventLog* const event_log_;

View File

@ -23,6 +23,7 @@
#include "api/crypto/crypto_options.h"
#include "api/fec_controller.h"
#include "api/transport/bitrate_settings.h"
#include "api/units/timestamp.h"
#include "call/rtp_config.h"
#include "logging/rtc_event_log/rtc_event_log.h"
#include "modules/rtp_rtcp/include/report_block_data.h"
@ -148,7 +149,7 @@ class RtpTransportControllerSendInterface {
virtual void OnNetworkAvailability(bool network_available) = 0;
virtual RtcpBandwidthObserver* GetBandwidthObserver() = 0;
virtual int64_t GetPacerQueuingDelayMs() const = 0;
virtual int64_t GetFirstPacketTimeMs() const = 0;
virtual absl::optional<Timestamp> GetFirstPacketTime() const = 0;
virtual void EnablePeriodicAlrProbing(bool enable) = 0;
virtual void OnSentPacket(const rtc::SentPacket& sent_packet) = 0;
virtual void OnReceivedPacket(const ReceivedPacket& received_packet) = 0;

View File

@ -61,7 +61,7 @@ class MockRtpTransportControllerSend
MOCK_METHOD1(OnNetworkAvailability, void(bool));
MOCK_METHOD0(GetBandwidthObserver, RtcpBandwidthObserver*());
MOCK_CONST_METHOD0(GetPacerQueuingDelayMs, int64_t());
MOCK_CONST_METHOD0(GetFirstPacketTimeMs, int64_t());
MOCK_CONST_METHOD0(GetFirstPacketTime, absl::optional<Timestamp>());
MOCK_METHOD1(EnablePeriodicAlrProbing, void(bool));
MOCK_METHOD1(OnSentPacket, void(const rtc::SentPacket&));
MOCK_METHOD1(SetSdpBitrateParameters, void(const BitrateConstraints&));

View File

@ -23,6 +23,7 @@ rtc_static_library("pacing") {
"packet_router.h",
"round_robin_packet_queue.cc",
"round_robin_packet_queue.h",
"rtp_packet_pacer.h",
]
deps = [
@ -32,6 +33,10 @@ rtc_static_library("pacing") {
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
"../../api/transport:webrtc_key_value_config",
"../../api/units:data_rate",
"../../api/units:data_size",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../logging:rtc_event_bwe",
"../../logging:rtc_event_log_api",
"../../logging:rtc_event_pacing",

View File

@ -30,9 +30,9 @@ class MockPacedSender : public PacedSender {
int64_t capture_time_ms,
size_t bytes,
bool retransmission));
MOCK_METHOD2(CreateProbeCluster, void(int, int));
MOCK_METHOD2(CreateProbeCluster, void(DataRate, int));
MOCK_METHOD1(SetEstimatedBitrate, void(uint32_t));
MOCK_METHOD2(SetPacingRates, void(uint32_t, uint32_t));
MOCK_METHOD2(SetPacingRates, void(DataRate, DataRate));
MOCK_CONST_METHOD0(QueueInMs, int64_t());
MOCK_CONST_METHOD0(QueueInPackets, int());
MOCK_CONST_METHOD0(ExpectedQueueTimeMs, int64_t());

View File

@ -21,6 +21,7 @@
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@ -92,12 +93,14 @@ PacedSender::PacedSender(Clock* clock,
padding_budget_(0),
prober_(*field_trials_),
probing_send_failure_(false),
pacing_bitrate_kbps_(0),
pacing_bitrate_(DataRate::Zero()),
time_last_process_us_(clock->TimeInMicroseconds()),
last_send_time_us_(clock->TimeInMicroseconds()),
first_sent_packet_ms_(-1),
packets_(clock->TimeInMicroseconds()),
packet_counter_(0),
congestion_window_size_(DataSize::PlusInfinity()),
outstanding_data_(DataSize::Zero()),
process_thread_(nullptr),
queue_time_limit(kMaxQueueLengthMs),
account_for_audio_(false),
legacy_packet_referencing_(
@ -113,9 +116,9 @@ PacedSender::PacedSender(Clock* clock,
PacedSender::~PacedSender() {}
void PacedSender::CreateProbeCluster(int bitrate_bps, int cluster_id) {
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
rtc::CritScope cs(&critsect_);
prober_.CreateProbeCluster(bitrate_bps, TimeMilliseconds(), cluster_id);
prober_.CreateProbeCluster(bitrate.bps(), TimeMilliseconds(), cluster_id);
}
void PacedSender::Pause() {
@ -148,20 +151,21 @@ void PacedSender::Resume() {
process_thread_->WakeUp(this);
}
void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) {
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
rtc::CritScope cs(&critsect_);
congestion_window_bytes_ = congestion_window_bytes;
congestion_window_size_ = congestion_window_size;
}
void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) {
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
rtc::CritScope cs(&critsect_);
outstanding_bytes_ = outstanding_bytes;
outstanding_data_ = outstanding_data;
}
bool PacedSender::Congested() const {
if (congestion_window_bytes_ == kNoCongestionWindow)
return false;
return outstanding_bytes_ >= congestion_window_bytes_;
if (congestion_window_size_.IsFinite()) {
return outstanding_data_ >= congestion_window_size_;
}
return false;
}
int64_t PacedSender::TimeMilliseconds() const {
@ -183,16 +187,15 @@ void PacedSender::SetProbingEnabled(bool enabled) {
prober_.SetEnabled(enabled);
}
void PacedSender::SetPacingRates(uint32_t pacing_rate_bps,
uint32_t padding_rate_bps) {
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
rtc::CritScope cs(&critsect_);
RTC_DCHECK(pacing_rate_bps > 0);
pacing_bitrate_kbps_ = pacing_rate_bps / 1000;
padding_budget_.set_target_rate_kbps(padding_rate_bps / 1000);
RTC_DCHECK_GT(pacing_rate, DataRate::Zero());
pacing_bitrate_ = pacing_rate;
padding_budget_.set_target_rate_kbps(padding_rate.kbps());
RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
<< pacing_bitrate_kbps_
<< " padding_budget_kbps=" << padding_rate_bps / 1000;
<< pacing_bitrate_.kbps()
<< " padding_budget_kbps=" << padding_rate.kbps();
}
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
@ -202,7 +205,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
size_t bytes,
bool retransmission) {
rtc::CritScope cs(&critsect_);
RTC_DCHECK(pacing_bitrate_kbps_ > 0)
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
int64_t now_ms = TimeMilliseconds();
@ -229,7 +232,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
rtc::CritScope cs(&critsect_);
RTC_DCHECK(pacing_bitrate_kbps_ > 0)
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
int64_t now_ms = TimeMilliseconds();
@ -249,11 +252,12 @@ void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
account_for_audio_ = account_for_audio;
}
int64_t PacedSender::ExpectedQueueTimeMs() const {
TimeDelta PacedSender::ExpectedQueueTime() const {
rtc::CritScope cs(&critsect_);
RTC_DCHECK_GT(pacing_bitrate_kbps_, 0);
return static_cast<int64_t>(packets_.SizeInBytes() * 8 /
pacing_bitrate_kbps_);
RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero());
return TimeDelta::ms(
(QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) /
pacing_bitrate_.bps());
}
size_t PacedSender::QueueSizePackets() const {
@ -261,24 +265,25 @@ size_t PacedSender::QueueSizePackets() const {
return packets_.SizeInPackets();
}
int64_t PacedSender::QueueSizeBytes() const {
DataSize PacedSender::QueueSizeData() const {
rtc::CritScope cs(&critsect_);
return packets_.SizeInBytes();
return DataSize::bytes(packets_.SizeInBytes());
}
int64_t PacedSender::FirstSentPacketTimeMs() const {
absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
rtc::CritScope cs(&critsect_);
return first_sent_packet_ms_;
return first_sent_packet_time_;
}
int64_t PacedSender::QueueInMs() const {
TimeDelta PacedSender::OldestPacketWaitTime() const {
rtc::CritScope cs(&critsect_);
int64_t oldest_packet = packets_.OldestEnqueueTimeMs();
if (oldest_packet == 0)
return 0;
if (oldest_packet == 0) {
return TimeDelta::Zero();
}
return TimeMilliseconds() - oldest_packet;
return TimeDelta::ms(TimeMilliseconds() - oldest_packet);
}
int64_t PacedSender::TimeUntilNextProcess() {
@ -356,7 +361,7 @@ void PacedSender::Process() {
return;
if (elapsed_time_ms > 0) {
int target_bitrate_kbps = pacing_bitrate_kbps_;
int target_bitrate_kbps = pacing_bitrate_.kbps();
size_t queue_size_bytes = packets_.SizeInBytes();
if (queue_size_bytes > 0) {
// Assuming equal size packets and input/output rate, the average packet
@ -540,8 +545,9 @@ RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
}
void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
if (first_sent_packet_ms_ == -1)
first_sent_packet_ms_ = TimeMilliseconds();
if (!first_sent_packet_time_) {
first_sent_packet_time_ = Timestamp::ms(TimeMilliseconds());
}
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
@ -566,14 +572,14 @@ void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
}
void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
outstanding_bytes_ += bytes_sent;
outstanding_data_ += DataSize::bytes(bytes_sent);
media_budget_.UseBudget(bytes_sent);
padding_budget_.UseBudget(bytes_sent);
}
void PacedSender::SetQueueTimeLimit(int limit_ms) {
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
rtc::CritScope cs(&critsect_);
queue_time_limit = limit_ms;
queue_time_limit = limit.ms();
}
} // namespace webrtc

View File

@ -27,6 +27,7 @@
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/include/process_thread.h"
@ -38,10 +39,10 @@ namespace webrtc {
class Clock;
class RtcEventLog;
class PacedSender : public Module, public RtpPacketSender {
class PacedSender : public Module,
public RtpPacketPacer,
public RtpPacketSender {
public:
static constexpr int64_t kNoCongestionWindow = -1;
// Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
@ -61,24 +62,7 @@ class PacedSender : public Module, public RtpPacketSender {
~PacedSender() override;
virtual void CreateProbeCluster(int bitrate_bps, int cluster_id);
// Temporarily pause all sending.
void Pause();
// Resume sending packets.
void Resume();
void SetCongestionWindow(int64_t congestion_window_bytes);
void UpdateOutstandingData(int64_t outstanding_bytes);
// Enable bitrate probing. Enabled by default, mostly here to simplify
// testing. Must be called before any packets are being sent to have an
// effect.
void SetProbingEnabled(bool enabled);
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps);
// Methods implementing RtpPacketSender.
// Adds the packet information to the queue and calls TimeToSendPacket
// when it's time to send.
@ -88,30 +72,71 @@ class PacedSender : public Module, public RtpPacketSender {
int64_t capture_time_ms,
size_t bytes,
bool retransmission) override;
// Adds the packet to the queue and calls PacketRouter::SendPacket() when
// it's time to send.
void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) override;
// Methods implementing RtpPacketPacer:
void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
// TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone.
void CreateProbeCluster(int bitrate_bps, int cluster_id) {
CreateProbeCluster(DataRate::bps(bitrate_bps), cluster_id);
}
// Temporarily pause all sending.
void Pause() override;
// Resume sending packets.
void Resume() override;
void SetCongestionWindow(DataSize congestion_window_size) override;
void UpdateOutstandingData(DataSize outstanding_data) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
// TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone.
void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps) {
SetPacingRates(DataRate::bps(pacing_rate_bps),
DataRate::bps(padding_rate_bps));
}
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio);
void SetAccountForAudioPackets(bool account_for_audio) override;
// Returns the time since the oldest queued packet was enqueued.
virtual int64_t QueueInMs() const;
TimeDelta OldestPacketWaitTime() const override;
// TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone.
int64_t QueueInMs() const { return OldestPacketWaitTime().ms(); }
virtual size_t QueueSizePackets() const;
virtual int64_t QueueSizeBytes() const;
size_t QueueSizePackets() const override;
DataSize QueueSizeData() const override;
// Returns the time when the first packet was sent, or -1 if no packet is
// sent.
virtual int64_t FirstSentPacketTimeMs() const;
// Returns the time when the first packet was sent;
absl::optional<Timestamp> FirstSentPacketTime() const override;
// Returns the number of milliseconds it will take to send the current
// packets in the queue, given the current size and bitrate, ignoring prio.
virtual int64_t ExpectedQueueTimeMs() const;
TimeDelta ExpectedQueueTime() const override;
void SetQueueTimeLimit(TimeDelta limit) override;
// TODO(bugs.webrtc.org/10809): Remove once downstream usage is gone.
void SetQueueTimeLimit(int limit_ms) {
SetQueueTimeLimit(TimeDelta::ms(limit_ms));
}
// Below are methods specific to this implementation, such as things related
// to module processing thread specifics or methods exposed for test.
// Enable bitrate probing. Enabled by default, mostly here to simplify
// testing. Must be called before any packets are being sent to have an
// effect.
void SetProbingEnabled(bool enabled);
// Methods implementing Module.
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
@ -122,7 +147,6 @@ class PacedSender : public Module, public RtpPacketSender {
// Called when the prober is associated with a process thread.
void ProcessThreadAttached(ProcessThread* process_thread) override;
void SetQueueTimeLimit(int limit_ms);
private:
int64_t UpdateTimeAndGetElapsedMs(int64_t now_us)
@ -177,18 +201,17 @@ class PacedSender : public Module, public RtpPacketSender {
BitrateProber prober_ RTC_GUARDED_BY(critsect_);
bool probing_send_failure_ RTC_GUARDED_BY(critsect_);
uint32_t pacing_bitrate_kbps_ RTC_GUARDED_BY(critsect_);
DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_);
int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_);
int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_);
int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
absl::optional<Timestamp> first_sent_packet_time_ RTC_GUARDED_BY(critsect_);
RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_);
uint64_t packet_counter_ RTC_GUARDED_BY(critsect_);
int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) =
kNoCongestionWindow;
int64_t outstanding_bytes_ RTC_GUARDED_BY(critsect_) = 0;
DataSize congestion_window_size_ RTC_GUARDED_BY(critsect_);
DataSize outstanding_data_ RTC_GUARDED_BY(critsect_);
// Lock to avoid race when attaching process thread. This can happen due to
// the Call class setting network state on RtpTransportControllerSend, which
@ -196,7 +219,7 @@ class PacedSender : public Module, public RtpPacketSender {
// pacer process thread. If RtpTransportControllerSend is running on a task
// queue separate from the thread used by Call, this causes a race.
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_);
int64_t queue_time_limit RTC_GUARDED_BY(critsect_);
bool account_for_audio_ RTC_GUARDED_BY(critsect_);

View File

@ -270,13 +270,16 @@ class PacedSenderTest : public ::testing::TestWithParam<PacerMode> {
}
void Init() {
send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0);
send_bucket_->CreateProbeCluster(kSecondClusterBps, /*cluster_id=*/1);
send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps),
/*cluster_id=*/0);
send_bucket_->CreateProbeCluster(DataRate::bps(kSecondClusterBps),
/*cluster_id=*/1);
// Default to bitrate probing disabled for testing purposes. Probing tests
// have to enable probing, either by creating a new PacedSender instance or
// by calling SetProbingEnabled(true).
send_bucket_->SetProbingEnabled(false);
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier, 0);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps * kPaceMultiplier), DataRate::Zero());
clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess());
}
@ -381,7 +384,7 @@ class PacedSenderFieldTrialTest : public ::testing::TestWithParam<PacerMode> {
TEST_P(PacedSenderFieldTrialTest, DefaultNoPaddingInSilence) {
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(kTargetBitrateBps, 0);
pacer.SetPacingRates(DataRate::bps(kTargetBitrateBps), DataRate::Zero());
// Video packet to reset last send time and provide padding data.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -397,7 +400,7 @@ TEST_P(PacedSenderFieldTrialTest, PaddingInSilenceWithTrial) {
ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) +
"WebRTC-Pacer-PadInSilence/Enabled/");
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(kTargetBitrateBps, 0);
pacer.SetPacingRates(DataRate::bps(kTargetBitrateBps), DataRate::Zero());
// Video packet to reset last send time and provide padding data.
InsertPacket(&pacer, &video);
if (GetParam() == PacerMode::kReferencePackets) {
@ -417,9 +420,9 @@ TEST_P(PacedSenderFieldTrialTest, PaddingInSilenceWithTrial) {
TEST_P(PacedSenderFieldTrialTest, DefaultCongestionWindowAffectsAudio) {
EXPECT_CALL(callback_, SendPadding).Times(0);
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(10000000, 0);
pacer.SetCongestionWindow(800);
pacer.UpdateOutstandingData(0);
pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero());
pacer.SetCongestionWindow(DataSize::bytes(800));
pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -431,7 +434,7 @@ TEST_P(PacedSenderFieldTrialTest, DefaultCongestionWindowAffectsAudio) {
ProcessNext(&pacer);
// Audio packet unblocked when congestion window clear.
::testing::Mock::VerifyAndClearExpectations(&callback_);
pacer.UpdateOutstandingData(0);
pacer.UpdateOutstandingData(DataSize::Zero());
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
}
@ -441,9 +444,9 @@ TEST_P(PacedSenderFieldTrialTest, CongestionWindowDoesNotAffectAudioInTrial) {
"WebRTC-Pacer-BlockAudio/Disabled/");
EXPECT_CALL(callback_, SendPadding).Times(0);
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(10000000, 0);
pacer.SetCongestionWindow(800);
pacer.UpdateOutstandingData(0);
pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero());
pacer.SetCongestionWindow(DataSize::bytes(800));
pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -456,8 +459,9 @@ TEST_P(PacedSenderFieldTrialTest, CongestionWindowDoesNotAffectAudioInTrial) {
TEST_P(PacedSenderFieldTrialTest, DefaultBudgetAffectsAudio) {
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond,
0);
pacer.SetPacingRates(
DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond),
DataRate::Zero());
// Video fills budget for following process periods.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -479,8 +483,9 @@ TEST_P(PacedSenderFieldTrialTest, BudgetDoesNotAffectAudioInTrial) {
"WebRTC-Pacer-BlockAudio/Disabled/");
EXPECT_CALL(callback_, SendPadding).Times(0);
PacedSender pacer(&clock_, &callback_, nullptr);
pacer.SetPacingRates(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond,
0);
pacer.SetPacingRates(
DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond),
DataRate::Zero());
// Video fills budget for following process periods.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -504,7 +509,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) {
const int64_t kStartMs = clock_.TimeInMilliseconds();
// No packet sent.
EXPECT_EQ(-1, send_bucket_->FirstSentPacketTimeMs());
EXPECT_FALSE(send_bucket_->FirstSentPacketTime().has_value());
for (size_t i = 0; i < kPacketToSend; ++i) {
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++,
@ -512,7 +517,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) {
send_bucket_->Process();
clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess());
}
EXPECT_EQ(kStartMs, send_bucket_->FirstSentPacketTimeMs());
EXPECT_EQ(Timestamp::ms(kStartMs), send_bucket_->FirstSentPacketTime());
}
TEST_P(PacedSenderTest, QueuePacket) {
@ -644,8 +649,9 @@ TEST_P(PacedSenderTest, Padding) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
// Due to the multiplicative factor we can send 5 packets during a send
// interval. (network capacity * multiplier / (8 bits per byte *
@ -680,8 +686,9 @@ TEST_P(PacedSenderTest, Padding) {
}
TEST_P(PacedSenderTest, NoPaddingBeforeNormalPacket) {
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
EXPECT_CALL(callback_, SendPadding).Times(0);
send_bucket_->Process();
@ -707,8 +714,9 @@ TEST_P(PacedSenderTest, VerifyPaddingUpToBitrate) {
int64_t capture_time_ms = 56789;
const int kTimeStep = 5;
const int64_t kBitrateWindow = 100;
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
int64_t start_time = clock_.TimeInMilliseconds();
while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) {
@ -730,8 +738,9 @@ TEST_P(PacedSenderTest, VerifyAverageBitrateVaryingMediaPayload) {
PacedSenderPadding callback;
send_bucket_.reset(new PacedSender(&clock_, &callback, nullptr));
send_bucket_->SetProbingEnabled(false);
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
int64_t start_time = clock_.TimeInMilliseconds();
size_t media_bytes = 0;
@ -895,8 +904,8 @@ TEST_P(PacedSenderTest, SendsOnlyPaddingWhenCongested) {
int kPacketSize = 250;
int kCongestionWindow = kPacketSize * 10;
send_bucket_->UpdateOutstandingData(0);
send_bucket_->SetCongestionWindow(kCongestionWindow);
send_bucket_->UpdateOutstandingData(DataSize::Zero());
send_bucket_->SetCongestionWindow(DataSize::bytes(kCongestionWindow));
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
@ -935,10 +944,11 @@ TEST_P(PacedSenderTest, DoesNotAllowOveruseAfterCongestion) {
EXPECT_CALL(callback_, SendPadding).Times(0);
// The pacing rate is low enough that the budget should not allow two packets
// to be sent in a row.
send_bucket_->SetPacingRates(400 * 8 * 1000 / 5, 0);
send_bucket_->SetPacingRates(DataRate::bps(400 * 8 * 1000 / 5),
DataRate::Zero());
// The congestion window is small enough to only let one packet through.
send_bucket_->SetCongestionWindow(800);
send_bucket_->UpdateOutstandingData(0);
send_bucket_->SetCongestionWindow(DataSize::bytes(800));
send_bucket_->UpdateOutstandingData(DataSize::Zero());
// Not yet budget limited or congested, packet is sent.
Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
@ -954,13 +964,13 @@ TEST_P(PacedSenderTest, DoesNotAllowOveruseAfterCongestion) {
EXPECT_CALL(callback_, SendPacket).Times(0);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
send_bucket_->UpdateOutstandingData(0);
send_bucket_->UpdateOutstandingData(DataSize::Zero());
// Congestion removed and budget has recovered, packet is sent.
Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
send_bucket_->UpdateOutstandingData(0);
send_bucket_->UpdateOutstandingData(DataSize::Zero());
// Should be blocked due to budget limitation as congestion has be removed.
Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(0);
@ -976,8 +986,8 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) {
int64_t kCongestionWindow = kPacketSize * kCongestionCount;
int64_t kCongestionTimeMs = 1000;
send_bucket_->UpdateOutstandingData(0);
send_bucket_->SetCongestionWindow(kCongestionWindow);
send_bucket_->UpdateOutstandingData(DataSize::Zero());
send_bucket_->SetCongestionWindow(DataSize::bytes(kCongestionWindow));
int sent_data = 0;
while (sent_data < kCongestionWindow) {
sent_data += kPacketSize;
@ -1002,8 +1012,8 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) {
// as many are sent
int ack_count = kCongestionCount / 2;
EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count);
send_bucket_->UpdateOutstandingData(kCongestionWindow -
kPacketSize * ack_count);
send_bucket_->UpdateOutstandingData(
DataSize::bytes(kCongestionWindow - kPacketSize * ack_count));
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
clock_.AdvanceTimeMilliseconds(5);
@ -1017,7 +1027,7 @@ TEST_P(PacedSenderTest, ResumesSendingWhenCongestionEnds) {
EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _))
.Times(unacked_packets);
for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
send_bucket_->UpdateOutstandingData(0);
send_bucket_->UpdateOutstandingData(DataSize::Zero());
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->Process();
}
@ -1030,7 +1040,7 @@ TEST_P(PacedSenderTest, Pause) {
uint16_t sequence_number = 1234;
int64_t capture_time_ms = clock_.TimeInMilliseconds();
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
// Due to the multiplicative factor we can send 5 packets during a send
// interval. (network capacity * multiplier / (8 bits per byte *
@ -1066,8 +1076,8 @@ TEST_P(PacedSenderTest, Pause) {
}
// Expect everything to be queued.
EXPECT_EQ(second_capture_time_ms - capture_time_ms,
send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::ms(second_capture_time_ms - capture_time_ms),
send_bucket_->OldestPacketWaitTime());
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
@ -1133,7 +1143,7 @@ TEST_P(PacedSenderTest, Pause) {
clock_.AdvanceTimeMilliseconds(5);
}
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
}
TEST_P(PacedSenderTest, ResendPacket) {
@ -1151,7 +1161,7 @@ TEST_P(PacedSenderTest, ResendPacket) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = clock_.TimeInMilliseconds();
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250, false);
@ -1160,8 +1170,8 @@ TEST_P(PacedSenderTest, ResendPacket) {
sequence_number + 1, capture_time_ms + 1, 250,
false);
clock_.AdvanceTimeMilliseconds(9999);
EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms),
send_bucket_->OldestPacketWaitTime());
// Fails to send first packet so only one call.
EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms,
false, _))
@ -1171,8 +1181,8 @@ TEST_P(PacedSenderTest, ResendPacket) {
send_bucket_->Process();
// Queue remains unchanged.
EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms,
send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms),
send_bucket_->OldestPacketWaitTime());
// Fails to send second packet.
EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms,
@ -1185,8 +1195,8 @@ TEST_P(PacedSenderTest, ResendPacket) {
send_bucket_->Process();
// Queue is reduced by 1 packet.
EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms - 1,
send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms - 1),
send_bucket_->OldestPacketWaitTime());
// Send second packet and queue becomes empty.
EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1,
@ -1194,7 +1204,7 @@ TEST_P(PacedSenderTest, ResendPacket) {
.WillOnce(Return(RtpPacketSendResult::kSuccess));
clock_.AdvanceTimeMilliseconds(10000);
send_bucket_->Process();
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
}
TEST_P(PacedSenderTest, ExpectedQueueTimeMs) {
@ -1203,18 +1213,19 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) {
const size_t kNumPackets = 60;
const size_t kPacketSize = 1200;
const int32_t kMaxBitrate = kPaceMultiplier * 30000;
EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
send_bucket_->SetPacingRates(30000 * kPaceMultiplier, 0);
send_bucket_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier),
DataRate::Zero());
for (size_t i = 0; i < kNumPackets; ++i) {
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
}
// Queue in ms = 1000 * (bytes in queue) *8 / (bits per second)
int64_t queue_in_ms =
static_cast<int64_t>(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate);
EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs());
TimeDelta queue_time =
TimeDelta::ms(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate);
EXPECT_EQ(queue_time, send_bucket_->ExpectedQueueTime());
int64_t time_start = clock_.TimeInMilliseconds();
while (send_bucket_->QueueSizePackets() > 0) {
@ -1227,7 +1238,7 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) {
}
int64_t duration = clock_.TimeInMilliseconds() - time_start;
EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->ExpectedQueueTime());
// Allow for aliasing, duration should be within one pack of max time limit.
EXPECT_NEAR(duration, PacedSender::kMaxQueueLengthMs,
@ -1237,16 +1248,17 @@ TEST_P(PacedSenderTest, ExpectedQueueTimeMs) {
TEST_P(PacedSenderTest, QueueTimeGrowsOverTime) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
send_bucket_->SetPacingRates(30000 * kPaceMultiplier, 0);
send_bucket_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier),
DataRate::Zero());
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
clock_.TimeInMilliseconds(), 1200);
clock_.AdvanceTimeMilliseconds(500);
EXPECT_EQ(500, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::ms(500), send_bucket_->OldestPacketWaitTime());
send_bucket_->Process();
EXPECT_EQ(0, send_bucket_->QueueInMs());
EXPECT_EQ(TimeDelta::Zero(), send_bucket_->OldestPacketWaitTime());
}
TEST_P(PacedSenderTest, ProbingWithInsertedPackets) {
@ -1257,9 +1269,12 @@ TEST_P(PacedSenderTest, ProbingWithInsertedPackets) {
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr));
send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0);
send_bucket_->CreateProbeCluster(kSecondClusterBps, /*cluster_id=*/1);
send_bucket_->SetPacingRates(kInitialBitrateBps * kPaceMultiplier, 0);
send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps),
/*cluster_id=*/0);
send_bucket_->CreateProbeCluster(DataRate::bps(kSecondClusterBps),
/*cluster_id=*/1);
send_bucket_->SetPacingRates(
DataRate::bps(kInitialBitrateBps * kPaceMultiplier), DataRate::Zero());
for (int i = 0; i < 10; ++i) {
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
@ -1302,8 +1317,10 @@ TEST_P(PacedSenderTest, ProbingWithPaddingSupport) {
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr));
send_bucket_->CreateProbeCluster(kFirstClusterBps, /*cluster_id=*/0);
send_bucket_->SetPacingRates(kInitialBitrateBps * kPaceMultiplier, 0);
send_bucket_->CreateProbeCluster(DataRate::bps(kFirstClusterBps),
/*cluster_id=*/0);
send_bucket_->SetPacingRates(
DataRate::bps(kInitialBitrateBps * kPaceMultiplier), DataRate::Zero());
for (int i = 0; i < 3; ++i) {
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
@ -1335,7 +1352,8 @@ TEST_P(PacedSenderTest, PaddingOveruse) {
const size_t kPacketSize = 1200;
send_bucket_->Process();
send_bucket_->SetPacingRates(60000 * kPaceMultiplier, 0);
send_bucket_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier),
DataRate::Zero());
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
@ -1344,11 +1362,12 @@ TEST_P(PacedSenderTest, PaddingOveruse) {
// Add 30kbit padding. When increasing budget, media budget will increase from
// negative (overuse) while padding budget will increase from 0.
clock_.AdvanceTimeMilliseconds(5);
send_bucket_->SetPacingRates(60000 * kPaceMultiplier, 30000);
send_bucket_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier),
DataRate::bps(30000));
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
EXPECT_LT(5u, send_bucket_->ExpectedQueueTimeMs());
EXPECT_LT(TimeDelta::ms(5), send_bucket_->ExpectedQueueTime());
// Don't send padding if queue is non-empty, even if padding budget > 0.
EXPECT_CALL(callback_, SendPadding).Times(0);
send_bucket_->Process();
@ -1364,8 +1383,9 @@ TEST_P(PacedSenderTest, ProbeClusterId) {
uint16_t sequence_number = 1234;
const size_t kPacketSize = 1200;
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
send_bucket_->SetProbingEnabled(true);
for (int i = 0; i < 10; ++i) {
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
@ -1447,8 +1467,9 @@ TEST_P(PacedSenderTest, AvoidBusyLoopOnSendFailure) {
uint16_t sequence_number = 1234;
const size_t kPacketSize = kFirstClusterBps / (8000 / 10);
send_bucket_->SetPacingRates(kTargetBitrateBps * kPaceMultiplier,
kTargetBitrateBps);
send_bucket_->SetPacingRates(
DataRate::bps(kTargetBitrateBps) * kPaceMultiplier,
DataRate::bps(kTargetBitrateBps));
send_bucket_->SetProbingEnabled(true);
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
clock_.TimeInMilliseconds(), kPacketSize);

View File

@ -0,0 +1,73 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_RTP_PACKET_PACER_H_
#define MODULES_PACING_RTP_PACKET_PACER_H_
#include <stdint.h>
#include "absl/types/optional.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
namespace webrtc {
class RtpPacketPacer {
public:
virtual ~RtpPacketPacer() = default;
virtual void CreateProbeCluster(DataRate bitrate, int cluster_id) = 0;
// Temporarily pause all sending.
virtual void Pause() = 0;
// Resume sending packets.
virtual void Resume() = 0;
virtual void SetCongestionWindow(DataSize congestion_window_size) = 0;
virtual void UpdateOutstandingData(DataSize outstanding_data) = 0;
// Sets the pacing rates. Must be called once before packets can be sent.
virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0;
// Time since the oldest packet currently in the queue was added.
virtual TimeDelta OldestPacketWaitTime() const = 0;
// Current number of packets curently in the pacer queue.
virtual size_t QueueSizePackets() const = 0;
// Sum of payload + padding bytes of all packets currently in the pacer queue.
virtual DataSize QueueSizeData() const = 0;
// Returns the time when the first packet was sent.
virtual absl::optional<Timestamp> FirstSentPacketTime() const = 0;
// Returns the expected number of milliseconds it will take to send the
// current packets in the queue, given the current size and bitrate, ignoring
// priority.
virtual TimeDelta ExpectedQueueTime() const = 0;
// Set the average upper bound on pacer queuing delay. The pacer may send at
// a higher rate than what was configured via SetPacingRates() in order to
// keep ExpectedQueueTimeMs() below |limit_ms| on average.
virtual void SetQueueTimeLimit(TimeDelta limit) = 0;
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
virtual void SetAccountForAudioPackets(bool account_for_audio) = 0;
};
} // namespace webrtc
#endif // MODULES_PACING_RTP_PACKET_PACER_H_