This CL is a refactoring of the APM QA tool; it includes the following changes:

- render stream support, required to assess AEC;
- echo path simulation and input mixer, to generate echo and add it to the
speech signal;
- export engine: improved UI, switch to Pandas DataFrames;
- minor design improvements and needed adaptations.
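
The new modules roughly compose into the following pipeline; this is a minimal sketch based on the classes added in this CL (file paths are hypothetical, APM processing and score evaluation are omitted):

```python
# Minimal sketch of the new echo simulation + input mixing pipeline.
# Paths are hypothetical; only classes added in this CL are used.
import quality_assessment.echo_path_simulation as echo_path_simulation
import quality_assessment.echo_path_simulation_factory as echo_path_simulation_factory
import quality_assessment.input_mixer as input_mixer

render_filepath = '/tmp/render.wav'    # Far-end (render) signal.
capture_filepath = '/tmp/capture.wav'  # Echo-free near-end (capture) signal.
output_path = '/tmp/apm_qa_out'

# 1. Simulate the echo produced by the render signal.
factory = echo_path_simulation_factory.EchoPathSimulatorFactory()
simulator = factory.GetInstance(
    echo_path_simulator_class=echo_path_simulation.LinearEchoPathSimulator,
    render_input_filepath=render_filepath)
echo_filepath = simulator.Simulate(output_path)

# 2. Mix the simulated echo into the capture signal; the mix is the APM input.
mix_filepath = input_mixer.ApmInputMixer.Mix(
    output_path, capture_filepath, echo_filepath)
```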

BUG=webrtc:7218

Review-Url: https://codereview.webrtc.org/2813883002
Cr-Commit-Position: refs/heads/master@{#19198}
This commit is contained in:
alessiob 2017-08-01 04:37:21 -07:00 committed by Commit Bot
parent 8a1d2a315f
commit fdd568eb25
25 changed files with 1612 additions and 622 deletions

View File

@ -54,11 +54,14 @@ copy("lib") {
"quality_assessment/__init__.py",
"quality_assessment/audioproc_wrapper.py",
"quality_assessment/data_access.py",
"quality_assessment/echo_path_simulation.py",
"quality_assessment/echo_path_simulation_factory.py",
"quality_assessment/eval_scores.py",
"quality_assessment/eval_scores_factory.py",
"quality_assessment/evaluation.py",
"quality_assessment/exceptions.py",
"quality_assessment/export.py",
"quality_assessment/input_mixer.py",
"quality_assessment/results.css",
"quality_assessment/results.js",
"quality_assessment/signal_processing.py",
@ -112,7 +115,9 @@ rtc_executable("fake_polqa") {
copy("lib_unit_tests") {
testonly = true
sources = [
"quality_assessment/echo_path_simulation_unittest.py",
"quality_assessment/eval_scores_unittest.py",
"quality_assessment/input_mixer_unittest.py",
"quality_assessment/signal_processing_unittest.py",
"quality_assessment/simulation_unittest.py",
"quality_assessment/test_data_generation_unittest.py",

View File

@ -11,7 +11,7 @@ reference one used for evaluation.
- OS: Linux
- Python 2.7
- Python libraries: numpy, scipy, pydub (0.17.0+)
- Python libraries: numpy, scipy, pydub (0.17.0+), pandas (0.20.1+)
- It is recommended that a dedicated Python environment be used
- install `virtualenv`
- `$ sudo apt-get install python-virtualenv`
@ -20,7 +20,7 @@ reference one used for evaluation.
- activate the new Python environment
- `$ source ~/my_env/bin/activate`
- add dependencies via `pip`
- `(my_env)$ pip install numpy pydub scipy`
- `(my_env)$ pip install numpy pydub scipy pandas`
- PolqaOem64 (see http://www.polqa.info/)
- Tested with POLQA Library v1.180 / P863 v2.400
- Aachen Impulse Response (AIR) Database
@ -68,22 +68,24 @@ Showing all the results at once can be confusing. You therefore may want to
export separate reports. In this case, you can use the
`apm_quality_assessment_export.py` script as follows:
- Set --output_dir to the same value used in `apm_quality_assessment.sh`
- Set `--output_dir, -o` to the same value used in `apm_quality_assessment.sh`
- Use regular expressions to select/filter out scores by
- APM configurations: `--config_names, -c`
- probing signals: `--input_names, -i`
- capture signals: `--capture_names, -i`
- render signals: `--render_names, -r`
- echo simulator: `--echo_simulator_names, -e`
- test data generators: `--test_data_generators, -t`
- scores: `--eval_scores, -e`
- scores: `--eval_scores, -s`
- Assign a suffix to the report name using `-f <suffix>`
For instance:
```
$ ./apm_quality_assessment-export.py \
-o ~/data/apm_quality_assessment \
-e \(polqa\) \
-n \(echo\) \
-o output/ \
-c "(^default$)|(.*AE.*)" \
-t \(white_noise\) \
-s \(polqa\) \
-f echo
```
@ -92,8 +94,8 @@ $ ./apm_quality_assessment-export.py \
The input wav file must be:
- sampled at a sample rate that is a multiple of 100 (required by POLQA)
- in the 16 bit format (required by `audioproc_f`)
- encoded in the Microsoft WAV signed 16 bit PCM format (Audacity default
when exporting)
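
The first two constraints can be quickly verified with pydub, which is already among the dependencies; a minimal sketch (the file path is hypothetical):

```python
import pydub

track = pydub.AudioSegment.from_file('my_input.wav', format='wav')
# POLQA requires a sample rate that is a multiple of 100.
assert track.frame_rate % 100 == 0, 'sample rate must be a multiple of 100'
# audioproc_f requires 16 bit samples (2 bytes per sample).
assert track.sample_width == 2, 'samples must be 16 bit'
```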
Depending on the license, the POLQA tool may take “breaks” as a way to limit the
throughput. When this happens, the APM Quality Assessment tool is slowed down.

View File

@ -23,11 +23,14 @@ import os
import sys
import quality_assessment.audioproc_wrapper as audioproc_wrapper
import quality_assessment.echo_path_simulation as echo_path_simulation
import quality_assessment.eval_scores as eval_scores
import quality_assessment.evaluation as evaluation
import quality_assessment.test_data_generation as test_data_generation
import quality_assessment.simulation as simulation
_ECHO_PATH_SIMULATOR_NAMES = (
echo_path_simulation.EchoPathSimulator.REGISTERED_CLASSES)
_TEST_DATA_GENERATOR_CLASSES = (
test_data_generation.TestDataGenerator.REGISTERED_CLASSES)
_TEST_DATA_GENERATORS_NAMES = _TEST_DATA_GENERATOR_CLASSES.keys()
@ -53,8 +56,20 @@ def _InstanceArgumentsParser():
'called'),
default=[_DEFAULT_CONFIG_FILE])
parser.add_argument('-i', '--input_files', nargs='+', required=True,
help='path to the input wav files (one or more)')
parser.add_argument('-i', '--capture_input_files', nargs='+', required=True,
help='path to the capture input wav files (one or more)')
parser.add_argument('-r', '--render_input_files', nargs='+', required=False,
help=('path to the render input wav files; either '
'omitted or one file for each file in '
'--capture_input_files (files will be paired by '
'index)'), default=None)
parser.add_argument('-p', '--echo_path_simulator', required=False,
help=('custom echo path simulator name; required if '
'--render_input_files is specified'),
choices=_ECHO_PATH_SIMULATOR_NAMES,
default=echo_path_simulation.NoEchoPathSimulator.NAME)
parser.add_argument('-t', '--test_data_generators', nargs='+', required=False,
help='custom list of test data generators to use',
@ -87,6 +102,15 @@ def main():
parser = _InstanceArgumentsParser()
args = parser.parse_args()
if args.capture_input_files and args.render_input_files and (
len(args.capture_input_files) != len(args.render_input_files)):
parser.error('--render_input_files and --capture_input_files must be lists '
'having the same length')
sys.exit(1)
if args.render_input_files and not args.echo_path_simulator:
parser.error('when --render_input_files is set, --echo_path_simulator is '
'also required')
sys.exit(1)
simulator = simulation.ApmModuleSimulator(
aechen_ir_database_path=args.air_db_path,
@ -95,7 +119,9 @@ def main():
evaluator=evaluation.ApmModuleEvaluator())
simulator.Run(
config_filepaths=args.config_files,
input_filepaths=args.input_files,
capture_input_filepaths=args.capture_input_files,
render_input_filepaths=args.render_input_files,
echo_path_simulator_name=args.echo_path_simulator,
test_data_generator_names=args.test_data_generators,
eval_score_names=args.eval_scores,
output_dir=args.output_dir)

View File

@ -32,16 +32,17 @@ else
fi
# Customize probing signals, test data generators and scores if needed.
PROBING_SIGNALS=(probing_signals/*.wav)
CAPTURE_SIGNALS=(probing_signals/*.wav)
TEST_DATA_GENERATORS=( \
"identity" \
"white_noise" \
"environmental_noise" \
"reverberation" \
# "environmental_noise" \
# "reverberation" \
)
SCORES=( \
"polqa" \
"audio_level" \
# "polqa" \
"audio_level_peak" \
"audio_level_mean" \
)
OUTPUT_PATH=output
@ -59,8 +60,8 @@ fi
# Start one process for each "probing signal"-"test data source" pair.
chmod +x apm_quality_assessment.py
for probing_signal_filepath in "${PROBING_SIGNALS[@]}" ; do
probing_signal_name="$(basename $probing_signal_filepath)"
for capture_signal_filepath in "${CAPTURE_SIGNALS[@]}" ; do
probing_signal_name="$(basename $capture_signal_filepath)"
probing_signal_name="${probing_signal_name%.*}"
for test_data_gen_name in "${TEST_DATA_GENERATORS[@]}" ; do
LOG_FILE="${OUTPUT_PATH}/apm_qa-${probing_signal_name}-"`
@ -70,7 +71,7 @@ for probing_signal_filepath in "${PROBING_SIGNALS[@]}" ; do
./apm_quality_assessment.py \
--polqa_path ${POLQA_PATH}\
--air_db_path ${AECHEN_IR_DATABASE_PATH}\
-i ${probing_signal_filepath} \
-i ${capture_signal_filepath} \
-o ${OUTPUT_PATH} \
-t ${test_data_gen_name} \
-c "${APM_CONFIGS[@]}" \
@ -78,7 +79,7 @@ for probing_signal_filepath in "${PROBING_SIGNALS[@]}" ; do
done
done
# Join.
# Join Python processes running apm_quality_assessment.py.
wait
# Export results.

View File

@ -12,22 +12,37 @@
"""
import argparse
import collections
import logging
import glob
import os
import re
import sys
import quality_assessment.audioproc_wrapper as audioproc_wrapper
try:
import pandas as pd
except ImportError:
logging.critical('Cannot import the third-party Python package pandas')
sys.exit(1)
import quality_assessment.data_access as data_access
import quality_assessment.export as export
import quality_assessment.simulation as sim
# Regular expressions used to derive score descriptors from file paths.
RE_CONFIG_NAME = re.compile(r'cfg-(.+)')
RE_INPUT_NAME = re.compile(r'input-(.+)')
RE_TEST_DATA_GEN_NAME = re.compile(r'gen-(.+)')
RE_SCORE_NAME = re.compile(r'score-(.+)\.txt')
# Compiled regular expressions used to extract score descriptors.
RE_CONFIG_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixApmConfig() + r'(.+)')
RE_CAPTURE_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixCapture() + r'(.+)')
RE_RENDER_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixRender() + r'(.+)')
RE_ECHO_SIM_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixEchoSimulator() + r'(.+)')
RE_TEST_DATA_GEN_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixTestDataGenerator() + r'(.+)')
RE_TEST_DATA_GEN_PARAMS = re.compile(
sim.ApmModuleSimulator.GetPrefixTestDataGeneratorParameters() + r'(.+)')
RE_SCORE_NAME = re.compile(
sim.ApmModuleSimulator.GetPrefixScore() + r'(.+)(\..+)')
def _InstanceArgumentsParser():
@ -48,15 +63,23 @@ def _InstanceArgumentsParser():
help=('regular expression to filter the APM configuration'
' names'))
parser.add_argument('-i', '--input_names', type=re.compile,
help=('regular expression to filter the probing signal '
parser.add_argument('-i', '--capture_names', type=re.compile,
help=('regular expression to filter the capture signal '
'names'))
parser.add_argument('-r', '--render_names', type=re.compile,
help=('regular expression to filter the render signal '
'names'))
parser.add_argument('-e', '--echo_simulator_names', type=re.compile,
help=('regular expression to filter the echo simulator '
'names'))
parser.add_argument('-t', '--test_data_generators', type=re.compile,
help=('regular expression to filter the test data '
'generator names'))
parser.add_argument('-e', '--eval_scores', type=re.compile,
parser.add_argument('-s', '--eval_scores', type=re.compile,
help=('regular expression to filter the evaluation score '
'names'))
@ -70,32 +93,36 @@ def _GetScoreDescriptors(score_filepath):
score_filepath: path to the score file.
Returns:
A tuple of strings (APM configuration name, input audio track name,
test data generator name, test data generator parameters name,
evaluation score name).
A tuple of strings (APM configuration name, capture audio track name,
render audio track name, echo simulator name, test data generator name,
test data generator parameters as string, evaluation score name).
"""
(config_name, input_name, test_data_gen_name, test_data_gen_params,
score_name) = score_filepath.split(os.sep)[-5:]
config_name = RE_CONFIG_NAME.match(config_name).groups(0)[0]
input_name = RE_INPUT_NAME.match(input_name).groups(0)[0]
test_data_gen_name = RE_TEST_DATA_GEN_NAME.match(
test_data_gen_name).groups(0)[0]
score_name = RE_SCORE_NAME.match(score_name).groups(0)[0]
return (config_name, input_name, test_data_gen_name, test_data_gen_params,
score_name)
fields = score_filepath.split(os.sep)[-7:]
extract_name = lambda index, reg_expr: (
reg_expr.match(fields[index]).groups(0)[0])
return (
extract_name(0, RE_CONFIG_NAME),
extract_name(1, RE_CAPTURE_NAME),
extract_name(2, RE_RENDER_NAME),
extract_name(3, RE_ECHO_SIM_NAME),
extract_name(4, RE_TEST_DATA_GEN_NAME),
extract_name(5, RE_TEST_DATA_GEN_PARAMS),
extract_name(6, RE_SCORE_NAME),
)
def _ExcludeScore(config_name, input_name, test_data_gen_name, score_name,
args):
def _ExcludeScore(config_name, capture_name, render_name, echo_simulator_name,
test_data_gen_name, score_name, args):
"""Decides whether excluding a score.
Given a score descriptor, encoded in config_name, input_name,
test_data_gen_name and score_name, use the corresponding regular expressions
to determine if the score should be excluded.
A set of optional regular expressions in args is used to determine if the
score should be excluded (depending on its |*_name| descriptors).
Args:
config_name: APM configuration name.
input_name: input audio track name.
capture_name: capture audio track name.
render_name: render audio track name.
echo_simulator_name: echo simulator name.
test_data_gen_name: test data generator name.
score_name: evaluation score name.
args: parsed arguments.
@ -105,7 +132,9 @@ def _ExcludeScore(config_name, input_name, test_data_gen_name, score_name,
"""
value_regexpr_pairs = [
(config_name, args.config_names),
(input_name, args.input_names),
(capture_name, args.capture_names),
(render_name, args.render_names),
(echo_simulator_name, args.echo_simulator_names),
(test_data_gen_name, args.test_data_generators),
(score_name, args.eval_scores),
]
@ -134,53 +163,116 @@ def _BuildOutputFilename(filename_suffix):
return 'results-{}.html'.format(filename_suffix)
def _FindScores(src_path, args):
"""Given a search path, find scores and return a DataFrame object.
Args:
src_path: Search path pattern.
args: parsed arguments.
Returns:
A DataFrame object.
"""
# Get scores.
scores = []
for score_filepath in glob.iglob(src_path):
# Extract score descriptor fields from the path.
(config_name,
capture_name,
render_name,
echo_simulator_name,
test_data_gen_name,
test_data_gen_params,
score_name) = _GetScoreDescriptors(score_filepath)
# Ignore the score if required.
if _ExcludeScore(
config_name,
capture_name,
render_name,
echo_simulator_name,
test_data_gen_name,
score_name,
args):
logging.info(
'ignored score: %s %s %s %s %s %s',
config_name,
capture_name,
render_name,
echo_simulator_name,
test_data_gen_name,
score_name)
continue
# Read metadata and score.
metadata = data_access.Metadata.LoadAudioTestDataPaths(
os.path.split(score_filepath)[0])
score = data_access.ScoreFile.Load(score_filepath)
# Add a score with its descriptor fields.
scores.append((
metadata['clean_capture_input_filepath'],
metadata['echo_free_capture_filepath'],
metadata['echo_filepath'],
metadata['render_filepath'],
metadata['capture_filepath'],
metadata['apm_output_filepath'],
metadata['apm_reference_filepath'],
config_name,
capture_name,
render_name,
echo_simulator_name,
test_data_gen_name,
test_data_gen_params,
score_name,
score,
))
return pd.DataFrame(
data=scores,
columns=(
'clean_capture_input_filepath',
'echo_free_capture_filepath',
'echo_filepath',
'render_filepath',
'capture_filepath',
'apm_output_filepath',
'apm_reference_filepath',
'apm_config',
'capture',
'render',
'echo_simulator',
'test_data_gen',
'test_data_gen_params',
'eval_score_name',
'score',
))
def main():
# Init.
logging.basicConfig(level=logging.DEBUG) # TODO(alessio): INFO once debugged.
parser = _InstanceArgumentsParser()
nested_dict = lambda: collections.defaultdict(nested_dict)
scores = nested_dict() # Organize the scores in a nested dictionary.
# Parse command line arguments.
args = parser.parse_args()
# Find score files in the output path.
# Get the scores.
src_path = os.path.join(
args.output_dir, 'cfg-*', 'input-*', 'gen-*', '*', 'score-*.txt')
args.output_dir,
sim.ApmModuleSimulator.GetPrefixApmConfig() + '*',
sim.ApmModuleSimulator.GetPrefixCapture() + '*',
sim.ApmModuleSimulator.GetPrefixRender() + '*',
sim.ApmModuleSimulator.GetPrefixEchoSimulator() + '*',
sim.ApmModuleSimulator.GetPrefixTestDataGenerator() + '*',
sim.ApmModuleSimulator.GetPrefixTestDataGeneratorParameters() + '*',
sim.ApmModuleSimulator.GetPrefixScore() + '*')
logging.debug(src_path)
for score_filepath in glob.iglob(src_path):
# Extract score descriptors from the path.
(config_name, input_name, test_data_gen_name, test_data_gen_params,
score_name) = _GetScoreDescriptors(score_filepath)
# Ignore the score if required.
if _ExcludeScore(
config_name, input_name, test_data_gen_name, score_name, args):
logging.info('ignored score: %s %s %s %s',
config_name, input_name, test_data_gen_name, score_name)
continue
# Get metadata.
score_path, _ = os.path.split(score_filepath)
audio_in_filepath, audio_ref_filepath = (
data_access.Metadata.LoadAudioTestDataPaths(score_path))
audio_out_filepath = os.path.abspath(os.path.join(
score_path, audioproc_wrapper.AudioProcWrapper.OUTPUT_FILENAME))
# Add the score to the nested dictionary.
scores[score_name][config_name][input_name][test_data_gen_name][
test_data_gen_params] = {
'score': data_access.ScoreFile.Load(score_filepath),
'audio_in_filepath': audio_in_filepath,
'audio_out_filepath': audio_out_filepath,
'audio_ref_filepath': audio_ref_filepath,
}
scores_data_frame = _FindScores(src_path, args)
# Export.
output_filepath = os.path.join(args.output_dir, _BuildOutputFilename(
args.filename_suffix))
exporter = export.HtmlExport(output_filepath)
exporter.Export(scores)
exporter.Export(scores_data_frame)
logging.info('output file successfully written in %s', output_filepath)
sys.exit(0)
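
Since the scores are now collected in a pandas DataFrame with the columns listed above, they can also be inspected outside the HTML report with standard pandas operations; a hedged sketch (the helper below is not part of the tool):

```python
def SummarizePolqaScores(scores_data_frame):
  """Aggregates POLQA scores per APM config and test data generator.

  Args:
    scores_data_frame: DataFrame returned by _FindScores.
  """
  polqa = scores_data_frame[scores_data_frame.eval_score_name == 'polqa']
  return polqa.groupby(['apm_config', 'test_data_gen'])['score'].agg(
      ['count', 'min', 'max', 'mean', 'std'])
```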

View File

@ -15,6 +15,7 @@ import os
import subprocess
from . import data_access
from . import exceptions
class AudioProcWrapper(object):
@ -22,11 +23,11 @@ class AudioProcWrapper(object):
"""
OUTPUT_FILENAME = 'output.wav'
_AUDIOPROC_F_BIN_PATH = os.path.abspath('../audioproc_f')
_AUDIOPROC_F_BIN_PATH = os.path.abspath(os.path.join(
os.pardir, 'audioproc_f'))
def __init__(self):
self._config = None
self._input_signal_filepath = None
self._output_signal_filepath = None
# Profiler instance to measure audioproc_f running time.
@ -36,17 +37,20 @@ class AudioProcWrapper(object):
def output_filepath(self):
return self._output_signal_filepath
def Run(self, config_filepath, input_filepath, output_path):
def Run(self, config_filepath, capture_input_filepath, output_path,
render_input_filepath=None):
"""Run audioproc_f.
Args:
config_filepath: path to the configuration file specifying the arguments
for audioproc_f.
input_filepath: path to the audio track input file.
capture_input_filepath: path to the capture audio track input file (aka
forward or near-end).
output_path: path of the audio track output file.
render_input_filepath: path to the render audio track input file (aka
reverse or far-end).
"""
# Init.
self._input_signal_filepath = input_filepath
self._output_signal_filepath = os.path.join(
output_path, self.OUTPUT_FILENAME)
profiling_stats_filepath = os.path.join(output_path, 'profiling.stats')
@ -60,8 +64,14 @@ class AudioProcWrapper(object):
self._config = data_access.AudioProcConfigFile.Load(config_filepath)
# Set remaining parameters.
self._config['-i'] = self._input_signal_filepath
if not os.path.exists(capture_input_filepath):
raise exceptions.FileNotFoundError('cannot find capture input file')
self._config['-i'] = capture_input_filepath
self._config['-o'] = self._output_signal_filepath
if render_input_filepath is not None:
if not os.path.exists(render_input_filepath):
raise exceptions.FileNotFoundError('cannot find render input file')
self._config['-ri'] = render_input_filepath
# Build arguments list.
args = [self._AUDIOPROC_F_BIN_PATH]

View File

@ -31,7 +31,7 @@ class Metadata(object):
def __init__(self):
pass
_AUDIO_TEST_DATA_FILENAME = 'audio_test_data.txt'
_AUDIO_TEST_DATA_FILENAME = 'audio_test_data.json'
@classmethod
def LoadAudioTestDataPaths(cls, metadata_path):
@ -46,23 +46,21 @@ class Metadata(object):
metadata_filepath = os.path.join(
metadata_path, cls._AUDIO_TEST_DATA_FILENAME)
with open(metadata_filepath) as f:
audio_in_filepath = f.readline().strip()
audio_ref_filepath = f.readline().strip()
return audio_in_filepath, audio_ref_filepath
return json.load(f)
@classmethod
def SaveAudioTestDataPaths(cls, output_path, audio_in_filepath,
audio_ref_filepath):
def SaveAudioTestDataPaths(cls, output_path, **filepaths):
"""Saves the input and the reference audio track paths.
Args:
output_path: path to the directory containing the metadata file.
audio_in_filepath: path to the input audio track file.
audio_ref_filepath: path to the reference audio track file.
Keyword Args:
filepaths: collection of audio track file paths to save.
"""
output_filepath = os.path.join(output_path, cls._AUDIO_TEST_DATA_FILENAME)
with open(output_filepath, 'w') as f:
f.write('{}\n{}\n'.format(audio_in_filepath, audio_ref_filepath))
json.dump(filepaths, f)
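
As an illustration of the new interface, a hedged usage sketch follows (the keyword arguments become the keys of `audio_test_data.json`; key names mirror those read back by `apm_quality_assessment_export.py`, the file paths are hypothetical):

```python
import quality_assessment.data_access as data_access

data_access.Metadata.SaveAudioTestDataPaths(
    output_path='/tmp/apm_qa_out',
    clean_capture_input_filepath='/tmp/speech.wav',
    echo_filepath='/tmp/linear_echo.wav',
    render_filepath='/tmp/render.wav')
```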
class AudioProcConfigFile(object):

View File

@ -0,0 +1,136 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Echo path simulation module.
"""
import hashlib
import os
from . import signal_processing
class EchoPathSimulator(object):
"""Abstract class for the echo path simulators.
In general, an echo path simulator is a function of the render signal and
simulates the propagation of the latter into the microphone (e.g., due to
mechanical or electrical paths).
"""
NAME = None
REGISTERED_CLASSES = {}
def __init__(self):
pass
def Simulate(self, output_path):
"""Creates the echo signal and stores it in an audio file (abstract method).
Args:
output_path: Path in which any output can be saved.
Returns:
Path to the generated audio track file or None if no echo is present.
"""
raise NotImplementedError()
@classmethod
def RegisterClass(cls, class_to_register):
"""Registers an EchoPathSimulator implementation.
Decorator to automatically register the classes that extend
EchoPathSimulator.
Example usage:
@EchoPathSimulator.RegisterClass
class NoEchoPathSimulator(EchoPathSimulator):
pass
"""
cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
return class_to_register
@EchoPathSimulator.RegisterClass
class NoEchoPathSimulator(EchoPathSimulator):
"""Simulates absence of echo."""
NAME = 'noecho'
def __init__(self):
EchoPathSimulator.__init__(self)
def Simulate(self, output_path):
return None
@EchoPathSimulator.RegisterClass
class LinearEchoPathSimulator(EchoPathSimulator):
"""Simulates linear echo path.
This class applies a given impulse response to the render input and then sums
the result with the capture input signal.
"""
NAME = 'linear'
def __init__(self, render_input_filepath, impulse_response):
"""
Args:
render_input_filepath: Render audio track file.
impulse_response: list or numpy vector of float values.
"""
EchoPathSimulator.__init__(self)
self._render_input_filepath = render_input_filepath
self._impulse_response = impulse_response
def Simulate(self, output_path):
"""Simulates linear echo path."""
# Form the file name with a hash of the impulse response.
impulse_response_hash = hashlib.sha256(
str(self._impulse_response).encode('utf-8', 'ignore')).hexdigest()
echo_filepath = os.path.join(output_path, 'linear_echo_{}.wav'.format(
impulse_response_hash))
# If the simulated echo audio track file does not exist, create it.
if not os.path.exists(echo_filepath):
render = signal_processing.SignalProcessingUtils.LoadWav(
self._render_input_filepath)
echo = signal_processing.SignalProcessingUtils.ApplyImpulseResponse(
render, self._impulse_response)
signal_processing.SignalProcessingUtils.SaveWav(echo_filepath, echo)
return echo_filepath
@EchoPathSimulator.RegisterClass
class RecordedEchoPathSimulator(EchoPathSimulator):
"""Uses recorded echo.
This class uses the render input file name to build the file name of the
corresponding recording that contains echo (a predefined suffix is used).
Such a file is expected to already exist.
"""
NAME = 'recorded'
_FILE_NAME_SUFFIX = '_echo'
def __init__(self, render_input_filepath):
EchoPathSimulator.__init__(self)
self._render_input_filepath = render_input_filepath
def Simulate(self, output_path):
"""Uses recorded echo path."""
path, file_name_ext = os.path.split(self._render_input_filepath)
file_name, file_ext = os.path.splitext(file_name_ext)
echo_filepath = os.path.join(path, '{}{}{}'.format(
file_name, self._FILE_NAME_SUFFIX, file_ext))
assert os.path.exists(echo_filepath), (
'cannot find the echo audio track file {}'.format(echo_filepath))
return echo_filepath

View File

@ -0,0 +1,48 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Echo path simulation factory module.
"""
import numpy as np
from . import echo_path_simulation
class EchoPathSimulatorFactory(object):
# TODO(alessiob): Replace 5 ms delay (at 48 kHz sample rate) with a more
# realistic impulse response.
_LINEAR_ECHO_IMPULSE_RESPONSE = np.array([0.0]*(5 * 48) + [0.15])
def __init__(self):
pass
@classmethod
def GetInstance(cls, echo_path_simulator_class, render_input_filepath):
"""Creates an EchoPathSimulator instance given a class object.
Args:
echo_path_simulator_class: EchoPathSimulator class object (not an
instance).
render_input_filepath: Path to the render audio track file.
Returns:
An EchoPathSimulator instance.
"""
assert render_input_filepath is not None or (
echo_path_simulator_class == echo_path_simulation.NoEchoPathSimulator)
if echo_path_simulator_class == echo_path_simulation.NoEchoPathSimulator:
return echo_path_simulation.NoEchoPathSimulator()
elif echo_path_simulator_class == (
echo_path_simulation.LinearEchoPathSimulator):
return echo_path_simulation.LinearEchoPathSimulator(
render_input_filepath, cls._LINEAR_ECHO_IMPULSE_RESPONSE)
else:
return echo_path_simulator_class(render_input_filepath)

View File

@ -0,0 +1,81 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Unit tests for the echo path simulation module.
"""
import shutil
import os
import tempfile
import unittest
import pydub
from . import echo_path_simulation
from . import echo_path_simulation_factory
from . import signal_processing
class TestEchoPathSimulators(unittest.TestCase):
"""Unit tests for the eval_scores module.
"""
def setUp(self):
"""Creates temporary data."""
self._tmp_path = tempfile.mkdtemp()
# Create and save white noise.
silence = pydub.AudioSegment.silent(duration=1000, frame_rate=48000)
white_noise = signal_processing.SignalProcessingUtils.GenerateWhiteNoise(
silence)
self._audio_track_num_samples = (
signal_processing.SignalProcessingUtils.CountSamples(white_noise))
self._audio_track_filepath = os.path.join(self._tmp_path, 'white_noise.wav')
signal_processing.SignalProcessingUtils.SaveWav(
self._audio_track_filepath, white_noise)
# Make a copy of the white noise audio track file; it will be used by
# echo_path_simulation.RecordedEchoPathSimulator.
shutil.copy(self._audio_track_filepath, os.path.join(
self._tmp_path, 'white_noise_echo.wav'))
def tearDown(self):
"""Recursively deletes temporary folders."""
shutil.rmtree(self._tmp_path)
def testRegisteredClasses(self):
# Check that there is at least one registered echo path simulator.
registered_classes = (
echo_path_simulation.EchoPathSimulator.REGISTERED_CLASSES)
self.assertIsInstance(registered_classes, dict)
self.assertGreater(len(registered_classes), 0)
# Instantiate the factory.
factory = echo_path_simulation_factory.EchoPathSimulatorFactory()
# Try each registered echo path simulator.
for echo_path_simulator_name in registered_classes:
simulator = factory.GetInstance(
echo_path_simulator_class=registered_classes[
echo_path_simulator_name],
render_input_filepath=self._audio_track_filepath)
echo_filepath = simulator.Simulate(self._tmp_path)
if echo_filepath is None:
self.assertEqual(echo_path_simulation.NoEchoPathSimulator.NAME,
echo_path_simulator_name)
# No other tests in this case.
continue
# Check that the echo audio track file exists and its length is greater than
# or equal to that of the render audio track.
self.assertTrue(os.path.exists(echo_filepath))
echo = signal_processing.SignalProcessingUtils.LoadWav(echo_filepath)
self.assertGreaterEqual(
signal_processing.SignalProcessingUtils.CountSamples(echo),
self._audio_track_num_samples)

View File

@ -9,6 +9,7 @@
"""Evaluation score abstract class and implementations.
"""
from __future__ import division
import logging
import os
import re
@ -24,7 +25,8 @@ class EvaluationScore(object):
NAME = None
REGISTERED_CLASSES = {}
def __init__(self):
def __init__(self, score_filename_prefix):
self._score_filename_prefix = score_filename_prefix
self._reference_signal = None
self._reference_signal_filepath = None
self._tested_signal = None
@ -76,8 +78,8 @@ class EvaluationScore(object):
Args:
output_path: path to the directory where the output is written.
"""
self._output_filepath = os.path.join(output_path, 'score-{}.txt'.format(
self.NAME))
self._output_filepath = os.path.join(
output_path, self._score_filename_prefix + self.NAME + '.txt')
try:
# If the score has already been computed, load.
self._LoadScore()
@ -110,10 +112,10 @@ class EvaluationScore(object):
@EvaluationScore.RegisterClass
class AudioLevelScore(EvaluationScore):
"""Audio level score.
class AudioLevelPeakScore(EvaluationScore):
"""Peak audio level score.
Defined as the difference between the average audio level of the tested and
Defined as the difference between the peak audio level of the tested and
the reference signals.
Unit: dB
@ -121,10 +123,10 @@ class AudioLevelScore(EvaluationScore):
Worst case: +/-inf dB
"""
NAME = 'audio_level'
NAME = 'audio_level_peak'
def __init__(self):
EvaluationScore.__init__(self)
def __init__(self, score_filename_prefix):
EvaluationScore.__init__(self, score_filename_prefix)
def _Run(self, output_path):
self._LoadReferenceSignal()
@ -133,6 +135,38 @@ class AudioLevelScore(EvaluationScore):
self._SaveScore()
@EvaluationScore.RegisterClass
class MeanAudioLevelScore(EvaluationScore):
"""Mean audio level score.
Defined as the difference between the mean audio level of the tested and
the reference signals.
Unit: dB
Ideal: 0 dB
Worst case: +/-inf dB
"""
NAME = 'audio_level_mean'
def __init__(self, score_filename_prefix):
EvaluationScore.__init__(self, score_filename_prefix)
def _Run(self, output_path):
self._LoadReferenceSignal()
self._LoadTestedSignal()
dbfs_diffs_sum = 0.0
seconds = min(len(self._tested_signal), len(self._reference_signal)) // 1000
for t in range(seconds):
t0 = t * 1000
t1 = t0 + 1000
dbfs_diffs_sum += (
self._tested_signal[t0:t1].dBFS - self._reference_signal[t0:t1].dBFS)
self._score = dbfs_diffs_sum / float(seconds)
self._SaveScore()
@EvaluationScore.RegisterClass
class PolqaScore(EvaluationScore):
"""POLQA score.
@ -146,8 +180,8 @@ class PolqaScore(EvaluationScore):
NAME = 'polqa'
def __init__(self, polqa_bin_filepath):
EvaluationScore.__init__(self)
def __init__(self, score_filename_prefix, polqa_bin_filepath):
EvaluationScore.__init__(self, score_filename_prefix)
# POLQA binary file path.
self._polqa_bin_filepath = polqa_bin_filepath

View File

@ -21,7 +21,8 @@ class EvaluationScoreWorkerFactory(object):
workers.
"""
def __init__(self, polqa_tool_bin_path):
def __init__(self, score_filename_prefix, polqa_tool_bin_path):
self._score_filename_prefix = score_filename_prefix
self._polqa_tool_bin_path = polqa_tool_bin_path
def GetInstance(self, evaluation_score_class):
@ -29,11 +30,14 @@ class EvaluationScoreWorkerFactory(object):
Args:
evaluation_score_class: EvaluationScore class object (not an instance).
Returns:
An EvaluationScore instance.
"""
logging.debug(
'factory producing a %s evaluation score', evaluation_score_class)
if evaluation_score_class == eval_scores.PolqaScore:
return eval_scores.PolqaScore(self._polqa_tool_bin_path)
return eval_scores.PolqaScore(
self._score_filename_prefix, self._polqa_tool_bin_path)
else:
# By default, no arguments in the constructor.
return evaluation_score_class()
return evaluation_score_class(self._score_filename_prefix)

View File

@ -63,6 +63,7 @@ class TestEvalScores(unittest.TestCase):
# Instantiate the evaluation score workers factory with fake dependencies.
eval_score_workers_factory = (
eval_scores_factory.EvaluationScoreWorkerFactory(
score_filename_prefix='scores-',
polqa_tool_bin_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'fake_polqa')))

View File

@ -20,3 +20,9 @@ class SignalProcessingException(Exception):
"""Signal processing exeception.
"""
pass
class InputMixerException(Exception):
"""Input mixer exeception.
"""
pass

View File

@ -6,55 +6,39 @@
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import logging
import hashlib
import os
import re
class HtmlExport(object):
"""HTML exporter class for APM quality scores.
"""
# Path to CSS and JS files.
_PATH = os.path.dirname(os.path.realpath(__file__))
# CSS file parameters.
_CSS_FILEPATH = os.path.join(_PATH, 'results.css')
_INLINE_CSS = False
# JS file parameters.
_JS_FILEPATH = os.path.join(_PATH, 'results.js')
_INLINE_JS = False
"""HTML exporter class for APM quality scores."""
_NEW_LINE = '\n'
# CSS and JS file paths.
_PATH = os.path.dirname(os.path.realpath(__file__))
_CSS_FILEPATH = os.path.join(_PATH, 'results.css')
_JS_FILEPATH = os.path.join(_PATH, 'results.js')
def __init__(self, output_filepath):
self._test_data_generator_names = None
self._test_data_generator_params = None
self._scores_data_frame = None
self._output_filepath = output_filepath
def Export(self, scores):
"""Exports the scores into an HTML file.
def Export(self, scores_data_frame):
"""Exports scores into an HTML file.
Args:
scores: nested dictionary containing the scores.
scores_data_frame: DataFrame instance.
"""
# Generate one table for each evaluation score.
tables = []
for score_name in sorted(scores.keys()):
tables.append(self._BuildScoreTable(score_name, scores[score_name]))
# Create the html file.
html = (
'<html>' +
self._BuildHeader() +
'<body onload="initialize()">' +
'<h1>Results from {}</h1>'.format(self._output_filepath) +
self._NEW_LINE.join(tables) +
'</body>' +
'</html>')
self._Save(self._output_filepath, html)
self._scores_data_frame = scores_data_frame
html = ['<html>',
self._BuildHeader(),
'<body onload="initialize()">',
self._BuildBody(),
'</body>',
'</html>']
self._Save(self._output_filepath, self._NEW_LINE.join(html))
def _BuildHeader(self):
"""Builds the <head> section of the HTML file.
@ -67,197 +51,289 @@ class HtmlExport(object):
"""
html = ['<head>', '<title>Results</title>']
# Function to append the lines of a text file to html.
# Add Material Design hosted libs.
html.append('<link rel="stylesheet" href="http://fonts.googleapis.com/'
'css?family=Roboto:300,400,500,700" type="text/css">')
html.append('<link rel="stylesheet" href="https://fonts.googleapis.com/'
'icon?family=Material+Icons">')
html.append('<link rel="stylesheet" href="https://code.getmdl.io/1.3.0/'
'material.indigo-pink.min.css">')
html.append('<script defer src="https://code.getmdl.io/1.3.0/'
'material.min.js"></script>')
# Embed custom JavaScript and CSS files.
def EmbedFile(filepath):
with open(filepath) as f:
for l in f:
html.append(l.strip())
# CSS.
if self._INLINE_CSS:
# Embed.
html.append('<style>')
EmbedFile(self._CSS_FILEPATH)
html.append('</style>')
else:
# Link.
html.append('<link rel="stylesheet" type="text/css" '
'href="file://{}?">'.format(self._CSS_FILEPATH))
# Javascript.
if self._INLINE_JS:
# Embed.
html.append('<script>')
EmbedFile(self._JS_FILEPATH)
html.append('</script>')
else:
# Link.
html.append('<script src="file://{}?"></script>'.format(
self._JS_FILEPATH))
html.append(l.rstrip())
html.append('<script>')
EmbedFile(self._JS_FILEPATH)
html.append('</script>')
html.append('<style>')
EmbedFile(self._CSS_FILEPATH)
html.append('</style>')
html.append('</head>')
return self._NEW_LINE.join(html)
def _BuildScoreTable(self, score_name, scores):
"""Builds a table for a specific evaluation score (e.g., POLQA).
def _BuildBody(self):
"""Builds the content of the <body> section."""
score_names = self._scores_data_frame.eval_score_name.unique().tolist()
Args:
score_name: name of the score.
scores: nested dictionary of scores.
html = [
('<div class="mdl-layout mdl-js-layout mdl-layout--fixed-header '
'mdl-layout--fixed-tabs">'),
'<header class="mdl-layout__header">',
'<div class="mdl-layout__header-row">',
'<span class="mdl-layout-title">APM QA results ({})</span>'.format(
self._output_filepath),
'</div>',
]
Returns:
A string with <table>...</table> HTML.
"""
config_names = sorted(scores.keys())
input_names = sorted(scores[config_names[0]].keys())
rows = [self._BuildTableRow(
score_name, config_name, scores[config_name], input_names) for (
config_name) in config_names]
# Tab selectors.
html.append('<div class="mdl-layout__tab-bar mdl-js-ripple-effect">')
for tab_index, score_name in enumerate(score_names):
is_active = tab_index == 0
html.append('<a href="#score-tab-{}" class="mdl-layout__tab{}">'
'{}</a>'.format(tab_index,
' is-active' if is_active else '',
self._FormatName(score_name)))
html.append('</div>')
html = (
'<table celpadding="0" cellspacing="0">' +
'<thead><tr>{}</tr></thead>'.format(
self._BuildTableHeader(score_name, input_names)) +
'<tbody>' +
'<tr>' + '</tr><tr>'.join(rows) + '</tr>' +
'</tbody>' +
'</table>' + self._BuildLegend())
html.append('</header>')
html.append('<main class="mdl-layout__content">')
return html
# Tabs content.
for tab_index, score_name in enumerate(score_names):
html.append('<section class="mdl-layout__tab-panel{}" '
'id="score-tab-{}">'.format(
' is-active' if tab_index == 0 else '', tab_index))
html.append('<div class="page-content">')
html.append(self._BuildScoreTab(score_name))
html.append('</div>')
html.append('</section>')
def _BuildTableHeader(self, score_name, input_names):
"""Builds the cells of a table header.
html.append('</main>')
html.append('</div>')
A table header starts with a cell containing the name of the evaluation
score, and then it includes one column for each probing signal.
return self._NEW_LINE.join(html)
Args:
score_name: name of the score.
input_names: list of probing signal names.
def _BuildScoreTab(self, score_name):
"""Builds the content of a tab."""
# Find unique values.
scores = self._scores_data_frame[
self._scores_data_frame.eval_score_name == score_name]
apm_configs = sorted(self._FindUniqueTuples(scores, ['apm_config']))
test_data_gen_configs = sorted(self._FindUniqueTuples(
scores, ['test_data_gen', 'test_data_gen_params']))
Returns:
A string with a list of <th>...</th> HTML elements.
"""
html = (
'<th>{}</th>'.format(self._FormatName(score_name)) +
'<th>' + '</th><th>'.join(
[self._FormatName(name) for name in input_names]) + '</th>')
return html
html = [
'<div class="mdl-grid">',
'<div class="mdl-layout-spacer"></div>',
'<div class="mdl-cell mdl-cell--10-col">',
('<table class="mdl-data-table mdl-js-data-table mdl-shadow--2dp" '
'style="width: 100%;">'),
]
def _BuildTableRow(self, score_name, config_name, scores, input_names):
"""Builds the cells of a table row.
# Header.
html.append('<thead><tr><th>APM config / Test data generator</th>')
for test_data_gen_info in test_data_gen_configs:
html.append('<th>{} {}</th>'.format(
self._FormatName(test_data_gen_info[0]), test_data_gen_info[1]))
html.append('</tr></thead>')
A table row starts with the name of the APM configuration file, and then it
includes one column for each probing signal.
# Body.
html.append('<tbody>')
for apm_config in apm_configs:
html.append('<tr><td>' + self._FormatName(apm_config[0]) + '</td>')
for test_data_gen_info in test_data_gen_configs:
onclick_handler = 'openScoreStatsInspector(\'{}\')'.format(
self._ScoreStatsInspectorDialogId(score_name, apm_config[0],
test_data_gen_info[0],
test_data_gen_info[1]))
html.append('<td onclick="{}">{}</td>'.format(
onclick_handler, self._BuildScoreTableCell(
score_name, test_data_gen_info[0], test_data_gen_info[1],
apm_config[0])))
html.append('</tr>')
html.append('</tbody>')
Args:
score_name: name of the score.
config_name: name of the APM configuration.
scores: nested dictionary of scores.
input_names: list of probing signal names.
html.append('</table></div><div class="mdl-layout-spacer"></div></div>')
Returns:
A string with a list of <td>...</td> HTML elements.
"""
cells = [self._BuildTableCell(
scores[input_name], score_name, config_name, input_name) for (
input_name) in input_names]
html = ('<td>{}</td>'.format(self._FormatName(config_name)) +
'<td>' + '</td><td>'.join(cells) + '</td>')
return html
html.append(self._BuildScoreStatsInspectorDialogs(
score_name, apm_configs, test_data_gen_configs))
def _BuildTableCell(self, scores, score_name, config_name, input_name):
"""Builds the inner content of a table cell.
return self._NEW_LINE.join(html)
A table cell includes all the scores computed for a specific evaluation
score (e.g., POLQA), APM configuration (e.g., default), and probing signal.
def _BuildScoreTableCell(self, score_name, test_data_gen,
test_data_gen_params, apm_config):
"""Builds the content of a table cell for a score table."""
scores = self._SliceDataForScoreTableCell(
score_name, apm_config, test_data_gen, test_data_gen_params)
stats = self._ComputeScoreStats(scores)
Args:
scores: dictionary of score data.
score_name: name of the score.
config_name: name of the APM configuration.
input_name: name of the probing signal.
html = []
items_id_prefix = (
score_name + test_data_gen + test_data_gen_params + apm_config)
if stats['count'] == 1:
# Show the only available score.
item_id = hashlib.md5(items_id_prefix).hexdigest()
html.append('<div id="single-value-{0}">{1:f}</div>'.format(
item_id, scores['score'].mean()))
html.append('<div class="mdl-tooltip" data-mdl-for="single-value-{}">{}'
'</div>'.format(item_id, 'single value'))
else:
# Show stats.
for stat_name in ['min', 'max', 'mean', 'std dev']:
item_id = hashlib.md5(items_id_prefix + stat_name).hexdigest()
html.append('<div id="stats-{0}">{1:f}</div>'.format(
item_id, stats[stat_name]))
html.append('<div class="mdl-tooltip" data-mdl-for="stats-{}">{}'
'</div>'.format(item_id, stat_name))
Returns:
A string with the HTML of a table body cell.
"""
# Init test data generator names and parameters cache (if not done).
if self._test_data_generator_names is None:
self._test_data_generator_names = sorted(scores.keys())
self._test_data_generator_params = {test_data_generator_name: sorted(
scores[test_data_generator_name].keys()) for (
test_data_generator_name) in self._test_data_generator_names}
return self._NEW_LINE.join(html)
# For each noisy input (that is a pair of test data generator and
# generator parameters), add an item with the score and its metadata.
items = []
for name_index, test_data_generator_name in enumerate(
self._test_data_generator_names):
for params_index, test_data_generator_params in enumerate(
self._test_data_generator_params[test_data_generator_name]):
def _BuildScoreStatsInspectorDialogs(
self, score_name, apm_configs, test_data_gen_configs):
"""Builds a set of score stats inspector dialogs."""
html = []
for apm_config in apm_configs:
for test_data_gen_info in test_data_gen_configs:
dialog_id = self._ScoreStatsInspectorDialogId(
score_name, apm_config[0],
test_data_gen_info[0], test_data_gen_info[1])
# Init.
score_value = '?'
metadata = ''
html.append('<dialog class="mdl-dialog" id="{}" '
'style="width: 40%;">'.format(dialog_id))
# Extract score value and its metadata.
try:
data = scores[test_data_generator_name][test_data_generator_params]
score_value = '{0:f}'.format(data['score'])
metadata = (
'<input type="hidden" name="gen_name" value="{}"/>'
'<input type="hidden" name="gen_params" value="{}"/>'
'<input type="hidden" name="audio_in" value="file://{}"/>'
'<input type="hidden" name="audio_out" value="file://{}"/>'
'<input type="hidden" name="audio_ref" value="file://{}"/>'
).format(
test_data_generator_name,
test_data_generator_params,
data['audio_in_filepath'],
data['audio_out_filepath'],
data['audio_ref_filepath'])
except TypeError:
logging.warning(
'missing score found: <score:%s> <config:%s> <input:%s> '
'<generator:%s> <params:%s>', score_name, config_name, input_name,
test_data_generator_name, test_data_generator_params)
# Content.
html.append('<div class="mdl-dialog__content">')
html.append('<h6><strong>APM config preset</strong>: {}<br/>'
'<strong>Test data generator</strong>: {} ({})</h6>'.format(
self._FormatName(apm_config[0]),
self._FormatName(test_data_gen_info[0]),
test_data_gen_info[1]))
html.append(self._BuildScoreStatsInspectorDialog(
score_name, apm_config[0], test_data_gen_info[0],
test_data_gen_info[1]))
html.append('</div>')
# Add the score.
items.append(
'<div class="test-data-gen-desc">[{0:d}, {1:d}]{2}</div>'
'<div class="value">{3}</div>'.format(
name_index, params_index, metadata, score_value))
# Actions.
html.append('<div class="mdl-dialog__actions">')
html.append('<button type="button" class="mdl-button" '
'onclick="closeScoreStatsInspector(\'' + dialog_id + '\')">'
'Close</button>')
html.append('</div>')
html = (
'<div class="score">' +
'</div><div class="score">'.join(items) +
'</div>')
html.append('</dialog>')
return html
return self._NEW_LINE.join(html)
def _BuildLegend(self):
"""Builds the legend.
def _BuildScoreStatsInspectorDialog(
self, score_name, apm_config, test_data_gen, test_data_gen_params):
"""Builds one score stats inspector dialog."""
scores = self._SliceDataForScoreTableCell(
score_name, apm_config, test_data_gen, test_data_gen_params)
The legend details test data generator name and parameter pairs.
capture_render_pairs = sorted(self._FindUniqueTuples(
scores, ['capture', 'render']))
echo_simulators = sorted(self._FindUniqueTuples(scores, ['echo_simulator']))
Returns:
A string with a <div class="legend">...</div> HTML element.
"""
items = []
for name_index, test_data_generator_name in enumerate(
self._test_data_generator_names):
for params_index, test_data_generator_params in enumerate(
self._test_data_generator_params[test_data_generator_name]):
items.append(
'<div class="test-data-gen-desc">[{0:d}, {1:d}]</div>: {2}, '
'{3}'.format(name_index, params_index, test_data_generator_name,
test_data_generator_params))
html = (
'<div class="legend"><div>' +
'</div><div>'.join(items) + '</div></div>')
html = ['<table class="mdl-data-table mdl-js-data-table mdl-shadow--2dp">']
return html
# Header.
html.append('<thead><tr><th>Capture-Render / Echo simulator</th>')
for echo_simulator in echo_simulators:
html.append('<th>' + self._FormatName(echo_simulator[0]) +'</th>')
html.append('</tr></thead>')
# Body.
html.append('<tbody>')
for capture, render in capture_render_pairs:
html.append('<tr><td><div>{}</div><div>{}</div></td>'.format(
capture, render))
for echo_simulator in echo_simulators:
score_tuple = self._SliceDataForScoreStatsTableCell(
scores, capture, render, echo_simulator[0])
html.append('<td class="single-score-cell">{}</td>'.format(
self._BuildScoreStatsInspectorTableCell(score_tuple)))
html.append('</tr>')
html.append('</tbody>')
html.append('</table>')
# Placeholder for the audio inspector.
html.append('<div class="audio-inspector-placeholder"></div>')
return self._NEW_LINE.join(html)
def _BuildScoreStatsInspectorTableCell(self, score_tuple):
"""Builds the content of a cell of a score stats inspector."""
html = ['<div>{}</div>'.format(score_tuple.score)]
# Add all the available file paths as hidden data.
for field_name in score_tuple.keys():
if field_name.endswith('_filepath'):
html.append('<input type="hidden" name="{}" value="{}">'.format(
field_name, score_tuple[field_name]))
return self._NEW_LINE.join(html)
def _SliceDataForScoreTableCell(
self, score_name, apm_config, test_data_gen, test_data_gen_params):
"""Slices |self._scores_data_frame| to extract the data for a tab."""
masks = []
masks.append(self._scores_data_frame.eval_score_name == score_name)
masks.append(self._scores_data_frame.apm_config == apm_config)
masks.append(self._scores_data_frame.test_data_gen == test_data_gen)
masks.append(
self._scores_data_frame.test_data_gen_params == test_data_gen_params)
mask = reduce((lambda i1, i2: i1 & i2), masks)
del masks
return self._scores_data_frame[mask]
@classmethod
def _SliceDataForScoreStatsTableCell(
cls, scores, capture, render, echo_simulator):
"""Slices |scores| to extract the data for a tab."""
masks = []
masks.append(scores.capture == capture)
masks.append(scores.render == render)
masks.append(scores.echo_simulator == echo_simulator)
mask = reduce((lambda i1, i2: i1 & i2), masks)
del masks
sliced_data = scores[mask]
assert len(sliced_data) == 1, 'single score is expected'
return sliced_data.iloc[0]
@classmethod
def _FindUniqueTuples(cls, data_frame, fields):
"""Slices |data_frame| to a list of fields and finds unique tuples."""
return data_frame[fields].drop_duplicates().values.tolist()
@classmethod
def _ComputeScoreStats(cls, data_frame):
"""Computes score stats."""
scores = data_frame['score']
return {
'count': scores.count(),
'min': scores.min(),
'max': scores.max(),
'mean': scores.mean(),
'std dev': scores.std(),
}
@classmethod
def _ScoreStatsInspectorDialogId(cls, score_name, apm_config, test_data_gen,
test_data_gen_params):
"""Assigns a unique name to a dialog."""
return 'score-stats-dialog-' + hashlib.md5(
'score-stats-inspector-{}-{}-{}-{}'.format(
score_name, apm_config, test_data_gen,
test_data_gen_params)).hexdigest()
@classmethod
def _Save(cls, output_filepath, html):

View File

@ -0,0 +1,93 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Input mixer module.
"""
import logging
import os
from . import exceptions
from . import signal_processing
class ApmInputMixer(object):
"""Class to mix a set of audio segments down to the APM input."""
_HARD_CLIPPING_LOG_MSG = 'hard clipping detected in the mixed signal'
def __init__(self):
pass
@classmethod
def HardClippingLogMessage(cls):
"""Returns the log message used when hard clipping is detected in the mix.
This method is mainly intended to be used by the unit tests.
"""
return cls._HARD_CLIPPING_LOG_MSG
@classmethod
def Mix(cls, output_path, capture_input_filepath, echo_filepath):
"""Mixes capture and echo.
Creates the overall capture input for APM by mixing the "echo-free" capture
signal with the echo signal (e.g., echo simulated via the
echo_path_simulation module).
The echo signal cannot be shorter than the capture signal and the generated
mix will have the same duration as the capture signal. The latter property
is enforced in order to let the input of APM and the reference signal
created by TestDataGenerator have the same length (required for the
evaluation step).
Hard-clipping may occur in the mix; a warning is raised when this happens.
If |echo_filepath| is None, nothing is done and |capture_input_filepath| is
returned.
Args:
output_path: path in which the mix audio track file is saved.
capture_input_filepath: path to the echo-free capture audio track file.
echo_filepath: path to the echo audio track file or None.
Returns:
Path to the mix audio track file.
"""
if echo_filepath is None:
return capture_input_filepath
# Build the mix output file name as a function of the echo file name.
# This ensures that if the internal parameters of the echo path simulator
# change, no erroneous cache hit occurs.
echo_file_name, _ = os.path.splitext(os.path.split(echo_filepath)[1])
mix_filepath = os.path.join(output_path, 'mix_capture_{}.wav'.format(
echo_file_name))
# Create the mix if not done yet.
mix = None
if not os.path.exists(mix_filepath):
echo_free_capture = signal_processing.SignalProcessingUtils.LoadWav(
capture_input_filepath)
echo = signal_processing.SignalProcessingUtils.LoadWav(echo_filepath)
if signal_processing.SignalProcessingUtils.CountSamples(echo) < (
signal_processing.SignalProcessingUtils.CountSamples(
echo_free_capture)):
raise exceptions.InputMixerException(
'echo cannot be shorter than capture')
mix = echo_free_capture.overlay(echo)
signal_processing.SignalProcessingUtils.SaveWav(mix_filepath, mix)
# Check if hard clipping occurs.
if mix is None:
mix = signal_processing.SignalProcessingUtils.LoadWav(mix_filepath)
if signal_processing.SignalProcessingUtils.DetectHardClipping(mix):
logging.warning(cls._HARD_CLIPPING_LOG_MSG)
return mix_filepath

View File

@ -0,0 +1,149 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Unit tests for the input mixer module.
"""
import logging
import os
import shutil
import sys
import tempfile
import unittest
SRC = os.path.abspath(os.path.join(
os.path.dirname((__file__)), os.pardir, os.pardir, os.pardir, os.pardir))
sys.path.append(os.path.join(SRC, 'third_party', 'pymock'))
import mock
from . import exceptions
from . import input_mixer
from . import signal_processing
class TestApmInputMixer(unittest.TestCase):
"""Unit tests for the ApmInputMixer class.
"""
# Audio track file names created in setUp().
_FILENAMES = ['capture', 'echo_1', 'echo_2', 'shorter', 'longer']
# Target peak power level (dBFS) of each audio track file created in setUp().
# These values are hand-crafted in order to make saturation happen when
# capture and echo_2 are mixed, and to avoid it when capture and echo_1 are mixed.
# None means that the power is not changed.
_MAX_PEAK_POWER_LEVELS = [-10.0, -5.0, 0.0, None, None]
# Audio track file durations in milliseconds.
_DURATIONS = [1000, 1000, 1000, 800, 1200]
_SAMPLE_RATE = 48000
def setUp(self):
"""Creates temporary data."""
self._tmp_path = tempfile.mkdtemp()
# Create audio track files.
self._audio_tracks = {}
for filename, peak_power, duration in zip(
self._FILENAMES, self._MAX_PEAK_POWER_LEVELS, self._DURATIONS):
audio_track_filepath = os.path.join(self._tmp_path, '{}.wav'.format(
filename))
# Create a pure tone with the target peak power level.
template = signal_processing.SignalProcessingUtils.GenerateSilence(
duration=duration, sample_rate=self._SAMPLE_RATE)
signal = signal_processing.SignalProcessingUtils.GeneratePureTone(
template)
if peak_power is not None:
signal = signal.apply_gain(-signal.max_dBFS + peak_power)
signal_processing.SignalProcessingUtils.SaveWav(
audio_track_filepath, signal)
self._audio_tracks[filename] = {
'filepath': audio_track_filepath,
'num_samples': signal_processing.SignalProcessingUtils.CountSamples(
signal)
}
def tearDown(self):
"""Recursively deletes temporary folders."""
shutil.rmtree(self._tmp_path)
def testCheckMixSameDuration(self):
"""Checks the duration when mixing capture and echo with same duration."""
mix_filepath = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['echo_1']['filepath'])
self.assertTrue(os.path.exists(mix_filepath))
mix = signal_processing.SignalProcessingUtils.LoadWav(mix_filepath)
self.assertEqual(self._audio_tracks['capture']['num_samples'],
signal_processing.SignalProcessingUtils.CountSamples(mix))
def testRejectShorterEcho(self):
"""Rejects echo signals that are shorter than the capture signal."""
try:
_ = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['shorter']['filepath'])
self.fail('no exception raised')
except exceptions.InputMixerException:
pass
def testCheckMixDurationWithLongerEcho(self):
"""Checks the duration when mixing an echo longer than the capture."""
mix_filepath = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['longer']['filepath'])
self.assertTrue(os.path.exists(mix_filepath))
mix = signal_processing.SignalProcessingUtils.LoadWav(mix_filepath)
self.assertEqual(self._audio_tracks['capture']['num_samples'],
signal_processing.SignalProcessingUtils.CountSamples(mix))
def testCheckOutputFileNamesConflict(self):
"""Checks that different echo files lead to different output file names."""
mix1_filepath = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['echo_1']['filepath'])
self.assertTrue(os.path.exists(mix1_filepath))
mix2_filepath = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['echo_2']['filepath'])
self.assertTrue(os.path.exists(mix2_filepath))
self.assertNotEqual(mix1_filepath, mix2_filepath)
def testHardClippingLogExpected(self):
"""Checks that hard clipping warning is raised when occurring."""
logging.warning = mock.MagicMock(name='warning')
_ = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['echo_2']['filepath'])
logging.warning.assert_called_once_with(
input_mixer.ApmInputMixer.HardClippingLogMessage())
def testHardClippingLogNotExpected(self):
"""Checks that hard clipping warning is not raised when not occurring."""
logging.warning = mock.MagicMock(name='warning')
_ = input_mixer.ApmInputMixer.Mix(
self._tmp_path,
self._audio_tracks['capture']['filepath'],
self._audio_tracks['echo_1']['filepath'])
self.assertNotIn(
mock.call(input_mixer.ApmInputMixer.HardClippingLogMessage()),
logging.warning.call_args_list)
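For reference, a minimal usage sketch of the mixer API exercised by these tests (a hedged sketch, not part of the CL; the WAV file names are hypothetical and the relative imports follow the unit-test style):

```python
from . import exceptions
from . import input_mixer

try:
  # Mix() writes the mixed track into the given output directory and returns
  # its file path; the mix is trimmed to the capture duration and a warning
  # is logged if hard clipping occurs.
  mix_filepath = input_mixer.ApmInputMixer.Mix(
      '/tmp/mix_cache', 'capture.wav', 'echo.wav')
except exceptions.InputMixerException:
  # Raised when the echo track is shorter than the capture track.
  mix_filepath = None
```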

View File

@ -7,84 +7,22 @@
* be found in the AUTHORS file in the root of the source tree.
*/
body{
font-family: Arial;
font-size: 75%;
td.selected-score {
background-color: #DDD;
}
table{
border-top: 1px solid #000;
border-right: 1px solid #000;
margin: 1em 0 0.2em 0;
}
table thead tr th{
font-size: 0.9em;
.audio-inspector {
text-align: center;
border-bottom: 1px solid #000;
border-left: 1px solid #000;
min-width: 8em;
}
table thead tr th:first-child{
text-transform: uppercase;
.audio-inspector div{
margin-bottom: 0;
padding-bottom: 0;
padding-top: 0;
}
table tbody tr td{
font-size: 0.8em;
text-align: center;
border-bottom: 1px solid #000;
border-left: 1px solid #000;
}
table tbody tr td:first-child{
font-weight: bold;
}
table tbody tr td .selected{
background-color: #EE9;
}
table tbody tr td .value{
display: inline-block;
}
.test-data-gen-desc{
display: inline-block;
margin-right: 0.3em;
border: 1px solid #555;
color: #555;
background-color: #EEE;
padding: 1px;
font-size: 0.8em;
}
.inspector{
background-color: #FFF;
border: 3px solid #000;
display: block;
padding: 0.5em;
position: fixed;
right: 1em;
top: 1em;
}
.inspector .property{
margin-bottom: 1em;
}
.inspector .property .name{
font-weight: bold;
}
.inspector .property .value{
padding-left: 0.5em;
}
.inspector .buttons{
margin-top: 1em;
}
.inspector .buttons button{
margin: 0 0.25em;
.audio-inspector div div{
margin-bottom: 0;
padding-bottom: 0;
padding-top: 0;
}

View File

@ -6,59 +6,229 @@
// in the file PATENTS. All contributing project authors may
// be found in the AUTHORS file in the root of the source tree.
var inspector = null;
/**
* Inspector UI class.
* Opens the score stats inspector dialog.
* @param {String} dialogId: identifier of the dialog to show.
*/
function openScoreStatsInspector(dialogId) {
var dialog = document.getElementById(dialogId);
dialog.showModal();
}
/**
* Closes the score stats inspector dialog.
* @param {String} dialogId: identifier of the dialog to close.
*/
function closeScoreStatsInspector(dialogId) {
var dialog = document.getElementById(dialogId);
dialog.close();
if (inspector != null) {
inspector.stopAudio();
}
}
/**
* Instance and initialize the audio inspector.
*/
function initialize() {
inspector = new AudioInspector();
inspector.init();
}
/**
* Audio inspector class.
* @constructor
*/
function Inspector() {
function AudioInspector() {
this.audioPlayer_ = new Audio();
this.inspectorNode_ = document.createElement('div');
this.divTestDataGeneratorName_ = document.createElement('div');
this.divTestDataGenParameters_ = document.createElement('div');
this.buttonPlayAudioIn_ = document.createElement('button');
this.buttonPlayAudioOut_ = document.createElement('button');
this.buttonPlayAudioRef_ = document.createElement('button');
this.buttonStopAudio_ = document.createElement('button');
this.selectedItem_ = null;
this.audioInUrl_ = null;
this.audioOutUrl_ = null;
this.audioRefUrl_ = null;
this.metadata_ = {};
this.currentScore_ = null;
this.audioInspector_ = null;
}
/**
* Initialize.
*/
Inspector.prototype.init = function() {
AudioInspector.prototype.init = function() {
window.event.stopPropagation();
this.createAudioInspector_();
this.initializeEventHandlers_();
};
// Create inspector UI.
this.buildInspector_();
var body = document.getElementsByTagName('body')[0];
body.appendChild(this.inspectorNode_);
/**
* Set up the inspector for a new score.
* @param {DOMElement} element: Element linked to the selected score.
*/
AudioInspector.prototype.selectedScoreChange = function(element) {
if (this.currentScore_ == element) { return; }
if (this.currentScore_ != null) {
this.currentScore_.classList.remove('selected-score');
}
this.currentScore_ = element;
this.currentScore_.classList.add('selected-score');
this.stopAudio();
// Bind click handler.
var self = this;
var items = document.getElementsByClassName('score');
for (var index = 0; index < items.length; index++) {
items[index].onclick = function() {
self.openInspector(this);
};
// Read metadata.
var matches = element.querySelectorAll('input[type=hidden]');
this.metadata_ = {};
for (var index = 0; index < matches.length; ++index) {
this.metadata_[matches[index].name] = matches[index].value;
}
// Bind pressed key handlers.
// Show the audio inspector interface.
var container = element.parentNode.parentNode.parentNode.parentNode;
var audioInspectorPlaceholder = container.querySelector(
'.audio-inspector-placeholder');
this.moveInspector_(audioInspectorPlaceholder);
};
/**
* Stop playing audio.
*/
AudioInspector.prototype.stopAudio = function() {
this.audioPlayer_.pause();
};
/**
* Move the audio inspector DOM node into the given parent.
* @param {DOMElement} newParentNode: New parent for the inspector.
*/
AudioInspector.prototype.moveInspector_ = function(newParentNode) {
newParentNode.appendChild(this.audioInspector_);
};
/**
* Play audio file from url.
* @param {string} metadataFieldName: Metadata field name.
*/
AudioInspector.prototype.playAudio = function(metadataFieldName) {
if (this.metadata_[metadataFieldName] == undefined) { return; }
if (this.metadata_[metadataFieldName] == 'None') {
alert('The selected stream was not used during the experiment.');
return;
}
this.stopAudio();
this.audioPlayer_.src = this.metadata_[metadataFieldName];
this.audioPlayer_.play();
};
/**
* Creates the audio inspector DOM node and its playback buttons.
*/
AudioInspector.prototype.createAudioInspector_ = function() {
var buttonIndex = 0;
function getButtonHtml(icon, toolTipText, caption, metadataFieldName) {
var buttonId = 'audioInspectorButton' + buttonIndex++;
var html = caption == null ? '' : caption;
html += '<button class="mdl-button mdl-js-button mdl-button--icon ' +
'mdl-js-ripple-effect" id="' + buttonId + '">' +
'<i class="material-icons">' + icon + '</i>' +
'<div class="mdl-tooltip" data-mdl-for="' + buttonId + '">' +
toolTipText +
'</div>';
if (metadataFieldName != null) {
html += '<input type="hidden" value="' + metadataFieldName + '">';
}
html += '</button>';
return html;
}
this.audioInspector_ = document.createElement('div');
this.audioInspector_.classList.add('audio-inspector');
this.audioInspector_.innerHTML =
'<div class="mdl-grid">' +
'<div class="mdl-layout-spacer"></div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'Simulated echo', 'E<sub>in</sub>',
'echo_filepath') +
'</div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('stop', 'Stop playing [S]', null, '__stop__') +
'</div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'Render stream', 'R<sub>in</sub>',
'render_filepath') +
'</div>' +
'<div class="mdl-layout-spacer"></div>' +
'</div>' +
'<div class="mdl-grid">' +
'<div class="mdl-layout-spacer"></div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'Capture stream (APM input) [1]',
'Y\'<sub>in</sub>', 'capture_filepath') +
'</div>' +
'<div class="mdl-cell mdl-cell--2-col"><strong>APM</strong></div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'APM output [2]', 'Y<sub>out</sub>',
'apm_output_filepath') +
'</div>' +
'<div class="mdl-layout-spacer"></div>' +
'</div>' +
'<div class="mdl-grid">' +
'<div class="mdl-layout-spacer"></div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'Echo-free capture stream',
'Y<sub>in</sub>', 'echo_free_capture_filepath') +
'</div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'Clean capture stream',
'Y<sub>clean</sub>', 'clean_capture_input_filepath') +
'</div>' +
'<div class="mdl-cell mdl-cell--2-col">' +
getButtonHtml('play_arrow', 'APM reference [3]', 'Y<sub>ref</sub>',
'apm_reference_filepath') +
'</div>' +
'<div class="mdl-layout-spacer"></div>' +
'</div>';
// Add an invisible node as initial container for the audio inspector.
var parent = document.createElement('div');
parent.style.display = 'none';
this.moveInspector_(parent);
document.body.appendChild(parent);
};
/**
* Initialize event handlers.
*/
AudioInspector.prototype.initializeEventHandlers_ = function() {
var self = this;
// Score cells.
document.querySelectorAll('td.single-score-cell').forEach(function(element) {
element.onclick = function() {
self.selectedScoreChange(this);
}
});
// Audio inspector buttons.
this.audioInspector_.querySelectorAll('button').forEach(function(element) {
var target = element.querySelector('input[type=hidden]');
if (target == null) { return; }
element.onclick = function() {
if (target.value == '__stop__') {
self.stopAudio();
} else {
self.playAudio(target.value);
}
};
});
// Keyboard shortcuts.
window.onkeyup = function(e) {
var key = e.keyCode ? e.keyCode : e.which;
switch (key) {
case 49: // 1.
self.playAudioIn();
self.playAudio('capture_filepath');
break;
case 50: // 2.
self.playAudioOut();
self.playAudio('apm_output_filepath');
break;
case 51: // 3.
self.playAudioRef();
self.playAudio('apm_reference_filepath');
break;
case 83: // S.
case 115: // s.
@ -67,121 +237,3 @@ Inspector.prototype.init = function() {
}
};
};
/**
* Open the inspector.
* @param {DOMElement} target: score element that has been clicked.
*/
Inspector.prototype.openInspector = function(target) {
if (this.selectedItem_ != null) {
this.selectedItem_.classList.remove('selected');
}
this.selectedItem_ = target;
this.selectedItem_.classList.add('selected');
var target = this.selectedItem_.querySelector('.test-data-gen-desc');
var testDataGenName = target.querySelector('input[name=gen_name]').value;
var testDataGenParams = target.querySelector('input[name=gen_params]').value;
var audioIn = target.querySelector('input[name=audio_in]').value;
var audioOut = target.querySelector('input[name=audio_out]').value;
var audioRef = target.querySelector('input[name=audio_ref]').value;
this.divTestDataGeneratorName_.innerHTML = testDataGenName;
this.divTestDataGenParameters_.innerHTML = testDataGenParams;
this.audioInUrl_ = audioIn;
this.audioOutUrl_ = audioOut;
this.audioRefUrl_ = audioRef;
};
/**
* Play APM audio input signal.
*/
Inspector.prototype.playAudioIn = function() {
this.play_(this.audioInUrl_);
};
/**
* Play APM audio output signal.
*/
Inspector.prototype.playAudioOut = function() {
this.play_(this.audioOutUrl_);
};
/**
* Play APM audio reference signal.
*/
Inspector.prototype.playAudioRef = function() {
this.play_(this.audioRefUrl_);
};
/**
* Stop playing audio.
*/
Inspector.prototype.stopAudio = function() {
this.audioPlayer_.pause();
};
/**
* Play audio file from url.
* @param {string} url
*/
Inspector.prototype.play_ = function(url) {
if (url == null) {
alert('Select a score first.');
return;
}
this.audioPlayer_.src = url;
this.audioPlayer_.play();
};
/**
* Build inspector.
*/
Inspector.prototype.buildInspector_ = function() {
var self = this;
this.inspectorNode_.setAttribute('class', 'inspector');
this.inspectorNode_.innerHTML =
'<div class="property test-data-gen-name">' +
'<div class="name">test data generator</div>' +
'</div>' +
'<div class="property test-data-gen-parmas">' +
'<div class="name">parameters</div>' +
'</div>' +
'<div class="buttons"></div>';
// Add value nodes.
function addValueNode(node, parent_selector) {
node.setAttribute('class', 'value');
node.innerHTML = '-';
var parentNode = self.inspectorNode_.querySelector(parent_selector);
parentNode.appendChild(node);
}
addValueNode(this.divTestDataGeneratorName_, 'div.test-data-gen-name');
addValueNode(this.divTestDataGenParameters_, 'div.test-data-gen-parmas');
// Add buttons.
var buttonsNode = this.inspectorNode_.querySelector('div.buttons');
function addButton(node, caption, callback) {
node.innerHTML = caption;
buttonsNode.appendChild(node);
node.onclick = callback.bind(self);
}
addButton(this.buttonPlayAudioIn_, 'A_in (<strong>1</strong>)',
this.playAudioIn);
addButton(this.buttonPlayAudioOut_, 'A_out (<strong>2</strong>)',
this.playAudioOut);
addButton(this.buttonPlayAudioRef_, 'A_ref (<strong>3</strong>)',
this.playAudioRef);
addButton(this.buttonStopAudio_, '<strong>S</strong>top', this.stopAudio);
};
/**
* Instance and initialize the inspector.
*/
function initialize() {
var inspector = new Inspector();
inspector.init();
}

View File

@ -86,25 +86,113 @@ class SignalProcessingUtils(object):
return number_of_samples / signal.channels
@classmethod
def GenerateWhiteNoise(cls, signal):
"""Generates white noise.
def GenerateSilence(cls, duration=1000, sample_rate=48000):
"""Generates silence.
White noise is generated with the same duration and in the same format as a
given signal.
This method can also be used to create a template AudioSegment instance.
A template can then be used with other Generate*() methods accepting an
AudioSegment instance as argument.
Args:
signal: AudioSegment instance.
duration: duration in ms.
sample_rate: sample rate.
Returns:
AudioSegment instance.
"""
return pydub.AudioSegment.silent(duration, sample_rate)
@classmethod
def GeneratePureTone(cls, template, frequency=440.0):
"""Generates a pure tone.
The pure tone is generated with the same duration and in the same format as
the given template signal.
Args:
template: AudioSegment instance.
frequency: Frequency of the pure tone in Hz.
Returns:
AudioSegment instance.
"""
if frequency > template.frame_rate >> 1:
raise exceptions.SignalProcessingException('Invalid frequency')
generator = pydub.generators.Sine(
sample_rate=template.frame_rate,
bit_depth=template.sample_width * 8,
freq=frequency)
return generator.to_audio_segment(
duration=len(template),
volume=0.0)
@classmethod
def GenerateWhiteNoise(cls, template):
"""Generates white noise.
The white noise is generated with the same duration and in the same format
as the given template signal.
Args:
template: AudioSegment instance.
Returns:
AudioSegment instance.
"""
generator = pydub.generators.WhiteNoise(
sample_rate=signal.frame_rate,
bit_depth=signal.sample_width * 8)
sample_rate=template.frame_rate,
bit_depth=template.sample_width * 8)
return generator.to_audio_segment(
duration=len(signal),
duration=len(template),
volume=0.0)
@classmethod
def DetectHardClipping(cls, signal, threshold=2):
"""Detects hard clipping.
Hard clipping is simply detected by counting samples that touch either the
lower or upper bound too many times in a row (according to |threshold|).
The presence of a single such sequence of samples is enough to label the
signal as hard clipped.
Args:
signal: AudioSegment instance.
threshold: minimum number of samples at full-scale in a row.
Returns:
True if hard clipping is detected, False otherwise.
"""
if signal.channels != 1:
raise NotImplementedError('multiple-channel clipping not implemented')
if signal.sample_width != 2: # Note that signal.sample_width is in bytes.
raise exceptions.SignalProcessingException(
'hard-clipping detection only supported for 16 bit samples')
# Get raw samples, check type, cast.
samples = signal.get_array_of_samples()
if samples.typecode != 'h':
raise exceptions.SignalProcessingException(
'hard-clipping detection only supported for 16 bit samples')
samples = np.array(signal.get_array_of_samples(), np.int16)
# Detect adjacent clipped samples.
samples_type_info = np.iinfo(samples.dtype)
mask_min = samples == samples_type_info.min
mask_max = samples == samples_type_info.max
def HasLongSequence(vector, min_length=threshold):
"""Returns True if there are one or more long sequences of True flags."""
seq_length = 0
for b in vector:
seq_length = seq_length + 1 if b else 0
if seq_length >= min_length:
return True
return False
return HasLongSequence(mask_min) or HasLongSequence(mask_max)
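As a quick illustration of the detector (a hedged sketch, not part of the CL; it assumes a mono, 16-bit AudioSegment built directly from raw samples):

```python
import numpy as np
import pydub

from . import signal_processing

# A long run of full-scale samples is flagged as hard clipping.
samples = np.zeros(480, np.int16)
samples[100:150] = np.iinfo(np.int16).max
clipped = pydub.AudioSegment(
    samples.tobytes(), sample_width=2, frame_rate=48000, channels=1)
assert signal_processing.SignalProcessingUtils.DetectHardClipping(clipped)

# A silent segment never touches full scale and is not flagged.
silence = signal_processing.SignalProcessingUtils.GenerateSilence(
    duration=10, sample_rate=48000)
assert not signal_processing.SignalProcessingUtils.DetectHardClipping(silence)
```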
@classmethod
def ApplyImpulseResponse(cls, signal, impulse_response):
"""Applies an impulse response to a signal.

View File

@ -13,20 +13,31 @@ import logging
import os
from . import data_access
from . import echo_path_simulation
from . import echo_path_simulation_factory
from . import eval_scores
from . import eval_scores_factory
from . import input_mixer
from . import test_data_generation
from . import test_data_generation_factory
class ApmModuleSimulator(object):
"""APM module simulator class.
"""Audio processing module (APM) simulator class.
"""
_TEST_DATA_GENERATOR_CLASSES = (
test_data_generation.TestDataGenerator.REGISTERED_CLASSES)
_EVAL_SCORE_WORKER_CLASSES = eval_scores.EvaluationScore.REGISTERED_CLASSES
_PREFIX_APM_CONFIG = 'apmcfg-'
_PREFIX_CAPTURE = 'capture-'
_PREFIX_RENDER = 'render-'
_PREFIX_ECHO_SIMULATOR = 'echosim-'
_PREFIX_TEST_DATA_GEN = 'datagen-'
_PREFIX_TEST_DATA_GEN_PARAMS = 'datagen_params-'
_PREFIX_SCORE = 'score-'
def __init__(self, aechen_ir_database_path, polqa_tool_bin_path,
ap_wrapper, evaluator):
# Init.
@ -36,9 +47,11 @@ class ApmModuleSimulator(object):
# Instance factory objects.
self._test_data_generator_factory = (
test_data_generation_factory.TestDataGeneratorFactory(
output_directory_prefix=self._PREFIX_TEST_DATA_GEN_PARAMS,
aechen_ir_database_path=aechen_ir_database_path))
self._evaluation_score_factory = (
eval_scores_factory.EvaluationScoreWorkerFactory(
score_filename_prefix=self._PREFIX_SCORE,
polqa_tool_bin_path=polqa_tool_bin_path))
# Properties for each run.
@ -46,21 +59,65 @@ class ApmModuleSimulator(object):
self._test_data_generators = None
self._evaluation_score_workers = None
self._config_filepaths = None
self._input_filepaths = None
self._capture_input_filepaths = None
self._render_input_filepaths = None
self._echo_path_simulator_class = None
def Run(self, config_filepaths, input_filepaths, test_data_generator_names,
eval_score_names, output_dir):
@classmethod
def GetPrefixApmConfig(cls):
return cls._PREFIX_APM_CONFIG
@classmethod
def GetPrefixCapture(cls):
return cls._PREFIX_CAPTURE
@classmethod
def GetPrefixRender(cls):
return cls._PREFIX_RENDER
@classmethod
def GetPrefixEchoSimulator(cls):
return cls._PREFIX_ECHO_SIMULATOR
@classmethod
def GetPrefixTestDataGenerator(cls):
return cls._PREFIX_TEST_DATA_GEN
@classmethod
def GetPrefixTestDataGeneratorParameters(cls):
return cls._PREFIX_TEST_DATA_GEN_PARAMS
@classmethod
def GetPrefixScore(cls):
return cls._PREFIX_SCORE
def Run(self, config_filepaths, capture_input_filepaths,
test_data_generator_names, eval_score_names, output_dir,
render_input_filepaths=None, echo_path_simulator_name=(
echo_path_simulation.NoEchoPathSimulator.NAME)):
"""Runs the APM simulation.
Initializes paths and required instances, then runs all the simulations.
The render input can be optionally added. If added, the number of capture
input audio tracks and the number of render input audio tracks have to be
equal. The two lists are used to form pairs of capture and render input.
Args:
config_filepaths: set of APM configuration files to test.
input_filepaths: set of input audio track files to test.
capture_input_filepaths: set of capture input audio track files to test.
test_data_generator_names: set of test data generator names to test.
eval_score_names: set of evaluation score names to test.
output_dir: base path to the output directory for wav files and outcomes.
render_input_filepaths: set of render input audio track files to test.
echo_path_simulator_name: name of the echo path simulator to use when
render input is provided.
"""
assert render_input_filepaths is None or (
len(capture_input_filepaths) == len(render_input_filepaths)), (
'render input set size not matching input set size')
assert render_input_filepaths is None or echo_path_simulator_name in (
echo_path_simulation.EchoPathSimulator.REGISTERED_CLASSES), (
'invalid echo path simulator')
self._base_output_path = os.path.abspath(output_dir)
# Instance test data generators.
@ -79,7 +136,20 @@ class ApmModuleSimulator(object):
self._config_filepaths = self._CreatePathsCollection(config_filepaths)
# Set probing signal file paths.
self._input_filepaths = self._CreatePathsCollection(input_filepaths)
if render_input_filepaths is None:
# Capture input only.
self._capture_input_filepaths = self._CreatePathsCollection(
capture_input_filepaths)
self._render_input_filepaths = None
else:
# Set both capture and render input signals.
self._SetTestInputSignalFilePaths(
capture_input_filepaths, render_input_filepaths)
# Set the echo path simulator class.
self._echo_path_simulator_class = (
echo_path_simulation.EchoPathSimulator.REGISTERED_CLASSES[
echo_path_simulator_name])
self._SimulateAll()
@ -87,44 +157,73 @@ class ApmModuleSimulator(object):
"""Runs all the simulations.
Iterates over the combinations of APM configurations, probing signals, and
test data generators.
test data generators. This method is mainly responsible for the creation of
the cache and output directories required in order to call _Simulate().
"""
without_render_input = self._render_input_filepaths is None
# Try different APM config files.
for config_name in self._config_filepaths:
config_filepath = self._config_filepaths[config_name]
# Try different probing signal files.
for input_name in self._input_filepaths:
input_filepath = self._input_filepaths[input_name]
# Try different capture-render pairs.
for capture_input_name in self._capture_input_filepaths:
capture_input_filepath = self._capture_input_filepaths[
capture_input_name]
render_input_filepath = None if without_render_input else (
self._render_input_filepaths[capture_input_name])
render_input_name = '(none)' if without_render_input else (
self._ExtractFileName(render_input_filepath))
# Instance echo path simulator (if needed).
echo_path_simulator = (
echo_path_simulation_factory.EchoPathSimulatorFactory.GetInstance(
self._echo_path_simulator_class, render_input_filepath))
# Try different test data generators.
for test_data_generators in self._test_data_generators:
logging.info('config: <%s>, input: <%s>, noise: <%s>',
config_name, input_name, test_data_generators.NAME)
logging.info('APM config preset: <%s>, capture: <%s>, render: <%s>, '
'test data generator: <%s>, echo simulator: <%s>',
config_name, capture_input_name, render_input_name,
test_data_generators.NAME, echo_path_simulator.NAME)
# Output path for the input-noise pairs. It is used to cache the noisy
# copies of the probing signals (shared across some simulations).
input_noise_cache_path = os.path.join(
self._base_output_path,
'_cache',
'input_{}-noise_{}'.format(input_name, test_data_generators.NAME))
data_access.MakeDirectory(input_noise_cache_path)
logging.debug('input-noise cache path: <%s>', input_noise_cache_path)
# Output path for the generated test data.
# The path is used to cache the signals shared across simulations.
test_data_cache_path = os.path.join(
self._base_output_path, '_cache',
self._PREFIX_CAPTURE + capture_input_name,
self._PREFIX_TEST_DATA_GEN + test_data_generators.NAME)
data_access.MakeDirectory(test_data_cache_path)
logging.debug('test data cache path: <%s>', test_data_cache_path)
# Output path for the echo simulator and APM input mixer output.
echo_test_data_cache_path = os.path.join(
test_data_cache_path, 'echosim-{}'.format(
echo_path_simulator.NAME))
data_access.MakeDirectory(echo_test_data_cache_path)
logging.debug('echo test data cache path: <%s>',
echo_test_data_cache_path)
# Full output path.
output_path = os.path.join(
self._base_output_path,
'cfg-{}'.format(config_name),
'input-{}'.format(input_name),
'gen-{}'.format(test_data_generators.NAME))
self._PREFIX_APM_CONFIG + config_name,
self._PREFIX_CAPTURE + capture_input_name,
self._PREFIX_RENDER + render_input_name,
self._PREFIX_ECHO_SIMULATOR + echo_path_simulator.NAME,
self._PREFIX_TEST_DATA_GEN + test_data_generators.NAME)
data_access.MakeDirectory(output_path)
logging.debug('output path: <%s>', output_path)
self._Simulate(test_data_generators, input_filepath,
input_noise_cache_path, output_path, config_filepath)
self._Simulate(test_data_generators, capture_input_filepath,
render_input_filepath, test_data_cache_path,
echo_test_data_cache_path, output_path,
config_filepath, echo_path_simulator)
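For illustration, with the prefixes above a single simulation writes its output into a nested directory like the following (the config, capture and generator names are hypothetical; the render folder is literally named `(none)` when no render input is given, and the last level is added by the test data generator):

```
<output_dir>/apmcfg-default/capture-speech/render-(none)/
    echosim-<echo simulator name>/datagen-identity/datagen_params-default/
```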
def _Simulate(self, test_data_generators, input_filepath,
input_noise_cache_path, output_path, config_filepath):
def _Simulate(self, test_data_generators, clean_capture_input_filepath,
render_input_filepath, test_data_cache_path,
echo_test_data_cache_path, output_path, config_filepath,
echo_path_simulator):
"""Runs a single set of simulation.
Simulates a given combination of APM configuration, probing signal, and
@ -133,38 +232,52 @@ class ApmModuleSimulator(object):
Args:
test_data_generators: TestDataGenerator instance.
input_filepath: input audio track file to test.
input_noise_cache_path: path for the noisy audio track files.
clean_capture_input_filepath: capture input audio track file to be
processed by a test data generator and
not affected by echo.
render_input_filepath: render input audio track file to test.
test_data_cache_path: path for the generated test audio track files.
echo_test_data_cache_path: path for the echo simulator.
output_path: base output path for the test data generator.
config_filepath: APM configuration file to test.
echo_path_simulator: EchoPathSimulator instance.
"""
# Generate pairs of noisy input and reference signal files.
test_data_generators.Generate(
input_signal_filepath=input_filepath,
input_noise_cache_path=input_noise_cache_path,
input_signal_filepath=clean_capture_input_filepath,
test_data_cache_path=test_data_cache_path,
base_output_path=output_path)
# For each test data pair, simulate a call and evaluate.
for config_name in test_data_generators.config_names:
logging.info(' - test data generator config: <%s>', config_name)
# APM input and output signal paths.
noisy_signal_filepath = test_data_generators.noisy_signal_filepaths[
config_name]
# Paths to the test data generator output.
# Note that the reference signal does not depend on the render input
# which is optional.
noisy_capture_input_filepath = (
test_data_generators.noisy_signal_filepaths[config_name])
reference_signal_filepath = (
test_data_generators.reference_signal_filepaths[config_name])
# Output path for the evaluation (e.g., APM output file).
evaluation_output_path = test_data_generators.apm_output_paths[
config_name]
# Simulate a call using the audio processing module.
# Paths to the APM input signals.
echo_path_filepath = echo_path_simulator.Simulate(
echo_test_data_cache_path)
apm_input_filepath = input_mixer.ApmInputMixer.Mix(
echo_test_data_cache_path, noisy_capture_input_filepath,
echo_path_filepath)
# Simulate a call using APM.
self._audioproc_wrapper.Run(
config_filepath=config_filepath,
input_filepath=noisy_signal_filepath,
capture_input_filepath=apm_input_filepath,
render_input_filepath=render_input_filepath,
output_path=evaluation_output_path)
# Reference signal path for the evaluation step.
reference_signal_filepath = (
test_data_generators.reference_signal_filepaths[
config_name])
# Evaluate.
self._evaluator.Run(
evaluation_score_workers=self._evaluation_score_workers,
@ -172,6 +285,39 @@ class ApmModuleSimulator(object):
reference_input_filepath=reference_signal_filepath,
output_path=evaluation_output_path)
# Save simulation metadata.
data_access.Metadata.SaveAudioTestDataPaths(
output_path=evaluation_output_path,
clean_capture_input_filepath=clean_capture_input_filepath,
echo_free_capture_filepath=noisy_capture_input_filepath,
echo_filepath=echo_path_filepath,
render_filepath=render_input_filepath,
capture_filepath=apm_input_filepath,
apm_output_filepath=self._audioproc_wrapper.output_filepath,
apm_reference_filepath=reference_signal_filepath)
def _SetTestInputSignalFilePaths(self, capture_input_filepaths,
render_input_filepaths):
"""Sets input and render input file paths collections.
Pairs the input and render input files by storing the file paths into two
collections. The key is the file name of the input file.
Args:
capture_input_filepaths: list of file paths.
render_input_filepaths: list of file paths.
"""
self._capture_input_filepaths = {}
self._render_input_filepaths = {}
assert len(capture_input_filepaths) == len(render_input_filepaths)
for capture_input_filepath, render_input_filepath in zip(
capture_input_filepaths, render_input_filepaths):
name = self._ExtractFileName(capture_input_filepath)
self._capture_input_filepaths[name] = os.path.abspath(
capture_input_filepath)
self._render_input_filepaths[name] = os.path.abspath(
render_input_filepath)
@classmethod
def _CreatePathsCollection(cls, filepaths):
"""Creates a collection of file paths.
@ -188,6 +334,10 @@ class ApmModuleSimulator(object):
"""
filepaths_collection = {}
for filepath in filepaths:
name = os.path.splitext(os.path.split(filepath)[1])[0]
name = cls._ExtractFileName(filepath)
filepaths_collection[name] = os.path.abspath(filepath)
return filepaths_collection
@classmethod
def _ExtractFileName(cls, filepath):
return os.path.splitext(os.path.split(filepath)[-1])[0]

View File

@ -71,7 +71,7 @@ class TestApmModuleSimulator(unittest.TestCase):
# Run all simulations.
simulator.Run(
config_filepaths=config_files,
input_filepaths=input_files,
capture_input_filepaths=input_files,
test_data_generator_names=test_data_generators,
eval_score_names=eval_scores,
output_dir=self._output_path)
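Building on the call above, a hedged sketch of a run that also exercises the new render stream and echo path simulation (`render_files` is a hypothetical list paired 1:1 with the capture files; any name in `EchoPathSimulator.REGISTERED_CLASSES` can be passed, and `echo_path_simulation` must be imported):

```python
simulator.Run(
    config_filepaths=config_files,
    capture_input_filepaths=input_files,
    render_input_filepaths=render_files,
    test_data_generator_names=test_data_generators,
    eval_score_names=eval_scores,
    output_dir=self._output_path,
    echo_path_simulator_name=echo_path_simulation.NoEchoPathSimulator.NAME)
```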

View File

@ -43,7 +43,7 @@ class TestDataGenerator(object):
reference. The former is the clean signal deteriorated by the noise source;
the latter goes through the same deterioration process, but more "gently".
Noisy signal and reference are produced so that the reference is the signal
expected at the output of the APM module when the latter is fed with the nosiy
expected at the output of the APM module when the latter is fed with the noisy
signal.
A test data generator generates one or more pairs.
@ -52,7 +52,8 @@ class TestDataGenerator(object):
NAME = None
REGISTERED_CLASSES = {}
def __init__(self):
def __init__(self, output_directory_prefix):
self._output_directory_prefix = output_directory_prefix
# Init dictionaries with one entry for each test data generator
# configuration (e.g., different SNRs).
# Noisy audio track files (stored separately in a cache folder).
@ -65,7 +66,7 @@ class TestDataGenerator(object):
@classmethod
def RegisterClass(cls, class_to_register):
"""Registers an TestDataGenerator implementation.
"""Registers a TestDataGenerator implementation.
Decorator to automatically register the classes that extend
TestDataGenerator.
@ -95,7 +96,7 @@ class TestDataGenerator(object):
return self._reference_signal_filepaths
def Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
"""Generates a set of noisy input and reference audiotrack file pairs.
This method initializes an empty set of pairs and calls the _Generate()
@ -103,12 +104,13 @@ class TestDataGenerator(object):
Args:
input_signal_filepath: path to the clean input audio track file.
input_noise_cache_path: path to the cache of noisy audio track files.
test_data_cache_path: path to the cache of the generated audio track
files.
base_output_path: base path where output is written.
"""
self.Clear()
self._Generate(
input_signal_filepath, input_noise_cache_path, base_output_path)
input_signal_filepath, test_data_cache_path, base_output_path)
def Clear(self):
"""Clears the generated output path dictionaries.
@ -118,7 +120,7 @@ class TestDataGenerator(object):
self._reference_signal_filepaths = {}
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
"""Abstract method to be implemented in each concrete class.
"""
raise NotImplementedError()
@ -163,16 +165,10 @@ class TestDataGenerator(object):
self._reference_signal_filepaths[config_name] = os.path.abspath(
reference_signal_filepath)
# Save noisy and reference file paths.
data_access.Metadata.SaveAudioTestDataPaths(
output_path=output_path,
audio_in_filepath=self._noisy_signal_filepaths[config_name],
audio_ref_filepath=self._reference_signal_filepaths[config_name])
@classmethod
def _MakeDir(cls, base_output_path, test_data_generator_config_name):
def _MakeDir(self, base_output_path, test_data_generator_config_name):
output_path = os.path.join(
base_output_path, test_data_generator_config_name)
base_output_path,
self._output_directory_prefix + test_data_generator_config_name)
data_access.MakeDirectory(output_path)
return output_path
@ -186,11 +182,11 @@ class IdentityTestDataGenerator(TestDataGenerator):
NAME = 'identity'
def __init__(self):
TestDataGenerator.__init__(self)
def __init__(self, output_directory_prefix):
TestDataGenerator.__init__(self, output_directory_prefix)
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
config_name = 'default'
output_path = self._MakeDir(base_output_path, config_name)
self._AddNoiseReferenceFilesPair(
@ -219,11 +215,11 @@ class WhiteNoiseTestDataGenerator(TestDataGenerator):
_NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'
def __init__(self):
TestDataGenerator.__init__(self)
def __init__(self, output_directory_prefix):
TestDataGenerator.__init__(self, output_directory_prefix)
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
# Load the input signal.
input_signal = signal_processing.SignalProcessingUtils.LoadWav(
input_signal_filepath)
@ -241,7 +237,7 @@ class WhiteNoiseTestDataGenerator(TestDataGenerator):
snr_values = set([snr for pair in self._SNR_VALUE_PAIRS for snr in pair])
for snr in snr_values:
noisy_signal_filepath = os.path.join(
input_noise_cache_path,
test_data_cache_path,
self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr))
# Create and save if not done.
@ -276,11 +272,11 @@ class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
NAME = 'narrow_band_noise'
def __init__(self):
TestDataGenerator.__init__(self)
def __init__(self, output_directory_prefix):
TestDataGenerator.__init__(self, output_directory_prefix)
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
# TODO(alessiob): implement.
pass
@ -316,11 +312,11 @@ class EnvironmentalNoiseTestDataGenerator(TestDataGenerator):
[0, 10], # Largest noise.
]
def __init__(self):
TestDataGenerator.__init__(self)
def __init__(self, output_directory_prefix):
TestDataGenerator.__init__(self, output_directory_prefix)
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
"""Generates test data pairs using environmental noise.
For each noise track and pair of SNR values, the following two audio tracks
@ -356,7 +352,7 @@ class EnvironmentalNoiseTestDataGenerator(TestDataGenerator):
noisy_mix_filepaths[noise_track_name] = {}
for snr in snr_values:
noisy_signal_filepath = os.path.join(
input_noise_cache_path,
test_data_cache_path,
self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(noise_track_name, snr))
# Create and save if not done.
@ -405,12 +401,12 @@ class ReverberationTestDataGenerator(TestDataGenerator):
_NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
_NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'
def __init__(self, aechen_ir_database_path):
TestDataGenerator.__init__(self)
def __init__(self, output_directory_prefix, aechen_ir_database_path):
TestDataGenerator.__init__(self, output_directory_prefix)
self._aechen_ir_database_path = aechen_ir_database_path
def _Generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
self, input_signal_filepath, test_data_cache_path, base_output_path):
"""Generates test data pairs using reverberation noise.
For each impulse response, one noise track is created. For each impulse
@ -431,7 +427,7 @@ class ReverberationTestDataGenerator(TestDataGenerator):
noise_track_filename = self._NOISE_TRACK_FILENAME_TEMPLATE.format(
impulse_response_name)
noise_track_filepath = os.path.join(
input_noise_cache_path, noise_track_filename)
test_data_cache_path, noise_track_filename)
noise_signal = None
try:
# Load noise track.
@ -450,7 +446,7 @@ class ReverberationTestDataGenerator(TestDataGenerator):
noisy_mix_filepaths[impulse_response_name] = {}
for snr in snr_values:
noisy_signal_filepath = os.path.join(
input_noise_cache_path,
test_data_cache_path,
self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
impulse_response_name, snr))

View File

@ -21,7 +21,8 @@ class TestDataGeneratorFactory(object):
generators will be produced.
"""
def __init__(self, aechen_ir_database_path):
def __init__(self, output_directory_prefix, aechen_ir_database_path):
self._output_directory_prefix = output_directory_prefix
self._aechen_ir_database_path = aechen_ir_database_path
def GetInstance(self, test_data_generators_class):
@ -30,12 +31,14 @@ class TestDataGeneratorFactory(object):
Args:
test_data_generators_class: TestDataGenerator class object (not an
instance).
Returns:
TestDataGenerator instance.
"""
logging.debug('factory producing %s', test_data_generators_class)
if test_data_generators_class == (
test_data_generation.ReverberationTestDataGenerator):
return test_data_generation.ReverberationTestDataGenerator(
self._aechen_ir_database_path)
self._output_directory_prefix, self._aechen_ir_database_path)
else:
# By default, no arguments in the constructor.
return test_data_generators_class()
return test_data_generators_class(self._output_directory_prefix)

View File

@ -29,7 +29,7 @@ class TestTestDataGenerators(unittest.TestCase):
def setUp(self):
"""Create temporary folders."""
self._base_output_path = tempfile.mkdtemp()
self._input_noise_cache_path = tempfile.mkdtemp()
self._test_data_cache_path = tempfile.mkdtemp()
self._fake_air_db_path = tempfile.mkdtemp()
# Fake AIR DB impulse responses.
@ -49,13 +49,13 @@ class TestTestDataGenerators(unittest.TestCase):
def tearDown(self):
"""Recursively delete temporary folders."""
shutil.rmtree(self._base_output_path)
shutil.rmtree(self._input_noise_cache_path)
shutil.rmtree(self._test_data_cache_path)
shutil.rmtree(self._fake_air_db_path)
def testTestDataGenerators(self):
# Preliminary check.
self.assertTrue(os.path.exists(self._base_output_path))
self.assertTrue(os.path.exists(self._input_noise_cache_path))
self.assertTrue(os.path.exists(self._test_data_cache_path))
# Check that there is at least one registered test data generator.
registered_classes = (
@ -66,6 +66,7 @@ class TestTestDataGenerators(unittest.TestCase):
# Instance generators factory.
generators_factory = (
test_data_generation_factory.TestDataGeneratorFactory(
output_directory_prefix='datagen-',
aechen_ir_database_path=self._fake_air_db_path))
# Use a sample input file as clean input signal.
@ -86,7 +87,7 @@ class TestTestDataGenerators(unittest.TestCase):
# Generate the noisy input - reference pairs.
generator.Generate(
input_signal_filepath=input_signal_filepath,
input_noise_cache_path=self._input_noise_cache_path,
test_data_cache_path=self._test_data_cache_path,
base_output_path=self._base_output_path)
# Perform checks.