Use unit types in RoundRobingPacketQueue and PacedSender

This CL replaces various int types with DataRata, DataSize, Timestamp
and TimeDelta classes.

This is part of larger refactoring work where most of PacedSender will
be broken out into a class handling the logic and another responsible
for thread handling. Splitting that up for easier reviewing.

Bug: webrtc:10809
Change-Id: If57a238e5090c47bf3a99c2042783ae584b425f1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148591
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28835}
This commit is contained in:
Erik Språng 2019-08-09 22:44:47 +02:00 committed by Commit Bot
parent 4d207a3460
commit 82d75a6214
5 changed files with 237 additions and 224 deletions

View File

@ -27,14 +27,14 @@
namespace webrtc {
namespace {
// Time limit in milliseconds between packet bursts.
const int64_t kDefaultMinPacketLimitMs = 5;
const int64_t kCongestedPacketIntervalMs = 500;
const int64_t kPausedProcessIntervalMs = kCongestedPacketIntervalMs;
const int64_t kMaxElapsedTimeMs = 2000;
constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>();
constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>();
constexpr TimeDelta kPausedProcessInterval = kCongestedPacketInterval;
constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>();
// Upper cap on process interval, in case process has not been called in a long
// time.
const int64_t kMaxIntervalTimeMs = 30;
constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>();
bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
absl::string_view key) {
@ -86,22 +86,22 @@ PacedSender::PacedSender(Clock* clock,
send_padding_if_silent_(
IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")),
pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")),
min_packet_limit_ms_("", kDefaultMinPacketLimitMs),
last_timestamp_ms_(clock_->TimeInMilliseconds()),
min_packet_limit_(kDefaultMinPacketLimit),
last_timestamp_(clock_->CurrentTime()),
paused_(false),
media_budget_(0),
padding_budget_(0),
prober_(*field_trials_),
probing_send_failure_(false),
pacing_bitrate_(DataRate::Zero()),
time_last_process_us_(clock->TimeInMicroseconds()),
last_send_time_us_(clock->TimeInMicroseconds()),
packets_(clock->TimeInMicroseconds(), field_trials),
time_last_process_(clock->CurrentTime()),
last_send_time_(time_last_process_),
packets_(time_last_process_, field_trials),
packet_counter_(0),
congestion_window_size_(DataSize::PlusInfinity()),
outstanding_data_(DataSize::Zero()),
process_thread_(nullptr),
queue_time_limit(kMaxQueueLengthMs),
queue_time_limit(TimeDelta::ms(kMaxQueueLengthMs)),
account_for_audio_(false),
legacy_packet_referencing_(
IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) {
@ -109,16 +109,18 @@ PacedSender::PacedSender(Clock* clock,
RTC_LOG(LS_WARNING) << "Pacer queues will not be drained,"
"pushback experiment must be enabled.";
}
ParseFieldTrial({&min_packet_limit_ms_},
FieldTrialParameter<int> min_packet_limit_ms("", min_packet_limit_.ms());
ParseFieldTrial({&min_packet_limit_ms},
field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs"));
UpdateBudgetWithElapsedTime(min_packet_limit_ms_);
min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get());
UpdateBudgetWithElapsedTime(min_packet_limit_);
}
PacedSender::~PacedSender() {}
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
rtc::CritScope cs(&critsect_);
prober_.CreateProbeCluster(bitrate.bps(), TimeMilliseconds(), cluster_id);
prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id);
}
void PacedSender::Pause() {
@ -127,7 +129,7 @@ void PacedSender::Pause() {
if (!paused_)
RTC_LOG(LS_INFO) << "PacedSender paused.";
paused_ = true;
packets_.SetPauseState(true, TimeMilliseconds());
packets_.SetPauseState(true, CurrentTime());
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to get
@ -142,7 +144,7 @@ void PacedSender::Resume() {
if (paused_)
RTC_LOG(LS_INFO) << "PacedSender resumed.";
paused_ = false;
packets_.SetPauseState(false, TimeMilliseconds());
packets_.SetPauseState(false, CurrentTime());
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to
@ -168,17 +170,17 @@ bool PacedSender::Congested() const {
return false;
}
int64_t PacedSender::TimeMilliseconds() const {
int64_t time_ms = clock_->TimeInMilliseconds();
if (time_ms < last_timestamp_ms_) {
Timestamp PacedSender::CurrentTime() const {
Timestamp time = clock_->CurrentTime();
if (time < last_timestamp_) {
RTC_LOG(LS_WARNING)
<< "Non-monotonic clock behavior observed. Previous timestamp: "
<< last_timestamp_ms_ << ", new timestamp: " << time_ms;
RTC_DCHECK_GE(time_ms, last_timestamp_ms_);
time_ms = last_timestamp_ms_;
<< last_timestamp_.ms() << ", new timestamp: " << time.ms();
RTC_DCHECK_GE(time, last_timestamp_);
time = last_timestamp_;
}
last_timestamp_ms_ = time_ms;
return time_ms;
last_timestamp_ = time;
return time;
}
void PacedSender::SetProbingEnabled(bool enabled) {
@ -208,11 +210,11 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
int64_t now_ms = TimeMilliseconds();
Timestamp now = CurrentTime();
prober_.OnIncomingPacket(bytes);
if (capture_time_ms < 0)
capture_time_ms = now_ms;
capture_time_ms = now.ms();
RtpPacketToSend::Type type;
switch (priority) {
@ -226,7 +228,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
type = RtpPacketToSend::Type::kVideo;
}
packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
capture_time_ms, now_ms, bytes, retransmission,
capture_time_ms, now, DataSize::bytes(bytes), retransmission,
packet_counter_++);
}
@ -235,16 +237,16 @@ void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
int64_t now_ms = TimeMilliseconds();
Timestamp now = CurrentTime();
prober_.OnIncomingPacket(packet->payload_size());
if (packet->capture_time_ms() < 0) {
packet->set_capture_time_ms(now_ms);
packet->set_capture_time_ms(now.ms());
}
RTC_CHECK(packet->packet_type());
int priority = GetPriorityForType(*packet->packet_type());
packets_.Push(priority, now_ms, packet_counter_++, std::move(packet));
packets_.Push(priority, now, packet_counter_++, std::move(packet));
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@ -267,7 +269,7 @@ size_t PacedSender::QueueSizePackets() const {
DataSize PacedSender::QueueSizeData() const {
rtc::CritScope cs(&critsect_);
return DataSize::bytes(packets_.SizeInBytes());
return packets_.Size();
}
absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
@ -277,51 +279,50 @@ absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
TimeDelta PacedSender::OldestPacketWaitTime() const {
rtc::CritScope cs(&critsect_);
int64_t oldest_packet = packets_.OldestEnqueueTimeMs();
if (oldest_packet == 0) {
Timestamp oldest_packet = packets_.OldestEnqueueTime();
if (oldest_packet.IsInfinite()) {
return TimeDelta::Zero();
}
return TimeDelta::ms(TimeMilliseconds() - oldest_packet);
return CurrentTime() - oldest_packet;
}
int64_t PacedSender::TimeUntilNextProcess() {
rtc::CritScope cs(&critsect_);
int64_t elapsed_time_us =
clock_->TimeInMicroseconds() - time_last_process_us_;
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
TimeDelta elapsed_time = CurrentTime() - time_last_process_;
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
if (paused_)
return std::max<int64_t>(kPausedProcessIntervalMs - elapsed_time_ms, 0);
if (paused_) {
return std::max(kPausedProcessInterval - elapsed_time, TimeDelta::Zero())
.ms();
}
if (prober_.IsProbing()) {
int64_t ret = prober_.TimeUntilNextProbe(TimeMilliseconds());
int64_t ret = prober_.TimeUntilNextProbe(CurrentTime().ms());
if (ret > 0 || (ret == 0 && !probing_send_failure_))
return ret;
}
return std::max<int64_t>(min_packet_limit_ms_ - elapsed_time_ms, 0);
return std::max(min_packet_limit_ - elapsed_time, TimeDelta::Zero()).ms();
}
int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) {
int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000;
time_last_process_us_ = now_us;
if (elapsed_time_ms > kMaxElapsedTimeMs) {
RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time_ms
TimeDelta PacedSender::UpdateTimeAndGetElapsed(Timestamp now) {
TimeDelta elapsed_time = now - time_last_process_;
time_last_process_ = now;
if (elapsed_time > kMaxElapsedTime) {
RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms()
<< " ms) longer than expected, limiting to "
<< kMaxElapsedTimeMs << " ms";
elapsed_time_ms = kMaxElapsedTimeMs;
<< kMaxElapsedTime.ms();
elapsed_time = kMaxElapsedTime;
}
return elapsed_time_ms;
return elapsed_time;
}
bool PacedSender::ShouldSendKeepalive(int64_t now_us) const {
bool PacedSender::ShouldSendKeepalive(Timestamp now) const {
if (send_padding_if_silent_ || paused_ || Congested()) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
int64_t elapsed_since_last_send_us = now_us - last_send_time_us_;
if (elapsed_since_last_send_us >= kCongestedPacketIntervalMs * 1000) {
TimeDelta elapsed_since_last_send = now - last_send_time_;
if (elapsed_since_last_send >= kCongestedPacketInterval) {
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ > 0) {
@ -334,66 +335,66 @@ bool PacedSender::ShouldSendKeepalive(int64_t now_us) const {
void PacedSender::Process() {
rtc::CritScope cs(&critsect_);
int64_t now_us = clock_->TimeInMicroseconds();
int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us);
if (ShouldSendKeepalive(now_us)) {
Timestamp now = CurrentTime();
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
if (ShouldSendKeepalive(now)) {
if (legacy_packet_referencing_) {
critsect_.Leave();
size_t bytes_sent =
packet_router_->TimeToSendPadding(1, PacedPacketInfo());
critsect_.Enter();
OnPaddingSent(bytes_sent);
OnPaddingSent(DataSize::bytes(bytes_sent));
} else {
size_t keepalive_bytes_sent = 0;
DataSize keepalive_data_sent = DataSize::Zero();
critsect_.Leave();
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_router_->GeneratePadding(1);
for (auto& packet : keepalive_packets) {
keepalive_bytes_sent += packet->payload_size() + packet->padding_size();
keepalive_data_sent +=
DataSize::bytes(packet->payload_size() + packet->padding_size());
packet_router_->SendPacket(std::move(packet), PacedPacketInfo());
}
critsect_.Enter();
OnPaddingSent(keepalive_bytes_sent);
OnPaddingSent(keepalive_data_sent);
}
}
if (paused_)
return;
if (elapsed_time_ms > 0) {
int target_bitrate_kbps = pacing_bitrate_.kbps();
size_t queue_size_bytes = packets_.SizeInBytes();
if (queue_size_bytes > 0) {
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packets_.Size();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packets_.UpdateQueueTime(TimeMilliseconds());
packets_.UpdateQueueTime(CurrentTime());
if (drain_large_queues_) {
int64_t avg_time_left_ms = std::max<int64_t>(
1, queue_time_limit - packets_.AverageQueueTimeMs());
int min_bitrate_needed_kbps =
static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
if (min_bitrate_needed_kbps > target_bitrate_kbps) {
target_bitrate_kbps = min_bitrate_needed_kbps;
TimeDelta avg_time_left = std::max(
TimeDelta::ms(1), queue_time_limit - packets_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< target_bitrate_kbps;
<< target_rate.kbps();
}
}
}
media_budget_.set_target_rate_kbps(target_bitrate_kbps);
UpdateBudgetWithElapsedTime(elapsed_time_ms);
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool is_probing = prober_.IsProbing();
PacedPacketInfo pacing_info;
absl::optional<size_t> recommended_probe_size;
absl::optional<DataSize> recommended_probe_size;
if (is_probing) {
pacing_info = prober_.CurrentCluster();
recommended_probe_size = prober_.RecommendedMinProbeSize();
recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
}
size_t bytes_sent = 0;
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
@ -401,12 +402,12 @@ void PacedSender::Process() {
if (packet == nullptr) {
// No packet available to send, check if we should send padding.
if (!legacy_packet_referencing_) {
size_t padding_bytes_to_add =
PaddingBytesToAdd(recommended_probe_size, bytes_sent);
if (padding_bytes_to_add > 0) {
DataSize padding_to_add =
PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
critsect_.Leave();
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_router_->GeneratePadding(padding_bytes_to_add);
packet_router_->GeneratePadding(padding_to_add.bytes());
critsect_.Enter();
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
@ -445,10 +446,10 @@ void PacedSender::Process() {
success == RtpPacketSendResult::kPacketNotFound) {
// Packet sent or invalid packet, remove it from queue.
// TODO(webrtc:8052): Don't consume media budget on kInvalid.
bytes_sent += packet->size_in_bytes();
data_sent += packet->size();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (recommended_probe_size && bytes_sent > *recommended_probe_size)
if (recommended_probe_size && data_sent > *recommended_probe_size)
break;
} else if (owned_rtp_packet) {
// Send failed, but we can't put it back in the queue, remove it without
@ -466,25 +467,27 @@ void PacedSender::Process() {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
int padding_needed = static_cast<int>(
recommended_probe_size ? (*recommended_probe_size - bytes_sent)
: padding_budget_.bytes_remaining());
if (padding_needed > 0) {
size_t padding_sent = 0;
DataSize padding_needed =
(recommended_probe_size && *recommended_probe_size > data_sent)
? (*recommended_probe_size - data_sent)
: DataSize::bytes(padding_budget_.bytes_remaining());
if (padding_needed > DataSize::Zero()) {
DataSize padding_sent = DataSize::Zero();
critsect_.Leave();
padding_sent =
packet_router_->TimeToSendPadding(padding_needed, pacing_info);
padding_sent = DataSize::bytes(packet_router_->TimeToSendPadding(
padding_needed.bytes(), pacing_info));
critsect_.Enter();
bytes_sent += padding_sent;
data_sent += padding_sent;
OnPaddingSent(padding_sent);
}
}
}
if (is_probing) {
probing_send_failure_ = bytes_sent == 0;
if (!probing_send_failure_)
prober_.ProbeSent(TimeMilliseconds(), bytes_sent);
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
}
}
}
@ -494,33 +497,33 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread;
}
size_t PacedSender::PaddingBytesToAdd(
absl::optional<size_t> recommended_probe_size,
size_t bytes_sent) {
DataSize PacedSender::PaddingToAdd(
absl::optional<DataSize> recommended_probe_size,
DataSize data_sent) {
if (!packets_.Empty()) {
// Actual payload available, no need to add padding.
return 0;
return DataSize::Zero();
}
if (Congested()) {
// Don't add padding if congested, even if requested for probing.
return 0;
return DataSize::Zero();
}
if (packet_counter_ == 0) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
return 0;
return DataSize::Zero();
}
if (recommended_probe_size) {
if (*recommended_probe_size > bytes_sent) {
return *recommended_probe_size - bytes_sent;
if (*recommended_probe_size > data_sent) {
return *recommended_probe_size - data_sent;
}
return 0;
return DataSize::Zero();
}
return padding_budget_.bytes_remaining();
return DataSize::bytes(padding_budget_.bytes_remaining());
}
RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
@ -545,41 +548,42 @@ RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
}
void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
Timestamp now = CurrentTime();
if (!first_sent_packet_time_) {
first_sent_packet_time_ = Timestamp::ms(TimeMilliseconds());
first_sent_packet_time_ = now;
}
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
UpdateBudgetWithBytesSent(packet->size_in_bytes());
last_send_time_us_ = clock_->TimeInMicroseconds();
UpdateBudgetWithSentData(packet->size());
last_send_time_ = now;
}
// Send succeeded, remove it from the queue.
packets_.FinalizePop();
}
void PacedSender::OnPaddingSent(size_t bytes_sent) {
if (bytes_sent > 0) {
UpdateBudgetWithBytesSent(bytes_sent);
void PacedSender::OnPaddingSent(DataSize data_sent) {
if (data_sent > DataSize::Zero()) {
UpdateBudgetWithSentData(data_sent);
}
last_send_time_us_ = clock_->TimeInMicroseconds();
last_send_time_ = CurrentTime();
}
void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms);
media_budget_.IncreaseBudget(delta_time_ms);
padding_budget_.IncreaseBudget(delta_time_ms);
void PacedSender::UpdateBudgetWithElapsedTime(TimeDelta delta) {
delta = std::min(kMaxProcessingInterval, delta);
media_budget_.IncreaseBudget(delta.ms());
padding_budget_.IncreaseBudget(delta.ms());
}
void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
outstanding_data_ += DataSize::bytes(bytes_sent);
media_budget_.UseBudget(bytes_sent);
padding_budget_.UseBudget(bytes_sent);
void PacedSender::UpdateBudgetWithSentData(DataSize size) {
outstanding_data_ += size;
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());
}
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
rtc::CritScope cs(&critsect_);
queue_time_limit = limit.ms();
queue_time_limit = limit;
}
} // namespace webrtc

View File

@ -43,7 +43,7 @@ class PacedSender : public Module,
public RtpPacketPacer,
public RtpPacketSender {
public:
// Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than
// Expected max pacer delay in ms. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
// UpdateBitrate() so that this limit will be upheld.
@ -134,19 +134,19 @@ class PacedSender : public Module,
void ProcessThreadAttached(ProcessThread* process_thread) override;
private:
int64_t UpdateTimeAndGetElapsedMs(int64_t now_us)
TimeDelta UpdateTimeAndGetElapsed(Timestamp now)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool ShouldSendKeepalive(int64_t at_time_us) const
bool ShouldSendKeepalive(Timestamp now) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms)
void UpdateBudgetWithElapsedTime(TimeDelta delta)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void UpdateBudgetWithBytesSent(size_t bytes)
void UpdateBudgetWithSentData(DataSize size)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t PaddingBytesToAdd(absl::optional<size_t> recommended_probe_size,
size_t bytes_sent)
DataSize PaddingToAdd(absl::optional<DataSize> recommended_probe_size,
DataSize data_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
@ -154,11 +154,11 @@ class PacedSender : public Module,
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPaddingSent(size_t padding_sent)
void OnPaddingSent(DataSize padding_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
Timestamp CurrentTime() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
Clock* const clock_;
PacketRouter* const packet_router_;
@ -168,12 +168,12 @@ class PacedSender : public Module,
const bool drain_large_queues_;
const bool send_padding_if_silent_;
const bool pace_audio_;
FieldTrialParameter<int> min_packet_limit_ms_;
TimeDelta min_packet_limit_;
rtc::CriticalSection critsect_;
// TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.
// The last millisecond timestamp returned by |clock_|.
mutable int64_t last_timestamp_ms_ RTC_GUARDED_BY(critsect_);
mutable Timestamp last_timestamp_ RTC_GUARDED_BY(critsect_);
bool paused_ RTC_GUARDED_BY(critsect_);
// This is the media budget, keeping track of how many bits of media
// we can pace out during the current interval.
@ -188,8 +188,8 @@ class PacedSender : public Module,
DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_);
int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_);
int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_);
Timestamp time_last_process_ RTC_GUARDED_BY(critsect_);
Timestamp last_send_time_ RTC_GUARDED_BY(critsect_);
absl::optional<Timestamp> first_sent_packet_time_ RTC_GUARDED_BY(critsect_);
RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_);
@ -206,7 +206,7 @@ class PacedSender : public Module,
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_);
int64_t queue_time_limit RTC_GUARDED_BY(critsect_);
TimeDelta queue_time_limit RTC_GUARDED_BY(critsect_);
bool account_for_audio_ RTC_GUARDED_BY(critsect_);
// If true, PacedSender should only reference packets as in legacy mode.

View File

@ -506,7 +506,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) {
const uint32_t kSsrc = 12345;
const size_t kSizeBytes = 250;
const size_t kPacketToSend = 3;
const int64_t kStartMs = clock_.TimeInMilliseconds();
const Timestamp kStartTime = clock_.CurrentTime();
// No packet sent.
EXPECT_FALSE(send_bucket_->FirstSentPacketTime().has_value());
@ -517,7 +517,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) {
send_bucket_->Process();
clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess());
}
EXPECT_EQ(Timestamp::ms(kStartMs), send_bucket_->FirstSentPacketTime());
EXPECT_EQ(kStartTime, send_bucket_->FirstSentPacketTime());
}
TEST_P(PacedSenderTest, QueuePacket) {

View File

@ -17,6 +17,9 @@
#include "rtc_base/checks.h"
namespace webrtc {
namespace {
static constexpr DataSize kMaxLeadingSize = DataSize::Bytes<1400>();
}
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
default;
@ -28,11 +31,11 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order,
std::multiset<int64_t>::iterator enqueue_time_it,
std::multiset<Timestamp>::iterator enqueue_time_it,
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it)
: type_(type),
@ -40,8 +43,8 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
ssrc_(ssrc),
sequence_number_(seq_number),
capture_time_ms_(capture_time_ms),
enqueue_time_ms_(enqueue_time_ms),
bytes_(length_in_bytes),
enqueue_time_(enqueue_time),
size_(size),
retransmission_(retransmission),
enqueue_order_(enqueue_order),
enqueue_time_it_(enqueue_time_it),
@ -52,9 +55,9 @@ RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
return packet_it_ ? std::move(**packet_it_) : nullptr;
}
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs(
int64_t pause_time_sum_ms) {
enqueue_time_ms_ -= pause_time_sum_ms;
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
TimeDelta pause_time_sum) {
enqueue_time_ -= pause_time_sum;
}
bool RoundRobinPacketQueue::QueuedPacket::operator<(
@ -67,7 +70,7 @@ bool RoundRobinPacketQueue::QueuedPacket::operator<(
return enqueue_order_ > other.enqueue_order_;
}
RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
RoundRobinPacketQueue::Stream::~Stream() {}
@ -79,9 +82,15 @@ bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
}
RoundRobinPacketQueue::RoundRobinPacketQueue(
int64_t start_time_us,
Timestamp start_time,
const WebRtcKeyValueConfig* field_trials)
: time_last_updated_ms_(start_time_us / 1000),
: time_last_updated_(start_time),
paused_(false),
size_packets_(0),
size_(DataSize::Zero()),
max_size_(kMaxLeadingSize),
queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
send_side_bwe_with_overhead_(
IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
@ -92,35 +101,34 @@ void RoundRobinPacketQueue::Push(int priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order) {
Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
enqueue_time_ms, length_in_bytes, retransmission,
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
absl::nullopt));
enqueue_time, size, retransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), absl::nullopt));
}
void RoundRobinPacketQueue::Push(int priority,
int64_t enqueue_time_ms,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
uint32_t ssrc = packet->Ssrc();
uint16_t sequence_number = packet->SequenceNumber();
int64_t capture_time_ms = packet->capture_time_ms();
size_t size_bytes = send_side_bwe_with_overhead_
DataSize size =
DataSize::bytes(send_side_bwe_with_overhead_
? packet->size()
: packet->payload_size() + packet->padding_size();
: packet->payload_size() + packet->padding_size());
auto type = packet->packet_type();
RTC_DCHECK(type.has_value());
rtp_packets_.push_front(std::move(packet));
Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms,
enqueue_time_ms, size_bytes,
*type == RtpPacketToSend::Type::kRetransmission,
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
rtp_packets_.begin()));
Push(QueuedPacket(
priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
@ -153,9 +161,9 @@ void RoundRobinPacketQueue::FinalizePop() {
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
int64_t time_in_non_paused_state_ms =
time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_;
queue_time_sum_ms_ -= time_in_non_paused_state_ms;
TimeDelta time_in_non_paused_state =
time_last_updated_ - packet.enqueue_time() - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
enqueue_times_.erase(packet.EnqueueTimeIterator());
@ -171,13 +179,13 @@ void RoundRobinPacketQueue::FinalizePop() {
// case a "budget" will be built up for the stream sending at the lower
// rate. To avoid building a too large budget we limit |bytes| to be within
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
stream->bytes = std::max(stream->bytes + packet.size_in_bytes(),
max_bytes_ - kMaxLeadingBytes);
max_bytes_ = std::max(max_bytes_, stream->bytes);
stream->size =
std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize);
max_size_ = std::max(max_size_, stream->size);
size_bytes_ -= packet.size_in_bytes();
size_ -= packet.size();
size_packets_ -= 1;
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
// If there are packets left to be sent, schedule the stream again.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
@ -186,7 +194,7 @@ void RoundRobinPacketQueue::FinalizePop() {
} else {
int priority = stream->packet_queue.top().priority();
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(priority, stream->bytes), stream->ssrc);
StreamPrioKey(priority, stream->size), stream->ssrc);
}
pop_packet_.reset();
@ -204,44 +212,44 @@ size_t RoundRobinPacketQueue::SizeInPackets() const {
return size_packets_;
}
uint64_t RoundRobinPacketQueue::SizeInBytes() const {
return size_bytes_;
DataSize RoundRobinPacketQueue::Size() const {
return size_;
}
int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const {
Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
if (Empty())
return 0;
return Timestamp::MinusInfinity();
RTC_CHECK(!enqueue_times_.empty());
return *enqueue_times_.begin();
}
void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
RTC_CHECK_GE(timestamp_ms, time_last_updated_ms_);
if (timestamp_ms == time_last_updated_ms_)
void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
RTC_CHECK_GE(now, time_last_updated_);
if (now == time_last_updated_)
return;
int64_t delta_ms = timestamp_ms - time_last_updated_ms_;
TimeDelta delta = now - time_last_updated_;
if (paused_) {
pause_time_sum_ms_ += delta_ms;
pause_time_sum_ += delta;
} else {
queue_time_sum_ms_ += delta_ms * size_packets_;
queue_time_sum_ += TimeDelta::us(delta.us() * size_packets_);
}
time_last_updated_ms_ = timestamp_ms;
time_last_updated_ = now;
}
void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
if (paused_ == paused)
return;
UpdateQueueTime(timestamp_ms);
UpdateQueueTime(now);
paused_ = paused;
}
int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const {
TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
if (Empty())
return 0;
return queue_time_sum_ms_ / size_packets_;
return TimeDelta::Zero();
return queue_time_sum_ / size_packets_;
}
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
@ -258,14 +266,14 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) {
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
} else if (packet.priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
@ -275,11 +283,11 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) {
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
UpdateQueueTime(packet.enqueue_time_ms());
packet.SubtractPauseTimeMs(pause_time_sum_ms_);
UpdateQueueTime(packet.enqueue_time());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
size_bytes_ += packet.size_in_bytes();
size_ += packet.size();
stream->packet_queue.push(packet);
}

View File

@ -22,6 +22,9 @@
#include "absl/types/optional.h"
#include "api/transport/webrtc_key_value_config.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "system_wrappers/include/clock.h"
@ -30,7 +33,7 @@ namespace webrtc {
class RoundRobinPacketQueue {
public:
RoundRobinPacketQueue(int64_t start_time_us,
RoundRobinPacketQueue(Timestamp start_time,
const WebRtcKeyValueConfig* field_trials);
~RoundRobinPacketQueue();
@ -42,11 +45,11 @@ class RoundRobinPacketQueue {
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order,
std::multiset<int64_t>::iterator enqueue_time_it,
std::multiset<Timestamp>::iterator enqueue_time_it,
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it);
QueuedPacket(const QueuedPacket& rhs);
@ -59,8 +62,8 @@ class RoundRobinPacketQueue {
uint32_t ssrc() const { return ssrc_; }
uint16_t sequence_number() const { return sequence_number_; }
int64_t capture_time_ms() const { return capture_time_ms_; }
int64_t enqueue_time_ms() const { return enqueue_time_ms_; }
size_t size_in_bytes() const { return bytes_; }
Timestamp enqueue_time() const { return enqueue_time_; }
DataSize size() const { return size_; }
bool is_retransmission() const { return retransmission_; }
uint64_t enqueue_order() const { return enqueue_order_; }
std::unique_ptr<RtpPacketToSend> ReleasePacket();
@ -70,10 +73,10 @@ class RoundRobinPacketQueue {
PacketIterator() const {
return packet_it_;
}
std::multiset<int64_t>::iterator EnqueueTimeIterator() const {
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
return enqueue_time_it_;
}
void SubtractPauseTimeMs(int64_t pause_time_sum_ms);
void SubtractPauseTime(TimeDelta pause_time_sum);
private:
RtpPacketToSend::Type type_;
@ -81,11 +84,11 @@ class RoundRobinPacketQueue {
uint32_t ssrc_;
uint16_t sequence_number_;
int64_t capture_time_ms_; // Absolute time of frame capture.
int64_t enqueue_time_ms_; // Absolute time of pacer queue entry.
size_t bytes_;
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
DataSize size_;
bool retransmission_;
uint64_t enqueue_order_;
std::multiset<int64_t>::iterator enqueue_time_it_;
std::multiset<Timestamp>::iterator enqueue_time_it_;
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
// if applicable.
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
@ -97,12 +100,12 @@ class RoundRobinPacketQueue {
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order);
void Push(int priority,
int64_t enqueue_time_ms,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet);
QueuedPacket* BeginPop();
@ -111,26 +114,26 @@ class RoundRobinPacketQueue {
bool Empty() const;
size_t SizeInPackets() const;
uint64_t SizeInBytes() const;
DataSize Size() const;
int64_t OldestEnqueueTimeMs() const;
int64_t AverageQueueTimeMs() const;
void UpdateQueueTime(int64_t timestamp_ms);
void SetPauseState(bool paused, int64_t timestamp_ms);
Timestamp OldestEnqueueTime() const;
TimeDelta AverageQueueTime() const;
void UpdateQueueTime(Timestamp now);
void SetPauseState(bool paused, Timestamp now);
private:
struct StreamPrioKey {
StreamPrioKey(int priority, int64_t bytes)
: priority(priority), bytes(bytes) {}
StreamPrioKey(int priority, DataSize size)
: priority(priority), size(size) {}
bool operator<(const StreamPrioKey& other) const {
if (priority != other.priority)
return priority < other.priority;
return bytes < other.bytes;
return size < other.size;
}
const int priority;
const size_t bytes;
const DataSize size;
};
struct Stream {
@ -139,7 +142,7 @@ class RoundRobinPacketQueue {
virtual ~Stream();
size_t bytes;
DataSize size;
uint32_t ssrc;
std::priority_queue<QueuedPacket> packet_queue;
@ -151,8 +154,6 @@ class RoundRobinPacketQueue {
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
};
static constexpr size_t kMaxLeadingBytes = 1400;
void Push(QueuedPacket packet);
Stream* GetHighestPriorityStream();
@ -160,16 +161,16 @@ class RoundRobinPacketQueue {
// Just used to verify correctness.
bool IsSsrcScheduled(uint32_t ssrc) const;
int64_t time_last_updated_ms_;
Timestamp time_last_updated_;
absl::optional<QueuedPacket> pop_packet_;
absl::optional<Stream*> pop_stream_;
bool paused_ = false;
size_t size_packets_ = 0;
size_t size_bytes_ = 0;
size_t max_bytes_ = kMaxLeadingBytes;
int64_t queue_time_sum_ms_ = 0;
int64_t pause_time_sum_ms_ = 0;
bool paused_;
size_t size_packets_;
DataSize size_;
DataSize max_size_;
TimeDelta queue_time_sum_;
TimeDelta pause_time_sum_;
// A map of streams used to prioritize from which stream to send next. We use
// a multimap instead of a priority_queue since the priority of a stream can
@ -182,7 +183,7 @@ class RoundRobinPacketQueue {
// The enqueue time of every packet currently in the queue. Used to figure out
// the age of the oldest packet in the queue.
std::multiset<int64_t> enqueue_times_;
std::multiset<Timestamp> enqueue_times_;
// List of RTP packets to be sent, not necessarily in the order they will be
// sent. PacketInfo.packet_it will point to an entry in this list, or the