diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 2fed7be2ba..7e0ac781e2 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -11,6 +11,7 @@ #include "modules/pacing/paced_sender.h" #include +#include #include "absl/memory/memory.h" #include "logging/rtc_event_log/rtc_event_log.h" @@ -257,9 +258,7 @@ int64_t PacedSender::TimeUntilNextProcess() { return std::max(kMinPacketLimitMs - elapsed_time_ms, 0); } -void PacedSender::Process() { - int64_t now_us = clock_->TimeInMicroseconds(); - rtc::CritScope cs(&critsect_); +int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) { int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000; time_last_process_us_ = now_us; if (elapsed_time_ms > kMaxElapsedTimeMs) { @@ -268,6 +267,10 @@ void PacedSender::Process() { << kMaxElapsedTimeMs << " ms"; elapsed_time_ms = kMaxElapsedTimeMs; } + return elapsed_time_ms; +} + +bool PacedSender::ShouldSendKeepalive(int64_t now_us) const { if (send_padding_if_silent_ || paused_ || Congested()) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. @@ -276,12 +279,25 @@ void PacedSender::Process() { // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. if (packet_counter_ > 0) { - PacedPacketInfo pacing_info; - size_t bytes_sent = SendPadding(1, pacing_info); - alr_detector_->OnBytesSent(bytes_sent, now_us / 1000); + return true; } } } + return false; +} + +void PacedSender::Process() { + rtc::CritScope cs(&critsect_); + int64_t now_us = clock_->TimeInMicroseconds(); + int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us); + if (ShouldSendKeepalive(now_us)) { + critsect_.Leave(); + size_t bytes_sent = packet_sender_->TimeToSendPadding(1, PacedPacketInfo()); + critsect_.Enter(); + OnPaddingSent(bytes_sent); + alr_detector_->OnBytesSent(bytes_sent, now_us / 1000); + } + if (paused_) return; @@ -315,23 +331,27 @@ void PacedSender::Process() { pacing_info = prober_.CurrentCluster(); recommended_probe_size = prober_.RecommendedMinProbeSize(); } - // The paused state is checked in the loop since SendPacket leaves the - // critical section allowing the paused state to be changed from other code. + // The paused state is checked in the loop since it leaves the critical + // section allowing the paused state to be changed from other code. while (!packets_.Empty() && !paused_) { - // Since we need to release the lock in order to send, we first pop the - // element from the priority queue but keep it in storage, so that we can - // reinsert it if send fails. - const RoundRobinPacketQueue::Packet& packet = packets_.BeginPop(); + const auto* packet = GetPendingPacket(pacing_info); + if (packet == nullptr) + break; - if (SendPacket(packet, pacing_info)) { - bytes_sent += packet.bytes; + critsect_.Leave(); + bool success = packet_sender_->TimeToSendPacket( + packet->ssrc, packet->sequence_number, packet->capture_time_ms, + packet->retransmission, pacing_info); + critsect_.Enter(); + if (success) { + bytes_sent += packet->bytes; // Send succeeded, remove it from the queue. - packets_.FinalizePop(packet); + OnPacketSent(std::move(packet)); if (is_probing && bytes_sent > recommended_probe_size) break; } else { // Send failed, put it back into the queue. - packets_.CancelPop(packet); + packets_.CancelPop(*packet); break; } } @@ -344,7 +364,12 @@ void PacedSender::Process() { static_cast(is_probing ? (recommended_probe_size - bytes_sent) : padding_budget_.bytes_remaining()); if (padding_needed > 0) { - bytes_sent += SendPadding(padding_needed, pacing_info); + critsect_.Leave(); + size_t padding_sent = + packet_sender_->TimeToSendPadding(padding_needed, pacing_info); + critsect_.Enter(); + bytes_sent += padding_sent; + OnPaddingSent(padding_sent); } } } @@ -362,54 +387,46 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { process_thread_ = process_thread; } -bool PacedSender::SendPacket(const RoundRobinPacketQueue::Packet& packet, - const PacedPacketInfo& pacing_info) { - RTC_DCHECK(!paused_); - bool audio_packet = packet.priority == kHighPriority; +const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket( + const PacedPacketInfo& pacing_info) { + // Since we need to release the lock in order to send, we first pop the + // element from the priority queue but keep it in storage, so that we can + // reinsert it if send fails. + const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop(); + bool audio_packet = packet->priority == kHighPriority; bool apply_pacing = !audio_packet || account_for_audio_ || video_blocks_audio_; if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe))) { - return false; + packets_.CancelPop(*packet); + return nullptr; } - - critsect_.Leave(); - const bool success = packet_sender_->TimeToSendPacket( - packet.ssrc, packet.sequence_number, packet.capture_time_ms, - packet.retransmission, pacing_info); - critsect_.Enter(); - - if (success) { - if (first_sent_packet_ms_ == -1) - first_sent_packet_ms_ = TimeMilliseconds(); - if (!audio_packet || account_for_audio_) { - // Update media bytes sent. - // TODO(eladalon): TimeToSendPacket() can also return |true| in some - // situations where nothing actually ended up being sent to the network, - // and we probably don't want to update the budget in such cases. - // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 - UpdateBudgetWithBytesSent(packet.bytes); - last_send_time_us_ = clock_->TimeInMicroseconds(); - } - } - - return success; + return packet; } -size_t PacedSender::SendPadding(size_t padding_needed, - const PacedPacketInfo& pacing_info) { - RTC_DCHECK_GT(packet_counter_, 0); - critsect_.Leave(); - size_t bytes_sent = - packet_sender_->TimeToSendPadding(padding_needed, pacing_info); - critsect_.Enter(); +void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) { + if (first_sent_packet_ms_ == -1) + first_sent_packet_ms_ = TimeMilliseconds(); + bool audio_packet = packet->priority == kHighPriority; + if (!audio_packet || account_for_audio_) { + // Update media bytes sent. + // TODO(eladalon): TimeToSendPacket() can also return |true| in some + // situations where nothing actually ended up being sent to the network, + // and we probably don't want to update the budget in such cases. + // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 + UpdateBudgetWithBytesSent(packet->bytes); + last_send_time_us_ = clock_->TimeInMicroseconds(); + } + // Send succeeded, remove it from the queue. + packets_.FinalizePop(*packet); +} +void PacedSender::OnPaddingSent(size_t bytes_sent) { if (bytes_sent > 0) { UpdateBudgetWithBytesSent(bytes_sent); } last_send_time_us_ = clock_->TimeInMicroseconds(); - return bytes_sent; } void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index b6f294f6df..4586d29cca 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -143,19 +143,25 @@ class PacedSender : public Pacer { void SetQueueTimeLimit(int limit_ms); private: + int64_t UpdateTimeAndGetElapsedMs(int64_t now_us) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + bool ShouldSendKeepalive(int64_t at_time_us) const + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + // Updates the number of bytes that can be sent for the next time interval. void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); void UpdateBudgetWithBytesSent(size_t bytes) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - bool SendPacket(const RoundRobinPacketQueue::Packet& packet, - const PacedPacketInfo& cluster_info) + const RoundRobinPacketQueue::Packet* GetPendingPacket( + const PacedPacketInfo& pacing_info) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) + void OnPacketSent(const RoundRobinPacketQueue::Packet* packet) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void OnPaddingSent(size_t padding_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); - void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);