Source code for shennong.alignment

"""Handles time alignments of speech signals

This module provides two classes to operate on time alignments:

* :class:`Alignment` is the class representing a time-alignment for a
  single item.

* :class:`AlignmentCollection` is a high-level class to load/save
  alignment files. It exposes a dictionary of items mapped to
  :class:`Alignment` instances.

The time alignments are used as input to the
:class:`~shennong.features.processor.onehot.OneHotProcessor` and
:class:`~shennong.features.processor.onehot.FramedOneHotProcessor`
features processors.

----------------------------------

A speech signal is time-aligned when, for each pronounced token (phone
or word) in the speech, its associated onset and offset times are
provided. An alignment can be obtained manually (by annotation), or
automatically (using a Kaldi recipe for example).

Alignment files supported by `shennong` are text files (optionally
compressed) in which each line is formatted as follows::

     <item> <onset> <offset> <token>

The ``<item>`` can be the reference of an utterance, a speaker, or a
file. The ``<onset>`` and ``<offset>`` are begin and end timestamps
(in seconds) of the ``<token>`` being pronounced. An example file is
located in ``shennong/test/data/alignment.txt`` and has been produced
by a Kaldi forced-alignment recipe. Here are its first 10 lines::

    S01F1522_0001 0.0125 0.1125 e:
    S01F1522_0001 0.1125 0.2225 t
    S01F1522_0001 0.2225 0.3125 o
    S01F1522_0001 0.3125 0.3625 u
    S01F1522_0001 0.3625 0.4225 r
    S01F1522_0001 0.4225 0.4925 e
    S01F1522_0001 0.4925 0.5925 sy
    S01F1522_0001 0.5925 0.8925 i
    S01F1522_0001 0.8925 1.2025 k
    S01F1522_0001 1.2025 1.2825 u


Examples
--------

Load a collection of 34 alignments from the provided test file:

>>> from shennong.alignment import AlignmentCollection
>>> alignments = AlignmentCollection.load('./test/data/alignment.txt')
>>> len(alignments.keys())
34

Get the alignment of one item. An item from an
:class:`AlignmentCollection` is an instance of :class:`Alignment`:

>>> ali1 = alignments['S01F1522_0033']
>>> type(ali1)
<class 'shennong.alignment.Alignment'>
>>> ali1.duration()
0.64
>>> print(ali1)
0.0125 0.0425 m
0.0425 0.1225 a
0.1225 0.1825 s
0.1825 0.2425 o
0.2425 0.3025 r
0.3025 0.3625 e
0.3625 0.4325 k
0.4325 0.4925 a
0.4925 0.5625 r
0.5625 0.6525 a

Extract a subpart of the alignment, as an :class:`Alignment` instance
as well:

>>> ali2 = ali1[0.4325:0.6525]
>>> print(ali2)
0.4325 0.4925 a
0.4925 0.5625 r
0.5625 0.6525 a

"""

import gzip
import os
import numpy as np


