From 82d75a6214bbdcc867ff4e2689021f8d67a745f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Fri, 9 Aug 2019 22:44:47 +0200 Subject: [PATCH] Use unit types in RoundRobingPacketQueue and PacedSender MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL replaces various int types with DataRata, DataSize, Timestamp and TimeDelta classes. This is part of larger refactoring work where most of PacedSender will be broken out into a class handling the logic and another responsible for thread handling. Splitting that up for easier reviewing. Bug: webrtc:10809 Change-Id: If57a238e5090c47bf3a99c2042783ae584b425f1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148591 Reviewed-by: Sebastian Jansson Commit-Queue: Erik Språng Cr-Commit-Position: refs/heads/master@{#28835} --- modules/pacing/paced_sender.cc | 244 +++++++++++---------- modules/pacing/paced_sender.h | 28 +-- modules/pacing/paced_sender_unittest.cc | 4 +- modules/pacing/round_robin_packet_queue.cc | 116 +++++----- modules/pacing/round_robin_packet_queue.h | 69 +++--- 5 files changed, 237 insertions(+), 224 deletions(-) diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 3951cf0245..25a15a129b 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -27,14 +27,14 @@ namespace webrtc { namespace { // Time limit in milliseconds between packet bursts. -const int64_t kDefaultMinPacketLimitMs = 5; -const int64_t kCongestedPacketIntervalMs = 500; -const int64_t kPausedProcessIntervalMs = kCongestedPacketIntervalMs; -const int64_t kMaxElapsedTimeMs = 2000; +constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>(); +constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>(); +constexpr TimeDelta kPausedProcessInterval = kCongestedPacketInterval; +constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>(); // Upper cap on process interval, in case process has not been called in a long // time. -const int64_t kMaxIntervalTimeMs = 30; +constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>(); bool IsDisabled(const WebRtcKeyValueConfig& field_trials, absl::string_view key) { @@ -86,22 +86,22 @@ PacedSender::PacedSender(Clock* clock, send_padding_if_silent_( IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")), pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")), - min_packet_limit_ms_("", kDefaultMinPacketLimitMs), - last_timestamp_ms_(clock_->TimeInMilliseconds()), + min_packet_limit_(kDefaultMinPacketLimit), + last_timestamp_(clock_->CurrentTime()), paused_(false), media_budget_(0), padding_budget_(0), prober_(*field_trials_), probing_send_failure_(false), pacing_bitrate_(DataRate::Zero()), - time_last_process_us_(clock->TimeInMicroseconds()), - last_send_time_us_(clock->TimeInMicroseconds()), - packets_(clock->TimeInMicroseconds(), field_trials), + time_last_process_(clock->CurrentTime()), + last_send_time_(time_last_process_), + packets_(time_last_process_, field_trials), packet_counter_(0), congestion_window_size_(DataSize::PlusInfinity()), outstanding_data_(DataSize::Zero()), process_thread_(nullptr), - queue_time_limit(kMaxQueueLengthMs), + queue_time_limit(TimeDelta::ms(kMaxQueueLengthMs)), account_for_audio_(false), legacy_packet_referencing_( IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) { @@ -109,16 +109,18 @@ PacedSender::PacedSender(Clock* clock, RTC_LOG(LS_WARNING) << "Pacer queues will not be drained," "pushback experiment must be enabled."; } - ParseFieldTrial({&min_packet_limit_ms_}, + FieldTrialParameter min_packet_limit_ms("", min_packet_limit_.ms()); + ParseFieldTrial({&min_packet_limit_ms}, field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs")); - UpdateBudgetWithElapsedTime(min_packet_limit_ms_); + min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get()); + UpdateBudgetWithElapsedTime(min_packet_limit_); } PacedSender::~PacedSender() {} void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { rtc::CritScope cs(&critsect_); - prober_.CreateProbeCluster(bitrate.bps(), TimeMilliseconds(), cluster_id); + prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id); } void PacedSender::Pause() { @@ -127,7 +129,7 @@ void PacedSender::Pause() { if (!paused_) RTC_LOG(LS_INFO) << "PacedSender paused."; paused_ = true; - packets_.SetPauseState(true, TimeMilliseconds()); + packets_.SetPauseState(true, CurrentTime()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to get @@ -142,7 +144,7 @@ void PacedSender::Resume() { if (paused_) RTC_LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; - packets_.SetPauseState(false, TimeMilliseconds()); + packets_.SetPauseState(false, CurrentTime()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to @@ -168,17 +170,17 @@ bool PacedSender::Congested() const { return false; } -int64_t PacedSender::TimeMilliseconds() const { - int64_t time_ms = clock_->TimeInMilliseconds(); - if (time_ms < last_timestamp_ms_) { +Timestamp PacedSender::CurrentTime() const { + Timestamp time = clock_->CurrentTime(); + if (time < last_timestamp_) { RTC_LOG(LS_WARNING) << "Non-monotonic clock behavior observed. Previous timestamp: " - << last_timestamp_ms_ << ", new timestamp: " << time_ms; - RTC_DCHECK_GE(time_ms, last_timestamp_ms_); - time_ms = last_timestamp_ms_; + << last_timestamp_.ms() << ", new timestamp: " << time.ms(); + RTC_DCHECK_GE(time, last_timestamp_); + time = last_timestamp_; } - last_timestamp_ms_ = time_ms; - return time_ms; + last_timestamp_ = time; + return time; } void PacedSender::SetProbingEnabled(bool enabled) { @@ -208,11 +210,11 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; - int64_t now_ms = TimeMilliseconds(); + Timestamp now = CurrentTime(); prober_.OnIncomingPacket(bytes); if (capture_time_ms < 0) - capture_time_ms = now_ms; + capture_time_ms = now.ms(); RtpPacketToSend::Type type; switch (priority) { @@ -226,7 +228,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, type = RtpPacketToSend::Type::kVideo; } packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number, - capture_time_ms, now_ms, bytes, retransmission, + capture_time_ms, now, DataSize::bytes(bytes), retransmission, packet_counter_++); } @@ -235,16 +237,16 @@ void PacedSender::EnqueuePacket(std::unique_ptr packet) { RTC_DCHECK(pacing_bitrate_ > DataRate::Zero()) << "SetPacingRate must be called before InsertPacket."; - int64_t now_ms = TimeMilliseconds(); + Timestamp now = CurrentTime(); prober_.OnIncomingPacket(packet->payload_size()); if (packet->capture_time_ms() < 0) { - packet->set_capture_time_ms(now_ms); + packet->set_capture_time_ms(now.ms()); } RTC_CHECK(packet->packet_type()); int priority = GetPriorityForType(*packet->packet_type()); - packets_.Push(priority, now_ms, packet_counter_++, std::move(packet)); + packets_.Push(priority, now, packet_counter_++, std::move(packet)); } void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { @@ -267,7 +269,7 @@ size_t PacedSender::QueueSizePackets() const { DataSize PacedSender::QueueSizeData() const { rtc::CritScope cs(&critsect_); - return DataSize::bytes(packets_.SizeInBytes()); + return packets_.Size(); } absl::optional PacedSender::FirstSentPacketTime() const { @@ -277,51 +279,50 @@ absl::optional PacedSender::FirstSentPacketTime() const { TimeDelta PacedSender::OldestPacketWaitTime() const { rtc::CritScope cs(&critsect_); - - int64_t oldest_packet = packets_.OldestEnqueueTimeMs(); - if (oldest_packet == 0) { + Timestamp oldest_packet = packets_.OldestEnqueueTime(); + if (oldest_packet.IsInfinite()) { return TimeDelta::Zero(); } - return TimeDelta::ms(TimeMilliseconds() - oldest_packet); + return CurrentTime() - oldest_packet; } int64_t PacedSender::TimeUntilNextProcess() { rtc::CritScope cs(&critsect_); - int64_t elapsed_time_us = - clock_->TimeInMicroseconds() - time_last_process_us_; - int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; + TimeDelta elapsed_time = CurrentTime() - time_last_process_; // When paused we wake up every 500 ms to send a padding packet to ensure // we won't get stuck in the paused state due to no feedback being received. - if (paused_) - return std::max(kPausedProcessIntervalMs - elapsed_time_ms, 0); + if (paused_) { + return std::max(kPausedProcessInterval - elapsed_time, TimeDelta::Zero()) + .ms(); + } if (prober_.IsProbing()) { - int64_t ret = prober_.TimeUntilNextProbe(TimeMilliseconds()); + int64_t ret = prober_.TimeUntilNextProbe(CurrentTime().ms()); if (ret > 0 || (ret == 0 && !probing_send_failure_)) return ret; } - return std::max(min_packet_limit_ms_ - elapsed_time_ms, 0); + return std::max(min_packet_limit_ - elapsed_time, TimeDelta::Zero()).ms(); } -int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) { - int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000; - time_last_process_us_ = now_us; - if (elapsed_time_ms > kMaxElapsedTimeMs) { - RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time_ms +TimeDelta PacedSender::UpdateTimeAndGetElapsed(Timestamp now) { + TimeDelta elapsed_time = now - time_last_process_; + time_last_process_ = now; + if (elapsed_time > kMaxElapsedTime) { + RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms() << " ms) longer than expected, limiting to " - << kMaxElapsedTimeMs << " ms"; - elapsed_time_ms = kMaxElapsedTimeMs; + << kMaxElapsedTime.ms(); + elapsed_time = kMaxElapsedTime; } - return elapsed_time_ms; + return elapsed_time; } -bool PacedSender::ShouldSendKeepalive(int64_t now_us) const { +bool PacedSender::ShouldSendKeepalive(Timestamp now) const { if (send_padding_if_silent_ || paused_ || Congested()) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. - int64_t elapsed_since_last_send_us = now_us - last_send_time_us_; - if (elapsed_since_last_send_us >= kCongestedPacketIntervalMs * 1000) { + TimeDelta elapsed_since_last_send = now - last_send_time_; + if (elapsed_since_last_send >= kCongestedPacketInterval) { // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. if (packet_counter_ > 0) { @@ -334,66 +335,66 @@ bool PacedSender::ShouldSendKeepalive(int64_t now_us) const { void PacedSender::Process() { rtc::CritScope cs(&critsect_); - int64_t now_us = clock_->TimeInMicroseconds(); - int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us); - if (ShouldSendKeepalive(now_us)) { + Timestamp now = CurrentTime(); + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); + if (ShouldSendKeepalive(now)) { if (legacy_packet_referencing_) { critsect_.Leave(); size_t bytes_sent = packet_router_->TimeToSendPadding(1, PacedPacketInfo()); critsect_.Enter(); - OnPaddingSent(bytes_sent); + OnPaddingSent(DataSize::bytes(bytes_sent)); } else { - size_t keepalive_bytes_sent = 0; + DataSize keepalive_data_sent = DataSize::Zero(); critsect_.Leave(); std::vector> keepalive_packets = packet_router_->GeneratePadding(1); for (auto& packet : keepalive_packets) { - keepalive_bytes_sent += packet->payload_size() + packet->padding_size(); + keepalive_data_sent += + DataSize::bytes(packet->payload_size() + packet->padding_size()); packet_router_->SendPacket(std::move(packet), PacedPacketInfo()); } critsect_.Enter(); - OnPaddingSent(keepalive_bytes_sent); + OnPaddingSent(keepalive_data_sent); } } if (paused_) return; - if (elapsed_time_ms > 0) { - int target_bitrate_kbps = pacing_bitrate_.kbps(); - size_t queue_size_bytes = packets_.SizeInBytes(); - if (queue_size_bytes > 0) { + if (elapsed_time > TimeDelta::Zero()) { + DataRate target_rate = pacing_bitrate_; + DataSize queue_size_data = packets_.Size(); + if (queue_size_data > DataSize::Zero()) { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packets_.UpdateQueueTime(TimeMilliseconds()); + packets_.UpdateQueueTime(CurrentTime()); if (drain_large_queues_) { - int64_t avg_time_left_ms = std::max( - 1, queue_time_limit - packets_.AverageQueueTimeMs()); - int min_bitrate_needed_kbps = - static_cast(queue_size_bytes * 8 / avg_time_left_ms); - if (min_bitrate_needed_kbps > target_bitrate_kbps) { - target_bitrate_kbps = min_bitrate_needed_kbps; + TimeDelta avg_time_left = std::max( + TimeDelta::ms(1), queue_time_limit - packets_.AverageQueueTime()); + DataRate min_rate_needed = queue_size_data / avg_time_left; + if (min_rate_needed > target_rate) { + target_rate = min_rate_needed; RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" - << target_bitrate_kbps; + << target_rate.kbps(); } } } - media_budget_.set_target_rate_kbps(target_bitrate_kbps); - UpdateBudgetWithElapsedTime(elapsed_time_ms); + media_budget_.set_target_rate_kbps(target_rate.kbps()); + UpdateBudgetWithElapsedTime(elapsed_time); } bool is_probing = prober_.IsProbing(); PacedPacketInfo pacing_info; - absl::optional recommended_probe_size; + absl::optional recommended_probe_size; if (is_probing) { pacing_info = prober_.CurrentCluster(); - recommended_probe_size = prober_.RecommendedMinProbeSize(); + recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize()); } - size_t bytes_sent = 0; + DataSize data_sent = DataSize::Zero(); // The paused state is checked in the loop since it leaves the critical // section allowing the paused state to be changed from other code. while (!paused_) { @@ -401,12 +402,12 @@ void PacedSender::Process() { if (packet == nullptr) { // No packet available to send, check if we should send padding. if (!legacy_packet_referencing_) { - size_t padding_bytes_to_add = - PaddingBytesToAdd(recommended_probe_size, bytes_sent); - if (padding_bytes_to_add > 0) { + DataSize padding_to_add = + PaddingToAdd(recommended_probe_size, data_sent); + if (padding_to_add > DataSize::Zero()) { critsect_.Leave(); std::vector> padding_packets = - packet_router_->GeneratePadding(padding_bytes_to_add); + packet_router_->GeneratePadding(padding_to_add.bytes()); critsect_.Enter(); if (padding_packets.empty()) { // No padding packets were generated, quite send loop. @@ -445,10 +446,10 @@ void PacedSender::Process() { success == RtpPacketSendResult::kPacketNotFound) { // Packet sent or invalid packet, remove it from queue. // TODO(webrtc:8052): Don't consume media budget on kInvalid. - bytes_sent += packet->size_in_bytes(); + data_sent += packet->size(); // Send succeeded, remove it from the queue. OnPacketSent(packet); - if (recommended_probe_size && bytes_sent > *recommended_probe_size) + if (recommended_probe_size && data_sent > *recommended_probe_size) break; } else if (owned_rtp_packet) { // Send failed, but we can't put it back in the queue, remove it without @@ -466,25 +467,27 @@ void PacedSender::Process() { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. if (packet_counter_ > 0) { - int padding_needed = static_cast( - recommended_probe_size ? (*recommended_probe_size - bytes_sent) - : padding_budget_.bytes_remaining()); - if (padding_needed > 0) { - size_t padding_sent = 0; + DataSize padding_needed = + (recommended_probe_size && *recommended_probe_size > data_sent) + ? (*recommended_probe_size - data_sent) + : DataSize::bytes(padding_budget_.bytes_remaining()); + if (padding_needed > DataSize::Zero()) { + DataSize padding_sent = DataSize::Zero(); critsect_.Leave(); - padding_sent = - packet_router_->TimeToSendPadding(padding_needed, pacing_info); + padding_sent = DataSize::bytes(packet_router_->TimeToSendPadding( + padding_needed.bytes(), pacing_info)); critsect_.Enter(); - bytes_sent += padding_sent; + data_sent += padding_sent; OnPaddingSent(padding_sent); } } } if (is_probing) { - probing_send_failure_ = bytes_sent == 0; - if (!probing_send_failure_) - prober_.ProbeSent(TimeMilliseconds(), bytes_sent); + probing_send_failure_ = data_sent == DataSize::Zero(); + if (!probing_send_failure_) { + prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes()); + } } } @@ -494,33 +497,33 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -size_t PacedSender::PaddingBytesToAdd( - absl::optional recommended_probe_size, - size_t bytes_sent) { +DataSize PacedSender::PaddingToAdd( + absl::optional recommended_probe_size, + DataSize data_sent) { if (!packets_.Empty()) { // Actual payload available, no need to add padding. - return 0; + return DataSize::Zero(); } if (Congested()) { // Don't add padding if congested, even if requested for probing. - return 0; + return DataSize::Zero(); } if (packet_counter_ == 0) { // We can not send padding unless a normal packet has first been sent. If we // do, timestamps get messed up. - return 0; + return DataSize::Zero(); } if (recommended_probe_size) { - if (*recommended_probe_size > bytes_sent) { - return *recommended_probe_size - bytes_sent; + if (*recommended_probe_size > data_sent) { + return *recommended_probe_size - data_sent; } - return 0; + return DataSize::Zero(); } - return padding_budget_.bytes_remaining(); + return DataSize::bytes(padding_budget_.bytes_remaining()); } RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket( @@ -545,41 +548,42 @@ RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket( } void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) { + Timestamp now = CurrentTime(); if (!first_sent_packet_time_) { - first_sent_packet_time_ = Timestamp::ms(TimeMilliseconds()); + first_sent_packet_time_ = now; } bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio; if (!audio_packet || account_for_audio_) { // Update media bytes sent. - UpdateBudgetWithBytesSent(packet->size_in_bytes()); - last_send_time_us_ = clock_->TimeInMicroseconds(); + UpdateBudgetWithSentData(packet->size()); + last_send_time_ = now; } // Send succeeded, remove it from the queue. packets_.FinalizePop(); } -void PacedSender::OnPaddingSent(size_t bytes_sent) { - if (bytes_sent > 0) { - UpdateBudgetWithBytesSent(bytes_sent); +void PacedSender::OnPaddingSent(DataSize data_sent) { + if (data_sent > DataSize::Zero()) { + UpdateBudgetWithSentData(data_sent); } - last_send_time_us_ = clock_->TimeInMicroseconds(); + last_send_time_ = CurrentTime(); } -void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { - delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms); - media_budget_.IncreaseBudget(delta_time_ms); - padding_budget_.IncreaseBudget(delta_time_ms); +void PacedSender::UpdateBudgetWithElapsedTime(TimeDelta delta) { + delta = std::min(kMaxProcessingInterval, delta); + media_budget_.IncreaseBudget(delta.ms()); + padding_budget_.IncreaseBudget(delta.ms()); } -void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) { - outstanding_data_ += DataSize::bytes(bytes_sent); - media_budget_.UseBudget(bytes_sent); - padding_budget_.UseBudget(bytes_sent); +void PacedSender::UpdateBudgetWithSentData(DataSize size) { + outstanding_data_ += size; + media_budget_.UseBudget(size.bytes()); + padding_budget_.UseBudget(size.bytes()); } void PacedSender::SetQueueTimeLimit(TimeDelta limit) { rtc::CritScope cs(&critsect_); - queue_time_limit = limit.ms(); + queue_time_limit = limit; } } // namespace webrtc diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 391f623744..07c249f2c8 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -43,7 +43,7 @@ class PacedSender : public Module, public RtpPacketPacer, public RtpPacketSender { public: - // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than + // Expected max pacer delay in ms. If ExpectedQueueTime() is higher than // this value, the packet producers should wait (eg drop frames rather than // encoding them). Bitrate sent may temporarily exceed target set by // UpdateBitrate() so that this limit will be upheld. @@ -134,19 +134,19 @@ class PacedSender : public Module, void ProcessThreadAttached(ProcessThread* process_thread) override; private: - int64_t UpdateTimeAndGetElapsedMs(int64_t now_us) + TimeDelta UpdateTimeAndGetElapsed(Timestamp now) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool ShouldSendKeepalive(int64_t at_time_us) const + bool ShouldSendKeepalive(Timestamp now) const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); // Updates the number of bytes that can be sent for the next time interval. - void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms) + void UpdateBudgetWithElapsedTime(TimeDelta delta) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void UpdateBudgetWithBytesSent(size_t bytes) + void UpdateBudgetWithSentData(DataSize size) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - size_t PaddingBytesToAdd(absl::optional recommended_probe_size, - size_t bytes_sent) + DataSize PaddingToAdd(absl::optional recommended_probe_size, + DataSize data_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RoundRobinPacketQueue::QueuedPacket* GetPendingPacket( @@ -154,11 +154,11 @@ class PacedSender : public Module, RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void OnPaddingSent(size_t padding_sent) + void OnPaddingSent(DataSize padding_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + Timestamp CurrentTime() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); Clock* const clock_; PacketRouter* const packet_router_; @@ -168,12 +168,12 @@ class PacedSender : public Module, const bool drain_large_queues_; const bool send_padding_if_silent_; const bool pace_audio_; - FieldTrialParameter min_packet_limit_ms_; + TimeDelta min_packet_limit_; rtc::CriticalSection critsect_; // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. // The last millisecond timestamp returned by |clock_|. - mutable int64_t last_timestamp_ms_ RTC_GUARDED_BY(critsect_); + mutable Timestamp last_timestamp_ RTC_GUARDED_BY(critsect_); bool paused_ RTC_GUARDED_BY(critsect_); // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. @@ -188,8 +188,8 @@ class PacedSender : public Module, DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_); - int64_t time_last_process_us_ RTC_GUARDED_BY(critsect_); - int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_); + Timestamp time_last_process_ RTC_GUARDED_BY(critsect_); + Timestamp last_send_time_ RTC_GUARDED_BY(critsect_); absl::optional first_sent_packet_time_ RTC_GUARDED_BY(critsect_); RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_); @@ -206,7 +206,7 @@ class PacedSender : public Module, rtc::CriticalSection process_thread_lock_; ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_); - int64_t queue_time_limit RTC_GUARDED_BY(critsect_); + TimeDelta queue_time_limit RTC_GUARDED_BY(critsect_); bool account_for_audio_ RTC_GUARDED_BY(critsect_); // If true, PacedSender should only reference packets as in legacy mode. diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index c6c82fca02..961a2ff79f 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -506,7 +506,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) { const uint32_t kSsrc = 12345; const size_t kSizeBytes = 250; const size_t kPacketToSend = 3; - const int64_t kStartMs = clock_.TimeInMilliseconds(); + const Timestamp kStartTime = clock_.CurrentTime(); // No packet sent. EXPECT_FALSE(send_bucket_->FirstSentPacketTime().has_value()); @@ -517,7 +517,7 @@ TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) { send_bucket_->Process(); clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); } - EXPECT_EQ(Timestamp::ms(kStartMs), send_bucket_->FirstSentPacketTime()); + EXPECT_EQ(kStartTime, send_bucket_->FirstSentPacketTime()); } TEST_P(PacedSenderTest, QueuePacket) { diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index c0c664b2d6..7b5eb9e304 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -17,6 +17,9 @@ #include "rtc_base/checks.h" namespace webrtc { +namespace { +static constexpr DataSize kMaxLeadingSize = DataSize::Bytes<1400>(); +} RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) = default; @@ -28,11 +31,11 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket( uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, + Timestamp enqueue_time, + DataSize size, bool retransmission, uint64_t enqueue_order, - std::multiset::iterator enqueue_time_it, + std::multiset::iterator enqueue_time_it, absl::optional>::iterator> packet_it) : type_(type), @@ -40,8 +43,8 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket( ssrc_(ssrc), sequence_number_(seq_number), capture_time_ms_(capture_time_ms), - enqueue_time_ms_(enqueue_time_ms), - bytes_(length_in_bytes), + enqueue_time_(enqueue_time), + size_(size), retransmission_(retransmission), enqueue_order_(enqueue_order), enqueue_time_it_(enqueue_time_it), @@ -52,9 +55,9 @@ RoundRobinPacketQueue::QueuedPacket::ReleasePacket() { return packet_it_ ? std::move(**packet_it_) : nullptr; } -void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs( - int64_t pause_time_sum_ms) { - enqueue_time_ms_ -= pause_time_sum_ms; +void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime( + TimeDelta pause_time_sum) { + enqueue_time_ -= pause_time_sum; } bool RoundRobinPacketQueue::QueuedPacket::operator<( @@ -67,7 +70,7 @@ bool RoundRobinPacketQueue::QueuedPacket::operator<( return enqueue_order_ > other.enqueue_order_; } -RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} +RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {} RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; RoundRobinPacketQueue::Stream::~Stream() {} @@ -79,9 +82,15 @@ bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) { } RoundRobinPacketQueue::RoundRobinPacketQueue( - int64_t start_time_us, + Timestamp start_time, const WebRtcKeyValueConfig* field_trials) - : time_last_updated_ms_(start_time_us / 1000), + : time_last_updated_(start_time), + paused_(false), + size_packets_(0), + size_(DataSize::Zero()), + max_size_(kMaxLeadingSize), + queue_time_sum_(TimeDelta::Zero()), + pause_time_sum_(TimeDelta::Zero()), send_side_bwe_with_overhead_( IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {} @@ -92,35 +101,34 @@ void RoundRobinPacketQueue::Push(int priority, uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, + Timestamp enqueue_time, + DataSize size, bool retransmission, uint64_t enqueue_order) { Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms, - enqueue_time_ms, length_in_bytes, retransmission, - enqueue_order, enqueue_times_.insert(enqueue_time_ms), - absl::nullopt)); + enqueue_time, size, retransmission, enqueue_order, + enqueue_times_.insert(enqueue_time), absl::nullopt)); } void RoundRobinPacketQueue::Push(int priority, - int64_t enqueue_time_ms, + Timestamp enqueue_time, uint64_t enqueue_order, std::unique_ptr packet) { uint32_t ssrc = packet->Ssrc(); uint16_t sequence_number = packet->SequenceNumber(); int64_t capture_time_ms = packet->capture_time_ms(); - size_t size_bytes = send_side_bwe_with_overhead_ + DataSize size = + DataSize::bytes(send_side_bwe_with_overhead_ ? packet->size() - : packet->payload_size() + packet->padding_size(); + : packet->payload_size() + packet->padding_size()); auto type = packet->packet_type(); RTC_DCHECK(type.has_value()); rtp_packets_.push_front(std::move(packet)); - Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms, - enqueue_time_ms, size_bytes, - *type == RtpPacketToSend::Type::kRetransmission, - enqueue_order, enqueue_times_.insert(enqueue_time_ms), - rtp_packets_.begin())); + Push(QueuedPacket( + priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time, + size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order, + enqueue_times_.insert(enqueue_time), rtp_packets_.begin())); } RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() { @@ -153,9 +161,9 @@ void RoundRobinPacketQueue::FinalizePop() { // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and // by subtracting it now we effectively remove the time spent in in the // queue while in a paused state. - int64_t time_in_non_paused_state_ms = - time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_; - queue_time_sum_ms_ -= time_in_non_paused_state_ms; + TimeDelta time_in_non_paused_state = + time_last_updated_ - packet.enqueue_time() - pause_time_sum_; + queue_time_sum_ -= time_in_non_paused_state; RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end()); enqueue_times_.erase(packet.EnqueueTimeIterator()); @@ -171,13 +179,13 @@ void RoundRobinPacketQueue::FinalizePop() { // case a "budget" will be built up for the stream sending at the lower // rate. To avoid building a too large budget we limit |bytes| to be within // kMaxLeading bytes of the stream that has sent the most amount of bytes. - stream->bytes = std::max(stream->bytes + packet.size_in_bytes(), - max_bytes_ - kMaxLeadingBytes); - max_bytes_ = std::max(max_bytes_, stream->bytes); + stream->size = + std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize); + max_size_ = std::max(max_size_, stream->size); - size_bytes_ -= packet.size_in_bytes(); + size_ -= packet.size(); size_packets_ -= 1; - RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); + RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero()); // If there are packets left to be sent, schedule the stream again. RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); @@ -186,7 +194,7 @@ void RoundRobinPacketQueue::FinalizePop() { } else { int priority = stream->packet_queue.top().priority(); stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(priority, stream->bytes), stream->ssrc); + StreamPrioKey(priority, stream->size), stream->ssrc); } pop_packet_.reset(); @@ -204,44 +212,44 @@ size_t RoundRobinPacketQueue::SizeInPackets() const { return size_packets_; } -uint64_t RoundRobinPacketQueue::SizeInBytes() const { - return size_bytes_; +DataSize RoundRobinPacketQueue::Size() const { + return size_; } -int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const { +Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { if (Empty()) - return 0; + return Timestamp::MinusInfinity(); RTC_CHECK(!enqueue_times_.empty()); return *enqueue_times_.begin(); } -void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) { - RTC_CHECK_GE(timestamp_ms, time_last_updated_ms_); - if (timestamp_ms == time_last_updated_ms_) +void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) { + RTC_CHECK_GE(now, time_last_updated_); + if (now == time_last_updated_) return; - int64_t delta_ms = timestamp_ms - time_last_updated_ms_; + TimeDelta delta = now - time_last_updated_; if (paused_) { - pause_time_sum_ms_ += delta_ms; + pause_time_sum_ += delta; } else { - queue_time_sum_ms_ += delta_ms * size_packets_; + queue_time_sum_ += TimeDelta::us(delta.us() * size_packets_); } - time_last_updated_ms_ = timestamp_ms; + time_last_updated_ = now; } -void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { +void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) { if (paused_ == paused) return; - UpdateQueueTime(timestamp_ms); + UpdateQueueTime(now); paused_ = paused; } -int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const { +TimeDelta RoundRobinPacketQueue::AverageQueueTime() const { if (Empty()) - return 0; - return queue_time_sum_ms_ / size_packets_; + return TimeDelta::Zero(); + return queue_time_sum_ / size_packets_; } void RoundRobinPacketQueue::Push(QueuedPacket packet) { @@ -258,14 +266,14 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) { // If the SSRC is not currently scheduled, add it to |stream_priorities_|. RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); + StreamPrioKey(packet.priority(), stream->size), packet.ssrc()); } else if (packet.priority() < stream->priority_it->first.priority) { // If the priority of this SSRC increased, remove the outdated StreamPrioKey // and insert a new one with the new priority. Note that |priority_| uses // lower ordinal for higher priority. stream_priorities_.erase(stream->priority_it); stream->priority_it = stream_priorities_.emplace( - StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); + StreamPrioKey(packet.priority(), stream->size), packet.ssrc()); } RTC_CHECK(stream->priority_it != stream_priorities_.end()); @@ -275,11 +283,11 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) { // amount of time the queue has been paused at that moment. This way we // subtract the total amount of time the packet has spent in the queue while // in a paused state. - UpdateQueueTime(packet.enqueue_time_ms()); - packet.SubtractPauseTimeMs(pause_time_sum_ms_); + UpdateQueueTime(packet.enqueue_time()); + packet.SubtractPauseTime(pause_time_sum_); size_packets_ += 1; - size_bytes_ += packet.size_in_bytes(); + size_ += packet.size(); stream->packet_queue.push(packet); } diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index c045be4ab6..abb6e3a46d 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -22,6 +22,9 @@ #include "absl/types/optional.h" #include "api/transport/webrtc_key_value_config.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "system_wrappers/include/clock.h" @@ -30,7 +33,7 @@ namespace webrtc { class RoundRobinPacketQueue { public: - RoundRobinPacketQueue(int64_t start_time_us, + RoundRobinPacketQueue(Timestamp start_time, const WebRtcKeyValueConfig* field_trials); ~RoundRobinPacketQueue(); @@ -42,11 +45,11 @@ class RoundRobinPacketQueue { uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, + Timestamp enqueue_time, + DataSize size, bool retransmission, uint64_t enqueue_order, - std::multiset::iterator enqueue_time_it, + std::multiset::iterator enqueue_time_it, absl::optional>::iterator> packet_it); QueuedPacket(const QueuedPacket& rhs); @@ -59,8 +62,8 @@ class RoundRobinPacketQueue { uint32_t ssrc() const { return ssrc_; } uint16_t sequence_number() const { return sequence_number_; } int64_t capture_time_ms() const { return capture_time_ms_; } - int64_t enqueue_time_ms() const { return enqueue_time_ms_; } - size_t size_in_bytes() const { return bytes_; } + Timestamp enqueue_time() const { return enqueue_time_; } + DataSize size() const { return size_; } bool is_retransmission() const { return retransmission_; } uint64_t enqueue_order() const { return enqueue_order_; } std::unique_ptr ReleasePacket(); @@ -70,10 +73,10 @@ class RoundRobinPacketQueue { PacketIterator() const { return packet_it_; } - std::multiset::iterator EnqueueTimeIterator() const { + std::multiset::iterator EnqueueTimeIterator() const { return enqueue_time_it_; } - void SubtractPauseTimeMs(int64_t pause_time_sum_ms); + void SubtractPauseTime(TimeDelta pause_time_sum); private: RtpPacketToSend::Type type_; @@ -81,11 +84,11 @@ class RoundRobinPacketQueue { uint32_t ssrc_; uint16_t sequence_number_; int64_t capture_time_ms_; // Absolute time of frame capture. - int64_t enqueue_time_ms_; // Absolute time of pacer queue entry. - size_t bytes_; + Timestamp enqueue_time_; // Absolute time of pacer queue entry. + DataSize size_; bool retransmission_; uint64_t enqueue_order_; - std::multiset::iterator enqueue_time_it_; + std::multiset::iterator enqueue_time_it_; // Iterator into |rtp_packets_| where the memory for RtpPacket is owned, // if applicable. absl::optional>::iterator> @@ -97,12 +100,12 @@ class RoundRobinPacketQueue { uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, - int64_t enqueue_time_ms, - size_t length_in_bytes, + Timestamp enqueue_time, + DataSize size, bool retransmission, uint64_t enqueue_order); void Push(int priority, - int64_t enqueue_time_ms, + Timestamp enqueue_time, uint64_t enqueue_order, std::unique_ptr packet); QueuedPacket* BeginPop(); @@ -111,26 +114,26 @@ class RoundRobinPacketQueue { bool Empty() const; size_t SizeInPackets() const; - uint64_t SizeInBytes() const; + DataSize Size() const; - int64_t OldestEnqueueTimeMs() const; - int64_t AverageQueueTimeMs() const; - void UpdateQueueTime(int64_t timestamp_ms); - void SetPauseState(bool paused, int64_t timestamp_ms); + Timestamp OldestEnqueueTime() const; + TimeDelta AverageQueueTime() const; + void UpdateQueueTime(Timestamp now); + void SetPauseState(bool paused, Timestamp now); private: struct StreamPrioKey { - StreamPrioKey(int priority, int64_t bytes) - : priority(priority), bytes(bytes) {} + StreamPrioKey(int priority, DataSize size) + : priority(priority), size(size) {} bool operator<(const StreamPrioKey& other) const { if (priority != other.priority) return priority < other.priority; - return bytes < other.bytes; + return size < other.size; } const int priority; - const size_t bytes; + const DataSize size; }; struct Stream { @@ -139,7 +142,7 @@ class RoundRobinPacketQueue { virtual ~Stream(); - size_t bytes; + DataSize size; uint32_t ssrc; std::priority_queue packet_queue; @@ -151,8 +154,6 @@ class RoundRobinPacketQueue { std::multimap::iterator priority_it; }; - static constexpr size_t kMaxLeadingBytes = 1400; - void Push(QueuedPacket packet); Stream* GetHighestPriorityStream(); @@ -160,16 +161,16 @@ class RoundRobinPacketQueue { // Just used to verify correctness. bool IsSsrcScheduled(uint32_t ssrc) const; - int64_t time_last_updated_ms_; + Timestamp time_last_updated_; absl::optional pop_packet_; absl::optional pop_stream_; - bool paused_ = false; - size_t size_packets_ = 0; - size_t size_bytes_ = 0; - size_t max_bytes_ = kMaxLeadingBytes; - int64_t queue_time_sum_ms_ = 0; - int64_t pause_time_sum_ms_ = 0; + bool paused_; + size_t size_packets_; + DataSize size_; + DataSize max_size_; + TimeDelta queue_time_sum_; + TimeDelta pause_time_sum_; // A map of streams used to prioritize from which stream to send next. We use // a multimap instead of a priority_queue since the priority of a stream can @@ -182,7 +183,7 @@ class RoundRobinPacketQueue { // The enqueue time of every packet currently in the queue. Used to figure out // the age of the oldest packet in the queue. - std::multiset enqueue_times_; + std::multiset enqueue_times_; // List of RTP packets to be sent, not necessarily in the order they will be // sent. PacketInfo.packet_it will point to an entry in this list, or the