Source code for shennong.features.processor.rastaplp

"""Extraction of RASTA-PLP features from a speech signal

    :class:`~shennong.audio.Audio` --> RastaPlpProcessor
    --> :class:`~shennong.features.features.Features`

Implementation of the RASTA-PLP features extraction algorithm (see
[labrosa]_ and [rastapy]_ for implementations and [Herm94]_ for the
paper).

Examples
--------

Compute RASTA-PLP features on some speech signal:

>>> from shennong.audio import Audio
>>> from shennong.features.processor.rastaplp import RastaPlpProcessor
>>> audio = Audio.load('./test/data/test.wav')
>>> processor = RastaPlpProcessor(order=8)
>>> features = processor.process(audio)
>>> features.shape
(140, 9)

The output dimension depends on the PLP ``order`` parameter:

>>> processor.order = 10
>>> features = processor.process(audio)
>>> features.shape
(140, 11)


References
----------

.. [labrosa]
     https://labrosa.ee.columbia.edu/matlab/rastamat/

.. [rastapy]
     https://github.com/mystlee/rasta_py

.. [Herm94]
     H. Hermansky and N. Morgan, "RASTA processing of speech", IEEE
     Trans. on Speech and Audio Proc., vol. 2, no. 4, pp. 578-589,
     Oct. 1994.

"""

import kaldi.feat.window
import kaldi.matrix
import numpy as np
import scipy.fftpack
import scipy.signal

from shennong.features import Features
from shennong.features.processor.base import FramesProcessor


def _audspec(p_spectrum, fs=16000, nfilts=0, fbtype='bark',
             min_freq=0, max_freq=0, sumpower=True, bandwidth=1):
    if max_freq == 0:
        max_freq = fs / 2

    nfreqs = p_spectrum.shape[0]
    nfft = (int(nfreqs) - 1) * 2

    if fbtype == 'bark':
        wts = _fft2barkmx(nfft, fs, nfilts, bandwidth, min_freq, max_freq)
    else:  # pragma: nocover
        if fbtype == 'mel':
            htk = False
            constamp = False
        elif fbtype == 'htkmel':
            htk = True
            constamp = True
        elif fbtype == 'fcmel':
            htk = True
            constamp = False
        else:
            raise ValueError('unknown fbtype')

        wts = _fft2melmx(
            nfft, fs, nfilts, bandwidth, min_freq, max_freq,
            htk=htk, constamp=constamp)

    wts = wts[:, 0:nfreqs]

    if sumpower:
        aspectrum = np.matmul(wts, p_spectrum)
    else:  # pragma: nocover
        aspectrum = np.matmul(wts, np.sqrt(p_spectrum)) ** 2
    return aspectrum


def _hz2bark(f):
    return 6 * np.arcsinh(f / 600)


def _bark2hz(z):
    return 600 * np.sinh(z / 6)


def _fft2barkmx(fft_length, fs, nfilts, band_width, min_freq, max_freq):
    min_bark = _hz2bark(min_freq)
    nyqbark = _hz2bark(max_freq) - min_bark

    if nfilts == 0:
        nfilts = np.ceil(nyqbark) + 1

    wts = np.zeros((int(nfilts), int(fft_length)))
    step_barks = nyqbark / (nfilts - 1)
    binbarks = _hz2bark(np.arange(0, fft_length / 2 + 1) * fs / fft_length)

    for i in range(int(nfilts)):
        f_bark_mid = min_bark + i * step_barks
        lof = binbarks - f_bark_mid - 0.5
        hif = binbarks - f_bark_mid + 0.5
        minimum = np.minimum(0, np.minimum(hif, -2.5 * lof) / band_width)
        wts[i, 0:int(fft_length / 2) + 1] = np.power(10, minimum)
    return wts


