diff --git a/modules/audio_processing/aec3/BUILD.gn b/modules/audio_processing/aec3/BUILD.gn index 189bcfd712..b34ed226db 100644 --- a/modules/audio_processing/aec3/BUILD.gn +++ b/modules/audio_processing/aec3/BUILD.gn @@ -75,6 +75,7 @@ rtc_static_library("aec3") { "matched_filter_lag_aggregator.h", "matrix_buffer.cc", "matrix_buffer.h", + "message_queue.h", "moving_average.cc", "moving_average.h", "render_buffer.cc", diff --git a/modules/audio_processing/aec3/echo_canceller3.cc b/modules/audio_processing/aec3/echo_canceller3.cc index e3846058f2..fc802b1f96 100644 --- a/modules/audio_processing/aec3/echo_canceller3.cc +++ b/modules/audio_processing/aec3/echo_canceller3.cc @@ -276,13 +276,13 @@ const int kNumberOfHighPassBiQuads_16kHz = 1; class EchoCanceller3::RenderWriter { public: - RenderWriter(ApmDataDumper* data_dumper, - SwapQueue>, - Aec3RenderQueueItemVerifier>* render_transfer_queue, - std::unique_ptr render_highpass_filter, - int sample_rate_hz, - int frame_length, - int num_bands); + RenderWriter( + ApmDataDumper* data_dumper, + MessageQueue>>* render_transfer_queue, + std::unique_ptr render_highpass_filter, + int sample_rate_hz, + int frame_length, + int num_bands); ~RenderWriter(); void Insert(AudioBuffer* input); @@ -293,15 +293,13 @@ class EchoCanceller3::RenderWriter { const int num_bands_; std::unique_ptr render_highpass_filter_; std::vector> render_queue_input_frame_; - SwapQueue>, Aec3RenderQueueItemVerifier>* - render_transfer_queue_; + MessageQueue>>* render_transfer_queue_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RenderWriter); }; EchoCanceller3::RenderWriter::RenderWriter( ApmDataDumper* data_dumper, - SwapQueue>, Aec3RenderQueueItemVerifier>* - render_transfer_queue, + MessageQueue>>* render_transfer_queue, std::unique_ptr render_highpass_filter, int sample_rate_hz, int frame_length, @@ -370,12 +368,10 @@ EchoCanceller3::EchoCanceller3(const EchoCanceller3Config& config, output_framer_(num_bands_), capture_blocker_(num_bands_), render_blocker_(num_bands_), - render_transfer_queue_( - kRenderTransferQueueSizeFrames, - std::vector>( - num_bands_, - std::vector(frame_length_, 0.f)), - Aec3RenderQueueItemVerifier(num_bands_, frame_length_)), + render_transfer_queue_(kRenderTransferQueueSizeFrames, + std::vector>( + num_bands_, + std::vector(frame_length_, 0.f))), block_processor_(std::move(block_processor)), render_queue_output_frame_(num_bands_, std::vector(frame_length_, 0.f)), diff --git a/modules/audio_processing/aec3/echo_canceller3.h b/modules/audio_processing/aec3/echo_canceller3.h index c1298d207e..cb3b382628 100644 --- a/modules/audio_processing/aec3/echo_canceller3.h +++ b/modules/audio_processing/aec3/echo_canceller3.h @@ -24,12 +24,12 @@ #include "modules/audio_processing/aec3/block_processor.h" #include "modules/audio_processing/aec3/cascaded_biquad_filter.h" #include "modules/audio_processing/aec3/frame_blocker.h" +#include "modules/audio_processing/aec3/message_queue.h" #include "modules/audio_processing/audio_buffer.h" #include "modules/audio_processing/logging/apm_data_dumper.h" #include "rtc_base/checks.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/race_checker.h" -#include "rtc_base/swap_queue.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -107,7 +107,7 @@ class EchoCanceller3 : public EchoControl { private: class RenderWriter; - // Empties the render SwapQueue. + // Empties the render MessageQueue. void EmptyRenderQueue(); rtc::RaceChecker capture_race_checker_; @@ -127,8 +127,7 @@ class EchoCanceller3 : public EchoControl { BlockFramer output_framer_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker capture_blocker_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker render_blocker_ RTC_GUARDED_BY(capture_race_checker_); - SwapQueue>, Aec3RenderQueueItemVerifier> - render_transfer_queue_; + MessageQueue>> render_transfer_queue_; std::unique_ptr block_processor_ RTC_GUARDED_BY(capture_race_checker_); std::vector> render_queue_output_frame_ diff --git a/modules/audio_processing/aec3/echo_canceller3_unittest.cc b/modules/audio_processing/aec3/echo_canceller3_unittest.cc index 3f1e059a0c..ab2ae04809 100644 --- a/modules/audio_processing/aec3/echo_canceller3_unittest.cc +++ b/modules/audio_processing/aec3/echo_canceller3_unittest.cc @@ -461,7 +461,7 @@ class EchoCanceller3Tester { // This test verifies that the swapqueue is able to handle jitter in the // capture and render API calls. - void RunRenderSwapQueueVerificationTest() { + void RunRenderMessageQueueVerificationTest() { const EchoCanceller3Config config; EchoCanceller3 aec3( config, sample_rate_hz_, false, @@ -502,7 +502,7 @@ class EchoCanceller3Tester { // This test verifies that a buffer overrun in the render swapqueue is // properly reported. - void RunRenderPipelineSwapQueueOverrunReturnValueTest() { + void RunRenderPipelineMessageQueueOverrunReturnValueTest() { EchoCanceller3 aec3(EchoCanceller3Config(), sample_rate_hz_, false); constexpr size_t kRenderTransferQueueSize = 30; @@ -631,18 +631,18 @@ TEST(EchoCanceller3Buffering, RenderBitexactness) { } } -TEST(EchoCanceller3Buffering, RenderSwapQueue) { +TEST(EchoCanceller3Buffering, RenderMessageQueue) { for (auto rate : {8000, 16000}) { SCOPED_TRACE(ProduceDebugText(rate)); - EchoCanceller3Tester(rate).RunRenderSwapQueueVerificationTest(); + EchoCanceller3Tester(rate).RunRenderMessageQueueVerificationTest(); } } -TEST(EchoCanceller3Buffering, RenderSwapQueueOverrunReturnValue) { +TEST(EchoCanceller3Buffering, RenderMessageQueueOverrunReturnValue) { for (auto rate : {8000, 16000, 32000, 48000}) { SCOPED_TRACE(ProduceDebugText(rate)); EchoCanceller3Tester(rate) - .RunRenderPipelineSwapQueueOverrunReturnValueTest(); + .RunRenderPipelineMessageQueueOverrunReturnValueTest(); } } diff --git a/modules/audio_processing/aec3/message_queue.h b/modules/audio_processing/aec3/message_queue.h new file mode 100644 index 0000000000..ad07d52c81 --- /dev/null +++ b/modules/audio_processing/aec3/message_queue.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_ +#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_ + +#include +#include +#include + +#include "rtc_base/checks.h" +#include "rtc_base/thread_checker.h" + +namespace webrtc { + +// Fixed-size circular queue similar to SwapQueue, but lock-free and no +// QueueItemVerifierFunction. +// The queue is designed for single-producer-single-consumer (accessed by one +// producer thread, calling Insert(), and one consumer thread, calling Remove(). +template +class MessageQueue { + public: + explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) { + producer_thread_checker_.DetachFromThread(); + consumer_thread_checker_.DetachFromThread(); + } + MessageQueue(size_t size, const T& prototype) + : num_elements_(0), queue_(size, prototype) { + producer_thread_checker_.DetachFromThread(); + consumer_thread_checker_.DetachFromThread(); + } + ~MessageQueue() = default; + + // Inserts a T at the back of the queue by swapping *input with a T from the + // queue. This function should not be called concurrently. It can however be + // called concurrently with Remove(). Returns true if the item was inserted or + // false if not (the queue was full). + bool Insert(T* input) { + RTC_DCHECK_RUN_ON(&producer_thread_checker_); + RTC_DCHECK(input); + + if (num_elements_ == queue_.size()) { + return false; + } + + std::swap(*input, queue_[next_write_index_]); + + ++next_write_index_; + if (next_write_index_ == queue_.size()) { + next_write_index_ = 0; + } + + ++num_elements_; + + RTC_DCHECK_LT(next_write_index_, queue_.size()); + + return true; + } + + // Removes the frontmost T from the queue by swapping it with the T in + // *output. This function should not be called concurrently. It can however be + // called concurrently with Insert(). Returns true if an item could be removed + // or false if not (the queue was empty). + bool Remove(T* output) { + RTC_DCHECK_RUN_ON(&consumer_thread_checker_); + RTC_DCHECK(output); + + if (num_elements_ == 0) { + return false; + } + + std::swap(*output, queue_[next_read_index_]); + + ++next_read_index_; + if (next_read_index_ == queue_.size()) { + next_read_index_ = 0; + } + + --num_elements_; + + RTC_DCHECK_LT(next_read_index_, queue_.size()); + + return true; + } + + private: + uint32_t next_write_index_ = 0; + uint32_t next_read_index_ = 0; + rtc::ThreadChecker producer_thread_checker_; + rtc::ThreadChecker consumer_thread_checker_; + std::atomic num_elements_; + std::vector queue_; +}; +} // namespace webrtc + +#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_