APM-QA Test data generation: environmental noise looped.
SignalProcessingUtils.MixSignals() now allows different padding options. This CL also adds more unit tests for SignalProcessingUtils.MixSignals(). Bug: webrtc:7494 Change-Id: Id62fe9998e512c275cb6399e0aedf11f23a9f36e Reviewed-on: https://webrtc-review.googlesource.com/5780 Commit-Queue: Alessio Bazzica <alessiob@webrtc.org> Reviewed-by: Alex Loiko <aleloi@webrtc.org> Cr-Commit-Position: refs/heads/master@{#20122}
This commit is contained in:
parent
90e1f539a5
commit
6967553240
@ -10,7 +10,7 @@ reference one used for evaluation.
|
||||
## Dependencies
|
||||
- OS: Linux
|
||||
- Python 2.7
|
||||
- Python libraries: numpy, scipy, pydub (0.17.0+), pandas (0.20.1+)
|
||||
- Python libraries: enum34, numpy, scipy, pydub (0.17.0+), pandas (0.20.1+)
|
||||
- It is recommended that a dedicated Python environment is used
|
||||
- install `virtualenv`
|
||||
- `$ sudo apt-get install python-virtualenv`
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
"""
|
||||
|
||||
import array
|
||||
import enum
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@ -29,6 +30,7 @@ except ImportError:
|
||||
|
||||
try:
|
||||
import scipy.signal
|
||||
import scipy.fftpack
|
||||
except ImportError:
|
||||
logging.critical('Cannot import the third-party Python package scipy')
|
||||
sys.exit(1)
|
||||
@ -40,6 +42,12 @@ class SignalProcessingUtils(object):
|
||||
"""Collection of signal processing utilities.
|
||||
"""
|
||||
|
||||
@enum.unique
class MixPadding(enum.Enum):
  """Options for handling noise that is shorter than the signal to mix.

  NO_PADDING: the noise is not extended, the signal is truncated instead.
  ZERO_PADDING: the noise is overlaid once, the rest of the signal is left
                without noise.
  LOOP: the noise is repeated (looped) until the end of the signal.
  """
  NO_PADDING = 0
  ZERO_PADDING = 1
  LOOP = 2
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@ -155,6 +163,14 @@ class SignalProcessingUtils(object):
|
||||
raise exceptions.SignalProcessingException('Unsupported samples type')
|
||||
return np.array(signal.get_array_of_samples(), np.int16)
|
||||
|
||||
@classmethod
def Fft(cls, signal, normalize=True):
  """Computes the first half of the FFT coefficients of |signal|.

  Args:
    signal: audio signal accepted by AudioSegmentToRawData().
    normalize: if True, the samples are divided by the peak absolute sample
               value (never by less than 1.0) before the transform.

  Returns:
    Array with the first len/2 complex FFT coefficients.
  """
  x = cls.AudioSegmentToRawData(signal).astype(np.float32)
  if normalize:
    # Scale by the peak magnitude rather than by the largest signed sample,
    # so that a dominant negative peak is also mapped into [-1, 1]. The 1.0
    # floor avoids a division by zero when all the samples are zero.
    x /= max(np.max(np.abs(x)), 1.0)
  y = scipy.fftpack.fft(x)
  # Floor division keeps the slice index an integer under true division too.
  return y[:len(y) // 2]
|
||||
|
||||
@classmethod
|
||||
def DetectHardClipping(cls, signal, threshold=2):
|
||||
"""Detects hard clipping.
|
||||
@ -272,18 +288,24 @@ class SignalProcessingUtils(object):
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def MixSignals(cls, signal, noise, target_snr=0.0, bln_pad_shortest=False):
|
||||
"""Mixes two signals with a target SNR.
|
||||
def MixSignals(cls, signal, noise, target_snr=0.0,
|
||||
pad_noise=MixPadding.NO_PADDING):
|
||||
"""Mixes |signal| and |noise| with a target SNR.
|
||||
|
||||
Mix two signals with a desired SNR by scaling noise (noise).
|
||||
Mix |signal| and |noise| with a desired SNR by scaling |noise|.
|
||||
If the target SNR is +/- infinite, a copy of signal/noise is returned.
|
||||
If |signal| is shorter than |noise|, the length of the mix equals that of
|
||||
|signal|. Otherwise, the mix length depends on whether padding is applied.
|
||||
When padding is not applied, that is |pad_noise| is set to NO_PADDING
|
||||
(default), the mix length equals that of |noise| - i.e., |signal| is
|
||||
truncated. Otherwise, |noise| is extended and the resulting mix has the same
|
||||
length as |signal|.
|
||||
|
||||
Args:
|
||||
signal: AudioSegment instance (signal).
|
||||
noise: AudioSegment instance (noise).
|
||||
target_snr: float, numpy.Inf or -numpy.Inf (dB).
|
||||
bln_pad_shortest: if True, it pads the shortest signal with silence at the
|
||||
end.
|
||||
pad_noise: SignalProcessingUtils.MixPadding, default: NO_PADDING.
|
||||
|
||||
Returns:
|
||||
An AudioSegment instance.
|
||||
@ -310,28 +332,23 @@ class SignalProcessingUtils(object):
|
||||
raise exceptions.SignalProcessingException(
|
||||
'cannot mix a signal with -Inf power')
|
||||
|
||||
# Pad signal (if necessary). If noise is the shortest, the AudioSegment
|
||||
# overlay() method implicitly pads noise. Hence, the only case to handle
|
||||
# is signal shorter than noise and bln_pad_shortest True.
|
||||
if bln_pad_shortest:
|
||||
signal_duration = len(signal)
|
||||
noise_duration = len(noise)
|
||||
logging.warning('mix signals with padding')
|
||||
logging.warning(' signal: %d ms', signal_duration)
|
||||
logging.warning(' noise: %d ms', noise_duration)
|
||||
padding_duration = noise_duration - signal_duration
|
||||
if padding_duration > 0: # That is signal_duration < noise_duration.
|
||||
logging.debug(' padding: %d ms', padding_duration)
|
||||
padding = pydub.AudioSegment.silent(
|
||||
duration=padding_duration,
|
||||
frame_rate=signal.frame_rate)
|
||||
logging.debug(' signal (pre): %d ms', len(signal))
|
||||
signal = signal + padding
|
||||
logging.debug(' signal (post): %d ms', len(signal))
|
||||
|
||||
# Update power.
|
||||
signal_power = float(signal.dBFS)
|
||||
|
||||
# Mix signals using the target SNR.
|
||||
# Mix.
|
||||
gain_db = signal_power - noise_power - target_snr
|
||||
return cls.Normalize(signal.overlay(noise.apply_gain(gain_db)))
|
||||
signal_duration = len(signal)
|
||||
noise_duration = len(noise)
|
||||
if signal_duration <= noise_duration:
|
||||
# Ignore |pad_noise|, |noise| is truncated if longer than |signal|, the
|
||||
# mix will have the same length as |signal|.
|
||||
return signal.overlay(noise.apply_gain(gain_db))
|
||||
elif pad_noise == cls.MixPadding.NO_PADDING:
|
||||
# |signal| is longer than |noise|, but no padding is applied to |noise|.
|
||||
# Truncate |signal|.
|
||||
return noise.overlay(signal, gain_during_overlay=gain_db)
|
||||
elif pad_noise == cls.MixPadding.ZERO_PADDING:
|
||||
# TODO(alessiob): Check that this works as expected.
|
||||
return signal.overlay(noise.apply_gain(gain_db))
|
||||
elif pad_noise == cls.MixPadding.LOOP:
|
||||
# |signal| is longer than |noise|, extend |noise| by looping.
|
||||
return signal.overlay(noise.apply_gain(gain_db), loop=True)
|
||||
else:
|
||||
raise exceptions.SignalProcessingException('invalid padding type')
|
||||
|
||||
@ -75,3 +75,112 @@ class TestSignalProcessing(unittest.TestCase):
|
||||
with self.assertRaises(exceptions.SignalProcessingException):
|
||||
_ = signal_processing.SignalProcessingUtils.MixSignals(
|
||||
silence, signal, 0.0)
|
||||
|
||||
def testMixSignalNoiseDifferentLengths(self):
  """Checks the mix length for each signal/noise length and padding type."""
  utils = signal_processing.SignalProcessingUtils
  padding = utils.MixPadding

  # Test signals.
  shorter = utils.GenerateWhiteNoise(
      pydub.AudioSegment.silent(duration=1000, frame_rate=8000))
  longer = utils.GenerateWhiteNoise(
      pydub.AudioSegment.silent(duration=2000, frame_rate=8000))

  # Each item is (signal, noise, padding type, expected mix length).
  test_cases = [
      # When the signal is shorter than the noise, the mix length always
      # equals that of the signal regardless of whether padding is applied.
      (shorter, longer, padding.NO_PADDING, len(shorter)),
      (shorter, longer, padding.ZERO_PADDING, len(shorter)),
      (shorter, longer, padding.LOOP, len(shorter)),
      # When the signal is longer than the noise, the mix length depends on
      # whether padding is applied: without padding the signal is truncated,
      # otherwise the noise is extended to the length of the signal.
      (longer, shorter, padding.NO_PADDING, len(shorter)),
      (longer, shorter, padding.ZERO_PADDING, len(longer)),
      (longer, shorter, padding.LOOP, len(longer)),
  ]

  for signal, noise, pad_noise, expected_length in test_cases:
    mix = utils.MixSignals(signal=signal, noise=noise, pad_noise=pad_noise)
    # The message identifies the failing combination, since all the cases
    # share the same assertion.
    self.assertEqual(
        expected_length, len(mix),
        'signal: %d ms, noise: %d ms, padding: %s' % (
            len(signal), len(noise), pad_noise))
