# ABXpy.sideop.side_operations_manager

"""Class  providing services for task.py

- finds out the best point to execute side-operations (such as
  filtering and regressor generation) in the ABX task computation
  flow:

    * basically the more related a given side-operation is to the
      on/across/by structure of the ABX task, the earlier it can be
      executed and the lowest the computational cost is

- providing methods to actually carry out these side-operations at the
  point in the execution flow to which they were attributed

"""

import copy
import numpy as np
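
# A minimal usage sketch (hedged: `db_hierarchy`, `my_filter` and
# `by_values` are hypothetical; `my_filter` is assumed to implement the
# dbfun API, and the constructor arguments mirror the commented-out test at
# the bottom of this module):
#
#   manager = SideOperationsManager(
#       db_hierarchy, ['consonant'], ['talker'], ['syllable_type', 'vowel'])
#   manager.add(my_filter)
#   results = list(manager.evaluate_by(by_values))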


class SideOperationsManager(object):

    def __init__(self, db_hierarchy, on, across, by):
        # all columns
        self.extensions = ['', '_A', '_B', '_X', '_AB', '_AX', '_1', '_2']
        self.all_cols = {
            node.name for tree in db_hierarchy for node in tree.preOrder()}
        # FIXME add some checks that the original column names will
        # not cause parsing problems
        self.extended_cols = [
            col + ext for col in self.all_cols for ext in self.extensions]
        self.extended_cols_by_column = [
            [col + ext for col in self.all_cols] for ext in self.extensions]
        # find on/across/by descendant columns; db_hierarchy is a list
        # of ABX.lib.tinytree objects
        roots = [tree.findChild(lambda x: x.name == on[0])
                 for tree in db_hierarchy]
        for root in roots:
            if root is not None:
                on_root = root
                break
        self.on_cols = {node.name for node in on_root.preOrder()}
        across_roots = []
        for col in across:
            roots = [tree.findChild(lambda x: x.name == col)
                     for tree in db_hierarchy]
            for root in roots:
                if root is not None:
                    across_roots.append(root)
                    break
        self.across_cols = {col for root in across_roots for col in {
            node.name for node in root.preOrder()}}
        by_roots = []
        for col in by:
            roots = [tree.findChild(lambda x: x.name == col)
                     for tree in db_hierarchy]
            for root in roots:
                if root is not None:
                    by_roots.append(root)
                    break
        self.by_cols = {col for root in by_roots for col in {
            node.name for node in root.preOrder()}}
        # other columns
        self.other_cols = set.difference(
            self.all_cols,
            set.union(self.on_cols, self.across_cols, self.by_cols))
        # FIXME containers could also add AX, AB, BX for further
        # optimization (but wait to see if this can really be useful)
        # one value for a whole 'by' database
        self.by = []
        # one value for a whole ABX cell
        self.on_across_by = []
        # value dependent on specific items in the A column
        self.A = []
        # value dependent on specific items in the B column (or on their
        # 'on' property, which we do not treat as a special case since
        # there can be very few elements with the same 'on' in a row in
        # the B column)
        self.B = []
        # value dependent on specific items in the X column (or on their
        # 'across' property, which we do not treat as a special case since
        # there can be very few elements with the same 'across' in a row
        # in the X column)
        self.X = []
        # most general case
        self.ABX = []
        self.by_context = {
            'by': set(), 'generic': set(), 'on_across_by': set(),
            'A': set(), 'B': set(), 'X': set(), 'ABX': set()}
        self.generic_context = {'generic': set()}
        self.on_context = {
            'on_across_by': set(), 'A': set(), 'B': set(), 'X': set(),
            'ABX': set()}
        self.across_context = {
            'on_across_by': set(), 'A': set(), 'B': set(), 'X': set(),
            'ABX': set()}
        self.A_context = {'A': set(), 'ABX': set()}
        self.B_context = {'B': set(), 'ABX': set()}
        self.X_context = {'X': set(), 'ABX': set()}

    def parse_extended_columns(self, columns):
        """Get the radical and suffix parts of every context variable and
        return the set of the encountered couples.
        """
        out = set()
        for var in columns:
            out.add(self.parse_extended_column(var))
        return out

    def parse_extended_column(self, column):
        """Get the radical and suffix parts of a context variable."""
        for i, cols in enumerate(self.extended_cols_by_column):
            if column in cols:
                suffix = self.extensions[i]
                radical = column[:len(column) - len(suffix)]
                break
        return radical, suffix

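    # Illustration of the parsing above (with a hypothetical column
    # 'talker'): 'talker_A' is split into ('talker', '_A'), while the bare
    # name 'talker' matches the empty extension and yields ('talker', '').
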
    def check_extensions(self, elements):
        """Check that anything with an _AX, _AB or _1, _2 extension is an
        on/across descendant, and the appropriate one for _AX and _AB.
        """
        def extension_error(extensions, attr):
            return ValueError(
                "Columns used with extensions " + extensions + " in filter "
                "and regressor specifications must be appropriately "
                "determined by the " + attr + " of the task being defined.")

        for r, s in elements:
            # _1 and _2 are only meaningful for on or across descendants
            if s in ['_1', '_2'] and not (r in self.on_cols or
                                          r in self.across_cols):
                raise extension_error("_1 or _2", "on and across")
            # _AX is only meaningful for across descendants
            if s == '_AX' and r not in self.across_cols:
                raise extension_error("_AX", "across")
            # _AB is only meaningful for on descendants
            if s == '_AB' and r not in self.on_cols:
                raise extension_error("_AB", "on")

    def classify_by(self, elements, db_fun, db_variables):
        """Detect operations that depend only on variables used as 'by'
        factors in the ABX task.
        """
        # set db_variables
        db_variables['by'] = {(r, s) for r, s in elements
                              if r in self.by_cols}
        # check whether we have only 'by' descendants (with or without
        # extension) and classify these as 'by'
        if {r for r, s in elements}.issubset(self.by_cols):
            self.by.append(db_fun)
            self.by_context['by'].update(db_variables['by'])
            elements = set()
        else:
            # columns determined by 'by' are not considered further
            elements = {(r, s) for r, s in elements
                        if r not in self.by_cols}
        return elements, db_variables

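    # Sketch: with by=['syllable_type', 'vowel'] (as in the commented-out
    # test at the bottom of this module), a db_fun reading only 'vowel' or
    # 'vowel_A' is classified as 'by' here, whereas one reading 'vowel' and
    # 'consonant_B' keeps only ('consonant', '_B') for the later stages.
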
    def classify_generic(self, elements, db_fun, db_variables):
        """Detect operations that can be applied directly to the columns of
        the original database. This is subclass-specific.
        """
        return elements, db_variables

    def classify_on_across_by(self, elements, db_fun, db_variables):
        """Detect operations that can be applied at the level of an
        on/across/by block during the generation of the ABX triplets.
        """
        if '' in {s for r, s in elements}:
            radical = {r for (r, s) in elements if s == ''}
            raise ValueError('Use of column name(s) %s without extension '
                             'is ambiguous in this context' % radical)
        else:
            # find elements that do not depend on _1, _AX, _AB, or (_A or
            # _X and descendants of on) or (_A or _B and descendants of
            # across)
            def condition(r, s):
                return (s not in ['_1', '_AX', '_AB'] and
                        not (s in ['_A', '_X'] and r in self.on_cols) and
                        not (s in ['_A', '_B'] and r in self.across_cols))
            # fill db_variables
            db_variables['on'] = {(r, s) for r, s in elements
                                  if not condition(r, s) and
                                  r in self.on_cols}
            db_variables['across'] = {(r, s) for r, s in elements
                                      if not condition(r, s) and
                                      r in self.across_cols}
            elements = {e for e in elements if condition(e[0], e[1])}
            # if there are none left, classify as on_across_by
            if not elements:
                self.on_across_by.append(db_fun)
                self.by_context['on_across_by'].update(db_variables['by'])
                self.on_context['on_across_by'].update(db_variables['on'])
                self.across_context['on_across_by'].update(
                    db_variables['across'])
        return elements, db_variables

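    # Sketch: with 'talker' as the across column, a db_fun reading only
    # 'talker_AX' fails condition() above, so ('talker', '_AX') goes into
    # db_variables['across'], no element survives, and the operation is
    # classified as 'on_across_by'.
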
    # detect operations that depend on only one of the A, B or X factors
    # inside an on/across/by block; other operations are classified as ABX
    # (the most general case)
    def classify_ABX(self, elements, db_fun, db_variables):
        """Classify the remaining operations as depending on A, B, X or on
        whole ABX triplets.

        The only extensions left are either not descendants of
        on/across/by, or descendants of across with _X, or descendants of
        on with _B (i.e. _2). We do not try to batch the _2 because they
        are potentially too small; instead, if necessary, several
        consecutive calls should be batched.
        """
        # set up db_variables
        # FIXME could/should these three contexts (and ABX?) be grouped?
        # in the remaining elements, _2 is interpreted as _B for an on
        # descendant and as _X for an across descendant, so the only
        # remaining extensions are _A, _B and _X
        interpret_2 = lambda r: '_B' if r in self.on_cols else '_X'
        get_ext = lambda r, s: interpret_2(r) if s == '_2' else s
        db_variables['A'] = {(r, s) for r, s in elements
                             if get_ext(r, s) == '_A'}
        db_variables['B'] = {(r, s) for r, s in elements
                             if get_ext(r, s) == '_B'}
        db_variables['X'] = {(r, s) for r, s in elements
                             if get_ext(r, s) == '_X'}
        # if there are only _As, only _Bs or only _Xs, classify as
        # 'singleton'
        exts = {get_ext(r, s) for r, s in elements}
        if exts == set(['_A']):
            self.A.append(db_fun)
            self.A_context['A'].update(db_variables['A'])
            name = 'A'
        elif exts == set(['_B']):
            self.B.append(db_fun)
            self.B_context['B'].update(db_variables['B'])
            name = 'B'
        elif exts == set(['_X']):
            self.X.append(db_fun)
            self.X_context['X'].update(db_variables['X'])
            name = 'X'
        # otherwise classify as 'triplet' (could also have pairs, but do
        # not implement until proved useful)
        else:
            self.ABX.append(db_fun)
            self.A_context['ABX'].update(db_variables['A'])
            self.B_context['ABX'].update(db_variables['B'])
            self.X_context['ABX'].update(db_variables['X'])
            name = 'ABX'
        self.by_context[name].update(db_variables['by'])
        self.on_context[name].update(db_variables['on'])
        self.across_context[name].update(db_variables['across'])

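    # Concrete reading of the _2 convention above: with 'consonant' as the
    # on column and 'talker' as the across column, 'consonant_2' is treated
    # as 'consonant_B' (B differs from A and X on the on attribute) and
    # 'talker_2' as 'talker_X' (X differs from A and B on the across
    # attribute).
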
    # db_fun implements the dbfun API
    def add(self, db_fun, name=None):
        elements = self.parse_extended_columns(db_fun.input_names)
        db_variables = {}
        self.check_extensions(elements)
        # find the appropriate point of execution for db_fun
        elements, db_variables = self.classify_by(
            elements, db_fun, db_variables)
        if elements:
            elements, db_variables = self.classify_generic(
                elements, db_fun, db_variables)
        if elements:
            elements, db_variables = self.classify_on_across_by(
                elements, db_fun, db_variables)
        if elements:
            self.classify_ABX(elements, db_fun, db_variables)

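    # The cascade above tries the cheapest execution point first: a db_fun
    # reading only 'by' columns is evaluated once per 'by' block, whereas
    # one that survives down to classify_ABX is evaluated for individual
    # A/B/X items or for whole triplets. For instance, a regressor on
    # 'vowel' alone would end up in self.by, while one on 'consonant_B'
    # would end up in self.B.
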
    # could use arrays instead of lists for speed?
    def set_by_context(self, context, stage, by_values):
        for radical, extension in self.by_context[stage]:
            context[radical + extension] = [by_values[radical]]
        return context

    # could use arrays instead of lists for speed?
    def set_generic_context(self, context, stage, db):
        for radical, extension in self.generic_context[stage]:
            # note that in the current implementation the extension is
            # always ''
            context[radical + extension] = list(db[radical])
        return context

    def set_on_across_context(self, context, stage, on_across_values):
        # this list contains 0 or 1 elements
        for radical, extension in self.on_context[stage]:
            context[radical + extension] = [on_across_values[radical]]
        for radical, extension in self.across_context[stage]:
            context[radical + extension] = [on_across_values[radical]]
        return context

    # FIXME use a single function for set_by, set_on and set_across?
    def set_A_B_X_context(self, context_field, context, stage, db,
                          indices):
        field = getattr(self, context_field)
        for radical, extension in field[stage]:
            # FIXME might be faster to index once for all the columns?
            context[radical + extension] = list(db[radical][indices])
        return context

    def set_ABX_context(self, context, db, triplets):
        # each column of triplets is redundant; this might be used to
        # access the db more efficiently... this is the only call to numpy
        # in the module... it could be removed if we always used arrays
        triplets = np.array(triplets)
        context = self.set_A_B_X_context(
            'A_context', context, 'ABX', db, triplets[:, 0])
        context = self.set_A_B_X_context(
            'B_context', context, 'ABX', db, triplets[:, 1])
        context = self.set_A_B_X_context(
            'X_context', context, 'ABX', db, triplets[:, 2])
        return context

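    # Note: triplets is expected to be a sequence of (index_A, index_B,
    # index_X) rows, so column k of the resulting array indexes the
    # database rows playing the corresponding role.
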
    # the evaluate_... functions actually return generators, to allow lazy
    # evaluation for filters
    def evaluate_by(self, by_values):
        # set up context
        context = self.set_by_context({}, 'by', by_values)
        # evaluate dbfuns
        return singleton_result_generator(self.by, context)

    # the context passed as an argument can be used to induce side-effects
    # in the result generator, for example for lazy filter evaluation
    def evaluate_generic(self, by_values, db, context=None):
        # set up context
        if context is None:
            context = {}
        context = self.set_by_context(context, 'generic', by_values)
        for var in context:
            context[var] = context[var] * len(db)
        context = self.set_generic_context(context, 'generic', db)
        # evaluate dbfuns (note: self.generic is not initialized in this
        # class; it is presumably set up by subclasses implementing
        # classify_generic)
        return result_generator(self.generic, context)

    # from this point on, by design, we are sure that generic variables
    # cannot be needed in the context
    def evaluate_on_across_by(self, on_across_by_values):
        # set up context
        context = self.set_by_context(
            {}, 'on_across_by', on_across_by_values)
        context = self.set_on_across_context(
            context, 'on_across_by', on_across_by_values)
        # evaluate dbfuns
        return singleton_result_generator(self.on_across_by, context)

    # possible optimization: group the A, B and X contexts in case there is
    # some overlap?
    def evaluate_A_B_X(self, name, on_across_by_values, db, indices,
                       context=None):
        # set up context; the context passed as an argument can be used to
        # induce side-effects in the result generator, for example for lazy
        # filter evaluation
        if context is None:
            context = {}
        context = self.set_by_context(context, name, on_across_by_values)
        context = self.set_on_across_context(
            context, name, on_across_by_values)
        for var in context:
            # replicate the block-level singleton lists to match the number
            # of indexed items
            context[var] = context[var] * len(indices)
        context = self.set_A_B_X_context(
            name + '_context', context, name, db, indices)
        # evaluate dbfuns
        return result_generator(getattr(self, name), context)

    def evaluate_A(self, *args):
        return self.evaluate_A_B_X('A', *args)

    def evaluate_B(self, *args):
        return self.evaluate_A_B_X('B', *args)

    def evaluate_X(self, *args):
        return self.evaluate_A_B_X('X', *args)

    def evaluate_ABX(self, on_across_by_values, db, triplets, context=None):
        stage = 'ABX'
        # set up context; the context passed as an argument can be used to
        # induce side-effects in the result generator, for example for lazy
        # filter evaluation
        if context is None:
            context = {}
        context = self.set_by_context(context, stage, on_across_by_values)
        context = self.set_on_across_context(
            context, stage, on_across_by_values)
        for var in context:
            context[var] = context[var] * len(triplets)
        context = self.set_ABX_context(context, db, triplets)
        # evaluate dbfuns
        return result_generator(getattr(self, stage), context)


def result_generator(db_funs, context):
    # to avoid any undesirable side-effects, a deep copy of the context is
    # made each time
    return (db_fun.evaluate(copy.deepcopy(context)) for db_fun in db_funs)


def singleton_result_generator(db_funs, context):
    # to avoid any undesirable side-effects, a deep copy of the context is
    # made each time
    return (db_fun.evaluate(copy.deepcopy(context))[0]
            for db_fun in db_funs)

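# singleton_result_generator differs from result_generator only in
# unpacking the first element of each db_fun's output, which is convenient
# for the 'by' and 'on_across_by' stages where the context holds a single
# value per variable.
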
# db_fun.evaluate returns [[np_array_output_1_dbfun_1,
# np_array_output_2_dbfun_1, ...], [np_array_output_1_dbfun_2, ...], ...]
#
# Would the previous functions change with VLEN outputs that would change
# this pattern?

# def test():
#     import ABX.lib.database
#     _, db_h = ABX.lib.database.load('../test/AI/corpus/AI.item')
#     som = SideOperationsManager(
#         db_h, ['consonant'], ['talker'], ['syllable_type', 'vowel'])