Source code for shennong.processor.plp

"""Provides the PlpProcessor class to extract PLP features

Extract PLP (Perceptual Linear Predictive analysis of speech) from an
audio signal. Uses the Kaldi implementation (see [Hermansky1990]_ and
[kaldi-plp]_). Optionally apply RASTA filtering (see [Herm94]_).

    :class:`~shennong.audio.Audio` ---> PlpProcessor \
    ---> :class:`~shennong.features.Features`

Examples
--------

>>> from shennong.audio import Audio
>>> from shennong.processor.plp import PlpProcessor
>>> audio = Audio.load('./test/data/test.wav')

Initialize the PLP processor with some options. Options can be
specified at construction, or after:

>>> processor = PlpProcessor()
>>> processor.sample_rate = audio.sample_rate

Here we apply RASTA filters

>>> processor.rasta = True

Compute the PLP features with the specified options, the output is an
instance of :class:`~shennong.features.Features`:

>>> plp = processor.process(audio)
>>> type(plp)
<class 'shennong.features.Features'>
>>> plp.shape[1] == processor.num_ceps
True

References
----------

.. [Hermansky1990]
     H. Hermansky, "Perceptual linear predictive (PLP) analysis of speech",
     Journal of the Acoustical Society of America, vol. 87, no. 4, pages
     1738–1752 (1990)`

.. [Herm94]
     H. Hermansky and N. Morgan, "RASTA processing of speech", IEEE Trans. on
     Speech and Audio Proc., vol. 2, no. 4, pp. 578-589, Oct. 1994.

.. [kaldi-plp]
     http://kaldi-asr.org/doc/feat.html#feat_plp

"""

import numpy as np
import scipy.signal
import kaldi.base.math
import kaldi.feat.mel
import kaldi.feat.window
import kaldi.matrix

from shennong import Features
from shennong.processor.base import MelFeaturesProcessor


