diff --git a/audio/BUILD.gn b/audio/BUILD.gn index ccbf9fd2e3..bbf6bdd0d9 100644 --- a/audio/BUILD.gn +++ b/audio/BUILD.gn @@ -95,6 +95,7 @@ rtc_library("audio") { "../rtc_base/experiments:field_trial_parser", "../rtc_base/synchronization:mutex", "../rtc_base/system:no_unique_address", + "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:to_queued_task", "../system_wrappers", "../system_wrappers:field_trial", diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc index f221b4e6b5..a92103104c 100644 --- a/audio/channel_receive.cc +++ b/audio/channel_receive.cc @@ -21,6 +21,7 @@ #include "api/frame_transformer_interface.h" #include "api/rtc_event_log/rtc_event_log.h" #include "api/sequence_checker.h" +#include "api/task_queue/task_queue_base.h" #include "audio/audio_level.h" #include "audio/channel_receive_frame_transformer_delegate.h" #include "audio/channel_send.h" @@ -47,6 +48,8 @@ #include "rtc_base/race_checker.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/metrics.h" @@ -177,23 +180,22 @@ class ChannelReceive : public ChannelReceiveInterface { private: void ReceivePacket(const uint8_t* packet, size_t packet_length, - const RTPHeader& header); + const RTPHeader& header) + RTC_RUN_ON(worker_thread_checker_); int ResendPackets(const uint16_t* sequence_numbers, int length); - void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms); + void UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) + RTC_RUN_ON(worker_thread_checker_); int GetRtpTimestampRateHz() const; int64_t GetRTT() const; void OnReceivedPayloadData(rtc::ArrayView payload, - const RTPHeader& rtpHeader); + const RTPHeader& rtpHeader) + RTC_RUN_ON(worker_thread_checker_); void InitFrameTransformerDelegate( - rtc::scoped_refptr frame_transformer); - - bool Playing() const { - MutexLock lock(&playing_lock_); - return playing_; - } + rtc::scoped_refptr frame_transformer) + RTC_RUN_ON(worker_thread_checker_); // Thread checkers document and lock usage of some methods to specific threads // we know about. The goal is to eventually split up voe::ChannelReceive into @@ -202,17 +204,18 @@ class ChannelReceive : public ChannelReceiveInterface { RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_; RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_; + TaskQueueBase* const worker_thread_; + ScopedTaskSafety worker_safety_; + // Methods accessed from audio and video threads are checked for sequential- // only access. We don't necessarily own and control these threads, so thread // checkers cannot be used. E.g. Chromium may transfer "ownership" from one // audio thread to another, but access is still sequential. rtc::RaceChecker audio_thread_race_checker_; - rtc::RaceChecker video_capture_thread_race_checker_; Mutex callback_mutex_; Mutex volume_settings_mutex_; - mutable Mutex playing_lock_; - bool playing_ RTC_GUARDED_BY(&playing_lock_) = false; + bool playing_ RTC_GUARDED_BY(worker_thread_checker_) = false; RtcEventLog* const event_log_; @@ -226,11 +229,10 @@ class ChannelReceive : public ChannelReceiveInterface { // Info for GetSyncInfo is updated on network or worker thread, and queried on // the worker thread. - mutable Mutex sync_info_lock_; absl::optional last_received_rtp_timestamp_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); absl::optional last_received_rtp_system_time_ms_ - RTC_GUARDED_BY(&sync_info_lock_); + RTC_GUARDED_BY(&worker_thread_checker_); // The AcmReceiver is thread safe, using its own lock. acm2::AcmReceiver acm_receiver_; @@ -243,15 +245,14 @@ class ChannelReceive : public ChannelReceiveInterface { // Timestamp of the audio pulled from NetEq. absl::optional jitter_buffer_playout_timestamp_; - mutable Mutex video_sync_lock_; - uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(video_sync_lock_); + uint32_t playout_timestamp_rtp_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_rtp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); - uint32_t playout_delay_ms_ RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); + uint32_t playout_delay_ms_ RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_ntp_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); absl::optional playout_timestamp_ntp_time_ms_ - RTC_GUARDED_BY(video_sync_lock_); + RTC_GUARDED_BY(worker_thread_checker_); mutable Mutex ts_stats_lock_; @@ -288,7 +289,7 @@ class ChannelReceive : public ChannelReceiveInterface { void ChannelReceive::OnReceivedPayloadData( rtc::ArrayView payload, const RTPHeader& rtpHeader) { - if (!Playing()) { + if (!playing_) { // Avoid inserting into NetEQ when we are not playing. Count the // packet as discarded. @@ -331,18 +332,21 @@ void ChannelReceive::InitFrameTransformerDelegate( rtc::scoped_refptr frame_transformer) { RTC_DCHECK(frame_transformer); RTC_DCHECK(!frame_transformer_delegate_); + RTC_DCHECK(worker_thread_); + RTC_DCHECK(worker_thread_->IsCurrent()); // Pass a callback to ChannelReceive::OnReceivedPayloadData, to be called by // the delegate to receive transformed audio. ChannelReceiveFrameTransformerDelegate::ReceiveFrameCallback receive_audio_callback = [this](rtc::ArrayView packet, const RTPHeader& header) { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); OnReceivedPayloadData(packet, header); }; frame_transformer_delegate_ = rtc::make_ref_counted( std::move(receive_audio_callback), std::move(frame_transformer), - rtc::Thread::Current()); + worker_thread_); frame_transformer_delegate_->Init(); } @@ -453,18 +457,19 @@ AudioMixer::Source::AudioFrameInfo ChannelReceive::GetAudioFrameWithInfo( } audio_frame->packet_infos_ = RtpPacketInfos(packet_infos); - { + RTC_DCHECK(worker_thread_); + worker_thread_->PostTask(ToQueuedTask(worker_safety_, [this]() { + RTC_DCHECK_RUN_ON(&worker_thread_checker_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.TargetJitterBufferDelayMs", acm_receiver_.TargetDelayMs()); const int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - MutexLock lock(&video_sync_lock_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDelayEstimateMs", jitter_buffer_delay + playout_delay_ms_); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverJitterBufferDelayMs", jitter_buffer_delay); RTC_HISTOGRAM_COUNTS_1000("WebRTC.Audio.ReceiverDeviceDelayMs", playout_delay_ms_); - } + })); return muted ? AudioMixer::Source::AudioFrameInfo::kMuted : AudioMixer::Source::AudioFrameInfo::kNormal; @@ -499,7 +504,8 @@ ChannelReceive::ChannelReceive( rtc::scoped_refptr frame_decryptor, const webrtc::CryptoOptions& crypto_options, rtc::scoped_refptr frame_transformer) - : event_log_(rtc_event_log), + : worker_thread_(TaskQueueBase::Current()), + event_log_(rtc_event_log), rtp_receive_statistics_(ReceiveStatistics::Create(clock)), remote_ssrc_(remote_ssrc), acm_receiver_(AcmConfig(neteq_factory, @@ -522,6 +528,7 @@ ChannelReceive::ChannelReceive( frame_decryptor_(frame_decryptor), crypto_options_(crypto_options), absolute_capture_time_interpolator_(clock) { + RTC_DCHECK(worker_thread_); RTC_DCHECK(module_process_thread_); RTC_DCHECK(audio_device_module); @@ -582,13 +589,11 @@ void ChannelReceive::SetSink(AudioSinkInterface* sink) { void ChannelReceive::StartPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = true; } void ChannelReceive::StopPlayout() { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&playing_lock_); playing_ = false; _outputAudioLevel.ResetLevelFullRange(); } @@ -616,11 +621,8 @@ void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) { // UpdatePlayoutTimestamp and int64_t now_ms = rtc::TimeMillis(); - { - MutexLock lock(&sync_info_lock_); - last_received_rtp_timestamp_ = packet.Timestamp(); - last_received_rtp_system_time_ms_ = now_ms; - } + last_received_rtp_timestamp_ = packet.Timestamp(); + last_received_rtp_system_time_ms_ = now_ms; // Store playout timestamp for the received RTP packet UpdatePlayoutTimestamp(false, now_ms); @@ -893,14 +895,8 @@ AudioDecodingCallStats ChannelReceive::GetDecodingCallStatistics() const { uint32_t ChannelReceive::GetDelayEstimate() const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - - uint32_t playout_delay; - { - MutexLock lock(&video_sync_lock_); - playout_delay = playout_delay_ms_; - } // Return the current jitter buffer delay + playout delay. - return acm_receiver_.FilteredCurrentDelayMs() + playout_delay; + return acm_receiver_.FilteredCurrentDelayMs() + playout_delay_ms_; } bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { @@ -922,21 +918,17 @@ bool ChannelReceive::SetMinimumPlayoutDelay(int delay_ms) { bool ChannelReceive::GetPlayoutRtpTimestamp(uint32_t* rtp_timestamp, int64_t* time_ms) const { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - { - MutexLock lock(&video_sync_lock_); - if (!playout_timestamp_rtp_time_ms_) - return false; - *rtp_timestamp = playout_timestamp_rtp_; - *time_ms = playout_timestamp_rtp_time_ms_.value(); - return true; - } + RTC_DCHECK_RUN_ON(&worker_thread_checker_); + if (!playout_timestamp_rtp_time_ms_) + return false; + *rtp_timestamp = playout_timestamp_rtp_; + *time_ms = playout_timestamp_rtp_time_ms_.value(); + return true; } void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, int64_t time_ms) { - RTC_DCHECK_RUNS_SERIALIZED(&video_capture_thread_race_checker_); - MutexLock lock(&video_sync_lock_); + RTC_DCHECK_RUN_ON(&worker_thread_checker_); playout_timestamp_ntp_ = ntp_timestamp_ms; playout_timestamp_ntp_time_ms_ = time_ms; } @@ -944,7 +936,6 @@ void ChannelReceive::SetEstimatedPlayoutNtpTimestampMs(int64_t ntp_timestamp_ms, absl::optional ChannelReceive::GetCurrentEstimatedPlayoutNtpTimestampMs(int64_t now_ms) const { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - MutexLock lock(&video_sync_lock_); if (!playout_timestamp_ntp_ || !playout_timestamp_ntp_time_ms_) return absl::nullopt; @@ -974,24 +965,19 @@ absl::optional ChannelReceive::GetSyncInfo() const { return absl::nullopt; } - { - MutexLock lock(&sync_info_lock_); - if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { - return absl::nullopt; - } - info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; - info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; + if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) { + return absl::nullopt; } + info.latest_received_capture_timestamp = *last_received_rtp_timestamp_; + info.latest_receive_time_ms = *last_received_rtp_system_time_ms_; int jitter_buffer_delay = acm_receiver_.FilteredCurrentDelayMs(); - { - MutexLock lock(&video_sync_lock_); - info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; - } + info.current_delay_ms = jitter_buffer_delay + playout_delay_ms_; return info; } +// RTC_RUN_ON(worker_thread_checker_) void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // TODO(bugs.webrtc.org/11993): Expect to be called exclusively on the // network thread. Once that's done, we won't need video_sync_lock_. @@ -1018,14 +1004,11 @@ void ChannelReceive::UpdatePlayoutTimestamp(bool rtcp, int64_t now_ms) { // Remove the playout delay. playout_timestamp -= (delay_ms * (GetRtpTimestampRateHz() / 1000)); - { - MutexLock lock(&video_sync_lock_); - if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { - playout_timestamp_rtp_ = playout_timestamp; - playout_timestamp_rtp_time_ms_ = now_ms; - } - playout_delay_ms_ = delay_ms; + if (!rtcp && playout_timestamp != playout_timestamp_rtp_) { + playout_timestamp_rtp_ = playout_timestamp; + playout_timestamp_rtp_time_ms_ = now_ms; } + playout_delay_ms_ = delay_ms; } int ChannelReceive::GetRtpTimestampRateHz() const { diff --git a/audio/channel_receive_frame_transformer_delegate.cc b/audio/channel_receive_frame_transformer_delegate.cc index 261afbb100..7e617df780 100644 --- a/audio/channel_receive_frame_transformer_delegate.cc +++ b/audio/channel_receive_frame_transformer_delegate.cc @@ -47,7 +47,7 @@ class TransformableAudioFrame : public TransformableAudioFrameInterface { ChannelReceiveFrameTransformerDelegate::ChannelReceiveFrameTransformerDelegate( ReceiveFrameCallback receive_frame_callback, rtc::scoped_refptr frame_transformer, - rtc::Thread* channel_receive_thread) + TaskQueueBase* channel_receive_thread) : receive_frame_callback_(receive_frame_callback), frame_transformer_(std::move(frame_transformer)), channel_receive_thread_(channel_receive_thread) {} diff --git a/audio/channel_receive_frame_transformer_delegate.h b/audio/channel_receive_frame_transformer_delegate.h index 0af748e37f..f59834d24e 100644 --- a/audio/channel_receive_frame_transformer_delegate.h +++ b/audio/channel_receive_frame_transformer_delegate.h @@ -32,7 +32,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback { ChannelReceiveFrameTransformerDelegate( ReceiveFrameCallback receive_frame_callback, rtc::scoped_refptr frame_transformer, - rtc::Thread* channel_receive_thread); + TaskQueueBase* channel_receive_thread); // Registers |this| as callback for |frame_transformer_|, to get the // transformed frames. @@ -67,7 +67,7 @@ class ChannelReceiveFrameTransformerDelegate : public TransformedFrameCallback { RTC_GUARDED_BY(sequence_checker_); rtc::scoped_refptr frame_transformer_ RTC_GUARDED_BY(sequence_checker_); - rtc::Thread* channel_receive_thread_; + TaskQueueBase* const channel_receive_thread_; }; } // namespace webrtc