diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn index 0446799fb7..70350f842d 100644 --- a/modules/rtp_rtcp/BUILD.gn +++ b/modules/rtp_rtcp/BUILD.gn @@ -295,6 +295,7 @@ rtc_library("rtp_rtcp") { "../../rtc_base:safe_minmax", "../../rtc_base/experiments:field_trial_parser", "../../rtc_base/synchronization:sequence_checker", + "../../rtc_base/task_utils:repeating_task", "../../rtc_base/task_utils:to_queued_task", "../../rtc_base/time:timestamp_extrapolator", "../../system_wrappers", diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc index e50f72bb29..76a6e09ac2 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc @@ -34,7 +34,6 @@ namespace webrtc { namespace { const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5; const int64_t kRtpRtcpRttProcessTimeMs = 1000; -const int64_t kRtpRtcpBitrateProcessTimeMs = 10; const int64_t kDefaultExpectedRetransmissionTimeMs = 125; } // namespace @@ -49,10 +48,10 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext( config.paced_sender ? config.paced_sender : &non_paced_sender) {} ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) - : rtcp_sender_(configuration), + : worker_queue_(TaskQueueBase::Current()), + rtcp_sender_(configuration), rtcp_receiver_(configuration, this), clock_(configuration.clock), - last_bitrate_process_time_(clock_->TimeInMilliseconds()), last_rtt_process_time_(clock_->TimeInMilliseconds()), next_process_time_(clock_->TimeInMilliseconds() + kRtpRtcpMaxIdleTimeProcessMs), @@ -62,6 +61,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) remote_bitrate_(configuration.remote_bitrate_estimator), rtt_stats_(configuration.rtt_stats), rtt_ms_(0) { + RTC_DCHECK(worker_queue_); process_thread_checker_.Detach(); if (!configuration.receiver_only) { rtp_sender_ = std::make_unique(configuration); @@ -78,7 +78,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration) } ModuleRtpRtcpImpl2::~ModuleRtpRtcpImpl2() { - RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK_RUN_ON(worker_queue_); } // static @@ -105,18 +105,6 @@ void ModuleRtpRtcpImpl2::Process() { // times a second. next_process_time_ = now + kRtpRtcpMaxIdleTimeProcessMs; - if (rtp_sender_) { - if (now >= last_bitrate_process_time_ + kRtpRtcpBitrateProcessTimeMs) { - rtp_sender_->packet_sender.ProcessBitrateAndNotifyObservers(); - last_bitrate_process_time_ = now; - // TODO(bugs.webrtc.org/11581): Is this a bug? At the top of the function, - // next_process_time_ is incremented by 5ms, here we effectively do a - // std::min() of (now + 5ms, now + 10ms). Seems like this is a no-op? - next_process_time_ = - std::min(next_process_time_, now + kRtpRtcpBitrateProcessTimeMs); - } - } - // TODO(bugs.webrtc.org/11581): We update the RTT once a second, whereas other // things that run in this method are updated much more frequently. Move the // RTT checking over to the worker thread, which matches better with where the diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h index 276f88a6b5..1050d17950 100644 --- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h +++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h @@ -21,6 +21,7 @@ #include "absl/types/optional.h" #include "api/rtp_headers.h" +#include "api/task_queue/task_queue_base.h" #include "api/video/video_bitrate_allocation.h" #include "modules/include/module_fec_types.h" #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" @@ -283,7 +284,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, bool TimeToSendFullNackList(int64_t now) const; - SequenceChecker construction_thread_checker_; + TaskQueueBase* const worker_queue_; SequenceChecker process_thread_checker_; std::unique_ptr rtp_sender_; @@ -293,7 +294,6 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface, Clock* const clock_; - int64_t last_bitrate_process_time_; int64_t last_rtt_process_time_; int64_t next_process_time_; uint16_t packet_overhead_; diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc index c309fc3f0d..eeb910401d 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.cc +++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc @@ -26,6 +26,8 @@ constexpr uint32_t kTimestampTicksPerMs = 90; constexpr int kSendSideDelayWindowMs = 1000; constexpr int kBitrateStatisticsWindowMs = 1000; constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13; +constexpr TimeDelta kUpdateInterval = + TimeDelta::Millis(kBitrateStatisticsWindowMs); bool IsEnabled(absl::string_view name, const WebRtcKeyValueConfig* field_trials) { @@ -55,7 +57,8 @@ void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, RtpPacketHistory* packet_history) - : ssrc_(config.local_media_ssrc), + : worker_queue_(TaskQueueBase::Current()), + ssrc_(config.local_media_ssrc), rtx_ssrc_(config.rtx_send_ssrc), flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc() : absl::nullopt), @@ -85,7 +88,19 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, ? std::make_unique( kRtpSequenceNumberMapMaxEntries) : nullptr) { - RTC_DCHECK(TaskQueueBase::Current()); + RTC_DCHECK(worker_queue_); + if (bitrate_callback_) { + update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, + kUpdateInterval, [this]() { + PeriodicUpdate(); + return kUpdateInterval; + }); + } +} + +RtpSenderEgress::~RtpSenderEgress() { + RTC_DCHECK_RUN_ON(worker_queue_); + update_task_.Stop(); } void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, @@ -198,29 +213,20 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet, if (send_success) { rtc::CritScope lock(&lock_); - UpdateRtpStats(*packet); + // TODO(bugs.webrtc.org/11581): Update the stats on the worker thread + // (PostTask). + UpdateRtpStats(now_ms, *packet); media_has_been_sent_ = true; } } -void RtpSenderEgress::ProcessBitrateAndNotifyObservers() { - if (!bitrate_callback_) - return; - - rtc::CritScope lock(&lock_); - RtpSendRates send_rates = GetSendRatesLocked(); - bitrate_callback_->Notify( - send_rates.Sum().bps(), - send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); -} - RtpSendRates RtpSenderEgress::GetSendRates() const { rtc::CritScope lock(&lock_); - return GetSendRatesLocked(); + const int64_t now_ms = clock_->TimeInMilliseconds(); + return GetSendRatesLocked(now_ms); } -RtpSendRates RtpSenderEgress::GetSendRatesLocked() const { - const int64_t now_ms = clock_->TimeInMilliseconds(); +RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const { RtpSendRates current_rates; for (size_t i = 0; i < kNumMediaTypes; ++i) { RtpPacketMediaType type = static_cast(i); @@ -232,6 +238,8 @@ RtpSendRates RtpSenderEgress::GetSendRatesLocked() const { void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats, StreamDataCounters* rtx_stats) const { + // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are + // only touched on the worker thread. rtc::CritScope lock(&lock_); *rtp_stats = rtp_stats_; *rtx_stats = rtx_rtp_stats_; @@ -436,9 +444,10 @@ bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, return true; } -void RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) { - int64_t now_ms = clock_->TimeInMilliseconds(); - +void RtpSenderEgress::UpdateRtpStats(int64_t now_ms, + const RtpPacketToSend& packet) { + // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are + // only touched on the worker thread. StreamDataCounters* counters = packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; @@ -456,12 +465,34 @@ void RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) { counters->transmitted.AddPacket(packet); RTC_DCHECK(packet.packet_type().has_value()); + // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the + // worker thread. send_rates_[static_cast(*packet.packet_type())].Update(packet.size(), now_ms); + // TODO(bugs.webrtc.org/11581): These (stats related) stat callbacks should be + // issued on the worker thread. if (rtp_stats_callback_) { rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc()); } + + // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point + // to the same object, so these callbacks could be consolidated into one. + if (bitrate_callback_) { + RtpSendRates send_rates = GetSendRatesLocked(now_ms); + bitrate_callback_->Notify( + send_rates.Sum().bps(), + send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); + } +} + +void RtpSenderEgress::PeriodicUpdate() { + RTC_DCHECK_RUN_ON(worker_queue_); + RTC_DCHECK(bitrate_callback_); + RtpSendRates send_rates = GetSendRates(); + bitrate_callback_->Notify( + send_rates.Sum().bps(), + send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); } } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h index a8e033c5bf..db29cd51da 100644 --- a/modules/rtp_rtcp/source/rtp_sender_egress.h +++ b/modules/rtp_rtcp/source/rtp_sender_egress.h @@ -18,6 +18,7 @@ #include "absl/types/optional.h" #include "api/call/transport.h" #include "api/rtc_event_log/rtc_event_log.h" +#include "api/task_queue/task_queue_base.h" #include "api/units/data_rate.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtp_packet_history.h" @@ -26,6 +27,8 @@ #include "modules/rtp_rtcp/source/rtp_sequence_number_map.h" #include "rtc_base/critical_section.h" #include "rtc_base/rate_statistics.h" +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -49,7 +52,7 @@ class RtpSenderEgress { RtpSenderEgress(const RtpRtcpInterface::Configuration& config, RtpPacketHistory* packet_history); - ~RtpSenderEgress() = default; + ~RtpSenderEgress(); void SendPacket(RtpPacketToSend* packet, const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_); @@ -57,7 +60,6 @@ class RtpSenderEgress { absl::optional RtxSsrc() const { return rtx_ssrc_; } absl::optional FlexFecSsrc() const { return flexfec_ssrc_; } - void ProcessBitrateAndNotifyObservers() RTC_LOCKS_EXCLUDED(lock_); RtpSendRates GetSendRates() const RTC_LOCKS_EXCLUDED(lock_); void GetDataCounters(StreamDataCounters* rtp_stats, StreamDataCounters* rtx_stats) const @@ -84,7 +86,8 @@ class RtpSenderEgress { // time. typedef std::map SendDelayMap; - RtpSendRates GetSendRatesLocked() const RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + RtpSendRates GetSendRatesLocked(int64_t now_ms) const + RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); bool HasCorrectSsrc(const RtpPacketToSend& packet) const; void AddPacketToTransportFeedback(uint16_t packet_id, const RtpPacketToSend& packet, @@ -100,9 +103,13 @@ class RtpSenderEgress { bool SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info); - void UpdateRtpStats(const RtpPacketToSend& packet) + void UpdateRtpStats(int64_t now_ms, const RtpPacketToSend& packet) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); + // Called on a timer, once a second, on the worker_queue_. + void PeriodicUpdate(); + + TaskQueueBase* const worker_queue_; const uint32_t ssrc_; const absl::optional rtx_ssrc_; const absl::optional flexfec_ssrc_; @@ -142,6 +149,8 @@ class RtpSenderEgress { // 3. Whether the packet was the last in its frame. const std::unique_ptr rtp_sequence_number_map_ RTC_GUARDED_BY(lock_); + + RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_); }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc index 12055b5b1c..6f0bbbc26c 100644 --- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc @@ -1774,9 +1774,6 @@ TEST_P(RtpSenderTest, BitrateCallbacks) { RtpPacketHistory::StorageMode::kStoreAndCull, 1); uint32_t ssrc = rtp_sender()->SSRC(); - // Initial process call so we get a new time window. - rtp_egress()->ProcessBitrateAndNotifyObservers(); - // Send a few frames. RTPVideoHeader video_header; for (uint32_t i = 0; i < kNumPackets; ++i) { @@ -1787,15 +1784,13 @@ TEST_P(RtpSenderTest, BitrateCallbacks) { fake_clock_.AdvanceTimeMilliseconds(kPacketInterval); } - rtp_egress()->ProcessBitrateAndNotifyObservers(); - // We get one call for every stats updated, thus two calls since both the // stream stats and the retransmit stats are updated once. - EXPECT_EQ(2u, callback.num_calls_); + EXPECT_EQ(kNumPackets, callback.num_calls_); EXPECT_EQ(ssrc, callback.ssrc_); const uint32_t kTotalPacketSize = kPacketOverhead + sizeof(payload); // Bitrate measured over delta between last and first timestamp, plus one. - const uint32_t kExpectedWindowMs = kNumPackets * kPacketInterval + 1; + const uint32_t kExpectedWindowMs = (kNumPackets - 1) * kPacketInterval + 1; const uint32_t kExpectedBitsAccumulated = kTotalPacketSize * kNumPackets * 8; const uint32_t kExpectedRateBps = (kExpectedBitsAccumulated * 1000 + (kExpectedWindowMs / 2)) /