"""Puddle word segmentation algorithm
Implementation of the puddle philosophy developed by P. Monaghan.
See "Monaghan, P., & Christiansen, M. H. (2010). Words in puddles of sound:
modelling psycholinguistic effects in speech segmentation. Journal of child
language, 37(03), 545-564."
The algorithm has two modes of operation:
- Segmentation and online learning on the same text:
Specify <input-text> only, <input-text> must be in phonologized form. The
PUDDLE model is updated line per line and so segmentation performances are
better at the end. Use --nfolds and --njobs options to run the segmentation
in several folds in parallel.
- Training and segmentation on separate files:
Specify <input-text> and --train-file <training-file>. Both texts must be in
phonologized form. The PUDDLE model is trained offline on <training-file>,
before the segmentation of <input-text>. In this mode --nfolds and --njobs
options are not valid.
"""
import codecs
import collections
import os
import joblib
from wordseg import utils, folding
class Puddle:
    """Train and segment text with a PUDDLE model

    Implementation of a PUDDLE model with `train()` and `segment()` methods.

    Parameters
    ----------
    window : int, optional
        Number of phonemes to be taken into account for boundary constraint.
        Default to 2.
    by_frequency : bool, optional
        When True choose the word candidates by filtering them by frequency.
        Default to False.
    log : logging.Logger, optional
        The logger instance where to send messages.

    """
    def __init__(self, window=2, by_frequency=False, log=utils.null_logger()):
        self._log = log
        self.window = window
        self.by_frequency = by_frequency

        # word type -> occurrence count of the words discovered so far
        self._lexicon = collections.Counter()
        # phoneme n-grams (of size `window`) observed at word beginnings
        self._beginning = collections.Counter()
        # phoneme n-grams (of size `window`) observed at word endings
        self._ending = collections.Counter()

    def __eq__(self, other):
        """Two models are equal when parameters and learned counters match"""
        return (
            self.window == other.window and
            self.by_frequency == other.by_frequency and
            self._lexicon == other._lexicon and
            self._beginning == other._beginning and
            self._ending == other._ending)

    def train(self, text):
        """Train a PUDDLE model from `text`

        `text` must be a sequence of strings, each one considered as an
        utterance.

        """
        for utterance in text:
            # train only: segmentation output is discarded, counters updated
            self._process_utterance(
                utterance.strip().split(),
                segmented=[],
                do_update=True)

    def segment(self, text, update_model=True):
        """Segments a `text` using the trained PUDDLE model

        `text` must be a sequence of strings, each one considered as an
        utterance.

        If `update_model` is True, the model is trained online during
        segmentation. Otherwise it stays constant.

        Yields the segmented utterances.

        """
        for utterance in text:
            yield ' '.join(
                self._process_utterance(
                    utterance.strip().split(),
                    segmented=[], do_update=update_model))

    def _filter_by_frequency(self, utterance, i, j):
        """Return the end index of the most frequent candidate starting at `i`

        Scans all the candidates `utterance[i:k+1]` for k in [j, len) and
        returns the `k` of the candidate with the highest lexicon frequency
        (on ties the longest candidate wins, as `sorted` is stable and
        candidates are generated by increasing `k`).

        """
        # NOTE a Counter never raises KeyError (missing keys simply count
        # as 0), so the try/except that used to guard this lookup was dead
        # code and has been removed: zero-frequency candidates are scored 0.
        all_candidates = [
            (k, self._lexicon[''.join(utterance[i:k+1])])
            for k in range(j, len(utterance))]

        j, _ = sorted(all_candidates, key=lambda x: x[1])[-1]
        return j

    def _filter_by_boundary_condition(self, utterance, i, j):
        """True when the candidate `utterance[i:j+1]` respects boundaries

        The `window` phonemes preceding the candidate must be a known word
        ending and the `window` phonemes following it a known word beginning
        (unless the candidate is at the utterance edge).

        """
        # previous must be word-end
        prev_biphone = ''.join(utterance[i - self.window:i])
        if i != 0 and prev_biphone not in self._ending:
            return False

        # next must be a known word beginning
        next_biphone = ''.join(utterance[j + 1:j + 1 + self.window])
        if len(utterance) != j - i and next_biphone not in self._beginning:
            return False

        return True

    def _update_candidate(self, segmented, utterance, i, j):
        """Appends `utterance[i:j+1]` to `segmented`, updating the counters

        Used as the candidate processor during training/online segmentation:
        updates the lexicon and, for candidates of two or more phonemes, the
        beginning and ending n-gram counters.

        """
        self._lexicon.update([''.join(utterance[i:j+1])])
        segmented += [''.join(utterance[i:j+1])]

        if len(utterance[i:j+1]) == len(utterance):
            self._log.debug(
                'utterance %s added in lexicon', ''.join(utterance[i:j+1]))
        else:
            self._log.debug(
                'match %s added in lexicon', ''.join(utterance[i:j+1]))

        # single-phoneme candidates carry no boundary information
        if len(utterance[i:j+1]) >= 2:
            self._beginning.update([''.join(utterance[i:i+self.window])])
            self._ending.update([''.join(utterance[j+1-self.window:j+1])])

            self._log.debug(
                'biphones %s added in beginning',
                ''.join(utterance[i:i+self.window]))
            self._log.debug(
                'biphones %s added in ending',
                ''.join(utterance[j+1-self.window:j+1]))

        return segmented

    @staticmethod
    def _segment_candidate(segmented, utterance, i, j):
        """Appends `utterance[i:j+1]` to `segmented`, keeping counters as-is

        Used as the candidate processor when the model must stay constant.

        """
        segmented.append(''.join(utterance[i:j+1]))
        return segmented

    def _process_utterance(self, utterance, segmented, do_update):
        """Recursive function implementing puddle

        Parameters
        ----------
        utterance : list
            A non-empty list of phonological symbols (phones or syllables)
            corresponding to an utterance.
        segmented : list
            Recursively build lexicon of pseudo words.
        do_update : bool
            When True, update the model while segmenting

        Raises
        ------
        ValueError
            If `utterance` is empty.

        """
        if not utterance:
            raise ValueError('The utterance is empty')

        # select the right method for segmenting pseudo words with respect to
        # `do_update`
        process_candidate = (
            self._update_candidate if do_update else
            self._segment_candidate)

        found = False
        # index of start of word candidate
        i = 0
        while i < len(utterance):
            j = i
            while j < len(utterance):
                candidate_word = ''.join(utterance[i:j+1])
                # self._log.debug('word candidate: %s', candidate_word)

                if candidate_word in self._lexicon:
                    if self.by_frequency:
                        # choose the best candidate by looking at the
                        # frequency of different candidates
                        j = self._filter_by_frequency(utterance, i, j)

                    # check if the boundary conditions are respected
                    found = self._filter_by_boundary_condition(utterance, i, j)
                    if found:
                        self._log.info('match found : %s', candidate_word)
                        if i != 0:
                            # add the word preceding the word found in
                            # lexicon; update beginning and ending
                            # counters and segment
                            segmented = process_candidate(
                                segmented, utterance, 0, i-1)

                        # update the lexicon, beginning and ending counters
                        segmented = process_candidate(
                            segmented, utterance, i, j)

                        if j != len(utterance) - 1:
                            # recursion
                            return self._process_utterance(
                                utterance[j+1:],
                                segmented=segmented,
                                do_update=do_update)

                        # go to the next chunk and apply the same condition
                        self._log.info(
                            'go to next chunk : %s', utterance[j+1:])
                        break
                j += 1
            i += 1  # or go to the next phoneme

        if not found:
            # no match found in the whole utterance: store it as a single
            # pseudo-word
            process_candidate(segmented, utterance, 0, len(utterance) - 1)

        return segmented
