populse_mia.data_manager.data_history_inspect

This module is dedicated to pipeline history.

Contains:
Class:
  • ProtoProcess: A lightweight convenience class that stores a brick

    database entry along with additional usage information.

Functions:
  • brick_to_process: Convert a brick database entry into a

    ‘fake process’.

  • data_history_pipeline: Retrieves the complete processing history of

    a file in the database.

  • data_in_value: Determine if the specified filename is present within

    the given value.

  • find_procs_with_output: Identify processes that have the specified

    filename as part of their outputs.

  • get_data_history: Retrieves the processing history for a given data

    file

  • get_data_history_bricks: Retrieves the complete “useful” history of

    a file in the database.

  • get_data_history_processes: Retrieves the complete “useful”

    processing history of a file in the database.

  • get_direct_proc_ancestors: Retrieve processing bricks referenced in

    the direct filename history.

  • get_filenames_in_value: Extract filenames from a nested structure of

    lists, tuples, and dictionaries.

  • get_history_brick_process: Retrieve a brick from the database and

    return it as ProtoProcess instance.

  • get_proc_ancestors_via_tmp: Retrieve upstream processes connected

    via a temporary value (“<temp>”).

  • is_data_entry: Determine if the given filename is a valid database

    entry.

Functions

brick_to_process(brick, project)

Convert a brick database entry into a 'fake process'.

data_history_pipeline(filename, project)

Retrieves the complete processing history of a file in the database, formatted as a "fake pipeline".

data_in_value(value, filename, project)

Determine if the specified filename is present within the given value.

find_procs_with_output(procs, filename, project)

Identify processes in the given list that have the specified filename as

get_data_history(filename, project)

Retrieves the processing history for a given data file, based on get_data_history_processes().

get_data_history_bricks(filename, project)

Retrieves the complete "useful" history of a file in the database as a set of processing bricks.

get_data_history_processes(filename, project)

Retrieves the complete "useful" processing history of a file in the database.

get_direct_proc_ancestors(filename, project, ...)

Retrieve processing bricks referenced in the direct filename history.

get_filenames_in_value(value, project[, ...])

Extract filenames from a nested structure of lists, tuples, and dictionaries.

get_history_brick_process(brick_id, project)

Retrieve a brick from the database using its UUID and return it as a ProtoProcess instance.

get_proc_ancestors_via_tmp(proc, project, procs)

Retrieve upstream processes connected via a temporary value ("<temp>").

is_data_entry(filename, project[, allow_temp])

Determine if the given filename is a valid database entry within the specified project.

Classes

ProtoProcess([brick])

A lightweight convenience class that stores a brick database entry along with additional usage information.

class populse_mia.data_manager.data_history_inspect.Pipeline(autoexport_nodes_parameters=None, **kwargs)[source]

Bases: Process

Pipeline containing Process nodes, and links between node parameters.

A Pipeline is normally subclassed, and its pipeline_definition() method is overloaded to define its nodes and links. pipeline_definition() will be called by the pipeline constructor.

from capsul.pipeline import Pipeline

class MyPipeline(Pipeline):

  def pipeline_definition(self):
      self.add_process('proc1', 'my_toolbox.my_process1')
      self.add_process('proc2', 'my_toolbox.my_process2')
      self.add_switch('main_switch', ['in1', 'in2'], ['out1', 'out2'])
      self.add_link('proc1.out1->main_switch.in1_switch_out1')
      self.add_link('proc1.out2->main_switch.in1_switch_out2')
      self.add_link('proc2.out1->main_switch.in2_switch_out1')
      self.add_link('proc2.out1->main_switch.in2_switch_out2')

After execution of pipeline_definition(), the inner nodes parameters which are not connected will be automatically exported to the parent pipeline, with names prefixed with their process name, unless they are listed in a special “do_not_export” list (passed to add_process() or stored in the pipeline instance).

>>> pipeline = MyPipeline()
>>> print(pipeline.proc1_input)
<undefined>

Nodes

