Source code for wordseg.evaluate

"""Word segmentation evaluation

Evaluates a segmented text against it's gold version: outputs the
precision, recall and f-score at type, token and boundary levels. We
distinguish whether utterance edges (begin and end of the utterance)
are counted towards the boundary performance or not.

The evaluation optionally computes the adjusted rank index (requires
the prepared text to be provided) and a summary of which word come to
be correctly segmented, or else segmented incorrectly (requires an
output JSON file to be specified).

"""

import codecs
import collections
import json
import warnings

import numpy as np
from sklearn.metrics.cluster import adjusted_rand_score

from wordseg import utils
from wordseg.separator import Separator


[docs]class TokenEvaluation: """Evaluation of token f-score, precision and recall""" def __init__(self): self.test = 0 self.gold = 0 self.correct = 0 self.n = 0 self.n_exactmatch = 0
[docs] def precision(self): """Returns token precision""" return float(self.correct) / self.test if self.test != 0 else None
[docs] def recall(self): """Returns token recall""" return float(self.correct) / self.gold if self.gold != 0 else None
[docs] def fscore(self): """Returns token fscore""" total = self.test + self.gold return float(2 * self.correct) / total if total != 0 else None
[docs] def exact_match(self): """Returns the number of exact matches""" return float(self.n_exactmatch) / self.n if self.n else None
[docs] def update(self, test_set, gold_set): """Update evaluation for a single utterance""" self.n += 1 if test_set == gold_set: self.n_exactmatch += 1 # omit empty items for type scoring (should not affect token # scoring). Type lists are prepared with '_' where there is no # match, to keep list lengths the same self.test += len([x for x in test_set if x != '_']) self.gold += len([x for x in gold_set if x != '_']) self.correct += len(test_set & gold_set)
[docs] def update_lists(self, test, gold): """Update evaluation for a suite of utterances""" if len(test) != len(gold): raise ValueError( '#words different in test and gold: {} != {}' .format(len(test), len(gold))) for t, g in zip(test, gold): self.update(t, g)
[docs]class TypeEvaluation(TokenEvaluation): """Evaluation of type f-score, precision and recall"""
[docs] @staticmethod def lexicon_check(textlex, goldlex): """Compare hypothesis and gold lexicons""" textlist = [] goldlist = [] for w in textlex: if w in goldlex: # set up matching lists for the true positives textlist.append(w) goldlist.append(w) else: # false positives textlist.append(w) # ensure matching null element in text list goldlist.append('_') for w in goldlex: if w not in goldlist: # now for the false negatives goldlist.append(w) # ensure matching null element in text list textlist.append('_') textset = [{w} for w in textlist] goldset = [{w} for w in goldlist] return textset, goldset
[docs] def update_lists(self, text, gold): lt, lg = self.lexicon_check(text, gold) super(TypeEvaluation, self).update_lists(lt, lg)
[docs]class BoundaryEvaluation(TokenEvaluation): """Evaluation of boundary f-score, precision and recall Includes first and last boundary of an utterance """
[docs] @staticmethod def get_boundary_positions(stringpos): """Returns the positions of boundaries""" return [{idx for pair in line for idx in pair} for line in stringpos]
[docs] def update_lists(self, text, gold): lt = self.get_boundary_positions(text) lg = self.get_boundary_positions(gold) super(BoundaryEvaluation, self).update_lists(lt, lg)
[docs]class BoundaryNoEdgeEvaluation(BoundaryEvaluation): """Evaluation of boundary f-score, precision and recall Excludes first and last boundary of an utterance """
[docs] @staticmethod def get_boundary_positions(stringpos): return [{left for left, _ in line if left > 0} for line in stringpos]
class _StringPos: """Compute start and stop index of words in an utterance""" def __init__(self): self.idx = 0 def __call__(self, n): """Return the position of the current word given its length `n`""" start = self.idx self.idx += n return start, self.idx
[docs]def read_data(text, separator=Separator(None, None, ' ')): """Load text data for evaluation Parameters ---------- text : list of str The list of utterances to read for the evaluation. separator : Separator, optional Separators to tokenize the text with, default to space separated words. Returns ------- (words, positions, lexicon) : three lists where `words` are the input utterances with word separators removed, `positions` stores the start/stop index of each word for each utterance, and `lexicon` is the list of words. """ words = [] positions = [] lexicon = {} # ignore empty lines for utt in (utt for utt in text if utt.strip()): # the utterance with word separators removed words.append(separator.remove(utt, 'word')) utt = list(separator.tokenize(utt, 'word')) # loop over words in line and add to dictionary for word in utt: lexicon[word] = 1 idx = _StringPos() positions.append({idx(len(word)) for word in utt}) # return the words lexicon as a sorted list return words, positions, sorted(lexicon.keys())
[docs]def compute_class_labels(words, units): """Compute class labels to be used for cluster similarity measures Each word is considered a class, and each unit is mapped to the word it belongs to. This function is used as a preprocessing step for the Adjusted Rand Index. Parameters ---------- words: list of str Utterances made of space separated words. units : list of str Utterances made of space separated atomic units (phonemes or syllables). Returns ------- class_labels : numpy array of int Each unit mapped to the word it belongs to (with words coded as integers) Raises ------ ValueError: If `words` and `units` do not match together Examples -------- >>> from wordseg.evaluate import compute_class_labels >>> words = ['hello world', 'python'] >>> units = ['h el lo wo r ld', 'py th on'] >>> compute_class_labels(words, units) array([0, 0, 0, 1, 1, 1, 2, 2, 2]) """ # in case the inputs are generators, force them as lists words, units = list(words), list(units) # make sure we have the number of utterances in words and units if not len(words) == len(units): raise ValueError( 'words and units do not have the same number of utterances: ' 'len(words)={}, len(units)={}'.format(len(words), len(units))) # for each utterance, checks words and units are the same nunits = 0 for n, (words_utt, units_utt) in enumerate(zip(words, units)): words_utt = words_utt.replace(' ', '') nunits += len(units_utt.split()) units_utt = units_utt.replace(' ', '') if not words_utt == units_utt: raise ValueError( 'utterance {}: words/units do not match: ' 'words="{}" / units="{}"'.format(n, words_utt, units_utt)) # collapse words and units from nested lists to numpy array words = np.asarray([word for utt in words for word in utt.split()]) units = np.asarray([unit for utt in units for unit in utt.split()]) # count the number of units in a word def _word_len(word, units): index, w = 0, '' while w != word: w += units[index] index += 1 return index # build the class labels class_labels = np.zeros((nunits,), dtype=int) index = 0 for class_id, word in enumerate(words): try: new_index = index + _word_len(word, units[index:]) except IndexError: raise ValueError( 'word "{}" not found in "{}"'.format(word, units[index:])) class_labels[index:new_index] = class_id index = new_index return class_labels
[docs]class SegmentationSummary: """Computes a summary of the segmentation errors The errors can be oversegmentations, undersegmentations or missegmentations. Correct segmentations are also reported. """ def __init__(self): # token separation on words only self.separator = Separator(phone=None, syllable=None, word=' ') # count over/under/mis/good segmentation for each word type self.over_segmentation = collections.defaultdict(int) self.under_segmentation = collections.defaultdict(int) self.mis_segmentation = collections.defaultdict(int) self.correct_segmentation = collections.defaultdict(int)
[docs] def to_dict(self): """Exports the summary as a dictionary Returns ------- summary : dict A dictionary with the complete summary in the following entries: 'over', 'under', 'mis', 'correct'. In each entry, the words are sorted by decreasing frequency, and alphabetically (for equivalent frequency). """ # collapse all the dicts in a single one summary = { 'over': self.over_segmentation, 'under': self.under_segmentation, 'mis': self.mis_segmentation, 'correct': self.correct_segmentation} # sort by most frequent word decreasing order (and then # alphabetically increasing order) summary = {k: {w[0]: w[1] for w in sorted( v.items(), key=lambda x: (-x[1], x[0]), reverse=False)} for k, v in summary.items()} return summary
[docs] def summarize(self, text, gold): """Computes segmentation errors on a whole text Call :meth:`summarize_utterance` on each utterance of gold and text. Parameters ---------- text : list of str The list of utterances for the segmented text (to be evaluated) gold : list of str The list of utterances for the gold text Raises ------ ValueError If `text` and `gold` do not have the same number of utterances. If :meth:`summarize_utterance` raise a ValueError. """ if not len(gold) == len(text): raise ValueError( 'text and gold do not have the same number of utterances') for t, g in zip(text, gold): self.summarize_utterance(t, g)
[docs] def summarize_utterance(self, text, gold): """Computes segmentation errors on a single utterance This method returns no result but update the intern summary, accessible using :meth:`to_dict`. Parameters ---------- text : str A segmented utterance gold : str A gold utterance Raises ------ ValueError If `text` and `gold` are mismatched, i.e. they do not contain the same suite of letters (once all the spaces removed). """ # check gold and text match (with all spaces removed) if self.separator.remove(gold) != self.separator.remove(text): raise ValueError( 'mismatch in gold and text: {} != {}'.format(gold, text)) # get text and gold as lists of words gold_words = self.separator.tokenize(gold, level='word') text_words = self.separator.tokenize(text, level='word') # silly case where gold and text are identical if gold_words == text_words: for word in gold_words: self.correct_segmentation[word] += 1 return # divide gold and text in chunks, packing chunks where gold # and text share a common boundary. chunks = self._boundary_chunks(text_words, gold_words) # classify each chunk as under/over/mis/good segmentation for text_chunk, gold_chunk in chunks: category = self._classify_chunk(text_chunk, gold_chunk) if category == 'correct': d = self.correct_segmentation elif category == 'under': d = self.under_segmentation elif category == 'over': d = self.over_segmentation else: d = self.mis_segmentation # register the chunk's words into the summary for the # relevant category for word in gold_chunk: d[word] += 1
@classmethod def _boundary_chunks(cls, text, gold): """Returns the list of chunks in a pair of text/gold utterance""" return cls._boundary_chunks_aux(text, gold, []) @classmethod def _boundary_chunks_aux(cls, text, gold, chunks): lg = len(gold) lt = len(text) # end of recursion if not lg and not lt: return chunks # impossible to have one empty but not the other. Should be # the case by construction, this assert is not required. assert lg and lt # compute the next chunk chunk = cls._compute_chunk(text, gold) # recursion return cls._boundary_chunks_aux( text[len(chunk[0]):], gold[len(chunk[1]):], chunks + [chunk]) @staticmethod def _compute_chunk(text, gold): """Find the first chunk in a pair of text/gold utterances A chunk is a pair of lists of words sharing a common boundary (begin and end of a sequence of words). Example ------- >>> gold = 'baby going home'.split() >>> text = 'ba by going home'.split() >> _compute_chunk(text, gold) (['ba', 'by'], ['baby']) >>> text = 'babygoinghome'.split() >> _compute_chunk(text, gold) (['babygoinghome'], ['baby', 'going', 'home']) """ # non empty texts and same letters. This should be the case by # construction, those checks are not required. if not(len(gold) and len(text)): raise ValueError('gold or text is empty') if ''.join(gold) != ''.join(text): raise ValueError( f'gold != text: {"".join(gold)} != {"".join(text)}') # easy case, first word is the same if gold[0] == text[0]: return ([text[0]], [gold[0]]) text_concat, text_index = text[0], 0 gold_concat, gold_index = gold[0], 0 while len(gold_concat) != len(text_concat): if len(gold_concat) < len(text_concat): gold_index += 1 gold_concat = gold_concat + gold[gold_index] else: text_index += 1 text_concat = text_concat + text[text_index] return (text[:text_index+1], gold[:gold_index+1]) def _classify_chunk(self, text, gold): """A chunk is either over/under/mis/correct""" if len(gold) == len(text): if len(gold) == 1: return 'correct' return 'mis' elif len(gold) < len(text): if len(gold) == 1: return 'over' return 'mis' else: # len(gold) > len(text) if len(text) == 1: return 'under' return 'mis'
[docs]def summary(text, gold): """Computes the summary of segmentation errors This function is a simple wrapper on :class:`SegmentationSummary` Parameters ---------- text : list of str The list of utterances for the segmented text (to be evaluated) gold : list of str The list of utterances for the gold text Returns ------- summary : dict A dictionary with the complete summary in the following entries: 'over', 'under', 'mis', 'correct'. Raises ------ ValueError If `text` and `gold` do not match, or something went wrong during the summary computation. """ s = SegmentationSummary() s.summarize(text, gold) return s.to_dict()
[docs]def evaluate(text, gold, units=None): """Scores a segmented text against its gold version Parameters ---------- text : sequence of str A suite of utterances made of space separated words. gold : sequence of str A suite of utterances made of space separated words. units : sequence of str, optional A suite of utterances made of space separated atomic units (phonemes or syllables). When specified, the function also computes the adjusted rand index. Returns ------- scores : ordered dict A dictionary with the following entries in that fixed order: * 'type_fscore' * 'type_precision' * 'type_recall' * 'token_fscore' * 'token_precision' * 'token_recall' * 'boundary_all_fscore' * 'boundary_all_precision' * 'boundary_all_recall' * 'boundary_noedge_fscore' * 'boundary_noedge_precision' * 'boundary_noedge_recall' If `units` is specified in arguments, this additional entry is added: * 'adjusted_rand_index' Raises ------ ValueError If `gold` and `text` have different size or differ in tokens """ # force text and gold to be lists text, gold = list(text), list(gold) word_separator = Separator(None, None, ' ') text_words, text_stringpos, text_lex = read_data(text, word_separator) gold_words, gold_stringpos, gold_lex = read_data(gold, word_separator) if len(gold_words) != len(text_words): raise ValueError( 'gold and train have different size: len(gold)={}, len(train)={}' .format(len(gold_words), len(text_words))) for i, (g, t) in enumerate(zip(gold_words, text_words)): if g != t: raise ValueError( 'gold and train differ at line {}: gold="{}", train="{}"' .format(i+1, g, t)) # token evaluation token_eval = TokenEvaluation() token_eval.update_lists(text_stringpos, gold_stringpos) # type evaluation type_eval = TypeEvaluation() type_eval.update_lists(text_lex, gold_lex) # boundary evaluation (with edges) boundary_eval = BoundaryEvaluation() boundary_eval.update_lists(text_stringpos, gold_stringpos) # boundary evaluation (no edges) boundary_noedge_eval = BoundaryNoEdgeEvaluation() boundary_noedge_eval.update_lists(text_stringpos, gold_stringpos) # return the scores in a fixed order (the default dict does not # repect insertion order). This is needed for python<3.6, see # https://docs.python.org/3.6/whatsnew/3.6.html#new-dict-implementation results = collections.OrderedDict((k, v) for k, v in ( ('token_precision', token_eval.precision()), ('token_recall', token_eval.recall()), ('token_fscore', token_eval.fscore()), ('type_precision', type_eval.precision()), ('type_recall', type_eval.recall()), ('type_fscore', type_eval.fscore()), ('boundary_all_precision', boundary_eval.precision()), ('boundary_all_recall', boundary_eval.recall()), ('boundary_all_fscore', boundary_eval.fscore()), ('boundary_noedge_precision', boundary_noedge_eval.precision()), ('boundary_noedge_recall', boundary_noedge_eval.recall()), ('boundary_noedge_fscore', boundary_noedge_eval.fscore()))) if units: labels_text = compute_class_labels(text, units) labels_gold = compute_class_labels(gold, units) with warnings.catch_warnings(): # ignore a warning issued by sklearn warnings.simplefilter('ignore', category=PendingDeprecationWarning) results['adjusted_rand_index'] = adjusted_rand_score( labels_gold, labels_text) return results
def _load_text(text): """Returns a list of non-empty striped lines from `text`""" return [l for l in (l.strip() for l in text) if l] def _add_arguments(parser): """Defines custom command-line arguments for wordseg-eval""" parser.add_argument( 'gold', metavar='<gold-file>', help='gold file to evaluate the input data on') parser.add_argument( '-r', '--rand-index', metavar='<prep-file>', default=None, help='computes the adjusted rand index, requires the prepared ' 'file as outputed by wordseg-prep (i.e. the atomic units, phonemes ' 'or syllables, separated by spaces)') parser.add_argument( '-s', '--summary', metavar='<summary-file>', default=None, help='computes a summary of the segmentation errors as ' 'over/under/mis segmentation of word types. Write the result in ' '<summary-file> in a JSON format. <summary-file> should have the ' '.json extension but this is not required') @utils.CatchExceptions def main(): """Entry point of the 'wordseg-eval' command""" streamin, streamout, _, log, args = utils.prepare_main( name='wordseg-eval', description=__doc__, add_arguments=_add_arguments) log.info('loads input and gold texts') # load the gold text as a list of utterances, remove empty lines gold = _load_text(codecs.open(args.gold, 'r', encoding='utf8')) # load the text as a list of utterances, remove empty lines text = _load_text(streamin) # load the prepared (unsegmented) text as a list of utterances, # remove empty lines if args.rand_index: units = _load_text(codecs.open(args.rand_index, 'r', encoding='utf8')) else: units = None # evaluation returns a dict of 'score name' -> float log.info('evaluates the segmentation') results = evaluate(text, gold, units=units) streamout.write('\n'.join( # display scores with 4-digit float precision '{}\t{}'.format(k, '%.4g' % v if v is not None else 'None') for k, v in results.items()) + '\n') if args.summary: log.info('computes errors summary, writes to %s', args.summary) with codecs.open(args.summary, 'w', encoding='utf8') as fsummary: fsummary.write(json.dumps(summary(text, gold), indent=4)) if __name__ == '__main__': main()