From 44dd9f29c791bc0bb089976ea5efa46e5425140c Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 8 Mar 2019 14:50:30 +0100 Subject: [PATCH] Adds ChannelSend specific encoder task queue. Before this change the encoder tasks runs on a shared worker queue. That makes the destruction require synchronization to avoid races. By keeping a separate encode queue to ChannelSend, we can safely destruct the object without worrying for left over tasks, as they will be stopped when the task queue is destroyed. For TaskQueue implementations using one thread per TaskQueue this will increase the thread count by the number of AudioSendStreams, which typically is just one. This is partly a reland of 9b9344742b186b14d87e827e71a1757f4c94b30e Original change's description: > Removes lock from ChannelSend. > > The lock isn't really needed as encoder_queue_is_active_ can be checked > on the task queue to provide synchronization. > > There is one behavioral change due to this: We will not cancel any currently > pending encoding tasks when we stop sending, they will be allowed to finish. > > Bug: webrtc:10365 > Change-Id: I2b4897dde8d49bc7ee5d2d69694616aee8aaea38 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125096 > Reviewed-by: Oskar Sundbom > Commit-Queue: Sebastian Jansson > Cr-Commit-Position: refs/heads/master@{#26963} Bug: webrtc:10365 Change-Id: Iafb84e25d90ec8639359be80fad65763d08e5719 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125740 Reviewed-by: Oskar Sundbom Commit-Queue: Sebastian Jansson Cr-Commit-Position: refs/heads/master@{#27038} --- audio/audio_send_stream.cc | 5 +- audio/audio_send_stream.h | 3 + audio/audio_send_stream_unittest.cc | 4 +- audio/channel_send.cc | 117 +++++++++++----------------- audio/channel_send.h | 4 +- audio/test/media_transport_test.cc | 4 +- call/call.cc | 9 ++- 7 files changed, 63 insertions(+), 83 deletions(-) diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc index 68819a7f18..6d16a48bbd 100644 --- a/audio/audio_send_stream.cc +++ b/audio/audio_send_stream.cc @@ -85,6 +85,7 @@ AudioSendStream::AudioSendStream( Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, @@ -94,13 +95,14 @@ AudioSendStream::AudioSendStream( : AudioSendStream(clock, config, audio_state, + task_queue_factory, rtp_transport, bitrate_allocator, event_log, rtcp_rtt_stats, suspended_rtp_state, voe::CreateChannelSend(clock, - rtp_transport->GetWorkerQueue(), + task_queue_factory, module_process_thread, config.media_transport, /*overhead_observer=*/this, @@ -116,6 +118,7 @@ AudioSendStream::AudioSendStream( Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, RtcEventLog* event_log, diff --git a/audio/audio_send_stream.h b/audio/audio_send_stream.h index 0d35562d90..8bee3524d5 100644 --- a/audio/audio_send_stream.h +++ b/audio/audio_send_stream.h @@ -23,6 +23,7 @@ #include "rtc_base/constructor_magic.h" #include "rtc_base/experiments/audio_allocation_settings.h" #include "rtc_base/race_checker.h" +#include "rtc_base/task_queue.h" #include "rtc_base/thread_checker.h" namespace webrtc { @@ -42,6 +43,7 @@ class AudioSendStream final : public webrtc::AudioSendStream, AudioSendStream(Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, @@ -52,6 +54,7 @@ class AudioSendStream final : public webrtc::AudioSendStream, AudioSendStream(Clock* clock, const webrtc::AudioSendStream::Config& config, const rtc::scoped_refptr& audio_state, + TaskQueueFactory* task_queue_factory, RtpTransportControllerSendInterface* rtp_transport, BitrateAllocatorInterface* bitrate_allocator, RtcEventLog* event_log, diff --git a/audio/audio_send_stream_unittest.cc b/audio/audio_send_stream_unittest.cc index 80d5ad4d2e..1d946c5ed8 100644 --- a/audio/audio_send_stream_unittest.cc +++ b/audio/audio_send_stream_unittest.cc @@ -167,8 +167,8 @@ struct ConfigHelper { return std::unique_ptr( new internal::AudioSendStream( Clock::GetRealTimeClock(), stream_config_, audio_state_, - &rtp_transport_, &bitrate_allocator_, &event_log_, &rtcp_rtt_stats_, - absl::nullopt, + &GlobalTaskQueueFactory(), &rtp_transport_, &bitrate_allocator_, + &event_log_, &rtcp_rtt_stats_, absl::nullopt, std::unique_ptr(channel_send_))); } diff --git a/audio/channel_send.cc b/audio/channel_send.cc index 19edf3c088..9f95ebb90b 100644 --- a/audio/channel_send.cc +++ b/audio/channel_send.cc @@ -31,7 +31,6 @@ #include "modules/pacing/packet_router.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/checks.h" -#include "rtc_base/critical_section.h" #include "rtc_base/event.h" #include "rtc_base/format_macros.h" #include "rtc_base/location.h" @@ -88,7 +87,7 @@ class ChannelSend friend class VoERtcpObserver; ChannelSend(Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -181,8 +180,6 @@ class ChannelSend rtc::scoped_refptr frame_encryptor) override; private: - class ProcessAndEncodeAudioTask; - // From AudioPacketizationCallback in the ACM int32_t SendData(AudioFrameType frameType, uint8_t payloadType, @@ -200,20 +197,23 @@ class ChannelSend uint8_t payloadType, uint32_t timeStamp, rtc::ArrayView payload, - const RTPFragmentationHeader* fragmentation); + const RTPFragmentationHeader* fragmentation) + RTC_RUN_ON(encoder_queue_); int32_t SendMediaTransportAudio(AudioFrameType frameType, uint8_t payloadType, uint32_t timeStamp, rtc::ArrayView payload, - const RTPFragmentationHeader* fragmentation); + const RTPFragmentationHeader* fragmentation) + RTC_RUN_ON(encoder_queue_); // Return media transport or nullptr if using RTP. MediaTransportInterface* media_transport() { return media_transport_; } // Called on the encoder task queue when a new input audio frame is ready // for encoding. - void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input); + void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) + RTC_RUN_ON(encoder_queue_); void OnReceivedRtt(int64_t rtt_ms); @@ -267,9 +267,7 @@ class ChannelSend const bool use_twcc_plr_for_ana_; - rtc::CriticalSection encoder_queue_lock_; - bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_lock_) = false; - rtc::TaskQueue* const encoder_queue_ = nullptr; + bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_) = false; MediaTransportInterface* const media_transport_; int media_transport_sequence_number_ RTC_GUARDED_BY(encoder_queue_) = 0; @@ -286,12 +284,17 @@ class ChannelSend RTC_GUARDED_BY(&media_transport_lock_); // E2EE Audio Frame Encryption - rtc::scoped_refptr frame_encryptor_; + rtc::scoped_refptr frame_encryptor_ + RTC_GUARDED_BY(encoder_queue_); // E2EE Frame Encryption Options const webrtc::CryptoOptions crypto_options_; rtc::CriticalSection bitrate_crit_section_; int configured_bitrate_bps_ RTC_GUARDED_BY(bitrate_crit_section_) = 0; + + // Defined last to ensure that there are no running tasks when the other + // members are destroyed. + rtc::TaskQueue encoder_queue_; }; const int kTelephoneEventAttenuationdB = 10; @@ -473,32 +476,13 @@ class VoERtcpObserver : public RtcpBandwidthObserver { RtcpBandwidthObserver* bandwidth_observer_ RTC_GUARDED_BY(crit_); }; -class ChannelSend::ProcessAndEncodeAudioTask : public rtc::QueuedTask { - public: - ProcessAndEncodeAudioTask(std::unique_ptr audio_frame, - ChannelSend* channel) - : audio_frame_(std::move(audio_frame)), channel_(channel) { - RTC_DCHECK(channel_); - } - - private: - bool Run() override { - RTC_DCHECK_RUN_ON(channel_->encoder_queue_); - channel_->ProcessAndEncodeAudioOnTaskQueue(audio_frame_.get()); - return true; - } - - std::unique_ptr audio_frame_; - ChannelSend* const channel_; -}; - int32_t ChannelSend::SendData(AudioFrameType frameType, uint8_t payloadType, uint32_t timeStamp, const uint8_t* payloadData, size_t payloadSize, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); + RTC_DCHECK_RUN_ON(&encoder_queue_); rtc::ArrayView payload(payloadData, payloadSize); if (media_transport() != nullptr) { @@ -521,7 +505,6 @@ int32_t ChannelSend::SendRtpAudio(AudioFrameType frameType, uint32_t timeStamp, rtc::ArrayView payload, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); if (_includeAudioLevelIndication) { // Store current audio level in the RTP sender. // The level will be used in combination with voice-activity state @@ -594,7 +577,6 @@ int32_t ChannelSend::SendMediaTransportAudio( uint32_t timeStamp, rtc::ArrayView payload, const RTPFragmentationHeader* fragmentation) { - RTC_DCHECK_RUN_ON(encoder_queue_); // TODO(nisse): Use null _transportPtr for MediaTransport. // RTC_DCHECK(_transportPtr == nullptr); uint64_t channel_id; @@ -645,7 +627,7 @@ int32_t ChannelSend::SendMediaTransportAudio( } ChannelSend::ChannelSend(Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -671,12 +653,13 @@ ChannelSend::ChannelSend(Clock* clock, new RateLimiter(clock, kMaxRetransmissionWindowMs)), use_twcc_plr_for_ana_( webrtc::field_trial::FindFullName("UseTwccPlrForAna") == "Enabled"), - encoder_queue_(encoder_queue), media_transport_(media_transport), frame_encryptor_(frame_encryptor), - crypto_options_(crypto_options) { + crypto_options_(crypto_options), + encoder_queue_(task_queue_factory->CreateTaskQueue( + "AudioEncoder", + TaskQueueFactory::Priority::NORMAL)) { RTC_DCHECK(module_process_thread); - RTC_DCHECK(encoder_queue); module_process_thread_checker_.DetachFromThread(); audio_coding_.reset(AudioCodingModule::Create(AudioCodingModule::Config())); @@ -763,11 +746,11 @@ void ChannelSend::StartSend() { _rtpRtcpModule->SetSendingMediaStatus(true); int ret = _rtpRtcpModule->SetSendingStatus(true); RTC_DCHECK_EQ(0, ret); - { - // It is now OK to start posting tasks to the encoder task queue. - rtc::CritScope cs(&encoder_queue_lock_); + // It is now OK to start processing on the encoder task queue. + encoder_queue_.PostTask([this] { + RTC_DCHECK_RUN_ON(&encoder_queue_); encoder_queue_is_active_ = true; - } + }); } void ChannelSend::StopSend() { @@ -777,22 +760,12 @@ void ChannelSend::StopSend() { } sending_ = false; - // Post a task to the encoder thread which sets an event when the task is - // executed. We know that no more encoding tasks will be added to the task - // queue for this channel since sending is now deactivated. It means that, - // if we wait for the event to bet set, we know that no more pending tasks - // exists and it is therfore guaranteed that the task queue will never try - // to acccess and invalid channel object. - RTC_DCHECK(encoder_queue_); - rtc::Event flush; - { - // Clear |encoder_queue_is_active_| under lock to prevent any other tasks - // than this final "flush task" to be posted on the queue. - rtc::CritScope cs(&encoder_queue_lock_); + encoder_queue_.PostTask([this, &flush]() { + RTC_DCHECK_RUN_ON(&encoder_queue_); encoder_queue_is_active_ = false; - encoder_queue_->PostTask([&flush]() { flush.Set(); }); - } + flush.Set(); + }); flush.Wait(rtc::Event::kForever); // Reset sending SSRC and sequence number and triggers direct transmission @@ -1115,20 +1088,24 @@ CallSendStatistics ChannelSend::GetRTCPStatistics() const { void ChannelSend::ProcessAndEncodeAudio( std::unique_ptr audio_frame) { RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_); - // Avoid posting any new tasks if sending was already stopped in StopSend(). - rtc::CritScope cs(&encoder_queue_lock_); - if (!encoder_queue_is_active_) { - return; - } + struct ProcessAndEncodeAudio { + void operator()() { + RTC_DCHECK_RUN_ON(&channel->encoder_queue_); + if (!channel->encoder_queue_is_active_) { + return; + } + channel->ProcessAndEncodeAudioOnTaskQueue(audio_frame.get()); + } + std::unique_ptr audio_frame; + ChannelSend* const channel; + }; // Profile time between when the audio frame is added to the task queue and // when the task is actually executed. audio_frame->UpdateProfileTimeStamp(); - encoder_queue_->PostTask(std::unique_ptr( - new ProcessAndEncodeAudioTask(std::move(audio_frame), this))); + encoder_queue_.PostTask(ProcessAndEncodeAudio{std::move(audio_frame), this}); } void ChannelSend::ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) { - RTC_DCHECK_RUN_ON(encoder_queue_); RTC_DCHECK_GT(audio_input->samples_per_channel_, 0); RTC_DCHECK_LE(audio_input->num_channels_, 2); @@ -1233,14 +1210,10 @@ int64_t ChannelSend::GetRTT() const { void ChannelSend::SetFrameEncryptor( rtc::scoped_refptr frame_encryptor) { RTC_DCHECK_RUN_ON(&worker_thread_checker_); - rtc::CritScope cs(&encoder_queue_lock_); - if (encoder_queue_is_active_) { - encoder_queue_->PostTask([this, frame_encryptor]() mutable { - this->frame_encryptor_ = std::move(frame_encryptor); - }); - } else { + encoder_queue_.PostTask([this, frame_encryptor]() mutable { + RTC_DCHECK_RUN_ON(&encoder_queue_); frame_encryptor_ = std::move(frame_encryptor); - } + }); } // TODO(sukhanov): Consider moving TargetTransferRate observer to @@ -1261,7 +1234,7 @@ void ChannelSend::OnReceivedRtt(int64_t rtt_ms) { std::unique_ptr CreateChannelSend( Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, @@ -1273,7 +1246,7 @@ std::unique_ptr CreateChannelSend( bool extmap_allow_mixed, int rtcp_report_interval_ms) { return absl::make_unique( - clock, encoder_queue, module_process_thread, media_transport, + clock, task_queue_factory, module_process_thread, media_transport, overhead_observer, rtp_transport, rtcp_rtt_stats, rtc_event_log, frame_encryptor, crypto_options, extmap_allow_mixed, rtcp_report_interval_ms); diff --git a/audio/channel_send.h b/audio/channel_send.h index 0957035451..f6acd88f29 100644 --- a/audio/channel_send.h +++ b/audio/channel_send.h @@ -19,10 +19,10 @@ #include "api/audio_codecs/audio_encoder.h" #include "api/crypto/crypto_options.h" #include "api/media_transport_interface.h" +#include "api/task_queue/task_queue_factory.h" #include "modules/rtp_rtcp/include/rtp_rtcp.h" #include "modules/rtp_rtcp/source/rtp_sender_audio.h" #include "rtc_base/function_view.h" -#include "rtc_base/task_queue.h" namespace webrtc { @@ -119,7 +119,7 @@ class ChannelSendInterface { std::unique_ptr CreateChannelSend( Clock* clock, - rtc::TaskQueue* encoder_queue, + TaskQueueFactory* task_queue_factory, ProcessThread* module_process_thread, MediaTransportInterface* media_transport, OverheadObserver* overhead_observer, diff --git a/audio/test/media_transport_test.cc b/audio/test/media_transport_test.cc index 71a3fb8fc3..b9a084db14 100644 --- a/audio/test/media_transport_test.cc +++ b/audio/test/media_transport_test.cc @@ -131,8 +131,8 @@ TEST(AudioWithMediaTransport, DeliversAudio) { &GlobalTaskQueueFactory()); webrtc::internal::AudioSendStream send_stream( Clock::GetRealTimeClock(), send_config, audio_state, - send_process_thread.get(), &rtp_transport, &bitrate_allocator, - null_event_log.get(), + &GlobalTaskQueueFactory(), send_process_thread.get(), &rtp_transport, + &bitrate_allocator, null_event_log.get(), /*rtcp_rtt_stats=*/nullptr, absl::optional()); audio_device->Init(); // Starts thread. diff --git a/call/call.cc b/call/call.cc index 3000244807..a693f7f8c4 100644 --- a/call/call.cc +++ b/call/call.cc @@ -666,10 +666,11 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } } - AudioSendStream* send_stream = new AudioSendStream( - clock_, config, config_.audio_state, module_process_thread_.get(), - transport_send_ptr_, bitrate_allocator_.get(), event_log_, - call_stats_.get(), suspended_rtp_state); + AudioSendStream* send_stream = + new AudioSendStream(clock_, config, config_.audio_state, + task_queue_factory_, module_process_thread_.get(), + transport_send_ptr_, bitrate_allocator_.get(), + event_log_, call_stats_.get(), suspended_rtp_state); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==