|
||||
|
||||
def testMixSignalNoisePaddingTypes(self):
  """Compares the energy of the mix tail with zero-padded and looped noise."""
  utils = signal_processing.SignalProcessingUtils

  # Test signals: white noise (1 s) and a longer 440 Hz pure tone (2 s).
  noise = utils.GenerateWhiteNoise(
      pydub.AudioSegment.silent(duration=1000, frame_rate=8000))
  tone = utils.GeneratePureTone(
      pydub.AudioSegment.silent(duration=2000, frame_rate=8000), 440.0)

  def Energy(segment):
    """Returns the sum of the squared samples of |segment|."""
    samples = utils.AudioSegmentToRawData(segment).astype(np.float32)
    return np.sum(np.square(samples))

  def TailEnergy(pad_noise):
    """Mixes tone and noise with |pad_noise|; returns the last second energy."""
    mix = utils.MixSignals(
        signal=tone,
        noise=noise,
        target_snr=-6,
        pad_noise=pad_noise)
    return Energy(mix[-1000:])

  # Zero padding: expect pure tone only in 1-2s.
  tail_energy_zero_pad = TailEnergy(utils.MixPadding.ZERO_PADDING)
  # Loop: expect pure tone plus noise in 1-2s.
  tail_energy_loop = TailEnergy(utils.MixPadding.LOOP)

  self.assertLess(0, tail_energy_zero_pad)
  self.assertLess(tail_energy_zero_pad, tail_energy_loop)
