capsul.pipeline module¶
Pipelining elements for CAPSUL
capsul.pipeline.pipeline submodule¶
Pipeline main class module
Classes¶
Pipeline¶
- class capsul.pipeline.pipeline.Pipeline(autoexport_nodes_parameters=None, **kwargs)[source]¶
Pipeline containing Process nodes, and links between node parameters.
A Pipeline is normally subclassed, and its pipeline_definition() method is overloaded to define its nodes and links. pipeline_definition() will be called by the pipeline constructor.

from capsul.pipeline import Pipeline

class MyPipeline(Pipeline):

    def pipeline_definition(self):
        self.add_process('proc1', 'my_toolbox.my_process1')
        self.add_process('proc2', 'my_toolbox.my_process2')
        self.add_switch('main_switch', ['in1', 'in2'], ['out1', 'out2'])
        self.add_link('proc1.out1->main_switch.in1_switch_out1')
        self.add_link('proc1.out2->main_switch.in1_switch_out2')
        self.add_link('proc2.out1->main_switch.in2_switch_out1')
        self.add_link('proc2.out2->main_switch.in2_switch_out2')
After execution of pipeline_definition(), the inner nodes' parameters which are not connected are automatically exported to the parent pipeline, with names prefixed with their process name, unless they are listed in a special “do_not_export” list (passed to add_process() or stored in the pipeline instance).

>>> pipeline = MyPipeline()
>>> print(pipeline.proc1_input)
<undefined>
Nodes
A pipeline is made of nodes, and links between their parameters. Several types of nodes may be part of a pipeline:
process nodes (pipeline_nodes.ProcessNode) are the leaf nodes which represent actual processing bricks.
pipeline nodes (pipeline_nodes.PipelineNode) are sub-pipelines which allow reusing an existing pipeline within another one.
switch nodes (pipeline_nodes.Switch) allow selecting values among several possible inputs. The switch mechanism also allows selecting between several alternative processes or processing branches.
iterative process nodes (process_iteration.ProcessIteration) represent parallel processing of the same pipeline on a set of parameters.
Note that you normally do not instantiate these nodes explicitly when building a pipeline. Rather, programmers call the add_process(), add_switch() and add_iterative_process() methods.

Nodes activation
Pipeline nodes may be enabled or disabled. Disabling a node triggers a global pipeline nodes activation step, in which every node that no longer forms part of a complete processing chain is made inactive. A whole branch may thus be disabled by disabling one of its nodes. The switch system uses this mechanism to select one processing branch among several and disable the unselected ones.
Pipeline steps
Pipelines may define execution steps: user-oriented groups of nodes that are to be run together, or disabled together, for runtime execution. They are intended to allow partial, or step-by-step, execution. They do not work like the node enabling mechanism described above.
Steps may be defined within the pipeline_definition() method. See add_pipeline_step().
Note also that pipeline steps only act at the highest level: if a sub-pipeline has disabled steps, they will not be taken into account in the higher-level pipeline execution, because executing part of a sub-pipeline step-by-step within the context of a higher one generally does not make sense.
Main methods
- nodes¶
a dictionary containing the pipeline nodes; the pipeline node itself is stored under the empty string key ''
- Type:
dict {node_name: node}
Note
Type ‘Pipeline.help()’ for a full description of this process's parameters.
Type ‘<Pipeline>.get_input_spec()’ for a full description of this process's input trait types.
Type ‘<Pipeline>.get_output_spec()’ for a full description of this process's output trait types.
Initialize the Pipeline class
- Parameters:
autoexport_nodes_parameters (bool) – if True (default) nodes containing pipeline plugs are automatically exported.
- add_custom_node(name, node_type, parameters=None, make_optional=(), do_not_export=None, **kwargs)[source]¶
Inserts a custom node (Node subclass instance which is not a Process) in the pipeline.
- Parameters:
node_type (str or Node subclass or Node instance) – node type to be built. Either a class (Node subclass) or a Node instance (the node will be re-instantiated), or a string describing a module and class.
parameters (dict or Controller or None) – configuration dict or Controller defining parameters needed to build the node. The controller should be obtained using the node class’s configure_node() static method, then filled with the desired values. If not given the node is supposed to be built with no parameters, which will not work for every node type.
make_optional (list or tuple) – parameters names to be made optional
do_not_export (list of str (optional)) – a list of plug names that we do not want to export.
kwargs (default values of node parameters)
- add_iterative_process(name, process, iterative_plugs=None, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, **kwargs)[source]¶
Add a new iterative node in the pipeline.
- Parameters:
name (str (mandatory)) – the node name (has to be unique).
process (Process or str (mandatory)) – the process we want to add.
iterative_plugs (list of str (optional)) – a list of plug names on which we want to iterate. If None, all plugs of the process will be iterated.
do_not_export (list of str (optional)) – a list of plug names that we do not want to export.
make_optional (list of str (optional)) – a list of plug names to be made optional.
inputs_to_copy (list of str (optional)) – a list of input items to copy.
inputs_to_clean (list of str (optional)) – a list of temporary items.
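For illustration, a minimal sketch of declaring an iterative node inside pipeline_definition() (the 'my_toolbox.smooth' process and its 'input'/'output' plugs are hypothetical names):

from capsul.pipeline import Pipeline

class MyIteration(Pipeline):

    def pipeline_definition(self):
        # iterate the process over its 'input' and 'output' plugs:
        # the corresponding exported pipeline parameters become lists
        self.add_iterative_process(
            'smoothing', 'my_toolbox.smooth',
            iterative_plugs=['input', 'output'])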
- add_link(link, weak_link=False, allow_export=False, value=None)[source]¶
Add a link between pipeline nodes.
If the destination node is a switch, force the source plug to be not optional.
- Parameters:
link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)
weak_link (bool) – this property is used when nodes are optional; the plug information may not be generated for weak links.
allow_export (bool) – if True, if the link links from/to the pipeline node with a plug name which doesn’t exist, the plug will be created, and the function will act exactly like export_parameter. This may be a more convenient way of exporting/connecting pipeline plugs to several nodes without having to export the first one, then link the others.
value (any) – if given, set this value instead of the source plug value
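As a sketch (node and plug names here are illustrative, not part of the API), the notations look like this inside pipeline_definition():

# string notation: "source_node.plug->dest_node.plug"
self.add_link('proc1.output->proc2.input')
# a plug of the pipeline itself omits the node name
self.add_link('input_image->proc1.input')
# with allow_export=True, a missing pipeline plug is created on the
# fly, acting like export_parameter()
self.add_link('proc2.out_image->result', allow_export=True)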
- add_optional_output_switch(name, input, output=None)[source]¶
Add an optional output switch node in the pipeline
An optional switch activates or disables its input/output link according to the output value. If the output value is not None or Undefined, the link is active, otherwise it is inactive.
This kind of switch is meant to make a pipeline output optional, but still available for temporary files values inside the pipeline.
Ex:
A.output -> B.input
B.input is mandatory, but we want to make A.output available and optional in the pipeline outputs. If we directly export A.output, then if the pipeline does not set a value, B.input will be empty and the pipeline run will fail.
Instead we can add an OptionalOutputSwitch between A.output and pipeline.output. If pipeline.output is set to a valid value, then A.output and B.input will have the same valid value. If pipeline.output is left Undefined, then A.output and B.input will get a temporary value during the run.
Add an optional output switch node in the pipeline
- Parameters:
name (str (mandatory)) – name for the switch node (has to be unique)
input (str (mandatory)) – name for switch input. Switch activation will select between it and a hidden input, “_none”. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”.
output (str (optional)) – name for output. Default is the switch name
Examples
>>> pipeline.add_optional_output_switch('out1', 'in1')
>>> pipeline.add_link('node1.output->out1.in1_switch_out1')
- add_pipeline_step(step_name, nodes, enabled=True)[source]¶
Add a step definition to the pipeline (see also define_steps).
Steps are groups of pipeline nodes, which may be disabled at runtime. They are normally defined in a logical order regarding the workflow streams. They are different from pipelines in that steps are purely virtual groups, they do not have parameters.
Disabling a step acts differently from the pipeline node activation: other nodes are not inactivated according to their dependencies. Instead, the nodes in disabled steps are simply not run.
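A short sketch (node names are hypothetical) of defining steps in pipeline_definition():

self.add_pipeline_step('preprocessings',
                       ['normalization', 'bias_correction'])
self.add_pipeline_step('analysis', ['histo_analysis'])
# steps may later be toggled through the pipeline_steps trait:
# pipeline.pipeline_steps.analysis = False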
- add_process(name, process, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, skip_invalid=False, **kwargs)[source]¶
Add a new node in the pipeline
Note about invalid nodes:
A pipeline can typically offer alternatives (through a switch) between different algorithmic nodes, which may have different dependencies, or may be provided through external modules, and thus can be missing. To handle this, Capsul can be told that a process node may be invalid (or missing) without otherwise interfering with the rest of the pipeline. This is done using the “skip_invalid” option. When used, the process to be added is tested, and if its instantiation fails, it will not be added to the pipeline, but will not trigger an error either. Instead the missing node will be marked as “allowed invalid”, and links and exports built using this node will silently do nothing. Thus the pipeline will work normally, without the invalid node.
Such nodes are generally gathered through a switch mechanism. However the switch inputs should be restricted to actually available nodes. The recommended method is to check that nodes have actually been added in the pipeline. Then links can be made normally as if the nodes were all present:
self.add_process('method1', 'module1.Module1', skip_invalid=True)
self.add_process('method2', 'module2.Module2', skip_invalid=True)
self.add_process('method3', 'module3.Module3', skip_invalid=True)
input_params = [n for n in ['method1', 'method2', 'method3']
                if n in self.nodes]
self.add_switch('select_method', input_params, 'output')
self.add_link('method1.input->select_method.method1_switch_output')
self.add_link('method2.input->select_method.method2_switch_output')
self.add_link('method3.input->select_method.method3_switch_output')
A last note about invalid nodes:
When saving a pipeline (typically through the graphical editor), missing nodes will not be saved because they are not actually in the pipeline. So be careful to save only pipelines with full features.
- Parameters:
name (str (mandatory)) – the node name (has to be unique).
process (Process (mandatory)) – the process we want to add. May be a string (‘module.process’), a process instance or a class.
do_not_export (list of str (optional)) – a list of plug names that we do not want to export.
make_optional (list of str (optional)) – a list of plug names to be made optional.
inputs_to_copy (list of str (optional)) – a list of input items to copy.
inputs_to_clean (list of str (optional)) – a list of temporary items.
skip_invalid (bool) – if True and the process fails to be instantiated, don't throw an exception; instead don't insert the node, and mark it as such so that add_link() also silently does nothing. This option is useful for optional process nodes which may or may not be available depending on their dependencies, typically in a switch offering several alternative methods.
- add_processes_selection(selection_parameter, selection_groups, value=None)[source]¶
Add a processes selection switch definition to the pipeline.
Selectors are a “different” kind of switch: the pipeline nodes of the selected group are enabled, and those of the other groups are disabled.
The selector has 2 levels:
selection_parameter selects a group.
A group contains a set of nodes which will be activated together. Groups are mutually exclusive.
- Parameters:
selection_parameter (string (mandatory)) – name of the selector parameter: the parameter is added in the pipeline, and its value is the name of the selected group.
selection_groups (dict or OrderedDict) – nodes groups contained in the selector : {group_name: [Node names]}
value (str (optional)) – initial state of the selector (default: 1st group)
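A hedged sketch (group and node names are made up for the example):

self.add_processes_selection(
    'method',                           # selector parameter name
    {'method_A': ['nodeA1', 'nodeA2'],  # group name -> node names
     'method_B': ['nodeB']})
# selecting a group enables its nodes and disables the other groups:
# pipeline.method = 'method_B'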
- add_switch(name, inputs, outputs, export_switch=True, make_optional=(), output_types=None, switch_value=None, opt_nodes=None)[source]¶
Add a switch node in the pipeline
- Parameters:
name (str (mandatory)) – name for the switch node (has to be unique)
inputs (list of str (mandatory)) – names for switch inputs. Switch activation will select amongst them. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”. This behaviour is needed when there are several outputs, and thus several input groups.
export_switch (bool (optional)) – if True, export the switch trigger to the parent pipeline with name as parameter name
make_optional (sequence (optional)) – list of optional outputs. These outputs will be made optional in the switch output. By default they are mandatory.
output_types (sequence of traits (optional)) – If given, this sequence should have the same size as outputs. It will specify each switch output parameter type (as a standard trait). Input parameters for each input block will also have this type.
switch_value (str (optional)) – Initial value of the switch parameter (one of the inputs names). Defaults to 1st input.
opt_nodes (bool or list) – tells that switch values are node names, and some of them may be optional and missing. In such a case, missing nodes are not added as inputs. If a list is passed, it is a list of node names whose length should match the number of inputs, and whose order maps nodes to inputs (in case input names are not directly node names).
Examples
>>> pipeline.add_switch('group_switch', ['in1', 'in2'], ['out1', 'out2'])
will create a switch with 4 inputs and 2 outputs:
inputs: “in1_switch_out1”, “in2_switch_out1”, “in1_switch_out2”, “in2_switch_out2”
outputs: “out1”, “out2”
- add_trait(name, trait)[source]¶
Add a trait to the pipeline
- Parameters:
name (str (mandatory)) – the trait name
trait (trait instance (mandatory)) – the trait we want to add
- all_nodes()[source]¶
Iterate over all pipeline nodes including sub-pipeline nodes.
- Returns:
nodes – Iterates over all nodes
- Return type:
Generator of Node
- autoexport_nodes_parameters(include_optional=False)[source]¶
Automatically export nodes plugs to the pipeline.
Some parameters can be explicitly excluded from export if they are listed in the pipeline “do_not_export” variable (list or set).
- Parameters:
include_optional (bool (optional)) – if False (the default), optional plugs are not exported. Exception: optional output plugs of switches are exported anyway (otherwise they would be useless); this should probably apply to any single output plug of a node.
- call_process_method(process_name, method, *args, **kwargs)[source]¶
Call a method of a process previously added with add_process or add_iterative_process.
- check_requirements(environment='global', message_list=None)[source]¶
Reimplementation, for pipelines, of capsul.process.process.Process.check_requirements. A pipeline will return a list of unique configuration values.
- compare_to_state(pipeline_state)[source]¶
Returns the differences between this pipeline and a previously recorded state.
- Returns:
differences – each element is a human readable string explaining one difference (e.g. ‘node “my_process” is missing’)
- Return type:
list of str
- count_items()[source]¶
Count pipeline items to get its size.
- Returns:
items – (nodes_count, processes_count, plugs_count, params_count, links_count, enabled_nodes_count, enabled_procs_count, enabled_links_count)
- Return type:
tuple
- define_groups_as_steps(exclusive=True)[source]¶
Define parameters groups according to which steps they are connected to.
- Parameters:
exclusive (bool (optional)) – if True, a parameter is assigned to a single group: the first step it is connected to. If False, a parameter is assigned to all the steps groups it is connected to.
- define_pipeline_steps(steps)[source]¶
Define steps in the pipeline. Steps are pipeline portions that form groups, and which can be enabled or disabled on a runtime basis (when building workflows).
Once steps are defined, their activation may be accessed through the “pipeline_steps” trait, which has one boolean property for each step:
Ex:
from collections import OrderedDict

steps = OrderedDict()
steps['preprocessings'] = [
    'normalization',
    'bias_correction',
    'histo_analysis']
steps['brain_extraction'] = [
    'brain_segmentation',
    'hemispheres_split']
pipeline.define_pipeline_steps(steps)
>>> print(pipeline.pipeline_steps.preprocessings) True
>>> pipeline.pipeline_steps.brain_extraction = False
See also add_pipeline_step()
- Parameters:
steps (dict or preferably OrderedDict or SortedDictionary (mandatory)) – The steps dict keys are steps names, the values are lists of nodes names forming the step.
- disabled_pipeline_steps_nodes()[source]¶
List nodes disabled for runtime execution
- Returns:
disabled_nodes – list of pipeline nodes (Node instances) which will not run in a workflow created from this pipeline state.
- Return type:
list
- enable_all_pipeline_steps()[source]¶
Set all defined steps (using add_pipeline_step() or define_pipeline_steps()) to be enabled. Useful to reset the pipeline state after it has been changed.
- export_parameter(node_name, plug_name, pipeline_parameter=None, weak_link=False, is_enabled=None, is_optional=None, allow_existing_plug=None)[source]¶
Export a node plug at the pipeline level.
- Parameters:
node_name (str (mandatory)) – the name of node containing the plug we want to export
plug_name (str (mandatory)) – the node plug name we want to export
pipeline_parameter (str (optional)) – the name to access this parameter at the pipeline level. Default None, the plug name is used
weak_link (bool (optional)) – this property is used when nodes are weak; the plug information may not be generated (FIXME: clarify what this exactly means).
is_enabled (bool (optional)) – a property to specify that it is not a user parameter (automatic generation)
is_optional (bool (optional)) – sets the exported parameter to be optional
allow_existing_plug (bool (optional)) – the same pipeline plug may be connected to several process plugs
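A small sketch (node and plug names are illustrative):

# export proc1.threshold as the pipeline parameter 'threshold'
self.export_parameter('proc1', 'threshold',
                      pipeline_parameter='threshold')
# connect the same pipeline plug to a second node instead of
# creating a new plug
self.export_parameter('proc2', 'threshold',
                      pipeline_parameter='threshold',
                      allow_existing_plug=True)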
- find_empty_parameters()[source]¶
Find internal File/Directory parameters not exported to the main input/output parameters of the pipeline with empty values. This is meant to track parameters which should be associated with temporary files internally.
- Returns:
Each element is a list with 3 values: node, parameter_name, optional
- Return type:
list
- get_processes_selection_groups(selection_parameter)[source]¶
Get the names of the groups involved in a processes selection switch
- get_processes_selection_nodes(selection_parameter, group)[source]¶
Get the names of the nodes involved in a processes selection switch, for the given value group
- get_processes_selections()[source]¶
Get process_selection group names (corresponding to selection parameters on the pipeline)
- install_links_debug_handler(log_file=None, handler=None, prefix='')[source]¶
Set callbacks triggered when trait values change, and follow plug links, in order to debug links propagation and problems in it.
- Parameters:
log_file (str (optional)) – file-like object to write links propagation in. If none is specified, a temporary file will be created for it.
handler (function (optional)) – Callback to be processed for debugging. If none is specified, the default pipeline debugging function will be used. This default handler prints traits changes and links to be processed in the log_file. The handler function will receive a prefix string, a node, and traits parameters, namely the object (process) owning the changed value, the trait name and value in this object.
prefix (str (optional)) – prefix to be prepended to traits names, typically the parent pipeline full name
- Returns:
log_file
- Return type:
the file object where events will be written
- parse_link(link, check=True)[source]¶
Parse a link coming from export_parameter method.
- Parameters:
- Returns:
output – tuple containing the link description and instances
- Return type:
tuple
Examples
>>> Pipeline.parse_link("node1.plug1->node2.plug2")
"node1", "plug1", <instance node1>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>
For a pipeline node:
>>> Pipeline.parse_link("plug1->node2.plug2")
"", "plug1", <instance pipeline>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>
- pipeline_definition()[source]¶
Define pipeline structure, nodes, sub-pipelines, switches, and links.
This method should be overloaded in subclasses, it does nothing in the base Pipeline class.
- pipeline_state()[source]¶
Return an object composed of basic Python objects that contains the whole structure and state of the pipeline. This object can be given to compare_to_state method in order to get the differences with a previously stored state. This is typically used in tests scripts.
- Returns:
pipeline_state – the whole structure and state of the pipeline, as basic Python objects
- Return type:
dictionary
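Typical use in a test script might look like this sketch (the 'select_method' switch name is hypothetical):

state = pipeline.pipeline_state()
# ... change something in the pipeline, e.g. a switch value
pipeline.select_method = 'method2'
for difference in pipeline.compare_to_state(state):
    print(difference)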
- propagate_metadata(node, param, metadata)[source]¶
Set metadata on a node parameter, and propagate these values to the connected plugs.
Typically needed to propagate the “forbid_completion” metadata, to prevent manually set values from being overridden by completion.
node may be a Node instance or a node name
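A minimal sketch (node and parameter names are illustrative):

# mark a manually set output so that completion will not override it,
# on the node plug and on every connected plug
pipeline.propagate_metadata('proc1', 'output_image',
                            {'forbid_completion': True})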
- remove_link(link)[source]¶
Remove a link between pipeline nodes
- Parameters:
link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)
- remove_trait(name)[source]¶
Remove a trait from the pipeline
- Parameters:
name (str (mandatory)) – the trait name
- rename_node(old_node_name, new_node_name)[source]¶
Change the name of the selected node and update the pipeline.
- reorder_traits(names)[source]¶
Reimplementation of the Controller method reorder_traits(), so that the pipeline node plugs are also reordered.
- set_study_config(study_config)[source]¶
Set a StudyConfig for the process. Note that it can only be done once: once a non-null StudyConfig has been assigned to the process, it should not change.
- uninstall_links_debug_handler()[source]¶
Remove links debugging callbacks set by install_links_debug_handler
- update_nodes_and_plugs_activation()[source]¶
Reset all nodes and plugs activations according to the current state of the pipeline (i.e. switch selection, nodes disabled, etc.). Activations are set according to the following rules.
- workflow_graph(remove_disabled_steps=True, remove_disabled_nodes=True)[source]¶
Generate a workflow graph
- Parameters:
remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow graph. Default: True
remove_disabled_nodes (bool (optional)) – When set, disabled nodes will not be included in the workflow graph. Default: True
- Returns:
graph – graph representation of the workflow from the current state of the pipeline
- Return type:
topological_sort.Graph
- workflow_ordered_nodes(remove_disabled_steps=True)[source]¶
Generate a workflow: an ordered list of process nodes to execute
- Parameters:
remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow. Default: True
- Returns:
workflow_list – an ordered list of Processes to execute
- Return type:
list of Process
capsul.pipeline.pipeline_construction submodule¶
This module is internal machinery; users do not need to know it or bother about it. In brief, it provides helper functions to build a pipeline from an I/O serialization.
Classes¶
PipelineConstructor¶
ConstructedPipeline¶
- class capsul.pipeline.pipeline_construction.ConstructedPipeline(autoexport_nodes_parameters=None, **kwargs)[source]¶
Base class of all pipelines created with PipelineConstructor. It redefines pipeline_definition in order to “replay”, at each instantiation, the method calls previously recorded with the PipelineConstructor.
Note
Type ‘ConstructedPipeline.help()’ for a full description of this process's parameters.
Type ‘<ConstructedPipeline>.get_input_spec()’ for a full description of this process's input trait types.
Type ‘<ConstructedPipeline>.get_output_spec()’ for a full description of this process's output trait types.
Initialize the Pipeline class
- Parameters:
autoexport_nodes_parameters (bool) – if True (default) nodes containing pipeline plugs are automatically exported.
- class capsul.pipeline.pipeline_construction.PipelineConstructor(module, name)[source]¶
A PipelineConstructor is used to build a new Pipeline class and dynamically construct its contents, for instance by adding process nodes and creating links between parameters. This class must be used whenever one wants to create a pipeline from an external source (i.e. a source that is not an installed Python module). For instance, the creation of a pipeline from an XML representation uses a PipelineConstructor.
- pipeline¶
the constructed Pipeline class
- Type:
ConstructedPipeline derived class
Initialize a new empty ConstructedPipeline class that is ready to be customized by calling methods of the PipelineConstructor. When pipeline creation is done, the pipeline class can be accessed with the pipeline attribute.
- Parameters:
capsul.pipeline.pipeline_nodes submodule¶
Node classes for CAPSUL pipeline elements
Classes¶
Plug¶
Node¶
ProcessNode¶
PipelineNode¶
Switch¶
OptionalOutputSwitch¶
- class capsul.pipeline.pipeline_nodes.Node(pipeline, name, inputs, outputs)[source]¶
Basic Node structure of the pipeline that needs to be tuned.
It is possible to define custom nodes inheriting Node. To be usable in all contexts (GUI construction, pipeline save / reload), custom nodes should define a few additional instance and class methods which will allow automatic systems (such as get_node_instance()) to reinstantiate and save them:
- configure_controller(cls): class method
return a Controller instance which specifies parameters needed to build the node instance. Typically it may contain a parameters (plugs) list and other specifications.
- configured_controller(self): instance method:
on an instance, returns a Controller instance in the same shape as configure_controller above, but with values filled from the instance. This controller will allow saving parameters needed to instantiate again the node with the same state.
- build_node(cls, pipeline, name, conf_controller): class method
returns an instance of the node class, built using appropriate parameters (using configure_controller() or configured_controller() from another instance)
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
- check_requirements(environment='global', message_list=None)[source]¶
Checks the process requirements against configuration settings values in the attached CapsulEngine. This makes use of the requirements() method and checks that there is one matching config value for each required module.
- Parameters:
- Returns:
config – if None is returned, requirements are not met: the process cannot run. If a dict is returned, it corresponds to the matching config values. When no requirements are needed, an empty dict is returned. A pipeline, if its requirements are met will return a list of configuration values, because different nodes may require different config values.
- Return type:
dict, list, or None
- cleanup()[source]¶
Cleanup before deletion: disconnects all plugs, removes internal and cyclic references.
- disconnect(source_plug_name, dest_node, dest_plug_name, silent=False)[source]¶
disconnect linked plugs of two nodes
- get_connections_through(plug_name, single=False)[source]¶
If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].
- Parameters:
- Returns:
[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.
- Return type:
connected_plug; list of tuples
- get_missing_mandatory_parameters(exclude_links=False)[source]¶
Returns a list of parameters which are not optional, and whose value is Undefined or None, or an empty string for a File or Directory parameter.
- Parameters:
exclude_links (bool) – if True, an empty parameter which has a link to another node will not be reported missing, since the execution will assign it a temporary value which will not prevent the pipeline from running.
- get_trait(trait_name)[source]¶
Return the desired trait
- Parameters:
trait_name (str (mandatory)) – a trait name
- Returns:
output – the trait named trait_name
- Return type:
trait
- is_job()[source]¶
if True, the node will be represented as a Job in Soma-Workflow. Otherwise the node is static and does not run.
- remove_callback_from_plug(plug_name, callback)[source]¶
Remove a callback set on a plug value change
- Parameters:
plug_name (str (mandatory)) – a plug name
callback (@f (mandatory)) – a callback function
- requirements()[source]¶
Requirements needed to run the node. It is a dictionary whose keys are config/settings modules and whose values are requests for them.
The default implementation returns an empty dict (no requirements), and should be overloaded by processes which actually have requirements.
Ex:
{'spm': 'version >= "12" and standalone == "True"'}
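A hedged sketch of overriding this method in a custom process (assuming capsul.api exposes Process, and taking an SPM-based process as example):

from capsul.api import Process

class MySPMProcess(Process):

    def requirements(self):
        # request an SPM 12 standalone configuration from the
        # engine settings
        return {'spm': 'version >= "12" and standalone == "True"'}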
- set_callback_on_plug(plug_name, callback)[source]¶
Add a callback triggered when a plug value changes
- Parameters:
plug_name (str (mandatory)) – a plug name
callback (@f (mandatory)) – a callback function
- class capsul.pipeline.pipeline_nodes.OptionalOutputSwitch(pipeline, name, input, output)[source]¶
A switch which activates or disables its input/output link according to the output value. If the output value is not None or Undefined, the link is active, otherwise it is inactive.
This kind of switch is meant to make a pipeline output optional, but still available for temporary files values inside the pipeline.
Ex:
A.output -> B.input
B.input is mandatory, but we want to make A.output available and optional in the pipeline outputs. If we directly export A.output, then if the pipeline does not set a value, B.input will be empty and the pipeline run will fail.
Instead we can add an OptionalOutputSwitch between A.output and pipeline.output. If pipeline.output is set to a valid value, then A.output and B.input will have the same valid value. If pipeline.output is left Undefined, then A.output and B.input will get a temporary value during the run.
Technically, the OptionalOutputSwitch is currently implemented as a specialized switch node with two inputs and one output, and thus follows the inputs naming rules. The first input is the defined one, and the second, hidden one, is named “_none”. As a consequence, its 1st input should be connected under the name “<input>_switch_<output>” as in a standard switch. The “switch” input is hidden (not exported to the pipeline) and set automatically according to the output value. The implementation details may change in future versions.
Generate an OptionalOutputSwitch Node
Warning
The input plug name is built according to the following rule: <input>_switch_<output>
- class capsul.pipeline.pipeline_nodes.PipelineNode(pipeline, name, process, **kwargs)[source]¶
A special node to store the pipeline user-parameters
Generate a ProcessNode
- Parameters:
- get_connections_through(plug_name, single=False)[source]¶
If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].
- Parameters:
- Returns:
[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.
- Return type:
connected_plug; list of tuples
- class capsul.pipeline.pipeline_nodes.Plug(**kwargs)[source]¶
Overload of the traits in order to keep the pipeline memory.
- links_from¶
the predecessor plugs of this plug
- Type:
set (node_name, plug_name, node, plug, is_weak)
Generate a Plug, i.e. a trait with the memory of the pipeline adjacent nodes.
- class capsul.pipeline.pipeline_nodes.ProcessNode(pipeline, name, process, **kwargs)[source]¶
Process node.
- process¶
the process instance stored in the pipeline node
- Type:
process instance
Generate a ProcessNode
- Parameters:
- check_requirements(environment='global', message_list=None)[source]¶
Reimplementation of capsul.pipeline.pipeline_nodes.Node.check_requirements() for a ProcessNode. This one delegates to its underlying process (or pipeline).
- get_trait(trait_name)[source]¶
Return the desired trait
- Parameters:
trait_name (str (mandatory)) – a trait name
- Returns:
output – the trait named trait_name
- Return type:
trait
- is_job()[source]¶
if True, the node will be represented as a Job in Soma-Workflow. Otherwise the node is static and does not run.
- protect_parameter(plug_name, state=True)[source]¶
Protect the named parameter.
Protecting is not a real lock; it just records the parameter in a list of “protected” parameters. This is typically used to mark values that have been set manually by the user (using the ControllerWidget for instance) and that should not be modified later by automatic parameter tweaking (such as completion systems).
Protected parameters are listed in an additional trait, “protected_parameters”.
If the “state” parameter is False, then we will unprotect it (calling unprotect_parameter())
- remove_callback_from_plug(plug_name, callback)[source]¶
Remove a callback set on a plug value change
- Parameters:
plug_name (str (mandatory)) – a plug name
callback (@f (mandatory)) – a callback function
- requirements()[source]¶
Requirements reimplementation for a process node. This node delegates to its underlying process. See capsul.process.process.Process.requirements().
- set_callback_on_plug(plug_name, callback)[source]¶
Add a callback triggered when a plug value changes
- Parameters:
plug_name (str (mandatory)) – a plug name
callback (@f (mandatory)) – a callback function
- class capsul.pipeline.pipeline_nodes.Switch(pipeline, name, inputs, outputs, make_optional=(), output_types=None)[source]¶
Switch node to select a specific Process.
A switch commutes a group of inputs to its outputs, according to its “switch” trait value. Each group may be typically linked to a different process. Processes not “selected” by the switch are disabled, if possible. Values are also propagated through inputs/outputs of the switch (see below).
Inputs / outputs:
Say the switch “my_switch” has 2 outputs, “param1” and “param2”. It will be connected to the outputs of 2 processing nodes, “node1” and “node2”, both having 2 outputs: node1.out1, node1.out2, node2.out1, node2.out2. The switch will thus have 4 entries, in 2 groups, named for instance “node1” and “node2”. The switch will link the outputs of node1 or node2 to its outputs. The switch inputs will be named as follows:
1st group: “node1_switch_param1”, “node1_switch_param2”
2nd group: “node2_switch_param1”, “node2_switch_param2”
When my_switch.switch value is “node1”, my_switch.node1_switch_param1 is connected to my_switch.param1 and my_switch.node1_switch_param2 is connected to my_switch.param2. The processing node node2 is disabled (unselected).
When my_switch.switch value is “node2”, my_switch.node2_switch_param1 is connected to my_switch.param1 and my_switch.node2_switch_param2 is connected to my_switch.param2. The processing node node1 is disabled (unselected).
Values propagation:
When a switch is activated (its switch parameter is changed), the outputs will reflect the selected inputs, which means their values will be the same as the corresponding inputs.
But in many cases, parameters values will be given from the output (if the switch output is one of the pipeline outputs, this one will be visible from the “outside world”, not the switch inputs). In this case, values set on a switch output are propagated to its inputs.
An exception is when a switch input is linked to the parent pipeline inputs: its value is also visible from “outside” and should not be set via output values through the switch. In this specific case, output values are not propagated to such inputs.
Notes
Switch is normally not instantiated directly, but from a pipeline pipeline_definition() method.
See also
_switch_changed, _anytrait_changed, capsul.pipeline.pipeline.Pipeline.add_switch, capsul.pipeline.pipeline.Pipeline.pipeline_definition
Generate a Switch Node
Warning
The input plug names are built according to the following rule: <input_name>_switch_<output_name>
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the switch node name
inputs (list (mandatory)) – a list of options
outputs (list (mandatory)) – a list of output parameters
make_optional (sequence (optional)) – list of optional outputs. These outputs will be made optional in the switch output. By default they are mandatory.
output_types (sequence of traits (optional)) – If given, this sequence should have the same size as outputs. It will specify each switch output parameter type (as a standard trait). Input parameters for each input block will also have this type.
- connections()[source]¶
Returns the current internal connections between input and output plugs
- Returns:
connections – list of internal connections [(input_plug_name, output_plug_name), …]
- Return type:
list
- get_connections_through(plug_name, single=False)[source]¶
If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].
- Parameters:
- Returns:
[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.
- Return type:
connected_plug; list of tuples
- is_job()[source]¶
if True, the node will be represented as a Job in Soma-Workflow. Otherwise the node is static and does not run.
capsul.pipeline.pipeline_tools submodule¶
Miscellaneous pipeline handling utility functions
Functions¶
pipeline_node_colors()¶
pipeline_link_color()¶
dot_graph_from_pipeline()¶
dot_graph_from_workflow()¶
save_dot_graph()¶
save_dot_image()¶
disable_runtime_steps_with_existing_outputs()¶
nodes_with_existing_outputs()¶
nodes_with_missing_inputs()¶
where_is_plug_value_from()¶
dump_pipeline_state_as_dict()¶
set_pipeline_state_from_dict()¶
get_output_directories()¶
create_output_directories()¶
save_pipeline()¶
load_pipeline_parameters()¶
save_pipeline_parameters()¶
is_node_enabled()¶
- capsul.pipeline.pipeline_tools.create_output_directories(process)[source]¶
Create output directories for a process, pipeline or node.
- capsul.pipeline.pipeline_tools.disable_runtime_steps_with_existing_outputs(pipeline)[source]¶
Disable steps in a pipeline whose outputs contain existing files. This disabling is the “runtime steps disabling” one (see capsul.pipeline.Pipeline), not the node disabling with activation propagation, so it doesn't affect the actual pipeline state. The aim is to prevent overwriting files which have already been processed, and to allow downstream execution of the remaining steps of the pipeline.
- Parameters:
pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.
- capsul.pipeline.pipeline_tools.dot_graph_from_pipeline(pipeline, nodes_sizes={}, use_nodes_pos=False, include_io=True, enlarge_boxes=0.0)[source]¶
Build a graphviz/dot-compatible representation of the pipeline. The pipeline representation uses one link between two given boxes: parameters are not represented.
This is different from the workflow graph, as given by capsul.pipeline.Pipeline.workflow_graph(), in that the full graph is represented here, including disabled nodes.
To build a workflow graph, see dot_graph_from_workflow().
- Parameters:
pipeline (Pipeline) – pipeline to convert to a dot graph
nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.
use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.
include_io (bool (optional)) – If True, build a node for the pipeline inputs and a node for the pipeline outputs. If False, these nodes will not be generated, and their links ignored.
enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes
- Returns:
dot_graph – a (nodes, edges) tuple, where nodes is a list of node tuples (id, node_name, props) and edges is a dict, where keys are (source_node_id, dest_node_id), and values are tuples (props, active, weak). In both cases props is a dictionary of properties. This representation is simple and is meant to feed save_dot_graph().
- Return type:
tuple
- capsul.pipeline.pipeline_tools.dot_graph_from_workflow(pipeline, nodes_sizes={}, use_nodes_pos=False, enlarge_boxes=0.0)[source]¶
Build a graphviz/dot-compatible representation of the pipeline workflow.
This is different from the pipeline graph, as obtained by dot_graph_from_pipeline(), since only used parts are visible here: switches and disabled branches are removed.
- Parameters:
pipeline (Pipeline) – pipeline to convert to a dot graph
nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.
use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.
enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes
- Returns:
dot_graph – a (nodes, edges) tuple, where nodes is a list of node tuples (id, node_name, props) and edges is a dict, where keys are (source_node_id, dest_node_id), and values are tuples (props, active, weak). In both cases props is a dictionary of properties. This representation is simple and is meant to feed save_dot_graph().
- Return type:
tuple
- capsul.pipeline.pipeline_tools.dump_pipeline_state_as_dict(pipeline)[source]¶
Get a pipeline state (parameters values, nodes activation, selected steps…) in a dictionary.
The returned dict may contain sub-pipelines state also.
The dict may be saved, and used to restore a pipeline state, using set_pipeline_state_from_dict().
Note that pipeline.export_to_dict would almost do the job, but does not include the recursive aspect.
- capsul.pipeline.pipeline_tools.find_node(pipeline, node)[source]¶
Find the given node in the pipeline or a sub-pipeline
- Returns:
node_chain – list of node names in the pipeline going through sub-pipelines to the given node
- Return type:
list of str
- capsul.pipeline.pipeline_tools.find_plug_connection_destinations(plug, pipeline=None)[source]¶
A bit like find_plug_connection_sources(), but the other way round.
- Returns:
dest – [(node, param_name, parent_node), …]
- Return type:
list
- capsul.pipeline.pipeline_tools.find_plug_connection_sources(plug, pipeline=None)[source]¶
A bit like where_is_plug_value_from(), but looks for all incoming connection sources.
- Returns:
sources – [(node, param_name, parent_node), …]
- Return type:
list
- capsul.pipeline.pipeline_tools.get_output_directories(process)[source]¶
Get output directories for a process, pipeline, or node
- Returns:
dirs (dict) – organized directories list: a dict with recursive nodes mapping. In each element, the “directories” key holds a directories names set, and “nodes” is a dict with sub-nodes (node_name, dict mapping, organized the same way)
flat_dirs (set) – set of all directories in the pipeline, as a flat set.
- capsul.pipeline.pipeline_tools.is_node_enabled(pipeline, node_name=None, node=None)[source]¶
Checks if the given node is enabled in the pipeline. It may be disabled if it has its enabled or activated properties set to False, or if it is part of a disabled step. The node may be given as a Node instance, or by its name in the pipeline.
- capsul.pipeline.pipeline_tools.load_pipeline_parameters(filename, pipeline)[source]¶
Load and set pipeline parameters (inputs and outputs) from a JSON file.
- capsul.pipeline.pipeline_tools.nodes_with_existing_outputs(pipeline, exclude_inactive=True, recursive=False, exclude_inputs=True)[source]¶
Checks for nodes in a pipeline whose outputs contain files already existing on the filesystem. Such nodes probably should not run again. Only nodes which actually produce outputs are selected this way (switches are skipped).
- Parameters:
pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.
exclude_inactive (bool (optional)) – if this option is set, inactive nodes will not be checked nor returned in the list. Inactive means disabled, not active, or in a disabled runtime step. Default: True
recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Default: False
exclude_inputs (bool (optional, default: True)) – Some processes or pipelines have input/output files: files taken as inputs which are re-written as outputs, or may carry an input file to the outputs through a switch selection (in the case of preprocessing steps, for instance). If this option is set, such outputs which also appear in the same node inputs will not be listed in the existing outputs, so that they will not be erased by a cleaning operation, and will not prevent execution of these nodes.
- Returns:
selected_nodes – keys: node names; values: lists of pairs (param_name, file_name)
- Return type:
dict
- capsul.pipeline.pipeline_tools.nodes_with_missing_inputs(pipeline, recursive=True)[source]¶
Checks for nodes in a pipeline whose inputs are invalid: inputs which are files that do not exist on the filesystem (so the node cannot run), missing mandatory inputs, or inputs taking a temporary file which should be the output of another, disabled node. Such nodes are recorded.
- Parameters:
pipeline (Pipeline (mandatory)) – pipeline to disable nodes in.
recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Note that if not set, a pipeline is regarded as a process, but pipelines may not use all their inputs/outputs so the result might be inaccurate. Default: True
- Returns:
selected_nodes – keys: node names; values: lists of pairs (param_name, file_name)
- Return type:
dict
- capsul.pipeline.pipeline_tools.pipeline_link_color(plug, link)[source]¶
Link color and style for graphical display and graphviz graphs.
- Parameters:
plug (Plug) – the plug the link belongs to
link (link tuple (5 values)) – link to color
- Returns:
link_props – (color, style, active, weak) where color is a RGB tuple of float values (between 0. and 1.), style is a string (“solid”, “dotted”), active and weak are booleans.
- Return type:
tuple
- capsul.pipeline.pipeline_tools.pipeline_node_colors(pipeline, node)[source]¶
Node color to display boxes in GUI and graphviz graphs. Depending on the node type (process, pipeline, switch) and its activation, colors will differ.
- capsul.pipeline.pipeline_tools.save_dot_graph(dot_graph, filename, **kwargs)[source]¶
Write a graphviz/dot input file, which can be used to generate an image representation of the graph, or to make dot automatically position nodes.
- Parameters:
dot_graph (dot graph) – representation of the pipeline, obtained using dot_graph_from_pipeline()
filename (string) – file name to save the dot definition in
**kwargs (additional attributes for the dot graph) – like nodesep=0.1 or rankdir=”TB”
- capsul.pipeline.pipeline_tools.save_dot_image(pipeline, filename, nodes_sizes={}, use_nodes_pos=False, include_io=True, enlarge_boxes=0.0, workflow=False, format=None, **kwargs)[source]¶
Save a dot/graphviz image of the pipeline in a file.
It may use either the complete pipeline graph (with switches and disabled branches), or the workflow, hiding disabled parts (see the workflow parameter).
Basically combines dot_graph_from_pipeline() or dot_graph_from_workflow() with save_dot_graph(), then runs the dot command, which has to be installed and available on the system.
- Parameters:
pipeline (Pipeline) – pipeline to convert to a dot graph
filename (string) – file name to save the dot definition in.
nodes_sizes (dict (optional)) – nodes sizes may be specified here, keys are node names, and values are tuples (width, height). Special “inputs” and “outputs” keys represent the global inputs/outputs blocks of the pipeline.
use_nodes_pos (bool (optional)) – if True, nodes positions in the pipeline.node_position variable will be used to force nodes positions in the graph.
include_io (bool (optional)) – If True, build a node for the pipeline inputs and a node for the pipeline outputs. If False, these nodes will not be generated, and their links ignored.
enlarge_boxes (float (optional)) – when nodes sizes are specified, enlarge them by this amount to produce bigger boxes
workflow (bool (optional)) – if True, the workflow corresponding to the current pipeline state will be used instead of the complete graph: disabled parts will be hidden.
format (string (optional)) – dot output format (see dot command doc). If not specified, guessed from the file name extension.
**kwargs (additional attributes for the dot graph) – like nodesep=0.1 or rankdir=”TB”
- capsul.pipeline.pipeline_tools.save_pipeline(pipeline, file, format=None)[source]¶
Save the pipeline as an XML, JSON, or .py source file.
- capsul.pipeline.pipeline_tools.save_pipeline_parameters(filename, pipeline)[source]¶
Save pipeline parameters (inputs and outputs) to a JSON file.
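A short sketch of the save/load round trip (the file path and MyPipeline class are placeholders):

from capsul.pipeline import pipeline_tools

pipeline_tools.save_pipeline_parameters('/tmp/params.json', pipeline)
# ... later, restore the values into a fresh instance
pipeline2 = MyPipeline()
pipeline_tools.load_pipeline_parameters('/tmp/params.json', pipeline2)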
- capsul.pipeline.pipeline_tools.set_pipeline_state_from_dict(pipeline, state_dict)[source]¶
Set a pipeline (or process) state from a dict description.
State includes parameters values, nodes activation, steps selection etc. The state is generally obtained using dump_pipeline_state_as_dict().
- capsul.pipeline.pipeline_tools.where_is_plug_value_from(plug, recursive=True)[source]¶
Find where the given (input) plug takes its value from. It has to be the output of an uphill process, or be unconnected. Looking for it may involve ascending through switches or pipeline walls.
- Parameters:
plug (Plug instance (mandatory)) – the plug to find source connection with
recursive (bool (optional)) – if this option is set, sub-pipelines will not be returned as a whole but will be parsed recursively to select individual leaf nodes. Note that if not set, a pipeline is regarded as a process, but pipelines may not use all their inputs/outputs so the result might be inaccurate. Default: True
- Returns:
node (Node) – origin pipeline node. May be None if the plug was not connected to an active source.
param_name (string) – origin plug name in the origin node. May be None if node is None
parent (Node) – Top-level parent node of the origin node. Useful to determine if the origin node is in a runtime pipeline step, which only records top-level nodes.
- capsul.pipeline.pipeline_tools.write_fake_pipeline(pipeline, module_name, dirname, sleep_time=0)[source]¶
Write a “fake pipeline” with same class name, structure, and parameters as the input pipeline, but replacing its processes with “fake” processes which do not actually do a real job while executing.
This is meant for tests, to mimic a “real” pipeline structure without its dependencies.
Warning: this function actually modifies the input pipeline, which is transformed into a fake one.
capsul.pipeline.pipeline_workflow submodule¶
Capsul Pipeline conversion into soma-workflow workflow.
Standard use case:
workflow = workflow_from_pipeline(pipeline)
controller, wf_id = workflow_run(workflow_name, workflow, study_config)
Functions¶
workflow_from_pipeline()¶
workflow_run()¶
- capsul.pipeline.pipeline_workflow.workflow_from_pipeline(pipeline, study_config=None, disabled_nodes=None, jobs_priority=0, create_directories=True, environment='global', check_requirements=True, complete_parameters=False)[source]¶
Create a soma-workflow workflow from a Capsul Pipeline
- Parameters:
pipeline (Pipeline (mandatory)) – a CAPSUL pipeline
study_config (StudyConfig (optional), or dict) – holds information about file transfers and shared resource paths. If not specified, it will be accessed through the pipeline.
disabled_nodes (sequence of pipeline nodes (Node instances) (optional)) – such nodes will be disabled on-the-fly in the pipeline, file transfers will be adapted accordingly (outputs may become inputs in the resulting workflow), and temporary files will be checked. If a disabled node was to produce a temporary file which is still used in an enabled node, then a ValueError exception will be raised. If disabled_nodes is not passed, they will possibly be taken from the pipeline (if available) using disabled steps: see Pipeline.define_steps()
jobs_priority (int (optional, default: 0)) – set this priority on soma-workflow jobs.
create_directories (bool (optional, default: True)) – if set, needed output directories (which will contain output files) will be created in a first job, which all other ones depend on.
environment (str (default: "global")) – configuration environment name (default: “global”). See
capsul.engine.CapsulEngine
andcapsul.engine.settings.Settings
.check_requirements (bool (default: True)) – if True, check the pipeline nodes requirements in the capsul engine settings, and issue an exception if they are not met instead of proceeding with the workflow.
complete_parameters (bool (default: False)) – perform parameters completion on the input pipeline while building the workflow. The default is False because we should avoid doing things several times when they are already done, but in iteration nodes completion needs to be done anyway for each iteration, so this option allows doing the rest of the “parent” pipeline completion.
- Returns:
workflow – a soma-workflow workflow
- Return type:
Workflow
- capsul.pipeline.pipeline_workflow.workflow_run(workflow_name, workflow, study_config)[source]¶
Create a soma-workflow controller and submit a workflow
- Parameters:
workflow_name (str (mandatory)) – the name of the workflow
workflow (Workflow (mandatory)) – the soma-workflow workflow
study_config (StudyConfig (mandatory)) – contains needed configuration through the SomaWorkflowConfig module
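Putting both functions together, the standard use case from the module header expands to something like this sketch (pipeline and study_config are assumed to be already built):

from capsul.pipeline import pipeline_workflow

workflow = pipeline_workflow.workflow_from_pipeline(
    pipeline, study_config=study_config)
controller, wf_id = pipeline_workflow.workflow_run(
    'my_workflow', workflow, study_config)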
capsul.pipeline.process_iteration submodule¶
Utility class for iterated nodes in a pipeline. This is mainly internal infrastructure, which a normal programmer should not have to bother about.
A pipeline programmer will not instantiate ProcessIteration directly, but rather use the Pipeline method add_iterative_process().
Classes¶
ProcessIteration¶
- class capsul.pipeline.process_iteration.ProcessIteration(process, iterative_parameters, study_config=None, context_name=None)[source]¶
Note
Type ‘ProcessIteration.help()’ for a full description of this process's parameters.
Type ‘<ProcessIteration>.get_input_spec()’ for a full description of this process's input trait types.
Type ‘<ProcessIteration>.get_output_spec()’ for a full description of this process's output trait types.
Initialize the Process class.
capsul.pipeline.python_export submodule¶
Pipeline export function to a Python source code file.
Functions¶
save_py_pipeline()¶
capsul.pipeline.topological_sort submodule¶
Build graph with dependencies from a pipeline
Classes¶
GraphNode¶
Graph¶
- class capsul.pipeline.topological_sort.Graph[source]¶
Simple graph structure on which we want to perform a topological sort (no cycles).
The algorithm is based on R.E. Tarjan's linear optimization (O(N+A)).
Create a Graph
- add_node(node)[source]¶
Method to add a GraphNode to the Graph
- Parameters:
node (GraphNode (mandatory)) – the node to insert
- find_node(node_name)[source]¶
Method to find a GraphNode in the Graph
- Parameters:
node_name (str (mandatory)) – the name of the desired node
- topological_sort()[source]¶
Perform the topological sort: find an order in which all the nodes can be taken.
Step 1: identify the nodes that have no incoming link (nnil).
Step 2: loop while nnil is not empty:
a) take a node c_nnil of in-degree 0 out of nnil;
b) place it in the output;
c) remove all its outgoing links from the graph;
d) for each successor whose in-degree drops to 0, add that node to nnil.
Step 3: assert that there is no loop in the graph (all nodes were output).
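The steps above correspond to Kahn's algorithm; a minimal standalone sketch, independent of the Graph class, could read:

from collections import deque

def kahn_topological_sort(nodes, edges):
    # edges: dict mapping each node to a collection of its successors
    in_degree = {n: 0 for n in nodes}
    for successors in edges.values():
        for succ in successors:
            in_degree[succ] += 1
    # Step 1: collect nodes with no incoming link (nnil)
    nnil = deque(n for n in nodes if in_degree[n] == 0)
    output = []
    # Step 2: repeatedly take an in-degree-0 node, output it, drop its links
    while nnil:
        node = nnil.popleft()
        output.append(node)
        for succ in edges.get(node, ()):
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                nnil.append(succ)
    # Step 3: if some nodes were never output, the graph contains a cycle
    assert len(output) == len(nodes), 'graph contains a cycle'
    return output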
capsul.pipeline.xml submodule¶
XML IO for pipelines
Functions¶
create_xml_pipeline()
¶
save_xml_pipeline()
¶
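A minimal sketch of both directions, assuming pipeline is a Pipeline instance (paths and module name are arbitrary):

from capsul.pipeline.xml import create_xml_pipeline, save_xml_pipeline

# save an existing pipeline instance to an XML file
save_xml_pipeline(pipeline, '/tmp/my_pipeline.xml')
# re-create a pipeline class from the XML description
MyPipeline = create_xml_pipeline('my_module', 'MyPipeline',
                                 '/tmp/my_pipeline.xml')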
capsul.pipeline.custom_nodes submodule¶
This module is a placeholder for custom node types. Custom nodes can be used as pipeline nodes; they do not represent a process (a physical job on a computing resource), but can help set things up in a pipeline structure or its parameters.
capsul.pipeline.custom_nodes.strcat_node¶
StrCatNode
¶
- class capsul.pipeline.custom_nodes.strcat_node.StrCatNode(pipeline, name, params, concat_plug, outputs, make_optional=(), param_types={})[source]¶
This “inert” node concatenates its inputs (as strings) and generates the concatenation on one of its plugs. All plugs may be inputs or outputs.
- Parameters:
pipeline (Pipeline) – pipeline which will hold the node
name (str) – node name
params (list) – names of parameters to be concatenated
concat_plug (str) – name of the concatenated plug (should not be part of params)
outputs (list) – list of parameters names which are outputs. May include elements from params, and/or concat_plug
make_optional (list) – list of plug names which should be optional.
param_types (dict) – parameters types dict: {param_name: trait_type_as_string}
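A minimal instantiation sketch following the signature above (plug names are arbitrary, and the trait type names in param_types are assumed):

from capsul.pipeline.custom_nodes.strcat_node import StrCatNode

# concatenates 'base' and 'suffix' into the output plug 'out_name'
node = StrCatNode(pipeline, 'cat', params=['base', 'suffix'],
                  concat_plug='out_name', outputs=['out_name'],
                  param_types={'base': 'Str', 'suffix': 'Str',
                               'out_name': 'Str'})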
capsul.pipeline.custom_nodes.strconv¶
StrConvNode
¶
- class capsul.pipeline.custom_nodes.strconv.StrConvNode(pipeline, name, input_type=None)[source]¶
This “inert” node converts the input into a string.
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
- get_connections_through(plug_name, single=False)[source]¶
If the node has internal links (inside a pipeline, or in a switch or other custom connection node), return the “other side” of the internal connection to the selected plug. The returned plug may be in an internal node (in a pipeline), or in an external node connected to the node. When the node is “opaque” (no internal connections), it returns the input plug. When the node is inactive / disabled, it returns [].
- Parameters:
plug_name (str (mandatory)) – name of the selected plug
single (bool (optional, default: False))
- Returns:
[(node, plug_name, plug), …] Returns [(self, plug_name, plug)] when the plug has no internal connection.
- Return type:
connected_plug : list of tuples
- is_job()[source]¶
If True, the node will be represented as a Job in Soma-Workflow. Otherwise the node is static and does not run.
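A minimal instantiation sketch, assuming the default string conversion:

from capsul.pipeline.custom_nodes.strconv import StrConvNode

# inert node: its output plug holds the string form of its input value
conv = StrConvNode(pipeline, 'to_str')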
capsul.pipeline.custom_nodes.cv_node¶
CrossValidationFoldNode
¶
- class capsul.pipeline.custom_nodes.cv_node.CrossValidationFoldNode(pipeline, name, input_type=None)[source]¶
This “inert” node filters a list to separate it into (typically) learn and test sublists.
The “outputs” are “train” and “test” output traits.
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
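A minimal instantiation sketch; per the description above, the node exposes “train” and “test” output traits (other plug names are not documented here):

from capsul.pipeline.custom_nodes.cv_node import CrossValidationFoldNode

# splits an input list into train / test sublists for one fold
cv = CrossValidationFoldNode(pipeline, 'cv_fold')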
capsul.pipeline.custom_nodes.loo_node¶
LeaveOneOutNode
¶
- class capsul.pipeline.custom_nodes.loo_node.LeaveOneOutNode(pipeline, name, is_output=True, input_type=None, test_is_output=True, has_index=True)[source]¶
This “inert” node excludes one input from the list of inputs, to allow leave-one-out applications. The “outputs” may be either an output trait (to serve as inputs to other nodes), or an input trait (to assign output values to other nodes).
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
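A minimal instantiation sketch following the signature above:

from capsul.pipeline.custom_nodes.loo_node import LeaveOneOutNode

# excludes one item (selected by an index) from the input list
loo = LeaveOneOutNode(pipeline, 'loo', is_output=True, has_index=True)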
capsul.pipeline.custom_nodes.map_node¶
MapNode
¶
- class capsul.pipeline.custom_nodes.map_node.MapNode(pipeline, name, input_names=['inputs'], output_names=['output_%d'], input_types=None)[source]¶
This node converts lists into series of single items. Typically, an input named inputs is a list of items. The node will separate the items and output each of them as a single output parameter. The i-th item will be output as output_<i> by default. The input / output names can be customized using the constructor parameters input_names and output_names. Several lists can be split in the same node. The node will also output a lengths parameter which contains the lengths of the input lists. These lengths can typically be fed into reduce nodes to perform the reverse operation (see ReduceNode).
input_names is a list of input parameter names, each being a list to be split. The default is ['inputs'].
output_names is a list of patterns used to build output parameter names. Each item is a string containing a substitution pattern "%d" which will be replaced with a number. The default is ['output_%d']. Each pattern will be used for the items of the corresponding input, in the same order; thus input_names and output_names should have the same length. A usage sketch is given after the constructor parameters below.
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
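A minimal instantiation sketch using the defaults described above:

from capsul.pipeline.custom_nodes.map_node import MapNode

# splits the 'inputs' list into output_0, output_1, ... and a 'lengths' output
mapper = MapNode(pipeline, 'map', input_names=['inputs'],
                 output_names=['output_%d'])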
capsul.pipeline.custom_nodes.reduce_node¶
ReduceNode
¶
- class capsul.pipeline.custom_nodes.reduce_node.ReduceNode(pipeline, name, input_names=['input_%d'], output_names=['outputs'], input_types=None)[source]¶
Reduce node: converts series of inputs into lists. Typically, a series of inputs named input_0 .. input_<n> will be output as a single list named outputs. Several input series can be handled by the node, and input names can be customized.
The number of inputs in each series is given as the lengths input parameter. It is typically linked from the output of a MapNode.
Input parameter name patterns are given as the input_names parameter. It is a list of patterns, each containing a "%d" pattern for the input number. The default value is ['input_%d'].
Output parameter names are given as the output_names parameter. The default is ['outputs']. A usage sketch is given after the constructor parameters below.
Generate a Node
- Parameters:
pipeline (Pipeline (mandatory)) – the pipeline object where the node is added
name (str (mandatory)) – the node name
inputs (list of dict (mandatory)) – a list of input parameters containing a dictionary with default values (mandatory key: name)
outputs (list of dict (mandatory)) – a list of output parameters containing a dictionary with default values (mandatory key: name)
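A minimal instantiation sketch mirroring the MapNode example above (a pipeline would typically link lengths from the map node's output):

from capsul.pipeline.custom_nodes.reduce_node import ReduceNode

# gathers input_0 .. input_<n> back into the single 'outputs' list
reducer = ReduceNode(pipeline, 'reduce', input_names=['input_%d'],
                     output_names=['outputs'])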