Source code for capsul.attributes.completion_engine

# -*- coding: utf-8 -*-
'''
Completion system for Capsul

Classes
=======
:class:`ProcessCompletionEngine`
--------------------------------
:class:`SwitchCompletionEngine`
-------------------------------
:class:`PathCompletionEngine`
-----------------------------
:class:`ProcessCompletionEngineFactory`
---------------------------------------
:class:`PathCompletionEngineFactory`
------------------------------------
'''

from __future__ import print_function
from __future__ import absolute_import
from soma.singleton import Singleton
from soma.controller import Controller, ControllerTrait
from capsul.pipeline.pipeline import Pipeline
from capsul.pipeline.pipeline import Graph
from capsul.pipeline.pipeline_nodes import (
    Node, ProcessNode, Switch, PipelineNode)
from capsul.attributes.attributes_schema import ProcessAttributes, \
    EditableAttributes
from capsul.pipeline import pipeline_tools
import traits.api as traits
from soma.utils.weak_proxy import weak_proxy, get_ref
from soma.functiontools import SomaPartial
from soma.controller.trait_utils import relax_exists_constraint
import six
import sys
import copy
from six.moves import range

# DEBUG
#ce_calls = 0


[docs] class ProcessCompletionEngine(traits.HasTraits): ''' Parameters completion from attributes for a process or pipeline node instance, in the context of a specific data organization. ProcessCompletionEngine can be used directly for a pipeline, which merely delegates completion to its nodes, and has to be subclassed for a data organization framework on a node. To get a completion engine, use: :: completion_engine = ProcessCompletionEngine.get_completion_engine( node, name) Note that this will assign permanently the ProcessCompletionEngine object to its associated node or process. To get and set the attributes set: :: attributes = completion_engine.get_attribute_values() print(attributes.user_traits().keys()) attributes.specific_process_attribute = 'a value' Once attributes are set, to process with parameters completion: :: completion_engine.complete_parameters() It is possible to have complete_parameters() triggered automatically when attributes or switch nodes change. To set this up, use: :: completion_engine.install_auto_completion() ProcessCompletionEngine can (and should) be specialized, at least to provide the attributes set for a given process. A factory is used to create the correct type of ProcessCompletionEngine for a given process / name: :py:class:`ProcessCompletionEngineFactory` :py:class:`capsul.attributes.fom_completion_engine.FomProcessCompletionEngine` is a specialization of ``ProcessCompletionEngine`` to manage File Organization Models (FOM). Methods ------- get_completion_engine get_attribute_values complete_parameters set_parameters attributes_to_path get_path_completion_engine install_auto_completion remove_auto_completion ''' def __init__(self, process, name=None): super(ProcessCompletionEngine, self).__init__( process=process, name=name) self.process = weak_proxy(process, self._clear_node) self.name = name self.completion_ongoing = False self.add_trait('completion_progress', traits.Float(0.)) self.add_trait('completion_progress_total', traits.Float(1.)) self._rebuild_attributes = False def __del__(self): self.remove_switch_observers() self.remove_auto_completion() def _clear_node(self, wr): '''Called when the object behind the self.process proxy is about to be deleted ''' self.process = None
[docs] def get_attribute_values(self): ''' Get attributes Controller associated to a process or node Returns ------- attributes: ProcessAttributes instance The default implementation does nothing for a single Process instance, and merges attributes from its children if the process is a pipeline. ''' if not self._rebuild_attributes \ and 'capsul_attributes' in self._instance_traits(): # we have to use this private HasTraits method _instance_traits() # to know if we actually have an instance trait, because # class traits are automatically added whenever we access an # attribute of another instance which doesn't have it (!) return self.capsul_attributes schemas = self._get_schemas() study_config = self.process.get_study_config() engine = study_config.engine proc_attr_cls = ProcessAttributes if 'capsul.engine.module.attributes' in engine._loaded_modules: factory = engine._modules_data['attributes']['attributes_factory'] names = [self.process.name] if hasattr(self.process, 'context_name'): names.insert(0, self.process.context_name) for name in names: try: proc_attr_cls = factory.get('process_attributes', name) found = True break except ValueError: pass if not hasattr(self, 'capsul_attributes'): self.add_trait('capsul_attributes', ControllerTrait(Controller())) self.capsul_attributes = proc_attr_cls(self.process, schemas) self._rebuild_attributes = False # if no specialized attributes set and process is a pipeline, # try building from children nodes if proc_attr_cls is ProcessAttributes \ and isinstance(self.process, (PipelineNode, Pipeline)): attributes = self.capsul_attributes pipeline = self.process if isinstance(pipeline, PipelineNode): pipeline = pipeline.process name = getattr(pipeline, 'context_name', getattr(self.process, 'context_name', getattr(pipeline, 'context_name', pipeline.name))) for node_name, node in six.iteritems(pipeline.nodes): if node_name == '': continue subprocess = None if hasattr(node, 'process'): subprocess = node.process elif isinstance(node, Switch): subprocess = node if subprocess is not None: pname = '.'.join([name, node_name]) subprocess_compl = \ ProcessCompletionEngine.get_completion_engine( subprocess, pname) try: sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: try: subprocess_compl = self.__class__(subprocess) sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: continue for attribute, trait \ in six.iteritems(sub_attributes.user_traits()): if attribute not in attributes._instance_traits(): attributes.add_trait(attribute, trait) # all attributes are optional by default attributes.trait(attribute).optional = True setattr(attributes, attribute, getattr(sub_attributes, attribute)) self._get_linked_attributes() return self.capsul_attributes
def _get_linked_attributes(self): # for parameters which still do not have attributes, we can try # using links: if a linked parameter in a sub-process has # attributes, then we can get them here. attributes = self.capsul_attributes schema = 'link' # FIXME param_attributes = attributes.get_parameters_attributes() forbidden_attributes = set(['generated_by_parameter', 'generated_by_process']) done_parameters = set( [p for p, al in six.iteritems(param_attributes) if len([a for a in al if a not in forbidden_attributes]) != 0]) traits_types = {str: traits.Str, six.text_type: traits.Str, int: traits.Int, float: traits.Float, list: traits.List} pipeline = self.process if isinstance(pipeline, PipelineNode): pipeline = pipeline.process name = pipeline.name for pname, trait in six.iteritems(pipeline.user_traits()): if pname in done_parameters: continue plug = pipeline.pipeline_node.plugs.get(pname) if plug is None: continue if trait.output: links = plug.links_from else: links = plug.links_to for link in links: node = link[2] completion_engine \ = ProcessCompletionEngine.get_completion_engine( node, '.'.join([name, link[0]])) sub_attributes = completion_engine.get_attribute_values() sub_p_attribs = sub_attributes.get_parameters_attributes() if link[1] in sub_p_attribs: s_p_attributes = sub_p_attribs[link[1]] if len(s_p_attributes) != 0 \ and len([x for x in s_p_attributes.keys() if x not in forbidden_attributes]) != 0: ea = EditableAttributes() for attribute, value in six.iteritems( s_p_attributes): if attribute not in forbidden_attributes: ttype = traits_types.get(type(value)) if ttype is not None: trait = ttype() else: trait = value ea.add_trait(attribute, ttype) # all attributes are optional by default ea.trait(attribute).optional = True setattr(ea, attribute, value) attributes.set_parameter_attributes( pname, schema, ea, {}) break
[docs] def complete_parameters(self, process_inputs={}, complete_iterations=True): ''' Completes file parameters from given inputs parameters, which may include both "regular" process parameters (file names) and attributes. Parameters ---------- process_inputs: dict (optional) parameters to be set on the process. It may include "regular" process parameters, and attributes used for completion. Attributes should be in a sub-dictionary under the key "capsul_attributes". complete_iterations: bool (optional) if False, iteration nodes inside the pipeline will not run their own completion. Thus parameters for the iterations will not be correctly completed. However this has 2 advantages: 1. it prevents modification of the input pipeline, 2. it will not do iterations completion which will anyway be done (again) when building a workflow in `~capsul.pipeline.pipeline_workflow.workflow_from_pipeline`. ''' self.completion_progress = 0. self.completion_progress_total = 1. self.set_parameters(process_inputs) # if process is a pipeline, trigger completions for its nodes and # sub-pipelines. # # Note: for now we do so first, so that parameters can be overwritten # afterwards by the higher-level pipeline FOM. # Ideally we should process the other way: complete high-level, # specific parameters first, then complete with lower-level, more # generic ones, while blocking already set ones. # as this blocking mechanism does not exist yet, we do it this way for # now, but it is sub-optimal since many parameters will be set many # times. def satisfied_deps(node, all_nodes, done): for param, plug in node.plugs.items(): if not plug.output: for link in plug.links_from: snode = link[2] if snode not in done \ and (link[0], snode) in all_nodes: return False return True verbose = False pipeline = None if isinstance(self.process, PipelineNode): pipeline = self.process.process elif isinstance(self.process, Pipeline): pipeline = self.process if pipeline: attrib_values = self.get_attribute_values().export_to_dict() name = getattr(pipeline, 'context_name', pipeline.name) # build nodes list nodes_list = set([n for n in pipeline.nodes.items() if n[0] != '' and pipeline_tools.is_node_enabled( pipeline, n[0], n[1])]) init_result = True done = set() todo = [(node_name, node) for node_name, node in nodes_list if satisfied_deps(node, nodes_list, done)] self.completion_progress_total = len(nodes_list) + 0.05 index = 0 # process topologically through nodes dependencies while todo: node_name, node = todo.pop(0) done.add(node) pname = '.'.join([name, node_name]) subprocess_compl = \ ProcessCompletionEngine.get_completion_engine(node, pname) self._install_subprogress_moniotoring(subprocess_compl) try: subprocess_compl.complete_parameters( {'capsul_attributes': attrib_values}, complete_iterations=complete_iterations) except Exception: try: self.__class__(node).complete_parameters( {'capsul_attributes': attrib_values}, complete_iterations=complete_iterations) except Exception: pass self._remove_subprogress_moniotoring(subprocess_compl) # increase progress notification index += 1 self.completion_progress = index # insert downstream nodes in todo list for param, plug in node.plugs.items(): if not plug.output: continue links = plug.links_to for l in links: dnode = l[2] #print(l[0], dnode in done, dnode in ) if dnode not in done \ and (l[0], dnode) in nodes_list \ and (l[0], dnode) not in todo \ and satisfied_deps(dnode, nodes_list, done): todo.append((l[0], dnode)) # not needed any longer: ## release "exists" property on connected traits #if hasattr(dnode, 'process'): #p = l[2].process #else: #p = l[2] #trait = p.trait(l[1]) #if trait: #relax_exists_constraint(trait) ## FIXME this very specific stuff should be ## handled another way at another place... #if hasattr(p, 'process') \ #and hasattr(p.process, 'inputs'): ## MIA custom wrappings of nipype interfaces ## are this way, and do not release the ## exists constrain internally. #relax_exists_constraint( #p.process.inputs.trait(l[1])) if len(done) != len(nodes_list): print('Some nodes of the pipeline could not be reached ' 'through dependencies. The pipeline structure is ' 'probably wrong:') print([nname for nname, n in nodes_list if n not in done]) attributes = self.get_attribute_values() # if some attributes are list, we must separate list and non-list # attributes, and use an un-listed controller to get a path have_list = any([isinstance(t.trait_type, traits.List) for t in attributes.user_traits().values()]) if have_list: attributes_single = attributes.copy_to_single(with_values=True) attributes_list = attributes_single.copy_to_single( with_values=True) else: # no list parameter attributes_single = attributes # now complete process parameters: process = self.process if isinstance(process, ProcessNode): process = process.process for pname, trait in six.iteritems(process.user_traits()): if trait.forbid_completion \ or process.is_parameter_protected(pname): # completion has been explicitly disabled on this parameter continue value = [] # for the try.. except try: if isinstance(process.trait(pname).trait_type, traits.List): nmax = 0 param_att = attributes.parameter_attributes.get(pname) if param_att is None: continue # no completion for you # FIXME: why [0][0]; check what it is... for a in param_att[0][0] \ .user_traits().keys(): att_value = getattr(attributes, a) if not isinstance(att_value, list): if att_value is not None: nmax = max(nmax, 1) else: nmax = max(nmax, len(att_value)) value = [] # param is a list: call iteratively the path completion # for each attributes values set for item in range(nmax): for a, t in six.iteritems(attributes.user_traits()): if isinstance(t.trait_type, traits.List): att_value = getattr(attributes, a) if not isinstance(att_value, list): setattr(attributes_list, a, att_value) elif len(att_value) == 0: setattr(attributes_list, a, t.inner_traits[0].default) else: if len(att_value) <= item: item = -1 setattr(attributes_list, a, att_value[item]) value.append( self.attributes_to_path(pname, attributes_list)) # avoid case of invalid attribute values if value == [None]: value = [] else: if pname in attributes.parameter_attributes: value = self.attributes_to_path(pname, attributes_single) else: value = None # not in pattern: don't complete if value is not None: # should None be valid ? setattr(process, pname, value) except Exception as e: if verbose: print('Exception:', e) print('param:', pname) print('value:', repr(value)) import traceback traceback.print_exc() #pass self.completion_progress = self.completion_progress_total
[docs] def attributes_to_path(self, parameter, attributes): ''' Build a path from attributes for a given parameter in a process. Parameters ---------- parameter: str attributes: ProcessAttributes instance (Controller) ''' return self.get_path_completion_engine() \ .attributes_to_path(self.process, parameter, attributes)
[docs] def set_parameters(self, process_inputs): ''' Set the given parameters dict to the given process. process_inputs may include regular parameters of the underlying process, and attributes (capsul_attributes: dict). ''' # This convenience method only differs from the Controller # import_from_dict() method in the way that capsul_attributes items # will not completely replace all the attributes values, but only set # those specified here, and leave the others in place. dst_attributes = self.get_attribute_values() attributes = process_inputs.get('capsul_attributes') if attributes: avail_attrib = set(dst_attributes.user_traits().keys()) attributes = dict((k, v) for k, v in six.iteritems(attributes) if k in avail_attrib) dst_attributes.import_from_dict(attributes) process_inputs = dict((k, v) for k, v in six.iteritems(process_inputs) if k != 'capsul_attributes') process = self.process if isinstance(process, ProcessNode): process = self.process.process process.import_from_dict(process_inputs)
[docs] def attributes_changed(self, obj, name, old, new): ''' Traits changed callback which triggers parameters update. This method basically calls complete_parameters() (after some checks). Users do not normally have to use it directly, it is used internally when install_auto_completion() has been called. See :py:meth:`install_auto_completion` ''' if name != 'trait_added' and name != 'user_traits_changed' \ and self.completion_ongoing is False: #setattr(self.capsul_attributes, name, new) self.completion_ongoing = True self.complete_parameters({'capsul_attributes': {name: new}}) self.completion_ongoing = False
[docs] def nodes_selection_changed(self, obj, name, old, new): ''' Traits changed callback which triggers parameters update. This method basically calls complete_parameters() (after some checks). Users do not normally have to use it directly, it is used internally when install_auto_completion() has been called. See :py:meth:`install_auto_completion` ''' if not self.completion_ongoing: self.completion_ongoing = True self._rebuild_attributes = True self.complete_parameters() self.completion_ongoing = False
[docs] def install_auto_completion(self): ''' Monitor attributes changes and switches changes (which may influence attributes) and recompute parameters completion when needed. ''' self.get_attribute_values().on_trait_change( self.attributes_changed, 'anytrait') process = self.process if isinstance(process, ProcessNode): process = process.process if isinstance(process, Pipeline): for node_name, node in six.iteritems(process.nodes): if isinstance(node, Switch): # a switch may change attributes dynamically # so we must be notified if this happens. node.on_trait_change(self.nodes_selection_changed, 'switch')
[docs] def remove_auto_completion(self): ''' Remove attributes monitoring and auto-recomputing of parameters. Reverts install_auto_completion() ''' if self.process is not None: try: av = self.process.completion_engine except (ReferenceError, AttributeError): return av.on_trait_change( self.attributes_changed, 'anytrait', remove=True) process = self.process if isinstance(process, PipelineNode): process = process.process if isinstance(process, Pipeline): for node_name, node in six.iteritems(process.nodes): if isinstance(node, Switch): # a switch may change attributes dynamically # so we must be notified if this happens. node.on_trait_change(self.nodes_selection_changed, 'switch', remove=True)
[docs] def get_path_completion_engine(self): ''' Get a PathCompletionEngine object for the given process. The default implementation queries PathCompletionEngineFactory, but some specific ProcessCompletionEngine implementations may override it for path completion at the process level (FOMs for instance). ''' study_config = self.process.get_study_config() engine = study_config.engine engine_factory = None if 'capsul.engine.module.attributes' in engine._loaded_modules: try: engine_factory \ = engine._modules_data['attributes'] \ ['attributes_factory'].get( 'path_completion', study_config.path_completion) except ValueError: pass # not found if engine_factory is None: engine_factory = PathCompletionEngineFactory() return engine_factory.get_path_completion_engine(self.process)
[docs] @staticmethod def get_completion_engine(process, name=None): ''' Get a ProcessCompletionEngine instance for a given process/node within the framework of its StudyConfig factory function. ''' #global ce_calls #ce_calls += 1 engine_factory = None study_config = None if hasattr(process, 'get_study_config'): # switches don't have a study_config at the moment. study_config = process.get_study_config() engine = study_config.engine if 'capsul.engine.module.attributes' in engine._loaded_modules: try: engine_factory \ = engine._modules_data['attributes'] \ ['attributes_factory'].get( 'process_completion', study_config.process_completion) except ValueError: pass # not found if engine_factory is None: from . import completion_engine_factory engine_factory = completion_engine_factory. \ BuiltinProcessCompletionEngineFactory() completion_engine = engine_factory.get_completion_engine( process, name=name) # I remove the completion_engine cache because when the FOM config # changes in StudyConfig, the completion engine may change also, # and caching it will make use of an obsolete one. # The other option is to remove the caches when the FOM config changes, # but this needs to setup many callbacks that we don't know easily # when to clear (process deletion etc) ## set the completion engine into the process if completion_engine is not None: process.completion_engine = completion_engine process._has_studyconfig_callback = True if study_config is not None: from capsul.process.process import Process if isinstance(process, Process): nclass = Process else: nclass = Node if not hasattr(nclass, '_remove_completion_engine'): nclass._remove_completion_engine \ = ProcessCompletionEngine._remove_completion_engine nclass.__del__ \ = ProcessCompletionEngine._del_process_callback else: try: # remove former callback, if any study_config.on_trait_change( process._remove_completion_engine, 'use_fom,input_fom,output_fom,shared_fom', remove=True) except Exception: pass study_config.on_trait_change( process._remove_completion_engine, 'use_fom,input_fom,output_fom,shared_fom') return completion_engine
@staticmethod def _remove_completion_engine(process): if hasattr(process, 'completion_engine'): del process.completion_engine @staticmethod def _del_process_callback(process): if hasattr(process, 'study_config') \ and process.study_config is not None \ and hasattr(process, '_has_studyconfig_callback'): process.study_config.on_trait_change( process._remove_completion_engine, 'use_fom,input_fom,output_fom,shared_fom', remove=True) del process._has_studyconfig_callback def _get_schemas(self): ''' Get schemas dictionary from process and its StudyConfig ''' schemas = {} try: study_config = self.process.get_study_config() engine = study_config.engine except ReferenceError: # process is deleted return schemas factory = getattr(engine, '_modules_data', {}).get( 'attributes', {}).get('attributes_factory', None) if factory is not None: for dir_name, schema_name \ in six.iteritems(study_config.attributes_schemas): schemas[dir_name] = factory.get('schema', schema_name) return schemas def _install_subprogress_moniotoring(self, subprocess_compl): monitor_subprocess_progress = getattr( self, 'monitor_subprocess_progress', True) if monitor_subprocess_progress: self._old_monitor_sub = getattr( subprocess_compl, 'monitor_subprocess_progress', True) subprocess_compl.monitor_subprocess_progress = False self._current_progress = self.completion_progress self._monitoring_callback = SomaPartial( self.__class__._substep_completion_progress, weak_proxy(self), subprocess_compl) subprocess_compl.on_trait_change( self._monitoring_callback, 'completion_progress') def _remove_subprogress_moniotoring(self, subprocess_compl): monitor_subprocess_progress = getattr( self, 'monitor_subprocess_progress', True) if monitor_subprocess_progress: subprocess_compl.on_trait_change( self._monitoring_callback, 'completion_progress', remove=True) del self._current_progress del self._monitoring_callback if self._old_monitor_sub: del subprocess_compl.monitor_subprocess_progress del self._old_monitor_sub @staticmethod def _substep_completion_progress(self, substep_completion_engine, obj, name, old, new): sub_completion_rate \ = substep_completion_engine.completion_progress \ / substep_completion_engine.completion_progress_total self.completion_progress = self._current_progress \ + sub_completion_rate
[docs] def remove_attributes(self): '''Clear attributes controller cache, to allow rebuilding it after a change. This is generally a callback attached to switches changes. ''' if 'capsul_attributes' in self._instance_traits(): self.remove_trait('capsul_attributes')
[docs] def remove_switch_observers(self): '''Remove notification callbacks previously set to listen switches state changes. ''' try: process = self.process if isinstance(process, PipelineNode): process = process.process if isinstance(process, Pipeline): for name, node in six.iteritems(process.nodes): if isinstance(node, Switch) \ and hasattr(node, 'completion_engine'): completion_engine = node.completion_engine completion_engine.remove_switch_observer(self) except ReferenceError: # the process has already been destroyed pass
[docs] class SwitchCompletionEngine(ProcessCompletionEngine): ''' Completion engine specislization for a switch. The switch will propagate attributes from its selected inputs to corresponding outputs, if they can be retrieved from parameters links. Otherwise the countrary will be tried (propagated from outputs to inputs). ''' def __del__(self): self.remove_switch_observer()
[docs] def get_attribute_values(self): if 'capsul_attributes' in self._instance_traits(): return self.capsul_attributes self.add_trait('capsul_attributes', ControllerTrait(Controller())) capsul_attributes = ProcessAttributes(self.process, {}) self.capsul_attributes = capsul_attributes outputs = self.process._outputs schema = 'switch' # FIXME name = getattr(self.process, 'context_name', self.name) pipeline_name = '.'.join(name.split('.')[:-1]) if pipeline_name == '': pipeline_name = [] else: pipeline_name = [pipeline_name] forbidden_attributes = set(['generated_by_parameter', 'generated_by_process']) traits_types = {str: traits.Str, six.text_type: traits.Str, int: traits.Int, float: traits.Float, list: traits.List} for out_name in outputs: in_name = '_switch_'.join((self.process.switch, out_name)) found = False for output, name in ((False, in_name), (True, out_name)): plug = self.process.plugs.get(name) if plug is None: continue if output: links = plug.links_to else: links = plug.links_from for link in links: node = link[2] if isinstance(node, Switch): # FIXME: just for now continue if link[0] == '': # link to the parent pipeline: don't call it to avoid # an infinite loop. # Either it will provide attributes by its own, either # we must not take them into account, so skip it. continue proc_name = '.'.join(pipeline_name + [link[0]]) completion_engine \ = ProcessCompletionEngine.get_completion_engine( node, name=proc_name) attributes = completion_engine.get_attribute_values() try: param_attributes \ = attributes.get_parameters_attributes()[link[1]] except Exception: continue if len(param_attributes) != 0 \ and len([x for x in param_attributes.keys() if x not in forbidden_attributes]) != 0: ea = EditableAttributes() for attribute, value in six.iteritems( param_attributes): if attribute not in forbidden_attributes: ttype = traits_types.get(type(value)) if ttype is not None: trait = ttype() else: trait = value ea.add_trait(attribute, ttype) # all attributes are optional by default ea.trait(attribute).optional = True setattr(ea, attribute, value) capsul_attributes.set_parameter_attributes( name, schema, ea, {}) found = True break if found: break if found: # propagate from input/output to other side ea = EditableAttributes() for attribute, value in six.iteritems( param_attributes): ttype = traits_types.get(type(value)) if ttype is not None: trait = ttype() else: trait = value ea.add_trait(attribute, ttype) # all attributes are optional by default ea.trait(attribute).optional = True setattr(ea, attribute, value) if output: capsul_attributes.set_parameter_attributes( in_name, schema, ea, {}) else: capsul_attributes.set_parameter_attributes( out_name, schema, ea, {}) self.install_switch_observer() return capsul_attributes
[docs] def install_switch_observer(self, observer=None): '''Setup a switch change observation, to remove parameters attributes when the switch state changes. Parameters ---------- observer: ProcessCompletionEngine instance The observer which should change attributes after switch change. If not specified, the observer is the switch completion engine (self). Notification will call the observer remove_attributes() method. ''' if observer is None: observer = self self.process.on_trait_change(observer.remove_attributes, 'switch')
[docs] def remove_switch_observer(self, observer=None): '''Remove notification previously set by install_switch_observer() ''' if observer is None: observer = self try: self.process.on_trait_change(observer.remove_attributes, 'switch', remove=True) except Exception: pass # probably already deleted object
[docs] class PathCompletionEngine(object): ''' Implements building of a single path from a set of attributes for a specific process / parameter '''
[docs] def attributes_to_path(self, process, parameter, attributes): ''' Build a path from attributes for a given parameter in a process. This method has to be specialized. The default implementation returns None. Parameters ---------- process: Node or Process instance parameter: str attributes: ProcessAttributes instance (Controller) ''' return None
[docs] def allowed_formats(self, process, parameter): ''' List of possible formats names associated with a parameter ''' return []
[docs] def allowed_extensions(self, process, parameter): ''' List of possible file extensions associated with a parameter ''' return []
[docs] class ProcessCompletionEngineFactory(object): ''' Get a :class:`ProcessCompletionEngine` instance ''' factory_id = 'basic'
[docs] def get_completion_engine(self, process, name=None): ''' Factory for ProcessCompletionEngine: get an ProcessCompletionEngine instance for a process in the context of a given StudyConfig. The study_config should specify which completion system(s) is (are) used (FOM, ...) If nothing is configured, a ProcessCompletionEngine base instance will be returned. It will not be able to perform completion at all, but will conform to the API. The base class implementation returns a base ProcessCompletionEngine instance, which is quite incomplete. ''' if hasattr(process, 'completion_engine'): return process.completion_engine if isinstance(process, Switch): return SwitchCompletionEngine(process, name=name) return ProcessCompletionEngine(process, name=name)
[docs] class PathCompletionEngineFactory(object): ''' Get a :class:`PathCompletionEngine` instance ''' factory_id = 'null' def get_path_completion_engine(self, process): raise RuntimeError('PathCompletionEngineFactory is pure virtual. ' 'It must be derived to do actual work.')