def _fft2melmx(fft_length, fs, nfilts=0, band_width=1, min_freq=0, max_freq=0,
               htk=False, constamp=False):  # pragma: nocover
    if nfilts == 0:
        nfilts = np.ceil(_hz2mel(max_freq, htk) / 2)
    if max_freq == 0:
        max_freq = fs / 2

    wts = np.zeros((int(nfilts), int(fft_length)))
    fftfrqs = (np.arange(0, fft_length / 2 + 1) / fft_length) * fs

    min_mel = _hz2mel(min_freq, htk)
    max_mel = _hz2mel(max_freq, htk)
    binfrqs = _mel2hz(np.add(min_mel, np.multiply(
        np.arange(0, nfilts + 2), (max_mel - min_mel) / (nfilts + 1))), htk)

    for i in range(int(nfilts)):
        fs_tmp = binfrqs[np.arange(0, 3) + i]
        fs_tmp = fs_tmp[1] + band_width * (fs_tmp - fs_tmp[1])
        loslope = (fftfrqs - fs_tmp[0]) / (fs_tmp[1] - fs_tmp[0])
        hislope = (fs_tmp[2] - fftfrqs) / (fs_tmp[2] - fs_tmp[1])
        wts[i, 0:int(fft_length / 2) + 1] = np.maximum(
            0, np.minimum(loslope, hislope))

    if constamp is False:
        sub = binfrqs[2:int(nfilts) + 2] - binfrqs[0:int(nfilts)]
        wts = np.matmul(np.diag(2 / sub), wts)

    return wts


def _hz2mel(f, htk=False):  # pragma: nocover
    if htk:
        z = 2595 * np.log10(1 + f / 700)
    else:
        f_0 = 0.0
        f_sp = 200 / 3
        brkfrq = 1000
        brkpt = (brkfrq - f_0) / f_sp
        logstep = np.exp(np.log(6.4) / 27.0)

        f = np.array(f, ndmin=1)
        z = np.zeros((f.shape[0], ))

        for i in range(f.shape[0]):
            if f[i] < brkpt:
                z[i] = (f[i] - f_0) / f_sp
            else:
                z[i] = brkpt + (np.log(f[i] / brkfrq) / np.log(logstep))
    return z


def _mel2hz(z, htk=False):  # pragma: nocover
    if htk:
        f = 700 * (np.power(10, z / 2595) - 1)
    else:
        f_0 = 0
        f_sp = 200/3
        brkfrq = 1000
        brkpt = (brkfrq - f_0) / f_sp
        logstep = np.exp(np.log(6.4) / 27.0)

        z = np.array(z, ndmin=1)
        f = np.zeros((z.shape[0], ))

        for i in range(z.shape[0]):
            if z[i] < brkpt:
                f[i] = f_0 + f_sp * z[i]
            else:
                f[i] = brkfrq * np.exp(np.log(logstep) * (z[i] - brkpt))
    return f


def _rastafilt(x):
    numer = np.arange(-2, 3)
    numer = -numer / np.sum(numer ** 2)
    denom = np.array([1, -0.94])

    zi = scipy.signal.lfilter_zi(numer, 1)
    y = np.zeros((x.shape))
    for i in range(x.shape[0]):
        y1, zi = scipy.signal.lfilter(
            numer, 1, x[i, 0:4], axis=0, zi=zi * x[i, 0])
        y1 = y1*0
        y2, _ = scipy.signal.lfilter(
            numer, denom, x[i, 4:x.shape[1]], axis=0, zi=zi)
        y[i, :] = np.append(y1, y2)
    return y


