# -*- coding: utf-8 -*-
'''
Node classes for CAPSUL pipeline elements

Classes
=======
:class:`Plug`
-------------
:class:`Node`
-------------
:class:`ProcessNode`
--------------------
:class:`PipelineNode`
---------------------
:class:`Switch`
---------------
:class:`OptionalOutputSwitch`
-----------------------------
'''

from __future__ import absolute_import
from __future__ import print_function

# System import
import logging
import six
from six.moves import zip

# Define the logger
logger = logging.getLogger(__name__)

# Trait import
import traits.api as traits
from traits.api import Enum
from traits.api import Str
from traits.api import Bool
from traits.api import Any
from traits.api import Undefined
from traits.api import File
from traits.api import Directory
from traits.api import TraitError

# Soma import
from soma.controller import Controller
from soma.controller.trait_utils import trait_ids
from soma.controller.trait_utils import is_trait_pathname
from soma.sorted_dictionary import SortedDictionary
from soma.utils.functiontools import SomaPartial
from soma.utils.weak_proxy import weak_proxy, get_ref

import os


class Plug(Controller):
    """ Overload of the traits in order to keep the pipeline memory.

    Attributes
    ----------
    enabled : bool
        user parameter to control the plug activation
    activated : bool
        parameter describing the Plug status
    output : bool
        parameter to set the Plug type (input or output)
    optional : bool
        parameter to create an optional Plug
    has_default_value : bool
        indicate if a value is available for that plug even if it is not
        linked
    links_to : set (node_name, plug_name, node, plug, is_weak)
        the successor plugs of this plug
    links_from : set (node_name, plug_name, node, plug, is_weak)
        the predecessor plugs of this plug
    """
    enabled = Bool(default_value=True)
    activated = Bool(default_value=False)
    output = Bool(default_value=False)
    optional = Bool(default_value=False)

    def __init__(self, **kwargs):
        """ Generate a Plug, i.e. a trait with the memory of the
        pipeline adjacent nodes.
        """
        super(Plug, self).__init__(**kwargs)
        # The links correspond to edges in the graph theory
        # links_to = successors, links_from = predecessors
        # A link is a tuple of the form
        # (node_name, plug_name, node, plug, is_weak)
        self.links_to = set()
        self.links_from = set()
        # The has_default_value flag can be set by setting a value for a
        # parameter in Pipeline.add_process
        self.has_default_value = False
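
# The following function is an illustrative sketch added for this listing,
# not part of the capsul API: it shows how a Plug records its state and its
# link tuples. The plug configuration used here is hypothetical.
def _plug_example():
    plug = Plug(optional=True)
    # the class traits carry the plug state
    assert plug.enabled and not plug.activated and not plug.output
    # links are normally filled by Pipeline.add_link(); they are 5-tuples:
    # (node_name, plug_name, node, plug, is_weak)
    print(plug.links_to, plug.links_from)  # two empty sets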
class Node(Controller):
    """ Basic Node structure of the pipeline that needs to be tuned.

    It is possible to define custom nodes inheriting Node. To be usable in
    all contexts (GUI construction, pipeline save / reload), custom nodes
    should define a few additional instance and class methods which will
    allow automatic systems (such as
    :func:`~capsul.study_config.get_node_instance`) to reinstantiate and
    save them:

    * configure_controller(cls): classmethod
        returns a Controller instance which specifies parameters needed to
        build the node instance. Typically it may contain a list of
        parameters (plugs) and other specifications.
    * configured_controller(self): instance method
        on an instance, returns a Controller instance in the same shape as
        configure_controller above, but with values filled from the
        instance. This controller will allow saving parameters needed to
        instantiate again the node with the same state.
    * build_node(cls, pipeline, name, conf_controller): classmethod
        returns an instance of the node class, built using appropriate
        parameters (using configure_controller() or configured_controller()
        from another instance)

    Attributes
    ----------
    name : str
        the node name
    full_name : str
        a unique name among all nodes and sub-nodes of the top level
        pipeline
    enabled : bool
        user parameter to control the node activation
    activated : bool
        parameter describing the node status

    Methods
    -------
    connect
    set_callback_on_plug
    get_plug_value
    set_plug_value
    get_trait
    """
    name = Str(hidden=True)
    enabled = Bool(default_value=True, hidden=True)
    activated = Bool(default_value=False, hidden=True)
    node_type = Enum(("processing_node", "view_node"), hidden=True)

    def __init__(self, pipeline, name, inputs, outputs):
        """ Generate a Node

        Parameters
        ----------
        pipeline: Pipeline (mandatory)
            the pipeline object where the node is added
        name: str (mandatory)
            the node name
        inputs: list of dict (mandatory)
            a list of input parameters containing a dictionary with default
            values (mandatory key: name)
        outputs: list of dict (mandatory)
            a list of output parameters containing a dictionary with
            default values (mandatory key: name)
        """
        super(Node, self).__init__()
        self.pipeline = weak_proxy(pipeline, self._pipeline_deleted)
        self.name = name
        self.plugs = SortedDictionary()
        self.invalid_plugs = set()
        # _callbacks -> (src_plug_name, dest_node, dest_plug_name)
        self._callbacks = {}

        # generate a list with all the inputs and outputs
        # the second parameter (parameter_type) is False for an input,
        # True for an output
        parameters = list(zip(inputs, [False] * len(inputs)))
        parameters.extend(list(zip(outputs, [True] * len(outputs))))
        for parameter, parameter_type in parameters:
            # check if parameter is a dictionary as specified in the
            # docstring
            if isinstance(parameter, dict):
                # check if parameter contains a name item
                # as specified in the docstring
                if "name" not in parameter:
                    raise Exception(
                        "Can't create parameter with unknown "
                        "identifier and parameter {0}".format(parameter))
                parameter = parameter.copy()
                plug_name = parameter.pop("name")
                # force the parameter type
                parameter["output"] = parameter_type
                # generate plug with input parameter and identifier name
                plug = Plug(**parameter)
            else:
                raise Exception(
                    "Can't create Node. Expect a dict structure "
                    "to initialize the Node, "
                    "got {0}: {1}".format(type(parameter), parameter))
            # update plugs list
            self.plugs[plug_name] = plug
            # add an event on plug to validate the pipeline
            plug.on_trait_change(
                pipeline.update_nodes_and_plugs_activation, "enabled")

        # add an event on the Node instance traits to validate the pipeline
        self.on_trait_change(pipeline.update_nodes_and_plugs_activation,
                             "enabled")

    @property
    def process(self):
        try:
            return get_ref(self._process)
        except AttributeError:
            raise AttributeError('%s object has no attribute process'
                                 % repr(self))
        except ReferenceError:
            raise ReferenceError(
                'The process underlying node %s, %s has been destroyed '
                'before the node that contains it.' % (self, self.name))

    @process.setter
    def process(self, value):
        self._process = value

    @property
    def full_name(self):
        if self.pipeline.parent_pipeline:
            return self.pipeline.pipeline_node.full_name + '.' + self.name
        else:
            return self.name

    # declared static so that it can be bound to a weak proxy of the node
    # through SomaPartial (see connect())
    @staticmethod
    def _value_callback(self, source_plug_name, dest_node, dest_plug_name,
                        value):
        """ Spread the source plug value to the destination plug.
        """
        try:
            dest_node.set_plug_value(
                dest_plug_name, value,
                self.is_parameter_protected(source_plug_name))
        except traits.TraitError:
            if isinstance(value, list) and len(value) == 1:
                # Nipype MultiObject, when a single object is involved,
                # looks like a single object but is actually a list. We
                # want to allow it to be linked to a "single object" plug.
                try:
                    dest_node.set_plug_value(
                        dest_plug_name, value[0],
                        self.is_parameter_protected(source_plug_name))
                except traits.TraitError:
                    pass

    def _value_callback_with_logging(
            self, log_stream, prefix, source_plug_name, dest_node,
            dest_plug_name, value):
        """ Spread the source plug value to the destination plug, and log
        it in a stream for debugging.
        """
        plug = self.plugs.get(source_plug_name, None)
        if plug is None:
            return

        def _link_name(dest_node, plug, prefix, dest_plug_name,
                       source_node_or_process):
            external = True
            sibling = False
            # check if it is an external link: if source is not a parent
            # of dest
            if hasattr(source_node_or_process, 'process') \
                    and hasattr(source_node_or_process.process, 'nodes'):
                source_process = source_node_or_process
                source_node = source_node_or_process.process.pipeline_node
                children = [x for k, x in source_node.process.nodes.items()
                            if k != '']
                if dest_node in children:
                    external = False
            # check if it is a sibling node:
            # if external and source is not in dest
            if external:
                sibling = True
                if hasattr(dest_node, 'process') \
                        and hasattr(dest_node.process, 'nodes'):
                    children = [x for k, x
                                in dest_node.process.nodes.items()
                                if k != '']
                    if source_node_or_process in children:
                        sibling = False
                    else:
                        children = [x.process for x in children
                                    if hasattr(x, 'process')]
                        if source_node_or_process in children:
                            sibling = False
            if external:
                if sibling:
                    name = '.'.join(prefix.split('.')[:-2]
                                    + [dest_node.name, dest_plug_name])
                else:
                    name = '.'.join(prefix.split('.')[:-2]
                                    + [dest_plug_name])
            else:
                # internal connection in a (sub) pipeline
                name = prefix + dest_node.name
                if name != '' and not name.endswith('.'):
                    name += '.'
                name += dest_plug_name
            return name

        dest_plug = dest_node.plugs[dest_plug_name]
        print('value link:',
              'from:', prefix + source_plug_name,
              'to:', _link_name(dest_node, dest_plug, prefix,
                                dest_plug_name, self),
              ', value:', repr(value), file=log_stream)
        log_stream.flush()
        # actually propagate
        dest_node.set_plug_value(
            dest_plug_name, value,
            self.is_parameter_protected(source_plug_name))

    def connect(self, source_plug_name, dest_node, dest_plug_name):
        """ Connect linked plugs of two nodes

        Parameters
        ----------
        source_plug_name: str (mandatory)
            the source plug name
        dest_node: Node (mandatory)
            the destination node
        dest_plug_name: str (mandatory)
            the destination plug name
        """
        # add a callback to spread the source plug value
        value_callback = SomaPartial(
            self.__class__._value_callback, weak_proxy(self),
            source_plug_name, weak_proxy(dest_node), dest_plug_name)
        self._callbacks[(source_plug_name, dest_node,
                         dest_plug_name)] = value_callback
        self.set_callback_on_plug(source_plug_name, value_callback)

    def disconnect(self, source_plug_name, dest_node, dest_plug_name,
                   silent=False):
        """ Disconnect linked plugs of two nodes

        Parameters
        ----------
        source_plug_name: str (mandatory)
            the source plug name
        dest_node: Node (mandatory)
            the destination node
        dest_plug_name: str (mandatory)
            the destination plug name
        silent: bool
            if False, raise an exception if the connection does not exist
            (perhaps it is already disconnected)
        """
        # remove the callback to spread the source plug value
        try:
            callback = self._callbacks.pop(
                (source_plug_name, dest_node, dest_plug_name))
            self.remove_callback_from_plug(source_plug_name, callback)
        except Exception:
            if not silent:
                raise

    def _pipeline_deleted(self, pipeline):
        self.cleanup()

    def cleanup(self):
        """ Cleanup before deletion: disconnect all plugs, remove internal
        and cyclic references.
        """
        try:
            pipeline = get_ref(self.pipeline)
        except Exception:
            pipeline = None

        for plug_name, plug in self.plugs.items():
            to_discard = []
            for link in plug.links_from:
                link[2].disconnect(link[1], self, plug_name, silent=True)
                self.disconnect(plug_name, link[2], link[1], silent=True)
                link[3].links_to.discard(
                    (self.name, plug_name, self, plug, True))
                to_discard.append(link)
            for link in to_discard:
                plug.links_from.discard(link)
            to_discard = []
            for link in plug.links_to:
                self.disconnect(plug_name, link[2], link[1], silent=True)
                link[2].disconnect(link[1], self, plug_name, silent=True)
                to_discard.append(link)
                link[3].links_from.discard(
                    (self.name, plug_name, self, plug, False))
            if pipeline:
                plug.on_trait_change(
                    pipeline.update_nodes_and_plugs_activation, remove=True)
        if pipeline:
            self.on_trait_change(
                pipeline.update_nodes_and_plugs_activation, remove=True)
        self._callbacks = {}
        self.pipeline = None
        self.plugs = {}

    def __getstate__(self):
        """ Remove the callbacks from the default __getstate__ result
        because they prevent Node instances from being used with pickle.
        Also remove the _weakref attribute possibly set by
        soma.utils.weak_proxy because it prevents Process instances from
        being used with pickle.
        """
        state = super(Node, self).__getstate__()
        state['_callbacks'] = [(c[0], get_ref(c[1]), c[2])
                               for c in state['_callbacks'].keys()]
        state.pop('_weakref', None)
        state = {k: get_ref(v) for k, v in state.items()}
        return state

    def __setstate__(self, state):
        """ Restore the callbacks that have been removed by __getstate__.
        """
        state['_callbacks'] = dict(
            (i, SomaPartial(self._value_callback, *i))
            for i in state['_callbacks'])
        if state['pipeline'] is state['process']:
            state['pipeline'] = state['process'] \
                = weak_proxy(state['pipeline'])
        else:
            state['pipeline'] = weak_proxy(state['pipeline'])
        super(Node, self).__setstate__(state)
        for callback_key, value_callback in six.iteritems(self._callbacks):
            self.set_callback_on_plug(callback_key[0], value_callback)

    def set_callback_on_plug(self, plug_name, callback):
        """ Add an event fired when a plug value changes

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        callback: callable (mandatory)
            a callback function
        """
        self.on_trait_change(callback, plug_name)

    def remove_callback_from_plug(self, plug_name, callback):
        """ Remove an event fired when a plug value changes

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        callback: callable (mandatory)
            a callback function
        """
        self.on_trait_change(callback, plug_name, remove=True)

    def get_plug_value(self, plug_name):
        """ Return the plug value

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name

        Returns
        -------
        output: object
            the plug value
        """
        return getattr(self, plug_name)

    def set_plug_value(self, plug_name, value, protected=None):
        """ Set the plug value

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        value: object (mandatory)
            the plug value we want to set
        protected: None or bool (tristate)
            if True or False, force the "protected" status of the plug. If
            None, keep it as is.
        """
        if protected is not None:
            self.protect_parameter(plug_name, protected)
        setattr(self, plug_name, value)

    def get_trait(self, trait_name):
        """ Return the desired trait

        Parameters
        ----------
        trait_name: str (mandatory)
            a trait name

        Returns
        -------
        output: trait
            the trait named trait_name
        """
        return self.trait(trait_name)

    def get_connections_through(self, plug_name, single=False):
        """ If the node has internal links (inside a pipeline, or in a
        switch or other custom connection node), return the "other side"
        of the internal connection to the selected plug. The returned plug
        may be in an internal node (in a pipeline), or in an external node
        connected to the node. When the node is "opaque" (no internal
        connections), it returns the input plug. When the node is
        inactive / disabled, it returns [].

        Parameters
        ----------
        plug_name: str
            plug to get connections with
        single: bool
            if True, stop at the first connected plug. Otherwise return
            the list of all connected plugs.

        Returns
        -------
        connected_plug: list of tuples
            [(node, plug_name, plug), ...]
            Returns [(self, plug_name, plug)] when the plug has no
            internal connection.
        """
        if not self.activated or not self.enabled:
            return []
        else:
            return [(self, plug_name, self.plugs[plug_name])]

    def is_job(self):
        """ If True, the node will be represented as a Job in
        :somaworkflow:`Soma-Workflow <index.html>`. Otherwise the node is
        static and does not run.
        """
        return hasattr(self, 'build_job')

    def requirements(self):
        '''
        Requirements needed to run the node. It is a dictionary whose keys
        are config/settings modules and values are requests for them.

        The default implementation returns an empty dict (no
        requirements), and should be overloaded by processes which
        actually have requirements.

        Ex::

            {'spm': 'version >= "12" and standalone == "True"'}
        '''
        return {}

    def check_requirements(self, environment='global', message_list=None):
        '''
        Checks the process requirements against configuration settings
        values in the attached CapsulEngine. This makes use of the
        :meth:`requirements` method and checks that there is one matching
        config value for each required module.

        Parameters
        ----------
        environment: str
            config environment id. Normally corresponds to the computing
            resource name, and defaults to "global".
        message_list: list
            if not None, this list will be updated with messages for
            unsatisfied requirements, in order to present the user with an
            understandable error.

        Returns
        -------
        config: dict, list, or None
            if None is returned, requirements are not met: the process
            cannot run. If a dict is returned, it corresponds to the
            matching config values. When no requirements are needed, an
            empty dict is returned. A pipeline, if its requirements are
            met, will return a list of configuration values, because
            different nodes may require different config values.
        '''
        capsul_engine = self.get_study_config().engine
        settings = capsul_engine.settings
        req = self.requirements()
        config = settings.select_configurations(environment, uses=req)
        success = True
        for module in req:
            module_name = settings.module_name(module)
            if module_name not in config:
                if message_list is not None:
                    message_list.append('requirement: %s is not met in %s'
                                        % (req, self.name))
                else:
                    # if no message is expected, then we can return
                    # immediately without checking further requirements.
                    # Otherwise we continue to get a full list of
                    # unsatisfied requirements.
                    return None
                success = False
        if success:
            return config
        else:
            return None

    def get_missing_mandatory_parameters(self, exclude_links=False):
        '''
        Returns a list of parameters which are not optional, and whose
        value is Undefined or None, or an empty string for a File or
        Directory parameter.

        Parameters
        ----------
        exclude_links: bool
            if True, an empty parameter which has a link to another node
            will not be reported missing, since the execution will assign
            it a temporary value which will not prevent the pipeline from
            running.
        '''
        def check_trait(node, plug, trait, value, exclude_links):
            if trait.optional:
                return True
            if hasattr(trait, 'inner_traits') \
                    and len(trait.inner_traits) != 0:
                if value is Undefined:
                    return bool(trait.output)
                for i, item in enumerate(value):
                    j = min(i, len(trait.inner_traits) - 1)
                    if not check_trait(node, plug, trait.inner_traits[j],
                                       item, exclude_links):
                        return False
                return True
            if isinstance(trait.trait_type, (File, Directory)):
                if value not in (Undefined, None, '') \
                        or (trait.output
                            and trait.input_filename is not False):
                    return True
                if not exclude_links:
                    return False
                if trait.output:
                    links = plug.links_to
                else:
                    links = plug.links_from
                # check if there is a connection not going outside the
                # current pipeline
                end = [l for l in links if l[0] != '']
                if len(end) != 0:
                    return True  # it's connected.
                # otherwise check if there is a connection outside the
                # current pipeline
                end = [l for l in links if l[0] == '']
                for link in end:
                    p = link[2].plugs[link[1]]
                    if trait.output:
                        relinks = p.links_to
                    else:
                        relinks = p.links_from
                    if relinks:
                        return True  # it's connected.
                return False  # no other connection.
            return trait.output or value not in (Undefined, None)

        missing = []
        for name, plug in six.iteritems(self.plugs):
            trait = self.get_trait(name)
            if not trait.optional:
                value = self.get_plug_value(name)
                if not check_trait(self, plug, trait, value,
                                   exclude_links):
                    missing.append(name)
        return missing

    def get_study_config(self):
        ''' Get (or create) the StudyConfig this process belongs to
        '''
        study_config = getattr(self, 'study_config', None)
        if study_config is None:
            # Import cannot be done at module level due to circular
            # dependencies
            from capsul.study_config.study_config \
                import default_study_config
            self.study_config = default_study_config()
        return self.study_config

    def set_study_config(self, study_config):
        ''' Set the StudyConfig this process belongs to
        '''
        self.study_config = study_config
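
# Illustrative sketch (not part of the original module): a minimal custom
# node following the save / reload protocol described in the Node class
# docstring (configure_controller / configured_controller / build_node).
# The node and parameter names are hypothetical.
class _ExampleThresholdNode(Node):
    def __init__(self, pipeline, name, param='threshold'):
        inputs = [dict(name=param, optional=True)]
        super(_ExampleThresholdNode, self).__init__(
            pipeline, name, inputs, [])
        self.param = param
        # a trait must back each plug so values can be get / set
        self.add_trait(param, traits.Float(0.5, output=False))

    @classmethod
    def configure_controller(cls):
        c = Controller()
        c.add_trait('param', traits.Str('threshold'))
        return c

    def configured_controller(self):
        c = self.configure_controller()
        c.param = self.param
        return c

    @classmethod
    def build_node(cls, pipeline, name, conf_controller):
        return cls(pipeline, name, param=conf_controller.param)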

class ProcessNode(Node):
    """ Process node.

    Attributes
    ----------
    process : process instance
        the process instance stored in the pipeline node

    Methods
    -------
    set_callback_on_plug
    get_plug_value
    set_plug_value
    get_trait
    """
    def __init__(self, pipeline, name, process, **kwargs):
        """ Generate a ProcessNode

        Parameters
        ----------
        pipeline: Pipeline (mandatory)
            the pipeline object where the node is added.
        name: str (mandatory)
            the node name.
        process: instance
            a process/interface instance.
        kwargs: dict
            process default values.
        """
        if process is pipeline:
            self.process = weak_proxy(process, self._process_deleted)
        else:
            self.process = process
        self.kwargs = kwargs
        inputs = []
        outputs = []
        for parameter, trait in six.iteritems(self.process.user_traits()):
            if parameter in ('nodes_activation', 'selection_changed'):
                continue
            if trait.output:
                outputs.append(dict(name=parameter,
                                    optional=bool(trait.optional),
                                    output=True))
            else:
                inputs.append(dict(
                    name=parameter,
                    optional=bool(trait.optional or parameter in kwargs)))
        super(ProcessNode, self).__init__(pipeline, name, inputs, outputs)

    def set_callback_on_plug(self, plug_name, callback):
        """ Add an event fired when a plug value changes

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        callback: callable (mandatory)
            a callback function
        """
        self.process.on_trait_change(callback, plug_name)

    def remove_callback_from_plug(self, plug_name, callback):
        """ Remove an event fired when a plug value changes

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        callback: callable (mandatory)
            a callback function
        """
        try:
            self.process.on_trait_change(callback, plug_name, remove=True)
        except ReferenceError:
            pass  # process is deleted, just go on

    def get_plug_value(self, plug_name):
        """ Return the plug value

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name

        Returns
        -------
        output: object
            the plug value
        """
        if not isinstance(self.get_trait(plug_name).handler, traits.Event):
            try:
                return getattr(self.process, plug_name)
            except TraitError:
                return Undefined
        else:
            return None

    def set_plug_value(self, plug_name, value, protected=None):
        """ Set the plug value

        Parameters
        ----------
        plug_name: str (mandatory)
            a plug name
        value: object (mandatory)
            the plug value we want to set
        protected: None or bool (tristate)
            if True or False, force the "protected" status of the plug. If
            None, keep it as is.
        """
        if value in ["<undefined>"]:
            value = Undefined
        elif is_trait_pathname(self.process.trait(plug_name)) \
                and value is None:
            value = Undefined
        self.process.set_parameter(plug_name, value, protected)

    def is_parameter_protected(self, plug_name):
        return self.process.is_parameter_protected(plug_name)

    def protect_parameter(self, plug_name, state=True):
        self.process.protect_parameter(plug_name, state)

    def get_trait(self, trait_name):
        """ Return the desired trait

        Parameters
        ----------
        trait_name: str (mandatory)
            a trait name

        Returns
        -------
        output: trait
            the trait named trait_name
        """
        return self.process.trait(trait_name)

    def is_job(self):
        return True

    def requirements(self):
        '''
        Requirements reimplementation for a process node. This node
        delegates to its underlying process.

        See :meth:`capsul.process.process.Process.requirements`
        '''
        return self.process.requirements()

    def check_requirements(self, environment='global', message_list=None):
        '''
        Reimplementation of
        :meth:`capsul.pipeline.pipeline_nodes.Node.check_requirements` for
        a ProcessNode. This one delegates to its underlying process (or
        pipeline).

        .. seealso:: :meth:`capsul.process.process.Process.check_requirements`
        '''
        return self.process.check_requirements(
            environment, message_list=message_list)

    def get_study_config(self):
        ''' Get (or create) the StudyConfig this process belongs to
        '''
        return self.process.get_study_config()

    def set_study_config(self, study_config):
        ''' Set the StudyConfig this process belongs to
        '''
        return self.process.set_study_config(study_config)

    def _process_deleted(self, process):
        self.cleanup()

    @property
    def study_config(self):
        try:
            return self.process.study_config
        except ReferenceError:
            return None

    @study_config.setter
    def study_config(self, value):
        try:
            self.process.study_config = value
        except ReferenceError:
            pass

    @study_config.deleter
    def study_config(self):
        try:
            del self.process.study_config
        except ReferenceError:
            pass

    @property
    def completion_engine(self):
        try:
            return self.process.completion_engine
        except ReferenceError:
            return None

    @completion_engine.setter
    def completion_engine(self, value):
        try:
            self.process.completion_engine = value
            if value is not None:
                # move the completion engine process to node level
                # in order to allow access to nodes links
                value.process = weak_proxy(self, value._clear_node)
        except ReferenceError:
            pass

    @completion_engine.deleter
    def completion_engine(self):
        try:
            del self.process.completion_engine
        except ReferenceError:
            pass
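
# Illustrative sketch (not part of the original module): ProcessNode is
# normally created indirectly through Pipeline.add_process(). A direct
# construction could look like this; the process identifier and plug names
# are hypothetical.
def _process_node_example(pipeline):
    from capsul.api import get_process_instance
    process = get_process_instance('some.module.SomeProcess')
    node = ProcessNode(pipeline, 'my_node', process)
    # plug access is delegated to the wrapped process instance
    node.set_plug_value('input_image', '/tmp/image.nii')
    return node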

class PipelineNode(ProcessNode):
    """ A special node to store the pipeline user-parameters
    """

    def get_connections_through(self, plug_name, single=False):
        if not self.activated or not self.enabled:
            return []
        plug = self.plugs[plug_name]
        if plug.output:
            links = plug.links_from
        else:
            links = plug.links_to
        dest_plugs = []
        for link in links:
            done = False
            if not link[2].activated or not link[2].enabled:
                continue  # skip disabled nodes
            if link[2] is self:
                # other side of the pipeline
                if link[3].output:
                    more_links = link[3].links_to
                else:
                    more_links = link[3].links_from
                if not more_links:
                    # going outside the pipeline which seems to be
                    # top-level: keep it
                    dest_plugs.append((link[2], link[1], link[3]))
                    if single:
                        done = True
                for other_link in more_links:
                    other_end = other_link[2].get_connections_through(
                        other_link[1], single)
                    dest_plugs += other_end
                    if other_end and single:
                        done = True
                        break
            else:
                other_end = link[2].get_connections_through(link[1],
                                                            single)
                dest_plugs += other_end
                if other_end and single:
                    done = True
            if done:
                break
        return dest_plugs
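
# Illustrative sketch (not part of the original module): following an
# exported pipeline parameter down to the internal plugs actually feeding
# it. The parameter name is hypothetical.
def _connections_example(pipeline):
    node = pipeline.pipeline_node
    # returns [(node, plug_name, plug), ...] across pipeline boundaries
    for dest_node, plug_name, plug in node.get_connections_through(
            'output_image', single=False):
        print(dest_node.name, plug_name, plug.activated)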

class Switch(Node):
    """ Switch node to select a specific Process.

    A switch commutes a group of inputs to its outputs, according to its
    "switch" trait value. Each group may be typically linked to a
    different process. Processes not "selected" by the switch are
    disabled, if possible. Values are also propagated through
    inputs/outputs of the switch (see below).

    Inputs / outputs:

    Say the switch "my_switch" has 2 outputs, "param1" and "param2". It
    will be connected to the outputs of 2 processing nodes, "node1" and
    "node2", both having 2 outputs: node1.out1, node1.out2, node2.out1,
    node2.out2. The switch will thus have 4 entries, in 2 groups, named
    for instance "node1" and "node2". The switch will link the outputs of
    node1 or node2 to its outputs. The switch inputs will be named as
    follows:

    * 1st group: "node1_switch_param1", "node1_switch_param2"
    * 2nd group: "node2_switch_param1", "node2_switch_param2"

    * When my_switch.switch value is "node1",
      my_switch.node1_switch_param1 is connected to my_switch.param1 and
      my_switch.node1_switch_param2 is connected to my_switch.param2. The
      processing node node2 is disabled (unselected).
    * When my_switch.switch value is "node2",
      my_switch.node2_switch_param1 is connected to my_switch.param1 and
      my_switch.node2_switch_param2 is connected to my_switch.param2. The
      processing node node1 is disabled (unselected).

    Values propagation:

    * When a switch is activated (its switch parameter is changed), the
      outputs will reflect the selected inputs, which means their values
      will be the same as the corresponding inputs.
    * But in many cases, parameters values will be given from the output
      (if the switch output is one of the pipeline outputs, this one will
      be visible from the "outside world", not the switch inputs). In this
      case, values set on a switch output propagate back to its inputs.
    * An exception is when a switch input is linked to the parent pipeline
      inputs: its value is also visible from "outside" and should not be
      set via the switch output values. In this specific case, output
      values are not propagated to such inputs.

    Notes
    -----
    Switch is normally not instantiated directly, but from a pipeline
    :py:meth:`pipeline_definition
    <capsul.pipeline.pipeline.Pipeline.pipeline_definition>` method

    Attributes
    ----------
    _switch_values : list
        the switch options
    _outputs: list
        the switch output parameters

    See Also
    --------
    _switch_changed
    _anytrait_changed
    capsul.pipeline.pipeline.Pipeline.add_switch
    capsul.pipeline.pipeline.Pipeline.pipeline_definition
    """

    _doc_path = 'api/pipeline.html#capsul.pipeline.pipeline_nodes.Switch'

    def __init__(self, pipeline, name, inputs, outputs, make_optional=(),
                 output_types=None):
        """ Generate a Switch Node

        Warnings
        --------
        The input plug names are built according to the following rule:
        <input_name>_switch_<output_name>

        Parameters
        ----------
        pipeline: Pipeline (mandatory)
            the pipeline object where the node is added
        name: str (mandatory)
            the switch node name
        inputs: list (mandatory)
            a list of options
        outputs: list (mandatory)
            a list of output parameters
        make_optional: sequence (optional)
            list of optional outputs. These outputs will be made optional
            in the switch output. By default they are mandatory.
        output_types: sequence of traits (optional)
            If given, this sequence should have the same size as outputs.
            It will specify each switch output parameter type (as a
            standard trait). Input parameters for each input block will
            also have this type.
        """
        # if the user passes a simple element, create a list and add this
        # element
        self.__block_output_propagation = False
        if not isinstance(outputs, list):
            outputs = [outputs]
        if output_types is not None:
            if not isinstance(output_types, list) \
                    and not isinstance(output_types, tuple):
                raise ValueError(
                    'output_types parameter should be a list or tuple')
            if len(output_types) != len(outputs):
                raise ValueError('output_types should have the same '
                                 'number of elements as outputs')
        else:
            output_types = [Any(Undefined)] * len(outputs)

        # check consistency
        if not isinstance(inputs, list) or not isinstance(outputs, list):
            raise Exception("The Switch node input and output parameters "
                            "are inconsistent: expect list, "
                            "got {0}, {1}".format(type(inputs),
                                                  type(outputs)))

        # private copy of outputs and inputs
        self._outputs = outputs
        self._switch_values = inputs

        # format inputs and outputs to inherit from Node class
        flat_inputs = []
        for switch_name in inputs:
            flat_inputs.extend(["{0}_switch_{1}".format(switch_name,
                                                        plug_name)
                                for plug_name in outputs])
        node_inputs = ([dict(name="switch")]
                       + [dict(name=i, optional=True)
                          for i in flat_inputs])
        node_outputs = [dict(name=i, optional=(i in make_optional))
                        for i in outputs]
        # inherit from Node class
        super(Switch, self).__init__(pipeline, name, node_inputs,
                                     node_outputs)
        for node in node_inputs[1:]:
            plug = self.plugs[node["name"]]
            plug.enabled = False

        # add switch enum trait to select the process
        self.add_trait("switch", Enum(output=False, *inputs))

        # add a trait for each input and each output
        input_types = output_types * len(inputs)
        for i, trait in zip(flat_inputs, input_types):
            self.add_trait(i, trait)
            self.trait(i).output = False
            self.trait(i).optional = self.plugs[i].optional
        for i, trait in zip(outputs, output_types):
            self.add_trait(i, trait)
            self.trait(i).output = True
            self.trait(i).optional = self.plugs[i].optional

        # activate the switch first Process
        self._switch_changed(self._switch_values[0],
                             self._switch_values[0])

    def _switch_changed(self, old_selection, new_selection):
        """ Add an event to the switch trait that enables us to select
        the desired option.

        Parameters
        ----------
        old_selection: str (mandatory)
            the old option
        new_selection: str (mandatory)
            the new option
        """
        self.__block_output_propagation = True
        self.pipeline.delay_update_nodes_and_plugs_activation()
        # deactivate the plugs associated with the old option
        old_plug_names = ["{0}_switch_{1}".format(old_selection, plug_name)
                          for plug_name in self._outputs]
        for plug_name in old_plug_names:
            self.plugs[plug_name].enabled = False
        # activate the plugs associated with the new option
        new_plug_names = ["{0}_switch_{1}".format(new_selection, plug_name)
                          for plug_name in self._outputs]
        for plug_name in new_plug_names:
            self.plugs[plug_name].enabled = True
        # refresh the pipeline
        self.pipeline.update_nodes_and_plugs_activation()
        # refresh the links to the output plugs
        for output_plug_name in self._outputs:
            # get the associated input name
            corresponding_input_plug_name = "{0}_switch_{1}".format(
                new_selection, output_plug_name)
            # update the output value
            setattr(self, output_plug_name,
                    getattr(self, corresponding_input_plug_name))
            # propagate the associated trait description
            out_trait = self.trait(output_plug_name)
            in_trait = self.trait(corresponding_input_plug_name)
            out_trait.desc = in_trait.desc
        self.pipeline.restore_update_nodes_and_plugs_activation()
        self.__block_output_propagation = False

    def connections(self):
        """ Returns the current internal connections between input and
        output plugs

        Returns
        -------
        connections: list
            list of internal connections
            [(input_plug_name, output_plug_name), ...]
        """
        return [('{0}_switch_{1}'.format(self.switch, plug_name),
                 plug_name) for plug_name in self._outputs]

    def _anytrait_changed(self, name, old, new):
        """ Add an event to the switch trait that enables us to select
        the desired option.

        Propagates value through the switch, from input to output if the
        switch state corresponds to this input, or from output to inputs.

        Parameters
        ----------
        name: str (mandatory)
            the trait name
        old: str (mandatory)
            the old value
        new: str (mandatory)
            the new value
        """
        # if the value change is on an output of the switch, and comes
        # from an "external" assignment (ie not the result of switch
        # action or change in one of its inputs), then propagate the new
        # value to all corresponding inputs.
        # However those inputs which are connected to a pipeline input are
        # not propagated, to avoid cyclic feedback between outputs and
        # inputs inside a pipeline
        if hasattr(self, '_outputs') \
                and not self.__block_output_propagation \
                and name in self._outputs:
            self.__block_output_propagation = True
            flat_inputs = ["{0}_switch_{1}".format(switch_name, name)
                           for switch_name in self._switch_values]
            for input_name in flat_inputs:
                # check if input is connected to a pipeline input
                plug = self.plugs[input_name]
                for link_spec in plug.links_from:
                    if isinstance(link_spec[2], PipelineNode) \
                            and not link_spec[3].output:
                        break
                else:
                    setattr(self, input_name, new)
            self.__block_output_propagation = False
        # if the change is in an input, change the corresponding output
        # accordingly, if the current switch selection is on this input.
        splitter = name.split("_switch_")
        if len(splitter) == 2 and splitter[0] in self._switch_values:
            switch_selection, output_plug_name = splitter
            if self.switch == switch_selection:
                self.__block_output_propagation = True
                setattr(self, output_plug_name, new)
                self.__block_output_propagation = False

    def __setstate__(self, state):
        self.__block_output_propagation = True
        super(Switch, self).__setstate__(state)

    def get_connections_through(self, plug_name, single=False):
        if not self.activated or not self.enabled:
            return []
        plug = self.plugs[plug_name]
        if plug.output:
            connected_plug_name = '%s_switch_%s' % (self.switch, plug_name)
        else:
            splitter = plug_name.split("_switch_")
            if len(splitter) != 2:
                # not a switch input plug
                return []
            connected_plug_name = splitter[1]
        connected_plug = self.plugs[connected_plug_name]
        if plug.output:
            links = connected_plug.links_from
        else:
            links = connected_plug.links_to
        dest_plugs = []
        for link in links:
            if link[2] is self.pipeline.pipeline_node:
                other_end = [(link[2], link[1], link[3])]
            else:
                other_end = link[2].get_connections_through(link[1],
                                                            single)
            dest_plugs += other_end
            if other_end and single:
                break
        return dest_plugs

    def is_job(self):
        return False

    def get_switch_inputs(self):
        inputs = []
        for plug, trait in self.user_traits().items():
            if trait.output:
                continue
            ps = plug.split('_switch_')
            if len(ps) == 2 and ps[1] in self.user_traits() \
                    and self.trait(ps[1]).output:
                inputs.append(ps[0])
        return inputs

    @classmethod
    def configure_controller(cls):
        c = Controller()
        c.add_trait('inputs', traits.List(traits.Str))
        c.add_trait('outputs', traits.List(traits.Str))
        c.add_trait('optional_params', traits.List(traits.Str))
        c.add_trait('output_types', traits.List(traits.Str))
        c.inputs = ['input_1', 'input_2']
        c.outputs = ['output']
        c.output_types = ['Any']
        return c

    def configured_controller(self):
        c = self.configure_controller()
        c.outputs = [plug for plug, trait in self.user_traits().items()
                     if trait.output]
        c.inputs = self.get_switch_inputs()
        c.output_types = [self.trait(p).trait_type.__class__.__name__
                          for p in c.outputs]
        c.optional_params = [self.trait(p).optional for p in c.inputs]
        return c

    @classmethod
    def build_node(cls, pipeline, name, conf_controller):
        # output_types are stored as trait type names (strings): build
        # trait instances from them
        output_types = [getattr(traits, t)()
                        for t in conf_controller.output_types]
        node = Switch(pipeline, name, conf_controller.inputs,
                      conf_controller.outputs,
                      make_optional=conf_controller.optional_params,
                      output_types=output_types)
        return node
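
# Illustrative sketch (not part of the original module): typical Switch
# use inside a Pipeline.pipeline_definition() implementation, following
# the "<input>_switch_<output>" naming rule documented above. Node and
# parameter names are hypothetical.
def _switch_example(pipeline):
    # one output "param1", selectable from two input groups
    pipeline.add_switch('my_switch', ['node1', 'node2'], ['param1'])
    pipeline.add_link('node1.out1->my_switch.node1_switch_param1')
    pipeline.add_link('node2.out1->my_switch.node2_switch_param1')
    # select the active group; node1 gets disabled
    pipeline.nodes['my_switch'].switch = 'node2'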

class OptionalOutputSwitch(Switch):
    ''' A switch which activates or disables its input/output link
    according to the output value.

    If the output value is not None or Undefined, the link is active,
    otherwise it is inactive.

    This kind of switch is meant to make a pipeline output optional, but
    still available for temporary files values inside the pipeline.

    Ex::

        A.output -> B.input

    B.input is mandatory, but we want to make A.output available and
    optional in the pipeline outputs. If we directly export A.output, then
    if the pipeline does not set a value, B.input will be empty and the
    pipeline run will fail.

    Instead we can add an OptionalOutputSwitch between A.output and
    pipeline.output. If pipeline.output is set to a valid value, then
    A.output and B.input will have the same valid value. If
    pipeline.output is left Undefined, then A.output and B.input will get
    a temporary value during the run.

    Technically, the OptionalOutputSwitch is currently implemented as a
    specialized switch node with two inputs and one output, and thus
    follows the inputs naming rules. The first input is the defined one,
    and the second, hidden one, is named "_none". As a consequence, its
    1st input should be connected under the name "<input>_switch_<output>"
    as in a standard switch.
    The "switch" input is hidden (not exported to the pipeline) and set
    automatically according to the output value.
    The implementation details may change in future versions.
    '''

    _doc_path = 'api/pipeline.html' \
        '#capsul.pipeline.pipeline_nodes.OptionalOutputSwitch'

    def __init__(self, pipeline, name, input, output):
        """ Generate an OptionalOutputSwitch Node

        Warnings
        --------
        The input plug name is built according to the following rule:
        <input>_switch_<output>

        Parameters
        ----------
        pipeline: Pipeline (mandatory)
            the pipeline object where the node is added
        name: str (mandatory)
            the switch node name
        input: str (mandatory)
            the input parameter
        output: str (mandatory)
            the output parameter
        """
        super(OptionalOutputSwitch, self).__init__(
            pipeline, name, [input, '_none'], [output], [output])
        self.trait('switch').optional = True
        self.plugs['switch'].optional = True
        self.switch = '_none'
        pipeline.do_not_export.add((name, 'switch'))
        none_input = '_none_switch_%s' % output
        pipeline.do_not_export.add((name, none_input))
        # hide internal machinery plugs
        self.trait('switch').hidden = True
        self.plugs['switch'].hidden = True
        self.trait(none_input).hidden = True
        self.plugs[none_input].hidden = True

    def _anytrait_changed(self, name, old, new):
        """ Add an event to the switch trait that enables us to select
        the desired option.

        Propagates value through the switch, from output to input.

        Parameters
        ----------
        name: str (mandatory)
            the trait name
        old: str (mandatory)
            the old value
        new: str (mandatory)
            the new value
        """
        # if the value change is on an output of the switch, and comes
        # from an "external" assignment (ie not the result of switch
        # action or change in one of its inputs), then propagate the new
        # value to all corresponding inputs.
        # However those inputs which are connected to a pipeline input are
        # not propagated, to avoid cyclic feedback between outputs and
        # inputs inside a pipeline
        if name == 'trait_added':
            return
        if hasattr(self, '_outputs') \
                and not self._Switch__block_output_propagation \
                and name in self._outputs:
            self._Switch__block_output_propagation = True
            # change the switch value according to the output value
            if new in (None, Undefined):
                self.switch = '_none'
            else:
                self.switch = self._switch_values[0]
            flat_inputs = ["{0}_switch_{1}".format(switch_name, name)
                           for switch_name in self._switch_values]
            for input_name in flat_inputs:
                # check if input is connected to a pipeline input
                plug = self.plugs[input_name]
                for link_spec in plug.links_from:
                    if isinstance(link_spec[2], PipelineNode) \
                            and not link_spec[3].output:
                        break
                else:
                    setattr(self, input_name, new)
            self._Switch__block_output_propagation = False
        # if the change is in an input, change the corresponding output
        # accordingly, if the current switch selection is on this input.
        splitter = name.split("_switch_")
        if len(splitter) == 2 and splitter[0] in self._switch_values:
            switch_selection, output_plug_name = splitter
            if self.switch == switch_selection:
                self._Switch__block_output_propagation = True
                setattr(self, output_plug_name, new)
                self._Switch__block_output_propagation = False

    @classmethod
    def configure_controller(cls):
        c = Controller()
        c.add_trait('input', traits.Str)
        c.add_trait('output', traits.Str)
        c.input = 'input'
        c.output = 'output'
        return c

    def configured_controller(self):
        c = self.configure_controller()
        c.output = [plug for plug, trait in self.user_traits().items()
                    if trait.output][0]
        c.input = self.get_switch_inputs()[0]
        return c

    @classmethod
    def build_node(cls, pipeline, name, conf_controller):
        node = OptionalOutputSwitch(pipeline, name,
                                    conf_controller.input,
                                    conf_controller.output)
        return node
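
# Illustrative sketch (not part of the original module): making an
# internal output optional, as in the A.output -> B.input scenario of the
# OptionalOutputSwitch docstring. Node and parameter names are
# hypothetical, and pipelines would normally create this node through
# their own API rather than directly.
def _optional_output_example(pipeline):
    switch = OptionalOutputSwitch(pipeline, 'opt_output',
                                  'a_output', 'output')
    # the single visible input follows the switch naming rule, e.g.:
    # pipeline.add_link('A.output->opt_output.a_output_switch_output')
    # leaving the exported output Undefined keeps the '_none' branch
    # selected, so A.output gets a temporary value at run time
    return switch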