Source code for shennong.features.processor.onehot

"""One hot encoding of time-aligned tokens


    :class:`~shennong.alignment.Alignment` ---> {Framed}OneHotProcessor \
    ---> :class:`~shennong.features.features.Features`


One hot features are built from a time alignment of the spoken
tokens. They come in two flavours:

* :class:`OneHotProcessor` simply encodes the tokens of an alignment
  into one hot vectors

* :class:`FramedOneHotProcessor` splits the alignment into windowed
  frames before doing the one hot encoding

Examples
--------

Create a fake alignment:

>>> import numpy as np
>>> from shennong.alignment import Alignment
>>> alignment = Alignment(np.asarray([[0, 1], [1, 2]]), np.asarray(['a', 'b']))
>>> alignment
0 1 a
1 2 b

Extract onehot vectors from it:

>>> from shennong.features.processor.onehot import OneHotProcessor
>>> processor = OneHotProcessor()
>>> onehot = processor.process(alignment)
>>> onehot.times
array([[0, 1],
       [1, 2]])
>>> onehot.data
array([[ True, False],
       [False,  True]])

"""

import collections
import operator

import numpy as np

import shennong.features.window
from shennong.features import Features
from shennong.features.frames import Frames
from shennong.features.processor.base import FeaturesProcessor


class _OneHotBase(FeaturesProcessor):
    """Shared machinery of the one hot processors

    Stores the optional inventory of tokens and builds the mapping
    from each token to its index in the one hot vectors.

    Parameters
    ----------
    tokens : sequence, optional
        The tokens to encode. They are stored as a sorted list without
        duplicates. When None, the tokens are taken from the alignment
        given to the processor.

    """
    def __init__(self, tokens=None):
        self.tokens = tokens

    @property
    def name(self):
        """Name of the processor, always 'onehot'"""
        return 'onehot'

    @property
    def tokens(self):
        """Sorted list of the tokens to encode, or None when undefined"""
        return self._tokens

    @tokens.setter
    def tokens(self, value):
        # deduplicate and sort so that the token order is deterministic
        self._tokens = None if value is None else sorted(set(value))

    @property
    def ndims(self):
        """Number of defined tokens

        Raises
        ------
        ValueError
            If the tokens are None or empty.

        """
        if not self.tokens:
            raise ValueError(
                'onehot tokens are not defined, cannot know their dimension')
        return len(self.tokens)

    def _tokens_set(self, alignment):
        """Returns the tokens to use for encoding the `alignment`

        When no tokens are defined on the processor, the inventory of
        the alignment is returned. Otherwise the processor's tokens are
        returned, after checking that every token of the alignment
        belongs to them.

        Raises
        ------
        ValueError
            If the alignment contains tokens which are not defined in
            the processor.

        """
        declared = self.tokens

        # no tokens list specified, take them from the alignment
        if declared is None:
            return alignment.get_tokens_inventory()

        unknown = [
            token for token in set(alignment.tokens)
            if token not in declared]
        if unknown:
            raise ValueError(
                'following tokens are in alignment but not defined in the '
                'onehot features processor: {}'.format(unknown))

        return declared

    def _token2index(self, alignment):
        """Returns a dict mapping each token to its one hot index

        Indices follow the sorted order of the tokens.

        """
        ordered = sorted(self._tokens_set(alignment))
        return dict(zip(ordered, range(len(ordered))))


class OneHotProcessor(_OneHotBase):
    """Simple version of one hot features encoding

    The `OneHotProcessor` directly converts an :class:`Alignment` to
    :class:`features.Features` while preserving the timestamps of the
    original alignment.

    Parameters
    ----------
    tokens : sequence, optional
        The tokens composing the alignment. Specify the tokens if you
        want to have consistent one-hot vectors across different
        :class:`Features`. By default the tokens are extracted from
        the alignment in :meth:`process`.

    """
    def __init__(self, tokens=None):
        super().__init__(tokens=tokens)

    def process(self, alignment):
        """Encodes each token of the `alignment` as a one hot vector

        Parameters
        ----------
        alignment : :class:`~shennong.alignment.Alignment`
            The time-aligned tokens to encode.

        Returns
        -------
        features : :class:`~shennong.features.features.Features`
            The data is a boolean matrix with one row per token of the
            alignment and one column per token of the inventory. The
            times are those of the alignment. The properties include
            the 'token2index' mapping used for the encoding.

        Raises
        ------
        ValueError
            If tokens are defined in the processor and the alignment
            contains tokens which are not part of them.

        """
        # build a bijection token <-> onehot index
        token2index = self._token2index(alignment)

        # initialize the data matrix with zeros, TODO should data be a
        # scipy.sparse matrix? The builtin `bool` is used as dtype
        # because the `np.bool` alias is deprecated since numpy-1.20
        # and missing from numpy-1.24 up to numpy-2.0.
        data = np.zeros(
            (alignment.tokens.shape[0], len(token2index)), dtype=bool)

        # fill the data with onehot encoding of tokens
        for i, token in enumerate(alignment.tokens):
            data[i, token2index[token]] = True

        try:
            properties = self.get_properties()
        except ValueError:
            # NOTE(review): get_properties() is defined outside of this
            # class, the error is presumably raised because the tokens
            # are not defined -- confirm. Retry with the tokens found
            # in the alignment, then put the processor back in its
            # previous state, even if the second attempt fails.
            previous = self.tokens
            self.tokens = token2index.keys()
            try:
                properties = self.get_properties()
            finally:
                self.tokens = previous

        properties[self.name].update({'token2index': token2index})

        return Features(
            data, alignment.times, properties=properties)
class FramedOneHotProcessor(_OneHotBase):
    """One-hot encoding on framed signals

    Computes the one-hot encoding on framed signals (i.e. on
    overlapping time windows)

    Parameters
    ----------
    tokens : sequence, optional
        The tokens composing the alignment. Specify the tokens if you
        want to have consistent one-hot vectors across different
        :class:`Features`. By default the tokens are extracted from
        the alignment in :func:`process`.
    sample_rate : int, optional
        Sample frequency used for frames, in Hz, default to 16kHz
    frame_shift : float, optional
        Frame shift in seconds, default to 10ms
    frame_length : float, optional
        Frame length in seconds, default to 25ms
    window_type : {'povey', 'hanning', 'hamming', 'rectangular', 'blackman'}
        The type of the window, default is 'povey' (like hamming but
        goes to zero at edges)
    blackman_coeff : float, optional
        The constant coefficient for generalized Blackman window, used
        only when `window_type` is 'blackman', default is 0.42.

    """
    def __init__(self, tokens=None, sample_rate=16000,
                 frame_shift=0.01, frame_length=0.025,
                 window_type='povey', blackman_coeff=0.42):
        super().__init__(tokens=tokens)

        self.frame = Frames(
            sample_rate=sample_rate,
            frame_shift=frame_shift,
            frame_length=frame_length)

        self.window_type = window_type
        self.blackman_coeff = blackman_coeff

    @property
    def sample_rate(self):
        """The processor operation sample rate

        Must match the sample rate of the signal specified in
        `process`

        """
        return self.frame.sample_rate

    @sample_rate.setter
    def sample_rate(self, value):
        self.frame.sample_rate = value

    @property
    def frame_shift(self):
        """Frame shift in seconds"""
        return self.frame.frame_shift

    @frame_shift.setter
    def frame_shift(self, value):
        self.frame.frame_shift = value

    @property
    def frame_length(self):
        """Frame length in seconds"""
        return self.frame.frame_length

    @frame_length.setter
    def frame_length(self, value):
        self.frame.frame_length = value

    def process(self, alignment):
        """Encodes the `alignment` as one hot vectors, one per frame

        The alignment is sampled at the processor's sample rate and
        cut into frames. Each frame is encoded by a single token: the
        only token of the frame or, when the frame contains several
        tokens, the one with the biggest cumulated window weight.

        Parameters
        ----------
        alignment : :class:`~shennong.alignment.Alignment`
            The time-aligned tokens to encode.

        Returns
        -------
        features : :class:`~shennong.features.features.Features`
            The data is a boolean matrix with one row per frame and
            one column per token of the inventory. The times are the
            frame boundaries divided by the sample rate. The
            properties include the 'token2index' mapping used for the
            encoding.

        Raises
        ------
        ValueError
            If tokens are defined in the processor and the alignment
            contains tokens which are not part of them.

        """
        # build a bijection token <-> onehot index
        token2index = self._token2index(alignment)

        # sample the alignment at the requested sample rate
        sampled = alignment.at_sample_rate(self.frame.sample_rate)

        # get the frames as pairs (istart:istop)
        nframes = self.frame.nframes(sampled.shape[0])
        frame_boundaries = self.frame.boundaries(nframes)

        # allocate the features data. The builtin `bool` is used as
        # dtype because the `np.bool` alias is deprecated since
        # numpy-1.20 and missing from numpy-1.24 up to numpy-2.0.
        data = np.zeros(
            (frame_boundaries.shape[0], len(token2index)), dtype=bool)

        # allocate the window function
        window = shennong.features.window.window(
            self.frame.samples_per_frame,
            type=self.window_type,
            blackman_coeff=self.blackman_coeff)

        for i, (onset, offset) in enumerate(frame_boundaries):
            framed = sampled[onset:offset]

            # the frame is made of a single token, no needs to compute
            # a window function
            if np.all(framed[0] == framed[1:]):
                winner = framed[0]
            else:
                # several tokens in the frame, compute the weights
                weights = collections.defaultdict(int)
                for j, weight in enumerate(window):
                    weights[framed[j]] += weight

                # the winner token has the biggest weight. On ties the
                # token met first in the frame wins, as max() returns
                # the first maximal item.
                winner = max(
                    weights.items(), key=operator.itemgetter(1))[0]

            data[i, token2index[winner]] = True

        try:
            properties = self.get_properties()
        except ValueError:
            # NOTE(review): get_properties() is defined outside of this
            # class, the error is presumably raised because the tokens
            # are not defined -- confirm. Retry with the tokens found
            # in the alignment, then put the processor back in its
            # previous state, even if the second attempt fails.
            previous = self.tokens
            self.tokens = token2index.keys()
            try:
                properties = self.get_properties()
            finally:
                self.tokens = previous

        properties[self.name].update({'token2index': token2index})

        return Features(
            data, frame_boundaries / self.frame.sample_rate,
            properties=properties)