TaskQueuePacedSender: Remove pacer status update scheduler

The pacer status is never changed unless MaybeProcessPackets() is
called. This CL removes the scheduler, and updates pacer status after
every MaybeProcessPackets().

Bug: webrtc:10809, webrtc:13417
Change-Id: Ib5f18decf44c1596c0a716d799600a72b2332abd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/239120
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35489}
This commit is contained in:
Jianhui Dai 2021-12-07 19:34:36 +08:00 committed by WebRTC LUCI CQ
parent af9a834802
commit 9445779545
7 changed files with 146 additions and 293 deletions

View File

@ -163,7 +163,15 @@ absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
TimeDelta PacedSender::OldestPacketWaitTime() const {
MutexLock lock(&mutex_);
return pacing_controller_.OldestPacketWaitTime();
Timestamp oldest_packet = pacing_controller_.OldestPacketEnqueueTime();
if (oldest_packet.IsInfinite())
return TimeDelta::Zero();
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet)
return TimeDelta::Zero();
return current - oldest_packet;
}
int64_t PacedSender::TimeUntilNextProcess() {

View File

@ -281,13 +281,8 @@ absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
return first_sent_packet_time_;
}
TimeDelta PacingController::OldestPacketWaitTime() const {
Timestamp oldest_packet = packet_queue_.OldestEnqueueTime();
if (oldest_packet.IsInfinite()) {
return TimeDelta::Zero();
}
return CurrentTime() - oldest_packet;
Timestamp PacingController::OldestPacketEnqueueTime() const {
return packet_queue_.OldestEnqueueTime();
}
void PacingController::EnqueuePacketInternal(

View File

@ -41,7 +41,6 @@ namespace webrtc {
// the processing is done externally (e.g. RtpPacketPacer). Furthermore, the
// forwarding of packets when they are ready to be sent is also handled
// externally, via the PacingController::PacketSender interface.
//
class PacingController {
public:
// Periodic mode uses the IntervalBudget class for tracking bitrate
@ -114,8 +113,8 @@ class PacingController {
void SetTransportOverhead(DataSize overhead_per_packet);
// Returns the time since the oldest queued packet was enqueued.
TimeDelta OldestPacketWaitTime() const;
// Returns the time when the oldest packet was queued.
Timestamp OldestPacketEnqueueTime() const;
// Number of packets in the pacer queue.
size_t QueueSizePackets() const;
@ -125,7 +124,7 @@ class PacingController {
// Current buffer level, i.e. max of media and padding debt.
DataSize CurrentBufferLevel() const;
// Returns the time when the first packet was sent;
// Returns the time when the first packet was sent.
absl::optional<Timestamp> FirstSentPacketTime() const;
// Returns the number of milliseconds it will take to send the current
@ -197,9 +196,10 @@ class PacingController {
mutable Timestamp last_timestamp_;
bool paused_;
// If `use_interval_budget_` is true, `media_budget_` and `padding_budget_`
// will be used to track when packets can be sent. Otherwise the media and
// padding debt counters will be used together with the target rates.
// In dynamic mode, `media_budget_` and `padding_budget_` will be used to
// track when packets can be sent.
// In periodic mode, `media_debt_` and `padding_debt_` will be used together
// with the target rates.
// This is the media budget, keeping track of how many bits of media
// we can pace out during the current interval.

View File

@ -296,7 +296,7 @@ class PacingControllerTest
int64_t capture_time_ms = clock_.TimeInMilliseconds();
const size_t kPacketSize = 250;
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
// Due to the multiplicative factor we can send 5 packets during a send
// interval. (network capacity * multiplier / (8 bits per byte *
@ -1193,7 +1193,7 @@ TEST_P(PacingControllerTest, Pause) {
uint32_t ssrc_high_priority = 12347;
uint16_t sequence_number = 1234;
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
ConsumeInitialBudget();
@ -1222,8 +1222,7 @@ TEST_P(PacingControllerTest, Pause) {
}
// Expect everything to be queued.
EXPECT_EQ(TimeDelta::Millis(second_capture_time_ms - capture_time_ms),
pacer_->OldestPacketWaitTime());
EXPECT_EQ(capture_time_ms, pacer_->OldestPacketEnqueueTime().ms());
// Process triggers keep-alive packet.
EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) {
@ -1299,7 +1298,7 @@ TEST_P(PacingControllerTest, Pause) {
}
}
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
}
TEST_P(PacingControllerTest, InactiveFromStart) {
@ -1349,7 +1348,7 @@ TEST_P(PacingControllerTest, ExpectedQueueTimeMs) {
const size_t kNumPackets = 60;
const size_t kPacketSize = 1200;
const int32_t kMaxBitrate = kPaceMultiplier * 30000;
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
pacer_->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier),
DataRate::Zero());
@ -1382,7 +1381,7 @@ TEST_P(PacingControllerTest, ExpectedQueueTimeMs) {
TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) {
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
pacer_->SetPacingRates(DataRate::BitsPerSec(30000 * kPaceMultiplier),
DataRate::Zero());
@ -1390,9 +1389,10 @@ TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) {
clock_.TimeInMilliseconds(), 1200);
clock_.AdvanceTimeMilliseconds(500);
EXPECT_EQ(TimeDelta::Millis(500), pacer_->OldestPacketWaitTime());
EXPECT_EQ(clock_.TimeInMilliseconds() - 500,
pacer_->OldestPacketEnqueueTime().ms());
pacer_->ProcessPackets();
EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
EXPECT_TRUE(pacer_->OldestPacketEnqueueTime().IsInfinite());
}
TEST_P(PacingControllerTest, ProbingWithInsertedPackets) {

View File

@ -20,15 +20,6 @@
#include "rtc_base/trace_event.h"
namespace webrtc {
namespace {
// If no calls to MaybeProcessPackets() happen, make sure we update stats
// at least every `kMaxTimeBetweenStatsUpdates` as long as the pacer isn't
// completely drained.
constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
// Don't call UpdateStats() more than `kMinTimeBetweenStatsUpdates` apart,
// for performance reasons.
constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
} // namespace
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
@ -47,8 +38,7 @@ TaskQueuePacedSender::TaskQueuePacedSender(
field_trials,
PacingController::ProcessMode::kDynamic),
next_process_time_(Timestamp::MinusInfinity()),
stats_update_scheduled_(false),
last_stats_time_(Timestamp::MinusInfinity()),
is_started_(false),
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
task_queue_(task_queue_factory->CreateTaskQueue(
@ -162,6 +152,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
task_queue_.PostTask([this, account_for_audio]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
@ -169,6 +160,7 @@ void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
@ -176,6 +168,7 @@ void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
task_queue_.PostTask([this, overhead_per_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetTransportOverhead(overhead_per_packet);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
@ -200,7 +193,15 @@ absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
}
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
return GetStats().oldest_packet_wait_time;
Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite())
return TimeDelta::Zero();
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet)
return TimeDelta::Zero();
return current - oldest_packet;
}
void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
@ -275,68 +276,17 @@ void TaskQueuePacedSender::MaybeProcessPackets(
time_to_next_process->ms<uint32_t>());
}
MaybeUpdateStats(false);
UpdateStats();
}
void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
if (is_shutdown_) {
if (is_scheduled_call) {
stats_update_scheduled_ = false;
}
return;
}
Timestamp now = clock_->CurrentTime();
if (is_scheduled_call) {
// Allow scheduled task to process packets to clear up an remaining debt
// level in an otherwise empty queue.
pacing_controller_.ProcessPackets();
} else {
if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
// Too frequent unscheduled stats update, return early.
return;
}
}
void TaskQueuePacedSender::UpdateStats() {
Stats new_stats;
new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime();
new_stats.oldest_packet_enqueue_time =
pacing_controller_.OldestPacketEnqueueTime();
new_stats.queue_size = pacing_controller_.QueueSizeData();
OnStatsUpdated(new_stats);
last_stats_time_ = now;
bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
pacing_controller_.CurrentBufferLevel().IsZero();
// If there's anything interesting to get from the pacer and this is a
// scheduled call (or no scheduled call in flight), post a new scheduled stats
// update.
if (!pacer_drained) {
if (!stats_update_scheduled_) {
// There is no pending delayed task to update stats, add one.
// Treat this call as being scheduled in order to bootstrap scheduling
// loop.
stats_update_scheduled_ = true;
is_scheduled_call = true;
}
// Only if on the scheduled call loop do we want to schedule a new delayed
// task.
if (is_scheduled_call) {
task_queue_.PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
MaybeUpdateStats(true);
},
kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
}
} else if (is_scheduled_call) {
// This is a scheduled call, signing out since there's nothing interesting
// left to check.
stats_update_scheduled_ = false;
}
}
TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {

View File

@ -66,7 +66,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
// Methods implementing RtpPacketPacer:
// Methods implementing RtpPacketPacer.
void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
@ -112,15 +112,15 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Exposed as protected for test.
struct Stats {
Stats()
: oldest_packet_wait_time(TimeDelta::Zero()),
: oldest_packet_enqueue_time(Timestamp::MinusInfinity()),
queue_size(DataSize::Zero()),
expected_queue_time(TimeDelta::Zero()) {}
TimeDelta oldest_packet_wait_time;
Timestamp oldest_packet_enqueue_time;
DataSize queue_size;
TimeDelta expected_queue_time;
absl::optional<Timestamp> first_sent_packet_time;
};
virtual void OnStatsUpdated(const Stats& stats);
void OnStatsUpdated(const Stats& stats);
private:
// Check if it is time to send packets, or schedule a delayed task if not.
@ -130,7 +130,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// method again with desired (finite) scheduled process time.
void MaybeProcessPackets(Timestamp scheduled_process_time);
void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_);
void UpdateStats() RTC_RUN_ON(task_queue_);
Stats GetStats() const;
Clock* const clock_;
@ -146,19 +146,9 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Timestamp::MinusInfinity() indicates no valid pending task.
Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_);
// Since we don't want to support synchronous calls that wait for a
// task execution, we poll the stats at some interval and update
// `current_stats_`, which can in turn be polled at any time.
// True iff there is delayed task in flight that that will call
// UdpateStats().
bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_);
// Last time stats were updated.
Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_);
// Indicates if this task queue is started. If not, don't allow
// posting delayed tasks yet.
bool is_started_ RTC_GUARDED_BY(task_queue_) = false;
bool is_started_ RTC_GUARDED_BY(task_queue_);
// Indicates if this task queue is shutting down. If so, don't allow
// posting any more delayed tasks as that can cause the task queue to

View File

@ -56,39 +56,6 @@ class MockPacketRouter : public PacketRouter {
(override));
};
class StatsUpdateObserver {
public:
StatsUpdateObserver() = default;
virtual ~StatsUpdateObserver() = default;
virtual void OnStatsUpdated() = 0;
};
class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
public:
TaskQueuePacedSenderForTest(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window,
int max_hold_back_window_in_packets)
: TaskQueuePacedSender(clock,
packet_router,
event_log,
field_trials,
task_queue_factory,
hold_back_window,
max_hold_back_window_in_packets) {}
void OnStatsUpdated(const Stats& stats) override {
++num_stats_updates_;
TaskQueuePacedSender::OnStatsUpdated(stats);
}
size_t num_stats_updates_ = 0;
};
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize target_size) {
// 224 bytes is the max padding size for plain padding packets generated by
@ -149,7 +116,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
TEST(TaskQueuePacedSenderTest, PacesPackets) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
@ -189,7 +156,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
@ -241,7 +208,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
@ -274,11 +241,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -311,11 +278,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -339,110 +306,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
pacer.EnsureStarted();
const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
// Nothing inserted, no stats updates yet.
EXPECT_EQ(pacer.num_stats_updates_, 0u);
// Insert one packet, stats should be updated.
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, 1u);
// Advance time half of the min stats update interval, and trigger a
// refresh - stats should not be updated yet.
time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
pacer.EnqueuePackets({});
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, 1u);
// Advance time the next half, now stats update is triggered.
time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
pacer.EnqueuePackets({});
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, 2u);
}
TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds 10ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(10);
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
// Nothing inserted, no stats updates yet.
size_t num_expected_stats_updates = 0;
EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
pacer.EnsureStarted();
time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
// Updating pacing rates refreshes stats.
EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
// Record time when we insert first packet, this triggers the scheduled
// stats updating.
Clock* const clock = time_controller.GetClock();
const Timestamp start_time = clock->CurrentTime();
while (clock->CurrentTime() - start_time <=
kMaxTimeBetweenStatsUpdates - kPacketPacingTime) {
// Enqueue packet, expect stats update.
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
// Advance time to halfway through pacing time, expect another stats
// update.
time_controller.AdvanceTime(kPacketPacingTime / 2);
pacer.EnqueuePackets({});
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
// Advance time the rest of the way.
time_controller.AdvanceTime(kPacketPacingTime / 2);
}
// At this point, the pace queue is drained so there is no more intersting
// update to be made - but there is still as schduled task that should run
// `kMaxTimeBetweenStatsUpdates` after the first update.
time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates -
clock->CurrentTime());
EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
// Advance time a significant time - don't expect any more calls as stats
// updating does not happen when queue is drained.
time_controller.AdvanceTime(TimeDelta::Millis(400));
EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
}
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
@ -496,7 +364,6 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
// The pacer should have scheduled the next probe to be sent in
// kProbeTimeDelta. That there was existing scheduled call less than
// PacingController::kMinSleepTime before this should not matter.
EXPECT_CALL(packet_router,
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
kProbeClusterId)))
@ -510,7 +377,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
@ -560,45 +427,17 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
}
TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
// Nothing inserted, no stats updates yet.
EXPECT_EQ(pacer.num_stats_updates_, 0u);
// Insert one packet, stats should not be updated.
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
time_controller.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(pacer.num_stats_updates_, 0u);
// Advance time of the min stats update interval, and trigger a
// refresh - stats should not be updated still.
time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
EXPECT_EQ(pacer.num_stats_updates_, 0u);
}
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
const int kPacketBasedHoldback = 5;
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -645,11 +484,11 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSenderForTest pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -687,5 +526,76 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
time_controller.AdvanceTime(kFixedCoalescingWindow);
}
TEST(TaskQueuePacedSenderTest, Stats) {
static constexpr Timestamp kStartTime = Timestamp::Millis(1234);
GlobalSimulatedTimeController time_controller(kStartTime);
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
// Simulate ~2mbps video stream, covering one second.
static constexpr size_t kPacketsToSend = 200;
static constexpr DataRate kPacingRate =
DataRate::BytesPerSec(kDefaultPacketSize * kPacketsToSend);
pacer.SetPacingRates(kPacingRate, DataRate::Zero());
pacer.EnsureStarted();
// Allowed `QueueSizeData` and `ExpectedQueueTime` deviation.
static constexpr size_t kAllowedPacketsDeviation = 1;
static constexpr DataSize kAllowedQueueSizeDeviation =
DataSize::Bytes(kDefaultPacketSize * kAllowedPacketsDeviation);
static constexpr TimeDelta kAllowedQueueTimeDeviation =
kAllowedQueueSizeDeviation / kPacingRate;
DataSize expected_queue_size = DataSize::MinusInfinity();
TimeDelta expected_queue_time = TimeDelta::MinusInfinity();
EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend);
// Stats before insert any packets.
EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
EXPECT_FALSE(pacer.FirstSentPacketTime().has_value());
EXPECT_TRUE(pacer.QueueSizeData().IsZero());
EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
// Advance to 200ms.
time_controller.AdvanceTime(TimeDelta::Millis(200));
EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(200));
EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
expected_queue_size = kPacingRate * TimeDelta::Millis(800);
expected_queue_time = expected_queue_size / kPacingRate;
EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
kAllowedQueueSizeDeviation.bytes());
EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
kAllowedQueueTimeDeviation.ms());
// Advance to 500ms.
time_controller.AdvanceTime(TimeDelta::Millis(300));
EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(500));
EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
expected_queue_size = kPacingRate * TimeDelta::Millis(500);
expected_queue_time = expected_queue_size / kPacingRate;
EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
kAllowedQueueSizeDeviation.bytes());
EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
kAllowedQueueTimeDeviation.ms());
// Advance to 1000ms+, expect all packets to be sent.
time_controller.AdvanceTime(TimeDelta::Millis(500) +
kAllowedQueueTimeDeviation);
EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
EXPECT_TRUE(pacer.QueueSizeData().IsZero());
EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
}
} // namespace test
} // namespace webrtc