A pipeline is made of nodes, and links between their parameters. Several types of nodes may be part of a pipeline:

  • process nodes (pipeline_nodes.ProcessNode) are the leaf nodes which represent actual processing bricks.

  • pipeline nodes (pipeline_nodes.PipelineNode) are sub-pipelines which allow to reuse an existing pipeline within another one

  • switch nodes (pipeline_nodes.Switch) allows to select values between several possible inputs. The switch mechanism also allows to select between several alternative processes or processing branches.

  • iterative process (:py:class:process_iteration.ProcessIteration`) represent parallel processing of the same pipeline on a set of parameters.

Note that you normally do not instantiate these nodes explicitly when building a pipeline. Rather programmers may call the add_process(), add_switch(), add_iterative_process() methods.

Nodes activation

Pipeline nodes may be enabled or disabled. Disabling a node will trigger a global pipeline nodes activation step, where all nodes which do not form a complete chain will be inactive. This way a branch may be disabled by disabling one of its nodes. This process is used by the switch system, which allows to select one processing branch between several, and disables the unselected ones.

Pipeline steps

Pipelines may define execution steps: they are user-oriented groups of nodes that are to be run together, or disabled together for runtime execution. They are intended to allow partial, or step-by-step execution. They do not work like the nodes enabling mechanism described above.

Steps may be defined within the pipeline_definition() method. See add_pipeline_step().

Note also that pipeline steps only act at the highest level: if a sub-pipeline has disabled steps, they will not be taken into account in the higher level pipeline execution, because executing by steps a part of a sub-pipeline within the context of a higher one does generally not make sense.

Main methods

  • pipeline_definition()

  • add_process()

  • add_switch()

  • add_custom_node()

  • add_iterative_process()

  • add_optional_output_switch()

  • add_processes_selection()

  • add_link()

  • remove_link()

  • export_parameter()

  • autoexport_nodes_parameters()

  • add_pipeline_step()

  • define_pipeline_steps()

  • define_groups_as_steps()

  • remove_pipeline_step()

  • enable_all_pipeline_steps()

  • disabled_pipeline_steps_nodes()

  • get_pipeline_step_nodes()

  • find_empty_parameters()

  • count_items()

nodes

a dictionary containing the pipeline nodes and where the pipeline node name is ‘’

Type:

dict {node_name: node}

workflow_list

a list of ordered nodes that can be executed

Type:

list

workflow_repr

a string representation of the workflow list <node_i>-><node_i+1>

Type:

str

Note

  • Type ‘Pipeline.help()’ for a full description of this process parameters.

  • Type ‘<Pipeline>.get_input_spec()’ for a full description of this process input trait types.

  • Type ‘<Pipeline>.get_output_spec()’ for a full description of this process output trait types.

_doc_path = 'api/pipeline.html#pipeline'
do_autoexport_nodes_parameters = True
hide_nodes_activation = True
__init__(autoexport_nodes_parameters=None, **kwargs)[source]

Initialize the Pipeline class

Parameters:

autoexport_nodes_parameters (bool) – if True (default) nodes containing pipeline plugs are automatically exported.

pipeline_definition()[source]

Define pipeline structure, nodes, sub-pipelines, switches, and links.

This method should be overloaded in subclasses, it does nothing in the base Pipeline class.

autoexport_nodes_parameters(include_optional=False)[source]

Automatically export nodes plugs to the pipeline.

Some parameters can be explicitly preserved from exportation if they are listed in the pipeline “do_not_export” variable (list or set).

Parameters:

include_optional (bool (optional)) – If True (the default), optional plugs are not exported Exception: optional output plugs of switches are exported (otherwise they are useless). It should probably be any single output plug of a node.

add_trait(name, trait)[source]

Add a trait to the pipeline

Parameters:
  • name (str (mandatory)) – the trait name

  • trait (trait instance (mandatory)) – the trait we want to add

remove_trait(name)[source]

Remove a trait to the pipeline

Parameters:

name (str (mandatory)) – the trait name

reorder_traits(names)[source]

Reimplementation of Controller method reorder_traits() so that we also reorder the pipeline node plugs.

_make_subprocess_context_name(name)[source]

build full contextual name on process instance

_set_subprocess_context_name(process, name)[source]

set full contextual name on process instance

add_process(name, process, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, skip_invalid=False, **kwargs)[source]

Add a new node in the pipeline

Note about invalid nodes:

A pipeline can typically offer alternatives (through a switch) to different algorithmic nodes, which may have different dependencies, or may be provided through external modules, thus can be missing. To handle this, Capsul can be telled that a process node can be invalid (or missing) without otherwise interfering the rest of the pipeline. This is done using the “skip_invalid” option. When used, the process to be added is tested, and if its instantiation fails, it will not be added in the pipeline, but will not trigger an error. Instead the missing node will be marked as “allowed invalid”, and links and exports built using this node will silently do nothing. thus the pipeline will work normally, without the invalid node.

Such nodes are generally gathered through a switch mechanism. However the switch inputs should be restricted to actually available nodes. The recommended method is to check that nodes have actually been added in the pipeline. Then links can be made normally as if the nodes were all present:

self.add_process('method1', 'module1.Module1', skip_invalid=True)
self.add_process('method2', 'module2.Module2', skip_invalid=True)
self.add_process('method3', 'module3.Module3', skip_invalid=True)

input_params = [n for n in ['method1', 'method2', 'method3']
                if n in self.nodes]
self.add_switch('select_method', input_params, 'output')

self.add_link('method1.input->select_method.method1_switch_output')
self.add_link('method2.input->select_method.method2_switch_output')
self.add_link('method3.input->select_method.method3_switch_output')

A last note about invalid nodes:

When saving a pipeline (through the graphical editor typically), missing nodes will not be saved because they are not actually in the pipeline. So be careful to save only pipelines with full features.

Parameters:
  • name (str (mandatory)) – the node name (has to be unique).

  • process (Process (mandatory)) – the process we want to add. May be a string (‘module.process’), a process instance or a class.

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • make_optional (list of str (optional)) – a list of plug names that we do not want to export.

  • inputs_to_copy (list of str (optional)) – a list of item to copy.

  • inputs_to_clean (list of str (optional)) – a list of temporary items.

  • skip_invalid (bool) – if True, if the process is failing (cannot be instantiated), don’t throw an exception but instead don’t insert the node, and mark it as such in order to make add_link() to also silently do nothing. This option is useful for optional process nodes which may or may not be available depending on their dependencies, typically in a switch offering several alternative methods.

remove_node(node_name)[source]

Remove a node from the pipeline

add_iterative_process(name, process, iterative_plugs=None, do_not_export=None, make_optional=None, inputs_to_copy=None, inputs_to_clean=None, **kwargs)[source]

Add a new iterative node in the pipeline.

Parameters:
  • name (str (mandatory)) – the node name (has to be unique).

  • process (Process or str (mandatory)) – the process we want to add.

  • iterative_plugs (list of str (optional)) – a list of plug names on which we want to iterate. If None, all plugs of the process will be iterated.

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • make_optional (list of str (optional)) – a list of plug names that we do not want to export.

  • inputs_to_copy (list of str (optional)) – a list of item to copy.

  • inputs_to_clean (list of str (optional)) – a list of temporary items.

call_process_method(process_name, method, *args, **kwargs)[source]

Call a method of a process previously added with add_process or add_iterative_process.

Parameters:
  • process_name (str (mandatory)) – name given to the process node.

  • method (str (mandatory)) – name of the method to call.

add_switch(name, inputs, outputs, export_switch=True, make_optional=(), output_types=None, switch_value=None, opt_nodes=None)[source]

Add a switch node in the pipeline

Parameters:
  • name (str (mandatory)) – name for the switch node (has to be unique)

  • inputs (list of str (mandatory)) – names for switch inputs. Switch activation will select amongst them. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”. This behaviour is needed when there are several outputs, and thus several input groups.

  • outputs (list of str (mandatory)) – names for outputs.

  • export_switch (bool (optional)) – if True, export the switch trigger to the parent pipeline with name as parameter name

  • make_optional (sequence (optional)) – list of optional outputs. These outputs will be made optional in the switch output. By default they are mandatory.

  • output_types (sequence of traits (optional)) – If given, this sequence should have the same size as outputs. It will specify each switch output parameter type (as a standard trait). Input parameters for each input block will also have this type.

  • switch_value (str (optional)) – Initial value of the switch parameter (one of the inputs names). Defaults to 1st input.

  • opt_nodes (bool or list) – tells that switch values are node names, and some of them may be optional and missing. In such a case, missing nodes are not added as inputs. If a list is passed, then it is a list of node names which length should match the number of inputs, and which order tells nodes related to inputs (in case inputs names are not directly node names).

Examples

>>> pipeline.add_switch('group_switch', ['in1', 'in2'],
                        ['out1', 'out2'])

will create a switch with 4 inputs and 2 outputs: inputs: “in1_switch_out1”, “in2_switch_out1”, “in1_switch_out2”, “in2_switch_out2” outputs: “out1”, “out2”

add_optional_output_switch(name, input, output=None)[source]

Add an optional output switch node in the pipeline

An optional switch activates or disables its input/output link according to the output value. If the output value is not None or Undefined, the link is active, otherwise it is inactive.

This kind of switch is meant to make a pipeline output optional, but still available for temporary files values inside the pipeline.

Ex:

A.output -> B.input

B.input is mandatory, but we want to make A.output available and optional in the pipeline outputs. If we directlty export A.output, then if the pipeline does not set a value, B.input will be empty and the pipeline run will fail.

Instead we can add an OptionalOutputSwitch between A.output and pipeline.output. If pipeline.output is set a valid value, then A.output and B.input will have the same valid value. If pipeline.output is left Undefined, then A.output and B.input will get a temporary value during the run.

Add an optional output switch node in the pipeline

Parameters:
  • name (str (mandatory)) – name for the switch node (has to be unique)

  • input (str (mandatory)) – name for switch input. Switch activation will select between it and a hidden input, “_none”. Inputs names will actually be a combination of input and output, in the shape “input_switch_output”.

  • output (str (optional)) – name for output. Default is the switch name

Examples

>>> pipeline.add_optional_output_switch('out1', 'in1')
>>> pipeline.add_link('node1.output->out1.in1_switch_out1')

See also

capsul.pipeline.pipeline_nodes.OptionalOutputSwitch

add_custom_node(name, node_type, parameters=None, make_optional=(), do_not_export=None, **kwargs)[source]

Inserts a custom node (Node subclass instance which is not a Process) in the pipeline.

Parameters:
  • node_type (str or Node subclass or Node instance) – node type to be built. Either a class (Node subclass) or a Node instance (the node will be re-instantiated), or a string describing a module and class.

  • parameters (dict or Controller or None) – configuration dict or Controller defining parameters needed to build the node. The controller should be obtained using the node class’s configure_node() static method, then filled with the desired values. If not given the node is supposed to be built with no parameters, which will not work for every node type.

  • make_optional (list or tuple) – parameters names to be made optional

  • do_not_export (list of str (optional)) – a list of plug names that we do not want to export.

  • kwargs (default values of node parameters)

Parse a link coming from export_parameter method.

Parameters:
  • link (str) – the link description of the form ‘node_from.plug_name->node_to.plug_name’

  • check (bool) – if True, check that the node and plug exist

Returns:

output – tuple containing the link description and instances

Return type:

tuple

Examples

>>> Pipeline.parse_link("node1.plug1->node2.plug2")
"node1", "plug1", <instance node1>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>

For a pipeline node:

>>> Pipeline.parse_link("plug1->node2.plug2")
"", "plug1", <instance pipeline>, <instance plug1>,
"node2", "plug2", <instance node2>, <instance plug2>
parse_parameter(name, check=True)[source]

Parse parameter of a node from its description.

Parameters:
  • name (str) – the description plug we want to load ‘node.plug’

  • check (bool) – if True, check that the node and plug exist

Returns:

output – tuple containing the plug description and instances

Return type:

tuple

Add a link between pipeline nodes.

If the destination node is a switch, force the source plug to be not optional.

Parameters:
  • link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)

  • weak_link (bool) – this property is used when nodes are optional, the plug information may not be generated.

  • allow_export (bool) – if True, if the link links from/to the pipeline node with a plug name which doesn’t exist, the plug will be created, and the function will act exactly like export_parameter. This may be a more convenient way of exporting/connecting pipeline plugs to several nodes without having to export the first one, then link the others.

  • value (any) – if given, set this value instead of the source plug value

Remove a link between pipeline nodes

Parameters:

link (str or list/tuple) – link description. Its shape should be: “node.output->other_node.input”. If no node is specified, the pipeline itself is assumed. Alternatively the link can be (source_node, source_plug_name, dest_node, dest_plug_name)

export_parameter(node_name, plug_name, pipeline_parameter=None, weak_link=False, is_enabled=None, is_optional=None, allow_existing_plug=None)[source]

Export a node plug at the pipeline level.

Parameters:
  • node_name (str (mandatory)) – the name of node containing the plug we want to export

  • plug_name (str (mandatory)) – the node plug name we want to export

  • pipeline_parameter (str (optional)) – the name to access this parameter at the pipeline level. Default None, the plug name is used

  • weak_link (bool (optional)) – this property is used when nodes are weak, FIXME: what does it exactly mean ? the plug information may not be generated.

  • is_enabled (bool (optional)) – a property to specify that it is not a user-parameter automatic generation)

  • is_optional (bool (optional)) – sets the exported parameter to be optional

  • allow_existing_plug (bool (optional)) – the same pipeline plug may be connected to several process plugs

_set_node_enabled(node_name, is_enabled)[source]

Method to enable or disabled a node

Parameters:
  • node_name (str (mandatory)) – the node name

  • is_enabled (bool (mandatory)) – the desired property

propagate_metadata(node, param, metadata)[source]

Set metadata on a node parameter, and propagate these values to the connected plugs.

Typically needed to propagate the “forbid_completion” metadata to avoid manuyally set values to be overridden by completion.

node may be a Node instance or a node name

all_nodes()[source]

Iterate over all pipeline nodes including sub-pipeline nodes.

Returns:

nodes – Iterates over all nodes

Return type:

Generator of Node

_check_local_node_activation(node)[source]

Try to activate a node and its plugs according to its state and the state of its direct neighbouring nodes.

Parameters:

node (Node (mandatory)) – node to check

Returns:

plugs_activated – list of (plug_name,plug) containing all plugs that have been activated

Return type:

list

_check_local_node_deactivation(node)[source]

Check plugs that have to be deactivated according to node activation state and to the state of its direct neighbouring nodes.

Parameters:

node (Node (mandatory)) – node to check

Returns:

plugs_deactivated – list of (plug_name,plug) containing all plugs that have been deactivated

Return type:

list

delay_update_nodes_and_plugs_activation()[source]
restore_update_nodes_and_plugs_activation()[source]
update_nodes_and_plugs_activation()[source]

Reset all nodes and plugs activations according to the current state of the pipeline (i.e. switch selection, nodes disabled, etc.). Activations are set according to the following rules.

workflow_graph(remove_disabled_steps=True, remove_disabled_nodes=True)[source]

Generate a workflow graph

Returns:

  • graph (topological_sort.Graph) – graph representation of the workflow from the current state of the pipeline

  • remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow graph. Default: True

  • remove_disabled_nodes (bool (optional)) – When set, disabled nodes will not be included in the workflow graph. Default: True

workflow_ordered_nodes(remove_disabled_steps=True)[source]

Generate a workflow: list of process node to execute

Returns:

  • workflow_list (list of Process) – an ordered list of Processes to execute

  • remove_disabled_steps (bool (optional)) – When set, disabled steps (and their children) will not be included in the workflow graph. Default: True

_check_temporary_files_for_node(node, temp_files)[source]

Check temporary outputs and allocate files for them.

Temporary files or directories will be appended to the temp_files list, and the node parameters will be set to temp file names.

This internal function is called by the sequential execution, _run_process() (also used through __call__()). The pipeline state will be restored at the end of execution using _free_temporary_files().

Parameters:
  • node (Node) – node to check temporary outputs on

  • temp_files (list) – list of temporary files for the pipeline execution. The list will be modified (completed).

_free_temporary_files(temp_files)[source]

Delete and reset temp files after the pipeline execution.

This internal function is called at the end of _run_process() (sequential execution)

_run_process()[source]

Pipeline execution is managed by StudyConfig class. This method must not be called.

find_empty_parameters()[source]

Find internal File/Directory parameters not exported to the main input/output parameters of the pipeline with empty values. This is meant to track parameters which should be associated with temporary files internally.

Returns:

Each element is a list with 3 values: node, parameter_name, optional

Return type:

list

count_items()[source]

Count pipeline items to get its size.

Returns:

items – (nodes_count, processes_count, plugs_count, params_count, links_count, enabled_nodes_count, enabled_procs_count, enabled_links_count)

Return type:

tuple

pipeline_state()[source]

Return an object composed of basic Python objects that contains the whole structure and state of the pipeline. This object can be given to compare_to_state method in order to get the differences with a previously stored state. This is typically used in tests scripts.

Returns:

pipeline_state – todo

Return type:

dictionary

compare_to_state(pipeline_state)[source]

Returns the differences between this pipeline and a previously recorded state.

Returns:

differences – each element is a human readable string explaining one difference (e.g. ‘node “my_process” is missing’)

Return type:

list

Set callbacks when traits value change, and follow plugs links to debug links propagation and problems in it.

Parameters:
  • log_file (str (optional)) – file-like object to write links propagation in. If none is specified, a temporary file will be created for it.

  • handler (function (optional)) – Callback to be processed for debugging. If none is specified, the default pipeline debugging function will be used. This default handler prints traits changes and links to be processed in the log_file. The handler function will receive a prefix string, a node, and traits parameters, namely the object (process) owning the changed value, the trait name and value in this object.

  • prefix (str (optional)) – prefix to be prepended to traits names, typically the parent pipeline full name

Returns:

log_file

Return type:

the file object where events will be written in

Remove links debugging callbacks set by install_links_debug_handler

define_pipeline_steps(steps)[source]

Define steps in the pipeline. Steps are pipeline portions that form groups, and which can be enabled or disabled on a runtime basis (when building workflows).

Once steps are defined, their activation may be accessed through the “step” trait, which has one boolean property for each step:

Ex:

steps = OrderedDict()
steps['preprocessings'] = [
    'normalization',
    'bias_correction',
    'histo_analysis']
steps['brain_extraction'] = [
    'brain_segmentation',
    'hemispheres_split']
pipeline.define_pipeline_steps(steps)
>>> print(pipeline.pipeline_steps.preprocessings)
True
>>> pipeline.pipeline_steps.brain_extraction = False

See also add_pipeline_step()

Parameters:

steps (dict or preferably OrderedDict or SortedDictionary (mandatory)) – The steps dict keys are steps names, the values are lists of nodes names forming the step.

add_pipeline_step(step_name, nodes, enabled=True)[source]

Add a step definition to the pipeline (see also define_steps).

Steps are groups of pipeline nodes, which may be disabled at runtime. They are normally defined in a logical order regarding the workflow streams. They are different from pipelines in that steps are purely virtual groups, they do not have parameters.

Disabling a step acts differently as the pipeline node activation: other nodes are not inactivated according to their dependencies. Instead, those steps are not run.

Parameters:
  • step_name (string (mandatory)) – name of the new step

  • nodes (list or sequence) – nodes contained in the step (Node instances)

  • enabled (bool (optional)) – initial state of the step

remove_pipeline_step(step_name)[source]

Remove the given step

disabled_pipeline_steps_nodes()[source]

List nodes disabled for runtime execution

Returns:

disabled_nodes – list of pipeline nodes (Node instances) which will not run in a workflow created from this pipeline state.

Return type:

list

get_pipeline_step_nodes(step_name)[source]

Get the nodes in the given pipeline step

enable_all_pipeline_steps()[source]

Set all defined steps (using add_step() or define_steps()) to be enabled. Useful to reset the pipeline state after it has been changed.

_change_processes_selection(selection_name, selection_group)[source]
add_processes_selection(selection_parameter, selection_groups, value=None)[source]

Add a processes selection switch definition to the pipeline.

Selectors are a “different” kind of switch: one pipeline node set in a group is enabled, the others are disabled.

The selector has 2 levels:

selection_parameter selects a group.

A group contains a set of nodes which will be activated together. Groups are mutually exclusive.

Parameters:
  • selection_parameter (string (mandatory)) – name of the selector parameter: the parameter is added in the pipeline, and its value is the name of the selected group.

  • selection_groups (dict or OrderedDict) – nodes groups contained in the selector : {group_name: [Node names]}

  • value (str (optional)) – initial state of the selector (default: 1st group)

get_processes_selections()[source]

Get process_selection groups names (corresponding to selection parameters on the pipeline)

get_processes_selection_groups(selection_parameter)[source]

Get groups names involved in a processes selection switch

get_processes_selection_nodes(selection_parameter, group)[source]

Get nodes names involved in a processes selection switch with value group

set_study_config(study_config)[source]

Set a StudyConfig for the process. Note that it can only be done once: once a non-null StudyConfig has been assigned to the process, it should not change.

define_groups_as_steps(exclusive=True)[source]

Define parameters groups according to which steps they are connected to.

Parameters:

exclusive (bool (optional)) – if True, a parameter is assigned to a single group, the first step it is connected to. If False, a parameter is assigned all steps groups it is connected to.

check_requirements(environment='global', message_list=None)[source]

Reimplementation for pipelines of capsul.process.process.Process.check_requirements

A pipeline will return a list of unique configuration values.

rename_node(old_node_name, new_node_name)[source]

Change the name of the selected node and updates the pipeline.

Parameters:
  • old_node_name (str) – old node name

  • new_node_name (str) – new node name

class populse_mia.data_manager.data_history_inspect.Process(**kwargs)[source]

Bases: Controller

A process is an atomic component that contains a processing.

A process is typically an object with typed parameters, and an execution function. Parameters are described using Enthought traits through Soma-Base Controller base class.

In addition to describing its parameters, a Process must implement its execution function, either through a python method, by overloading _run_process(), or through a commandline execution, by overloading get_commandline(). The second way allows to run on a remote processing machine which has not necessary capsul, nor python, installed.

Parameters are declared or queried using the traits API, and their values are in the process instance variables:

from __future__ import print_function
from capsul.api import Process
import traits.api as traits

class MyProcess(Process):

    # a class trait
    param1 = traits.Str('def_param1')

    def __init__(self):
        super(MyProcess, self).__init__()
        # declare an input param
        self.add_trait('param2', traits.Int())
        # declare an output param
        self.add_trait('out_param', traits.File(output=True))

    def _run_process(self):
        with open(self.out_param, 'w') as f:
            print('param1:', self.param1, file=f)
            print('param2:', self.param2, file=f)

# run it with parameters
MyProcess()(param2=12, out_param='/tmp/log.txt')

Note about the File and Directory traits

The File trait type represents a file parameter. A file is actually two things: a filename (string), and the file itself (on the filesystem). For an input it is OK not to distinguish them, but for an output, there are two different cases:

  • the file (on the filesystem) is an output, but the filename (string) is given as an input: this is the classical “commandline” behavior, when we tell the program where it should write its output file.

  • the file is an output, and the filename is also an output: this is rather a “function return value” behavior: the process determines internally where it should write the file, and tells as an output where it did.

To distinguish these two cases, in Capsul we normally add in the File or Directory trait a property input_filename which is True when the filename is an input, and False when the filename is an output:

self.add_trait('out_file',
               traits.File(output=True, input_filename=False))

However, as most of our processes are based on the “commandline behavior” (the filename is an input) and we often forget to specify the input_filename parameter, the default is the “filename is an input” behavior, when not specified.

Attributes

name

the class name.

Type:

str

id

the string description of the class location (ie., module.class).

Type:

str

log_file

if None, the log will be generated in the current directory otherwise it will be written in log_file path.

Type:

str (default None)

Note

  • Type ‘Process.help()’ for a full description of this process parameters.

  • Type ‘<Process>.get_input_spec()’ for a full description of this process input trait types.

  • Type ‘<Process>.get_output_spec()’ for a full description of this process output trait types.

__init__(**kwargs)[source]

Initialize the Process class.

add_trait(name, trait)[source]

Ensure that trait.output and trait.optional are set to a boolean value before calling parent class add_trait.

run(**kwargs)[source]

Obsolete: use self.__call__ instead

_run_process()[source]

Runs the processings when the instance is called.

Either this _run_process() or get_commandline() must be defined in derived classes.

Note that _run_process() is called as a python function, on a Process instance. When using remote processing (cluster for instance), this means that the commandline which will run needs to be able to re- instantiate the same process: the process thus has to be stored in a file or python module which can be accessed from the remote machine, and python / capsul correctly installed and available on it.

get_commandline() at the contrary, can implement commandlines which are completely independent from Capsul, and from python.

Note

If both methods are not defined in the derived class a NotImplementedError error is raised.

On the other side, if both methods are overloaded, the process behavior in local sequential computing mode and in Soma-Workflow modes may be different.

_before_run_process()[source]

This method is called by StudyConfig.run() before calling _run_process(). By default, it does nothing but can be overridden in derived classes.

_after_run_process(run_process_result)[source]

This method is called by StudyConfig.run() after calling _run_process(). It is expected to return the final result of the process. By default, it does nothing but can be overridden in derived classes.

_get_log(exec_info)[source]

Method that generate the logging structure from the execution information and class attributes.

Parameters:

exec_info (dict (mandatory)) – the execution information, the dictionary is supposed to contain a runtime attribute.

Returns:

log – the logging information.

Return type:

dict

_rst_table(data)[source]

Create a rst formatted table.

Parameters:

data (list of lists of str (mandatory)) – the table line-cell centent.

Returns:

rsttable – the rst formatted table containing the input data.

Return type:

list of str

save_log(returncode)[source]

Method to save process execution information in json format.

If the class attribute log_file is not set, a log.json output file is generated in the process call current working directory.

Parameters:

returncode (ProcessResult) – the process result return code.

classmethod help(returnhelp=False)[source]

Method to print the full help.

Parameters:
  • cls (process class (mandatory)) – a process class

  • returnhelp (bool (optional, default False)) – if True return the help string message, otherwise display it on the console.

get_commandline()[source]

Method to generate a commandline representation of the process.

If not implemented, it will generate a commandline running python, instantiating the current process, and calling its _run_process() method.

Returns:

commandline – Arguments are in separate elements of the list.

Return type:

list of strings

params_to_command()[source]

Generates a commandline representation of the process.

If not implemented, it will generate a commandline running python, instantiating the current process, and calling its _run_process() method.

This method is new in Capsul v3 and is a replacement for get_commandline().

It can be overwritten by custom Process subclasses. Actually each process should overwrite either params_to_command() or _run_process().

The returned commandline is a list, which first element is a “method”, and others are the actual commandline with arguments. There are several methods, the process is free to use either of the supported ones, depending on how the execution is implemented.

Methods:

capsul_job: Capsul process run in python

The command will run the _run_process() execution method of the process, after loading input parameters from a JSON dictionary file. The only second element in the commandline list is the process identifier (module/class as in get_process_instance()). The location of the JSON file will be passed to the job execution through an environment variable SOMAWF_INPUT_PARAMS:

return ['capsul_job', 'morphologist.capsul.morphologist']
format_string: free commandline with replacements for parameters

Command arguments can be, or contain, format strings in the shape ‘%(param)s’, where param is a parameter of the process. This way we can map values correctly, and call a foreign command:

return ['format_string', 'ls', '%(input_dir)s']
json_job: free commandline with JSON file for input parameters

A bit like capsul_job but without the automatic wrapper:

return ['json_job', 'python', '-m', 'my_module']
Returns:

commandline – Arguments are in separate elements of the list.

Return type:

list of strings

make_commandline_argument(*args)[source]

This helper function may be used to build non-trivial commandline arguments in get_commandline implementations. Basically it concatenates arguments, but it also takes care of keeping track of temporary file objects (if any), and converts non-string arguments to strings (using repr()).

Ex:

>>> process.make_commandline_argument('param=', self.param)

will return the same as:

>>> 'param=' + self.param

if self.param is a string (file name) or a temporary path.

static run_from_commandline(process_definition)[source]

Run a process from a commandline call. The process name (with module) are given in argument, input parameters should be passed through a JSON file which location is in the SOMAWF_INPUT_PARAMS environment variable.

If the process has outputs, the SOMAWF_OUTUT_PARAMS environment variable should contain the location of an output file which will be written with a dict containing output parameters values.

get_log()[source]

Load the logging file.

Returns:

log – the content of the log file.

Return type:

dict

get_input_spec()[source]

Method to access the process input specifications.

Returns:

outputs – a string representation of all the input trait specifications.

Return type:

str

get_output_spec()[source]

Method to access the process output specifications.

Returns:

outputs – a string representation of all the output trait specifications.

Return type:

str

get_inputs()[source]

Method to access the process inputs.

Returns:

outputs – a dictionary with all the input trait names and values.

Return type:

dict

get_outputs()[source]

Method to access the process outputs.

Returns:

outputs – a dictionary with all the output trait names and values.

Return type:

dict

get_help(returnhelp=False, use_labels=False)[source]

Generate description of a process parameters.

Parameters:
  • returnhelp (bool (optional, default False)) – if True return the help string message formatted in rst, otherwise display the raw help string message on the console.

  • use_labels (bool) – if True, input and output sections will get a RestructuredText label to avoid ambiguities.

get_input_help(rst_formating=False)[source]

Generate description for process input parameters.

Parameters:

rst_formating (bool (optional, default False)) – if True generate a rst table with the input descriptions.

Returns:

helpstr – the class input traits help

Return type:

str

get_output_help(rst_formating=False)[source]

Generate description for process output parameters.

Parameters:

rst_formating (bool (optional, default False)) – if True generate a rst table with the input descriptions.

Returns:

helpstr – the trait output help descriptions

Return type:

str

set_parameter(name, value, protected=None)[source]

Method to set a process instance trait value.

For File and Directory traits the None value is replaced by the special Undefined trait value.

Parameters:
  • name (str (mandatory)) – the trait name we want to modify

  • value (object (mandatory)) – the trait value we want to set

  • protected (None or bool (tristate)) – if True or False, force the “protected” status of the plug. If None, keep it as is.

get_parameter(name)[source]

Method to access the value of a process instance.

Parameters:

name (str (mandatory)) – the trait name we want to modify

Returns:

value – the trait value we want to access

Return type:

object

get_study_config()[source]

Get (or create) the StudyConfig this process belongs to

set_study_config(study_config)[source]

Set a StudyConfig for the process. Note that it can only be done once: once a non-null StudyConfig has been assigned to the process, it should not change.

get_missing_mandatory_parameters()[source]

Returns a list of parameters which are not optional, and which value is Undefined or None, or an empty string for a File or Directory parameter.

requirements()[source]

Requirements needed to run the process. It is a dictionary which keys are config/settings modules and values are requests for them.

The default implementation returns an empty dict (no requirements), and should be overloaded by processes which actually have requirements.

Ex:

{'spm': 'version >= "12" and standalone == "True"')
check_requirements(environment='global', message_list=None)[source]

Checks the process requirements against configuration settings values in the attached CapsulEngine. This makes use of the requirements() method and checks that there is one matching config value for each required module.

Parameters:
  • environment (str) – config environment id. Normally corresponds to the computing resource name, and defaults to “global”.

  • message_list (list) – if not None, this list will be updated with messages for unsatisfied requirements, in order to present the user with an understandable error.

Returns:

config – if None is returned, requirements are not met: the process cannot run. If a dict is returned, it corresponds to the matching config values. When no requirements are needed, an empty dict is returned. A pipeline, if its requirements are met will return a list of configuration values, because different nodes may require different config values.

Return type:

dict, list, or None

class populse_mia.data_manager.data_history_inspect.ProtoProcess(brick=None)[source]

Bases: object

A lightweight convenience class that stores a brick database entry along with additional usage information.

Parameters:

brick – The brick database entry to store. Defaults to None.

__init__(brick=None)[source]
populse_mia.data_manager.data_history_inspect.brick_to_process(brick, project)[source]

Convert a brick database entry into a ‘fake process’.

This function transforms a brick database entry into a Process instance that represents its parameters and values. The process gets a name, uuid, and exec_time from the brick. This “fake process” cannot perform actual processing but serves as a representation of the brick’s traits and values.

Parameters:
  • str) (brick (dict or) – The brick database entry to convert. If a string is provided, it is treated as the brick’s unique ID, and the corresponding brick document is retrieved from the project’s database.

  • (object) (project) – The project object providing access to the database and its documents.

Return (Process or None):

A Process instance representing the brick’s parameters and values. Returns None if the brick is not found.

populse_mia.data_manager.data_history_inspect.data_history_pipeline(filename, project)[source]

Retrieves the complete processing history of a file in the database, formatted as a “fake pipeline”.

The generated pipeline consists of unspecialized (fake) processes, each representing a processing step with all parameters of type Any. The pipeline includes connections and traces all upstream ancestors of the file, capturing the entire processing path leading to the latest version of the file.

If the file was modified multiple times, the pipeline reflects only the relevant processing steps that contributed to the final output. Orphaned processing steps from overwritten versions are omitted.

Parameters:
  • (str) (filename) – The name of the file whose processing history is being retrieved.

  • (Project) (project) – The project object containing the database and relevant details.

Return (Pipeline | None):

A Pipeline object representing the processing history, or None if no relevant history is found.

populse_mia.data_manager.data_history_inspect.data_in_value(value, filename, project)[source]

Determine if the specified filename is present within the given value.

This function recursively searches through the value, which can be a string, list, tuple, or dictionary, to check if it contains the specified filename. The filename can be a special placeholder “<temp>” or a “short” filename, which is a relative path within the project’s database data directory.

Parameters:
  • dict) (value (str, list, tuple, or) –

    The data structure to search.

    It can be:

    • A string representing a file path.

    • A list or tuple containing multiple file paths.

    • A dictionary where file paths are stored as values.

  • (str) (filename) – The filename to search for. It can be: - The special placeholder “<temp>” indicating a temporary value. - A relative file path to the project database data directory.

  • (object) (project) – The project object containing the project’s folder path as an attribute (project.folder).

Return (bool):

True if the filename is found in the value, False otherwise.

populse_mia.data_manager.data_history_inspect.find_procs_with_output(procs, filename, project)[source]

Identify processes in the given list that have the specified filename as part of their outputs.

This function searches through a list of processes to determine which ones have the specified filename in their output values. The results are organized by execution time.

Parameters:
  • ProtoProcess) (procs (iterable of) – A collection of ProtoProcess instances to search through.

  • (str) (filename) – The filename to search for within the processes’ outputs.

  • (Project) (project) – An instance of the project, used to access the database folder.

Return (dict):

A dictionary where keys are execution times and values are lists of tuples. Each tuple contains a process and the parameter name associated with the filename. Format: {exec_time: [(process, param_name), …]}.

populse_mia.data_manager.data_history_inspect.get_data_history(filename, project)[source]

Retrieves the processing history for a given data file, based on get_data_history_processes().

The returned dictionary contains:

  • “parent_files”: A set of filenames representing data (direct or

    indirect) used to produce the given file.

  • “processes”: A set of UUIDs of processing bricks that contributed

    to the file’s creation.

Parameters:
  • (str) (filename) – The name of the file whose processing history is being retrieved.

  • (Project) (project) – The project object containing the database and relevant details.

Return (dict):

A dictionary with the following keys: - “processes” (set): A set of UUIDs representing the

processing bricks involved.

  • “parent_files” (set): A set of filenames that were

    used to produce the data.

populse_mia.data_manager.data_history_inspect.get_data_history_bricks(filename, project)[source]

Retrieves the complete “useful” history of a file in the database as a set of processing bricks.

This function is a filtered version of get_data_history_processes(), similar to data_history_pipeline(), but instead of constructing a pipeline, it returns only the set of brick elements that were actually used in the relevant processing history of the file.

Parameters:
  • (str) (filename) – The name of the file whose processing history is being retrieved.

  • (Project) (project) – The project object containing the database and relevant details.

Return (set):

A set of brick elements representing the “useful” processing steps that contributed to the final version of the given data file.

populse_mia.data_manager.data_history_inspect.get_data_history_processes(filename, project)[source]

Retrieves the complete “useful” processing history of a file in the database.

This function returns: - A dictionary of processes (ProtoProcess instances), where

keys are process UUIDs.

  • A set of links between these processes, forming the processing graph.

Unlike data_history_pipeline(), which converts the history into a Pipeline, this function provides a lower-level representation. Some processes retrieved during history traversal may not be used; they are distinguished by their used attribute (set to True for relevant processes).

Processing bricks that are not used (possibly from earlier runs where the data file was overwritten) may either be absent from the history or have used = False.

Parameters:
  • (str) (filename) – The name of the file whose processing history is being retrieved.

  • (Project) (project) – The project object containing the database and relevant details.

Return (tuple):
  • procs (dict): {uuid: ProtoProcess instance} mapping.

  • links (set): `{
    (

    src_protoprocess, src_plug_name, dst_protoprocess, dst_plug_name

    )

    }`.

    External connections are represented with None as src_protoprocess or dst_protoprocess.

populse_mia.data_manager.data_history_inspect.get_direct_proc_ancestors(filename, project, procs, before_exec_time=None, only_latest=True, org_proc=None)[source]

Retrieve processing bricks referenced in the direct filename history.

This function identifies the most recent processing steps that generated the given filename. If multiple processes share the same execution time, they are all retained to account for ambiguity. The function also allows filtering by execution time and excluding a specified originating process.

Parameters:
  • (str) (filename) – The data filename to inspect.

  • (Project) (project) – The project instance used to access the database.

  • (dict) (procs) – Dictionary mapping process UUIDs to ProtoProcess instances. This dictionary is updated with newly retrieved processes.

  • (datetime) (before_exec_time) – If specified, only processing bricks executed before this time are considered.

  • (bool) (only_latest) – If True (default), keeps only the latest processes found in the history. If before_exec_time is specified, retains only the latest before that time.

  • (ProtoProcess) (org_proc) – The originating process, which is excluded from execution time filtering but included in the ancestor list.

Return (dict):

A dictionary mapping brick UUIDs to ProtoProcess instances.

populse_mia.data_manager.data_history_inspect.get_filenames_in_value(value, project, allow_temp=True)[source]

Extract filenames from a nested structure of lists, tuples, and dictionaries.

This function parses the given value, which can be a nested combination of lists, tuples, and dictionaries, to retrieve all filenames referenced within it. Only filenames that are valid database entries or the special “<temp>” value (if allow_temp is True) are retained. Other filenames are considered read-only static data and are not included in the results.

Parameters:
  • (object) (project) – The value to parse. It can be a single string, a list, tuple, dictionary, or a nested combination of these types.

  • (object) – The project object providing access to the database.

  • optional) (allow_temp (bool,) – If True, includes the temporary filename “<temp>” in the results. Defaults to True.

Return (set):

A set of filenames that are valid database entries or the temporary filename “<temp>” (if allowed).

populse_mia.data_manager.data_history_inspect.get_history_brick_process(brick_id, project, before_exec_time=None)[source]

Retrieve a brick from the database using its UUID and return it as a ProtoProcess instance.

This function fetches a brick from the database using its unique identifier (UUID). It returns the brick as a ProtoProcess instance if the brick has been executed (its execution status is “Done”) and, if specified, its execution time is not later than before_exec_time. If the brick does not meet these criteria or is not found in the database, the function returns None.

Parameters:
  • (str) (before_exec_time) – The unique identifier (UUID) of the brick to retrieve.

  • (object) (project) – The project object providing access to the database.

  • (str) – An execution time filter. If provided, bricks executed after this timestamp are discarded.

Return (ProtoProcess or None):

A ProtoProcess instance representing the brick if it meets the criteria; otherwise, None.

populse_mia.data_manager.data_history_inspect.get_proc_ancestors_via_tmp(proc, project, procs)[source]

Retrieve upstream processes connected via a temporary value (“<temp>”).

This function is intended for internal use within get_data_history_processes and data_history_pipeline. It attempts to identify upstream processes connected to the given process (proc) through a temporary filename.

The function first searches the direct history of the process’s output files. If no matching process is found, it searches the entire database of bricks, which may be slower for large databases. Matching is based on the temporary filename and processing time, which can be error-prone.

Parameters:
  • (ProtoProcess) (proc) – The process whose ancestors need to be determined.

  • (object) (project) – The project object providing access to the session and other necessary functionalities for processing.

  • (dict) (procs) – A dictionary of processes, where keys are process IDs and values are ProtoProcess instances.

Returns:

  • new_procs (dict): A dictionary mapping process UUIDs to

    ProtoProcess instances.

  • links (set): A set of tuples representing pipeline links in the format (src_protoprocess, src_plug_name, dst_protoprocess, dst_plug_name). Links from/to the pipeline main plugs are also included, where src_protoprocess or dst_protoprocess may be None.

Contains:
Private function:
  • _get_tmp_param: Identifies a process parameter associated

    with a temporary value.

populse_mia.data_manager.data_history_inspect.is_data_entry(filename, project, allow_temp=True)[source]

Determine if the given filename is a valid database entry within the specified project.

This function checks whether the input filename is either a recognized temporary value (“<temp>”) or a file located within the project’s database data directory. If the filename is valid, it returns either the relative path to the database data directory or “<temp>” (if allowed). If the file is not found in the database, the function returns None.

Parameters:
  • (str) (filename) – The full path or special value “<temp>” to be checked.

  • (object) (project) – The project object providing access to the database and folder structure.

  • optional) (allow_temp (bool,) – If True, allows the special value “<temp>” to be considered a valid entry. Defaults to True.

Return (str or None):
  • The relative path to the project’s database data directory if the filename is a valid database entry.

  • “<temp>” if the input is “<temp>” and allow_temp is True.

  • None if the filename is not a valid database entry.