Add ability to control TaskQueuePacedSender holdback window.

Holdback window can be specified as absolute time and in terms of packet
send times. Example:
WebRTC-TaskQueuePacer/Enabled,holdback_window:20ms,holdback_packet:3/

If current conditions have us running with 2000kbps pacing rate and
1250byte (10kbit) packets, each packet send time is 5ms.
The holdback window would then be min(20ms, 3*5ms) = 15ms.

The default is like before 1ms and packets no take into account when
TQ pacer is used, parameters have no effect with legacy process thread
pacer.

Bug: webrtc:10809
Change-Id: I800de05107e2d4df461eabaaf1ca04fb4c5de51e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/233421
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35266}
This commit is contained in:
Erik Språng 2021-10-26 16:19:03 +02:00 committed by WebRTC LUCI CQ
parent 7bb853f549
commit 0f86c1f125
7 changed files with 602 additions and 459 deletions

View File

@ -75,6 +75,15 @@ bool IsRelayed(const rtc::NetworkRoute& route) {
} // namespace } // namespace
RtpTransportControllerSend::PacerSettings::PacerSettings(
const WebRtcKeyValueConfig* trials)
: tq_disabled("Disabled"),
holdback_window("holdback_window", PacingController::kMinSleepTime),
holdback_packets("holdback_packets", -1) {
ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
trials->Lookup("WebRTC-TaskQueuePacer"));
}
RtpTransportControllerSend::RtpTransportControllerSend( RtpTransportControllerSend::RtpTransportControllerSend(
Clock* clock, Clock* clock,
webrtc::RtcEventLog* event_log, webrtc::RtcEventLog* event_log,
@ -89,8 +98,8 @@ RtpTransportControllerSend::RtpTransportControllerSend(
bitrate_configurator_(bitrate_config), bitrate_configurator_(bitrate_config),
pacer_started_(false), pacer_started_(false),
process_thread_(std::move(process_thread)), process_thread_(std::move(process_thread)),
use_task_queue_pacer_(!IsDisabled(trials, "WebRTC-TaskQueuePacer")), pacer_settings_(trials),
process_thread_pacer_(use_task_queue_pacer_ process_thread_pacer_(pacer_settings_.use_task_queue_pacer()
? nullptr ? nullptr
: new PacedSender(clock, : new PacedSender(clock,
&packet_router_, &packet_router_,
@ -98,14 +107,14 @@ RtpTransportControllerSend::RtpTransportControllerSend(
trials, trials,
process_thread_.get())), process_thread_.get())),
task_queue_pacer_( task_queue_pacer_(
use_task_queue_pacer_ pacer_settings_.use_task_queue_pacer()
? new TaskQueuePacedSender( ? new TaskQueuePacedSender(clock,
clock,
&packet_router_, &packet_router_,
event_log, event_log,
trials, trials,
task_queue_factory, task_queue_factory,
/*hold_back_window = */ PacingController::kMinSleepTime) pacer_settings_.holdback_window.Get(),
pacer_settings_.holdback_packets.Get())
: nullptr), : nullptr),
observer_(nullptr), observer_(nullptr),
controller_factory_override_(controller_factory), controller_factory_override_(controller_factory),
@ -194,14 +203,14 @@ void RtpTransportControllerSend::UpdateControlState() {
} }
RtpPacketPacer* RtpTransportControllerSend::pacer() { RtpPacketPacer* RtpTransportControllerSend::pacer() {
if (use_task_queue_pacer_) { if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get(); return task_queue_pacer_.get();
} }
return process_thread_pacer_.get(); return process_thread_pacer_.get();
} }
const RtpPacketPacer* RtpTransportControllerSend::pacer() const { const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
if (use_task_queue_pacer_) { if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get(); return task_queue_pacer_.get();
} }
return process_thread_pacer_.get(); return process_thread_pacer_.get();
@ -226,7 +235,7 @@ RtpTransportControllerSend::transport_feedback_observer() {
} }
RtpPacketSender* RtpTransportControllerSend::packet_sender() { RtpPacketSender* RtpTransportControllerSend::packet_sender() {
if (use_task_queue_pacer_) { if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get(); return task_queue_pacer_.get();
} }
return process_thread_pacer_.get(); return process_thread_pacer_.get();
@ -503,7 +512,7 @@ void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
void RtpTransportControllerSend::EnsureStarted() { void RtpTransportControllerSend::EnsureStarted() {
if (!pacer_started_) { if (!pacer_started_) {
pacer_started_ = true; pacer_started_ = true;
if (use_task_queue_pacer_) { if (pacer_settings_.use_task_queue_pacer()) {
task_queue_pacer_->EnsureStarted(); task_queue_pacer_->EnsureStarted();
} else { } else {
process_thread_->Start(); process_thread_->Start();

View File

@ -128,6 +128,16 @@ class RtpTransportControllerSend final
void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override; void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override;
private: private:
struct PacerSettings {
explicit PacerSettings(const WebRtcKeyValueConfig* trials);
bool use_task_queue_pacer() const { return !tq_disabled.Get(); }
FieldTrialFlag tq_disabled; // Kill-switch not normally used.
FieldTrialParameter<TimeDelta> holdback_window;
FieldTrialParameter<int> holdback_packets;
};
void MaybeCreateControllers() RTC_RUN_ON(task_queue_); void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
void UpdateInitialConstraints(TargetRateConstraints new_contraints) void UpdateInitialConstraints(TargetRateConstraints new_contraints)
RTC_RUN_ON(task_queue_); RTC_RUN_ON(task_queue_);
@ -158,7 +168,7 @@ class RtpTransportControllerSend final
std::map<std::string, rtc::NetworkRoute> network_routes_; std::map<std::string, rtc::NetworkRoute> network_routes_;
bool pacer_started_; bool pacer_started_;
const std::unique_ptr<ProcessThread> process_thread_; const std::unique_ptr<ProcessThread> process_thread_;
const bool use_task_queue_pacer_; const PacerSettings pacer_settings_;
std::unique_ptr<PacedSender> process_thread_pacer_; std::unique_ptr<PacedSender> process_thread_pacer_;
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_; std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;

View File

@ -48,6 +48,7 @@ rtc_library("pacing") {
"../../logging:rtc_event_pacing", "../../logging:rtc_event_pacing",
"../../rtc_base:checks", "../../rtc_base:checks",
"../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_numerics",
"../../rtc_base:rtc_task_queue", "../../rtc_base:rtc_task_queue",
"../../rtc_base/experiments:field_trial_parser", "../../rtc_base/experiments:field_trial_parser",
"../../rtc_base/synchronization:mutex", "../../rtc_base/synchronization:mutex",

View File

@ -103,6 +103,7 @@ class PacingController {
// Sets the pacing rates. Must be called once before packets can be sent. // Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
DataRate pacing_rate() const { return pacing_bitrate_; }
// Currently audio traffic is not accounted by pacer and passed through. // Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for // With the introduction of audio BWE audio traffic will be accounted for

View File

@ -36,9 +36,11 @@ TaskQueuePacedSender::TaskQueuePacedSender(
RtcEventLog* event_log, RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials, const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window) TimeDelta max_hold_back_window,
int max_hold_back_window_in_packets)
: clock_(clock), : clock_(clock),
hold_back_window_(hold_back_window), max_hold_back_window_(max_hold_back_window),
max_hold_back_window_in_packets_(max_hold_back_window_in_packets),
pacing_controller_(clock, pacing_controller_(clock,
packet_sender, packet_sender,
event_log, event_log,
@ -48,9 +50,12 @@ TaskQueuePacedSender::TaskQueuePacedSender(
stats_update_scheduled_(false), stats_update_scheduled_(false),
last_stats_time_(Timestamp::MinusInfinity()), last_stats_time_(Timestamp::MinusInfinity()),
is_shutdown_(false), is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
task_queue_(task_queue_factory->CreateTaskQueue( task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender", "TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {} TaskQueueFactory::Priority::NORMAL)) {
packet_size_.Apply(1, 0);
}
TaskQueuePacedSender::~TaskQueuePacedSender() { TaskQueuePacedSender::~TaskQueuePacedSender() {
// Post an immediate task to mark the queue as shutting down. // Post an immediate task to mark the queue as shutting down.
@ -144,6 +149,7 @@ void TaskQueuePacedSender::EnqueuePackets(
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable { task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) { for (auto& packet : packets_) {
packet_size_.Apply(1, packet->size());
RTC_DCHECK_GE(packet->capture_time_ms(), 0); RTC_DCHECK_GE(packet->capture_time_ms(), 0);
pacing_controller_.EnqueuePacket(std::move(packet)); pacing_controller_.EnqueuePacket(std::move(packet));
} }
@ -227,6 +233,17 @@ void TaskQueuePacedSender::MaybeProcessPackets(
next_process_time = pacing_controller_.NextSendTime(); next_process_time = pacing_controller_.NextSendTime();
} }
TimeDelta hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered());
if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() &&
!avg_packet_size.IsZero()) {
TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
absl::optional<TimeDelta> time_to_next_process; absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() && if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) { next_process_time != next_process_time_) {
@ -241,11 +258,11 @@ void TaskQueuePacedSender::MaybeProcessPackets(
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1))); (next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
} }
} else if (next_process_time_.IsMinusInfinity() || } else if (next_process_time_.IsMinusInfinity() ||
next_process_time <= next_process_time_ - hold_back_window_) { next_process_time <= next_process_time_ - hold_back_window) {
// Schedule a new task since there is none currently scheduled // Schedule a new task since there is none currently scheduled
// (`next_process_time_` is infinite), or the new process time is at least // (`next_process_time_` is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled. // one holdback window earlier than whatever is currently scheduled.
time_to_next_process = std::max(next_process_time - now, hold_back_window_); time_to_next_process = std::max(next_process_time - now, hold_back_window);
} }
if (time_to_next_process) { if (time_to_next_process) {

View File

@ -29,6 +29,7 @@
#include "modules/pacing/pacing_controller.h" #include "modules/pacing/pacing_controller.h"
#include "modules/pacing/rtp_packet_pacer.h" #include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -43,14 +44,15 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// there is currently a pacer queue and packets can't immediately be // there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher // processed. Increasing this reduces thread wakeups at the expense of higher
// latency. // latency.
// TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window. // TODO(bugs.webrtc.org/10809): Remove default values.
TaskQueuePacedSender( TaskQueuePacedSender(
Clock* clock, Clock* clock,
PacingController::PacketSender* packet_sender, PacingController::PacketSender* packet_sender,
RtcEventLog* event_log, RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials, const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window = PacingController::kMinSleepTime); TimeDelta max_hold_back_window = PacingController::kMinSleepTime,
int max_hold_back_window_in_packets = -1);
~TaskQueuePacedSender() override; ~TaskQueuePacedSender() override;
@ -132,7 +134,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
Stats GetStats() const; Stats GetStats() const;
Clock* const clock_; Clock* const clock_;
const TimeDelta hold_back_window_; const TimeDelta max_hold_back_window_;
const int max_hold_back_window_in_packets_;
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
// We want only one (valid) delayed process task in flight at a time. // We want only one (valid) delayed process task in flight at a time.
@ -161,6 +165,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// never drain. // never drain.
bool is_shutdown_ RTC_GUARDED_BY(task_queue_); bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
mutable Mutex stats_mutex_; mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_); Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);

View File

@ -37,6 +37,7 @@ constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567; constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678; constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234; constexpr size_t kDefaultPacketSize = 1234;
constexpr int kNoPacketHoldback = -1;
class MockPacketRouter : public PacketRouter { class MockPacketRouter : public PacketRouter {
public: public:
@ -70,13 +71,15 @@ class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
RtcEventLog* event_log, RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials, const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory, TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window) TimeDelta hold_back_window,
int max_hold_back_window_in_packets)
: TaskQueuePacedSender(clock, : TaskQueuePacedSender(clock,
packet_router, packet_router,
event_log, event_log,
field_trials, field_trials,
task_queue_factory, task_queue_factory,
hold_back_window) {} hold_back_window,
max_hold_back_window_in_packets) {}
void OnStatsUpdated(const Stats& stats) override { void OnStatsUpdated(const Stats& stats) override {
++num_stats_updates_; ++num_stats_updates_;
@ -110,7 +113,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
namespace test { namespace test {
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) { std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
auto packet = std::make_unique<RtpPacketToSend>(nullptr); auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type); packet->set_packet_type(type);
switch (type) { switch (type) {
@ -131,9 +134,9 @@ namespace test {
packet->SetPayloadSize(kDefaultPacketSize); packet->SetPayloadSize(kDefaultPacketSize);
return packet; return packet;
} }
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets( std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
RtpPacketMediaType type, RtpPacketMediaType type,
size_t num_packets) { size_t num_packets) {
std::vector<std::unique_ptr<RtpPacketToSend>> packets; std::vector<std::unique_ptr<RtpPacketToSend>> packets;
@ -141,16 +144,16 @@ namespace test {
packets.push_back(BuildRtpPacket(type)); packets.push_back(BuildRtpPacket(type));
} }
return packets; return packets;
} }
TEST(TaskQueuePacedSenderTest, PacesPackets) { TEST(TaskQueuePacedSenderTest, PacesPackets) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router, time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime); PacingController::kMinSleepTime, kNoPacketHoldback);
// Insert a number of packets, covering one second. // Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42; static constexpr size_t kPacketsToSend = 42;
@ -181,16 +184,16 @@ namespace test {
EXPECT_EQ(packets_sent, kPacketsToSend); EXPECT_EQ(packets_sent, kPacketsToSend);
ASSERT_TRUE(end_time.IsFinite()); ASSERT_TRUE(end_time.IsFinite());
EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0); EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
} }
TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router, time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime); PacingController::kMinSleepTime, kNoPacketHoldback);
// Insert a number of packets to be sent 200ms apart. // Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5; const size_t kPacketsPerSecond = 5;
@ -233,16 +236,16 @@ namespace test {
1.0); 1.0);
EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0, EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
1.0); 1.0);
} }
TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router, time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime); PacingController::kMinSleepTime, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -265,17 +268,17 @@ namespace test {
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
time_controller.AdvanceTime(TimeDelta::Zero()); time_controller.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router); ::testing::Mock::VerifyAndClearExpectations(&packet_router);
} }
TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr,
kCoalescingWindow); time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level. // Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -302,17 +305,17 @@ namespace test {
EXPECT_CALL(packet_router, SendPacket).Times(5); EXPECT_CALL(packet_router, SendPacket).Times(5);
time_controller.AdvanceTime(TimeDelta::Millis(1)); time_controller.AdvanceTime(TimeDelta::Millis(1));
::testing::Mock::VerifyAndClearExpectations(&packet_router); ::testing::Mock::VerifyAndClearExpectations(&packet_router);
} }
TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr,
kCoalescingWindow); time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level. // Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -334,17 +337,17 @@ namespace test {
// flying. // flying.
EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
} }
TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr,
kCoalescingWindow); time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
pacer.EnsureStarted(); pacer.EnsureStarted();
@ -371,17 +374,17 @@ namespace test {
pacer.EnqueuePackets({}); pacer.EnqueuePackets({});
time_controller.AdvanceTime(TimeDelta::Zero()); time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, 2u); EXPECT_EQ(pacer.num_stats_updates_, 2u);
} }
TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr,
kCoalescingWindow); time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds 10ms of buffer level. // Set rates so one packet adds 10ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -433,9 +436,9 @@ namespace test {
// updating does not happen when queue is drained. // updating does not happen when queue is drained.
time_controller.AdvanceTime(TimeDelta::Millis(400)); time_controller.AdvanceTime(TimeDelta::Millis(400));
EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
} }
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
@ -443,7 +446,7 @@ namespace test {
time_controller.GetClock(), &packet_router, time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime); PacingController::kMinSleepTime, kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level. // Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -462,8 +465,7 @@ namespace test {
// will be scheduled for sending in 4ms. // will be scheduled for sending in 4ms.
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2)); pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
const int kNotAProbe = PacedPacketInfo::kNotAProbe; const int kNotAProbe = PacedPacketInfo::kNotAProbe;
EXPECT_CALL( EXPECT_CALL(packet_router,
packet_router,
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
kNotAProbe))); kNotAProbe)));
// Advance to less than 3ms before next packet send time. // Advance to less than 3ms before next packet send time.
@ -482,8 +484,7 @@ namespace test {
const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
const size_t kNumPacketsInProbe = const size_t kNumPacketsInProbe =
(kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize; (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
EXPECT_CALL( EXPECT_CALL(packet_router,
packet_router,
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
kProbeClusterId))) kProbeClusterId)))
.Times(kNumPacketsInProbe + 1); .Times(kNumPacketsInProbe + 1);
@ -496,26 +497,24 @@ namespace test {
// kProbeTimeDelta. That there was existing scheduled call less than // kProbeTimeDelta. That there was existing scheduled call less than
// PacingController::kMinSleepTime before this should not matter. // PacingController::kMinSleepTime before this should not matter.
EXPECT_CALL( EXPECT_CALL(packet_router,
packet_router,
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
kProbeClusterId))) kProbeClusterId)))
.Times(AtLeast(1)); .Times(AtLeast(1));
time_controller.AdvanceTime(TimeDelta::Millis(2)); time_controller.AdvanceTime(TimeDelta::Millis(2));
} }
TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
// Set min_probe_delta to be less than kMinSleepTime (1ms). // Set min_probe_delta to be less than kMinSleepTime (1ms).
const TimeDelta kMinProbeDelta = TimeDelta::Micros(100); const TimeDelta kMinProbeDelta = TimeDelta::Micros(100);
ScopedFieldTrials trials( ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router, time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime); PacingController::kMinSleepTime, kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level. // Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -540,8 +539,7 @@ namespace test {
// probes is 2x 100us, meaning 4 times per ms we will get least one call to // probes is 2x 100us, meaning 4 times per ms we will get least one call to
// SendPacket(). // SendPacket().
DataSize data_sent = DataSize::Zero(); DataSize data_sent = DataSize::Zero();
EXPECT_CALL( EXPECT_CALL(packet_router,
packet_router,
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
kProbeClusterId))) kProbeClusterId)))
.Times(AtLeast(4)) .Times(AtLeast(4))
@ -560,17 +558,17 @@ namespace test {
// counted into the probe rate here. // counted into the probe rate here.
EXPECT_EQ(data_sent, EXPECT_EQ(data_sent,
kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1)); kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
} }
TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) { TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router; MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer( TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr, /*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), /*field_trials=*/nullptr,
kCoalescingWindow); time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
@ -588,6 +586,106 @@ namespace test {
// refresh - stats should not be updated still. // refresh - stats should not be updated still.
time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
EXPECT_EQ(pacer.num_stats_updates_, 0u); EXPECT_EQ(pacer.num_stats_updates_, 0u);
} }
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
const int kPacketBasedHoldback = 5;
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
const TimeDelta kExpectedHoldbackWindow =
kPacketPacingTime * kPacketBasedHoldback;
// `kFixedCoalescingWindow` sets the upper bound for the window.
ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
return std::vector<std::unique_ptr<RtpPacketToSend>>();
});
pacer.EnsureStarted();
// Add some packets and wait till all have been sent, so that the pacer
// has a valid estimate of packet size.
const int kNumWarmupPackets = 40;
EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
// Wait until all packes have been sent, with a 2x margin.
time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
// Enqueue packets. Expect only the first one to be sent immediately.
EXPECT_CALL(packet_router, SendPacket).Times(1);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
time_controller.AdvanceTime(TimeDelta::Zero());
// Advance time to 1ms before the coalescing window ends.
EXPECT_CALL(packet_router, SendPacket).Times(0);
time_controller.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1));
// Advance past where the coalescing window should end.
EXPECT_CALL(packet_router, SendPacket).Times(kPacketBasedHoldback - 1);
time_controller.AdvanceTime(TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2);
const int kPacketBasedHoldback = 5;
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
const TimeDelta kExpectedPacketHoldbackWindow =
kPacketPacingTime * kPacketBasedHoldback;
// |kFixedCoalescingWindow| sets the upper bound for the window.
ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
return std::vector<std::unique_ptr<RtpPacketToSend>>();
});
pacer.EnsureStarted();
// Add some packets and wait till all have been sent, so that the pacer
// has a valid estimate of packet size.
const int kNumWarmupPackets = 40;
EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
// Wait until all packes have been sent, with a 2x margin.
time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
// Enqueue packets. Expect onlt the first one to be sent immediately.
EXPECT_CALL(packet_router, SendPacket).Times(1);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
time_controller.AdvanceTime(TimeDelta::Zero());
// Advance time to the fixed coalescing window, that should take presedence so
// at least some of the packets should be sent.
EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
time_controller.AdvanceTime(kFixedCoalescingWindow);
}
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc