Source code for shennong.features.frames

"""Provides the Frames class to extract frames from raw signals

Extracts overlapping frames from raw (sampled) signals::

    array ---> Frames ---> array

Examples
--------

>>> import numpy as np
>>> from shennong.features.frames import Frames

Build a discrete signal

>>> a = np.arange(10)
>>> a
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Computes frames of 3s with a shift of 1s (here we assume fs=1Hz
for simplicity)

>>> f = Frames(sample_rate=1, frame_shift=1, frame_length=3)
>>> b = f.make_frames(a)
>>> b
array([[0, 1, 2],
       [1, 2, 3],
       [2, 3, 4],
       [3, 4, 5],
       [4, 5, 6],
       [5, 6, 7],
       [6, 7, 8],
       [7, 8, 9]])

"""

import kaldi.feat.window
import numpy as np

from shennong.base import BaseProcessor


class Frames(BaseProcessor):
    """Extract frames from raw signals

    The framing parameters are stored in a Kaldi
    ``FrameExtractionOptions`` instance and exposed through the
    properties of this class.

    Parameters
    ----------
    sample_rate : number, optional
        Waveform sample frequency in Hertz. Default to 16000.
    frame_shift : float, optional
        Frame shift in seconds. Default to 0.01.
    frame_length : float, optional
        Frame length in seconds. Default to 0.025.
    snip_edges : bool, optional
        If True, output only frames that completely fit in the
        signal. Default to True.

    """
    def __init__(self, sample_rate=16000,
                 frame_shift=0.01, frame_length=0.025,
                 snip_edges=True):
        self._options = kaldi.feat.window.FrameExtractionOptions()
        self.sample_rate = sample_rate
        self.frame_shift = frame_shift
        self.frame_length = frame_length
        self.snip_edges = snip_edges

    @property
    def sample_rate(self):
        """Waveform sample frequency in Hertz

        Must match the sample rate of the signal specified in `process`

        """
        return self._options.samp_freq

    @sample_rate.setter
    def sample_rate(self, value):
        self._options.samp_freq = value

    @property
    def frame_shift(self):
        """Frame shift in seconds"""
        # the underlying options store the shift in milliseconds
        return self._options.frame_shift_ms / 1000.0

    @frame_shift.setter
    def frame_shift(self, value):
        self._options.frame_shift_ms = value * 1000.0

    @property
    def frame_length(self):
        """Frame length in seconds"""
        # the underlying options store the length in milliseconds
        return self._options.frame_length_ms / 1000.0

    @frame_length.setter
    def frame_length(self, value):
        self._options.frame_length_ms = value * 1000.0

    @property
    def snip_edges(self):
        """If true, output only frames that completely fit in the file

        When True the number of frames depends on the `frame_length`.
        If False, the number of frames depends only on the
        `frame_shift`, and we reflect the data at the ends.

        """
        return self._options.snip_edges

    @snip_edges.setter
    def snip_edges(self, value):
        self._options.snip_edges = value

    @property
    def samples_per_frame(self):
        """The number of samples in one frame"""
        return int(self.frame_length * self.sample_rate)

    @property
    def samples_per_shift(self):
        """The number of samples between two shifts"""
        return int(self.frame_shift * self.sample_rate)

    def nframes(self, nsamples):
        """Returns the number of frames extracted from `nsamples`

        This function returns the number of frames that we can
        extract from a wave file with the given number of samples in
        it (assumed to have the same sampling rate as specified in
        init).

        Parameters
        ----------
        nsamples : int
            The number of samples in the input

        Returns
        -------
        nframes : int
            The number of frames extracted from `nsamples`

        Raises
        ------
        ValueError
            If ``samples_per_shift == 0``, meaning the sample rate is
            too low w.r.t the frame shift.

        """
        if self.samples_per_shift == 0:
            raise ValueError('cannot compute nframes: sample rate too low')

        return int(kaldi.feat.window.num_frames(
            nsamples, self._options, flush=True))

    def first_sample_of_frame(self, frame):
        """Returns the index of the first sample of frame indexed `frame`"""
        return int(frame * self.samples_per_shift)

    def last_sample_of_frame(self, frame):
        """Returns the index+1 of the last sample of frame indexed `frame`"""
        return int(self.first_sample_of_frame(frame) + self.samples_per_frame)

    def boundaries(self, nframes):
        """Returns an array of (istart, istop) index boundaries of frames

        Parameters
        ----------
        nframes : int
            The number of frames to generate

        Returns
        -------
        boundaries : array, shape = [nframes, 2]
            The start and stop indices of each frame extracted from
            `nsamples` samples.

        """
        # ``dtype=int`` replaces the former ``astype(np.int)``: the
        # ``np.int`` alias no longer exists in recent NumPy releases
        first = np.asarray(
            [self.first_sample_of_frame(i) for i in range(nframes)],
            dtype=int)

        # broadcast (nframes, 1) + (2,) -> (nframes, 2): the first
        # column is the start index, the second is start + frame size
        return first[:, np.newaxis] + (0, self.samples_per_frame)

    def make_frames(self, array, writeable=False):
        """Returns an `array` divided in frames

        Parameters
        ----------
        array : array, shape = [x, ...]
            The array to be divided in frames
        writeable : bool, optional
            Default to False. When True, the returned array is
            writable but the frames are made of copies of the original
            `array`. When False, the result is read-only but this
            optimizes the process: no explicit copy is made of the
            original `array`, only views are used. (see
            https://docs.scipy.org/doc/numpy-1.15.0/reference/generated/
            numpy.lib.stride_tricks.as_strided.html)

        Returns
        -------
        frames : array, shape = [nframes(x), samples_per_frame, ...]
            The frames computed from the original `array`

        """
        nsamples = array.shape[0]
        nframes = self.nframes(nsamples)

        # special case when not snipping edges: mirror the data in the
        # last frames
        if not self.snip_edges and nframes > 0:
            # number of samples the last frame needs beyond the end of
            # the input
            missing = self.last_sample_of_frame(nframes - 1) - nsamples
            if missing > 0:
                # reflect along the first axis only (the edge sample is
                # not repeated). np.pad keeps reflecting when `missing`
                # exceeds the input length, so the padded array always
                # covers the last frame and the strided view built
                # below never reads outside of its buffer.
                pad_width = [(0, missing)] + [(0, 0)] * (array.ndim - 1)
                array = np.pad(array, pad_width, mode='reflect')

        if writeable is True:
            return self._make_frames_by_copy(array, nframes)
        else:
            return self._make_frames_by_view(array, nframes)

    def _make_frames_by_view(self, array, nframes):
        """Returns the frames as a read-only strided view on `array`"""
        # shape of the frames, concatenate the shape for supplementary
        # dimensions
        shape = (nframes, self.samples_per_frame) + array.shape[1:]

        # strides for the framed array, don't touch the strides for
        # the additional dimensions
        strides = (
            array.strides[0] * self.samples_per_shift,
            array.strides[0]) + array.strides[1:]

        return np.lib.stride_tricks.as_strided(
            array, shape=shape, strides=strides, writeable=False)

    def _make_frames_by_copy(self, array, nframes):
        """Returns the frames as a newly allocated, writable array"""
        # the frames boundaries
        boundaries = self.boundaries(nframes)
        nsamples = self.samples_per_frame

        # allocate the framed array
        framed = np.empty(
            (nframes, nsamples) + array.shape[1:], dtype=array.dtype)

        # build the frames
        for i, (start, stop) in enumerate(boundaries):
            # internal invariant: every frame spans samples_per_frame
            assert stop - start == nsamples
            framed[i] = array[start:stop]

        return framed