Refactor AecDump not to rely on QueuedTask

Bug: webrtc:14245
Change-Id: Ib41765652745a247da2ae6c2ca6be714de927ca7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268185
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Sam Zackrisson <saza@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37542}
This commit is contained in:
Danil Chapovalov 2022-07-18 12:23:08 +02:00 committed by WebRTC LUCI CQ
parent e740b34c06
commit 3e378d7efa
8 changed files with 63 additions and 194 deletions

View File

@ -57,8 +57,6 @@ if (rtc_enable_protobuf) {
"aec_dump_impl.h",
"capture_stream_info.cc",
"capture_stream_info.h",
"write_to_file_task.cc",
"write_to_file_task.h",
]
deps = [

View File

@ -27,7 +27,7 @@ namespace webrtc {
class RTC_EXPORT AecDumpFactory {
public:
// The `worker_queue` may not be null and must outlive the created
// AecDump instance. |max_log_size_bytes == -1| means the log size
// AecDump instance. `max_log_size_bytes == -1` means the log size
// will be unlimited. `handle` may not be null. The AecDump takes
// responsibility for `handle` and closes it in the destructor. A
// non-null return value indicates that the file has been

View File

@ -16,6 +16,7 @@
#include "modules/audio_processing/aec_dump/aec_dump_factory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
@ -60,8 +61,7 @@ AecDumpImpl::AecDumpImpl(FileWrapper debug_file,
rtc::TaskQueue* worker_queue)
: debug_file_(std::move(debug_file)),
num_bytes_left_for_log_(max_log_size_bytes),
worker_queue_(worker_queue),
capture_stream_info_(CreateWriteToFileTask()) {}
worker_queue_(worker_queue) {}
AecDumpImpl::~AecDumpImpl() {
// Block until all tasks have finished running.
@ -74,8 +74,7 @@ AecDumpImpl::~AecDumpImpl() {
void AecDumpImpl::WriteInitMessage(const ProcessingConfig& api_format,
int64_t time_now_ms) {
auto task = CreateWriteToFileTask();
auto* event = task->GetEvent();
auto event = std::make_unique<audioproc::Event>();
event->set_type(audioproc::Event::INIT);
audioproc::Init* msg = event->mutable_init();
@ -96,7 +95,7 @@ void AecDumpImpl::WriteInitMessage(const ProcessingConfig& api_format,
api_format.reverse_output_stream().num_channels());
msg->set_timestamp_ms(time_now_ms);
worker_queue_->PostTask(std::move(task));
PostWriteToFileTask(std::move(event));
}
void AecDumpImpl::AddCaptureStreamInput(
@ -126,31 +125,24 @@ void AecDumpImpl::AddAudioProcessingState(const AudioProcessingState& state) {
}
void AecDumpImpl::WriteCaptureStreamMessage() {
auto task = capture_stream_info_.GetTask();
RTC_DCHECK(task);
worker_queue_->PostTask(std::move(task));
capture_stream_info_.SetTask(CreateWriteToFileTask());
PostWriteToFileTask(capture_stream_info_.FetchEvent());
}
void AecDumpImpl::WriteRenderStreamMessage(const int16_t* const data,
int num_channels,
int samples_per_channel) {
auto task = CreateWriteToFileTask();
auto* event = task->GetEvent();
auto event = std::make_unique<audioproc::Event>();
event->set_type(audioproc::Event::REVERSE_STREAM);
audioproc::ReverseStream* msg = event->mutable_reverse_stream();
const size_t data_size = sizeof(int16_t) * samples_per_channel * num_channels;
msg->set_data(data, data_size);
worker_queue_->PostTask(std::move(task));
PostWriteToFileTask(std::move(event));
}
void AecDumpImpl::WriteRenderStreamMessage(
const AudioFrameView<const float>& src) {
auto task = CreateWriteToFileTask();
auto* event = task->GetEvent();
auto event = std::make_unique<audioproc::Event>();
event->set_type(audioproc::Event::REVERSE_STREAM);
audioproc::ReverseStream* msg = event->mutable_reverse_stream();
@ -160,23 +152,21 @@ void AecDumpImpl::WriteRenderStreamMessage(
msg->add_channel(channel_view.begin(), sizeof(float) * channel_view.size());
}
worker_queue_->PostTask(std::move(task));
PostWriteToFileTask(std::move(event));
}
void AecDumpImpl::WriteConfig(const InternalAPMConfig& config) {
RTC_DCHECK_RUNS_SERIALIZED(&race_checker_);
auto task = CreateWriteToFileTask();
auto* event = task->GetEvent();
auto event = std::make_unique<audioproc::Event>();
event->set_type(audioproc::Event::CONFIG);
CopyFromConfigToEvent(config, event->mutable_config());
worker_queue_->PostTask(std::move(task));
PostWriteToFileTask(std::move(event));
}
void AecDumpImpl::WriteRuntimeSetting(
const AudioProcessing::RuntimeSetting& runtime_setting) {
RTC_DCHECK_RUNS_SERIALIZED(&race_checker_);
auto task = CreateWriteToFileTask();
auto* event = task->GetEvent();
auto event = std::make_unique<audioproc::Event>();
event->set_type(audioproc::Event::RUNTIME_SETTING);
audioproc::RuntimeSetting* setting = event->mutable_runtime_setting();
switch (runtime_setting.type()) {
@ -233,12 +223,34 @@ void AecDumpImpl::WriteRuntimeSetting(
RTC_DCHECK_NOTREACHED();
break;
}
worker_queue_->PostTask(std::move(task));
PostWriteToFileTask(std::move(event));
}
std::unique_ptr<WriteToFileTask> AecDumpImpl::CreateWriteToFileTask() {
return std::make_unique<WriteToFileTask>(&debug_file_,
&num_bytes_left_for_log_);
void AecDumpImpl::PostWriteToFileTask(std::unique_ptr<audioproc::Event> event) {
RTC_DCHECK(event);
worker_queue_->PostTask([event = std::move(event), this] {
std::string event_string = event->SerializeAsString();
const size_t event_byte_size = event_string.size();
if (num_bytes_left_for_log_ >= 0) {
const int64_t next_message_size = sizeof(int32_t) + event_byte_size;
if (num_bytes_left_for_log_ < next_message_size) {
// Ensure that no further events are written, even if they're smaller
// than the current event.
num_bytes_left_for_log_ = 0;
return;
}
num_bytes_left_for_log_ -= next_message_size;
}
// Write message preceded by its size.
if (!debug_file_.Write(&event_byte_size, sizeof(int32_t))) {
RTC_DCHECK_NOTREACHED();
}
if (!debug_file_.Write(event_string.data(), event_string.size())) {
RTC_DCHECK_NOTREACHED();
}
});
}
std::unique_ptr<AecDump> AecDumpFactory::Create(webrtc::FileWrapper file,

View File

@ -16,7 +16,6 @@
#include <vector>
#include "modules/audio_processing/aec_dump/capture_stream_info.h"
#include "modules/audio_processing/aec_dump/write_to_file_task.h"
#include "modules/audio_processing/include/aec_dump.h"
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/race_checker.h"
@ -33,21 +32,19 @@ RTC_PUSH_IGNORING_WUNDEF()
#endif
RTC_POP_IGNORING_WUNDEF()
namespace rtc {
class TaskQueue;
} // namespace rtc
namespace webrtc {
// Task-queue based implementation of AecDump. It is thread safe by
// relying on locks in TaskQueue.
class AecDumpImpl : public AecDump {
public:
// Does member variables initialization shared across all c-tors.
// `max_log_size_bytes` - maximum number of bytes to write to the debug file,
// `max_log_size_bytes == -1` means the log size will be unlimited.
AecDumpImpl(FileWrapper debug_file,
int64_t max_log_size_bytes,
rtc::TaskQueue* worker_queue);
AecDumpImpl(const AecDumpImpl&) = delete;
AecDumpImpl& operator=(const AecDumpImpl&) = delete;
~AecDumpImpl() override;
void WriteInitMessage(const ProcessingConfig& api_format,
@ -75,7 +72,7 @@ class AecDumpImpl : public AecDump {
const AudioProcessing::RuntimeSetting& runtime_setting) override;
private:
std::unique_ptr<WriteToFileTask> CreateWriteToFileTask();
void PostWriteToFileTask(std::unique_ptr<audioproc::Event> event);
FileWrapper debug_file_;
int64_t num_bytes_left_for_log_ = 0;

View File

@ -11,17 +11,9 @@
#include "modules/audio_processing/aec_dump/capture_stream_info.h"
namespace webrtc {
CaptureStreamInfo::CaptureStreamInfo(std::unique_ptr<WriteToFileTask> task)
: task_(std::move(task)) {
RTC_DCHECK(task_);
task_->GetEvent()->set_type(audioproc::Event::STREAM);
}
CaptureStreamInfo::~CaptureStreamInfo() = default;
void CaptureStreamInfo::AddInput(const AudioFrameView<const float>& src) {
RTC_DCHECK(task_);
auto* stream = task_->GetEvent()->mutable_stream();
auto* stream = event_->mutable_stream();
for (int i = 0; i < src.num_channels(); ++i) {
const auto& channel_view = src.channel(i);
@ -31,8 +23,7 @@ void CaptureStreamInfo::AddInput(const AudioFrameView<const float>& src) {
}
void CaptureStreamInfo::AddOutput(const AudioFrameView<const float>& src) {
RTC_DCHECK(task_);
auto* stream = task_->GetEvent()->mutable_stream();
auto* stream = event_->mutable_stream();
for (int i = 0; i < src.num_channels(); ++i) {
const auto& channel_view = src.channel(i);
@ -44,8 +35,7 @@ void CaptureStreamInfo::AddOutput(const AudioFrameView<const float>& src) {
void CaptureStreamInfo::AddInput(const int16_t* const data,
int num_channels,
int samples_per_channel) {
RTC_DCHECK(task_);
auto* stream = task_->GetEvent()->mutable_stream();
auto* stream = event_->mutable_stream();
const size_t data_size = sizeof(int16_t) * samples_per_channel * num_channels;
stream->set_input_data(data, data_size);
}
@ -53,16 +43,14 @@ void CaptureStreamInfo::AddInput(const int16_t* const data,
void CaptureStreamInfo::AddOutput(const int16_t* const data,
int num_channels,
int samples_per_channel) {
RTC_DCHECK(task_);
auto* stream = task_->GetEvent()->mutable_stream();
auto* stream = event_->mutable_stream();
const size_t data_size = sizeof(int16_t) * samples_per_channel * num_channels;
stream->set_output_data(data, data_size);
}
void CaptureStreamInfo::AddAudioProcessingState(
const AecDump::AudioProcessingState& state) {
RTC_DCHECK(task_);
auto* stream = task_->GetEvent()->mutable_stream();
auto* stream = event_->mutable_stream();
stream->set_delay(state.delay);
stream->set_drift(state.drift);
stream->set_level(state.level);

View File

@ -13,13 +13,9 @@
#include <memory>
#include <utility>
#include <vector>
#include "modules/audio_processing/aec_dump/write_to_file_task.h"
#include "modules/audio_processing/include/aec_dump.h"
#include "rtc_base/checks.h"
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/logging.h"
// Files generated at build-time by the protobuf compiler.
RTC_PUSH_IGNORING_WUNDEF()
@ -34,8 +30,11 @@ namespace webrtc {
class CaptureStreamInfo {
public:
explicit CaptureStreamInfo(std::unique_ptr<WriteToFileTask> task);
~CaptureStreamInfo();
CaptureStreamInfo() { CreateNewEvent(); }
CaptureStreamInfo(const CaptureStreamInfo&) = delete;
CaptureStreamInfo& operator=(const CaptureStreamInfo&) = delete;
~CaptureStreamInfo() = default;
void AddInput(const AudioFrameView<const float>& src);
void AddOutput(const AudioFrameView<const float>& src);
@ -48,20 +47,18 @@ class CaptureStreamInfo {
void AddAudioProcessingState(const AecDump::AudioProcessingState& state);
std::unique_ptr<WriteToFileTask> GetTask() {
RTC_DCHECK(task_);
return std::move(task_);
}
void SetTask(std::unique_ptr<WriteToFileTask> task) {
RTC_DCHECK(!task_);
RTC_DCHECK(task);
task_ = std::move(task);
task_->GetEvent()->set_type(audioproc::Event::STREAM);
std::unique_ptr<audioproc::Event> FetchEvent() {
std::unique_ptr<audioproc::Event> result = std::move(event_);
CreateNewEvent();
return result;
}
private:
std::unique_ptr<WriteToFileTask> task_;
void CreateNewEvent() {
event_ = std::make_unique<audioproc::Event>();
event_->set_type(audioproc::Event::STREAM);
}
std::unique_ptr<audioproc::Event> event_;
};
} // namespace webrtc

View File

@ -1,66 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/audio_processing/aec_dump/write_to_file_task.h"
#include <string>
namespace webrtc {
WriteToFileTask::WriteToFileTask(webrtc::FileWrapper* debug_file,
int64_t* num_bytes_left_for_log)
: debug_file_(debug_file),
num_bytes_left_for_log_(num_bytes_left_for_log) {}
WriteToFileTask::~WriteToFileTask() = default;
audioproc::Event* WriteToFileTask::GetEvent() {
return &event_;
}
bool WriteToFileTask::IsRoomForNextEvent(size_t event_byte_size) const {
int64_t next_message_size = event_byte_size + sizeof(int32_t);
return (*num_bytes_left_for_log_ < 0) ||
(*num_bytes_left_for_log_ >= next_message_size);
}
void WriteToFileTask::UpdateBytesLeft(size_t event_byte_size) {
RTC_DCHECK(IsRoomForNextEvent(event_byte_size));
if (*num_bytes_left_for_log_ >= 0) {
*num_bytes_left_for_log_ -= (sizeof(int32_t) + event_byte_size);
}
}
bool WriteToFileTask::Run() {
std::string event_string;
event_.SerializeToString(&event_string);
const size_t event_byte_size = event_.ByteSizeLong();
if (!IsRoomForNextEvent(event_byte_size)) {
// Ensure that no further events are written, even if they're smaller than
// the current event.
*num_bytes_left_for_log_ = 0;
return true;
}
UpdateBytesLeft(event_byte_size);
// Write message preceded by its size.
if (!debug_file_->Write(&event_byte_size, sizeof(int32_t))) {
RTC_DCHECK_NOTREACHED();
}
if (!debug_file_->Write(event_string.data(), event_string.length())) {
RTC_DCHECK_NOTREACHED();
}
return true; // Delete task from queue at once.
}
} // namespace webrtc

View File

@ -1,57 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_AUDIO_PROCESSING_AEC_DUMP_WRITE_TO_FILE_TASK_H_
#define MODULES_AUDIO_PROCESSING_AEC_DUMP_WRITE_TO_FILE_TASK_H_
#include <memory>
#include <string>
#include <utility>
#include "api/task_queue/queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/system/file_wrapper.h"
// Files generated at build-time by the protobuf compiler.
RTC_PUSH_IGNORING_WUNDEF()
#ifdef WEBRTC_ANDROID_PLATFORM_BUILD
#include "external/webrtc/webrtc/modules/audio_processing/debug.pb.h"
#else
#include "modules/audio_processing/debug.pb.h"
#endif
RTC_POP_IGNORING_WUNDEF()
namespace webrtc {
class WriteToFileTask : public QueuedTask {
public:
WriteToFileTask(webrtc::FileWrapper* debug_file,
int64_t* num_bytes_left_for_log);
~WriteToFileTask() override;
audioproc::Event* GetEvent();
private:
bool IsRoomForNextEvent(size_t event_byte_size) const;
void UpdateBytesLeft(size_t event_byte_size);
bool Run() override;
webrtc::FileWrapper* const debug_file_;
audioproc::Event event_;
int64_t* const num_bytes_left_for_log_;
};
} // namespace webrtc
#endif // MODULES_AUDIO_PROCESSING_AEC_DUMP_WRITE_TO_FILE_TASK_H_