def _do_puddle(text, window, by_frequency, log_level, log_name):
    """Auxiliary function to segment"""
    logger = utils.get_logger(name=log_name, level=log_level)
    puddle = Puddle(window=window, by_frequency=by_frequency, log=logger)
    return list(puddle.segment(text, update_model=True))
def segment(text, train_text=None, window=2, by_frequency=False, nfolds=5,
            njobs=1, log=utils.null_logger()):
    """Returns a word segmented version of `text` using the puddle algorithm

    Parameters
    ----------
    text : sequence of str
        A sequence of lines with syllable (or phoneme) boundaries
        marked by spaces and no word boundaries. Each line in the
        sequence corresponds to a single and complete utterance.
    train_text : sequence of str
        The list of utterances to train the model on. If None (default) the
        model is trained online during segmentation. When `train_text` is
        specified, the options `nfolds` and `njobs` are ignored.
    window : int, optional
        Number of phonemes to be taken into account for boundary constraint.
        Default to 2.
    by_frequency : bool, optional
        When True choose the word candidates by filtering them by frequency.
        Default to False.
    nfolds : int, optional
        The number of folds to segment the `text` on. This option is ignored
        if a `train_text` is provided.
    njobs : int, optional
        The number of subprocesses to run in parallel. The folds are
        independent of each others and can be computed in parallel. Requesting
        a number of jobs greater than `nfolds` have no effect. This option is
        ignored if a `train_text` is provided.
    log : logging.Logger, optional
        The logger instance where to send messages.

    Returns
    -------
    generator
        The utterances from `text` with estimated word boundaries.

    See also
    --------
    wordseg.folding.fold

    """
    # force the text to be a list of utterances
    text = list(text)

    if not train_text:
        # online mode: train and segment fold per fold, in parallel
        log.info('no train data provided, will train model on test data')

        log.debug('building %s folds', nfolds)
        folded_texts, fold_index = folding.fold(text, nfolds)

        # segment the folds in parallel, each worker has its own logger so
        # messages can be attributed to a fold
        segmented_texts = joblib.Parallel(n_jobs=njobs, verbose=0)(
            joblib.delayed(_do_puddle)(
                fold, window, by_frequency,
                log.getEffectiveLevel(),
                f'wordseg-puddle - fold {n+1}')
            for n, fold in enumerate(folded_texts))

        log.debug('unfolding the %s folds', nfolds)
        output_text = folding.unfold(segmented_texts, fold_index)

        return (utt for utt in output_text if utt)

    # force the train text from sequence to list
    train_text = list(train_text)
    log.info('train data: %s utterances loaded', len(train_text))

    # init a puddle model and train it
    model = Puddle(window=window, by_frequency=by_frequency, log=log)
    model.train(train_text)

    # segmentation of the test text, keeping the model constant
    return (utt for utt in model.segment(text, update_model=False) if utt)
