Source code for ABXpy.dbfun.lookuptable_connector

"""

AST visitor class for finding calls to lookup table (.dbfun) files in
a script and make the script ready for execution.

There is a restriction: when in a lookup table calling node, check
that the node.code_nodes concatened of the children is empty:
i.e. hierarchical calls to auxiliary h5 file are not allowed with a
depth larger than 1. We could allow deeper calls, but would there be
any practical benefit ? Probably not because the input to a lookup
table call can only be of the type of a column of the database (at
least with the current system).

"""

import ast
import uuid


[docs]class LookupTableConnector(ast.NodeTransformer): def __init__(self, script, aux_functions, aliases, *args, **kwargs): ast.NodeTransformer.__init__(self, *args, **kwargs) self.aux_functions = aux_functions self.aliases = aliases self.script = script
[docs] def check_Call(self, node): if not(isinstance(node.func.ctx, ast.Load) and not(node.keywords) and node.kwargs is None and node.starargs is None): raise ValueError( 'Call to function %s defined in an auxiliary h5 file is not ' 'correct in script: %s' % (node.func.id, self.script))
[docs] def check_Subscript(self, node, func_name): error = ValueError( 'Call to function %s defined in an auxiliary h5 file is not ' 'correct in script: %s' % (func_name, self.script)) if not(isinstance(node.slice, ast.Index) and isinstance(node.ctx, ast.Load)): raise error if not(isinstance(node.slice.value, ast.Tuple) and isinstance(node.slice.value.ctx, ast.Load)): raise error # output column must appear explicitly as strings if not(all([isinstance(e, ast.Str) for e in node.slice.value.elts])): raise error
[docs] def check_flatness(self, node): if node.code_nodes: raise ValueError( 'There are calls to functions defined in auxiliary files whose' ' arguments are defined in terms of calls to other functions ' 'defined in auxiliary files in script: %s' % self.script)
# extract and store in a code_node all useful information about the call # and replace node in its parent by a ast.Name node pointing to a random # but duly stored 'varname'
[docs] def bundle_call_info(self, node, call_node): code_node = {} if not(node is call_node): code_node['output_cols'] = [e.s for e in node.slice.value.elts] code_node['child_asts'] = [ast.Expression(arg) for arg in node.args] code_node['function'] = self.aux_functions( self.aliases.index(call_node.func.id)) code_node['varname'] = 'var_' + str(uuid.uuid4()).translate(None, '-') new_node = ast.copy_location( ast.Name(ctx=ast.Load(), id=code_node['varname']), node) new_node.code_nodes = [code_node] return new_node
# case of call with output columns specified: aux_file_alias(in_1, ..., # in_k).out['out_name_1',..., 'out_name_n']
[docs] def visit_Subscript(self, node): # check if it is a call to an auxiliary h5 file aux_call = False if isinstance(node.value, ast.Attribute) and node.value.attr == 'out': call_node = node.value.value if ((isinstance(call_node, ast.Call) and call_node.func.id in self.aliases)): aux_call = True if not(aux_call): # visit all children and bubble up code_nodes node = self.generic_visit(node) return node else: # first check correctness of the call self.check_Call(call_node) self.check_Subscript(node, call_node.func.id) # check that the aux function call is flat (i.e. there are no other # aux function call in the call_node children) # generic visit here, not visit_Call, to bubble up code_nodes call_node = self.generic_visit(call_node) node.code_nodes = call_node.code_nodes self.check_flatness(node) # extract and store in a code_node all useful information about # the call # and replace node in its parent by a ast.Name node pointing to a # random but duly stored 'varname' return self.bundle_call_info(node, call_node)
# case of call for all outputs: aux_file_alias(in_1, ..., in_k)
[docs] def visit_Call(self, node): # check if it is not a call to an auxiliary h5 file if not(node.func.id in self.aliases): # visit all children and bubble up code_nodes node = self.generic_visit(node) return node else: # first check correctness of the call self.check_Call(node) # check that the aux function call is flat (i.e. there are no other # aux function call in the node children) node = self.generic_visit(node) # bubble up code nodes self.check_flatness(node) return self.bundle_call_info(node, node)
[docs] def generic_visit(self, node): # visit all children super(ast.NodeVisitor, self).generic_visit(node) # bubble up code_nodes from the children if any (cleaning up children # at the same time to keep sound asts) node.code_nodes = [] for child in ast.iter_child_nodes(node): node.code_nodes = node.code_nodes + child.code_nodes del(child.code_nodes)