def _postaud(x, fmax, fbtype='bark', broaden=False):
    nbands, nframes = x.shape
    nfpts = int(nbands + 2 * broaden)

    if fbtype == 'bark':
        bandcfhz = _bark2hz(
            np.linspace(0, _hz2bark(fmax), nfpts))
    elif fbtype == 'mel':  # pragma: nocover
        bandcfhz = _mel2hz(
            np.linspace(0, _hz2mel(fmax), nfpts))
    elif fbtype == 'htkmel' or fbtype == 'fcmel':  # pragma: nocover
        bandcfhz = _mel2hz(
            np.linspace(0, _hz2mel(fmax, htk=True), nfpts), htk=True)

    bandcfhz = bandcfhz[broaden:(nfpts - broaden)]

    fsq = np.power(bandcfhz, 2)
    ftmp = fsq + 1.6e5
    eql = np.power(fsq / ftmp, 2) * (fsq + 1.44e6) / (fsq + 9.61e6)

    z = np.multiply(np.tile(eql, (nframes, 1)).T, x)
    z = np.power(z, 0.33)

    if broaden:  # pragma: nocover
        y = np.zeros((z.shape[0] + 2, z.shape[1]))
        y[0, :] = z[0, :]
        y[1:nbands + 1, :] = z
        y[nbands + 1, :] = z[z.shape[0] - 1, :]
    else:
        y = np.zeros((z.shape[0], z.shape[1]))
        y[0, :] = z[1, :]
        y[1:nbands - 1, :] = z[1:z.shape[0] - 1, :]
        y[nbands - 1, :] = z[z.shape[0] - 2, :]

    return y, eql


def _dolpc(x, modelorder=8):
    nbands, nframes = x.shape
    ncorr = 2 * (nbands - 1)
    R = np.zeros((ncorr, nframes))

    R[0:nbands, :] = x
    for i in range(nbands - 1):
        R[i + nbands - 1, :] = x[nbands - (i + 1), :]

    r = scipy.fftpack.ifft(R.T).real.T
    r = r[0:nbands, :]

    y = np.ones((nframes, modelorder + 1))
    e = np.zeros((nframes, 1))

    for i in range(nframes):
        y_tmp, e_tmp, _ = _levinson(
            r[:, i], modelorder, allow_singularity=True)
        if modelorder != 0:
            y[i, 1:modelorder + 1] = y_tmp
        e[i, 0] = e_tmp

    y = y.T / (np.tile(e.T, (modelorder + 1, 1)) + 1e-8)

    return y


