capsul.pipeline module

Inheritance diagram of capsul.pipeline, capsul.pipeline.pipeline, capsul.pipeline.pipeline_construction, capsul.pipeline.pipeline_nodes, capsul.pipeline.pipeline_tools, capsul.pipeline.pipeline_workflow, capsul.pipeline.process_iteration, capsul.pipeline.python_export, capsul.pipeline.topological_sort, capsul.pipeline.xml, capsul.pipeline.custom_nodes, capsul.pipeline.custom_nodes.strcat_node, capsul.pipeline.custom_nodes.cv_node, capsul.pipeline.custom_nodes.loo_node, capsul.pipeline.custom_nodes.map_node, capsul.pipeline.custom_nodes.reduce_node

Pipelining elements for CAPSUL

capsul.pipeline.pipeline submodule

Pipeline main class module

Classes

Pipeline

class capsul.pipeline.pipeline.Pipeline(autoexport_nodes_parameters=None, **kwargs)[source]

Pipeline containing Process nodes, and links between node parameters.

A Pipeline is normally subclassed, and its pipeline_definition() method is overloaded to define its nodes and links. pipeline_definition() will be called by the pipeline constructor.

from capsul.pipeline import Pipeline

class MyPipeline(Pipeline):

  def pipeline_definition(self):
      self.add_process('proc1', 'my_toolbox.my_process1')
      self.add_process('proc2', 'my_toolbox.my_process2')
      self.add_switch('main_switch', ['in1', 'in2'], ['out1', 'out2'])
      self.add_link('proc1.out1->main_switch.in1_switch_out1')
      self.add_link('proc1.out2->main_switch.in1_switch_out2')
      self.add_link('proc2.out1->main_switch.in2_switch_out1')
      self.add_link('proc2.out1->main_switch.in2_switch_out2')

After execution of pipeline_definition(), the inner node parameters which are not connected will be automatically exported to the parent pipeline, with names prefixed with their process name, unless they are listed in a special “do_not_export” list (passed to add_process() or stored in the pipeline instance).

>>> pipeline = MyPipeline()
>>> print(pipeline.proc1_input)
<undefined>

Nodes

A pipeline is made of nodes, and links between their parameters. Several types of nodes may be part of a pipeline:

  • process nodes (pipeline_nodes.ProcessNode) are the leaf nodes which represent actual processing bricks.

  • pipeline nodes (pipeline_nodes.PipelineNode) are sub-pipelines which allow an existing pipeline to be reused within another one.

  • switch nodes (pipeline_nodes.Switch) allow selecting values among several possible inputs. The switch mechanism also allows selecting between several alternative processes or processing branches.

  • iterative process nodes (process_iteration.ProcessIteration) represent parallel processing of the same pipeline on a set of parameters.

Note that you normally do not instantiate these nodes explicitly when building a pipeline. Rather, programmers call the add_process(), add_switch() and add_iterative_process() methods.

Nodes activation

Pipeline nodes may be enabled or disabled. Disabling a node triggers a global pipeline activation step, in which all nodes that do not form a complete chain are made inactive. This way a branch may be disabled by disabling one of its nodes. This mechanism is used by the switch system, which selects one processing branch among several and disables the unselected ones.

Pipeline steps

Pipelines may define execution steps: they are user-oriented groups of nodes that are to be run together, or disabled together for runtime execution. They are intended to allow partial, or step-by-step execution. They do not work like the nodes enabling mechanism described above.

Steps may be defined within the pipeline_definition() method. See add_pipeline_step().

Note also that pipeline steps only act at the highest level: if a sub-pipeline has disabled steps, they will not be taken into account in the higher-level pipeline execution, because executing part of a sub-pipeline by steps within the context of a higher one generally does not make sense.

Main methods

nodes

a dictionary containing the pipeline nodes; the pipeline node itself is stored under the empty name ‘’

Type:

dict {node_name: node}

workflow_list

a list of ordered nodes that can be executed

Type:

list

workflow_repr

a string representation of the workflow list <node_i>-><node_i+1>

Type:

str

Note

  • Type ‘Pipeline.help()’ for a full description of this process's parameters.

  • Type ‘<Pipeline>.get_input_spec()’ for a full description of this process's input trait types.

  • Type ‘<Pipeline>.get_output_spec()’ for a full description of this process's output trait types.

Initialize the Pipeline class

Parameters:

autoexport_nodes_parameters (bool) – if True (the default), unconnected node parameters are automatically exported as pipeline plugs.

add_custom_node(name, node_type, parameters=None, make_optional=(), do_not_export=None, **kwargs)[source]

Inserts a custom node (Node subclass instance which is not a Process) in the pipeline.

Parameters:
  • node_type (str or Node subclass or Node instance) – node type to be built. Either a class (Node subclass) or a Node instance (the node will be re-instantiated), or a string describing a module and class.

  • parameters (dict or Controller or None) – configuration dict or Controller defining parameters needed to build the node. The controller should be obtained using the node class’s configure_node() static method, then filled with the desired values. If not given the node is supposed to be built with no parameters, which will not work for every node type.

  • make_optional (list or tuple) – parameter names to be made optional

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • kwargs (default values of node parameters) –
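As a sketch, using the StrCatNode type from capsul.pipeline.custom_nodes (the actual parameters to fill in are defined by each node class's configure_controller() method, so the exact fields are not shown here):

from capsul.pipeline.custom_nodes.strcat_node import StrCatNode

# inside a Pipeline subclass's pipeline_definition()
conf = StrCatNode.configure_controller()
# ... fill 'conf' with the values this node type requires ...
self.add_custom_node('concat', StrCatNode, parameters=conf)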

add_iterative_process(name, process, iterative_plugs=None, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, **kwargs)[source]

Add a new iterative node in the pipeline.

Parameters:
  • name (str (mandatory)) – the node name (has to be unique).

  • process (Process or str (mandatory)) – the process we want to add.

  • iterative_plugs (list of str (optional)) – a list of plug names on which we want to iterate. If None, all plugs of the process will be iterated.

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • make_optional (list of str (optional)) – a list of plug names to be made optional.

  • inputs_to_copy (list of str (optional)) – a list of items to copy.

  • inputs_to_clean (list of str (optional)) – a list of temporary items.
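A short sketch (the process and plug names are hypothetical):

# inside pipeline_definition(): run the process once per element
# of the lists assigned to the iterative plugs
self.add_iterative_process(
    'smooth_all', 'my_toolbox.SmoothProcess',
    iterative_plugs=['input_image', 'output_image'])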

add_link(link, weak_link=False, allow_export=False, value=None)[source]

Add a link between pipeline nodes.

If the destination node is a switch, force the source plug to be not optional.

Parameters:
  • link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)

  • weak_link (bool) – this property is used when nodes are optional; in that case the plug information may not be generated.

  • allow_export (bool) – if True, if the link links from/to the pipeline node with a plug name which doesn’t exist, the plug will be created, and the function will act exactly like export_parameter. This may be a more convenient way of exporting/connecting pipeline plugs to several nodes without having to export the first one, then link the others.

  • value (any) – if given, set this value instead of the source plug value
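Both description forms can be sketched as follows (node and plug names are hypothetical):

# string form: 'source_node.plug->dest_node.plug'
self.add_link('proc1.out1->proc2.input')

# tuple form: (source_node, source_plug_name, dest_node, dest_plug_name)
node1 = self.nodes['proc1']
node2 = self.nodes['proc2']
self.add_link((node1, 'out1', node2, 'input'))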

add_optional_output_switch(name, input, output=None)[source]

Add an optional output switch node in the pipeline

An optional switch activates or disables its input/output link according to the output value. If the output value is not None or Undefined, the link is active, otherwise it is inactive.

This kind of switch is meant to make a pipeline output optional, but still available for temporary files values inside the pipeline.

Ex:

A.output -> B.input

B.input is mandatory, but we want to make A.output available and optional in the pipeline outputs. If we directly export A.output, then if the pipeline does not set a value, B.input will be empty and the pipeline run will fail.

Instead we can add an OptionalOutputSwitch between A.output and pipeline.output. If pipeline.output is set to a valid value, then A.output and B.input will have the same valid value. If pipeline.output is left Undefined, then A.output and B.input will get a temporary value during the run.

Add an optional output switch node in the pipeline

Parameters:
  • name (str (mandatory)) – name for the switch node (has to be unique)

  • input (str (mandatory)) – name for switch input. Switch activation will select between it and a hidden input, “_none”. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”.

  • output (str (optional)) – name for output. Default is the switch name

Examples

>>> pipeline.add_optional_output_switch('out1', 'in1')
>>> pipeline.add_link('node1.output->out1.in1_switch_out1')
add_pipeline_step(step_name, nodes, enabled=True)[source]

Add a step definition to the pipeline (see also define_steps).

Steps are groups of pipeline nodes, which may be disabled at runtime. They are normally defined in a logical order regarding the workflow streams. They are different from pipelines in that steps are purely virtual groups, they do not have parameters.

Disabling a step acts differently from the node activation mechanism: other nodes are not inactivated according to their dependencies. Instead, the nodes of disabled steps are simply not run.

Parameters:
  • step_name (string (mandatory)) – name of the new step

  • nodes (list or sequence) – nodes contained in the step (Node instances)

  • enabled (bool (optional)) – initial state of the step
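A short sketch, using node names as in the define_pipeline_steps() example below (the names are hypothetical):

# inside pipeline_definition(): group two nodes into a step which
# can be disabled at runtime
self.add_pipeline_step('preprocessing',
                       ['normalization', 'bias_correction'])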

add_process(name, process, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, skip_invalid=False, **kwargs)[source]

Add a new node in the pipeline

Note about invalid nodes:

A pipeline can typically offer alternatives (through a switch) to different algorithmic nodes, which may have different dependencies, or may be provided through external modules, and thus can be missing. To handle this, Capsul can be told that a process node may be invalid (or missing) without otherwise interfering with the rest of the pipeline. This is done using the “skip_invalid” option. When used, the process to be added is tested, and if its instantiation fails, it will not be added to the pipeline, but will not trigger an error either. Instead the missing node will be marked as “allowed invalid”, and links and exports built using this node will silently do nothing. Thus the pipeline will work normally, without the invalid node.

Such nodes are generally gathered through a switch mechanism. However, the switch inputs should be restricted to actually available nodes. The recommended method is to check that nodes have actually been added in the pipeline. Then links can be made normally as if the nodes were all present:

self.add_process('method1', 'module1.Module1', skip_invalid=True)
self.add_process('method2', 'module2.Module2', skip_invalid=True)
self.add_process('method3', 'module3.Module3', skip_invalid=True)

input_params = [n for n in ['method1', 'method2', 'method3']
                if n in self.nodes]
self.add_switch('select_method', input_params, 'output')

self.add_link('method1.input->select_method.method1_switch_output')
self.add_link('method2.input->select_method.method2_switch_output')
self.add_link('method3.input->select_method.method3_switch_output')

A last note about invalid nodes:

When saving a pipeline (through the graphical editor typically), missing nodes will not be saved because they are not actually in the pipeline. So be careful to save only pipelines with full features.

Parameters:
  • name (str (mandatory)) – the node name (has to be unique).

  • process (Process (mandatory)) – the process we want to add. May be a string (‘module.process’), a process instance or a class.

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • make_optional (list of str (optional)) – a list of plug names to be made optional.

  • inputs_to_copy (list of str (optional)) – a list of items to copy.

  • inputs_to_clean (list of str (optional)) – a list of temporary items.

  • skip_invalid (bool) – if True and the process fails to be instantiated, don't throw an exception; instead don't insert the node, and mark it as such so that add_link() also silently does nothing. This option is useful for optional process nodes which may or may not be available depending on their dependencies, typically in a switch offering several alternative methods.

add_processes_selection(selection_parameter, selection_groups, value=None)[source]

Add a processes selection switch definition to the pipeline.

Selectors are a “different” kind of switch: the pipeline nodes of the selected group are enabled, the others are disabled.

The selector has 2 levels:

selection_parameter selects a group.

A group contains a set of nodes which will be activated together. Groups are mutually exclusive.

Parameters:
  • selection_parameter (string (mandatory)) – name of the selector parameter: the parameter is added in the pipeline, and its value is the name of the selected group.

  • selection_groups (dict or OrderedDict) – nodes groups contained in the selector : {group_name: [Node names]}

  • value (str (optional)) – initial state of the selector (default: 1st group)
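A short sketch (group and node names are hypothetical):

# inside pipeline_definition(): adds a 'method' parameter to the
# pipeline; its value selects which group of nodes is enabled
self.add_processes_selection('method', {
    'method_a': ['node_a1', 'node_a2'],
    'method_b': ['node_b1']})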

add_switch(name, inputs, outputs, export_switch=True, make_optional=(), output_types=None, switch_value=None, opt_nodes=None)[source]

Add a switch node in the pipeline

Parameters:
  • name (str (mandatory)) – name for the switch node (has to be unique)

  • inputs (list of str (mandatory)) – names for switch inputs. Switch activation will select amongst them. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”. This behaviour is needed when there are several outputs, and thus several input groups.

  • outputs (list of str (mandatory)) – names for outputs.

  • export_switch (bool (optional)) – if True, export the switch trigger to the parent pipeline with name as parameter name

  • make_optional (sequence (optional)) – list of optional outputs. These outputs will be made optional in the switch output. By default they are mandatory.

  • output_types (sequence of traits (optional)) – If given, this sequence should have the same size as outputs. It will specify each switch output parameter type (as a standard trait). Input parameters for each input block will also have this type.

  • switch_value (str (optional)) – Initial value of the switch parameter (one of the inputs names). Defaults to 1st input.

  • opt_nodes (bool or list) – tells that switch values are node names, and some of them may be optional and missing. In such a case, missing nodes are not added as inputs. If a list is passed, then it is a list of node names which length should match the number of inputs, and which order tells nodes related to inputs (in case inputs names are not directly node names).

Examples

>>> pipeline.add_switch('group_switch', ['in1', 'in2'],
                        ['out1', 'out2'])

will create a switch with 4 inputs and 2 outputs:

inputs: “in1_switch_out1”, “in2_switch_out1”, “in1_switch_out2”, “in2_switch_out2”

outputs: “out1”, “out2”

add_trait(name, trait)[source]

Add a trait to the pipeline

Parameters:
  • name (str (mandatory)) – the trait name

  • trait (trait instance (mandatory)) – the trait we want to add
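For example (a sketch; Capsul parameter types come from the traits package):

from traits.api import Float

# add a 'threshold' parameter to the pipeline, default value 0.5
pipeline.add_trait('threshold', Float(0.5))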

all_nodes()[source]

Iterate over all pipeline nodes including sub-pipeline nodes.

Returns:

nodes – Iterates over all nodes

Return type:

Generator of Node

autoexport_nodes_parameters(include_optional=False)[source]

Automatically export nodes plugs to the pipeline.

Some parameters can be explicitly preserved from exportation if they are listed in the pipeline “do_not_export” variable (list or set).

Parameters:

include_optional (bool (optional)) – if True, optional plugs are exported as well; by default (False) they are not. Exception: optional output plugs of switches are always exported (otherwise they would be useless). It should probably be any single output plug of a node.

call_process_method(process_name, method, *args, **kwargs)[source]

Call a method of a process previously added with add_process or add_iterative_process.

Parameters:
  • process_name (str (mandatory)) – name given to the process node.

  • method (str (mandatory)) – name of the method to call.

check_requirements(environment='global', message_list=None)[source]

Reimplementation for pipelines of capsul.process.process.Process.check_requirements

A pipeline will return a list of unique configuration values.

compare_to_state(pipeline_state)[source]

Returns the differences between this pipeline and a previously recorded state.

Returns:

differences – each element is a human readable string explaining one difference (e.g. ‘node “my_process” is missing’)

Return type:

list

count_items()[source]

Count pipeline items to get its size.

Returns:

items – (nodes_count, processes_count, plugs_count, params_count, links_count, enabled_nodes_count, enabled_procs_count, enabled_links_count)

Return type:

tuple

define_groups_as_steps(exclusive=True)[source]

Define parameters groups according to which steps they are connected to.

Parameters:

exclusive (bool (optional)) – if True, a parameter is assigned to a single group, the first step it is connected to. If False, a parameter is assigned to all step groups it is connected to.

define_pipeline_steps(steps)[source]

Define steps in the pipeline. Steps are pipeline portions that form groups, and which can be enabled or disabled on a runtime basis (when building workflows).

Once steps are defined, their activation may be accessed through the “step” trait, which has one boolean property for each step:

Ex:

steps = OrderedDict()
steps['preprocessings'] = [
    'normalization',
    'bias_correction',
    'histo_analysis']
steps['brain_extraction'] = [
    'brain_segmentation',
    'hemispheres_split']
pipeline.define_pipeline_steps(steps)
>>> print(pipeline.pipeline_steps.preprocessings)
True
>>> pipeline.pipeline_steps.brain_extraction = False

See also add_pipeline_step()

Parameters:

steps (dict or preferably OrderedDict or SortedDictionary (mandatory)) – the dict keys are step names, and the values are lists of node names forming the step.

disabled_pipeline_steps_nodes()[source]

List nodes disabled for runtime execution

Returns:

disabled_nodes – list of pipeline nodes (Node instances) which will not run in a workflow created from this pipeline state.

Return type:

list

enable_all_pipeline_steps()[source]

Set all defined steps (using add_pipeline_step() or define_pipeline_steps()) to be enabled. Useful to reset the pipeline state after it has been changed.

export_parameter(node_name, plug_name, pipeline_parameter=None, weak_link=False, is_enabled=None, is_optional=None, allow_existing_plug=None)[source]

Export a node plug at the pipeline level.

Parameters:
  • node_name (str (mandatory)) – the name of node containing the plug we want to export

  • plug_name (str (mandatory)) – the node plug name we want to export

  • pipeline_parameter (str (optional)) – the name to access this parameter at the pipeline level. Default None, the plug name is used

  • weak_link (bool (optional)) – this property is used when nodes are weak; in that case the plug information may not be generated.

  • is_enabled (bool (optional)) – a property to specify that it is not a user parameter (automatic generation)

  • is_optional (bool (optional)) – sets the exported parameter to be optional

  • allow_existing_plug (bool (optional)) – the same pipeline plug may be connected to several process plugs
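A short sketch (node and plug names are hypothetical):

# inside pipeline_definition(): expose proc1.output_image as the
# pipeline parameter 'result'
self.export_parameter('proc1', 'output_image',
                      pipeline_parameter='result')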

find_empty_parameters()[source]

Find internal File/Directory parameters with empty values which are not exported to the main input/output parameters of the pipeline. This is meant to track parameters which should be associated with temporary files internally.

Returns:

Each element is a list with 3 values: node, parameter_name, optional

Return type:

list

get_pipeline_step_nodes(step_name)[source]

Get the nodes in the given pipeline step

get_processes_selection_groups(selection_parameter)[source]

Get groups names involved in a processes selection switch

get_processes_selection_nodes(selection_parameter, group)[source]

Get nodes names involved in a processes selection switch with value group

get_processes_selections()[source]

Get process_selection groups names (corresponding to selection parameters on the pipeline)

install_links_debug_handler(log_file=None, handler=None, prefix='')[source]

Set callbacks when trait values change, and follow plugs links to debug links propagation and problems in it.

Parameters:
  • log_file (str or file object (optional)) – file name or file-like object to write links propagation in. If none is specified, a temporary file will be created for it.

  • handler (function (optional)) – Callback to be processed for debugging. If none is specified, the default pipeline debugging function will be used. This default handler prints traits changes and links to be processed in the log_file. The handler function will receive a prefix string, a node, and traits parameters, namely the object (process) owning the changed value, the trait name and value in this object.

  • prefix (str (optional)) – prefix to be prepended to traits names, typically the parent pipeline full name

Returns:

log_file

Return type:

the file object where events will be written in

parse_link(link, check=True)[source]

Parse a link coming from the export_parameter method.

Parameters:
  • link (str) – the link description of the form ‘node_from.plug_name->node_to.plug_name’

  • check (bool) – if True, check that the node and plug exist

Returns:

output – tuple containing the link description and instances

Return type:

tuple

Examples

>>> Pipeline.parse_link("node1.plug1->node2.plug2")
"node1", "plug1", <instance node1>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>

For a pipeline node:

>>> Pipeline.parse_link("plug1->node2.plug2")
"", "plug1", <instance pipeline>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>
parse_parameter(name, check=True)[source]

Parse parameter of a node from its description.

Parameters:
  • name (str) – the plug description to parse, of the form ‘node.plug’

  • check (bool) – if True, check that the node and plug exist

Returns:

output – tuple containing the plug description and instances

Return type:

tuple

pipeline_definition()[source]

Define pipeline structure, nodes, sub-pipelines, switches, and links.

This method should be overloaded in subclasses, it does nothing in the base Pipeline class.

pipeline_state()[source]

Return an object composed of basic Python objects that contains the whole structure and state of the pipeline. This object can be given to compare_to_state method in order to get the differences with a previously stored state. This is typically used in tests scripts.

Returns:

pipeline_state – the whole pipeline structure and state, as basic Python objects

Return type:

dictionary
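Typical use together with compare_to_state(), as a sketch:

state = pipeline.pipeline_state()
# ... modify the pipeline (parameter values, switches, steps) ...
for difference in pipeline.compare_to_state(state):
    print(difference)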

propagate_metadata(node, param, metadata)[source]

Set metadata on a node parameter, and propagate these values to the connected plugs.

Typically needed to propagate the “forbid_completion” metadata, to prevent manually set values from being overridden by completion.

node may be a Node instance or a node name

remove_link(link)[source]

Remove a link between pipeline nodes.

Parameters:

link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)

remove_node(node_name)[source]

Remove a node from the pipeline

remove_pipeline_step(step_name)[source]

Remove the given step

remove_trait(name)[source]

Remove a trait from the pipeline

Parameters:

name (str (mandatory)) – the trait name

rename_node(old_node_name, new_node_name)[source]

Change the name of the selected node and update the pipeline.

Parameters:
  • old_node_name (str) – old node name

  • new_node_name (str) – new node name

reorder_traits(names)[source]

Reimplementation of Controller method reorder_traits() so that we also reorder the pipeline node plugs.

set_study_config(study_config)[source]

Set a StudyConfig for the process. Note that it can only be done once: once a non-null StudyConfig has been assigned to the process, it should not change.

uninstall_links_debug_handler()[source]

Remove links debugging callbacks set by install_links_debug_handler()

update_nodes_and_plugs_activation()[source]

Reset all node and plug activations according to the current state of the pipeline (i.e. switch selection, nodes disabled, etc.).

workflow_graph(remove_disabled_steps=True, remove_disabled_nodes=True)[source]

Generate a workflow graph

Parameters:
  • remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow graph. Default: True

  • remove_disabled_nodes (bool (optional)) – When set, disabled nodes will not be included in the workflow graph. Default: True

Returns:

graph – graph representation of the workflow from the current state of the pipeline

Return type:

topological_sort.Graph

workflow_ordered_nodes(remove_disabled_steps=True)[source]

Generate a workflow: an ordered list of process nodes to execute

Parameters:

remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow graph. Default: True

Returns:

workflow_list – an ordered list of Processes to execute

Return type:

list of Process
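A short usage sketch:

# ordered list of processes for the current pipeline state
workflow = pipeline.workflow_ordered_nodes()
print([process.name for process in workflow])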

capsul.pipeline.pipeline_construction submodule

This module is internal machinery; the user does not need to know or bother about it. In brief, it provides helper functions to build a pipeline from an I/O serialization.

Classes

PipelineConstructor

ConstructedPipeline

class capsul.pipeline.pipeline_construction.ConstructedPipeline(autoexport_nodes_parameters=None, **kwargs)[source]

Base class of all pipelines created with PipelineConstructor. It redefines pipeline_definition in order to “replay”, at each instantiation, the method calls previously recorded with the PipelineConstructor.

Note

  • Type ‘ConstructedPipeline.help()’ for a full description of this process's parameters.

  • Type ‘<ConstructedPipeline>.get_input_spec()’ for a full description of this process's input trait types.

  • Type ‘<ConstructedPipeline>.get_output_spec()’ for a full description of this process's output trait types.

Initialize the Pipeline class

Parameters:

autoexport_nodes_parameters (bool) – if True (the default), unconnected node parameters are automatically exported as pipeline plugs.

pipeline_definition()[source]

Executes on self all method calls recorded in self._pipeline_definition_calls.

class capsul.pipeline.pipeline_construction.PipelineConstructor(module, name)[source]

A PipelineConstructor is used to build a new Pipeline class and dynamically construct its contents, for instance by adding process nodes and creating links between parameters. This class must be used whenever one wants to create a pipeline from an external source (i.e. a source that is not an installed Python module). For instance, the creation of a pipeline from an XML representation uses a PipelineConstructor.

`pipeline`

the constructed Pipeline class

Type:

ConstructedPipeline derived class

Initialize a new empty ConstructedPipeline class that is ready to be customized by calling methods of the PipelineConstructor. When pipeline creation is done, the pipeline class can be accessed with the pipeline attribute.

Parameters:
  • module (str (mandatory)) – name of the module for the created Pipeline class.

  • name (str (mandatory)) – name of the created Pipeline class.
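A minimal sketch of building a pipeline class dynamically (module and process names are hypothetical):

from capsul.pipeline.pipeline_construction import PipelineConstructor

constructor = PipelineConstructor('my_module', 'MyPipeline')
constructor.add_process('proc1', 'my_toolbox.my_process1')
constructor.add_process('proc2', 'my_toolbox.my_process2')
constructor.add_link('proc1.output->proc2.input')

# the constructed class is available through the 'pipeline' attribute
MyPipeline = constructor.pipeline
pipeline = MyPipeline()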

add_custom_node(*args, **kwargs)[source]

Adds a custom Node subtype to the pipeline

add_iterative_process(*args, **kwargs)[source]

Adds a new iterative process to the pipeline.

add_link(*args, **kwargs)[source]

Add a link between pipeline processes.

add_optional_output_switch(*args, **kwargs)[source]

Adds an OptionalOutputSwitch to the pipeline

add_pipeline_step(step_name, nodes, enabled)[source]

Defines a pipeline step

add_process(*args, **kwargs)[source]

Adds a new process to the pipeline.

add_processes_selection(*args, **kwargs)[source]

Add processes selection to the pipeline.

Add a link between subpipeline processes.

add_switch(*args, **kwargs)[source]

Adds a switch to the pipeline

call_process_method(*args, **kwargs)[source]

Call a method of a process previously added with add_process or add_iterative_process.

export_parameter(*args, **kwargs)[source]

Export an internal parameter to the pipeline parameters.

set_documentation(doc)[source]

Sets the documentation of the pipeline

set_node_enabled(name, state)[source]

Enables or disables a node

set_node_position(node_name, x, y)[source]

Set a pipeline node position.

set_scene_scale_factor(scale)[source]

Set global nodes view scale factor.

capsul.pipeline.pipeline_nodes submodule

Node classes for CAPSUL pipeline elements

Classes

Plug

Node

ProcessNode

PipelineNode

Switch

OptionalOutputSwitch

class capsul.pipeline.pipeline_nodes.Node(pipeline, name, inputs, outputs)[source]

Basic Node structure of the pipeline that needs to be tuned.

It is possible to define custom nodes inheriting Node. To be usable in all contexts (GUI construction, pipeline save / reload), custom nodes should define a few additional instance and class methods which will allow automatic systems (such as get_node_instance()) to reinstantiate and save them:

  • configure_controller(cls): classmethod

    return a Controller instance which specifies parameters needed to build the node instance. Typically it may contain a parameters (plugs) list and other specifications.

  • configured_controller(self): instance method:

    on an instance, returns a Controller instance in the same shape as configure_controller above, but with values filled from the instance. This controller will allow saving parameters needed to instantiate again the node with the same state.

  • build_node(cls, pipeline, name, conf_controller): class method

    returns an instance of the node class, built using appropriate parameters (using configure_controller() or configured_controller() from another instance)
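As an illustration, a heavily simplified, hypothetical sketch of such a custom node. It assumes Node behaves as a soma.controller Controller (as elsewhere in this API), and the node itself does nothing useful:

from capsul.pipeline.pipeline_nodes import Node
from soma.controller import Controller
import traits.api as traits

class MyCustomNode(Node):
    def __init__(self, pipeline, name, param_names):
        # declare one input plug per parameter name
        inputs = [{'name': p} for p in param_names]
        super(MyCustomNode, self).__init__(pipeline, name, inputs, [])
        self.param_names = list(param_names)
        # add a trait for each declared plug
        for p in self.param_names:
            self.add_trait(p, traits.Any())

    @classmethod
    def configure_controller(cls):
        # parameters needed to build a node instance
        c = Controller()
        c.add_trait('param_names', traits.List(traits.Str()))
        return c

    def configured_controller(self):
        # same shape as configure_controller(), filled from the instance
        c = self.configure_controller()
        c.param_names = self.param_names
        return c

    @classmethod
    def build_node(cls, pipeline, name, conf_controller):
        # rebuild an instance from a configuration controller
        return cls(pipeline, name, conf_controller.param_names)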

name

the node name

Type:

str

full_name

a unique name among all nodes and sub-nodes of the top level pipeline

Type:

str

enabled

user parameter to control the node activation

Type:

bool

activated

parameter describing the node status

Type:

bool

connect()[source]
set_callback_on_plug()[source]
get_plug_value()[source]
set_plug_value()[source]
get_trait()[source]

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)

check_requirements(environment='global', message_list=None)[source]

Checks the process requirements against configuration settings values in the attached CapsulEngine. This makes use of the requirements() method and checks that there is one matching config value for each required module.

Parameters:
  • environment (str) – config environment id. Normally corresponds to the computing resource name, and defaults to “global”.

  • message_list (list) – if not None, this list will be updated with messages for unsatisfied requirements, in order to present the user with an understandable error.

Returns:

config – if None is returned, requirements are not met: the process cannot run. If a dict is returned, it corresponds to the matching config values. When no requirements are needed, an empty dict is returned. A pipeline, if its requirements are met will return a list of configuration values, because different nodes may require different config values.

Return type:

dict, list, or None

cleanup()[source]

cleanup before deletion

disconnects all plugs, removes internal and cyclic references

connect(source_plug_name, dest_node, dest_plug_name)[source]

Connect linked plugs of two nodes

Parameters:
  • source_plug_name (str (mandatory)) – the source plug name

  • dest_node (Node (mandatory)) – the destination node

  • dest_plug_name (str (mandatory)) – the destination plug name

disconnect(source_plug_name, dest_node, dest_plug_name, silent=False)[source]

disconnect linked plugs of two nodes

Parameters:
  • source_plug_name (str (mandatory)) – the source plug name

  • dest_node (Node (mandatory)) – the destination node

  • dest_plug_name (str (mandatory)) – the destination plug name

  • silent (bool) – if True, do not fire an exception if the connection does not exist (perhaps it is already disconnected)

get_connections_through(plug_name, single=False)[source]

If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].

Parameters:
  • plug_name (str) – plug to get connections with

  • single (bool) – if True, stop at the first connected plug. Otherwise return the list of all connected plugs.

Returns:

[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.

Return type:

connected_plug; list of tuples

get_missing_mandatory_parameters(exclude_links=False)[source]

Returns a list of parameters which are not optional and whose value is Undefined or None, or an empty string for a File or Directory parameter.

Parameters:

exclude_links (bool) – if True, an empty parameter which has a link to another node will not be reported missing, since the execution will assign it a temporary value which will not prevent the pipeline from running.

get_plug_value(plug_name)[source]

Return the plug value

Parameters:

plug_name (str (mandatory)) – a plug name

Returns:

output – the plug value

Return type:

object

get_study_config()[source]

Get (or create) the StudyConfig this process belongs to

get_trait(trait_name)[source]

Return the desired trait

Parameters:

trait_name (str (mandatory)) – a trait name

Returns:

output – the trait named trait_name

Return type:

trait

is_job()[source]

Returns True if the node will be represented as a Job in Soma-Workflow; otherwise the node is static and does not run.

remove_callback_from_plug(plug_name, callback)[source]

Remove a callback triggered when a plug changes

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • callback (@f (mandatory)) – a callback function

requirements()[source]

Requirements needed to run the node. It is a dictionary whose keys are config/settings modules and whose values are requests for them.

The default implementation returns an empty dict (no requirements), and should be overloaded by processes which actually have requirements.

Ex:

{'spm': 'version >= "12" and standalone == "True"'}

set_callback_on_plug(plug_name, callback)[source]

Add a callback triggered when a plug changes

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • callback (@f (mandatory)) – a callback function

set_plug_value(plug_name, value, protected=None)[source]

Set the plug value

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • value (object (mandatory)) – the plug value we want to set

  • protected (None or bool (tristate)) – if True or False, force the “protected” status of the plug. If None, keep it as is.

set_study_config(study_config)[source]

Set the StudyConfig this process belongs to

class capsul.pipeline.pipeline_nodes.OptionalOutputSwitch(pipeline, name, input, output)[source]

A switch which activates or disables its input/output link according to the output value. If the output value is not None or Undefined, the link is active, otherwise it is inactive.

This kind of switch is meant to make a pipeline output optional, but still available for temporary files values inside the pipeline.

Ex:

A.output -> B.input

B.input is mandatory, but we want to make A.output available and optional in the pipeline outputs. If we directly export A.output, then if the pipeline does not set a value, B.input will be empty and the pipeline run will fail.

Instead we can add an OptionalOutputSwitch between A.output and pipeline.output. If pipeline.output is set to a valid value, then A.output and B.input will have the same valid value. If pipeline.output is left Undefined, then A.output and B.input will get a temporary value during the run.

Technically, the OptionalOutputSwitch is currently implemented as a specialized switch node with two inputs and one output, and thus follows the inputs naming rules. The first input is the defined one, and the second, hidden one, is named “_none”. As a consequence, its 1st input should be connected under the name “<input>_switch_<output>” as in a standard switch. The “switch” input is hidden (not exported to the pipeline) and set automatically according to the output value. The implementation details may change in future versions.

Generate an OptionalOutputSwitch Node

Warning

The input plug name is built according to the following rule: <input>_switch_<output>

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the switch node name

  • input (str (mandatory)) – name for the switch input

  • output (str (mandatory)) – output parameter

class capsul.pipeline.pipeline_nodes.PipelineNode(pipeline, name, process, **kwargs)[source]

A special node to store the pipeline user-parameters

Generate a ProcessNode

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added.

  • name (str (mandatory)) – the node name.

  • process (instance) – a process/interface instance.

  • kwargs (dict) – process default values.

get_connections_through(plug_name, single=False)[source]

If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].

Parameters:
  • plug_name (str) – plug to get connections with

  • single (bool) – if True, stop at the first connected plug. Otherwise return the list of all connected plugs.

Returns:

[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.

Return type:

connected_plug; list of tuples

class capsul.pipeline.pipeline_nodes.Plug(**kwargs)[source]

Overload of the traits in order to keep the pipeline memory.

enabled

user parameter to control the plug activation

Type:

bool

activated

parameter describing the Plug status

Type:

bool

output

parameter to set the Plug type (input or output)

Type:

bool

optional

parameter to create an optional Plug

Type:

bool

has_default_value

indicates whether a value is available for that plug even if it is not linked

Type:

bool

links_to

the successor plugs of this plug

Type:

set (node_name, plug_name, node, plug, is_weak)

links_from

the predecessor plugs of this plug

Type:

set (node_name, plug_name, node, plug, is_weak)

Generate a Plug, i.e. a trait with the memory of the pipeline adjacent nodes.

class capsul.pipeline.pipeline_nodes.ProcessNode(pipeline, name, process, **kwargs)[source]

Process node.

process

the process instance stored in the pipeline node

Type:

process instance

set_callback_on_plug()[source]
get_plug_value()[source]
set_plug_value()[source]
get_trait()[source]

Generate a ProcessNode

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added.

  • name (str (mandatory)) – the node name.

  • process (instance) – a process/interface instance.

  • kwargs (dict) – process default values.

check_requirements(environment='global', message_list=None)[source]

Reimplementation of capsul.pipeline.pipeline_nodes.Node.check_requirements() for a ProcessNode. This one delegates to its underlying process (or pipeline).

get_plug_value(plug_name)[source]

Return the plug value

Parameters:

plug_name (str (mandatory)) – a plug name

Returns:

output – the plug value

Return type:

object

get_study_config()[source]

Get (or create) the StudyConfig this process belongs to

get_trait(trait_name)[source]

Return the desired trait

Parameters:

trait_name (str (mandatory)) – a trait name

Returns:

output – the trait named trait_name

Return type:

trait

is_job()[source]

Returns True if the node will be represented as a Job in Soma-Workflow; otherwise the node is static and does not run.

is_parameter_protected(plug_name)[source]

Tells whether the given parameter is protected or not

protect_parameter(plug_name, state=True)[source]

Protect the named parameter.

Protecting is not a real lock; it just marks the parameter in a list of “protected” parameters. This is typically used to mark values that have been set manually by the user (using the ControllerWidget for instance) and that should not be later modified by automatic parameter tweaking (such as completion systems).

Protected parameters are listed in an additional trait, “protected_parameters”.

If the “state” parameter is False, then we will unprotect it (calling unprotect_parameter())

remove_callback_from_plug(plug_name, callback)[source]

Remove a callback triggered when a plug changes

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • callback (@f (mandatory)) – a callback function

requirements()[source]

Requirements reimplementation for a process node. This node delegates to its underlying process. see capsul.process.process.requirements()

set_callback_on_plug(plug_name, callback)[source]

Add a callback triggered when a plug changes

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • callback (@f (mandatory)) – a callback function

set_plug_value(plug_name, value, protected=None)[source]

Set the plug value

Parameters:
  • plug_name (str (mandatory)) – a plug name

  • value (object (mandatory)) – the plug value we want to set

  • protected (None or bool (tristate)) – if True or False, force the “protected” status of the plug. If None, keep it as is.

set_study_config(study_config)[source]

Set the StudyConfig this process belongs to

class capsul.pipeline.pipeline_nodes.Switch(pipeline, name, inputs, outputs, make_optional=(), output_types=None)[source]

Switch node to select a specific Process.

A switch commutes a group of inputs to its outputs, according to its “switch” trait value. Each group may be typically linked to a different process. Processes not “selected” by the switch are disabled, if possible. Values are also propagated through inputs/outputs of the switch (see below).

Inputs / outputs:

Say the switch “my_switch” has 2 outputs, “param1” and “param2”. It will be connected to the outputs of 2 processing nodes, “node1” and “node2”, both having 2 outputs: node1.out1, node1.out2, node2.out1, node2.out2. The switch will thus have 4 entries, in 2 groups, named for instance “node1” and “node2”. The switch will link the outputs of node1 or node2 to its outputs. The switch inputs will be named as follows:

  • 1st group: “node1_switch_param1”, “node1_switch_param2”

  • 2nd group: “node2_switch_param1”, “node2_switch_param2”

  • When my_switch.switch value is “node1”, my_switch.node1_switch_param1 is connected to my_switch.param1 and my_switch.node1_switch_param2 is connected to my_switch.param2. The processing node node2 is disabled (unselected).

  • When my_switch.switch value is “node2”, my_switch.node2_switch_param1 is connected to my_switch.param1 and my_switch.node2_switch_param2 is connected to my_switch.param2. The processing node node1 is disabled (unselected).

Values propagation:

  • When a switch is activated (its switch parameter is changed), the outputs will reflect the selected inputs, which means their values will be the same as the corresponding inputs.

  • But in many cases, parameter values will be given from the output (if the switch output is one of the pipeline outputs, this one will be visible from the “outside world”, not the switch inputs). In this case, values set on a switch output propagate to the corresponding selected inputs.

  • An exception is when a switch input is linked to the parent pipeline inputs: its value is also visible from “outside” and should not be set via output values through the switch. In this specific case, output values are not propagated to such inputs.
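Following the naming example above, driving the switch can be sketched as follows (assuming the switch trigger was exported to the pipeline, which add_switch() does by default):

# select the 'node2' input group; node1 becomes disabled
pipeline.my_switch = 'node2'
# equivalently, set the 'switch' trait on the switch node itself
pipeline.nodes['my_switch'].switch = 'node2'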

Notes

Switch is normally not instantiated directly, but from a pipeline's pipeline_definition() method.

_switch_values

the switch options

Type:

list

_outputs

the switch output parameters

Type:

list

Generate a Switch Node

Warning

The input plug names are built according to the following rule: <input_name>_switch_<output_name>

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the switch node name

  • inputs (list (mandatory)) – a list of options

  • outputs (list (mandatory)) – a list of output parameters

  • make_optional (sequence (optional)) – list of optional outputs. These outputs will be made optional in the switch output. By default they are mandatory.

  • output_types (sequence of traits (optional)) – If given, this sequence should have the same size as outputs. It will specify each switch output parameter type (as a standard trait). Input parameters for each input block will also have this type.

connections()[source]

Returns the current internal connections between input and output plugs

Returns:

connections – list of internal connections [(input_plug_name, output_plug_name), …]

Return type:

list

get_connections_through(plug_name, single=False)[source]

If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].

Parameters:
  • plug_name (str) – plug to get connections with

  • single (bool) – if True, stop at the first connected plug. Otherwise return the list of all connected plugs.

Returns:

[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.

Return type:

connected_plug; list of tuples

is_job()[source]

Returns True if the node will be represented as a Job in Soma-Workflow; otherwise the node is static and does not run.

capsul.pipeline.pipeline_tools submodule

Miscellaneous pipeline handling utility functions

Functions

pipeline_node_colors()

dot_graph_from_pipeline()

dot_graph_from_workflow()

save_dot_graph()

save_dot_image()

disable_runtime_steps_with_existing_outputs()

nodes_with_existing_outputs()

nodes_with_missing_inputs()

where_is_plug_value_from()

dump_pipeline_state_as_dict()

set_pipeline_state_from_dict()

get_output_directories()

create_output_directories()

save_pipeline()

load_pipeline_parameters()

save_pipeline_parameters()

is_node_enabled()

capsul.pipeline.pipeline_tools.create_output_directories(process)[source]

Create output directories for a process, pipeline or node.

capsul.pipeline.pipeline_tools.disable_runtime_steps_with_existing_outputs(pipeline)[source]

Disable steps in a pipeline whose outputs contain existing files. This disabling is the “runtime steps disabling” one (see capsul.pipeline.Pipeline), not the node disabling with activation propagation, so it doesn’t affect the actual pipeline state. The aim is to prevent overwriting files which have already been processed, and to allow downstream execution of the remaining steps of the pipeline.

Parameters:

pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.

capsul.pipeline.pipeline_tools.dot_graph_from_pipeline(pipeline, nodes_sizes={}, use_nodes_pos=False, include_io=True, enlarge_boxes=0.0)[source]

Build a graphviz/dot-compatible representation of the pipeline. The pipeline representation uses one link between two given boxes: parameters are not represented.

This is different from the workflow graph, as given by capsul.pipeline.Pipeline.workflow_graph(), in that the full graph is represented here, including disabled nodes.

To build a workflow graph, see dot_graph_from_workflow().

Parameters:
  • pipeline (Pipeline) – pipeline to convert to a dot graph

  • nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.

  • use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.

  • include_io (bool (optional)) – If True, build a node for the pipeline inputs and a node for the pipeline outputs. If False, these nodes will not be generated, and their links ignored.

  • enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes

Returns:

dot_graph – a (nodes, edges) tuple, where nodes is a list of node tuples (id, node_name, props) and edges is a dict, where keys are (source_node_id, dest_node_id), and values are tuples (props, active, weak). In both cases props is a dictionary of properties. This representation is simple and is meant to feed save_dot_graph()

Return type:

tuple
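As a usage sketch, the resulting graph can be fed to save_dot_graph():

from capsul.pipeline import pipeline_tools

dot_graph = pipeline_tools.dot_graph_from_pipeline(pipeline)
pipeline_tools.save_dot_graph(dot_graph, '/tmp/pipeline.dot',
                              rankdir='TB')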

capsul.pipeline.pipeline_tools.dot_graph_from_workflow(pipeline, nodes_sizes={}, use_nodes_pos=False, enlarge_boxes=0.0)[source]

Build a graphviz/dot-compatible representation of the pipeline workflow.

This is different from the pipeline graph, as obtained by dot_graph_from_pipeline(), since only used parts are visible here: switches and disabled branches are removed.

Parameters:
  • pipeline (Pipeline) – pipeline to convert to a dot graph

  • nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.

  • use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.

  • enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes

Returns:

dot_graph – a (nodes, edges) tuple, where nodes is a list of node tuples (id, node_name, props) and edges is a dict, where keys are (source_node_id, dest_node_id), and values are tuples (props, active, weak). In both cases props is a dictionary of properties. This representation is simple and is meant to feed save_dot_graph()

Return type:

tuple

capsul.pipeline.pipeline_tools.dump_pipeline_state_as_dict(pipeline)[source]

Get a pipeline state (parameters values, nodes activation, selected steps…) in a dictionary.

The returned dict may contain sub-pipelines state also.

The dict may be saved, and used to restore a pipeline state, using set_pipeline_state_from_dict().

Note that pipeline.export_to_dict would almost do the job, but does not include the recursive aspect.

Parameters:

pipeline (Pipeline (or Process) instance) – pipeline (or process) to get state from

Returns:

state_dict – pipeline state

Return type:

dict
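A short usage sketch together with set_pipeline_state_from_dict(); other_pipeline stands for another instance of the same pipeline class (hypothetical):

from capsul.pipeline import pipeline_tools

state = pipeline_tools.dump_pipeline_state_as_dict(pipeline)
# ... later, possibly in another session ...
pipeline_tools.set_pipeline_state_from_dict(other_pipeline, state)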

capsul.pipeline.pipeline_tools.find_node(pipeline, node)[source]

Find the given node in the pipeline or a sub-pipeline

Returns:

node_chain – list of node names in the pipeline going through sub-pipelines to the given node

Return type:

list

capsul.pipeline.pipeline_tools.find_plug_connection_destinations(plug, pipeline=None)[source]

A bit like find_plug_connection_sources() but the other way

Returns:

dest – [(node, param_name, parent_node), …]

Return type:

list

capsul.pipeline.pipeline_tools.find_plug_connection_sources(plug, pipeline=None)[source]

A bit like where_is_plug_value_from() but looks for all incoming connection sources

Returns:

sources – [(node, param_name, parent_node), …]

Return type:

list

capsul.pipeline.pipeline_tools.get_output_directories(process)[source]

Get output directories for a process, pipeline, or node

Returns:

  • dirs (dict) – organized directories list: a dict with recursive nodes mapping. In each element, the “directories” key holds a directories names set, and “nodes” is a dict with sub-nodes (node_name, dict mapping, organized the same way)

  • flat_dirs (set) – set of all directories in the pipeline, as a flat set.

capsul.pipeline.pipeline_tools.is_node_enabled(pipeline, node_name=None, node=None)[source]

Checks if the given node is enabled in the pipeline. It may be disabled if it has its enabled or activated properties set to False, or if it is part of a disabled step. The node may be given as a Node instance, or its name in the pipeline.

capsul.pipeline.pipeline_tools.load_pipeline_parameters(filename, pipeline)[source]

Load and set pipeline parameters (inputs and outputs) from a JSON file.

capsul.pipeline.pipeline_tools.nodes_with_existing_outputs(pipeline, exclude_inactive=True, recursive=False, exclude_inputs=True)[source]

Checks nodes in a pipeline whose outputs contain existing files on the filesystem. Such nodes should probably not run again. Only nodes which actually produce outputs are selected this way (switches are skipped).

Parameters:
  • pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.

  • exclude_inactive (bool (optional)) – if this option is set, inactive nodes will not be checked nor returned in the list. Inactive means disabled, not active, or in a disabled runtime step. Default: True

  • recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Default: False

  • exclude_inputs (bool (optional, default: True)) – Some processes or pipelines have input/output files: files taken as inputs which are re-written as outputs, or may carry an input file to the outputs through a switch selection (in the case of preprocessing steps, for instance). If this option is set, such outputs which also appear in the same node inputs will not be listed in the existing outputs, so that they will not be erased by a cleaning operation, and will not prevent execution of these nodes.

Returns:

selected_nodes – keys: node names values: list of pairs (param_name, file_name)

Return type:

dict

capsul.pipeline.pipeline_tools.nodes_with_missing_inputs(pipeline, recursive=True)[source]

Checks nodes in a pipeline which have invalid inputs. Nodes whose inputs are files that do not exist on the filesystem (so the node cannot run), which have missing mandatory inputs, or which take as input a temporary file that should be the output of another, disabled node, are recorded.

Parameters:
  • pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.

  • recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Note that if not set, a pipeline is regarded as a process, but pipelines may not use all their inputs/outputs so the result might be inaccurate. Default: True

Returns:

selected_nodes – keys: node names values: list of pairs (param_name, file_name)

Return type:

dict

capsul.pipeline.pipeline_tools.pipeline_link_color(plug, link)[source]

Link color and style for graphical display and graphviz graphs.

Parameters:
  • plug (Plug) – the plug the link belongs to

  • link (link tuple (5 values)) – link to color

Returns:

link_props – (color, style, active, weak) where color is a RGB tuple of float values (between 0. and 1.), style is a string (“solid”, “dotted”), active and weak are booleans.

Return type:

tuple

capsul.pipeline.pipeline_tools.pipeline_node_colors(pipeline, node)[source]

Node color to display boxes in GUI and graphviz graphs. Depending on the node type (process, pipeline, switch) and its activation, colors will differ.

Parameters:
  • pipeline (Pipeline) – the pipeline the node belongs to

  • node (Node) – the node to be colored

Returns:

colors – (box_color, background_fill_color, dark_color, style). Colors are 3-tuples of float values between 0. and 1. style is “default”, “switch” or “pipeline”.

Return type:

tuple

capsul.pipeline.pipeline_tools.save_dot_graph(dot_graph, filename, **kwargs)[source]

Write a graphviz/dot input file, which can be used to generate an image representation of the graph, or to make dot automatically position nodes.

Parameters:
  • dot_graph (dot graph) – representation of the pipeline, obtained using dot_graph_from_pipeline()

  • filename (string) – file name to save the dot definition in

  • **kwargs (additional attributes for the dot graph) – like nodesep=0.1 or rankdir=”TB”

capsul.pipeline.pipeline_tools.save_dot_image(pipeline, filename, nodes_sizes={}, use_nodes_pos=False, include_io=True, enlarge_boxes=0.0, workflow=False, format=None, **kwargs)[source]

Save a dot/graphviz image of the pipeline in a file.

It may use either the complete pipeline graph (with switches and disabled branches), or the workflow, hiding disabled parts (see the workflow parameter).

Basically combines dot_graph_from_pipeline() or dot_graph_from_workflow(), and save_dot_graph(), then runs the dot command, which has to be installed and available on the system.

Parameters:
  • pipeline (Pipeline) – pipeline to convert to a dot graph

  • filename (string) – file name to save the dot definition in.

  • nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.

  • use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.

  • include_io (bool (optional)) – If True, build a node for the pipeline inputs and a node for the pipeline outputs. If False, these nodes will not be generated, and their links ignored.

  • enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes

  • workflow (bool (optional)) – if True, the workflow corresponding to the current pipeline state will be used instead of the complete graph: disabled parts will be hidden.

  • format (string (optional)) – dot output format (see the dot command documentation). If not specified, it is guessed from the file name extension.

  • **kwargs (additional attributes for the dot graph) – like nodesep=0.1 or rankdir=”TB”
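
For instance, a small sketch rendering the runtime workflow as a PNG image (the dot command must be installed; the format is guessed here from the file extension, and the file name is hypothetical):

from capsul.pipeline.pipeline_tools import save_dot_image

save_dot_image(pipeline, '/tmp/pipeline.png', workflow=True, rankdir='TB')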

capsul.pipeline.pipeline_tools.save_pipeline(pipeline, file, format=None)[source]

Save the pipeline in an XML, JSON, or .py source file.

Parameters:
  • pipeline (Pipeline instance) –

  • file (file object or str) – either a filename, or a file-like stream

  • format (str) – ‘py’, ‘xml’… If not specified and file is a file name, it will be guessed from its extension. If file is not a string, then format will default to xml.

capsul.pipeline.pipeline_tools.save_pipeline_parameters(filename, pipeline)[source]

Save pipeline parameters (inputs and outputs) to a JSON file.

capsul.pipeline.pipeline_tools.set_pipeline_state_from_dict(pipeline, state_dict)[source]

Set a pipeline (or process) state from a dict description.

State includes parameters values, nodes activation, steps selection etc. The state is generally taken using dump_pipeline_state_as_dict().

Parameters:
  • pipeline (Pipeline or Process instance) – process to set state in

  • state_dict (dict (mapping object)) – state dictionary
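
A typical round trip, sketched below, dumps the state of one pipeline instance with dump_pipeline_state_as_dict() (mentioned above) and restores it on another instance of the same pipeline class:

from capsul.pipeline import pipeline_tools

state = pipeline_tools.dump_pipeline_state_as_dict(pipeline1)
pipeline_tools.set_pipeline_state_from_dict(pipeline2, state)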

capsul.pipeline.pipeline_tools.where_is_plug_value_from(plug, recursive=True)[source]

Find where the given (input) plug takes its value from. The value has to come from the output of an upstream process, or the plug may be unconnected. Looking for the source may involve ascending through switches or across pipeline boundaries.

Parameters:
  • plug (Plug instance (mandatory)) – the plug to find source connection with

  • recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Note that if not set, a pipeline is regarded as a process, but pipelines may not use all their inputs/outputs so the result might be inaccurate. Default: True

Returns:

  • node (Node) – origin pipeline node. May be None if the plug was not connected to an active source.

  • param_name (string) – origin plug name in the origin node. May be None if node is None

  • parent (Node) – Top-level parent node of the origin node. Useful to determine if the origin node is in a runtime pipeline step, which only records top-level nodes.
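
As an illustration, a hedged sketch; the node and plug names are hypothetical, and plugs are assumed to be reachable through the nodes' plugs dictionaries:

from capsul.pipeline.pipeline_tools import where_is_plug_value_from

plug = pipeline.nodes['my_node'].plugs['input_image']
node, param_name, parent = where_is_plug_value_from(plug)
if node is None:
    print('the plug is not connected to an active source')
else:
    print('value comes from %s.%s' % (node.name, param_name))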

capsul.pipeline.pipeline_tools.write_fake_pipeline(pipeline, module_name, dirname, sleep_time=0)[source]

Write a “fake pipeline” with the same class name, structure, and parameters as the input pipeline, but with its processes replaced by “fake” processes which do not actually do any real work when executed.

This is meant for tests, to mimic a “real” pipeline structure without its dependencies.

Warning: this function actually modifies the input pipeline, which is transformed into a fake one.

capsul.pipeline.pipeline_tools.write_fake_process(process, filename, sleep_time=0)[source]

Write a “fake process” with the same class name and parameters as the input process, but with a fake execution function meant for tests.

capsul.pipeline.pipeline_workflow submodule

Capsul Pipeline conversion into soma-workflow workflow.

Standard use case:

from capsul.pipeline.pipeline_workflow import (workflow_from_pipeline,
                                               workflow_run)

workflow = workflow_from_pipeline(pipeline)
controller, wf_id = workflow_run(workflow_name, workflow, study_config)

Functions

workflow_from_pipeline()

workflow_run()

class capsul.pipeline.pipeline_workflow.TempFile(string='')[source]
capsul.pipeline.pipeline_workflow.workflow_from_pipeline(pipeline, study_config=None, disabled_nodes=None, jobs_priority=0, create_directories=True, environment='global', check_requirements=True, complete_parameters=False)[source]

Create a soma-workflow workflow from a Capsul Pipeline

Parameters:
  • pipeline (Pipeline (mandatory)) – a CAPSUL pipeline

  • study_config (StudyConfig (optional), or dict) – holds information about file transfers and shared resource paths. If not specified, it will be accessed through the pipeline.

  • disabled_nodes (sequence of pipeline nodes (Node instances) (optional)) – such nodes will be disabled on-the-fly in the pipeline, file transfers will be adapted accordingly (outputs may become inputs in the resulting workflow), and temporary files will be checked. If a disabled node was to produce a temporary file which is still used by an enabled node, a ValueError exception will be raised. If disabled_nodes is not passed, it will possibly be deduced from the pipeline (if available) using its disabled steps: see Pipeline.define_steps()

  • jobs_priority (int (optional, default: 0)) – set this priority on soma-workflow jobs.

  • create_directories (bool (optional, default: True)) – if set, needed output directories (which will contain output files) will be created in a first job, which all other ones depend on.

  • environment (str (default: "global")) – configuration environment name (default: “global”). See capsul.engine.CapsulEngine and capsul.engine.settings.Settings.

  • check_requirements (bool (default: True)) – if True, check the pipeline nodes requirements in the capsul engine settings, and issue an exception if they are not met instead of proceeding with the workflow.

  • complete_parameters (bool (default: False)) – perform parameters completion on the input pipeline while building the workflow. The default is False because completion should not be redone when it has already been performed, but in iteration nodes completion needs to be done for each iteration anyway, so this option allows completing the rest of the “parent” pipeline.

Returns:

workflow – a soma-workflow workflow

Return type:

Workflow

capsul.pipeline.pipeline_workflow.workflow_run(workflow_name, workflow, study_config)[source]

Create a soma-workflow controller and submit a workflow

Parameters:
  • workflow_name (str (mandatory)) – the name of the workflow

  • workflow (Workflow (mandatory)) – the soma-workflow workflow

  • study_config (StudyConfig (mandatory)) – contains needed configuration through the SomaWorkflowConfig module

capsul.pipeline.process_iteration submodule

Utility class for iterated nodes in a pipeline. This is mainly internal infrastructure, which a normal programmer should not have to bother about. A pipeline programmer will not instantiate ProcessIteration directly, but rather use the Pipeline method add_iterative_process().
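
For instance, a sketch of a pipeline definition using it (the process and parameter names are hypothetical):

from capsul.pipeline import Pipeline

class MyIteration(Pipeline):

    def pipeline_definition(self):
        # iterate my_process over lists bound to these two plugs
        self.add_iterative_process(
            'iteration', 'my_toolbox.my_process',
            iterative_plugs=['input_image', 'output_image'])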

Classes

ProcessIteration

class capsul.pipeline.process_iteration.ProcessIteration(process, iterative_parameters, study_config=None, context_name=None)[source]

Note

  • Type ‘ProcessIteration.help()’ for a full description of this process parameters.

  • Type ‘<ProcessIteration>.get_input_spec()’ for a full description of this process input trait types.

  • Type ‘<ProcessIteration>.get_output_spec()’ for a full description of this process output trait types.

Initialize the Process class.

change_iterative_plug(parameter, iterative=None)[source]

Change a parameter to be iterative (or non-iterative)

Parameters:
  • parameter (str) – parameter name

  • iterative (bool or None) – if None, the iterative state will be toggled. If True or False, the parameter state will be set accordingly.

set_study_config(study_config)[source]

Set a StudyConfig for the process. Note that it can only be done once: once a non-null StudyConfig has been assigned to the process, it should not change.

capsul.pipeline.python_export submodule

Pipeline export function to a Python source code file.

Functions

save_py_pipeline()

capsul.pipeline.python_export.save_py_pipeline(pipeline, py_file)[source]

Save a pipeline in a Python source file

Parameters:
  • pipeline (Pipeline instance) – pipeline to save

  • py_file (str or file-like object) – .py file to save the pipeline in

capsul.pipeline.topological_sort submodule

Build graph with dependencies from a pipeline

Classes

Graph

GraphNode

class capsul.pipeline.topological_sort.Graph[source]

Simple graph structure on which a topological sort can be performed (no cycles).

The algorithm is based on the R. E. Tarjan linear optimization (O(N+A)).

_nodes

the graph nodes {node.name: node}

Type:

dict

_links

the graph edges (from_node, to_node)

Type:

list

add_node()[source]
find_node()[source]
topological_sort()[source]

Create a Graph

add_link(from_node, to_node)[source]

Method to add an edge between two GraphNodes of the Graph

Parameters:
  • from_node (GraphNode (mandatory)) – a node in the graph

  • to_node (GraphNode (mandatory)) – the successor node

add_node(node)[source]

Method to add a GraphNode in the Graph

Parameters:
  • node (GraphNode (mandatory)) – the node to insert

find_node(node_name)[source]

Method to find a GraphNode in the Graph

Parameters:
  • node_name (str (mandatory)) – the name of the desired node

topological_sort()[source]

Perform the topological sort: find an order in which all the nodes can be taken.

Step 1: identify the nodes that have no incoming link (nnil).

Step 2: loop while nnil is not empty:

  a) remove a node c_nnil of in-degree 0 from nnil;

  b) place it in the output;

  c) remove all its outgoing links from the graph;

  d) if a successor node reaches in-degree 0, add it to nnil.

Step 3: assert that there is no loop left in the graph.

Returns:

output – a list of ordered nodes with a tuple element containing the node name and the node meta element.

Return type:

list of tuple
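
The procedure above is essentially Kahn's algorithm. A minimal standalone sketch, independent of the Graph class, might read:

from collections import deque

def topological_sort(nodes, edges):
    # nodes: iterable of hashable node identifiers
    # edges: iterable of (from_node, to_node) pairs
    nodes = list(nodes)
    in_degree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for frm, to in edges:
        successors[frm].append(to)
        in_degree[to] += 1
    # Step 1: nodes with no incoming link (nnil)
    nnil = deque(n for n in nodes if in_degree[n] == 0)
    output = []
    # Step 2: repeatedly output a node of in-degree 0 and remove its
    # outgoing links; successors reaching in-degree 0 join nnil
    while nnil:
        current = nnil.popleft()
        output.append(current)
        for succ in successors[current]:
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                nnil.append(succ)
    # Step 3: if some node was never output, the graph contains a cycle
    if len(output) != len(nodes):
        raise ValueError('graph contains a cycle')
    return output

print(topological_sort(['a', 'b', 'c'], [('a', 'b'), ('b', 'c')]))
# ['a', 'b', 'c']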

class capsul.pipeline.topological_sort.GraphNode(name, meta)[source]

Simple Graph Node Structure

name

the node name

Type:

str

meta

a python object stored in the node

Type:

object

links_to

object to store the graph edges: successors

Type:

list

links_from

object to store the graph edges: predecessors

Type:

list

links_to_degree

degree of the node regarding the successors

Type:

int

links_from_degree

degree of the node regarding the predecessors

Type:

int

Create a Graph Node

Parameters:
  • name (str (mandatory)) – the node name

  • meta (object) – a python object to store in the node

add_link_from(node)[source]

Method to add a Predecessor

Parameters:
  • node (GraphNode) – the predecessor node

add_link_to(node)[source]

Method to add a Successor

Parameters:
  • node (GraphNode) – the successor node

remove_link_from(node)[source]

Method to remove a Predecessor

Parameters:
  • node (GraphNode) – the predecessor node

remove_link_to(node)[source]

Method to remove a Successor

Parameters:
  • node (GraphNode) – the successor node

capsul.pipeline.xml submodule

XML IO for pipelines

Functions

create_xml_pipeline()

save_xml_pipeline()

capsul.pipeline.xml.create_xml_pipeline(module, name, xml_file)[source]

Create a pipeline class given its Capsul XML 2.0 representation.

Parameters:
  • module (str (mandatory)) – name of the module for the created Pipeline class (the Python module is not modified).

  • name (str (mandatory)) – name of the new pipeline class

  • xml_file (str (mandatory)) – name of file containing the XML description or XML string.
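
For instance (a sketch; the module name and XML file path are hypothetical):

from capsul.pipeline.xml import create_xml_pipeline

MyPipeline = create_xml_pipeline('my_toolbox', 'MyPipeline',
                                 '/somewhere/my_pipeline.xml')
pipeline = MyPipeline()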

capsul.pipeline.xml.save_xml_pipeline(pipeline, xml_file)[source]

Save a pipeline in an XML file

Parameters:
  • pipeline (Pipeline instance) – pipeline to save

  • xml_file (str or file-like object) – XML file to save the pipeline in

capsul.pipeline.custom_nodes submodule

This module is a placeholder for custom node types. Custom nodes can be used as pipeline nodes; they do not represent a process (a physical job on a computing resource), but can help setting things up in a pipeline structure or its parameters.

capsul.pipeline.custom_nodes.strcat_node

StrCatNode

class capsul.pipeline.custom_nodes.strcat_node.StrCatNode(pipeline, name, params, concat_plug, outputs, make_optional=(), param_types={})[source]

This “inert” node concatenates its inputs (as strings) and generates the concatenation on one of its plugs. All plugs may be inputs or outputs.

Parameters:
  • pipeline (Pipeline) – pipeline which will hold the node

  • name (str) – node name

  • params (list) – names of parameters to be concatenated

  • concat_plug (str) – name of the concatenated plug (should not be part of params)

  • outputs (list) – list of parameters names which are outputs. May include elements from params, and/or concat_plug

  • make_optional (list) – list of plug names which should be optional.

  • param_types (dict) – parameters types dict: {param_name: trait_type_as_string}
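
As an illustration, a hedged sketch adding such a node in a pipeline definition, assuming Pipeline.add_custom_node() forwards the parameters dict to the node constructor (all names here are hypothetical):

from capsul.pipeline import Pipeline

class MyPipeline(Pipeline):

    def pipeline_definition(self):
        # build an output path by concatenating two string parameters
        self.add_custom_node(
            'cat', 'capsul.pipeline.custom_nodes.strcat_node.StrCatNode',
            parameters=dict(
                params=['base_dir', 'subject'],
                concat_plug='out_path',
                outputs=['out_path'],
                param_types={'base_dir': 'Str', 'subject': 'Str',
                             'out_path': 'Str'}))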

capsul.pipeline.custom_nodes.strconv

StrConvNode

class capsul.pipeline.custom_nodes.strconv.StrConvNode(pipeline, name, input_type=None)[source]

This “inert” node converts the input into a string.

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters, each described by a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters, each described by a dictionary with default values (mandatory key: name)

get_connections_through(plug_name, single=False)[source]

If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].

Parameters:
  • plug_name (str) – plug to get connections with

  • single (bool) – if True, stop at the first connected plug. Otherwise return the list of all connected plugs.

Returns:

[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.

Return type:

connected_plug – list of tuples

is_job()[source]

if True, the node will be represented as a Job in Soma-Workflow. Otherwise the node is static and does not run.

capsul.pipeline.custom_nodes.cv_node

CrossValidationFoldNode

class capsul.pipeline.custom_nodes.cv_node.CrossValidationFoldNode(pipeline, name, input_type=None)[source]

This “inert” node filters a list to separate it into (typically) train and test sublists.

The “outputs” are “train” and “test” output traits.

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters, each described by a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters, each described by a dictionary with default values (mandatory key: name)

capsul.pipeline.custom_nodes.loo_node

LeaveOneOutNode

class capsul.pipeline.custom_nodes.loo_node.LeaveOneOutNode(pipeline, name, is_output=True, input_type=None, test_is_output=True, has_index=True)[source]

This “inert” node excludes one input from the list of inputs, to allow leave-one-out applications. The “outputs” may be either an output trait (to serve as inputs to other nodes), or an input trait (to assign output values to other nodes).

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters, each described by a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters, each described by a dictionary with default values (mandatory key: name)

capsul.pipeline.custom_nodes.map_node

MapNode

class capsul.pipeline.custom_nodes.map_node.MapNode(pipeline, name, input_names=['inputs'], output_names=['output_%d'], input_types=None)[source]

This node converts lists into series of single items. Typically, an input named inputs is a list of items. The node separates the items and outputs each of them as a distinct output parameter: the i-th item will be output as output_<i> by default. The input/output names can be customized using the constructor parameters input_names and output_names, and several lists can be split in the same node. The node also outputs a lengths parameter containing the lengths of the input lists. These lengths can typically be fed into reduce nodes to perform the reverse operation (see ReduceNode, and the sketch after the list below).

  • input_names is a list of input parameters names, each being a list to be split. The default is ['inputs']

  • output_names is a list of patterns used to build output parameters names. Each item is a string containing a substitution pattern "%d" which will be replaced with a number. The default is ['output_%d']. Each pattern will be used to replace items from the corresponding input in the same order; thus input_names and output_names should have the same length.
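
A hedged sketch, inside a pipeline_definition() and with hypothetical names, assuming Pipeline.add_custom_node() forwards the constructor parameters as above:

self.add_custom_node(
    'map', 'capsul.pipeline.custom_nodes.map_node.MapNode',
    parameters=dict(input_names=['inputs'],
                    output_names=['output_%d']))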

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters, each described by a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters, each described by a dictionary with default values (mandatory key: name)

capsul.pipeline.custom_nodes.reduce_node

ReduceNode

class capsul.pipeline.custom_nodes.reduce_node.ReduceNode(pipeline, name, input_names=['input_%d'], output_names=['outputs'], input_types=None)[source]

Reduce node: converts series of inputs into lists. Typically a series of inputs named input_0 .. input_<n> will be output as a single list named outputs.

Several input series can be handled by the node, and input names can be customized.

  • The number of inputs in each series is given by the lengths input parameter. It is typically linked from the output of a MapNode (see the sketch after this list).

  • Input parameters names patterns are given as the input_names parameter. It is a list of patterns, each containing a "%d" pattern for the input number. The default value is ['input_%d'].

  • Output parameters names are given as the output_names parameter. The default is ['outputs'].
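
Continuing the MapNode sketch above (still with hypothetical names), the reduce side can be added the same way, with its lengths input linked from the map node:

self.add_custom_node(
    'reduce', 'capsul.pipeline.custom_nodes.reduce_node.ReduceNode',
    parameters=dict(input_names=['input_%d'],
                    output_names=['outputs']))
self.add_link('map.lengths->reduce.lengths')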

Generate a Node

Parameters:
  • pipeline (Pipeline (mandatory)) – the pipeline object where the node is added

  • name (str (mandatory)) – the node name

  • inputs (list of dict (mandatory)) – a list of input parameters, each described by a dictionary with default values (mandatory key: name)

  • outputs (list of dict (mandatory)) – a list of output parameters, each described by a dictionary with default values (mandatory key: name)