class Alignment:
    """Time alignment of tokens

    An Alignment handles a time alignment of tokens, i.e. a suite of
    tokens linked with their onset and offset timestamps. See the
    :func:`validate` method for the list of constraints applying to
    the data.

    Parameters
    ----------
    times : array of float, shape = [ntokens, 2]
        The array of (onset, offset) timestamps for each aligned token
    tokens : array of str, shape = [ntokens]
        The array of aligned tokens
    validate : bool, optional
        When True, checks the alignment is in a valid format, when
        False does not perform any verification, default is True

    Raises
    ------
    ValueError
        When `validate` is True and the alignment data is not
        correctly formatted

    """
    def __init__(self, times, tokens, validate=True):
        self._times = times
        self._tokens = tokens

        if validate is True:
            self.validate()

    @property
    def times(self):
        """The (start, stop) timestamps of the aligned tokens in seconds"""
        return self._times

    @property
    def onsets(self):
        """The start timestamps of the aligned tokens in seconds"""
        return self._times[:, 0]

    @property
    def offsets(self):
        """The stop timestamps of the aligned tokens in seconds"""
        return self._times[:, 1]

    @property
    def tokens(self):
        """The aligned tokens associated with timestamps"""
        return self._tokens

    @staticmethod
    def from_list(data, validate=True):
        """Build an Alignment from a list of (tstart, tstop, token) triplets

        This method checks all elements in the `data` list have 3
        fields, converts them to `times` and `tokens` arrays, and
        instantiates an Alignment with them.

        Parameters
        ----------
        data : sequence of (tstart, tstop, token)
            A list or sequence of triplets `(tstart, tstop, token)`
            representing a time aligned token. `tstart` and `tstop`
            are the onset and offset of the pronunciation (in
            seconds). `token` is a string representation of the token.
        validate : bool, optional
            Forwarded to the :class:`Alignment` constructor, default
            is True.

        Returns
        -------
        alignment : Alignment
            The alignment built from `data`.

        Raises
        ------
        ValueError
            If an entry of `data` does not have exactly 3 fields, or
            if `validate` is True and the alignment is not valid.

        """
        # check we have 3 fields in each data entry
        for i, entry in enumerate(data):
            if len(entry) != 3:
                raise ValueError(
                    'line {}: entry must have 3 fields but has {}'
                    .format(i, len(entry)))

        # the builtin float replaces the np.float alias, which has
        # been removed from NumPy. The reshape keeps a (0, 2) shape
        # for empty data, so that onsets/offsets remain usable.
        times = np.array(
            [d[:2] for d in data], dtype=float).reshape(-1, 2)
        tokens = np.array([d[2] for d in data])
        return Alignment(times, tokens, validate=validate)

    def validate(self):
        """Raises a ValueError if the Alignment is not consistent

        The following conditions must apply for the alignment to be valid:

        * `onsets`, `offsets` and `tokens` must have the same length

        * `onsets` and `offsets` must be sorted in increasing order:
          the alignment is a temporal sequence

        * `onsets[n]` must be lesser than `offsets[n]`: each token
          has a strictly positive duration

        * `offsets[n]` must be equal to `onsets[n+1]`: the alignment
          has a temporal continuity.

        """
        ntokens = self.tokens.shape[0]

        # same length for timestamps and tokens
        if self._times.shape[0] != ntokens:
            raise ValueError('timestamps and tokens must have the same length')

        # nothing more to check on an empty alignment (and do not
        # access onsets/offsets, the times array may be 1-dimensional)
        if ntokens == 0:
            return

        onsets, offsets = self.onsets, self.offsets

        # check tstart < tstop for all timestamps
        for i in range(ntokens):
            if onsets[i] >= offsets[i]:
                raise ValueError(
                    'token {}: onset must be lesser than offset'.format(i))

        # check tstarts are sorted in increasing order and tstop[n]
        # matches tstart[n+1]
        for i in range(ntokens - 1):
            if onsets[i] > onsets[i+1]:
                raise ValueError(
                    'timestamps must be sorted in increasing order')

            if offsets[i] != onsets[i+1]:
                raise ValueError(
                    'mismatch in tstop/tstart timestamps')

    def is_valid(self):
        """Returns True if the Alignment is consistent, False otherwise"""
        try:
            self.validate()
        except ValueError:
            return False
        return True

    def __eq__(self, other):
        # let Python fall back to its default comparison when `other`
        # is not an Alignment, instead of failing on a missing attribute
        if not isinstance(other, Alignment):
            return NotImplemented

        return (np.array_equal(self._times, other._times) and
                np.array_equal(self.tokens, other.tokens))

    def _empty(self):
        """Returns an empty alignment with well-shaped times and tokens"""
        return Alignment(
            np.zeros((0, 2), dtype=float), self._tokens[:0], validate=False)

    def __getitem__(self, time):
        """Returns data aligned in `time` slice

        Extracts a subpart of the alignment using slice notation. For
        example ``alignment[:2.0]`` will extract alignment for the
        first two seconds, or ``alignment[3.25:4.25]`` will extract
        one second in the middle of the data.

        Parameters
        ----------
        time : slice (onset, offset), in seconds
            The time interval on which to extract the alignment is
            defined by `slice.start` and `slice.stop`, expressed in
            seconds. `slice.step` is not used.

        Returns
        -------
        alignment : Alignment
            The sub-alignment extracted from the original one. It is
            empty if the requested interval does not overlap the
            alignment, and it is `self` if the interval covers the
            whole alignment.

        Raises
        ------
        ValueError
            If `time` is not a slice, or if `time.step` is defined.

        """
        if not isinstance(time, slice):
            raise ValueError(
                'time must be a slice but is {}'.format(type(time)))

        if time.step is not None:
            raise ValueError('time.step is defined but is useless')

        # nothing to extract from an empty alignment
        if self._tokens.shape[0] == 0:
            return self._empty()

        # setup the start and stop timestamp from the `time` slice,
        # bound them at start and stop timestamps of the alignment
        tmin = self.onsets[0]
        tstart = time.start
        if tstart is None or tstart < tmin:
            tstart = tmin

        tmax = self.offsets[-1]
        tstop = time.stop
        if tstop is None or tstop > tmax:
            tstop = tmax

        # deal with corner cases
        if tstart >= tstop or tstart >= tmax or tstop <= tmin:
            return self._empty()
        if tstart == tmin and tstop == tmax:
            return self

        # now (tstart, tstop) are in boundaries
        assert tmin <= tstart < tstop <= tmax

        # TODO in the following lines we can optimize. This is useless
        # to do a np.where on the whole timestamps, can we restrict
        # the area of search using np.searchsorted for instance?

        # find the start index (last <= tstart)
        if tstart == tmin:
            istart = 0
        else:
            istart = np.where(self.onsets <= tstart)[0][-1]

        # find the stop index (first >= tstop)
        if tstop == tmax:
            istop = self.tokens.shape[0] - 1
        else:
            istop = np.where(self.offsets >= tstop)[0][0]

        # we have a partial read of a single token
        if istart == istop:
            tokens = np.array(self.tokens[istart:istart+1])
            times = np.array([tstart, tstop]).reshape(1, 2)
        else:
            # build the computed subalignment, working on a copy of
            # the timestamps to leave the original ones untouched
            tokens = self.tokens[istart:istop+1]
            times = np.copy(self._times[istart:istop+1, :])
            times[0, 0] = tstart
            times[-1, 1] = tstop

        return Alignment(times, tokens, validate=False)

    def __repr__(self):
        return '\n'.join(
            '{} {} {}'.format(t0, t1, p) for (t0, t1, p) in self.to_list())

    def to_list(self):
        """Returns the alignment as a list of triplets (onset, offset, token)

        This is the reverse operation of :func:`from_list`.

        """
        return [(self.onsets[i], self.offsets[i], self.tokens[i])
                for i in range(self.tokens.shape[0])]

    def at_sample_rate(self, sample_rate):
        """Returns an array of tokens read at the given `sample_rate`

        The alignment is sampled every ``1 / sample_rate`` seconds,
        starting from its first onset. The returned array has
        ``int(duration * sample_rate)`` elements.

        """
        # allocate the result data
        data = np.zeros(
            (int(self.duration() * sample_rate),), dtype=self.tokens.dtype)

        # nothing to sample (this also avoids reading the first onset
        # of an empty alignment)
        if data.shape[0] == 0:
            return data

        # the sampled timestamps
        times = np.arange(data.shape[0]) / sample_rate + self.onsets[0]

        # `j` is the index of the token being pronounced at times[i],
        # it only moves forward because timestamps are increasing
        j = 0
        for i in range(data.shape[0]):
            while times[i] >= self.offsets[j]:
                j += 1
            data[i] = self.tokens[j]

        return data

    def duration(self):
        """Returns the duration of the alignment in seconds"""
        if len(self.tokens) == 0:
            return 0
        return self.offsets[-1] - self.onsets[0]

    def get_tokens_inventory(self):
        """Returns the different tokens composing the alignment

        Returns
        -------
        tokens : set
            Unique tokens present in the alignment

        """
        return set(self.tokens)
