import os
from six import iteritems
import collections.abc
import h5py
import numpy as np
import ABXpy.misc.type_fitting as type_fitting
from . import np2h5
# API functions:
# __init__, write, sort, find, read
class H5IO(object):
    # Example call without shared indexes and without fused datasets:
    #   H5IO('file.h5', ['talker', 'language', 'age'],
    #        {'talker': ['t1', 't2', 't3'], 'language': ['French', 'English']})
    # Example call with shared indexes and with fused datasets:
    #   H5IO('file.h5',
    #        {'talker1': 'talker', 'talker2': 'talker',
    #         'language': 'language', 'age1': None, 'age2': None},
    #        {'talker': ['t1', 't2', 't3'], 'language': ['French', 'English']},
    #        {'talkers': ['talker1', 'talker2']})
def __init__(self, filename, datasets=None, indexes=None, fused=None, group='/'):
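        """Store a collection of named datasets in a single HDF5 file.

        A summary of the arguments, inferred from the example calls above
        and the consistency checks below:

        - ``datasets``: either a list of dataset names, or a mapping from
          dataset name to the name of its index (``None`` for unindexed
          datasets).
        - ``indexes``: mapping from index name to the list of admissible
          values (levels) for the datasets using that index.
        - ``fused``: mapping from fused-dataset name to the list of indexed
          datasets whose values are encoded together in a single integer key.
        - ``group``: HDF5 group under which everything is stored.
        """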
# format and check inputs
if indexes is None:
indexes = {}
if fused is None:
fused = {}
        if datasets is not None:
            if isinstance(datasets, collections.abc.Mapping):
                indexed_datasets = [
                    key for key, value in iteritems(datasets)
                    if value is not None]
                indexed_datasets_indexes = [
                    value for value in datasets.values()
                    if value is not None]
                if set(datasets.values()).difference([None]) != set(indexes.keys()):
                    raise ValueError(
                        'Indexes and datasets declarations are inconsistent.')
                datasets = list(datasets)
            else:
                indexed_datasets = list(indexes)
                indexed_datasets_indexes = list(indexes)
                if not set(indexes.keys()).issubset(datasets):
                    raise ValueError(
                        'Indexes and datasets declarations are inconsistent.')
# check that all datasets to be fused are indexed
all_fused_dsets = [dset for dsets in fused.values()
for dset in dsets]
for dset in all_fused_dsets:
            if dset not in indexed_datasets:
raise ValueError(
'Only datasets for which an index was provided can be fused.')
        # create the HDF5 file if it does not already exist
        if os.path.exists(filename):  # file already exists
            if datasets is not None:
                raise IOError('File %s already exists' % filename)
            with h5py.File(filename, 'a') as f:
                if group not in f:
                    raise IOError(
                        "File %s doesn't contain a group named %s"
                        % (filename, group))
        else:  # file doesn't exist, create it
            if datasets is None:
                raise IOError("File %s doesn't exist" % filename)
            with h5py.File(filename, 'a') as f:
                if group in f:  # '/' always exists in a fresh file
                    g = f[group]
                else:
                    g = f.create_group(group)
# general structure
g.attrs['empty'] = True
g.attrs['sorted'] = False
# h5 dtype for storing variable length strings
str_dtype = h5py.special_dtype(vlen=str)
g.create_dataset(
'managed_datasets', data=get_array(datasets), dtype=str_dtype)
raw_datasets = list(set(datasets).difference(indexed_datasets))
if raw_datasets:
g.create_dataset(
'raw_datasets', data=raw_datasets, dtype=str_dtype)
# indexed datasets
if indexed_datasets:
g.create_dataset(
'indexed_datasets', data=get_array(indexed_datasets), dtype=str_dtype)
g.create_dataset(
'indexed_datasets_indexes', data=get_array(indexed_datasets_indexes), dtype=str_dtype)
index_group = g.create_group('indexes')
for key, value in iteritems(indexes):
index_group.create_dataset(
key, data=get_array(value), dtype=get_dtype(value))
non_fused = [
dset for dset in indexed_datasets if not(dset in all_fused_dsets)]
if non_fused:
g.create_dataset(
'non_fused_datasets', data=get_array(non_fused), dtype=str_dtype)
# fused datasets
if fused:
g.create_dataset(
'fused_datasets', data=list(fused), dtype=str_dtype)
h = g.create_group('fused')
for name, fused_dsets in iteritems(fused):
i = h.create_group(name)
i.create_dataset(
'datasets', data=fused_dsets, dtype=str_dtype)
nb_levels = [len(indexes[indexed_datasets_indexes[
indexed_datasets.index(dset)]]) for dset in fused_dsets]
i.create_dataset(
'nb_levels', data=nb_levels, dtype=np.uint64)
# instantiate h5io runtime object from (possibly newly created) file
self.filename = filename
self.group = group
self.__load__()
def __load__(self):
self.__load_metadata__()
# FIXME: self.load_data() # this implementation supposes that the
# datasets can be held in memory without problems
def __load_metadata__(self):
with h5py.File(self.filename, 'a') as f:
g = f[self.group]
self.is_empty = g.attrs['empty']
self.is_sorted = g.attrs['sorted']
self.managed_datasets = list(g['managed_datasets'][...])
if 'raw_datasets' in g:
self.raw_datasets = list(g['raw_datasets'][...])
else:
self.raw_datasets = []
if 'indexed_datasets' in g:
self.indexed_datasets = list(g['indexed_datasets'][...])
self.indexed_datasets_indexes = list(
g['indexed_datasets_indexes'][...])
self.indexes = {}
for dset in g['indexes']:
self.indexes[dset] = list(g['indexes'][dset][...])
else:
self.indexed_datasets = []
if 'non_fused_datasets' in g:
self.non_fused_datasets = list(g['non_fused_datasets'][...])
else:
self.non_fused_datasets = []
if 'fused_datasets' in g:
self.fused_datasets = list(g['fused_datasets'][...])
self.fused_members = {}
self.key_weights = {}
self.nb_levels = {}
for fused_dataset in g['fused']:
self.fused_members[fused_dataset] = list(
g['fused'][fused_dataset]['datasets'][...])
if fused_dataset + '/key_weights' in g['fused']:
                        self.key_weights[fused_dataset] = g['fused'][
                            fused_dataset]['key_weights'][...]
else:
self.nb_levels[fused_dataset] = g['fused'][
fused_dataset]['nb_levels'][...]
else:
self.fused_datasets = []
# FIXME h5io should be developed as a subclass of np2h5
def __enter__(self):
try:
self.np2h5 = np2h5.NP2H5(self.filename)
self.np2h5.__enter__()
return self
        except:
            # only clean up if the attribute was actually set; otherwise the
            # AttributeError raised by del would mask the original exception
            if hasattr(self, 'np2h5'):
                del self.np2h5
            raise
def __exit__(self, eType, eValue, eTrace):
try:
self.np2h5.__exit__(eType, eValue, eTrace)
# FIXME here could need to ignore/log a second exception if eValue is
# not None
finally:
del self.np2h5
    def write(self, data, append=True, iterate=False, indexed=False):
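        """Write data to the managed datasets.

        A summary inferred from the code below: ``data`` is either a mapping
        from dataset names to columns of values, or a sequence of such
        columns in the order of ``self.managed_datasets``. If ``iterate`` is
        True, ``data`` is an iterator yielding such chunks. If ``indexed`` is
        True, the values of indexed datasets are assumed to already be
        integer level codes instead of raw values.
        """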
        if not hasattr(self, 'np2h5'):
            raise RuntimeError(
                "Writing to h5io objects must be done inside a context "
                "manager ('with' statement)")
        if not self.is_empty and not append:
            raise IOError('File %s is already filled' % self.filename)
# if necessary, instantiate datasets
if self.is_empty:
if iterate:
                sample_data = next(data)
else:
sample_data = data
self.__initialize_datasets__(sample_data)
else:
sample_data = None
# FIXME for now have to check that np2h5 was initialized
            if not self.np2h5.buffers:
                raise ValueError(
                    'Current implementation does not allow completing '
                    'non-empty datasets')
# set flags
with h5py.File(self.filename, 'a') as f:
if self.is_empty:
self.is_empty = False
f[self.group].attrs['empty'] = False
if self.is_sorted:
self.is_sorted = False
f[self.group].attrs['sorted'] = False
        if sample_data is not None and iterate:
self.__write__(sample_data, indexed)
if iterate:
for d in data:
self.__write__(d, indexed)
else:
self.__write__(data, indexed)
    def __parse_input_data__(self, data):
        if not isinstance(data, collections.abc.Mapping):
            data = dict(zip(self.managed_datasets, data))
        if set(data.keys()) != set(self.managed_datasets):
            raise ValueError(
                'It is necessary to write to all of the managed datasets '
                'simultaneously.')
        return data
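    # For example (hypothetical values, matching test_h5io below): with
    # managed datasets ['talker1', 'talker2'], the list input
    # [[0, 1], [1, 2]] is parsed as {'talker1': [0, 1], 'talker2': [1, 2]}.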
def __convert_input_data__(self, data):
res = {}
for dset, d in iteritems(data):
            if not hasattr(d, 'shape'):
                d = np.array(d)  # risky type conversion?
            if len(d.shape) == 1:
                # reshape to a single column to avoid shape problems,
                # maybe not optimal
                d = np.reshape(d, (d.shape[0], 1))
res[dset] = d
return res
def __initialize_datasets__(self, sample_data):
self.out = {}
sample_data = self.__parse_input_data__(sample_data)
sample_data = self.__convert_input_data__(sample_data)
        dims = {dset: 1 if len(data.shape) == 1 else data.shape[1]
                for dset, data in iteritems(sample_data)}
# needed for raw_datasets only
dtypes = {
dset: get_dtype(sample_data[dset]) for dset in self.raw_datasets}
# init raw datasets
for dset in self.raw_datasets:
(group, dataset) = os.path.split(dset)
            if not group:
                group = '/'
            # FIXME at some point should become super.add_dataset(...)
            self.out[dset] = self.np2h5.add_dataset(
                group, dataset, n_columns=dims[dset],
                item_type=dtypes[dset], fixed_size=False)
# init not fused indexed datasets, in this implementation they are all
# encoded in the same matrix
if self.non_fused_datasets:
indexed_dims = [dims[dset] for dset in self.non_fused_datasets]
indexed_levels = [len(self.indexes[dset])
for dset in self.non_fused_datasets]
dim = sum(indexed_dims)
# smallest unsigned integer dtype compatible with all
# indexed_datasets
d_type = type_fitting.fit_integer_type(
max(indexed_levels), is_signed=False)
# FIXME at some point should become super.add_dataset(...)
self.out['indexed'] = self.np2h5.add_dataset(
self.group, 'indexed_data', n_columns=dim, item_type=d_type, fixed_size=False)
with h5py.File(self.filename, 'a') as f:
# necessary to access the part of the data corresponding to a
# particular dataset
f[self.group].create_dataset(
'indexed_cumudims', data=np.cumsum(indexed_dims), dtype=np.uint64)
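            # For example (hypothetical values, consistent with the code
            # above): non-fused dims [1, 2] give indexed_cumudims [1, 3], so
            # the first dataset occupies columns 0:cumudims[0] of
            # 'indexed_data' and dataset i the columns
            # cumudims[i-1]:cumudims[i].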
# fused datasets have a separate one dimensional dataset each
self.key_weights = {}
        for fused_dset in self.fused_datasets:
            fused_dims = np.array(
                [dims[dset] for dset in self.fused_members[fused_dset]],
                dtype=np.uint64)
            # compute the largest possible key with exact Python integer
            # arithmetic, so that the 64 bit overflow check below cannot
            # itself be defeated by a silent uint64 overflow
            max_key = 1
            for n, d in zip(self.nb_levels[fused_dset], fused_dims):
                max_key *= int(n) ** int(d)
            max_key -= 1
            if max_key >= 2 ** 64:
                raise ValueError(
                    'fused dataset %s in file %s cannot be created because '
                    '64 bit keys are not sufficient to cover all possible '
                    'combinations of the fused datasets'
                    % (fused_dset, self.filename))
# smallest unsigned integer dtype compatible
d_type = type_fitting.fit_integer_type(max_key, is_signed=False)
# FIXME at some point should become super.add_dataset(...)
self.out[fused_dset] = self.np2h5.add_dataset(
self.group, fused_dset, n_columns=1, item_type=d_type, fixed_size=False)
            nb_levels_with_multiplicity = np.concatenate(
                [np.full(int(d), n, dtype=d_type)
                 for n, d in zip(self.nb_levels[fused_dset], fused_dims)])
            self.key_weights[fused_dset] = np.concatenate(
                [np.array([1], dtype=d_type),
                 np.cumprod(nb_levels_with_multiplicity, dtype=d_type)[:-1]])
with h5py.File(self.filename, 'a') as f:
f[self.group]['fused'][fused_dset].create_dataset(
'key_weights', data=self.key_weights[fused_dset], dtype=d_type)
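            # A worked example (hypothetical values, consistent with the
            # code above): fusing two 1-column datasets with 3 and 2 levels
            # gives nb_levels_with_multiplicity [3, 2] and key_weights
            # [1, 3], so a row of level codes (v0, v1) is encoded as the
            # single key v0 + 3 * v1 and max_key is 3 * 2 - 1 = 5.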
def __write__(self, data, indexed=False):
data = self.__parse_input_data__(data)
        if not indexed:
data = self.__compute_indexes__(data)
data = self.__convert_input_data__(data)
# write raw data
for dset in self.raw_datasets:
# need type conversion sometimes here? (np.array(data[dset]))
self.out[dset].write(data[dset])
# write indexed data
if self.non_fused_datasets:
# FIXME check that values are in correct range of index ?
indexed_values = [data[dset] for dset in self.non_fused_datasets]
# need type conversion sometimes here?
self.out['indexed'].write(np.concatenate(indexed_values, axis=1))
# write fused data
for fused_dset in self.fused_datasets:
keys = self.__compute_keys__(fused_dset, np.concatenate(
[data[key] for key in self.fused_members[fused_dset]], axis=1)) # need type conversion sometimes here?
self.out[fused_dset].write(keys)
# this function might be optimized if useful (using searchsorted and
# stuff?)
    def __compute_indexes__(self, data):
        res = {}
        for dset, d in iteritems(data):
            if dset in self.indexed_datasets:
                index = self.indexes[self.indexed_datasets_indexes[
                    self.indexed_datasets.index(dset)]]
                res[dset] = [index.index(e) for e in d]
            else:
                res[dset] = d
        return res
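    # For example (hypothetical values): with the index ['t1', 't2', 't3']
    # attached to 'talker1', the input {'talker1': ['t1', 't3']} is mapped
    # to {'talker1': [0, 2]}.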
def __compute_keys__(self, dset, values):
d_type = self.key_weights[dset].dtype
        # this is vectorized
keys = np.sum(
self.key_weights[dset] * np.array(values, dtype=d_type), axis=1)
keys = np.reshape(keys, (keys.shape[0], 1))
return keys
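    # A minimal sketch (not part of the original API) of the inverse of
    # __compute_keys__: recover the per-dataset level codes from fused keys,
    # relying on key_weights being sorted in increasing order, as built in
    # __initialize_datasets__.
    def __decode_keys__(self, dset, keys):
        weights = self.key_weights[dset]
        keys = np.ravel(np.asarray(keys, dtype=weights.dtype))
        values = np.empty((keys.shape[0], weights.shape[0]),
                          dtype=weights.dtype)
        # peel off the most significant digit first: the level code is the
        # quotient by the weight, the remainder encodes the other columns
        for j in reversed(range(weights.shape[0])):
            values[:, j], keys = np.divmod(keys, weights[j])
        return values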
# Auxiliary function for determining dtypes. Strings (unicode or not) are
# always encoded with a variable-length dtype, so it should in general be
# more efficient to index string outputs; it is actually mandatory, because
# determining chunk_size would fail for non-indexed strings.
def get_dtype(data):
str_dtype = h5py.special_dtype(vlen=str)
# allow for the use of strings
if isinstance(data[0], str):
dtype = str_dtype
# could add some checks that the dtype is one of those supported by h5 ?
elif hasattr(data, 'dtype'):
dtype = data.dtype
else:
dtype = np.array(data).dtype
return dtype
def get_array(data):
if isinstance(data[0], str):
return np.array(data, dtype='S')
return data
def test_h5io():
try:
with H5IO('testh5io1.h5', ['talker1', 'talker2']) as h1:
h1.write([[0, 1], [1, 2]])
h1.write((([[i, i + 1], [10 - i, i]])
for i in range(5)), iterate=True)
h1.write({'talker1': [51], 'talker2': [62]})
        with H5IO('testh5io2.h5',
                  {'talker1': 'talker', 'talker2': 'talker',
                   'language': 'language', 'age1': None, 'age2': None},
                  {'talker': ['t1', 't2', 't3'],
                   'language': ['French', 'English']},
                  {'talkers': ['talker1', 'talker2']}) as h2:
            h2.write({'talker1': ['t1', 't2', 't2'],
                      'talker2': ['t3', 't3', 't3'],
                      'language': ['French', 'English', 'French'],
                      'age1': [44, 33, 22], 'age2': [11, 33, 21]},
                     indexed=False)
            h2.write({'talker1': [0, 1, 2], 'talker2': [1, 1, 1],
                      'language': [0, 0, 0],
                      'age1': [44, 33, 22], 'age2': [11, 33, 21]},
                     indexed=True)
finally:
pass
# os.remove('testh5io1.h5')
# os.remove('testh5io2.h5')
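

# A minimal sketch (not part of the original tests) of how the files written
# by test_h5io could be inspected with plain h5py; the attribute and dataset
# names below are the ones created by H5IO.__init__.
def inspect_h5io_file(filename, group='/'):
    with h5py.File(filename, 'r') as f:
        g = f[group]
        print('empty:', g.attrs['empty'], 'sorted:', g.attrs['sorted'])
        print('managed datasets:', list(g['managed_datasets'][...]))
        if 'fused_datasets' in g:
            for name in g['fused']:
                print('fused dataset %s built from %s'
                      % (name, list(g['fused'][name]['datasets'][...])))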