Source code for ABXpy.h5tools.h5_handler

"""Sort rows of a several two dimenional numeric dataset (possibly
with just one column) according to numeric key in two-dimensional key
dataset with just one column (the first dimension of all datasets
involved must match). The result replaces the original dataset buffer
size is in kilobytes.

Two things need improvement:

  * backup solution is too slow for big files

  * the case of very small files should be handled nicely by using
    internal sort

To gain time: could try to parallelize the sort, although it is not
obvious how that would work for the merging part... Also, cythonizing
the 'read chunk' part might make it more efficient when there are many
chunks.

Also, a function should be written to determine buffer_size from the
amount of available RAM and the size of the file to be sorted: aim for
30 chunks, or the fewest possible without exhausting the RAM; except if
the file can be loaded in memory as a whole, in which case do an
internal sort (a sketch of such a heuristic follows the imports below).

"""

import numpy as np
import tempfile
import os
import h5py

from . import np2h5, h52np
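

# A minimal sketch of the buffer_size heuristic described in the module
# docstring (not part of the original module; the 30-chunk target comes
# from the docstring, while the safety factor of 2 and the `ram_bytes`
# parameter are assumptions):
def suggest_buffer_size(file_bytes, ram_bytes, target_chunks=30):
    # if the file fits comfortably in memory, signal that an internal
    # (in-memory) sort should be used instead of an external merge sort
    if file_bytes <= ram_bytes // 2:
        return None  # None means: load everything and sort internally
    # otherwise size the buffer to produce about `target_chunks` chunks,
    # capped so that the buffer itself fits in memory
    buffer_bytes = min(file_bytes // target_chunks, ram_bytes // 2)
    return max(1, buffer_bytes // 1000)  # buffer_size is in kilobytes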


class H5Handler(object):

    def __init__(self, h5file, keygroup, keyset, groups=None, datasets=None):
        if groups is None:
            groups = []
        if datasets is None:
            datasets = []
        paths = [g + '/' + d for g, d in zip(groups, datasets)]
        keypath = keygroup + '/' + keyset
        if keypath in paths:
            raise ValueError(
                'key dataset %s should not be in the list of datasets '
                'to be sorted: %s' % (keypath, paths))
        self.file = h5file
        self.groups = [keygroup] + groups
        self.datasets = [keyset] + datasets
        self.sources = list(zip(self.groups, self.datasets))

    # sort the content of several datasets with n lines and a varying
    # number of columns in an h5file according to the order specified in
    # the first column of the 'key' dataset; the result replaces the
    # original datasets; order is specified by integers and the sort is
    # done in increasing order; buffer size is in kilobytes
    def sort(self, buffer_size=1000, o_buffer_size=1000, tmpdir=None):
        # first back up the file to be sorted
        self.backupDir = tempfile.mkdtemp(dir=tmpdir)
        self.backup = os.path.join(self.backupDir,
                                   os.path.basename(self.file))
        # shutil.copyfile(self.file, self.backup)
        # FIXME if no backup is made, cannot recover from exceptions ...
        try:
            with H5TMP(tmpdir=tmpdir) as tmp:
                with h5py.File(self.file, 'a') as f:
                    # check file structure
                    for g, d in self.sources:
                        if g + '/' + d not in f:
                            raise IOError(
                                'Dataset %s does not exist in group %s '
                                'of file %s!' % (d, g, self.file))
                    # set tmp file structure (cannot be done in __init__
                    # because the tmp file is destroyed as soon as the
                    # 'with' statement is exited)
                    for g in np.unique(self.groups):
                        tmp.create_group(g)
                    # initialize variables
                    self.tmp = tmp
                    dsets = [f[g][d] for g, d in self.sources]
                    n_rows = [dset.shape[0] for dset in dsets]
                    if not all([n_row == n_rows[0] for n_row in n_rows]):
                        raise IOError('First dimensions of all the '
                                      'datasets should be the same')
                    self.n_row = n_rows[0]
                    self.n_columns = [dset.shape[1] for dset in dsets]
                    self.dtypes = [dset.dtype for dset in dsets]
                    dset_weights = [dset.shape[1] * dset.dtype.itemsize
                                    for dset in dsets]
                    self.dsets = dsets
                    self.line_weight = np.sum(dset_weights)
                    buf_rows = int(round(buffer_size * 1000. /
                                         self.line_weight))
                    row = 0
                    n_chunks = 0
                    # here default to internal sort if not enough rows?
                    # sort
                    while row <= self.n_row - buf_rows:
                        self.extract_chunk(row, row + buf_rows, n_chunks)
                        n_chunks = n_chunks + 1
                        row = row + buf_rows
                    if row < self.n_row:
                        self.extract_chunk(row, self.n_row, n_chunks)
                        n_chunks = n_chunks + 1
                # merge (tmp still open but f closed, both are important
                # (tmp destruction and backup)); see the simplified
                # in-memory illustration after this class
                # try block to allow backup recovery if something goes wrong
                try:
                    with np2h5.NP2H5(self.file) as o:
                        with h52np.H52NP(tmp) as i:
                            # buf_size = ? for i: n_chunks * n_line_buf *
                            # totallinecost = buf_size ... for o?
                            # create n_datasets * n_chunks input buffers
                            # number of rows
                            i_buf_rows = int(round(buf_rows / n_chunks))
                            # might need to set up an explicit case for
                            # dealing with files that can fit in one chunk
                            # and remove these asserts
                            assert i_buf_rows > 0
                            i_buf = []
                            for ix, (g, d) in enumerate(self.sources):
                                buf1 = []
                                for c in range(n_chunks):
                                    buf_size = (self.dtypes[ix].itemsize *
                                                self.n_columns[ix] *
                                                i_buf_rows / 1000.)
                                    buf1.append(i.add_dataset(
                                        g, d + '_' + str(c), buf_size))
                                i_buf.append(buf1)
                            # create n_datasets output buffers
                            o_buf_rows = int(round(1000. * o_buffer_size /
                                                   self.line_weight))
                            assert o_buf_rows > 0
                            o_buf = []
                            for ix, (g, d) in enumerate(self.sources):
                                buf_size = (self.dtypes[ix].itemsize *
                                            self.n_columns[ix] *
                                            o_buf_rows / 1000.)
                                o_buf.append(o.add_dataset(
                                    g, d, self.n_row, self.n_columns[ix],
                                    buf_size, buf_size, self.dtypes[ix],
                                    overwrite=True))
                            # some redundancy could be removed from the
                            # following while + flushing
                            # while not all input datasets are empty
                            # (sufficient to check key buffers only):
                            while not all([buf.dataset_empty()
                                           for buf in i_buf[0]]):
                                # find min of maxes (current tails) in key
                                min_tail = min([buf.current_tail()
                                                for buf in i_buf[0]
                                                if not buf.isempty()])
                                # find for each chunk the amount to be read
                                amounts = [buf.nb_lower_than(min_tail)
                                           for buf in i_buf[0]]
                                # get the order on key for the concatenated
                                # data: for all chunks read up to the
                                # appropriate index, concatenate, sort and
                                # output
                                data = []
                                for c in range(n_chunks):
                                    if amounts[c] > 0:
                                        data.append(
                                            i_buf[0][c].read(amounts[c]))
                                data = np.concatenate(data)
                                order = np.argsort(data[:, 0])
                                o_buf[0].write(data[order, :])
                                # for all other dsets use the order from
                                # the key
                                for d in range(1, len(self.datasets)):
                                    data = []
                                    for c in range(n_chunks):
                                        if amounts[c] > 0:
                                            data.append(
                                                i_buf[d][c].read(amounts[c]))
                                    data = np.concatenate(data)
                                    o_buf[d].write(data[order, :])
                            # flush the buffers to finish
                            data = []
                            for c in range(n_chunks):
                                if not i_buf[0][c].buffer_empty():
                                    # read without argument gives the
                                    # whole buffer
                                    data.append(i_buf[0][c].read())
                            if data:  # False iff list is empty
                                data = np.concatenate(data)
                                order = np.argsort(data[:, 0])
                                o_buf[0].write(data[order, :])
                            for d in range(1, len(self.datasets)):
                                data = []
                                for c in range(n_chunks):
                                    if not i_buf[d][c].buffer_empty():
                                        data.append(i_buf[d][c].read())
                                if data:  # False iff list is empty
                                    data = np.concatenate(data)
                                    o_buf[d].write(data[order, :])
                # use the backup
                except:
                    # shutil.copyfile(self.backup, self.file)
                    # FIXME not usable if backup isn't made
                    raise
        # delete the backup
        finally:
            # os.remove(self.backup)
            os.rmdir(self.backupDir)
    def extract_chunk(self, i_start, i_end, chunk_id):
        # get key order for the chunk
        key = self.dsets[0][i_start:i_end, 0]
        order = np.argsort(key)
        # sort all datasets according to it and store them
        for i, d in enumerate(self.dsets):
            # sort chunk
            chunk = d[i_start:i_end, :]
            chunk = chunk[order, :]
            # store chunk
            chunk_name = self.datasets[i] + '_' + str(chunk_id)
            g = self.groups[i]
            self.tmp[g].create_dataset(
                chunk_name, (i_end - i_start, self.n_columns[i]),
                dtype=d.dtype)
            self.tmp[g][chunk_name][:, :] = chunk
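

# A simplified in-memory illustration of the merge step in H5Handler.sort
# (not part of the original module, and not the actual buffered
# np2h5/h52np implementation): each chunk is a 2D array already sorted on
# its key column 0. Rows with keys up to the smallest "tail" (last key)
# across chunks can safely be concatenated, argsorted and emitted, because
# every row still left in any chunk has a strictly larger key.
def _merge_sorted_chunks_demo(chunks):
    out = []
    chunks = [c for c in chunks if len(c)]
    while chunks:
        # min of the maxes (current tails) of the key column
        min_tail = min(c[-1, 0] for c in chunks)
        # take from each chunk the rows whose key is <= min_tail; at least
        # the chunk whose tail equals min_tail is emptied, so the loop
        # terminates
        batch = np.concatenate([c[c[:, 0] <= min_tail] for c in chunks])
        out.append(batch[np.argsort(batch[:, 0])])
        chunks = [c[c[:, 0] > min_tail] for c in chunks]
        chunks = [c for c in chunks if len(c)]
    return np.concatenate(out) if out else np.empty((0, 0))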
# Class for creating a temporary h5 file. Should be used with the 'with'
# statement. The temporary file will be erased upon exiting the 'with'
# statement.
class H5TMP(object):

    def __init__(self, tmpdir=None):
        self.tmpdir = tempfile.mkdtemp(dir=tmpdir)
        self.tmpfile = os.path.join(self.tmpdir, 'tmp.h5')

    def __enter__(self):
        self.tmp = h5py.File(self.tmpfile, 'a')
        return self.tmp

    def __exit__(self, eType, eValue, eTrace):
        try:
            self.tmp.close()
            os.remove(self.tmpfile)
            os.rmdir(self.tmpdir)
        except:
            # here could log a warning...
            print('problem')
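

# Example usage of H5TMP (a minimal sketch, not part of the original
# module): the temporary file only exists inside the 'with' block and is
# removed, together with its directory, on exit.
def _h5tmp_example():
    with H5TMP() as tmp:
        tmp.create_group('g')
        tmp['g'].create_dataset('d', (10, 2), dtype=np.float64)
        tmp['g']['d'][:, :] = np.random.rand(10, 2)
    # at this point the file and its directory have been deleted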
def test():
    # generate a random h5 file
    n1 = 10
    n2 = 1000
    np.random.seed(42)
    folder = tempfile.mkdtemp()
    with h5py.File(os.path.join(folder, 'tmp.h5'), 'a') as fh:
        fh.create_group('key')
        fh['key'].create_dataset('k', (n2 * n1, 1), np.int64)
        fh.create_group('data')
        fh['data'].create_dataset('f', (n2 * n1, 3), np.float64)
        fh['data'].create_dataset('i', (n2 * n1, 5), np.int64)
        for i in range(n1):
            keys = np.random.randint(10 ** 12, size=(n2, 1))
            fh['key']['k'][i * n2:(i + 1) * n2, :] = keys
            floats = np.random.rand(n2, 3)
            fh['data']['f'][i * n2:(i + 1) * n2, :] = floats
            integers = np.random.randint(10 ** 12, size=(n2, 5))
            fh['data']['i'][i * n2:(i + 1) * n2, :] = integers
    # folder = '/var/folders/ma/maqYoEehEsiaVpQWYhnOr++++TY/-Tmp-/tmpaLxTaQ'
    # test sort on it
    h = H5Handler(os.path.join(folder, 'tmp.h5'), 'key', 'k',
                  ['data', 'data'], ['f', 'i'])
    h.sort(100, 100)
    # get the same in regular arrays and compare with regular sorting
    # ...
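

# A possible completion of the check left as a comment above (a hedged
# sketch, not part of the original module; `path` is the hypothetical
# location of the file sorted by test()): after sorting, the key column
# must be non-decreasing, a necessary (though not sufficient) condition
# for a correct sort.
def _check_sorted(path):
    with h5py.File(path, 'r') as fh:
        keys = fh['key']['k'][:, 0]
        assert np.all(np.diff(keys) >= 0)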