class AlignmentCollection(dict):
    """A dictionary of :class:`.Alignment` indexed by items

    An :class:`AlignmentCollection` is a usual Python dictionary with
    some additional functions. Keys are strings, values are
    :class:`Alignment` instances.

    Parameters
    ----------
    data : sequence of quadruplets
        A list or a sequence of quadruplets `(item, onset, offset,
        token)` representing a time aligned token for a given `item`,
        where `onset` is the start timestamp of the pronounced token,
        `offset` is the end timestamp of the pronunciation and `token`
        is a string representation of the token. `onset` and `offset`
        are expressed in seconds.

    Raises
    ------
    ValueError
        If one element of `data` is not a quadruplet, or if the
        Alignment mapped to an `item` cannot be instantiated.

    """
    def __init__(self, data):
        super().__init__()

        # first pass: group the (onset, offset, token) triplets by
        # item. This is done in a separate dict so as `self` is never
        # modified while being iterated.
        entries = {}
        for i, entry in enumerate(data):
            if len(entry) != 4:
                raise ValueError(
                    'alignment must have 4 columns but line {} has {}'
                    .format(i+1, len(entry)))

            entries.setdefault(entry[0], []).append(entry[1:])

        # second pass: from lists of triplets to Alignment instances
        for item, triplets in entries.items():
            try:
                self[item] = Alignment.from_list(triplets, validate=True)
            except ValueError as err:
                raise ValueError('item {}: {}'.format(item, err)) from err

    @staticmethod
    def load(filename, compress=False):
        """Returns an `AlignmentCollection` loaded from the `filename`

        The text file, optionally compressed, is read as utf8. It must
        be composed of lines with 4 fields ``<item> <onset> <offset>
        <token>``.

        Parameters
        ----------
        filename : str
            The path to the alignment file to read, must be an
            existing text file.
        compress : bool, optional
            When True the file is read as a gzip compressed file.
            Default to False.

        Returns
        -------
        alignment : AlignmentCollection
            The AlignmentCollection instance initialized from the
            `filename`

        Raises
        ------
        ValueError
            If the `filename` does not exist, is not a valid alignment
            or if the AlignmentCollection cannot be instantiated.

        """
        if not os.path.isfile(filename):
            raise ValueError('{}: file not found'.format(filename))

        # read the input file compressed or not, the context manager
        # makes sure the file is closed once read
        open_fun = gzip.open if compress is True else open
        with open_fun(filename, 'rt', encoding='utf8') as fh:
            data = [line.split() for line in fh]

        return AlignmentCollection(data)

    def save(self, filename, sort=False, compress=False):
        """Save the alignments to a `filename`

        Parameters
        ----------
        filename : str
            The text file to write (should have a `.txt` extension, or
            `.txt.gz` if `compress` is True, but this is not
            required). Must be a non existing file.
        sort : bool, optional
            When True, the items are sorted in lexicographical order.
            Default to False.
        compress : bool, optional
            When True the file is compressed using the gzip
            algorithm. Default to False.

        Raises
        ------
        ValueError
            If the `filename` already exists or is not writable.

        """
        # check this file does not exist
        if os.path.isfile(filename):
            raise ValueError('{} already exist'.format(filename))

        # prepare the items to write, optionally sorted
        items = self.keys()
        if sort is True:
            items = sorted(items)

        # write in raw text or gzip text format
        open_fun = gzip.open if compress is True else open
        try:
            with open_fun(filename, 'wt', encoding='utf8') as fh:
                # write the file item by item
                for item in items:
                    fh.write('\n'.join(self._list_str(item)) + '\n')
        except OSError as err:
            # covers a missing parent directory (FileNotFoundError) as
            # well as the other "not writable" cases, such as a
            # permission error or a filename being a directory
            raise ValueError('cannot write to {}'.format(filename)) from err

    def _list_str(self, item):
        """Returns an alignment item as a list of strings"""
        return [item + ' ' + '{} {} {}'.format(onset, offset, token)
                for onset, offset, token in self[item].to_list()]

    def get_tokens_inventory(self):
        """Returns the different tokens composing the collection

        Returns
        -------
        tokens : set
            Unique tokens present in the collection's alignments, an
            empty set if the collection is empty

        """
        # starting from an empty set makes the union valid even when
        # the collection has no alignment
        return set().union(
            *(v.get_tokens_inventory() for v in self.values()))