From e2e046452aadb0d3ac64a74bd92ba8458f6bfbfd Mon Sep 17 00:00:00 2001 From: Tommi Date: Wed, 9 Jun 2021 16:11:11 +0200 Subject: [PATCH] Remove a couple of locks from ChannelReceive and add thread checks. * Removes playing_lock_, sync_info_lock_ and video_sync_lock_. * Also remove video_capture_thread_race_checker_ which was redundant. Only video_sync_lock_ was actually needed. The other two aren't needed anymore because of changes made to RtpStreamsSynchronizer class last year (see webrtc:11489). In the one case where we had a lock, we post a task to the thread where the state is maintained. This task is for capturing histograms which I'm not sure we should have been capturing on the audio thread anyway. Also making ChannelReceiveFrameTransformerDelegate compatible with more tests by using TaskQueueBase instead of rtc::Thread. A number of tests that instantiate ChannelReceive (and thereby CRFTD) set the worker thread as a TQ and not actually an rtc::Thread instance. In those cases CRFTD would previously have gotten a nullptr for the worker thread. Bug: webrtc:11993 Change-Id: I59f4b2afbfedb06f241d9a613f8538adc19cd6d8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221364 Commit-Queue: Tommi Reviewed-by: Henrik Lundin Reviewed-by: Markus Handell Cr-Commit-Position: refs/heads/master@{#34257} --- audio/BUILD.gn | 1 + audio/channel_receive.cc | 125 ++++++++---------- ...nnel_receive_frame_transformer_delegate.cc | 2 +- ...annel_receive_frame_transformer_delegate.h | 4 +- 4 files changed, 58 insertions(+), 74 deletions(-) diff --git a/audio/BUILD.gn b/audio/BUILD.gn index ccbf9fd2e3..bbf6bdd0d9 100644 --- a/audio/BUILD.gn +++ b/audio/BUILD.gn @@ -95,6 +95,7 @@ rtc_library("audio") { "../rtc_base/experiments:field_trial_parser", "../rtc_base/synchronization:mutex", "../rtc_base/system:no_unique_address", + "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:to_queued_task", "../system_wrappers", "../system_wrappers:field_trial", diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index f221b4e6b5..a92103104c 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -21,6 +21,7 @@ #include "api/frame_transformer_interface.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/sequence_checker.h" +#include "api/task_queue/task_queue_base.h" #include "audio/audio_level.h" #include "audio/channel_receive_frame_transformer_delegate.h" #include "audio/channel_send.h" @@ -47,6 +48,8 @@ #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/metrics.h" @@ -177,23 +180,22 @@ class ChannelReceive : public ChannelReceiveInterface { private: void ReceivePacket(const uint8_t* packet, size_t packet_length, - const RTPHeader& header); + const RTPHeader& header) + RTC_RUN_ON(worker_thread_checker_); int ResendPackets(const uint16_t* sequence_numbers, int length); - void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms); + void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) + RTC_RUN_ON(worker_thread_checker_); int GetRtpTimestampRateHz() const; int64_t GetRTT() const; void OnReceivedPayloadData(rtc::ArrayView payload, - const RTPHeader& rtpHeader); + const RTPHeader& rtpHeader) + RTC_RUN_ON(worker_thread_checker_); void InitFrameTransformerDelegate( - rtc::scoped_refptr frame_transformer); - - bool Playing() const { - MutexLock lock(&playing_lock_); - return playing_; - } + rtc::scoped_refptr frame_transformer) + RTC_RUN_ON(worker_thread_checker_); // Thread checkers document and lock usage of some methods to specific threads // we know about. The goal is to eventually split up voe::ChannelReceive into @@ -202,17 +204,18 @@ class ChannelReceive : public ChannelReceiveInterface { RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; + TaskQueueBase* const worker_thread_; + ScopedTaskSafety worker_safety_; + // Methods accessed from audio and video threads are checked for sequential- // only access. We don't necessarily own and control these threads, so thread // checkers cannot be used. E.g. Chromium may transfer "ownership" from one // audio thread to another, but access is still sequential. rtc::RaceChecker audio_thread_race_checker_; - rtc::RaceChecker video_capture_thread_race_checker_; Mutex callback_mutex_; Mutex volume_settings_mutex_; - mutable Mutex playing_lock_; - bool playing_ RTC_GUARDED_BY(&playing_lock_) = false; + bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false; RtcEventLog* const event_log_; @@ -226,11 +229,10 @@ class ChannelReceive : public ChannelReceiveInterface { // Info for GetSyncInfo is updated on network or worker thread, and queried on // the worker thread. - mutable Mutex sync_info_lock_; absl::optional last_received_rtp_timestamp_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); absl::optional last_received_rtp_system_time_ms_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); // The AcmReceiver is thread safe, using its own lock. acm2::AcmReceiver acm_receiver_; @@ -243,15 +245,14 @@ class ChannelReceive : public ChannelReceiveInterface { // Timestamp of the audio pulled from NetEq. absl::optional jitter_buffer_playout_timestamp_; - mutable Mutex video_sync_lock_; - uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(video_sync_lock_); + uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_rtp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); - uint32_t playout_delay_ms_ RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); + uint32_t playout_delay_ms_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_ntp_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_ntp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); mutable Mutex ts_stats_lock_; @@ -288,7 +289,7 @@ class ChannelReceive : public ChannelReceiveInterface { void ChannelReceive::OnReceivedPayloadData( rtc::ArrayView payload, const RTPHeader& rtpHeader) { - if (!Playing()) { + if (!playing_) { // Avoid inserting into NetEQ when we are not playing. Count the // packet as discarded. @@ -331,18 +332,21 @@ void ChannelReceive::InitFrameTransformerDelegate( rtc::scoped_refptr frame_transformer) { RTC_DCHECK(frame_transformer); RTC_DCHECK(!frame_transformer_delegate_); + RTC_DCHECK(worker_thread_); + RTC_DCHECK(worker_thread_->IsCurrent()); // Pass a callback to ChannelReceive::OnReceivedPayloadData, to be called by // the delegate to receive transformed audio. ChannelReceiveFrameTransformerDelegate::ReceiveFrameCallback receive_audio_callback = [this](rtc::ArrayView packet, const RTPHeader& header) { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); OnReceivedPayloadData(packet, header); }; frame_transformer_delegate_ = rtc::make_ref_counted( std::move(receive_audio_callback), std::move(frame_transformer), - rtc::Thread::Current()); + worker_thread_); frame_transformer_delegate_->Init(); } @@ -453,18 +457,19 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo( } audio_frame->packet_infos_ = RtpPacketInfos(packet_infos); - { + RTC_DCHECK(worker_thread_); + worker_thread_->PostTask(ToQueuedTask(worker_safety_, [this]() { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.TargetJitterBufferDelayMs", acm_receiver_.TargetDelayMs()); const int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - MutexLock lock(&video_sync_lock_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDelayEstimateMs", jitter_buffer_delay + playout_delay_ms_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverJitterBufferDelayMs", jitter_buffer_delay); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDeviceDelayMs", playout_delay_ms_); - } + })); return muted ? AudioMixer::Source::AudioFrameInfo::kMuted : AudioMixer::Source::AudioFrameInfo::kNormal; @@ -499,7 +504,8 @@ ChannelReceive::ChannelReceive( rtc::scoped_refptr frame_decryptor, const webrtc::CryptoOptions& crypto_options, rtc::scoped_refptr frame_transformer) - : event_log_(rtc_event_log), + : worker_thread_(TaskQueueBase::Current()), + event_log_(rtc_event_log), rtp_receive_statistics_(ReceiveStatistics::Create(clock)), remote_ssrc_(remote_ssrc), acm_receiver_(AcmConfig(neteq_factory, @@ -522,6 +528,7 @@ ChannelReceive::ChannelReceive( frame_decryptor_(frame_decryptor), crypto_options_(crypto_options), absolute_capture_time_interpolator_(clock) { + RTC_DCHECK(worker_thread_); RTC_DCHECK(module_process_thread_); RTC_DCHECK(audio_device_module); @@ -582,13 +589,11 @@ void ChannelReceive::SetSink(AudioSinkInterface* sink) { void ChannelReceive::StartPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = true; } void ChannelReceive::StopPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = false; _outputAudioLevel.ResetLevelFullRange(); } @@ -616,11 +621,8 @@ void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) { // UpdatePlayoutTimestamp and int64_t now_ms = rtc::TimeMillis(); - { - MutexLock lock(&sync_info_lock_); - last_received_rtp_timestamp_ = packet.Timestamp(); - last_received_rtp_system_time_ms_ = now_ms; - } + last_received_rtp_timestamp_ = packet.Timestamp(); + last_received_rtp_system_time_ms_ = now_ms; // Store playout timestamp for the received RTP packet UpdatePlayoutTimestamp(false, now_ms); @@ -893,14 +895,8 @@ AudioDecodingCallStats ChannelReceive::GetDecodingCallStatistics() const { uint32_t ChannelReceive::GetDelayEstimate() const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - - uint32_t playout_delay; - { - MutexLock lock(&video_sync_lock_); - playout_delay = playout_delay_ms_; - } // Return the current jitter buffer delay + playout delay. - return acm_receiver_.FilteredCurrentDelayMs() + playout_delay; + return acm_receiver_.FilteredCurrentDelayMs() + playout_delay_ms_; } bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { @@ -922,21 +918,17 @@ bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { bool ChannelReceive::GetPlayoutRtpTimestamp(uint32_t* rtp_timestamp, int64_t* time_ms) const { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - { - MutexLock lock(&video_sync_lock_); - if (!playout_timestamp_rtp_time_ms_) - return false; - *rtp_timestamp = playout_timestamp_rtp_; - *time_ms = playout_timestamp_rtp_time_ms_.value(); - return true; - } + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + if (!playout_timestamp_rtp_time_ms_) + return false; + *rtp_timestamp = playout_timestamp_rtp_; + *time_ms = playout_timestamp_rtp_time_ms_.value(); + return true; } void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, int64_t time_ms) { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - MutexLock lock(&video_sync_lock_); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); playout_timestamp_ntp_ = ntp_timestamp_ms; playout_timestamp_ntp_time_ms_ = time_ms; } @@ -944,7 +936,6 @@ void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, absl::optional ChannelReceive::GetCurrentEstimatedPlayoutNtpTimestampMs(int64_t now_ms) const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&video_sync_lock_); if (!playout_timestamp_ntp_ || !playout_timestamp_ntp_time_ms_) return absl::nullopt; @@ -974,24 +965,19 @@ absl::optional ChannelReceive::GetSyncInfo() const { return absl::nullopt; } - { - MutexLock lock(&sync_info_lock_); - if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { - return absl::nullopt; - } - info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; - info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; + if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { + return absl::nullopt; } + info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; + info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - { - MutexLock lock(&video_sync_lock_); - info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; - } + info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; return info; } +// RTC_RUN_ON(worker_thread_checker_) void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // TODO(bugs.webrtc.org/11993): Expect to be called exclusively on the // network thread. Once that's done, we won't need video_sync_lock_. @@ -1018,14 +1004,11 @@ void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // Remove the playout delay. playout_timestamp -= (delay_ms * (GetRtpTimestampRateHz() / 1000)); - { - MutexLock lock(&video_sync_lock_); - if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { - playout_timestamp_rtp_ = playout_timestamp; - playout_timestamp_rtp_time_ms_ = now_ms; - } - playout_delay_ms_ = delay_ms; + if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { + playout_timestamp_rtp_ = playout_timestamp; + playout_timestamp_rtp_time_ms_ = now_ms; } + playout_delay_ms_ = delay_ms; } int ChannelReceive::GetRtpTimestampRateHz() const { diff --git a/audio/channel_receive_frame_transformer_delegate.cc b/audio/channel_receive_frame_transformer_delegate.cc index 261afbb100..7e617df780 100644 --- a/audio/channel_receive_frame_transformer_delegate.cc +++ b/audio/channel_receive_frame_transformer_delegate.cc @@ -47,7 +47,7 @@ class TransformableAudioFrame : public TransformableAudioFrameInterface { ChannelReceiveFrameTransformerDelegate::ChannelReceiveFrameTransformerDelegate( ReceiveFrameCallback receive_frame_callback, rtc::scoped_refptr frame_transformer, - rtc::Thread* channel_receive_thread) + TaskQueueBase* channel_receive_thread) : receive_frame_callback_(receive_frame_callback), frame_transformer_(std::move(frame_transformer)), channel_receive_thread_(channel_receive_thread) {} diff --git a/audio/channel_receive_frame_transformer_delegate.h b/audio/channel_receive_frame_transformer_delegate.h index 0af748e37f..f59834d24e 100644 --- a/audio/channel_receive_frame_transformer_delegate.h +++ b/audio/channel_receive_frame_transformer_delegate.h @@ -32,7 +32,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback { ChannelReceiveFrameTransformerDelegate( ReceiveFrameCallback receive_frame_callback, rtc::scoped_refptr frame_transformer, - rtc::Thread* channel_receive_thread); + TaskQueueBase* channel_receive_thread); // Registers |this| as callback for |frame_transformer_|, to get the // transformed frames. @@ -67,7 +67,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback { RTC_GUARDED_BY(sequence_checker_); rtc::scoped_refptr frame_transformer_ RTC_GUARDED_BY(sequence_checker_); - rtc::Thread* channel_receive_thread_; + TaskQueueBase* const channel_receive_thread_; }; } // namespace webrtc