# TODO optimize: this is the most inefficient function in the
# processor, takes about half of the compute time. Complexity is
# o(N**2 + N). See scipy.linalg.solve_toeplitz: can be
# solve_toeplitz((r[:-1], r.conj()[:-1]), -r[1:]) but the function does
# not return prediction error...
def _levinson(r, order=None, allow_singularity=False):
    r"""Levinson-Durbin recursion.

    Find the coefficients of a length(r)-1 order autoregressive linear
    process

    Parameters
    ----------
    r : numpy array
        Autocorrelation sequence of length N + 1 (first element being
        the zero-lag autocorrelation)
    order : int, optional
        Requested order of the autoregressive coefficients, default is N
    allow_singularity : bool, optional
        False by default. Other implementations may be True (e.g., octave)

    Returns
    -------

    * the `N+1` autoregressive coefficients :math:`A=(1, a_1...a_N)`
    * the prediction errors
    * the `N` reflections coefficients values

    This algorithm solves the set of complex linear simultaneous equations
    using Levinson algorithm.

    .. math::

        \bold{T}_M \left(\begin{array}{c} 1 \\ \bold{a}_M \end{array} \right) =
        \left( \begin{array}{c} \rho_M \\ \bold{0}_M  \end{array} \right)

    where :math:`\bold{T}_M` is a Hermitian Toeplitz matrix with elements
    :math:`T_0, T_1, \dots ,T_M`.

    .. note:: Solving this equations by Gaussian elimination would
        require :math:`M^3` operations whereas the levinson algorithm
        requires :math:`M^2+M` additions and :math:`M^2+M` multiplications.

    This is equivalent to solve the following symmetric Toeplitz system of
    linear equations

    .. math::

        \left( \begin{array}{cccc}
        r_1 & r_2^* & \dots & r_{n}^*\\
        r_2 & r_1^* & \dots & r_{n-1}^*\\
        \dots & \dots & \dots & \dots\\
        r_n & \dots & r_2 & r_1 \end{array} \right)
        \left( \begin{array}{cccc}
        a_2\\
        a_3 \\
        \dots \\
        a_{N+1}  \end{array} \right)
        =
        \left( \begin{array}{cccc}
        -r_2\\
        -r_3 \\
        \dots \\
        -r_{N+1}  \end{array} \right)

    where :math:`r = (r_1 ... r_{N+1})` is the input autocorrelation
    vector, and :math:`r_i^*` denotes the complex conjugate of
    :math:`r_i`. The input r is typically a vector of autocorrelation
    coefficients where lag 0 is the first element :math:`r_1`.


    Examples
    --------

    >>> import numpy as np
    >>> from shennong.features.processor.rastaplp import _levinson
    >>> T = np.array([3., -2+0.5j, .7-1j])
    >>> a, e, k = _levinson(T)
    >>> a
    array([0.86315789+0.03157895j, 0.34736842+0.21052632j])
    >>> e
    1.322105263157895
    >>> k
    array([0.66666667-0.16666667j, 0.34736842+0.21052632j])

    Notes
    -----

    This function is taken from the Python spectrum package
    (https://github.com/cokelaer/spectrum) and is under BSD-3-Clause
    license. **Copyright 2011, Thomas Cokelaer**.

    """
    T0 = np.real(r[0])
    T = r[1:]
    M = len(T)

    if order is None:
        M = len(T)
    else:
        if order > M:  # pragma: nocover (checked by RastaPlpProcessor.order)
            raise ValueError(
                f'order ({order}) must be less than size of '
                f'the input data ({M})')
        M = order

    realdata = np.isrealobj(r)
    dtype = float if realdata else complex
    A = np.zeros(M, dtype=dtype)
    ref = np.zeros(M, dtype=dtype)

    P = T0

    for k in range(0, M):
        save = T[k]
        if k == 0:
            temp = -save / P
        else:
            # save += sum([A[j]*T[k-j-1] for j in range(0,k)])
            for j in range(0, k):
                save = save + A[j] * T[k-j-1]
            temp = -save / P

        if realdata:
            P *= 1. - temp ** 2
        else:
            P *= 1. - (temp.real ** 2 + temp.imag ** 2)

        if P <= 0 and allow_singularity is False:  # pragma: nocover
            raise ValueError("singular matrix")

        A[k] = temp
        ref[k] = temp  # save reflection coeff at each step
        if k == 0:
            continue

        khalf = (k+1) // 2
        if realdata:
            for j in range(0, khalf):
                kj = k-j-1
                save = A[j]
                A[j] = save + temp * A[kj]
                if j != kj:
                    A[kj] += temp*save
        else:
            for j in range(0, khalf):
                kj = k-j-1
                save = A[j]
                A[j] = save + temp * A[kj].conjugate()
                if j != kj:
                    A[kj] = A[kj] + temp * save.conjugate()

    return A, P, ref


def _lpc2cep(a, nout):
    nin, ncol = a.shape

    cep = np.zeros((nout, ncol))
    cep[0, :] = -np.log(a[0, :])

    norm_a = a / (np.tile(a[0, :], (nin, 1)) + 1e-8)

    for n in range(1, nout):
        _sum = 0
        for m in range(1, n):
            _sum += (n - m) * norm_a[m, :] * cep[(n - m), :]

        cep[n, :] = -(norm_a[n, :] + _sum / n)

    return cep


