diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc index 68819a7f18..6d16a48bbd 100644 --- a/audio/audio_send_stream.cc +++ b/audio/audio_send_stream.cc @@ -85,6 +85,7 @@ AudioSendStream::AudioSendStream( Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, @@ -94,13 +95,14 @@ AudioSendStream::AudioSendStream( : AudioSendStream(clock, config, audio_state, + task_queue_factory, rtp_transport, bitrate_allocator, event_log, rtcp_rtt_stats, suspended_rtp_state, voe::CreateChannelSend(clock, - rtp_transport->GetWorkerQueue(), + task_queue_factory, module_process_thread, config.media_transport, /*overhead_observer=*/this, @@ -116,6 +118,7 @@ AudioSendStream::AudioSendStream( Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, RtcEventLog* event_log, diff --git a/audio/audio_send_stream.h b/audio/audio_send_stream.h index 0d35562d90..8bee3524d5 100644 --- a/audio/audio_send_stream.h +++ b/audio/audio_send_stream.h @@ -23,6 +23,7 @@ #include "rtc_base/constructor_magic.h" #include "rtc_base/experiments/audio_allocation_settings.h" #include "rtc_base/race_checker.h" +#include "rtc_base/task_queue.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -42,6 +43,7 @@ class AudioSendStream final : public webrtc::AudioSendStream, AudioSendStream(Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, @@ -52,6 +54,7 @@ class AudioSendStream final : public webrtc::AudioSendStream, AudioSendStream(Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, RtcEventLog* event_log, diff --git a/audio/audio_send_stream_unittest.cc b/audio/audio_send_stream_unittest.cc index 80d5ad4d2e..1d946c5ed8 100644 --- a/audio/audio_send_stream_unittest.cc +++ b/audio/audio_send_stream_unittest.cc @@ -167,8 +167,8 @@ struct ConfigHelper { return std::unique_ptr( new internal::AudioSendStream( Clock::GetRealTimeClock(), stream_config_, audio_state_, - &rtp_transport_, &bitrate_allocator_, &event_log_, &rtcp_rtt_stats_, - absl::nullopt, + &GlobalTaskQueueFactory(), &rtp_transport_, &bitrate_allocator_, + &event_log_, &rtcp_rtt_stats_, absl::nullopt, std::unique_ptr(channel_send_))); } diff --git a/audio/channel_send.cc b/audio/channel_send.cc index 19edf3c088..9f95ebb90b 100644 --- a/audio/channel_send.cc +++ b/audio/channel_send.cc @@ -31,7 +31,6 @@ #include "modules/pacing/packet_router.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/format_macros.h" #include "rtc_base/location.h" @@ -88,7 +87,7 @@ class ChannelSend friend class VoERtcpObserver; ChannelSend(Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -181,8 +180,6 @@ class ChannelSend rtc::scoped_refptr frame_encryptor) override; private: - class ProcessAndEncodeAudioTask; - // From AudioPacketizationCallback in the ACM int32_t SendData(AudioFrameType frameType, uint8_t payloadType, @@ -200,20 +197,23 @@ class ChannelSend uint8_t payloadType, uint32_t timeStamp, rtc::ArrayView payload, - const RTPFragmentationHeader* fragmentation); + const RTPFragmentationHeader* fragmentation) + RTC_RUN_ON(encoder_queue_); int32_t SendMediaTransportAudio(AudioFrameType frameType, uint8_t payloadType, uint32_t timeStamp, rtc::ArrayView payload, - const RTPFragmentationHeader* fragmentation); + const RTPFragmentationHeader* fragmentation) + RTC_RUN_ON(encoder_queue_); // Return media transport or nullptr if using RTP. MediaTransportInterface* media_transport() { return media_transport_; } // Called on the encoder task queue when a new input audio frame is ready // for encoding. - void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input); + void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) + RTC_RUN_ON(encoder_queue_); void OnReceivedRtt(int64_t rtt_ms); @@ -267,9 +267,7 @@ class ChannelSend const bool use_twcc_plr_for_ana_; - rtc::CriticalSection encoder_queue_lock_; - bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_lock_) = false; - rtc::TaskQueue* const encoder_queue_ = nullptr; + bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_) = false; MediaTransportInterface* const media_transport_; int media_transport_sequence_number_ RTC_GUARDED_BY(encoder_queue_) = 0; @@ -286,12 +284,17 @@ class ChannelSend RTC_GUARDED_BY(&media_transport_lock_); // E2EE Audio Frame Encryption - rtc::scoped_refptr frame_encryptor_; + rtc::scoped_refptr frame_encryptor_ + RTC_GUARDED_BY(encoder_queue_); // E2EE Frame Encryption Options const webrtc::CryptoOptions crypto_options_; rtc::CriticalSection bitrate_crit_section_; int configured_bitrate_bps_ RTC_GUARDED_BY(bitrate_crit_section_) = 0; + + // Defined last to ensure that there are no running tasks when the other + // members are destroyed. + rtc::TaskQueue encoder_queue_; }; const int kTelephoneEventAttenuationdB = 10; @@ -473,32 +476,13 @@ class VoERtcpObserver : public RtcpBandwidthObserver { RtcpBandwidthObserver* bandwidth_observer_ RTC_GUARDED_BY(crit_); }; -class ChannelSend::ProcessAndEncodeAudioTask : public rtc::QueuedTask { - public: - ProcessAndEncodeAudioTask(std::unique_ptr audio_frame, - ChannelSend* channel) - : audio_frame_(std::move(audio_frame)), channel_(channel) { - RTC_DCHECK(channel_); - } - - private: - bool Run() override { - RTC_DCHECK_RUN_ON(channel_->encoder_queue_); - channel_->ProcessAndEncodeAudioOnTaskQueue(audio_frame_.get()); - return true; - } - - std::unique_ptr audio_frame_; - ChannelSend* const channel_; -}; - int32_t ChannelSend::SendData(AudioFrameType frameType, uint8_t payloadType, uint32_t timeStamp, const uint8_t* payloadData, size_t payloadSize, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); + RTC_DCHECK_RUN_ON(&encoder_queue_); rtc::ArrayView payload(payloadData, payloadSize); if (media_transport() != nullptr) { @@ -521,7 +505,6 @@ int32_t ChannelSend::SendRtpAudio(AudioFrameType frameType, uint32_t timeStamp, rtc::ArrayView payload, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); if (_includeAudioLevelIndication) { // Store current audio level in the RTP sender. // The level will be used in combination with voice-activity state @@ -594,7 +577,6 @@ int32_t ChannelSend::SendMediaTransportAudio( uint32_t timeStamp, rtc::ArrayView payload, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); // TODO(nisse): Use null _transportPtr for MediaTransport. // RTC_DCHECK(_transportPtr == nullptr); uint64_t channel_id; @@ -645,7 +627,7 @@ int32_t ChannelSend::SendMediaTransportAudio( } ChannelSend::ChannelSend(Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -671,12 +653,13 @@ ChannelSend::ChannelSend(Clock* clock, new RateLimiter(clock, kMaxRetransmissionWindowMs)), use_twcc_plr_for_ana_( webrtc::field_trial::FindFullName("UseTwccPlrForAna") == "Enabled"), - encoder_queue_(encoder_queue), media_transport_(media_transport), frame_encryptor_(frame_encryptor), - crypto_options_(crypto_options) { + crypto_options_(crypto_options), + encoder_queue_(task_queue_factory->CreateTaskQueue( + "AudioEncoder", + TaskQueueFactory::Priority::NORMAL)) { RTC_DCHECK(module_process_thread); - RTC_DCHECK(encoder_queue); module_process_thread_checker_.DetachFromThread(); audio_coding_.reset(AudioCodingModule::Create(AudioCodingModule::Config())); @@ -763,11 +746,11 @@ void ChannelSend::StartSend() { _rtpRtcpModule->SetSendingMediaStatus(true); int ret = _rtpRtcpModule->SetSendingStatus(true); RTC_DCHECK_EQ(0, ret); - { - // It is now OK to start posting tasks to the encoder task queue. - rtc::CritScope cs(&encoder_queue_lock_); + // It is now OK to start processing on the encoder task queue. + encoder_queue_.PostTask([this] { + RTC_DCHECK_RUN_ON(&encoder_queue_); encoder_queue_is_active_ = true; - } + }); } void ChannelSend::StopSend() { @@ -777,22 +760,12 @@ void ChannelSend::StopSend() { } sending_ = false; - // Post a task to the encoder thread which sets an event when the task is - // executed. We know that no more encoding tasks will be added to the task - // queue for this channel since sending is now deactivated. It means that, - // if we wait for the event to bet set, we know that no more pending tasks - // exists and it is therfore guaranteed that the task queue will never try - // to acccess and invalid channel object. - RTC_DCHECK(encoder_queue_); - rtc::Event flush; - { - // Clear |encoder_queue_is_active_| under lock to prevent any other tasks - // than this final "flush task" to be posted on the queue. - rtc::CritScope cs(&encoder_queue_lock_); + encoder_queue_.PostTask([this, &flush]() { + RTC_DCHECK_RUN_ON(&encoder_queue_); encoder_queue_is_active_ = false; - encoder_queue_->PostTask([&flush]() { flush.Set(); }); - } + flush.Set(); + }); flush.Wait(rtc::Event::kForever); // Reset sending SSRC and sequence number and triggers direct transmission @@ -1115,20 +1088,24 @@ CallSendStatistics ChannelSend::GetRTCPStatistics() const { void ChannelSend::ProcessAndEncodeAudio( std::unique_ptr audio_frame) { RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_); - // Avoid posting any new tasks if sending was already stopped in StopSend(). - rtc::CritScope cs(&encoder_queue_lock_); - if (!encoder_queue_is_active_) { - return; - } + struct ProcessAndEncodeAudio { + void operator()() { + RTC_DCHECK_RUN_ON(&channel->encoder_queue_); + if (!channel->encoder_queue_is_active_) { + return; + } + channel->ProcessAndEncodeAudioOnTaskQueue(audio_frame.get()); + } + std::unique_ptr audio_frame; + ChannelSend* const channel; + }; // Profile time between when the audio frame is added to the task queue and // when the task is actually executed. audio_frame->UpdateProfileTimeStamp(); - encoder_queue_->PostTask(std::unique_ptr( - new ProcessAndEncodeAudioTask(std::move(audio_frame), this))); + encoder_queue_.PostTask(ProcessAndEncodeAudio{std::move(audio_frame), this}); } void ChannelSend::ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) { - RTC_DCHECK_RUN_ON(encoder_queue_); RTC_DCHECK_GT(audio_input->samples_per_channel_, 0); RTC_DCHECK_LE(audio_input->num_channels_, 2); @@ -1233,14 +1210,10 @@ int64_t ChannelSend::GetRTT() const { void ChannelSend::SetFrameEncryptor( rtc::scoped_refptr frame_encryptor) { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - rtc::CritScope cs(&encoder_queue_lock_); - if (encoder_queue_is_active_) { - encoder_queue_->PostTask([this, frame_encryptor]() mutable { - this->frame_encryptor_ = std::move(frame_encryptor); - }); - } else { + encoder_queue_.PostTask([this, frame_encryptor]() mutable { + RTC_DCHECK_RUN_ON(&encoder_queue_); frame_encryptor_ = std::move(frame_encryptor); - } + }); } // TODO(sukhanov): Consider moving TargetTransferRate observer to @@ -1261,7 +1234,7 @@ void ChannelSend::OnReceivedRtt(int64_t rtt_ms) { std::unique_ptr CreateChannelSend( Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -1273,7 +1246,7 @@ std::unique_ptr CreateChannelSend( bool extmap_allow_mixed, int rtcp_report_interval_ms) { return absl::make_unique( - clock, encoder_queue, module_process_thread, media_transport, + clock, task_queue_factory, module_process_thread, media_transport, overhead_observer, rtp_transport, rtcp_rtt_stats, rtc_event_log, frame_encryptor, crypto_options, extmap_allow_mixed, rtcp_report_interval_ms); diff --git a/audio/channel_send.h b/audio/channel_send.h index 0957035451..f6acd88f29 100644 --- a/audio/channel_send.h +++ b/audio/channel_send.h @@ -19,10 +19,10 @@ #include "api/audio_codecs/audio_encoder.h" #include "api/crypto/crypto_options.h" #include "api/media_transport_interface.h" +#include "api/task_queue/task_queue_factory.h" #include "modules/rtp_rtcp/include/rtp_rtcp.h" #include "modules/rtp_rtcp/source/rtp_sender_audio.h" #include "rtc_base/function_view.h" -#include "rtc_base/task_queue.h" namespace webrtc { @@ -119,7 +119,7 @@ class ChannelSendInterface { std::unique_ptr CreateChannelSend( Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, diff --git a/audio/test/media_transport_test.cc b/audio/test/media_transport_test.cc index 71a3fb8fc3..b9a084db14 100644 --- a/audio/test/media_transport_test.cc +++ b/audio/test/media_transport_test.cc @@ -131,8 +131,8 @@ TEST(AudioWithMediaTransport, DeliversAudio) { &GlobalTaskQueueFactory()); webrtc::internal::AudioSendStream send_stream( Clock::GetRealTimeClock(), send_config, audio_state, - send_process_thread.get(), &rtp_transport, &bitrate_allocator, - null_event_log.get(), + &GlobalTaskQueueFactory(), send_process_thread.get(), &rtp_transport, + &bitrate_allocator, null_event_log.get(), /*rtcp_rtt_stats=*/nullptr, absl::optional()); audio_device->Init(); // Starts thread. diff --git a/call/call.cc b/call/call.cc index 3000244807..a693f7f8c4 100644 --- a/call/call.cc +++ b/call/call.cc @@ -666,10 +666,11 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } } - AudioSendStream* send_stream = new AudioSendStream( - clock_, config, config_.audio_state, module_process_thread_.get(), - transport_send_ptr_, bitrate_allocator_.get(), event_log_, - call_stats_.get(), suspended_rtp_state); + AudioSendStream* send_stream = + new AudioSendStream(clock_, config, config_.audio_state, + task_queue_factory_, module_process_thread_.get(), + transport_send_ptr_, bitrate_allocator_.get(), + event_log_, call_stats_.get(), suspended_rtp_state); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==