def _add_arguments(parser):
"""Add algorithm specific options to the parser"""
parser.add_argument(
'-f', '--nfolds', type=int, metavar='<int>', default=None,
help='number of folds to segment the text on, default is 5, '
'ignored if <training-file> specified.')
parser.add_argument(
'-j', '--njobs', type=int, metavar='<int>', default=None,
help='number of parallel jobs to use, default is 1, '
'ignored if <training-file> specified.')
parser.add_argument(
'-w', '--window', type=int, default=2, metavar='<int>', help='''
Number of phonemes to be taken into account for boundary constraint,
default is %(default)s.''')
parser.add_argument(
'-F', '--by-frequency', action='store_true',
help='choose word candidates based on frequency '
'(deactivated by default)')
# parser.add_argument(
# '-d', '--decay', action='store_true',
# help='Decrease the size of lexicon, modelize memory of lexicon.')
@utils.CatchExceptions
def main():
    """Entry point of the 'wordseg-puddle' command"""
    streamin, streamout, _, log, args = utils.prepare_main(
        name='wordseg-puddle',
        description=__doc__,
        add_arguments=_add_arguments,
        train_file=True)

    # post-process arguments: offline training is incompatible with folding
    if args.train_file and (args.njobs or args.nfolds):
        raise ValueError(
            '--train-file option is incompatible with --njobs and --nfolds')
    args.njobs = args.njobs or 1
    args.nfolds = args.nfolds or 5

    # load the train text if any
    train_text = None
    if args.train_file:
        if not os.path.isfile(args.train_file):
            # fixed message: this checks the *train* file, not the test file
            raise RuntimeError(
                f'train file not found: {args.train_file}')
        train_text = codecs.open(args.train_file, 'r', encoding='utf8')

    # load train and test texts, ignore empty lines
    test_text = (line for line in streamin if line)
    if train_text:
        train_text = (line for line in train_text if line)

    segmented = segment(
        test_text, train_text=train_text,
        window=args.window, by_frequency=args.by_frequency,
        nfolds=args.nfolds, njobs=args.njobs, log=log)

    streamout.write('\n'.join(segmented) + '\n')
# run the command-line entry point when executed as a script
if __name__ == '__main__':
    main()