Source code for capsul.attributes.fom_completion_engine

# -*- coding: utf-8 -*-

'''
Completion engine for File Organization Models (FOM).

Classes
=======
:class:`FomProcessCompletionEngine`
-----------------------------------
:class:`FomPathCompletionEngine`
--------------------------------
:class:`FomProcessCompletionEngineIteration`
--------------------------------------------
'''

from __future__ import print_function
from __future__ import absolute_import

import os
import six
from traits.api import Str, HasTraits, List, Undefined

from soma.controller import Controller, ControllerTrait
from capsul.pipeline.pipeline import Pipeline
from capsul.pipeline.pipeline_nodes import Node, Switch, ProcessNode
from capsul.attributes.completion_engine import ProcessCompletionEngine, \
    ProcessCompletionEngineFactory, PathCompletionEngine, \
    PathCompletionEngineFactory
from capsul.attributes.completion_engine_iteration \
    import ProcessCompletionEngineIteration
from capsul.pipeline.process_iteration import ProcessIteration
from capsul.attributes.attributes_schema import ProcessAttributes, \
    EditableAttributes
from soma.fom import DirectoryAsDict
from soma.path import split_path
from soma.sorted_dictionary import SortedDictionary


[docs] class FomProcessCompletionEngine(ProcessCompletionEngine): """ FOM (File Organization Model) implementation of completion engine. * A capsul.study_config.StudyConfig also needs to be configured with FOM module, and selected FOMS and directories: :: from capsul.api import StudyConfig from capsul.study_config.config_modules.fom_config import FomConfig study_config = StudyConfig(modules=StudyConfig.default_modules + ['FomConfig', 'BrainVISAConfig']) study_config.update_study_configuration('study_config.json') * Only then a FomProcessCompletionEngine can be created: :: process = get_process_instance('morphologist') fom_completion_engine = FomProcessCompletionEngine( process, study_config) But generally this creation is handled via the ProcessCompletionEngine.get_completion_engine() function: :: fom_completion_engine = ProcessCompletionEngine.get_completion_engine( process) Parameters ---------- name: string (optional) name of the process in the FOM dictionary. By default the process.name variable will be used. Methods ------- create_attributes_with_fom """ def __init__(self, process, name=None): super(FomProcessCompletionEngine, self).__init__( process=process, name=name) self.input_fom = None self.output_fom = None self.shared_fom = None
[docs] def get_attribute_values(self): ''' Get attributes Controller associated to a process Returns ------- attributes: Controller ''' if not self._rebuild_attributes \ and self.trait('capsul_attributes') is not None \ and hasattr(self, 'capsul_attributes'): return self.capsul_attributes self.create_attributes_with_fom() self._rebuild_attributes = False return self.capsul_attributes
[docs] def create_attributes_with_fom(self): """To get useful attributes by the fom""" # print('create_attributes_with_fom for', self.process, ', FCE:', self) process = self.process if isinstance(process, ProcessNode): process = process.process study_config = process.study_config modules_data = study_config.modules_data #Get attributes in input fom id = getattr(process, 'id', None) names_search_list = [self.name] if id: names_search_list.append(id) names_search_list += [process.name, getattr(process, 'context_name', '')] schemas = self._get_schemas() if not hasattr(self, 'capsul_attributes'): self.add_trait('capsul_attributes', ControllerTrait(Controller())) self.capsul_attributes = ProcessAttributes(self.process, schemas) capsul_attributes = self.capsul_attributes matching_fom = False input_found = False output_found = False shared_fom = None # print('create_attributes_with_fom for', self.process, ', FCE:', self, ', foms:', {n: f.fom_names[-1] for n, f in modules_data.foms.items()}) foms = SortedDictionary() foms.update(modules_data.foms) if study_config.auto_fom: # in auto-fom mode, also search in additional and non-loaded FOMs foms.update(modules_data.all_foms) def editable_attributes(attributes, fom): ea = EditableAttributes() for attribute in attributes: if attribute.startswith('fom_'): continue # skip FOM internals default_value = fom.attribute_definitions[attribute].get( 'default_value', '') ea.add_trait(attribute, Str(default_value, optional=True)) return ea sel_foms = {} if study_config.input_fom: sel_foms['input'] = study_config.input_fom if study_config.output_fom: sel_foms['output'] = study_config.output_fom if study_config.shared_fom: sel_foms['shared'] = study_config.shared_fom for schema, fom in foms.items(): if fom is None: fom, atp, pta \ = study_config.modules['FomConfig'].load_fom(schema) foms[schema] = fom if schema not in ('input', 'output', 'shared'): # exclude incompatible FOMs for fs in ('input', 'output', 'shared'): found = False f = modules_data.foms.get(fs) if f is None: if fs == 'shared': if 'shared' in schema: sel_foms['shared'] = schema found = True elif 'shared' not in schema: sel_foms[fs] = schema found = True elif fom is not f: if sum([n in fom.fom_names for n in f.fom_names]) \ == len(f.fom_names): # all sub-foms of f included in fom: # possibly keep it if fs != 'shared': # print('extend fom:', schema, 'for', fs, ':', f.fom_names) for name in names_search_list: fom_patterns = fom.patterns.get(name) if fom_patterns is not None: found = True break #else: #print('process not found - discard it') else: found = True if found: sel_foms[fs] = schema # print('new foms:', sel_foms) fom_modified = False found = False for schema, fom_type in sel_foms.items(): # print('schema:', schema, fom_type) fom = modules_data.all_foms[fom_type] atp = modules_data.fom_atp['all'].get(fom_type) \ or modules_data.fom_atp.get(schema) if atp is None: continue for name in names_search_list: fom_patterns = fom.patterns.get(name) if fom_patterns is not None: break else: # print('process', names_search_list, 'not found in', fom_type) continue found = True # print('completion using FOM:', schema, fom_type, 'for', process.id, ', atp:', atp) #break for parameter in fom_patterns: param_attributes = atp.find_discriminant_attributes( fom_parameter=parameter, fom_process=name) ea = editable_attributes(param_attributes, fom) try: capsul_attributes.set_parameter_attributes( parameter, schema, ea, {}) except KeyError: # param already registered pass if (schema not in modules_data.foms or modules_data.foms[schema] != fom or schema not in modules_data.fom_atp or modules_data.fom_atp[schema] != atp or getattr(study_config, '%s_fom' % schema) != fom_type): modules_data.foms[schema] = fom modules_data.fom_atp[schema] = atp setattr(study_config, '%s_fom' % schema, fom_type) fom_modified = True if not found: # no FOM contains the process raise KeyError('No matching FOM for process %s' % repr(names_search_list)) #if fom_modified: # the modif will trigger another completion #return # in a pipeline, we still must iterate over nodes to find switches, # which have their own behaviour. if isinstance(process, Pipeline): attributes = self.capsul_attributes name = process.name for node_name, node in list(process.nodes.items()): if isinstance(node, Switch): subprocess = node if subprocess is None: continue pname = '.'.join([name, node_name]) subprocess_compl = \ ProcessCompletionEngine.get_completion_engine( subprocess, pname) try: sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: try: subprocess_compl = self.__class__(subprocess) sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: continue for attribute, trait \ in six.iteritems(sub_attributes.user_traits()): if attributes.trait(attribute) is None: attributes.add_trait(attribute, trait) setattr(attributes, attribute, getattr(sub_attributes, attribute)) self._get_linked_attributes() # remember foms for later if 'input' in sel_foms: self.input_fom = sel_foms['input'] else: self.input_fom = study_config.input_fom if 'output' in sel_foms: self.output_fom = sel_foms['output'] else: self.output_fom = study_config.output_fom if 'shared' in sel_foms: self.shared_fom = sel_foms['shared'] else: self.shared_fom = study_config.shared_fom
@staticmethod def setup_fom(process): completion_engine \ = ProcessCompletionEngine.get_completion_engine(process) if not isinstance(completion_engine, FomProcessCompletionEngine): return if not hasattr(completion_engine, 'input_fom') \ or completion_engine.input_fom is None: if process.study_config.input_fom != completion_engine.input_fom \ or process.study_config.output_fom \ != completion_engine.output_fom \ or process.study_config.shared_fom \ != completion_engine.shared_fom: # FOMs have changed: rebuild attributes completion_engine._rebuild_attributes = True completion_engine.create_attributes_with_fom() if process.study_config.input_fom != completion_engine.input_fom: process.study_config.input_fom = completion_engine.input_fom if process.study_config.output_fom != completion_engine.output_fom: process.study_config.output_fom = completion_engine.output_fom if process.study_config.shared_fom != completion_engine.shared_fom: process.study_config.shared_fom = completion_engine.shared_fom
[docs] def path_attributes(self, filename, parameter=None): """By the path, find value of attributes""" pta = self.process.study_config.modules_data.fom_pta['input'] # Extract the attributes from the first result returned by # parse_directory liste = split_path(filename) len_element_to_delete = 1 for element in liste: if element != os.sep: len_element_to_delete \ = len_element_to_delete + len(element) + 1 new_value = filename[len_element_to_delete:len(filename)] try: #import logging #logging.root.setLevel( logging.DEBUG ) #path, st, self.attributes = pta.parse_directory( # DirectoryAsDict.paths_to_dict( new_value), # log=logging ).next() path, st, attributes = next(pta.parse_directory( DirectoryAsDict.paths_to_dict( new_value) )) break except StopIteration: if element == liste[-1]: raise ValueError( '%s is not recognized for parameter "%s" of "%s"' % (new_value, parameter, self.process.name)) attrib_values = self.get_attribute_values().export_to_dict() for att in attributes: if att in list(attrib_values.keys()): setattr(attrib_values, att, attributes[att]) return attributes
[docs] def get_path_completion_engine(self): ''' ''' return FomPathCompletionEngine()
@staticmethod def _fom_completion_factory(process, name): ''' Factoty inserted in attributed_processFactory ''' study_config = process.get_study_config() if study_config is None \ or 'FomConfig' not in study_config.modules \ or study_config.use_fom == False: #print("no FOM:", study_config, study_config.modules.keys()) return None # Non Fom config, no way it could work try: pfom = FomProcessCompletionEngine(process, name) return pfom except KeyError: # process not in FOM pass return None
[docs] class FomPathCompletionEngine(PathCompletionEngine):
[docs] def attributes_to_path(self, process, parameter, attributes): ''' Build a path from attributes Parameters ---------- process: Process instance parameter: str attributes: ProcessAttributes instance (Controller) ''' FomProcessCompletionEngine.setup_fom(process) input_fom = process.study_config.modules_data.foms['input'] output_fom = process.study_config.modules_data.foms['output'] input_atp = process.study_config.modules_data.fom_atp['input'] output_atp = process.study_config.modules_data.fom_atp['output'] #Create completion names_search_list = [] if isinstance(process, Node): trait = process.get_trait(parameter) name = process.name if hasattr(process, 'process'): if hasattr(process.process, 'context_name'): names_search_list.append(process.process.context_name) names_search_list.append(process.process.name) else: trait = process.trait(parameter) name = process.id names_search_list.append(name) if trait.output: atp = output_atp fom = output_fom else: atp = input_atp fom = input_fom names_search_list += [process.name, getattr(process, 'context_name', '')] for fname in names_search_list: fom_patterns = fom.patterns.get(fname) if fom_patterns is not None: name = fname break else: raise KeyError('Process not found in FOMs amongst %s' \ % repr(names_search_list)) allowed_attributes = set(attributes.user_traits().keys()) allowed_attributes.discard('parameter') allowed_attributes.discard('process_name') #allowed_attributes = set(attributes.get_parameters_attributes()[ #parameter].keys()) #allowed_attributes.discard('type') #allowed_attributes.discard('generated_by_parameter') #allowed_attributes.discard('generated_by_process') # Select only the attributes that are discriminant for this # parameter otherwise other attributes can prevent the appropriate # rule to match parameter_attributes = atp.find_discriminant_attributes( fom_parameter=parameter, fom_process=name) d = dict((i, getattr(attributes, i)) \ for i in parameter_attributes if i in allowed_attributes) d['fom_process'] = name d['fom_parameter'] = parameter d['fom_format'] = 'fom_preferred' path_value = None #path_values = [] #debug = getattr(self, 'debug', None) for h in atp.find_paths(d): # , debug=debug): path_value = h[0] # find_paths() is a generator which can sometimes generate # several values (formats). We are only interested in the # first one. #path_values.append(h[0]) break return path_value
[docs] def open_values_attributes(self, process, parameter): ''' Attributes with "open" values, not restricted to a list of possible values ''' # print('open_values_attributes', process.id, parameter) FomProcessCompletionEngine.setup_fom(process) for schema in ('input', 'output', 'shared'): fom = process.study_config.modules_data.foms[schema] atp = process.study_config.modules_data.fom_atp[schema] # print('fom:', fom.fom_names) # print('atp:', atp) name = getattr(process, 'id', process.name) names_search_list = [] if hasattr(process, 'id'): names_search_list.append(process.id) names_search_list += [process.name, getattr(process, 'context_name', '')] for fname in names_search_list: fom_patterns = fom.patterns.get(fname) if fom_patterns is not None: name = fname break else: continue values = atp.find_attributes_values() attributes = [k for k, v in six.iteritems(values) if k not in ('fom_name', 'fom_process', 'fom_parameter', 'fom_format') and len(v) == 2 and v[1] == (u'', )] return attributes return None
[docs] def allowed_formats(self, process, parameter): ''' List of possible formats names associated with a parameter ''' FomProcessCompletionEngine.setup_fom(process) formats = [] for schema in ('input', 'output', 'shared'): atp = process.study_config.modules_data.fom_atp[schema] sub_f = atp.allowed_formats_for_parameter(process.name, parameter) formats += [f for f in sub_f if f not in formats] return formats
[docs] def allowed_extensions(self, process, parameter): ''' List of possible file extensions associated with a parameter ''' FomProcessCompletionEngine.setup_fom(process) exts = set() for schema in ('input', 'output', 'shared'): atp = process.study_config.modules_data.fom_atp[schema] sub_e = atp.allowed_extensions_for_parameter( process_name=process.name, param=parameter) exts.update(sub_e) # sort and add dots exts2 = sorted(['.%s' % e for e in exts if e]) if '' in exts: exts2.append('') return exts2
[docs] class FomProcessCompletionEngineIteration(ProcessCompletionEngineIteration):
[docs] def get_iterated_attributes(self): process = self.process if isinstance(process, ProcessNode): process = process.process subprocess = process.process FomProcessCompletionEngine.setup_fom(subprocess) input_fom = subprocess.study_config.modules_data.foms['input'] output_fom = subprocess.study_config.modules_data.foms['output'] input_atp = subprocess.study_config.modules_data.fom_atp['input'] output_atp = subprocess.study_config.modules_data.fom_atp['output'] name = subprocess.id names_search_list = (subprocess.id, subprocess.name, getattr(subprocess, 'context_name', '')) for fom in (input_fom, output_fom): for fname in names_search_list: fom_patterns = fom.patterns.get(fname) if fom_patterns is not None: name = fname break else: continue break else: raise KeyError('Process not found in FOMs amongst %s' \ % repr(names_search_list)) iter_attrib = set() if not process.iterative_parameters: params = list(subprocess.user_traits().keys()) else: params = process.iterative_parameters for parameter in params: if subprocess.trait(parameter).output: atp = output_atp else: atp = input_atp parameter_attributes = set([ x for x in atp.find_discriminant_attributes( fom_parameter=parameter, fom_process=name) if not x.startswith('fom_')]) iter_attrib.update(parameter_attributes) return iter_attrib
#class FomPathCompletionEngineFactory(PathCompletionEngineFactory): #factory_id = 'fom' #def get_path_completion_engine(self, process): #return FomPathCompletionEngine(process)