def _lpc2spec(lpcas, nout, fmout=False):
    rows, cols = lpcas.shape
    order = rows - 1

    gg = lpcas[0, :]
    aa = lpcas / np.tile(gg, (rows, 1))

    # Calculate the actual z-plane polyvals: nout points around unit circle
    tmp_1 = np.array(np.arange(0, nout), ndmin=2).T
    tmp_1 = -1j * tmp_1 * np.pi / (nout - 1)
    tmp_2 = np.array(np.arange(0, order + 1), ndmin=2)
    zz = np.exp(np.matmul(tmp_1, tmp_2))

    # Actual polyvals, in power (mag^2)
    features = np.divide(
        np.power(np.divide(1, np.abs(np.matmul(zz, aa))), 2),
        np.tile(gg, (nout, 1)))
    F = np.zeros((cols, int(np.ceil(rows / 2))))
    M = F

    if fmout is True:  # pragma: nocover
        for c in range(cols):
            aaa = aa[:, c]
            rr = np.roots(aaa)
            ff_tmp = np.angle(rr)
            ff = np.array(ff_tmp, ndmin=2).T
            zz = np.exp(np.multiply(1j, np.matmul(ff, np.array(
                np.arange(0, aaa.shape[0]), ndmin=2))))
            mags = np.sqrt(np.divide(np.power(np.divide(
                1, np.abs(np.matmul(
                    zz, np.array(aaa, ndmin=2).T))), 2), gg[c]))

            ix = np.argsort(ff_tmp)
            dummy = np.sort(ff_tmp)
            tmp_F_list = []
            tmp_M_list = []
            for i in range(ff.shape[0]):
                if dummy[i] > 0:
                    tmp_F_list = np.append(tmp_F_list, dummy[i])
                    tmp_M_list = np.append(tmp_M_list, mags[ix[i]])

            M[c, 0:tmp_M_list.shape[0]] = tmp_M_list
            F[c, 0:tmp_F_list.shape[0]] = tmp_F_list

    return features, F, M


def _spec2cep(spec, ncep=13, dcttype=2):
    nrow = spec.shape[0]
    dctm = np.zeros((ncep, nrow))

    if dcttype == 2 or dcttype == 3:
        for i in range(ncep):
            dctm[i, :] = (
                np.cos(i * np.arange(1, 2 * nrow, 2) / (2 * nrow) * np.pi)
                * np.sqrt(2 / nrow))

        if dcttype == 2:
            dctm[0, :] = dctm[0, :] / np.sqrt(2)

    elif dcttype == 4:  # pragma: nocover
        for i in range(ncep):
            dctm[i, :] = (2 * np.cos(
                np.pi * (i * np.arange(1, nrow + 1)) / (nrow + 1)))
            dctm[i, 0] += 1
            dctm[i, nrow - 1] *= np.power(-1, i)
        dctm /= 2 * (nrow + 1)

    else:  # pragma: nocover
        for i in range(ncep):
            dctm[i, :] = np.divide(
                np.multiply(np.cos(np.multiply(
                    np.divide(np.multiply(i, np.arange(0, nrow)), (nrow - 1)),
                    np.pi)), 2), 2 * (nrow - 1))
        dctm[:, 0] = dctm[:, 0] * 0.5
        dctm[:, int(nrow - 1)] = dctm[:, int(nrow - 1)] * 0.5

    cep = np.matmul(dctm, np.log(spec + 1e-8))

    return cep, dctm


def _lifter(x, log, lift=0.6, inverse=False):
    ncep = x.shape[0]

    if lift == 0:  # pragma: nocover
        return x

    if lift < 0:  # pragma: nocover
        log.warning(
            'HTK liftering does not support yet; default liftering')
        lift = 0.6

    liftwts = np.power(np.arange(1, ncep), lift)
    liftwts = np.append(1, liftwts)

    if inverse:  # pragma: nocover
        liftwts = 1 / liftwts

    y = np.matmul(np.diag(liftwts), x)
    return y


