# Source code for ABXpy.analyze

"""This module is used to analyse the results of an ABX discrimination task

It collapses the results and gives the mean score for each block of triplets
sharing the same on, across and by labels. It outputs a tab-separated csv file
whose columns are the relevant labels, the average score and the number of
triplets in the block. See `Files format <FilesFormat.html>`_ for a more
in-depth explanation.

It requires a score file and a task file.

Usage
-----
From the command line:

.. code-block:: bash

    python analyze.py data.score data.abx data.csv

In python:

.. code-block:: python

    import ABXpy.analyze
    # Prerequisite: calculate a task data.abx, and a score data.score
    ABXpy.analyze.analyze('data.abx', 'data.score', 'data.csv')

"""

import h5py
import numpy as np
import argparse
import os.path as path
import os
import warnings

from ABXpy.misc.type_fitting import fit_integer_type


def npdecode(keys, max_ind):
    """Vectorized decoding of flat keys into one index per dimension.

    Inverse of the mixed-radix encoding ``i = (a1*n2 + a2)*n3 + a3 ...``
    where ``n1, n2, n3, ...`` are the entries of `max_ind`.

    Parameters
    ----------
    keys : array-like
        the encoded keys, one per row of the result
    max_ind : sequence of int
        the radix of each dimension (``n1`` itself is never used, the
        first column simply receives what is left of the key)

    Returns
    -------
    res : numpy.ndarray
        float array of shape ``(len(keys), len(max_ind))``; row ``r`` holds
        the decoded ``(a1, a2, a3, ...)`` of ``keys[r]``
    """
    n_dims = len(max_ind)
    res = np.empty((len(keys), n_dims))
    remaining = np.asarray(keys)

    # Integer keys are decoded with exact integer arithmetic: going through
    # float64 (as a true division would) silently corrupts keys above 2**53.
    # They are widened first so that a radix larger than the range of a
    # narrow key dtype cannot overflow.
    integer_keys = remaining.dtype.kind in 'ui'
    if remaining.dtype.kind == 'u':
        remaining = remaining.astype(np.uint64)
    elif remaining.dtype.kind == 'i':
        remaining = remaining.astype(np.int64)

    # Peel the digits off starting from the last (least significant) one.
    for col in range(n_dims - 1, 0, -1):
        radix = max_ind[col]
        if integer_keys:
            # a plain Python int does not change the dtype of the keys,
            # whereas a signed numpy scalar could promote uint64 to float64
            radix = int(radix)
        res[:, col] = np.mod(remaining, radix)
        remaining = np.floor_divide(remaining, radix)
    res[:, 0] = remaining
    return res
def unique_rows(arr):
    """Numpy unique applied to whole rows of a 2-D array.

    Every row is reinterpreted as a single opaque ``np.void`` item so that
    ``np.unique`` compares complete rows; the distinct items are then viewed
    back with the dtype of `arr` and reshaped into rows.
    """
    n_cols = arr.shape[1]
    # one void item spans the bytes of a full row
    row_dtype = np.dtype((np.void, arr.dtype.itemsize * n_cols))
    packed_rows = np.ascontiguousarray(arr).view(row_dtype)
    distinct_rows = np.unique(packed_rows)
    return distinct_rows.view(arr.dtype).reshape(-1, n_cols)
def collapse(scorefile, taskfile, fid):
    """Collapses the results for each triplets sharing the same on, across
    and by labels.

    For every 'by' block of the task, the triplets are grouped by the
    combination of their indexed labels and one tab-separated line
    ``label_1 ... label_k  by  score  n`` is written to `fid` per group,
    where ``score`` and ``n`` are the values returned by `unique`.

    Parameters
    ----------
    scorefile : string, hdf5 file
        the file whose 'scores' dataset is read
    taskfile : string, hdf5 file
        the task file ('bys', 'triplets/by_index' and 'regressors' are read)
    fid : file-like object
        object with a ``write`` method receiving the text lines

    Raises
    ------
    ValueError
        if the number of label combinations of a block cannot be encoded
        on an unsigned 64 bits integer
    """
    # We make the assumption that everything fits in memory...
    # NOTE(review): the score file is only read here; the original 'r+' mode
    # is kept in case a caller relies on it -- confirm whether 'r' would do.
    with h5py.File(scorefile, 'r+') as scorefid, \
            h5py.File(taskfile, 'r') as taskfid:
        bys = taskfid['bys'][...]
        for by_idx, by in enumerate(bys):
            trip_attrs = taskfid['triplets']['by_index'][by_idx]
            tfrk = taskfid['regressors'][by]
            indices = np.array(tfrk[u'indexed_data'])
            if indices.size == 0:
                continue
            scores_arr = np.array(
                scorefid['scores'][trip_attrs[0]:trip_attrs[1]])

            # Sizes are handled as Python ints: they cannot wrap around the
            # way fixed-width numpy integers do, so the capacity check below
            # is meaningful, and they do not alter the dtype of the arrays
            # they are combined with.
            n_indices = [int(top) + 1 for top in np.max(indices, 0)]
            n_combinations = 1
            for size in n_indices:
                n_combinations *= size
            if n_combinations >= 18446744073709551615:
                raise ValueError("type not big enough")
            # assumes fit_integer_type returns a dtype able to hold the value
            # it is given -- TODO confirm
            ind_type = fit_integer_type(n_combinations, is_signed=False)

            # encoding the indices of a triplet to a unique index; every
            # operand is kept in ind_type so that mixing signed and unsigned
            # integers never promotes the key to float64
            new_index = indices[:, 0].astype(ind_type)
            for i in range(1, len(n_indices)):
                new_index = (indices[:, i].astype(ind_type)
                             + n_indices[i] * new_index)

            # collapsing the score: sort so that identical keys are adjacent
            permut = np.argsort(new_index)
            sorted_scores = scores_arr[permut]
            sorted_index = new_index[permut]
            mean, unique_index, counts = unique(sorted_index, sorted_scores)

            # retrieving the triplet indices from the unique index.
            decoded = npdecode(unique_index, n_indices)

            regs = tfrk['indexed_datasets']
            indexes = [tfrk['indexes'][reg][:] for reg in regs]
            nregs = len(regs)

            for i, key in enumerate(decoded):
                labels = [indexes[j][int(key[j])] for j in range(nregs)]
                result = labels + [by, mean[i], int(counts[i])]
                fid.write('\t'.join(map(str, result)) + u'\n')
