Source code for ABXpy.distances.distances

import h5py
import numpy as np
import pandas
import multiprocessing
import os
import sys
import warnings
import h5features

# FIXME Enforce single process usage when using python compiled with OMP
# enabled

# FIXME detect when multiprocessed jobs crashed

# FIXME do a separate functions: generic load_balancing

# FIXME write distances in a separate file

# FIXME write split_feature_file, such that it create the appropriate
# files + check that no filename conflict can occur in: (f + '_' +
# str(on) + '_' + str(off)) -> mem -> mem_by_cpu if not enough
# mem_by_cpu to load whole dataset: rewrite files of size matching
# mem_by_cpu using the structure of the jobs do not rewrite items of a
# by block if not used in considered job only way this can fail is if
# some (sub-)by blocks contain too many items for being stored in
# mem_by_cpu do not treat this case for now, but detect it and if it
# ever happens: divide items in (items_1, items_2,....)  read items_1,
# then items_1, items_2, then items_1, items_3...  if this ever proves
# too slow: could use co-clustering on the big by blocks, use it for
# the job creation and adopt smarter loading schemes

[docs]def create_distance_jobs(pair_file, distance_file, n_cpu, buffer_max_size=100): """Divide the work load into smaller blocks to be passed to the cpus Parameters ---------- pair_file: string hdf5 task file, or at least an hdf5 file containing a 'unique_pairs' dataset distance_file: string hdf5 file taht will contain the distance datasets n_cpu: number of cpus tu use block_ceil_size: int maximum size in RAM of a block in Mb """ # FIXME check (given an optional checking function) # that all features required in feat_dbs are indeed present in feature # files # getting 'by' datasets characteristics with h5py.File(pair_file, 'r') as fh: # by_dsets = [by_dset for by_dset in fh['feat_dbs']] by_dsets = fh['bys'][...] by_n_pairs = [] # number of distances to be computed for each by db for by_dset in by_dsets: attrs = fh['unique_pairs'].attrs[by_dset] by_n_pairs.append(attrs[2] - attrs[1]) total_n_pairs = fh['unique_pairs/data'].shape[0] # initializing output datasets with h5py.File(distance_file, 'a') as fh: fh.attrs.create('done', False) g = fh.create_group('distances') g.create_dataset('data', shape=(total_n_pairs, 1), dtype=np.float) """ #### Load balancing #### Heuristic: each process should have approximately the same number of distances to compute. 1 - 'by' blocks bigger than n_pairs/n_proc are divided into blocks of size n_pairs/n_proc + a remainder block 2 - iteratively the cpu with the lowest number of distances gets attributed the biggest remaining block This will not be a good heuristic if the average time required to compute a distance varies widely from one 'by' dataset to another or if the i/o time is larger than the time required to compute the distances. """ # step 1 by_n_pairs = np.int64(by_n_pairs) total_n_pairs = np.sum(by_n_pairs) max_block_size = min( np.int64(np.ceil(total_n_pairs / np.float(n_cpu))), # buffer_max_size * 1000000 / np.dtype(by_n_pairs).itemsize) buffer_max_size * 1000000 / 8) by = [] start = [] stop = [] n_dist = [] for n_pairs, dset in zip(by_n_pairs, by_dsets): sta = 0 sto = 0 while n_pairs > 0: if n_pairs > max_block_size: amount = max_block_size else: amount = n_pairs n_pairs = n_pairs - amount sto = sto + amount by.append(dset) start.append(sta) stop.append(sto) n_dist.append(amount) sta = sta + amount # step 2 # blocks are sorted according to the number of distances they contain # (in decreasing order, hence the [::-1]) order = np.argsort(np.int64(n_dist))[::-1] # associated cpu for each block in zip(by, start, stop, n_dist) cpu = np.zeros(len(by), dtype=np.int64) cpu_loads = np.zeros(n_cpu, dtype=np.int64) for i in order: idlest_cpu = np.argmin(cpu_loads) cpu[i] = idlest_cpu cpu_loads[idlest_cpu] = cpu_loads[idlest_cpu] + n_dist[i] # output job description for each cpu by = np.array(by) # easier to index... start = np.array(start) stop = np.array(stop) jobs = [] for i in range(n_cpu): indices = np.where(cpu == i)[0] _by = list(by[indices]) _start = start[indices] _stop = stop[indices] job = {'pair_file': pair_file, 'by': _by, 'start': _start, 'stop': _stop} jobs.append(job) return jobs
# If there are very large by blocks, two additional # things could help optimization: # 1. doing co-clustering inside of the large by blocks # at the beginning to get two types of blocks: # sparse and dense distances (could help if i/o is a problem # but a bit delicate) # 2. rewriting the features to accomodate the co-clusters # If there are very small by blocks and i/o become too slow # could group them into intermediate size h5features files # # This function can be used concurrently by several processes. # When it is the case synchronization of the writing operations in # the target distance_file is required by HDF5. # The current solution uses a global lock for the whole file. # Since each job is writing in different places, it should in principle be # possible to do all the write concurrently if ever necessary, using parallel # HDF5 (based on MPI-IO).
[docs]def run_distance_job(job_description, distance_file, distance, feature_files, feature_groups, splitted_features, job_id, normalize, distance_file_lock=None): if distance_file_lock is None: synchronize = False else: synchronize = True if not(splitted_features): times = {} features = {} for feature_file, feature_group in zip(feature_files, feature_groups): # with recent versions of h5features, files can contain # properties (as data[2]) data =, feature_group) t = data[0] f = data[1] assert not(set(times.keys()).intersection( t.keys())), ("The same file is indexed by (at least) two " "different feature files") times.update(t) features.update(f) get_features = Features_Accessor(times, features).get_features_from_raw pair_file = job_description['pair_file'] n_blocks = len(job_description['by']) for b in range(n_blocks): print('Job %d: computing distances for block %d on %d' % (job_id, b, n_blocks)) # get block spec by = job_description['by'][b] start = job_description['start'][b] stop = job_description['stop'][b] if splitted_features: # FIXME modify feature_file/feature_group to adapt to 'by' # FIXME any change needed when several feature files before # splitting ? times = {} features = {} for feature_file, feature_group in zip(feature_files, feature_groups): # with recent versions of h5features, files can contain # properties (as data[2]) data =, feature_group) t = data[0] f = data[1] assert not(set(times.keys()).intersection( t.keys())), ("The same file is indexed by (at least) two " "different feature files") times.update(t) features.update(f) accessor = Features_Accessor(times, features) get_features = accessor.get_features_from_splitted # load pandas dataframe containing info for loading the features if synchronize: distance_file_lock.acquire() store = pandas.HDFStore(pair_file) by_db = store['feat_dbs/' + by] store.close() # load pairs to be computed # indexed relatively to the above dataframe with h5py.File(pair_file, 'r') as fh: attrs = fh['unique_pairs'].attrs[by] pair_list = fh['unique_pairs/data'][attrs[1]+start:attrs[1]+stop, 0] base = attrs[0] if synchronize: distance_file_lock.release() A = np.mod(pair_list, base) B = pair_list // base pairs = np.column_stack([A, B]) n_pairs = pairs.shape[0] # get dataframe with one entry by item involved in this block # indexed by its 'by'-specific index by_inds = np.unique(np.concatenate([A, B])) items = by_db.iloc[by_inds] # get a dictionary whose keys are the 'by' indices features = get_features(items) dis = np.empty(shape=(n_pairs, 1)) # FIXME: second dim is 1 because of the way it is stored to disk, # but ultimately it shouldn't be necessary anymore # (if using axis arg in np2h5, h52np and h5io...) for i in range(n_pairs): dataA = features[pairs[i, 0]] dataB = features[pairs[i, 1]] if dataA.shape[0] == 0: warnings.warn('No features found for file {}, {} - {}' .format(items['file'][pairs[i, 0]], items['onset'][pairs[i, 0]], items['offset'][pairs[i, 0]]), UserWarning) if dataB.shape[0] == 0: warnings.warn('No features found for file {}, {} - {}' .format(items['file'][pairs[i, 1]], items['onset'][pairs[i, 1]], items['offset'][pairs[i, 1]]), UserWarning) try: if normalize is not None: if normalize == 1: normalize = True elif normalize == 0: normalize = False else: print('normalized parameter neither 1 nor 0,' 'using normalization') normalize = True dis[i, 0] = distance(dataA, dataB, normalized=normalize) else: dis[i, 0] = distance(dataA, dataB) except: sys.stderr.write( 'Error when calculating the distance between item {}, {} - {} ' 'and item {}, {} - {}\n' .format(items['file'][pairs[i, 0]], items['onset'][pairs[i, 0]], items['offset'][pairs[i, 0]], items['file'][pairs[i, 1]], items['onset'][pairs[i, 1]], items['offset'][pairs[i, 1]]), ) raise if synchronize: distance_file_lock.acquire() with h5py.File(distance_file, 'a') as fh: fh['distances/data'][attrs[1]+start:attrs[1]+stop, :] = dis if synchronize: distance_file_lock.release()
# mem in megabytes # FIXME allow several feature files? # and/or have an external utility for concatenating them? # get rid of the group in feature file (never used ?)
[docs]def compute_distances(feature_file, feature_group, pair_file, distance_file, distance, normalized, n_cpu=None, mem=1000, feature_file_as_list=False): #with h5py.File(distance_file) as fh: # fh.attrs.create('distance', pickle.dumps(distance)) if n_cpu is None: n_cpu = multiprocessing.cpu_count() if not(feature_file_as_list): feature_files = [feature_file] feature_groups = [feature_group] else: feature_files = feature_file feature_groups = feature_group # FIXME if there are other datasets in feature_file this is not accurate mem_needed = 0 for feature_file in feature_files: feature_size = os.path.getsize(feature_file) / float(2 ** 20) mem_needed = feature_size * n_cpu + mem_needed splitted_features = False # splitted_features = mem_needed > mem # if splitted_features: # split_feature_file(feature_file, feature_group, pair_file) jobs = create_distance_jobs(pair_file, distance_file, n_cpu) # results = [] if n_cpu > 1: # use of a manager seems necessary because we're using a Pool... distance_file_lock = multiprocessing.Manager().Lock() try: pool = multiprocessing.Pool(n_cpu) args = [(job, distance_file, distance, feature_files, feature_groups, splitted_features, i, normalized, distance_file_lock) for i, job in enumerate(jobs)], args) finally: pool.close() pool.join() else: run_distance_job( jobs[0], distance_file, distance, feature_files, feature_groups, splitted_features, 1, normalized) with h5py.File(distance_file, 'a') as fh: fh.attrs.modify('done', True)
# hack, external function for visibility reasons
[docs]def worker(args): return run_distance_job(*args)
[docs]class Features_Accessor(object): def __init__(self, times, features): self.times = times self.features = features # FIXME a dirty fix to deal with python3 incompatibility, # would be better to handle it in h5features directly if sys.version_info.major >= 3: try: self.times = { k.decode('utf8'): v for k, v in self.times.items()} self.features = { k.decode('utf8'): v for k, v in self.features.items()} except AttributeError: pass
[docs] def get_features_from_raw(self, items): features = {} for ix, f, on, off in zip( items.index, items['file'], items['onset'], items['offset']): f = str(f) t = np.where(np.logical_and( self.times[f] >= on, self.times[f] <= off))[0] # if len(t) == 0: # raise IOError('No features found for file {}, at ' # 'time {}-{}'.format(f, on, off)) features[ix] = self.features[f][t, :] return features
[docs] def get_features_from_splitted(self, items): features = {} for ix, f, on, off in zip(items.index, items['file'], items['onset'], items['offset']): features[ix] = self.features[f + '_' + str(on) + '_' + str(off)] return features