From 96816753d95c571db3b935ce33b632edd4fc36ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Spr=C3=A5ng?= Date: Tue, 4 Sep 2018 18:40:19 +0200 Subject: [PATCH] Don't let time flow backwards in pacer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: webrtc:9716, b:111681259 Change-Id: I1bf8edeaed6c56f3f5a0bdcc1f71108e119e1843 Reviewed-on: https://webrtc-review.googlesource.com/97701 Commit-Queue: Erik Språng Reviewed-by: Philip Eliasson Cr-Commit-Position: refs/heads/master@{#24561} --- modules/pacing/paced_sender.cc | 32 ++++++++++++++++------ modules/pacing/paced_sender.h | 5 ++++ modules/pacing/round_robin_packet_queue.cc | 4 +-- modules/pacing/round_robin_packet_queue.h | 1 - 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index c64b161478..eb690f72f1 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -67,6 +67,7 @@ PacedSender::PacedSender(const Clock* clock, send_padding_if_silent_( field_trial::IsEnabled("WebRTC-Pacer-PadInSilence")), video_blocks_audio_(!field_trial::IsDisabled("WebRTC-Pacer-BlockAudio")), + last_timestamp_ms_(clock_->TimeInMilliseconds()), paused_(false), media_budget_(absl::make_unique(0)), padding_budget_(absl::make_unique(0)), @@ -94,7 +95,7 @@ PacedSender::~PacedSender() {} void PacedSender::CreateProbeCluster(int bitrate_bps) { rtc::CritScope cs(&critsect_); - prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); + prober_->CreateProbeCluster(bitrate_bps, TimeMilliseconds()); } void PacedSender::Pause() { @@ -103,7 +104,7 @@ void PacedSender::Pause() { if (!paused_) RTC_LOG(LS_INFO) << "PacedSender paused."; paused_ = true; - packets_->SetPauseState(true, clock_->TimeInMilliseconds()); + packets_->SetPauseState(true, TimeMilliseconds()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to get @@ -118,7 +119,7 @@ void PacedSender::Resume() { if (paused_) RTC_LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; - packets_->SetPauseState(false, clock_->TimeInMilliseconds()); + packets_->SetPauseState(false, TimeMilliseconds()); } rtc::CritScope cs(&process_thread_lock_); // Tell the process thread to call our TimeUntilNextProcess() method to @@ -143,6 +144,19 @@ bool PacedSender::Congested() const { return outstanding_bytes_ >= congestion_window_bytes_; } +int64_t PacedSender::TimeMilliseconds() const { + int64_t time_ms = clock_->TimeInMilliseconds(); + if (time_ms < last_timestamp_ms_) { + RTC_LOG(LS_WARNING) + << "Non-monotonic clock behavior observed. Previous timestamp: " + << last_timestamp_ms_ << ", new timestamp: " << time_ms; + RTC_DCHECK_GE(time_ms, last_timestamp_ms_); + time_ms = last_timestamp_ms_; + } + last_timestamp_ms_ = time_ms; + return time_ms; +} + void PacedSender::SetProbingEnabled(bool enabled) { rtc::CritScope cs(&critsect_); RTC_CHECK_EQ(0, packet_counter_); @@ -192,7 +206,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, RTC_DCHECK(pacing_bitrate_kbps_ > 0) << "SetPacingRate must be called before InsertPacket."; - int64_t now_ms = clock_->TimeInMilliseconds(); + int64_t now_ms = TimeMilliseconds(); prober_->OnIncomingPacket(bytes); if (capture_time_ms < 0) @@ -238,7 +252,7 @@ int64_t PacedSender::QueueInMs() const { if (oldest_packet == 0) return 0; - return clock_->TimeInMilliseconds() - oldest_packet; + return TimeMilliseconds() - oldest_packet; } int64_t PacedSender::TimeUntilNextProcess() { @@ -252,7 +266,7 @@ int64_t PacedSender::TimeUntilNextProcess() { return std::max(kPausedProcessIntervalMs - elapsed_time_ms, 0); if (prober_->IsProbing()) { - int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); + int64_t ret = prober_->TimeUntilNextProbe(TimeMilliseconds()); if (ret > 0 || (ret == 0 && !probing_send_failure_)) return ret; } @@ -294,7 +308,7 @@ void PacedSender::Process() { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); + packets_->UpdateQueueTime(TimeMilliseconds()); if (drain_large_queues_) { int64_t avg_time_left_ms = std::max( 1, queue_time_limit - packets_->AverageQueueTimeMs()); @@ -353,7 +367,7 @@ void PacedSender::Process() { if (is_probing) { probing_send_failure_ = bytes_sent == 0; if (!probing_send_failure_) - prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent); + prober_->ProbeSent(TimeMilliseconds(), bytes_sent); } alr_detector_->OnBytesSent(bytes_sent, now_us / 1000); } @@ -384,7 +398,7 @@ bool PacedSender::SendPacket(const PacketQueueInterface::Packet& packet, if (success) { if (first_sent_packet_ms_ == -1) - first_sent_packet_ms_ = clock_->TimeInMilliseconds(); + first_sent_packet_ms_ = TimeMilliseconds(); if (!audio_packet || account_for_audio_) { // Update media bytes sent. // TODO(eladalon): TimeToSendPacket() can also return |true| in some diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 2fcc3a3c0d..c5ca5a6b9a 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -157,6 +157,7 @@ class PacedSender : public Pacer { void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); const Clock* const clock_; PacketSender* const packet_sender_; @@ -165,7 +166,11 @@ class PacedSender : public Pacer { const bool drain_large_queues_; const bool send_padding_if_silent_; const bool video_blocks_audio_; + rtc::CriticalSection critsect_; + // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. + // The last millisecond timestamp returned by |clock_|. + mutable int64_t last_timestamp_ms_ RTC_GUARDED_BY(critsect_); bool paused_ RTC_GUARDED_BY(critsect_); // This is the media budget, keeping track of how many bits of media // we can pace out during the current interval. diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index a7cb8befda..0989b45966 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -17,12 +17,12 @@ namespace webrtc { -RoundRobinPacketQueue::Stream::Stream() : bytes(0) {} +RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; RoundRobinPacketQueue::Stream::~Stream() {} RoundRobinPacketQueue::RoundRobinPacketQueue(const Clock* clock) - : clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {} + : time_last_updated_(clock->TimeInMilliseconds()) {} RoundRobinPacketQueue::~RoundRobinPacketQueue() {} diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index 5dd68cd0ec..e8cb19e9c7 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -82,7 +82,6 @@ class RoundRobinPacketQueue : public PacketQueueInterface { // Just used to verify correctness. bool IsSsrcScheduled(uint32_t ssrc) const; - const Clock* const clock_; int64_t time_last_updated_; absl::optional pop_packet_; absl::optional pop_stream_;