|
||||
|
||||
def testMixSignalSnr(self):
  """Checks which of two mixed tones is stronger for each target SNR."""
  utils = signal_processing.SignalProcessingUtils

  # Test signals.
  tone_low = utils.GeneratePureTone(
      pydub.AudioSegment.silent(duration=64, frame_rate=8000), 250.0)
  tone_high = utils.GeneratePureTone(
      pydub.AudioSegment.silent(duration=64, frame_rate=8000), 3000.0)

  def ToneAmplitudes(mix):
    """Returns the amplitude of the coefficients #16 and #192, which
    correspond to the tones at 250 and 3k Hz respectively."""
    spectrum = np.absolute(utils.Fft(mix))
    return spectrum[16], spectrum[192]

  # Each item is (signal, noise, target SNR, True if the low tone is expected
  # to be weaker than the high one in the mix).
  test_cases = [
      (tone_low, tone_high, -6, True),
      (tone_high, tone_low, -6, False),
      (tone_low, tone_high, 6, False),
      (tone_high, tone_low, 6, True),
  ]

  for signal, noise, target_snr, low_is_weaker in test_cases:
    mix = utils.MixSignals(signal=signal, noise=noise, target_snr=target_snr)
    ampl_low, ampl_high = ToneAmplitudes(mix)
    if low_is_weaker:
      self.assertLess(ampl_low, ampl_high)
    else:
      self.assertLess(ampl_high, ampl_low)
|
||||
|
||||
@ -394,7 +394,8 @@ class EnvironmentalNoiseTestDataGenerator(TestDataGenerator):
|
||||
if not os.path.exists(noisy_signal_filepath):
|
||||
# Create noisy signal.
|
||||
noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
|
||||
input_signal, noise_signal, snr)
|
||||
input_signal, noise_signal, snr,
|
||||
pad_noise=signal_processing.SignalProcessingUtils.MixPadding.LOOP)
|
||||
|
||||
# Save.
|
||||
signal_processing.SignalProcessingUtils.SaveWav(
|
||||
@ -489,7 +490,7 @@ class ReverberationTestDataGenerator(TestDataGenerator):
|
||||
if not os.path.exists(noisy_signal_filepath):
|
||||
# Create noisy signal.
|
||||
noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
|
||||
input_signal, noise_signal, snr, bln_pad_shortest=True)
|
||||
input_signal, noise_signal, snr)
|
||||
|
||||
# Save.
|
||||
signal_processing.SignalProcessingUtils.SaveWav(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user