[docs]class RastaFilter: """Rasta filter for Rasta PLP implementation Reimplemented after [labrosa]_ and [rastapy]_ on a frame by frame basis. Original implementation takes the whole signal at once. Parameters ---------- size : int The dimension of the frames to filter References ---------- .. [labrosa] https://labrosa.ee.columbia.edu/matlab/rastamat/ .. [rastapy] https://github.com/mystlee/rasta_py """ def __init__(self, size): # filter numerator and denominator self._num = -np.arange(-2, 3) / np.sum(np.arange(-2, 3) ** 2) self._den = np.array([1, -0.94]) # dimensionality of the frames to filter self._size = size # the following attributes are initialized in the reset() method self._delay = None self._count = None self._first_frames = None self.reset()
[docs] def reset(self): """Initializes the filter state""" self._count = 0 self._first_frames = [] self._delay = np.dstack( (scipy.signal.lfilter_zi(self._num, 1),) * self._size).squeeze()
[docs] def filter(self, frame, do_log=True): """RASTA filtering of a mel frame Parameters ---------- frame : numpy array, shape = [size, 1] The frame vector to filter. do_log : bool, optional When True move to the log domain before filtering, and do inverse log after. When False the frame is expected to be in log domain already. Default to true. Returns ------- filtered : numpy array, shape = [size, 1] The filtered frame. """ x = frame if do_log: x = np.log(x + np.finfo(x.dtype).eps) if self._count < 4: # stack the 4 first frames into a buffer and initialize the filter self._first_frames.append(x) y = np.zeros((x.shape)) if self._count == 3: x = np.asarray(self._first_frames) _, self._delay = scipy.signal.lfilter( self._num, 1, x, zi=self._delay * x[0], axis=0) if self._count >= 4: y, self._delay = scipy.signal.lfilter( self._num, self._den, [x], zi=self._delay, axis=0) self._count += 1 y = np.atleast_2d(y)[0, :].astype(x.dtype) if do_log: y = np.exp(y) return y
def _lpc2cepstrum(lpc_order, lpc, cepstrum): """Reimplementation of Kaldi Lpc2Cepstrum from src/feat/mel_computations.h (missing in pykadli) Parameters ---------- lpc_order : int LPC order lpc : kaldi.matrix.Vector LPC coefficients of size `lpc_order` cepstrum : kaldi.matrix.Vector Output cepstrum, must be preallocated with size `lpc_order` """ for i in range(lpc_order): _sum = 0.0 for j in range(i): _sum += float(i - j) * lpc[j] * cepstrum[i - j - 1] cepstrum[i] = -lpc[i] - _sum / float(i + 1) def _process_window(opts, window_function, window, do_log_energy): """Reimplmentation of Kaldi ProcessWindow from src/feat/feature-window.h This function is already wrapped in pykaldi but without estimation of log energy. Because the PLP recipe needs it, it has been reimplemented. """ frame_length = opts.window_size() assert window.dim == frame_length if opts.dither != 0: kaldi.feat.window.dither(window, opts.dither) if opts.remove_dc_offset: window.add_(-window.sum() / frame_length) log_energy = 0 if do_log_energy: log_energy = kaldi.base.math.log(max( kaldi.matrix.functions.vec_vec(window, window), np.finfo(float).eps)) if opts.preemph_coeff != 0: kaldi.feat.window.preemphasize(window, opts.preemph_coeff) window.mul_elements_(window_function.window) return log_energy def _extract_window( sample_offset, wave, frame, opts, window_function, window, do_log_energy): """Reimplmentation of Kaldi ExtractWindow from src/feat/feature-window.h This function is already wrapped in pykaldi but without estimation of log energy. Because the PLP recipe needs it, it has been reimplemented. """ assert sample_offset >= 0 and wave.dim != 0 frame_length = opts.window_size() frame_length_padded = opts.padded_window_size() num_samples = sample_offset + wave.dim start_sample = kaldi.feat.window.first_sample_of_frame(frame, opts) end_sample = start_sample + frame_length if opts.snip_edges: assert start_sample >= sample_offset and end_sample <= num_samples else: assert sample_offset == 0 or start_sample >= sample_offset if window.dim != frame_length_padded: window.resize_( frame_length_padded, kaldi.matrix.common.MatrixResizeType.UNDEFINED) # wave_start and wave_end are start and end indexes into 'wave', for the # piece of wave that we're trying to extract. wave_start = int(start_sample - sample_offset) wave_end = wave_start + frame_length if wave_start >= 0 and wave_end <= wave.dim: # the normal case -- no edge effects to consider window[:frame_length] = wave[wave_start:wave_start+frame_length] else: # Deal with any end effects by reflection, if needed. This code will # only be reached for about two frames per utterance, so we don't # concern ourselves excessively with efficiency. for s in range(frame_length): s_in_wave = s + wave_start while s_in_wave < 0 or s_in_wave >= wave.dim: # reflect around the beginning or end of the wave. # e.g. -1 -> 0, -2 -> 1. # dim -> dim - 1, dim + 1 -> dim - 2. # the code supports repeated reflections, although this # would only be needed in pathological cases. if s_in_wave < 0: s_in_wave = -s_in_wave - 1 else: s_in_wave = 2 * wave.dim - 1 - s_in_wave window[s] = wave[s_in_wave] if frame_length_padded > frame_length: window[frame_length:frame_length_padded] = 0 return _process_window( opts, window_function, window[:frame_length], do_log_energy)
[docs]class PlpProcessor(MelFeaturesProcessor): """Perceptive linear predictive features""" def __init__(self, sample_rate=16000, frame_shift=0.01, frame_length=0.025, rasta=False, dither=1.0, preemph_coeff=0.97, remove_dc_offset=True, window_type='povey', round_to_power_of_two=True, blackman_coeff=0.42, snip_edges=True, num_bins=23, low_freq=20, high_freq=0, vtln_low=100, vtln_high=-500, lpc_order=12, num_ceps=13, use_energy=True, energy_floor=0.0, raw_energy=True, compress_factor=1.0/3.0, cepstral_lifter=22, cepstral_scale=1.0, htk_compat=False): # Forward options to MelFeaturesProcessor super().__init__( sample_rate=sample_rate, frame_shift=frame_shift, frame_length=frame_length, dither=dither, preemph_coeff=preemph_coeff, remove_dc_offset=remove_dc_offset, window_type=window_type, round_to_power_of_two=round_to_power_of_two, blackman_coeff=blackman_coeff, snip_edges=snip_edges, num_bins=num_bins, low_freq=low_freq, high_freq=high_freq, vtln_low=vtln_low, vtln_high=vtln_high) self._options = kaldi.feat.plp.PlpOptions() self._options.frame_opts = self._frame_options self._options.mel_opts = self._mel_options self.rasta = rasta self.lpc_order = lpc_order self.num_ceps = num_ceps self.use_energy = use_energy self.energy_floor = energy_floor self.raw_energy = raw_energy self.compress_factor = compress_factor self.cepstral_lifter = cepstral_lifter self.cepstral_scale = cepstral_scale self.htk_compat = htk_compat # will store the Rasta filter if `rasta` option is True self._rasta_filter = None # will store mel banks and equal loudness coeffs for various VTLN warps self._mel_banks = {} self._equal_loudness = {} # the buffers needed by the recipe are allocated in # self._reset_buffers() self._mel_energies_duplicated = kaldi.matrix.Vector() self._autocorr_coeffs = kaldi.matrix.Vector() self._lpc_coeffs = kaldi.matrix.Vector() self._raw_cepstrum = kaldi.matrix.Vector() self._lifter_coeffs = kaldi.matrix.Vector() self._idft_bases = kaldi.matrix.Matrix() self._log_energy_floor = 0 # this recipe does not rely on a Kaldi class as all is reimplemented in # python/pykaldi self._kaldi_processor = None @property def name(self): return 'plp' @property def rasta(self): """Whether to do RASTA filtering""" return self._rasta @rasta.setter def rasta(self, value): self._rasta = bool(value) @property def lpc_order(self): """Order of LPC analysis in PLP computation""" return self._options.lpc_order @lpc_order.setter def lpc_order(self, value): self._options.lpc_order = value @property def num_ceps(self): """Number of cepstra in PLP computation (including C0) Must be positive and smaller or equal to `lpc_order` + 1. """ return self._options.num_ceps @num_ceps.setter def num_ceps(self, value): value = int(value) if value <= 0: raise ValueError('num_ceps must be > 0') if value > self.lpc_order + 1: raise ValueError( 'We must have num_ceps <= lpc_order+1, but {} > {}+1'.format( value, self.lpc_order)) self._options.num_ceps = value @property def use_energy(self): """Use energy (instead of C0) for zeroth PLP feature""" return self._options.use_energy @use_energy.setter def use_energy(self, value): self._options.use_energy = value @property def energy_floor(self): """Floor on energy (absolute, not relative) in PLP computation""" return self._options.energy_floor @energy_floor.setter def energy_floor(self, value): self._options.energy_floor = value @property def raw_energy(self): """If true, compute energy before preemphasis and windowing""" return self._options.raw_energy @raw_energy.setter def raw_energy(self, value): self._options.raw_energy = value @property def compress_factor(self): """Compression factor in PLP computation""" return np.float32(self._options.compress_factor) @compress_factor.setter def compress_factor(self, value): self._options.compress_factor = value @property def cepstral_lifter(self): """Constant that controls scaling of PLPs""" return self._options.cepstral_lifter @cepstral_lifter.setter def cepstral_lifter(self, value): self._options.cepstral_lifter = value @property def cepstral_scale(self): """Scaling constant in PLP computation""" return self._options.cepstral_scale @cepstral_scale.setter def cepstral_scale(self, value): self._options.cepstral_scale = value @property def htk_compat(self): """If True, get closer to HTK PLP features Put energy or C0 last. Warning: Not sufficient to get HTK compatible features (need to change other parameters) """ return self._options.htk_compat @htk_compat.setter def htk_compat(self, value): self._options.htk_compat = value @property def ndims(self): return self.num_ceps def _reset_buffers(self): if self._mel_energies_duplicated.dim != self.num_bins + 2: self._mel_energies_duplicated.resize_( self.num_bins + 2, kaldi.matrix.common.MatrixResizeType.UNDEFINED) if self._autocorr_coeffs.dim != self.lpc_order + 1: self._autocorr_coeffs.resize_( self.lpc_order + 1, kaldi.matrix.common.MatrixResizeType.UNDEFINED) if self._lpc_coeffs.dim != self.lpc_order: self._lpc_coeffs.resize_( self.lpc_order, kaldi.matrix.common.MatrixResizeType.UNDEFINED) if self._raw_cepstrum.dim != self.lpc_order: self._raw_cepstrum.resize_( self.lpc_order, kaldi.matrix.common.MatrixResizeType.UNDEFINED) if ( self.cepstral_lifter != 0 and self._lifter_coeffs.dim != self.num_ceps): self._lifter_coeffs.resize_(self.num_ceps) kaldi.feat.mel.compute_lifter_coeffs( self.cepstral_lifter, self._lifter_coeffs) elif self.cepstral_lifter == 0: self._lifter_coeffs.resize_(0) self._idft_bases = kaldi.feat.functions.init_idft_bases( self.lpc_order + 1, self.num_bins + 2) if self.energy_floor > 0: self._log_energy_floor = kaldi.base.math.log(self.energy_floor) if self.rasta: self._rasta_filter = RastaFilter(self.num_bins) def _get_mel_banks(self, vtln_warp): """Returns MEL banks for a given VTLN warp Banks are generated on demand and stored for fastest retrieval. """ try: return self._mel_banks[vtln_warp] except KeyError: mel_banks = kaldi.feat.mel.MelBanks( self._mel_options, self._frame_options, vtln_warp) self._mel_banks[vtln_warp] = mel_banks return mel_banks def _get_equal_loudness(self, vtln_warp): """Returns equal loudness coefficient for a given VTLN warp Coefficients are generated on demand and stored for fastest retrieval. """ mel_banks = self._get_mel_banks(vtln_warp) try: return self._equal_loudness[vtln_warp] except KeyError: ans = kaldi.feat.mel.get_equal_loudness_vector(mel_banks) self._equal_loudness[vtln_warp] = ans return ans def _compute(self, signal, vtln_warp): """Reimplementation of Kaldi OfflineFeatureTpl::Compute From src/feat/feature-common.h. Reimplementation needed to integrate Rasta filtering. """ rows_out = kaldi.feat.window.num_frames( signal.nsamples, self._frame_options) cols_out = self.ndims if rows_out == 0: # pragma: nocover return np.zeros((0, 0)) # force the input signal to be 16 bits integers signal = kaldi.matrix.SubVector(signal.astype(np.int16).data) # allocate the output data output = kaldi.matrix.Matrix(rows_out, cols_out) # windowed waveform and windowing function window = kaldi.matrix.Vector() window_function = kaldi.feat.window.FeatureWindowFunction.from_options( self._frame_options) # for each frame for row in range(rows_out): # extract its window and its log energy... raw_log_energy = _extract_window( 0, signal, row, self._frame_options, window_function, window, self.use_energy and self.raw_energy) # ... and extract PLP with optional Rasta filtering self._compute_frame( raw_log_energy, vtln_warp, window, output[row, :]) return output.numpy() def _compute_frame( self, signal_log_energy, vtln_warp, signal_frame, output_frame): """Reimplementation of Kaldi PlpComputer::Computer From src/feat/feature-plp.h. This implementation includes Rasta filtering. """ assert signal_frame.dim == self._frame_options.padded_window_size() assert output_frame.dim == self.ndims mel_banks = self._get_mel_banks(vtln_warp) equal_loudness = self._get_equal_loudness(vtln_warp) assert self.num_ceps <= self.lpc_order + 1 # our num-ceps includes C0 if self.use_energy and not self.raw_energy: signal_log_energy = kaldi.base.math.log(max( kaldi.matrix.functions.vec_vec(signal_frame, signal_frame), np.finfo(float).eps)) # split radix FFT not wrapped in pykaldi so fall back to real_fft even # if frame length is a power of two... kaldi.matrix.functions.real_fft(signal_frame, True) # convert the FFT into a power spectrum, elements 0 ... # signal_frame.dim/2 kaldi.feat.functions.compute_power_spectrum(signal_frame) power_spectrum = signal_frame[:int(signal_frame.dim / 2 + 1)] # convert power spectrum to mel filterbank mel_energies = self._mel_energies_duplicated[1:self.num_bins+1] mel_banks.compute(power_spectrum, mel_energies) if self.rasta: # do rasta filtering mel_energies[:] = self._rasta_filter.filter( mel_energies.numpy(), do_log=True) mel_energies.mul_elements_(equal_loudness) mel_energies.apply_pow_(self.compress_factor) # duplicate first and last elements self._mel_energies_duplicated[0] = self._mel_energies_duplicated[1] self._mel_energies_duplicated[self.num_bins+1] = \ self._mel_energies_duplicated[self.num_bins] self._autocorr_coeffs.set_zero_() # in case of NaNs or infs self._autocorr_coeffs.add_mat_vec_( 1.0, self._idft_bases, kaldi.matrix.common.MatrixTransposeType.NO_TRANS, self._mel_energies_duplicated, 0.0) residual_log_energy = kaldi.feat.mel.compute_lpc( self._autocorr_coeffs, self._lpc_coeffs) residual_log_energy = max(residual_log_energy, np.finfo(float).eps) _lpc2cepstrum(self.lpc_order, self._lpc_coeffs, self._raw_cepstrum) output_frame[1:self.num_ceps] = self._raw_cepstrum[:self.num_ceps - 1] output_frame[0] = residual_log_energy if self.cepstral_lifter != 0: output_frame.mul_elements_(self._lifter_coeffs) if self.cepstral_scale != 1.0: output_frame.scale_(self.cepstral_scale) if self.use_energy: if ( self.energy_floor > 0 and signal_log_energy < self._log_energy_floor): signal_log_energy = self._log_energy_floor output_frame[0] = signal_log_energy if self.htk_compat: # reorder the features log_energy = output_frame[0] for i in range(self.num_ceps - 1): output_frame[i] = output_frame[i+1] output_frame[self.num_ceps - 1] = log_energy
[docs] def process(self, signal, vtln_warp=1.0): """Compute Rasta-PLP features with the specified options Do an optional feature-level vocal tract length normalization (VTLN) when `vtln_warp` != 1.0. Parameters ---------- signal : Audio, shape = [nsamples, 1] The input audio signal to compute the features on, must be mono vtln_warp : float, optional The VTLN warping factor to be applied when computing features. Be 1.0 by default, meaning no warping is to be done. Returns ------- features : `Features`, shape = [nframes, `ndims`] The computed features, output will have as many rows as there are frames (depends on the specified options `frame_shift` and `frame_length`). Raises ------ ValueError If the input `signal` has more than one channel (i.e. is not mono). If `sample_rate` != `signal.sample_rate`. """ # ensure the signal is correct if signal.nchannels != 1: raise ValueError( 'signal must have one dimension, but it has {}' .format(signal.nchannels)) if self.sample_rate != signal.sample_rate: raise ValueError( 'processor and signal mismatch in sample rates: ' '{} != {}'.format(self.sample_rate, signal.sample_rate)) # extract the PLP features self._reset_buffers() data = self._compute(signal, vtln_warp) return Features( data, self.times(data.shape[0]), properties=self.get_properties(vtln_warp=vtln_warp), validate=False)