From 010c189f767ef8786c683ad9a5bcd91b7c553726 Mon Sep 17 00:00:00 2001 From: Jakob Ivarsson Date: Wed, 4 Sep 2024 11:24:16 +0000 Subject: [PATCH] Move thread handling from source tracker. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes it simpler to use in more contexts. Bug: b/364184684 Change-Id: I1b08ebd24e51ba1b3f85261eed503a78cd006fd8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/361480 Reviewed-by: Danil Chapovalov Commit-Queue: Jakob Ivarsson‎ Reviewed-by: Åsa Persson Cr-Commit-Position: refs/heads/main@{#42956} --- audio/audio_receive_stream.cc | 16 +------ audio/audio_receive_stream.h | 2 - audio/audio_receive_stream_unittest.cc | 1 - audio/channel_receive.cc | 51 +++++++++++++---------- audio/channel_receive.h | 4 +- audio/mock_voe_channel_proxy.h | 2 +- modules/rtp_rtcp/source/source_tracker.cc | 36 ++++------------ modules/rtp_rtcp/source/source_tracker.h | 20 ++++----- video/video_receive_stream2.cc | 11 +++-- video/video_receive_stream2.h | 2 +- 10 files changed, 58 insertions(+), 87 deletions(-) diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc index 44c95f3c04..bf2da5dd18 100644 --- a/audio/audio_receive_stream.cc +++ b/audio/audio_receive_stream.cc @@ -102,7 +102,6 @@ AudioReceiveStreamImpl::AudioReceiveStreamImpl( std::unique_ptr channel_receive) : config_(config), audio_state_(audio_state), - source_tracker_(&env.clock()), channel_receive_(std::move(channel_receive)) { RTC_LOG(LS_INFO) << "AudioReceiveStreamImpl: " << config.rtp.remote_ssrc; RTC_DCHECK(config.decoder_factory); @@ -114,11 +113,6 @@ AudioReceiveStreamImpl::AudioReceiveStreamImpl( // Configure bandwidth estimation. channel_receive_->RegisterReceiverCongestionControlObjects(packet_router); - // When output is muted, ChannelReceive will directly notify the source - // tracker of "delivered" frames, so RtpReceiver information will continue to - // be updated. - channel_receive_->SetSourceTracker(&source_tracker_); - // Complete configuration. // TODO(solenberg): Config NACK history window (which is a packet count), // using the actual packet size for the configured codec. @@ -378,19 +372,13 @@ int AudioReceiveStreamImpl::GetBaseMinimumPlayoutDelayMs() const { std::vector AudioReceiveStreamImpl::GetSources() const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - return source_tracker_.GetSources(); + return channel_receive_->GetSources(); } AudioMixer::Source::AudioFrameInfo AudioReceiveStreamImpl::GetAudioFrameWithInfo(int sample_rate_hz, AudioFrame* audio_frame) { - AudioMixer::Source::AudioFrameInfo audio_frame_info = - channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame); - if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError && - !audio_frame->packet_infos_.empty()) { - source_tracker_.OnFrameDelivered(audio_frame->packet_infos_); - } - return audio_frame_info; + return channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame); } int AudioReceiveStreamImpl::Ssrc() const { diff --git a/audio/audio_receive_stream.h b/audio/audio_receive_stream.h index 4eac8a7180..cb1450be14 100644 --- a/audio/audio_receive_stream.h +++ b/audio/audio_receive_stream.h @@ -25,7 +25,6 @@ #include "audio/audio_state.h" #include "call/audio_receive_stream.h" #include "call/syncable.h" -#include "modules/rtp_rtcp/source/source_tracker.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -156,7 +155,6 @@ class AudioReceiveStreamImpl final : public webrtc::AudioReceiveStreamInterface, SequenceChecker::kDetached}; webrtc::AudioReceiveStreamInterface::Config config_; rtc::scoped_refptr audio_state_; - SourceTracker source_tracker_; const std::unique_ptr channel_receive_; AudioSendStream* associated_send_stream_ RTC_GUARDED_BY(packet_sequence_checker_) = nullptr; diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc index a263ff9683..0afda40733 100644 --- a/audio/audio_receive_stream_unittest.cc +++ b/audio/audio_receive_stream_unittest.cc @@ -132,7 +132,6 @@ struct ConfigHelper { .WillRepeatedly(Invoke([](const std::map& codecs) { EXPECT_THAT(codecs, ::testing::IsEmpty()); })); - EXPECT_CALL(*channel_receive_, SetSourceTracker(_)); EXPECT_CALL(*channel_receive_, GetLocalSsrc()) .WillRepeatedly(Return(kLocalSsrc)); diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index 7fcdf66cf8..86c8cfbae3 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -171,7 +171,7 @@ class ChannelReceive : public ChannelReceiveInterface, int PreferredSampleRate() const override; - void SetSourceTracker(SourceTracker* source_tracker) override; + std::vector GetSources() const override; // Associate to a send channel. // Used for obtaining RTT for a receive-only channel. @@ -240,7 +240,7 @@ class ChannelReceive : public ChannelReceiveInterface, std::unique_ptr rtp_receive_statistics_; std::unique_ptr rtp_rtcp_; const uint32_t remote_ssrc_; - SourceTracker* source_tracker_ = nullptr; + SourceTracker source_tracker_ RTC_GUARDED_BY(&worker_thread_checker_); // Info for GetSyncInfo is updated on network or worker thread, and queried on // the worker thread. @@ -325,20 +325,18 @@ void ChannelReceive::OnReceivedPayloadData( // Avoid inserting into NetEQ when we are not playing. Count the // packet as discarded. - // If we have a source_tracker_, tell it that the frame has been - // "delivered". Normally, this happens in AudioReceiveStreamInterface when - // audio frames are pulled out, but when playout is muted, nothing is - // pulling frames. The downside of this approach is that frames delivered - // this way won't be delayed for playout, and therefore will be - // unsynchronized with (a) audio delay when playing and (b) any audio/video - // synchronization. But the alternative is that muting playout also stops - // the SourceTracker from updating RtpSource information. - if (source_tracker_) { - RtpPacketInfos::vector_type packet_vector = { - RtpPacketInfo(rtpHeader, receive_time)}; - source_tracker_->OnFrameDelivered(RtpPacketInfos(packet_vector)); - } - + // Tell source_tracker_ that the frame has been "delivered". Normally, this + // happens in AudioReceiveStreamInterface when audio frames are pulled out, + // but when playout is muted, nothing is pulling frames. The downside of + // this approach is that frames delivered this way won't be delayed for + // playout, and therefore will be unsynchronized with (a) audio delay when + // playing and (b) any audio/video synchronization. But the alternative is + // that muting playout also stops the SourceTracker from updating RtpSource + // information. + RtpPacketInfos::vector_type packet_vector = { + RtpPacketInfo(rtpHeader, receive_time)}; + source_tracker_.OnFrameDelivered(RtpPacketInfos(packet_vector), + env_.clock().CurrentTime()); return; } @@ -482,7 +480,16 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo( } packet_infos.push_back(std::move(new_packet_info)); } - audio_frame->packet_infos_ = RtpPacketInfos(packet_infos); + audio_frame->packet_infos_ = RtpPacketInfos(std::move(packet_infos)); + if (!audio_frame->packet_infos_.empty()) { + RtpPacketInfos infos_copy = audio_frame->packet_infos_; + Timestamp delivery_time = env_.clock().CurrentTime(); + worker_thread_->PostTask( + SafeTask(worker_safety_.flag(), [this, infos_copy, delivery_time]() { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + source_tracker_.OnFrameDelivered(infos_copy, delivery_time); + })); + } ++audio_frame_interval_count_; if (audio_frame_interval_count_ >= kHistogramReportingInterval) { @@ -514,10 +521,6 @@ int ChannelReceive::PreferredSampleRate() const { acm_receiver_.last_output_sample_rate_hz()); } -void ChannelReceive::SetSourceTracker(SourceTracker* source_tracker) { - source_tracker_ = source_tracker; -} - ChannelReceive::ChannelReceive( const Environment& env, NetEqFactory* neteq_factory, @@ -538,6 +541,7 @@ ChannelReceive::ChannelReceive( worker_thread_(TaskQueueBase::Current()), rtp_receive_statistics_(ReceiveStatistics::Create(&env_.clock())), remote_ssrc_(remote_ssrc), + source_tracker_(&env_.clock()), acm_receiver_(env_, AcmConfig(neteq_factory, decoder_factory, @@ -1102,6 +1106,11 @@ int ChannelReceive::GetRtpTimestampRateHz() const { : acm_receiver_.last_output_sample_rate_hz(); } +std::vector ChannelReceive::GetSources() const { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + return source_tracker_.GetSources(); +} + } // namespace std::unique_ptr CreateChannelReceive( diff --git a/audio/channel_receive.h b/audio/channel_receive.h index 0a1157324d..4e2048daac 100644 --- a/audio/channel_receive.h +++ b/audio/channel_receive.h @@ -148,9 +148,7 @@ class ChannelReceiveInterface : public RtpPacketSinkInterface { virtual int PreferredSampleRate() const = 0; - // Sets the source tracker to notify about "delivered" packets when output is - // muted. - virtual void SetSourceTracker(SourceTracker* source_tracker) = 0; + virtual std::vector GetSources() const = 0; // Associate to a send channel. // Used for obtaining RTT for a receive-only channel. diff --git a/audio/mock_voe_channel_proxy.h b/audio/mock_voe_channel_proxy.h index 1a62f02937..fc1cecbdca 100644 --- a/audio/mock_voe_channel_proxy.h +++ b/audio/mock_voe_channel_proxy.h @@ -62,7 +62,7 @@ class MockChannelReceive : public voe::ChannelReceiveInterface { (int sample_rate_hz, AudioFrame*), (override)); MOCK_METHOD(int, PreferredSampleRate, (), (const, override)); - MOCK_METHOD(void, SetSourceTracker, (SourceTracker*), (override)); + MOCK_METHOD(std::vector, GetSources, (), (const, override)); MOCK_METHOD(void, SetAssociatedSendChannel, (const voe::ChannelSendInterface*), diff --git a/modules/rtp_rtcp/source/source_tracker.cc b/modules/rtp_rtcp/source/source_tracker.cc index cd881f1443..4d1f40641f 100644 --- a/modules/rtp_rtcp/source/source_tracker.cc +++ b/modules/rtp_rtcp/source/source_tracker.cc @@ -17,42 +17,26 @@ namespace webrtc { -SourceTracker::SourceTracker(Clock* clock) - : worker_thread_(TaskQueueBase::Current()), clock_(clock) { - RTC_DCHECK(worker_thread_); +SourceTracker::SourceTracker(Clock* clock) : clock_(clock) { RTC_DCHECK(clock_); } -void SourceTracker::OnFrameDelivered(RtpPacketInfos packet_infos) { +void SourceTracker::OnFrameDelivered(const RtpPacketInfos& packet_infos, + Timestamp delivery_time) { + TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered"); if (packet_infos.empty()) { return; } - - Timestamp now = clock_->CurrentTime(); - if (worker_thread_->IsCurrent()) { - RTC_DCHECK_RUN_ON(worker_thread_); - OnFrameDeliveredInternal(now, packet_infos); - } else { - worker_thread_->PostTask( - SafeTask(worker_safety_.flag(), - [this, packet_infos = std::move(packet_infos), now]() { - RTC_DCHECK_RUN_ON(worker_thread_); - OnFrameDeliveredInternal(now, packet_infos); - })); + if (delivery_time.IsInfinite()) { + delivery_time = clock_->CurrentTime(); } -} - -void SourceTracker::OnFrameDeliveredInternal( - Timestamp now, - const RtpPacketInfos& packet_infos) { - TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered"); for (const RtpPacketInfo& packet_info : packet_infos) { for (uint32_t csrc : packet_info.csrcs()) { SourceKey key(RtpSourceType::CSRC, csrc); SourceEntry& entry = UpdateEntry(key); - entry.timestamp = now; + entry.timestamp = delivery_time; entry.audio_level = packet_info.audio_level(); entry.absolute_capture_time = packet_info.absolute_capture_time(); entry.local_capture_clock_offset = @@ -63,19 +47,17 @@ void SourceTracker::OnFrameDeliveredInternal( SourceKey key(RtpSourceType::SSRC, packet_info.ssrc()); SourceEntry& entry = UpdateEntry(key); - entry.timestamp = now; + entry.timestamp = delivery_time; entry.audio_level = packet_info.audio_level(); entry.absolute_capture_time = packet_info.absolute_capture_time(); entry.local_capture_clock_offset = packet_info.local_capture_clock_offset(); entry.rtp_timestamp = packet_info.rtp_timestamp(); } - PruneEntries(now); + PruneEntries(delivery_time); } std::vector SourceTracker::GetSources() const { - RTC_DCHECK_RUN_ON(worker_thread_); - PruneEntries(clock_->CurrentTime()); std::vector sources; diff --git a/modules/rtp_rtcp/source/source_tracker.h b/modules/rtp_rtcp/source/source_tracker.h index 9d39599fa6..3be339f603 100644 --- a/modules/rtp_rtcp/source/source_tracker.h +++ b/modules/rtp_rtcp/source/source_tracker.h @@ -19,8 +19,6 @@ #include #include "api/rtp_packet_infos.h" -#include "api/task_queue/pending_task_safety_flag.h" -#include "api/task_queue/task_queue_base.h" #include "api/transport/rtp/rtp_source.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" @@ -34,6 +32,7 @@ namespace webrtc { // - https://w3c.github.io/webrtc-pc/#dom-rtcrtpcontributingsource // - https://w3c.github.io/webrtc-pc/#dom-rtcrtpsynchronizationsource // +// This class is thread unsafe. class SourceTracker { public: // Amount of time before the entry associated with an update is removed. See: @@ -49,7 +48,8 @@ class SourceTracker { // Updates the source entries when a frame is delivered to the // RTCRtpReceiver's MediaStreamTrack. - void OnFrameDelivered(RtpPacketInfos packet_infos); + void OnFrameDelivered(const RtpPacketInfos& packet_infos, + Timestamp delivery_time = Timestamp::MinusInfinity()); // Returns an `RtpSource` for each unique SSRC and CSRC identifier updated in // the last `kTimeoutMs` milliseconds. Entries appear in reverse chronological @@ -116,27 +116,21 @@ class SourceTracker { SourceKeyHasher, SourceKeyComparator>; - void OnFrameDeliveredInternal(Timestamp now, - const RtpPacketInfos& packet_infos) - RTC_RUN_ON(worker_thread_); - // Updates an entry by creating it (if it didn't previously exist) and moving // it to the front of the list. Returns a reference to the entry. - SourceEntry& UpdateEntry(const SourceKey& key) RTC_RUN_ON(worker_thread_); + SourceEntry& UpdateEntry(const SourceKey& key); // Removes entries that have timed out. Marked as "const" so that we can do // pruning in getters. - void PruneEntries(Timestamp now) const RTC_RUN_ON(worker_thread_); + void PruneEntries(Timestamp now) const; - TaskQueueBase* const worker_thread_; Clock* const clock_; // Entries are stored in reverse chronological order (i.e. with the most // recently updated entries appearing first). Mutability is needed for timeout // pruning in const functions. - mutable SourceList list_ RTC_GUARDED_BY(worker_thread_); - mutable SourceMap map_ RTC_GUARDED_BY(worker_thread_); - ScopedTaskSafety worker_safety_; + mutable SourceList list_; + mutable SourceMap map_; }; } // namespace webrtc diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index c6a20ef25d..4372fdabdd 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -625,11 +625,11 @@ int VideoReceiveStream2::GetBaseMinimumPlayoutDelayMs() const { } void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { - source_tracker_.OnFrameDelivered(video_frame.packet_infos()); config_.renderer->OnFrame(video_frame); - // TODO(bugs.webrtc.org/10739): we should set local capture clock offset for - // `video_frame.packet_infos`. But VideoFrame is const qualified here. + // TODO: bugs.webrtc.org/42220804 - we should set local capture clock offset + // for `packet_infos`. + RtpPacketInfos packet_infos = video_frame.packet_infos(); // For frame delay metrics, calculated in `OnRenderedFrame`, to better reflect // user experience measurements must be done as close as possible to frame @@ -640,7 +640,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { // rendered" callback from the renderer. VideoFrameMetaData frame_meta(video_frame, env_.clock().CurrentTime()); call_->worker_thread()->PostTask( - SafeTask(task_safety_.flag(), [frame_meta, this]() { + SafeTask(task_safety_.flag(), [frame_meta, packet_infos, this]() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); int64_t video_playout_ntp_ms; int64_t sync_offset_ms; @@ -652,6 +652,8 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { estimated_freq_khz); } stats_proxy_.OnRenderedFrame(frame_meta); + source_tracker_.OnFrameDelivered(packet_infos, + frame_meta.decode_timestamp); })); webrtc::MutexLock lock(&pending_resolution_mutex_); @@ -1047,6 +1049,7 @@ void VideoReceiveStream2::UpdatePlayoutDelays() const { } std::vector VideoReceiveStream2::GetSources() const { + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); return source_tracker_.GetSources(); } diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index f161f63e1b..2c7eb4c536 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -264,7 +264,7 @@ class VideoReceiveStream2 bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false; bool decoder_stopped_ RTC_GUARDED_BY(decode_sequence_checker_) = true; - SourceTracker source_tracker_; + SourceTracker source_tracker_ RTC_GUARDED_BY(worker_sequence_checker_); ReceiveStatisticsProxy stats_proxy_; // Shared by media and rtx stream receivers, since the latter has no RtpRtcp // module of its own.