Environmental noise generator implemented.

BUG=webrtc:7218
NOTRY=True

Review-Url: https://codereview.webrtc.org/2718133002
Cr-Commit-Position: refs/heads/master@{#17506}
This commit is contained in:
alessiob 2017-04-03 06:54:46 -07:00 committed by Commit bot
parent 653063f6fd
commit 8a1b3c9d11
11 changed files with 351 additions and 67 deletions

View File

@ -12,10 +12,11 @@ copy("py_quality_assessment") {
testonly = true
sources = [
"README.md",
"apm_quality_assessment-export.py",
"apm_quality_assessment-gencfgs.py",
"apm_quality_assessment.py",
"apm_quality_assessment.sh",
"apm_quality_assessment_export.py",
"apm_quality_assessment_gencfgs.py",
"apm_quality_assessment_unittest.py",
]
outputs = [
"$root_build_dir/py_quality_assessment/{{source_file_part}}",
@ -36,9 +37,12 @@ copy("lib") {
"quality_assessment/audioproc_wrapper.py",
"quality_assessment/data_access.py",
"quality_assessment/eval_scores.py",
"quality_assessment/eval_scores_unittest.py",
"quality_assessment/evaluation.py",
"quality_assessment/noise_generation.py",
"quality_assessment/noise_generation_unittest.py",
"quality_assessment/signal_processing.py",
"quality_assessment/signal_processing_unittest.py",
"quality_assessment/simulation.py",
]
visibility = [ ":*" ] # Only targets in this file can depend on this.

View File

@ -22,8 +22,8 @@ SCORES=( \
OUTPUT_PATH=output
# Generate standard APM config files.
chmod +x apm_quality_assessment-gencfgs.py
./apm_quality_assessment-gencfgs.py
chmod +x apm_quality_assessment_gencfgs.py
./apm_quality_assessment_gencfgs.py
# Customize APM configurations if needed.
APM_CONFIGS=(apm_configs/*.json)
@ -56,8 +56,8 @@ done
wait
# Export results.
chmod +x ./apm_quality_assessment-export.py
./apm_quality_assessment-export.py -o ${OUTPUT_PATH}
chmod +x ./apm_quality_assessment_export.py
./apm_quality_assessment_export.py -o ${OUTPUT_PATH}
# Show results in the browser.
RESULTS_FILE="$(realpath ${OUTPUT_PATH}/results.html)"

View File

@ -6,6 +6,22 @@
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Noise generators producing pairs of signals intended to be used to test the
APM module. Each pair consists of a noisy and a reference signal. The former
is used as input for APM, and it is generated by adding noise to a signal.
The reference is the expected APM output when using the generated input.
Throughout this file, the following naming convention is used:
- input signal: the clean signal (e.g., speech),
- noise signal: the noise to be added to the input signal (e.g., white
noise, Gaussian noise),
- noisy signal: input + noise.
The noise signal may or may not be a function of the clean signal. For
instance, white noise is independently generated, whereas reverberation is
obtained by convolving the input signal with an impulse response.
"""
import logging
import os
from . import data_access
@ -15,17 +31,13 @@ class NoiseGenerator(object):
"""Abstract class responsible for the generation of noisy signals.
Given a clean signal, it generates two streams named noisy signal and
reference. The former is the clean signal deteriorated by the noise source,
the latter goes trhough the same deterioration process, but more "gently".
reference. The former is the clean signal deteriorated by the noise source,
the latter goes through the same deterioration process, but more "gently".
Noisy signal and reference are produced so that the reference is the signal
expected at the output of the APM module when the latter is fed with the noisy
signal.
This is useful since it is not realistic to expect that APM will remove all
the background noise or all the echo. Hence, the process that generates the
reference signal is responsible for setting realistic expectations.
Finally, note that a noise source can generate multiple input-reference pairs.
A noise generator generates one or more input-reference pairs.
"""
NAME = None
@ -63,6 +75,10 @@ class NoiseGenerator(object):
def generate(
    self, input_signal_filepath, input_noise_cache_path, base_output_path):
  """Produce a set of noisy input and reference audio track file pairs.

  The generator is first reset through clear(), so that it starts from an
  empty set of pairs; the real work is then delegated to _generate(), which
  each concrete class implements.

  Args:
    input_signal_filepath: forwarded as is to _generate().
    input_noise_cache_path: forwarded as is to _generate().
    base_output_path: forwarded as is to _generate().

  Returns:
    Whatever the concrete _generate() implementation returns.
  """
  self.clear()
  outcome = self._generate(
      input_signal_filepath, input_noise_cache_path, base_output_path)
  return outcome
@ -74,6 +90,8 @@ class NoiseGenerator(object):
def _generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
"""This is an abstract method to be implemented in each concrete class.
"""
raise NotImplementedError()
def _add_noise_snr_pairs(self, base_output_path, noisy_mix_filepaths,
@ -154,16 +172,13 @@ class WhiteNoiseGenerator(NoiseGenerator):
NAME = 'white'
# Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
# Since the implementation below only changes the gain of the noise, the
# values indicate the noise-to-signal ratio. Therefore a higher value means
# larger amount of noise.
# The reference (second value of each pair) always has a lower amount of noise
# - i.e., the SNR is 10 dB higher.
_SNR_VALUE_PAIRS = [
[0, -10], # Largest noise.
[-5, -15],
[-10, -20],
[-20, -30], # Smallest noise.
[20, 30], # Smallest noise.
[10, 20],
[5, 15],
[0, 10], # Largest noise.
]
_NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'
@ -193,7 +208,7 @@ class WhiteNoiseGenerator(NoiseGenerator):
if not os.path.exists(noisy_signal_filepath):
# Create noisy signal.
noisy_signal = SignalProcessingUtils.mix_signals(
noise_signal, input_signal, snr)
input_signal, noise_signal, snr)
# Save.
SignalProcessingUtils.save_wav(noisy_signal_filepath, noisy_signal)
@ -230,22 +245,80 @@ class NarrowBandNoiseGenerator(NoiseGenerator):
pass
# TODO(alessiob): remove comment when class implemented.
# @NoiseGenerator.register_class
@NoiseGenerator.register_class
class EnvironmentalNoiseGenerator(NoiseGenerator):
"""
Additive environmental noise generator.
"""
NAME = 'environmental'
_NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'
# TODO(alessiob): allow the user to store the noise tracks in a custom path.
_NOISE_TRACKS_PATH = os.path.join(os.getcwd(), 'noise_tracks')
# TODO(alessiob): allow the user to have custom noise tracks.
_NOISE_TRACKS = [
'city.wav'
]
# Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
# The reference (second value of each pair) always has a lower amount of noise
# - i.e., the SNR is 10 dB higher.
_SNR_VALUE_PAIRS = [
[20, 30], # Smallest noise.
[10, 20],
[5, 15],
[0, 10], # Largest noise.
]
def __init__(self):
NoiseGenerator.__init__(self)
def _generate(
self, input_signal_filepath, input_noise_cache_path, base_output_path):
# TODO(alessiob): implement.
pass
# Init.
snr_values = set([snr for pair in self._SNR_VALUE_PAIRS for snr in pair])
# Load the input signal.
input_signal = SignalProcessingUtils.load_wav(input_signal_filepath)
input_signal = SignalProcessingUtils.normalize(input_signal)
noisy_mix_filepaths = {}
for noise_track_filename in self._NOISE_TRACKS:
# Load the noise track.
noise_track_name, _ = os.path.splitext(noise_track_filename)
noise_track_filepath = os.path.join(
self._NOISE_TRACKS_PATH, noise_track_filename)
if not os.path.exists(noise_track_filepath):
logging.error('cannot find the <%s> noise track', noise_track_filename)
continue
noise_signal = SignalProcessingUtils.load_wav(noise_track_filepath)
noise_signal = SignalProcessingUtils.normalize(noise_signal)
# Create the noisy mixes (once for each unique SNR value).
noisy_mix_filepaths[noise_track_name] = {}
for snr in snr_values:
noisy_signal_filepath = os.path.join(
input_noise_cache_path,
self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(noise_track_name, snr))
# Create and save if not done.
if not os.path.exists(noisy_signal_filepath):
# Create noisy signal.
noisy_signal = SignalProcessingUtils.mix_signals(
input_signal, noise_signal, snr)
# Save.
SignalProcessingUtils.save_wav(noisy_signal_filepath, noisy_signal)
# Add file to the collection of mixes.
noisy_mix_filepaths[noise_track_name][snr] = noisy_signal_filepath
# Add all the noise-SNR pairs.
self._add_noise_snr_pairs(
base_output_path, noisy_mix_filepaths, self._SNR_VALUE_PAIRS)
# TODO(alessiob): remove comment when class implemented.

View File

@ -0,0 +1,118 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import os
import shutil
import tempfile
import unittest
from . import noise_generation
from . import signal_processing
class TestNoiseGen(unittest.TestCase):
  """Unit tests running every registered noise generator end to end."""

  def setUp(self):
    """Create temporary folders."""
    self._base_output_path = tempfile.mkdtemp()
    self._input_noise_cache_path = tempfile.mkdtemp()

  def tearDown(self):
    """Recursively delete temporary folders."""
    shutil.rmtree(self._base_output_path)
    shutil.rmtree(self._input_noise_cache_path)

  def testNoiseGenerators(self):
    """Runs each registered noise generator and checks what it produces."""
    # Preliminary check.
    self.assertTrue(os.path.exists(self._base_output_path))
    self.assertTrue(os.path.exists(self._input_noise_cache_path))

    # Check that there is at least one registered noise generator.
    registered_classes = noise_generation.NoiseGenerator.REGISTERED_CLASSES
    self.assertIsInstance(registered_classes, dict)
    self.assertGreater(len(registered_classes), 0)

    # Use a sample input file as clean input signal. The path is relative to
    # the current working directory, hence the explicit existence check.
    input_signal_filepath = os.path.join(
        os.getcwd(), 'probing_signals', 'tone-880.wav')
    self.assertTrue(os.path.exists(input_signal_filepath))

    # Load input signal.
    input_signal = signal_processing.SignalProcessingUtils.load_wav(
        input_signal_filepath)

    # Try each registered noise generator. Only the classes are needed here,
    # so iterate over the values rather than looking each name up again.
    for noise_generator_class in registered_classes.values():
      # Instance noise generator.
      noise_generator = noise_generator_class()

      # Generate the noisy input - reference pairs.
      noise_generator.generate(
          input_signal_filepath=input_signal_filepath,
          input_noise_cache_path=self._input_noise_cache_path,
          base_output_path=self._base_output_path)

      # Perform checks.
      self._CheckNoiseGeneratorPairsListSizes(noise_generator)
      self._CheckNoiseGeneratorPairsSignalDurations(
          noise_generator, input_signal)
      self._CheckNoiseGeneratorPairsOutputPaths(noise_generator)

  def _CheckNoiseGeneratorPairsListSizes(self, noise_generator):
    """Checks that all the per-configuration collections have the same size.

    The number of configuration names is compared with the number of noisy
    signal file paths, output paths and reference signal file paths.
    """
    # Noise configuration names.
    noise_config_names = noise_generator.config_names
    number_of_pairs = len(noise_config_names)

    # Check.
    self.assertEqual(number_of_pairs,
                     len(noise_generator.noisy_signal_filepaths))
    self.assertEqual(number_of_pairs,
                     len(noise_generator.output_paths))
    self.assertEqual(number_of_pairs,
                     len(noise_generator.reference_signal_filepaths))

  def _CheckNoiseGeneratorPairsSignalDurations(
      self, noise_generator, input_signal):
    """Checks that the noisy input and the reference tracks are audio files
    which, as measured by count_samples(), are not shorter than the input
    signal.
    """
    input_signal_length = (
        signal_processing.SignalProcessingUtils.count_samples(input_signal))

    # Iterate over the noisy signal - reference pairs.
    for noise_config_name in noise_generator.config_names:
      # Load the noisy input file.
      noisy_signal_filepath = noise_generator.noisy_signal_filepaths[
          noise_config_name]
      noisy_signal = signal_processing.SignalProcessingUtils.load_wav(
          noisy_signal_filepath)

      # Check noisy input signal length.
      noisy_signal_length = (
          signal_processing.SignalProcessingUtils.count_samples(noisy_signal))
      self.assertGreaterEqual(noisy_signal_length, input_signal_length)

      # Load the reference file.
      reference_signal_filepath = (
          noise_generator.reference_signal_filepaths[noise_config_name])
      reference_signal = signal_processing.SignalProcessingUtils.load_wav(
          reference_signal_filepath)

      # Check reference signal length.
      reference_signal_length = (
          signal_processing.SignalProcessingUtils.count_samples(
              reference_signal))
      self.assertGreaterEqual(reference_signal_length, input_signal_length)

  def _CheckNoiseGeneratorPairsOutputPaths(self, noise_generator):
    """Checks that the output path created by the generator exists.
    """
    # Iterate over the noisy signal - reference pairs.
    for noise_config_name in noise_generator.config_names:
      output_path = noise_generator.output_paths[noise_config_name]
      self.assertTrue(os.path.exists(output_path))

View File

@ -11,8 +11,13 @@ import logging
import numpy as np
import pydub
import pydub.generators
import scipy.signal
class SignalProcessingException(Exception):
  """Error raised by the signal processing utilities in this module."""
class SignalProcessingUtils(object):
def __init__(self):
@ -110,39 +115,72 @@ class SignalProcessingUtils(object):
return signal.apply_gain(-signal.max_dBFS)
@classmethod
def mix_signals(cls, signal_0, signal_1, target_snr=0.0,
def copy(cls, signal):
  """Build a new AudioSegment from the samples and the format of a signal.

  Args:
    signal: AudioSegment instance to duplicate.

  Returns:
    A pydub.AudioSegment created from the array of samples of the given
    signal, with the same sample width, frame rate, frame width and number
    of channels.
  """
  audio_format = {
      'sample_width': signal.sample_width,
      'frame_rate': signal.frame_rate,
      'frame_width': signal.frame_width,
      'channels': signal.channels,
  }
  return pydub.AudioSegment(
      data=signal.get_array_of_samples(), metadata=audio_format)
@classmethod
def mix_signals(cls, signal, noise, target_snr=0.0,
bln_pad_shortest=False):
"""
Mix two signals up to a desired SNR by scaling signal_0 (signal).
Mix two signals up to a desired SNR by scaling noise (noise).
If the target SNR is +/- infinite, a copy of signal/noise is returned.
Args:
signal_0: AudioSegment instance (signal).
signal_1: AudioSegment instance (noise).
target_snr: float (dB).
signal: AudioSegment instance (signal).
noise: AudioSegment instance (noise).
target_snr: float, numpy.Inf or -numpy.Inf (dB).
bln_pad_shortest: if True, it pads the shortest signal with silence at the
end.
"""
# Pad signal_1 (if necessary). If signal_0 is the shortest, the AudioSegment
# overlay() method implictly pads signal_0. Hence, the only case to handle
# is signal_1 shorter than signal_0 and bln_pad_shortest True.
# Handle infinite target SNR.
if target_snr == -np.Inf:
# Return a copy of noise.
logging.warning('SNR = -Inf, returning noise')
return cls.copy(noise)
elif target_snr == np.Inf:
# Return a copy of signal.
logging.warning('SNR = +Inf, returning signal')
return cls.copy(signal)
# Check signal and noise power.
signal_power = float(signal.dBFS)
noise_power = float(noise.dBFS)
if signal_power == -np.Inf:
logging.error('signal has -Inf power, cannot mix')
raise SignalProcessingException('cannot mix a signal with -Inf power')
if noise_power == -np.Inf:
logging.error('noise has -Inf power, cannot mix')
raise SignalProcessingException('cannot mix a signal with -Inf power')
# Pad signal (if necessary). If noise is the shortest, the AudioSegment
# overlay() method implicitly pads noise. Hence, the only case to handle
# is signal shorter than noise and bln_pad_shortest True.
if bln_pad_shortest:
signal_0_duration = len(signal_0)
signal_1_duration = len(signal_1)
logging.debug('mix signals with padding')
logging.debug(' signal_0: %d ms', signal_0_duration)
logging.debug(' signal_1: %d ms', signal_1_duration)
padding_duration = signal_0_duration - signal_1_duration
if padding_duration > 0: # That is signal_1_duration < signal_0_duration.
signal_duration = len(signal)
noise_duration = len(noise)
logging.warning('mix signals with padding')
logging.warning(' signal: %d ms', signal_duration)
logging.warning(' noise: %d ms', noise_duration)
padding_duration = noise_duration - signal_duration
if padding_duration > 0: # That is signal_duration < noise_duration.
logging.debug(' padding: %d ms', padding_duration)
padding = pydub.AudioSegment.silent(
duration=padding_duration,
frame_rate=signal_0.frame_rate)
logging.debug(' signal_1 (pre): %d ms', len(signal_1))
signal_1 = signal_1 + padding
logging.debug(' signal_1 (post): %d ms', len(signal_1))
frame_rate=signal.frame_rate)
logging.debug(' signal (pre): %d ms', len(signal))
signal = signal + padding
logging.debug(' signal (post): %d ms', len(signal))
# Update power.
signal_power = float(signal.dBFS)
# Mix signals using the target SNR.
power_0 = float(signal_0.dBFS)
power_1 = float(signal_1.dBFS)
gain_db = target_snr + power_1 - power_0
return cls.normalize(signal_1.overlay(signal_0.apply_gain(gain_db)))
gain_db = signal_power - noise_power - target_snr
return cls.normalize(signal.overlay(noise.apply_gain(gain_db)))

View File

@ -0,0 +1,70 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import unittest
import numpy as np
import pydub
from . import signal_processing
class TestSignalProcessing(unittest.TestCase):
  """Unit tests for SignalProcessingUtils.mix_signals()."""

  def testMixSignals(self):
    """Checks mix_signals() for target SNRs equal to -Inf, 0 dB and +Inf."""
    # Generate a template signal with which white noise can be generated.
    silence = pydub.AudioSegment.silent(duration=1000, frame_rate=48000)

    # Generate two distinct AudioSegment instances with 1 second of white noise.
    signal = signal_processing.SignalProcessingUtils.generate_white_noise(
        silence)
    noise = signal_processing.SignalProcessingUtils.generate_white_noise(
        silence)

    # Extract samples.
    signal_samples = signal.get_array_of_samples()
    noise_samples = noise.get_array_of_samples()

    # Note on the checks below: durations are compared with assertEqual().
    # assertTrue(a, b) would only test the truthiness of |a| and use |b| as
    # the failure message, so it could never detect a duration mismatch.
    # np.inf is used instead of the np.Inf alias, which NumPy 2.0 removed.

    # Test target SNR -Inf (noise expected).
    mix_neg_inf = signal_processing.SignalProcessingUtils.mix_signals(
        signal, noise, -np.inf)
    self.assertEqual(len(noise), len(mix_neg_inf))  # Check duration.
    mix_neg_inf_samples = mix_neg_inf.get_array_of_samples()
    self.assertTrue(  # Check samples.
        all(x == y for x, y in zip(noise_samples, mix_neg_inf_samples)))

    # Test target SNR 0.0 (different data expected).
    mix_0 = signal_processing.SignalProcessingUtils.mix_signals(
        signal, noise, 0.0)
    self.assertEqual(len(signal), len(mix_0))  # Check duration.
    self.assertEqual(len(noise), len(mix_0))
    mix_0_samples = mix_0.get_array_of_samples()
    self.assertTrue(
        any(x != y for x, y in zip(signal_samples, mix_0_samples)))
    self.assertTrue(
        any(x != y for x, y in zip(noise_samples, mix_0_samples)))

    # Test target SNR +Inf (signal expected).
    mix_pos_inf = signal_processing.SignalProcessingUtils.mix_signals(
        signal, noise, np.inf)
    self.assertEqual(len(signal), len(mix_pos_inf))  # Check duration.
    mix_pos_inf_samples = mix_pos_inf.get_array_of_samples()
    self.assertTrue(  # Check samples.
        all(x == y for x, y in zip(signal_samples, mix_pos_inf_samples)))

  def testMixSignalsMinInfPower(self):
    """Checks that mixing with a silent track raises an exception.

    The silent segment is passed once as noise and once as signal; in both
    cases mix_signals() is expected to raise SignalProcessingException.
    """
    silence = pydub.AudioSegment.silent(duration=1000, frame_rate=48000)
    signal = signal_processing.SignalProcessingUtils.generate_white_noise(
        silence)

    with self.assertRaises(signal_processing.SignalProcessingException):
      _ = signal_processing.SignalProcessingUtils.mix_signals(
          signal, silence, 0.0)

    with self.assertRaises(signal_processing.SignalProcessingException):
      _ = signal_processing.SignalProcessingUtils.mix_signals(
          silence, signal, 0.0)

View File

@ -1,19 +0,0 @@
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import unittest
from . import noise_generation
class TestNoiseGen(unittest.TestCase):
  """Sanity check on the noise generator registry."""

  def test_registered_classes(self):
    """The registry must be a dict with at least one entry."""
    registry = noise_generation.NoiseGenerator.REGISTERED_CLASSES
    self.assertIsInstance(registry, dict)
    self.assertGreater(len(registry), 0)