[docs]class RastaPlpProcessor(FramesProcessor): def __init__(self, sample_rate=16000, do_rasta=True, order=8, frame_shift=0.01, frame_length=0.025, dither=1.0, preemph_coeff=0.97, remove_dc_offset=True, window_type='povey', round_to_power_of_two=True, blackman_coeff=0.42, snip_edges=True): super().__init__( sample_rate=sample_rate, frame_shift=frame_shift, frame_length=frame_length, dither=dither, preemph_coeff=preemph_coeff, remove_dc_offset=remove_dc_offset, window_type=window_type, round_to_power_of_two=round_to_power_of_two, blackman_coeff=blackman_coeff, snip_edges=snip_edges) self.do_rasta = do_rasta self.order = order @property def name(self): return 'rasta-plp' @property def ndims(self): return 13 if self.order == 0 else self.order + 1 @property def do_rasta(self): """If False, just calculate the PLP, default to True""" return self._do_rasta @do_rasta.setter def do_rasta(self, value): self._do_rasta = bool(value) @property def order(self): """Order of the PLP model Must be an integer in [0, 12], 0 means no PLP """ return self._order @order.setter def order(self, value): if not isinstance(value, int) or value < 0 or value > 12: raise ValueError('order must be an integer in [0, 12]') self._order = value def _power_spectrum(self, signal, block_size=64): num_frames = kaldi.feat.window.num_frames( signal.nsamples, self._frame_options) window_size = self._frame_options.padded_window_size() window_function = kaldi.feat.window.FeatureWindowFunction.from_options( self._frame_options) # convert the audio input to a Kaldi vector view kaldi_signal = kaldi.matrix.SubVector(signal.data) # preallocate the power spectrum matrix power_spectrum = np.empty( (1 + window_size // 2, num_frames), dtype=np.complex) # preallocate frames buffer single_frame = kaldi.matrix.Vector(window_size) buffer_frames = np.empty((window_size, block_size), dtype=np.float32) for min_frame in range(0, num_frames, block_size): max_frame = min(min_frame + block_size, num_frames) for i in range(min_frame, max_frame): # extract the frame i with Kaldi (result goes in single_frame) kaldi.feat.window.extract_window( 0, kaldi_signal, i, self._frame_options, window_function, single_frame) buffer_frames[:, i - min_frame] = single_frame.numpy() # compute the power spectrum per block power_spectrum[:, min_frame:max_frame] = np.fft.rfft( buffer_frames[:, :max_frame - min_frame], axis=0) return np.abs(power_spectrum) ** 2 def _rastaplp(self, signal): # compute power spectrum pow_spectrum = self._power_spectrum(signal) # group to critical bands aspectrum = _audspec(pow_spectrum, signal.sample_rate) nbands = aspectrum.shape[0] if self.do_rasta: # log domain, add an epsilon in case of log(0) nl_aspectrum = np.log(aspectrum + 1e-8) # rasta filtering ras_nl_aspectrum = _rastafilt(nl_aspectrum) # inverse log aspectrum = np.exp(ras_nl_aspectrum) postspectrum, _ = _postaud(aspectrum, signal.sample_rate / 2) lpcas = _dolpc(postspectrum, self.order) cepstra = _lpc2cep(lpcas, self.order + 1) if self.order > 0: lpcas = _dolpc(postspectrum, self.order) cepstra = _lpc2cep(lpcas, self.order + 1) spectra, F, M = _lpc2spec(lpcas, nbands) else: spectra = postspectrum cepstra, _ = _spec2cep(spectra) cepstra = _lifter(cepstra, self._log, 0.6) return cepstra
[docs] def process(self, signal): # ensure the signal is correct if signal.nchannels != 1: raise ValueError( 'signal must have one dimension, but it has {}' .format(signal.nchannels)) if self.sample_rate != signal.sample_rate: raise ValueError( 'processor and signal mismatch in sample rates: ' '{} != {}'.format(self.sample_rate, signal.sample_rate)) # force the signal to be int16 signal = signal.astype(np.int16) # extract the features data = self._rastaplp(signal) return Features( data.T.astype(np.float32), self.times(data.T.shape[0]), properties=self.get_properties())