def unique(index, scores):
    """Average the scores over each run of identical consecutive keys.

    Parameters
    ----------
    index : numpy.ndarray
        1-D array of keys; only adjacent equal keys are grouped, so it is
        expected to be sorted
    scores : numpy.ndarray
        the scores, aligned with `index` along their first axis

    Returns
    -------
    means : numpy.ndarray
        for each run the mean of its scores, rescaled as ``(mean + 1) / 2``
        (which maps a mean in [-1, 1] onto [0, 1])
    unique_index : numpy.ndarray
        the key of each run
    counts : numpy.ndarray
        the number of elements in each run
    """
    index = np.asarray(index)
    if index.size == 0:
        # Without this guard the two sentinel flags below would describe one
        # phantom run and indexing the empty array would raise IndexError.
        return np.empty((0,)), index[:0], np.zeros((0,), dtype=np.intp)

    # A run starts wherever a key differs from its predecessor; the trailing
    # True closes the last run.
    flag = np.concatenate(([True], index[1:] != index[:-1], [True]))
    edges = np.nonzero(flag)[0]
    starts, stops = edges[:-1], edges[1:]
    counts = stops - starts
    unique_index = index[starts]

    means = np.empty((len(counts),))
    for run, (start, stop) in enumerate(zip(starts, stops)):
        means[run] = np.mean(scores[start:stop])
    means = (means + 1) / 2
    return means, unique_index, counts
def analyze(task_file, score_file, result_file):
    """Analyse the results of a task

    Writes a header line (the names of the indexed datasets followed by
    'by', 'score' and 'n') to `result_file`, then lets `collapse` append one
    line per block of triplets.

    Parameters
    ----------
    task_file : string, hdf5 file
        the file containing the triplets and pairs of the task
    score_file : string, hdf5 file
        the file containing the score of a task
    result_file: string, csv file
        the file that will contain the analysis results
    """
    with open(result_file, 'w+') as fid:
        # The column names are read from the first group of regressors; the
        # task file is closed again (even on error) before collapse reopens
        # it.
        with h5py.File(task_file, 'r') as taskfid:
            regressors = taskfid['regressors']
            tfrk = regressors[list(regressors)[0]]
            # h5py may hand the names back as bytes (h5py >= 3 does so for
            # string datasets), which cannot be concatenated with text
            columns = [reg.decode('utf-8') if isinstance(reg, bytes) else reg
                       for reg in tfrk['indexed_datasets']]
        fid.write(u'\t'.join(columns + ['by', 'score', 'n']) + u'\n')
        collapse(score_file, task_file, fid)
def parse_args(argv=None):
    """Parse the command line of the analyze script.

    Parameters
    ----------
    argv : list of string, optional
        the arguments to parse; when None (the default) argparse reads
        them from ``sys.argv[1:]``

    Returns
    -------
    args : dict
        the parsed arguments under the keys 'scorefile', 'taskfile' and
        'output'
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='Collapse results of ABX score on type of ABX triplet.',
        epilog='''Example usage:

$ ./analyze.py abx.score abx.task abx.csv

compute the average of the scores in abx.score by type of ABX triplet
and output the results in tab separated csv format.''')
    parser.add_argument('scorefile', metavar='SCORE',
                        help='score file in hdf5 format')
    parser.add_argument('taskfile', metavar='TASK',
                        help='task file in hdf5 format')
    parser.add_argument('output', metavar='OUTPUT',
                        help='output file in csv format')
    return vars(parser.parse_args(argv))
def main():
    """Command line entry point: validate the files, then run `analyze`.

    An existing output file is removed (with a UserWarning) before the
    analysis is written.

    Raises
    ------
    SystemExit
        with a 'No such file' message when the score file or the task file
        does not exist
    """
    args = parse_args()
    score_file = args['scorefile']
    task_file = args['taskfile']
    result_file = args['output']

    for input_file in (score_file, task_file):
        if not path.exists(input_file):
            # SystemExit with a message reports on stderr and gives a
            # non-zero exit status, unlike print() followed by the
            # site-provided exit(), which reported success
            raise SystemExit('No such file: {}'.format(input_file))

    if path.exists(result_file):
        warnings.warn(
            'Overwriting results file ' + result_file, UserWarning)
        os.remove(result_file)

    analyze(task_file, score_file, result_file)
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()