Source code for ABXpy.h5tools.h52np

"""Read efficiently h5 files

Includes functions useful for merging sorted datasets.

Some code is shared by H52NP and NP2H5: could have a superclass:
optionally_h5_context_manager who would consist in implementing
__init__, __enter__, __exit__ where a filename or a file handle can be
passed and the file should be handled by the context manager only if a
filename is passed.

Also the functionalities specific to sorted datasets could be put in a
subclass.

"""

import numpy as np
import bisect
import h5py
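

# The module docstring proposes a common superclass for H52NP and NP2H5.
# A minimal sketch of that idea, illustrative only: the class name follows
# the docstring's suggestion, the 'mode' parameter is an assumption, and
# nothing in this module uses this class.
class optionally_h5_context_manager(object):
    """Open the file only when given a filename; borrow open handles."""

    def __init__(self, h5file, mode='r+'):
        if isinstance(h5file, str):
            # a filename was passed: this context manager owns the file
            self.manage_file = True
            self.filename = h5file
            self.mode = mode
            self.file = None
        else:
            # an open h5py file handle was passed: the caller owns it
            self.manage_file = False
            self.file = h5file
            self.filename = h5file.filename

    def __enter__(self):
        if self.manage_file:
            self.file = h5py.File(self.filename, self.mode)
        return self

    def __exit__(self, eType, eValue, eTrace):
        if self.manage_file and self.file is not None:
            self.file.close()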


class H52NP(object):
    # h5file is the name of the HDF5 file to read from, or an already
    # open h5py file handle; buffer sizes are in kilobytes

    def __init__(self, h5file):
        # set up the input file and the buffer list
        if isinstance(h5file, str):
            self.manage_file = True
            self.filename = h5file
            self.file_open = False
        else:
            # supposed to be a h5 file handle
            self.manage_file = False
            self.file_open = True
            self.file = h5file
            self.filename = h5file.filename
        self.buffers = []

    def __enter__(self):
        if not self.file_open:
            self.file = h5py.File(self.filename, 'r+')
            self.file_open = True
        return self

    def __exit__(self, eType, eValue, eTrace):
        if self.file_open and self.manage_file:
            try:
                self.file.close()
                self.file_open = False
            except Exception:
                if eValue is not None:
                    # the first exception will be raised, but we could
                    # log a warning here...
                    pass
                else:
                    raise
    def add_dataset(self, group, dataset, buf_size=100,
                    minimum_occupied_portion=0.25):
        if self.file_open:
            buf = H52NPbuffer(
                self, group, dataset, buf_size, minimum_occupied_portion)
            self.buffers.append(buf)
            return buf
        else:
            raise IOError(
                "Method add_dataset of class H52NP can only be used "
                "within a 'with' statement!")
    def add_subdataset(self, group, dataset, buf_size=100,
                       minimum_occupied_portion=0.25, indexes=None):
        if self.file_open:
            buf = H5dataset2NPbuffer(
                self, group, dataset, buf_size, minimum_occupied_portion,
                indexes=indexes)
            self.buffers.append(buf)
            return buf
        else:
            raise IOError(
                "Method add_subdataset of class H52NP can only be used "
                "within a 'with' statement!")
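

# A minimal usage sketch ('data.h5', 'group' and 'times' are hypothetical
# names; buf_size is in kilobytes, as in add_dataset above):
#
#     with H52NP('data.h5') as reader:
#         times = reader.add_dataset('group', 'times')
#         while not times.isempty():
#             chunk = times.read(1000)  # at most 1000 rows per call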
class H52NPbuffer(object):

    def __init__(self, parent, group, dataset, buf_size,
                 minimum_occupied_portion):
        assert parent.file_open
        # get info from the dataset; fail if the dataset does not exist
        if group + '/' + dataset not in parent.file:
            raise IOError('Dataset %s does not exist in file %s!'
                          % (dataset, parent.filename))
        dset = parent.file[group][dataset]
        self.n_rows = dset.shape[0]
        self.n_columns = dset.shape[1]
        self.type = dset.dtype
        self.dataset = dset
        self.dataset_ix = 0
        # could add checks: no more than 2 dims, etc.
        self.parent = parent
        self.minimum_occupied_portion = minimum_occupied_portion
        # initialize the buffer
        # row size in kilobytes
        row_size = self.n_columns * self.type.itemsize / 1000.
        self.buf_len = int(round(buf_size / row_size))
        # buf_ix is the number of free rows in the buffer; here the
        # buffer is empty
        self.buf_ix = self.buf_len
        self.buf = np.zeros((self.buf_len, self.n_columns), self.type)
        # fill it
        self.refill_buffer()

    # read and consume data, refilling the buffer automatically when it
    # empties; if there is not enough data left, just send less than
    # what was asked for
    def read(self, amount=None):
        assert self.parent.file_open
        if amount is not None and amount <= 0:
            raise ValueError(
                'The amount to read in h52np.read must be strictly positive')
        if amount is None:
            amount = self.buf_len - self.buf_ix
        if self.isempty():
            raise StopIteration
        amount_found = 0
        data = []
        while amount_found < amount and not self.isempty():
            needed = amount - amount_found
            amount_in_buffer = self.buf_len - self.buf_ix
            if amount_in_buffer > needed:
                # enough data in the buffer
                next_buf_ix = self.buf_ix + needed
                amount_found = amount
            else:
                # not enough data in the buffer (or just enough)
                next_buf_ix = self.buf_len
                amount_found = amount_found + amount_in_buffer
            # the np.copy is absolutely necessary here to avoid ugly
            # side effects...
            data.append(np.copy(self.buf[self.buf_ix:next_buf_ix, :]))
            self.buf_ix = next_buf_ix
            # fill the buffer or not, according to the refill policy and
            # the current buffer state
            self.refill_buffer()
        return np.concatenate(data)
    def refill_buffer(self):
        if self.dataset_ix != self.n_rows:
            # for now only one policy is implemented: if less than
            # self.minimum_occupied_portion of the full capacity is
            # occupied, the buffer is refilled
            occupied_portion = 1. - float(self.buf_ix) / float(self.buf_len)
            if occupied_portion < self.minimum_occupied_portion:
                # set useful variables
                curr_ix = self.dataset_ix
                next_ix = curr_ix + self.buf_ix
                next_buf_ix = next_ix - self.n_rows
                amount_in_buffer = self.buf_len - self.buf_ix
                # take care not to run past the end of the dataset
                next_buf_ix = max(next_buf_ix, 0)
                next_ix = min(next_ix, self.n_rows)
                # move the old data
                self.buf[next_buf_ix:next_buf_ix + amount_in_buffer, :] = \
                    self.buf[self.buf_ix:, :]
                # add new data
                self.buf[next_buf_ix + amount_in_buffer:, :] = \
                    self.dataset[curr_ix:next_ix, :]
                # update indices
                self.buf_ix = next_buf_ix
                self.dataset_ix = next_ix
    def __iter__(self):
        return self

    def __next__(self):
        return self.read()

    # True only if the input dataset has been totally read and the
    # buffer is empty
    def isempty(self):
        assert self.parent.file_open
        return self.dataset_ix == self.n_rows and self.buf_ix == self.buf_len
    def buffer_empty(self):
        assert self.parent.file_open
        return self.buf_ix == self.buf_len
    def dataset_empty(self):
        assert self.parent.file_open
        return self.dataset_ix == self.n_rows
    # return the last element currently in the buffer (useful for merge
    # sort...), assuming the data is one-column
    def current_tail(self):
        assert self.parent.file_open
        assert self.n_columns == 1
        return self.buf[-1, 0]
    # return the number of elements in the buffer lower than or equal to
    # x, assuming the data is one-column and sorted in ascending order
    def nb_lower_than(self, x):
        assert self.parent.file_open
        assert self.n_columns == 1
        # extract the column so that bisect compares scalars
        return bisect.bisect_right(self.buf[self.buf_ix:, 0], x)
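
    # Together, current_tail and nb_lower_than support merging sorted
    # one-column datasets: take from each buffer only the elements known
    # to be lower than the smallest current tail. A sketch of one merge
    # step over two hypothetical buffers a and b (guards against
    # zero-length reads are omitted for brevity):
    #
    #     tail = min(a.current_tail(), b.current_tail())
    #     merged = np.concatenate([a.read(a.nb_lower_than(tail)),
    #                              b.read(b.nb_lower_than(tail))])
    #     merged = np.sort(merged, axis=0)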
class H5dataset2NPbuffer(H52NPbuffer):
    """Augmentation of the H52NPbuffer allowing the use of a subdataset
    selected by indexes"""

    def __init__(self, parent, group, dataset, buf_size,
                 minimum_occupied_portion, indexes=None):
        assert parent.file_open
        # __init__ is mostly duplicated from H52NPbuffer instead of
        # calling super(), because dataset_end must be set before the
        # buffer is first filled
        # get info from the dataset; fail if the dataset does not exist
        if group + '/' + dataset not in parent.file:
            raise IOError('Dataset %s does not exist in file %s!'
                          % (dataset, parent.filename))
        dset = parent.file[group][dataset]
        self.n_columns = dset.shape[1]
        self.type = dset.dtype
        self.dataset = dset
        self.dataset_ix = 0
        self.dataset_end = self.dataset.shape[0]
        if indexes is not None:
            assert len(indexes) == 2
            self.dataset_ix = indexes[0]
            self.dataset_end = indexes[1]
        self.n_rows = self.dataset_end  # - self.dataset_ix
        # could add checks: no more than 2 dims, etc.
        self.parent = parent
        self.minimum_occupied_portion = minimum_occupied_portion
        # initialize the buffer
        # row size in kilobytes
        row_size = self.n_columns * self.type.itemsize / 1000.
        self.buf_len = int(round(buf_size / row_size))
        # buf_ix is the number of free rows in the buffer; here the
        # buffer is empty
        self.buf_ix = self.buf_len
        self.buf = np.zeros((self.buf_len, self.n_columns), self.type)
        # fill it
        self.refill_buffer()

    # read and consume data, refilling the buffer automatically when it
    # empties; if there is not enough data left, just send less than
    # what was asked for
    def read(self, amount=None):
        return super(H5dataset2NPbuffer, self).read(amount)
    def refill_buffer(self):
        if self.dataset_ix != self.dataset_end:
            # for now only one policy is implemented: if less than
            # self.minimum_occupied_portion of the full capacity is
            # occupied, the buffer is refilled
            occupied_portion = 1. - float(self.buf_ix) / float(self.buf_len)
            if occupied_portion < self.minimum_occupied_portion:
                # set useful variables
                curr_ix = self.dataset_ix
                next_ix = curr_ix + self.buf_ix
                next_buf_ix = next_ix - self.n_rows
                amount_in_buffer = self.buf_len - self.buf_ix
                # take care not to run past the end of the dataset
                next_buf_ix = max(next_buf_ix, 0)
                next_ix = min(next_ix, self.dataset_end)
                # move the old data
                self.buf[next_buf_ix:next_buf_ix + amount_in_buffer, :] = \
                    self.buf[self.buf_ix:, :]
                # add new data
                self.buf[next_buf_ix + amount_in_buffer:, :] = \
                    self.dataset[curr_ix:next_ix, :]
                # update indices
                self.buf_ix = next_buf_ix
                self.dataset_ix = next_ix
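

if __name__ == '__main__':
    # Self-contained demonstration with a temporary file (the file and
    # dataset names are arbitrary): write a small sorted one-column
    # dataset, then stream it back through H52NP.
    import os
    import tempfile

    handle, path = tempfile.mkstemp(suffix='.h5')
    os.close(handle)
    with h5py.File(path, 'w') as fh:
        fh.create_dataset('g/data', data=np.arange(1000).reshape(-1, 1))
    with H52NP(path) as reader:
        buf = reader.add_dataset('g', 'data', buf_size=1)
        n_read = 0
        while not buf.isempty():
            n_read += buf.read(100).shape[0]
    print('read %d rows' % n_read)
    os.remove(path)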