CAPSUL: chain algorithms in pipelines

Capsul is a simple and efficient Python tool for organizing a set of processing steps. It is accessible to everybody and reusable in various contexts. The project is hosted on GitHub: https://github.com/populse/capsul.

Documentation: http://populse.github.io/capsul

The following examples use CAPSUL and PyQt (or PySide). To get the GUI running in a non-blocking way, the IPython notebook should be started with the option --gui=qt:

ipython notebook --gui=qt

Otherwise, calls to the Qt loop will block until the windows are closed at each demo step.

Definitions

  • A Process is a processing step that can be derived directly from a Python function and that can be used as a building block of a pipeline.

  • A Pipeline is a series of connected processes.

First check

To check that Capsul is installed on your machine, you can ask for the Capsul version:

[1]:
# just to ensure compatibility of this notebook with python 2 and 3
from __future__ import print_function
# the following to avoid display when this notebook is converted to sphinx doc
import os
if os.environ.get('ALLOW_GUI', 'TRUE') in ('FALSE', '0'):
    use_gui = False
else:
    %gui qt4
    use_gui = True
    from soma.qt_gui import qt_backend
    qt_backend.set_qt_backend()
[2]:
import capsul
print(capsul.__version__)
2.5.1

Process and pipeline creation API

A process can be either a Process class instance or a wrapped function.

Process and parameters

[3]:
# Capsul import
from capsul.api import Process

# Trait import
from traits.api import Float

class Process1(Process):
    f = Float(output=False)

    def __init__(self):
        super(Process1, self).__init__()
        self.add_trait("ff", Float(output=False))
        self.add_trait("out", Float(output=True))

    def _run_process(self):
        self.out = self.f + self.ff
        print('Process1 execution, f:', self.f, ', ff:', self.ff)

process = Process1()
print(process.user_traits().keys())

process.ff = 132.6
process.f = 13.3

#execution
process()
print('Process result =', process.out)
['f', 'ff', 'out']
Process1 execution, f: 13.3 , ff: 132.6
Process result = 145.9
[4]:
from traits.api import Str

class Process2(Process):

    def __init__(self):
        super(Process2, self).__init__()
        self.add_trait("a", Float(output=True))
        self.add_trait("b", Str(output=False))

    def get_commandline(self):
        return ['echo', 'Process2 execution, a: %f, b: %s' % (self.a, self.b)]

Use a function as a building block

It is possible to convert a function into a Process and thus use it as a building block of a pipeline. In the following example we will use a simple Python addition function:

[5]:
from capsul.process.xml import xml_process
from capsul.api import get_process_instance

@xml_process('''
<process capsul_xml="2.0">
    <input name="a" type="int" doc="An integer"/>
    <input name="b" type="int" doc="Another integer"/>
    <return name="addition" type="int" doc="a + b"/>
</process>
''')
def add(a, b):
     return a + b

This is a pure Python function with an XML Process description in the @xml_process decorator. Inside the <process> and </process> elements, each input parameter is described, as well as the returned value. The parameters are typed, and a description is requested in order to generate proper tooltips or documentation. See the XML specifications for more information.
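To make the structure of such a description concrete, here is a minimal sketch that reads the same XML with the standard library only. It does not use Capsul and is not how Capsul parses the description; the helper name describe_parameters is hypothetical and introduced only for illustration.

```python
import xml.etree.ElementTree as ET

# Same XML description as the one passed to @xml_process above.
xml_description = '''
<process capsul_xml="2.0">
    <input name="a" type="int" doc="An integer"/>
    <input name="b" type="int" doc="Another integer"/>
    <return name="addition" type="int" doc="a + b"/>
</process>
'''


def describe_parameters(xml_text):
    """Hypothetical helper: list (kind, name, type, doc) for each parameter."""
    root = ET.fromstring(xml_text)
    return [(child.tag, child.get('name'), child.get('type'), child.get('doc'))
            for child in root]


# One line per parameter: inputs first, then the returned value.
for kind, name, type_name, doc in describe_parameters(xml_description):
    print('%-6s %-8s %-4s %s' % (kind, name, type_name, doc))
```

Each child element of <process> carries a name, a type and a doc string, which is the information a tool needs to build typed parameters and tooltips.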

We can now create a Process from this Python function:

[6]:
from capsul.api import get_process_instance

process = get_process_instance('__main__.add')

We can set some input parameters and execute the process:

[7]:
process.a = 40
process.b = 2
process()
print(process.a, '+', process.b, '=', process.addition)

40 + 2 = 42

Output files

Output files are a particular case: a file involves both a filename (a string) and the file itself. When the file itself is an output, the filename may be either an input, when the output filename is specified in advance, or an output, when the filename is generated by the process and returned by it. For the case of an output file with an input filename, we have to specify that the filename is actually an input, and to which parameter it is attached:

[8]:
@xml_process('''
<process capsul_xml="2.0">
    <input name="a" type="file" doc="A file"/>
    <input name="b" type="file" doc="Another file"/>
    <output name="c" type="file" input_filename="c" doc="output concatenated file"/>
</process>
''')
def cat(a, b, c):
     with open(c, 'w') as f:
         f.write(open(a).read())
         f.write(open(b).read())

process = get_process_instance('__main__.cat')
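The distinction between the filename (an input chosen by the caller) and the file content (the actual output) can be seen by calling the same function body directly. This is only a sketch in plain Python, without the Capsul decorator; the temporary file names are made up for the example.

```python
import os
import tempfile


def cat(a, b, c):
    # Same logic as the process above, without the Capsul decorator.
    # "c" is a filename given by the caller; the file written there is the output.
    with open(c, 'w') as out:
        for name in (a, b):
            with open(name) as src:
                out.write(src.read())


with tempfile.TemporaryDirectory() as tmp_dir:
    path_a = os.path.join(tmp_dir, 'a.txt')
    path_b = os.path.join(tmp_dir, 'b.txt')
    path_c = os.path.join(tmp_dir, 'c.txt')  # output filename, chosen in advance
    with open(path_a, 'w') as f:
        f.write('first\n')
    with open(path_b, 'w') as f:
        f.write('second\n')
    cat(path_a, path_b, path_c)
    with open(path_c) as f:
        result = f.read()

print(repr(result))
```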

Pipeline

A pipeline uses processes, or sub-pipelines, in order to define a full processing chain, with links between building blocks. A pipeline may be defined either using the Python API, as a Pipeline subclass, or using an XML definition file.

Pipeline API

[9]:
from capsul.api import Pipeline

class Pipeline1(Pipeline):

    def pipeline_definition(self):
        # Create processes
        self.add_process("node1", Process1())
        self.add_process("node2", Process2())

pipeline1 = Pipeline1()

Viewing / debugging a pipeline

Pipeline structure

[10]:
if globals().get('use_gui', True):
    import sys
    # note that the qt backend may be either PyQt4 or PySide.
    from soma.qt_gui.qt_backend import QtGui
    from capsul.qt_gui.widgets import PipelineDevelopperView

    # here we determine whether the Qt GUI is already running or not.
    run_qt_loop = False
    if QtGui.QApplication.instance() is None:
        app = QtGui.QApplication(sys.argv)
        run_qt_loop = True
    else:
        app = QtGui.QApplication.instance()
    # in the following we will reuse this run_qt_loop variable for simplicity

    # now the real thing for pipeline viewing
    view1 = PipelineDevelopperView(pipeline1)
    view1.show()

    if run_qt_loop:
        print('close window to go on...')
        app.exec_()

It should display something like this: [figure: Capsul pipeline view]

Entering parameters

[11]:
if globals().get('use_gui', True):
    from soma.qt_gui.controller_widget import ControllerWidget

    controller1 = ControllerWidget(pipeline1, live=True)
    controller1.show()
    controller2 = ControllerWidget(pipeline1, live=True)
    controller2.show()
    if run_qt_loop:
        app.exec_()

[figure: controller views]

Using the Python API

[18]:
class Pipeline3(Pipeline):

    def pipeline_definition(self):
        # Create processes
        self.add_process("node1", Process1())
        self.add_process("node2", Process2())
        self.add_switch("switch", ["case1", "case2"], ["output"])
        #links
        self.add_link("node1.out->switch.case1_switch_output")
        self.add_link("node2.a->switch.case2_switch_output")
[19]:
pipeline3 = Pipeline3()
if globals().get('use_gui', True):
    view3 = PipelineDevelopperView(pipeline3, allow_open_controller=True, show_sub_pipelines=True)
    view3.show()
    if run_qt_loop:
        app.exec_()

[figure: pipeline3]

[20]:
pipeline3.switch = "case2"

if globals().get('use_gui', True):
    view3.show()
    if run_qt_loop:
        app.exec_()
[21]:
pipeline3(b='yup')
Process2 execution, a: 0.000000, b: yup
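The selection logic of a switch can be mimicked in a few lines of plain Python. This is a toy model, not Capsul's implementation: the ToySwitch class is hypothetical, and the only thing taken from the example above is the plug naming pattern seen in the links (case1_switch_output, case2_switch_output).

```python
class ToySwitch:
    """Hypothetical stand-in for a switch node (not Capsul's implementation)."""

    def __init__(self, cases, output_name):
        self.cases = list(cases)
        self.output_name = output_name
        self.value = self.cases[0]  # currently selected case

    def input_plug(self, case):
        # Naming pattern used in the links above: <case>_switch_<output>
        return '%s_switch_%s' % (case, self.output_name)

    def select(self, inputs):
        # Only the plug of the selected case is forwarded to the output.
        return inputs[self.input_plug(self.value)]


switch = ToySwitch(['case1', 'case2'], 'output')
inputs = {'case1_switch_output': 145.9, 'case2_switch_output': 0.0}

switch.value = 'case2'
selected = switch.select(inputs)
print(switch.input_plug('case2'), '->', selected)
```

In the real pipeline, selecting "case2" also means that only node2 needs to run, which is consistent with the output of the cell above, where only Process2 is executed.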

Using XML definition

[22]:
import os
import capsul.process.test as test

xmldesc = os.path.join(os.path.dirname(test.__file__), "test_pipeline.xml")
with open(xmldesc, "r") as openfile:
    print("".join(openfile.readlines()))
<pipeline capsul_xml="2.0">
    <process name="threshold_gt_1"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="1"/>
        <set name="method" value="'gt'"/>
    </process>
    <process name="threshold_gt_10"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="10"/>
        <set name="method" value="'gt'"/>
    </process>
    <process name="threshold_gt_100"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="100"/>
        <set name="method" value="'gt'"/>
    </process>
    <process name="threshold_lt_1"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="1"/>
        <set name="method" value="'lt'"/>
    </process>
    <process name="threshold_lt_10"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="10"/>
        <set name="method" value="'lt'"/>
    </process>
    <process name="threshold_lt_100"
     module="capsul.process.test.test_load_from_description.threshold">
        <set name="threshold" value="100"/>
        <set name="method" value="'lt'"/>
    </process>
    <process name="mask_1"
     module="capsul.process.test.test_load_from_description.mask">
    </process>
    <process name="mask_10"
     module="capsul.process.test.test_load_from_description.mask">
    </process>
    <process name="mask_100"
     module="capsul.process.test.test_load_from_description.mask">
    </process>

    <link source="input_image" dest="threshold_gt_1.input_image"/>
    <link source="input_image" dest="threshold_gt_10.input_image"/>
    <link source="input_image" dest="threshold_gt_100.input_image"/>

    <link source="input_image" dest="threshold_lt_1.input_image"/>
    <link source="input_image" dest="threshold_lt_10.input_image"/>
    <link source="input_image" dest="threshold_lt_100.input_image"/>

    <link source="input_image" dest="mask_1.input_image"/>
    <link source="input_image" dest="mask_10.input_image"/>
    <link source="input_image" dest="mask_100.input_image"/>

    <link source="threshold_gt_1.output_image" dest="mask_1.mask"/>
    <link source="threshold_gt_10.output_image" dest="mask_10.mask"/>
    <link source="threshold_gt_100.output_image" dest="mask_100.mask"/>
    <link source="threshold_lt_1.output_image" dest="mask_1.mask"/>
    <link source="threshold_lt_10.output_image" dest="mask_10.mask"/>
    <link source="threshold_lt_100.output_image" dest="mask_100.mask"/>

    <link source="mask_1.output_image" dest="output_1"/>
    <link source="mask_10.output_image" dest="output_10"/>
    <link source="mask_100.output_image" dest="output_100"/>

    <processes_selection name="select_method">
        <processes_group name="greater than">
            <process name="threshold_gt_1"/>
            <process name="threshold_gt_10"/>
            <process name="threshold_gt_100"/>
        </processes_group>
        <processes_group name="lower than">
            <process name="threshold_lt_1"/>
            <process name="threshold_lt_10"/>
            <process name="threshold_lt_100"/>
        </processes_group>
    </processes_selection>

    <gui>
        <position name="threshold_gt_100" x="386.0" y="403.0"/>
        <position name="inputs" x="50.0" y="50.0"/>
        <position name="mask_1" x="815.0" y="153.0"/>
        <position name="threshold_gt_10" x="374.0" y="242.0"/>
        <position name="threshold_lt_100" x="556.0" y="314.0"/>
        <position name="threshold_gt_1" x="371.0" y="88.0"/>
        <position name="mask_10" x="820.0" y="293.0"/>
        <position name="mask_100" x="826.0" y="451.0"/>
        <position name="threshold_lt_1" x="570.0" y="6.0"/>
        <position name="threshold_lt_10" x="568.0" y="145.0"/>
        <position name="outputs" x="1000" y="100" />

        <!--
        <position name="threshold_gt_100" x="182.35615" y="658.8043">
        <position name="mask_1" x="348.3267625" y="205.125">
        <position name="threshold_gt_10" x="183.6139625" y="329.3854">
        <position name="inputs" x="0.0" y="488.9169">
        <position name="outputs" x="512.08755" y="488.9169">
        <position name="mask_100" x="348.3267625" y="744.8167">
        <position name="threshold_lt_1" x="183.6139625" y="146.998">
        <position name="threshold_gt_1" x="183.6139625" y="0.0">
        <position name="threshold_lt_100" x="183.6139625" y="805.8291">
        <position name="mask_10" x="348.3267625" y="488.9169">
        <position name="threshold_lt_10" x="183.6139625" y="476.4169">
        -->
    </gui>
</pipeline>

Again we can create a Pipeline from its XML description:

[23]:
from capsul.api import get_process_instance

xmlpipe = get_process_instance("capsul.process.test.test_pipeline")
xmlpipe.help()



.. note::

    * Type 'test_pipeline.help()' for a full description of this process parameters.
    * Type '<test_pipeline>.get_input_spec()' for a full description of this process input trait types.
    * Type '<test_pipeline>.get_output_spec()' for a full description of this process output trait types.


Inputs
~~~~~~

[Mandatory]

nodes_activation: a legal value (['ControllerTrait'] - mandatory)
    No description.
input_image: a string or os.PathLike object (['File'] - mandatory)
    Path of a NIFTI-1 image file.
select_method: a legal value (['Enum'] - mandatory)
    No description.

Outputs
~~~~~~~

output_1: a string or os.PathLike object (['File (filename: output)']
    - mandatory)
    Output file name.
output_10: a string or os.PathLike object (['File (filename: output)']
    - mandatory)
    Output file name.
output_100: a string or os.PathLike object (['File (filename:
    output)'] - mandatory)
    Output file name.

And generate its graphical representation:

[24]:
if globals().get('use_gui', True):
    from soma.qt_gui.qt_backend import QtGui
    from capsul.qt_gui.widgets import PipelineDevelopperView
    from soma.qt_gui.controller_widget import ControllerWidget

    view = PipelineDevelopperView(xmlpipe)
    controller = ControllerWidget(xmlpipe, live=True)
    view.show()
    controller.show()
    if run_qt_loop:
        app.exec_()

[figures: XML pipeline, XML pipeline controller]

Iteration over a process or pipeline

A process can be used several times in parallel: some of its parameters are then turned into lists.

[25]:
@xml_process('''<process capsul_xml="2.0">
    <input name="inputs" type="list_float" doc="numbers"/>
    <return name="addition" type="float" doc="sum all inputs"/>
</process>
''')
def sum_node(inputs):
    return sum(inputs)

@xml_process('''<process capsul_xml="2.0">
    <input name="a" type="float" doc="number"/>
    <return name="b" type="float" doc="a + 12"/>
</process>
''')
def add_12(a):
    return a + 12

class Pipeline4(Pipeline):

    def pipeline_definition(self):
        self.add_iterative_process("add_12", '__main__.add_12', iterative_plugs=['a', 'b'])
        self.add_process("sum_node", '__main__.sum_node')
        self.add_link('add_12.b->sum_node.inputs')

[26]:
pipeline4 = Pipeline4()
if globals().get('use_gui', True):
    view4 = PipelineDevelopperView(pipeline4, allow_open_controller=True, show_sub_pipelines=True)
    view4.show()
    if run_qt_loop:
        app.exec_()

[figure: iteration]

The green node here is an iterative node; it can be opened (double-clicked) to see what is inside it.

Set input parameters and run.

We set 3 inputs: 4.2, 6.8 and 7.

12 is added to each of them, then all outputs are summed up.

This should give (4.2 + 12) + (6.8 + 12) + (7. + 12) = 54.

[27]:
pipeline4.a = [4.2, 6.8, 7.]
pipeline4()
print(pipeline4.addition)
assert(pipeline4.addition == 54)
54.0
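For comparison, the same computation can be written without Capsul. The sketch below only reuses the two function bodies defined above; the list comprehension plays the role of the iterative node, calling add_12 once per element.

```python
def add_12(a):
    return a + 12


def sum_node(inputs):
    return sum(inputs)


inputs = [4.2, 6.8, 7.]
# The iterative node applies add_12 to each element of the list...
iterated = [add_12(a) for a in inputs]
# ...and the resulting list is linked to the list input of sum_node.
total = sum_node(iterated)
print(total)
```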

Optional outputs and temporary files

Let’s consider the following pipeline, in which the intermediate output is exported as an optional output:

[28]:
@xml_process('''<process capsul_xml="2.0">
    <input name="input" type="file" doc="input file"/>
    <output name="output" type="file" input_filename="output" doc="output file"/>
</process>
''')
def node_first(input, output):
    lines = open(input).readlines()
    with open(output, 'w') as f:
        f.write('\n'.join(['* %s *' % l[:-1] for l in lines]))

@xml_process('''<process capsul_xml="2.0">
    <input name="a" type="file" doc="input file"/>
    <output name="b" type="file" input_filename="b" doc="output file"/>
</process>
''')
def node_second(a, b):
    lines = open(a).readlines()
    with open(b, 'w') as f:
        f.write(''.join([l.replace('*', '^') for l in lines]))

class Pipeline5(Pipeline):

    def pipeline_definition(self):
        self.add_process("node1", '__main__.node_first')
        self.add_process("node2", '__main__.node_second')
        self.add_link('node1.output->node2.a')
        self.export_parameter('node1', 'output', 'intermediate', is_optional=True)
[29]:
pipeline5 = Pipeline5()
if globals().get('use_gui', True):
    view5 = PipelineDevelopperView(pipeline5, allow_open_controller=True, show_sub_pipelines=True)
    view5.show()
    if run_qt_loop:
        app.exec_()

[figure: pipeline5]

There is a problem here, however: if we do not specify pipeline5.intermediate, even though it is optional, the pipeline will not work because node1 needs to write its output into a file. As its output is exported, it will not get a temporary value. Thus, in the current state, pipeline5.intermediate is actually mandatory.

[30]:
infile = '/tmp/dummy.txt'
outfile = '/tmp/dummy_modified.txt'
intermediate = '/tmp/dummy_intermediate.txt'
with open(infile, 'w') as f:
    f.write('I ate 5 cakes this morning.\nLet\'s eat 3 or 4 more.\n')
pipeline5.input = infile
pipeline5.b = outfile
pipeline5.intermediate = intermediate
pipeline5()
print(open(outfile).read())
^ I ate 5 cakes this morning. ^
^ Let's eat 3 or 4 more. ^
[31]:
# cleanup
os.unlink(outfile)
os.unlink(intermediate)

To solve this issue, we need an element which conditionally bridges or breaks the link between node1.output and intermediate. This is the job of optional output switches:

[32]:
class Pipeline6(Pipeline):

    def pipeline_definition(self):
        self.add_process("node1", '__main__.node_first')
        self.add_process("node2", '__main__.node_second')
        self.add_optional_output_switch('b', 'input')
        self.add_link('node1.output->node2.a')
        self.add_link('node1.output->b.input_switch_b')
        self.export_parameter('b', 'b', 'intermediate', is_optional=True)
[33]:
pipeline6 = Pipeline6()
if globals().get('use_gui', True):
    view6 = PipelineDevelopperView(pipeline6, allow_open_controller=True, show_sub_pipelines=True)
    view6.show()
    if run_qt_loop:
        app.exec_()

[figure: pipeline6]

As long as pipeline6.intermediate is empty, the switch is off and node1.output gets a temporary value.

[34]:
pipeline6.input = infile
pipeline6.b = outfile
pipeline6()
print(open(outfile).read())
^ I ate 5 cakes this morning. ^
^ Let's eat 3 or 4 more. ^
[35]:
# cleanup
os.unlink(outfile)
[36]:
pipeline6.intermediate = intermediate
[37]:
# the switch has been switched on automatically
# node1.output gets the value assigned to pipeline6.intermediate
if globals().get('use_gui', True):
    view6.show()
    if run_qt_loop:
        app.exec_()

[figure: pipeline6 connected]

[38]:
pipeline6()
print(open(intermediate).read())
* I ate 5 cakes this morning. *
* Let's eat 3 or 4 more. *
[39]:
# cleanup
os.unlink(outfile)
os.unlink(intermediate)
os.unlink(infile)
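The effect of the optional output switch can be imitated in plain Python: when no intermediate filename is given, a temporary file is used and removed afterwards; when one is given, the intermediate result is kept there. This is a sketch of the idea only, not of Capsul's mechanism; run_chain is a hypothetical helper, and the two node functions are simplified copies of the ones defined above.

```python
import os
import tempfile


def node_first(src, output):
    with open(src) as f:
        lines = f.readlines()
    with open(output, 'w') as f:
        f.write('\n'.join(['* %s *' % l[:-1] for l in lines]))


def node_second(a, b):
    with open(a) as f:
        lines = f.readlines()
    with open(b, 'w') as f:
        f.write(''.join([l.replace('*', '^') for l in lines]))


def run_chain(src, b, intermediate=None):
    """Hypothetical helper: keep the intermediate file only if a name is given."""
    use_temp = not intermediate
    if use_temp:
        fd, intermediate = tempfile.mkstemp(suffix='.txt')
        os.close(fd)
    try:
        node_first(src, intermediate)
        node_second(intermediate, b)
    finally:
        if use_temp:
            os.unlink(intermediate)  # temporary value: removed after the run


with tempfile.TemporaryDirectory() as d:
    infile = os.path.join(d, 'in.txt')
    outfile = os.path.join(d, 'out.txt')
    kept = os.path.join(d, 'intermediate.txt')
    with open(infile, 'w') as f:
        f.write('I ate 5 cakes this morning.\n')
    run_chain(infile, outfile)            # "switch off": temporary intermediate
    kept_exists_first = os.path.exists(kept)
    run_chain(infile, outfile, kept)      # "switch on": intermediate is kept
    with open(kept) as f:
        kept_text = f.read()
    with open(outfile) as f:
        out_text = f.read()

print(kept_exists_first, repr(kept_text), repr(out_text))
```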

StudyConfig

StudyConfig is a placeholder for study-wide parameters, settings, paths and so on. It is a modular configuration tool, which has modules to configure some external software.

A helper to configure state-of-the-art medical software

Capsul provides modules to configure external software:

  • FSL

  • SPM

  • FreeSurfer

  • BrainVisa

With this module it is also possible to configure the execution of the pipeline:

  • Use smart caching

  • Generate some logging

  • Soma-Workflow to handle population imaging

For instance:

[40]:
from capsul.api import StudyConfig
# optional config modules, need not to be explicitly loaded
# from capsul.study_config.config_modules.freesurfer_config import FreeSurferConfig
# from capsul.study_config.config_modules.brainvisa_config import BrainVISAConfig

default_config = {"use_soma_workflow": True}
study_config = StudyConfig(initial_config=default_config,
                           modules=StudyConfig.default_modules + \
                           ['BrainVISAConfig', 'FSLConfig', 'FomConfig'])

# inspect config options
for k in study_config.user_traits().keys(): print(k, ':  ', getattr(study_config, k))
create_output_directories :   True
generate_logging :   False
input_directory :   <undefined>
output_directory :   <undefined>
process_output_directory :   False
study_name :   <undefined>
user_level :   0
afni_path :   <undefined>
use_afni :   <undefined>
ants_path :   <undefined>
use_ants :   <undefined>
fsl_config :   <undefined>
fsl_prefix :   <undefined>
use_fsl :   <undefined>
matlab_exec :   <undefined>
use_matlab :   False
mrtrix_path :   <undefined>
use_mrtrix :   <undefined>
use_smart_caching :   False
use_soma_workflow :   False
somaworkflow_computing_resource :   <undefined>
somaworkflow_config_file :   <undefined>
somaworkflow_keep_failed_workflows :   True
somaworkflow_keep_succeeded_workflows :   False
somaworkflow_computing_resources_config :   <soma.controller.controller.OpenKeyController object at 0x7f09e8b7cdb0>
spm_directory :   <undefined>
spm_standalone :   <undefined>
spm_version :   <undefined>
spm_exec :   <undefined>
spm_mcr_directory :   <undefined>
use_spm :   <undefined>
shared_directory :   /opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/site-packages/share/brainvisa-share-5.2
input_fom :
output_fom :
shared_fom :
volumes_format :   <undefined>
meshes_format :   <undefined>
auto_fom :   True
fom_path :   []
use_fom :   True
attributes_schema_paths :   ['capsul.attributes.completion_engine_factory']
attributes_schemas :   {}
process_completion :   builtin
path_completion :   <undefined>

Let us show how to configure FSL:

[41]:
from capsul.api import StudyConfig

study_config = StudyConfig(
    modules=["FSLConfig"],
    fsl_config="/etc/fsl/5.0/fsl.sh",
    use_smart_caching=True,
    output_directory="/tmp/capsul_demo")
print(study_config.run.__doc__)
Method to execute a process or a pipeline in a study configuration
         environment.

         Depending on the studies_config settings, it may be a sequential run,
         or a parallel run, which can involve remote execution (through soma-
         workflow).

         Only pipeline nodes can be filtered on the 'execute_qc_nodes'
         attribute.

         A valid output directory is expected to execute the process or the
         pepeline without soma-workflow.

        Parameters
        ----------
        process_or_pipeline: Process or Pipeline instance (mandatory)
            the process or pipeline we want to execute
        output_directory: Directory name (optional)
            the output directory to use for process execution. This replaces
            self.output_directory but left it unchanged.
        execute_qc_nodes: bool (optional, default False)
            if True execute process nodes that are tagged as qualtity control
            process nodes.
        verbose: int
            if different from zero, print console messages.
        configuration_dict: dict (optional)
            configuration dictionary

Execution of the pipeline

In this section a simple execution is performed on your machine using one CPU (if more than one CPU is used, it means that the called external software is itself parallelized). We just have to call the StudyConfig run method:

[42]:
study_config.reset_process_counter()
study_config.run(pipeline2, verbose=1)
________________________________________________________________________________
[Process] Calling __main__.Process2...
__main__.Process2(b=blop)
Process2 execution, a: 0.000000, b: blop
________________________________________________________________________________
[Process] Calling __main__.Process1...
__main__.Process1(f=13.2, ff=0.0)
Process1 execution, f: 13.2 , ff: 0.0

Distributed execution using Soma-Workflow

Capsul can execute a pipeline through Soma-Workflow in order to address large parallelized pipelines, or huge datasets in the case of population imaging studies.

Limitation:

Currently, when used in Soma-Workflow, process outputs can only be files. Output numbers, strings, etc. will be lost.

[43]:
class Process1(Process):
    f = Float(output=False)

    def __init__(self):
        super(Process1, self).__init__()
        self.add_trait("ff", Float(output=False))
        self.add_trait("out", Float(output=True))

    def get_commandline(self):
        return ['echo', 'Process1 execution, f: %f, ff: %f' % (self.f, self.ff)]

class Process2(Process):

    def __init__(self):
        super(Process2, self).__init__()
        self.add_trait("a", Float(output=True))
        self.add_trait("b", Str(output=False))

    def get_commandline(self):
        return ['echo', 'Process2 execution, a: %f, b: %s' % (self.a, self.b)]

class Pipeline2(Pipeline):

    def pipeline_definition(self):
        # Create processes
        self.add_process("node1", Process1())
        self.add_process("node2", Process2())
        # links
        self.add_link('node2.a->node1.ff')
        # custom exports
        self.export_parameter("node2", "b", "node_string")
[44]:
from capsul.pipeline.pipeline_workflow import workflow_from_pipeline

pipeline2 = get_process_instance(Pipeline2)
workflow = workflow_from_pipeline(pipeline2)
print('jobs:', workflow.jobs)
print('dependencies:', workflow.dependencies)
jobs: [<soma_workflow.client_types.Job object at 0x7f09e8a73e20>, <soma_workflow.client_types.Job object at 0x7f09e8a89070>]
dependencies: {(<soma_workflow.client_types.Job object at 0x7f09e8a89070>, <soma_workflow.client_types.Job object at 0x7f09e8a73e20>)}
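The dependencies determine the order in which jobs may run. The sketch below illustrates this with the standard library only (graphlib, Python 3.9 or later) and plain strings instead of Soma-Workflow Job objects. It assumes that each dependency pair reads (job that runs first, job that runs after), which matches the link node2.a->node1.ff in Pipeline2; the job names are placeholders.

```python
from graphlib import TopologicalSorter

# Placeholder job names standing for the two jobs of pipeline2.
jobs = ['node1', 'node2']
# Assumption: each pair is (job that runs first, job that runs after).
dependencies = {('node2', 'node1')}

# TopologicalSorter maps each node to the set of its predecessors.
sorter = TopologicalSorter({job: set() for job in jobs})
for before, after in dependencies:
    sorter.add(after, before)

order = list(sorter.static_order())
print(order)
```

Jobs with no dependency between them could run in parallel, which is what a workflow engine exploits on larger pipelines.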

The workflow can be saved and reloaded in soma_workflow_gui, or used in a soma-workflow controller:

[45]:
from soma_workflow import client as swclient

# save workflow to disk in json format
swclient.Helper.serialize('/tmp/pipeline2.workflow', workflow)

# run locally via a workflow controller
wc = swclient.WorkflowController()
wf_id = wc.submit_workflow(workflow)
swclient.Helper.wait_workflow(wf_id, wc)
print('execution status:', wc.workflow_status(wf_id))
wc.delete_workflow(wf_id)
soma-workflow starting in light mode
Workflow controller initialised
execution status: workflow_done
[45]:
True

If we do not need it anymore, we can remove this workflow file.

[46]:
import os
os.unlink('/tmp/pipeline2.workflow')

More transparently, Soma-Workflow execution can be used from StudyConfig. For this, StudyConfig should have “SomaWorkflowConfig” among its modules, and it should be enabled (which should be the default when the module is loaded in the config).

[47]:
study_config = StudyConfig(modules=['SomaWorkflowConfig'])
study_config.use_soma_workflow = True
study_config.run(pipeline2)
# if execution fails, an exception should be raised.
soma-workflow starting in light mode
Workflow controller initialised

If we wanted more control over the executed workflow, we could add a few things:

[48]:
study_config.somaworkflow_keep_succeeded_workflows = True
wc = study_config.modules['SomaWorkflowConfig'].get_workflow_controller()
if wc is not None:
    init_wf_list = wc.workflows().keys()
else:
    init_wf_list = []
try:
    study_config.run(pipeline2)
except Exception as e:
    print('Workflow failed:', e)
# get workflow after running
wc = study_config.modules['SomaWorkflowConfig'].get_workflow_controller()
wf_list = wc.workflows().keys()
new_wf_ids = [wf for wf in wf_list if wf not in init_wf_list]
if new_wf_ids:
    wf_id = new_wf_ids[0]
    print('workflow id:', wf_id)
    wc.delete_workflow(wf_id)
else:
    # no new workflow was kept by the controller: nothing to inspect or delete
    print('no new workflow found')
# release the reference to the workflow controller
del wc

Use third-party software in Capsul

Some third-party software needs to be configured before it is called. StudyConfig is used to hold this configuration. There is a set of modules that can be modified to enable or disable the configuration of third-party software.

[49]:
from capsul.api import StudyConfig

print(StudyConfig.default_modules)
['AFNIConfig', 'ANTSConfig', 'FSLConfig', 'MatlabConfig', 'MRTRIXConfig', 'SmartCachingConfig', 'SomaWorkflowConfig', 'SPMConfig']

In order to change the modules that are used, one must give a full module list to StudyConfig():

[50]:
study_config = StudyConfig(modules=StudyConfig.default_modules + ['BrainVISAConfig', 'NipypeConfig'])

Use FSL in Capsul

FSL is called via specific subprocess-like functions that take into account the configuration done in StudyConfig. If a StudyConfig is not configured to use FSL, it may be automatically configured. Automatic configuration has been tested in the following two cases:

  1. FSL was installed from the FMRIB site and, at least, FSLDIR environment variable is set (fsl.sh can be sourced or not)

  2. FSL was installed from Neurodebian packages

Automatic configuration is done lazily. To start it, one must either use StudyConfig(use_fsl=True) or use one of the calling functions of capsul.subprocess.fsl.

To call an FSL command with the capsul.subprocess.fsl module, the first argument of the command line must be the FSL executable without any path or prefix. Prefixes are used in Neurodebian installs. For instance, on Ubuntu 16.04 Neurodebian, FSL commands are prefixed with fsl5.0-. The appropriate path and, if needed, prefix are added from the configuration of the StudyConfig instance.

[51]:
import sys
import os
import os.path as osp
import tempfile
from glob import glob

from traits.api import File, Undefined

from capsul.api import StudyConfig, Process
from capsul.in_context import fsl

class Bet(Process):
    '''
    A process that calls bet command without any parameter except input and output files.
    '''
    input_image = File(optional=False, output=False)
    output_image = File(optional=False, output=True)


    def _run_process(self):
        fsl.fsl_check_call(['bet', self.input_image, self.output_image])

try:
    study_config = StudyConfig(use_fsl=True)
except EnvironmentError as e:
    # If FSL cannot be configured automatically, tell why
    print('FSL cannot be configured automatically:', str(e))
    study_config = None
if study_config and study_config.use_fsl is True:
    with study_config.engine:
        # Try to find an image in FSL installation directory
        test_image = '/usr/share/data/fsl-mni152-templates/MNI152_T1_1mm_brain.nii.gz'
        if not osp.exists(test_image):
            fsl_dir = os.environ.get('FSLDIR')
            if not fsl_dir and study_config.fsl_config is not Undefined:
                fsl_dir = osp.dirname(osp.dirname(osp.dirname(study_config.fsl_config)))
                if fsl_dir:
                    test_image = glob(osp.join(fsl_dir, 'fslpython/envs/fslpython/lib/python*/site-packages/nibabel/tests/data/anatomical.nii'))
                    if test_image:
                        test_image = test_image[0]
                    else:
                        print('FSL test data cannot be found')
                        test_image = None

        if test_image:
            bet = study_config.get_process_instance(Bet)
            with tempfile.NamedTemporaryFile(suffix='.nii.gz') as tmp:
                bet.run(input_image=test_image, output_image=tmp.name)
                print('Output file size:', os.stat(bet.output_image).st_size)

FSL cannot be configured automatically: FSL command "bet" cannot be found in PATH
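The rule described above (the command is written as a bare executable name, and the path and prefix come from the configuration) can be illustrated with a small sketch. The helper build_fsl_command and its parameters are hypothetical and are not part of Capsul's API; only the fsl5.0- prefix comes from the text.

```python
import os.path as osp


def build_fsl_command(command, fsl_bin_dir=None, prefix=''):
    """Hypothetical helper: add prefix and directory to the executable name."""
    executable = prefix + command[0]
    if fsl_bin_dir:
        executable = osp.join(fsl_bin_dir, executable)
    return [executable] + list(command[1:])


# The caller always writes the bare FSL command name...
command = ['bet', 'in.nii.gz', 'out.nii.gz']
# ...and the configuration decides how it is actually invoked.
prefixed = build_fsl_command(command, prefix='fsl5.0-')
print(prefixed)
```

Keeping the command free of paths and prefixes lets the same process definition run unchanged on differently installed systems.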

Use Nipype in Capsul

It is possible to use all the nipype interfaces (FSL, SPM, FreeSurfer, …) as building blocks in Capsul. This step requires nipype to be properly installed as well as the software we want to use. For instance if we want to perform a brain extraction with FSL we can simply write:

[52]:
from capsul.api import StudyConfig, get_process_instance
from soma.path import find_in_path

study_config = StudyConfig(modules=StudyConfig.default_modules + ['NipypeConfig'])

# It is necessary to have FSL commands (such as "bet") in
# the PATH in order to use FSL via Nipype
if study_config.use_nipype and find_in_path('bet'):
    betpipe = get_process_instance("nipype.interfaces.fsl.BET")
    betpipe.get_help()
    betpipe.in_file = "/tmp/MNI152_T1_2mm.nii.gz"
    betpipe.output_directory = '/tmp'
else:
    betpipe = None
    print('NiPype is not present.')
NiPype is not present.

As shown, it is possible to set the BET algorithm input parameters. Note that in Capsul the standard nipype outputs are prefixed with underscores. We can execute this Process, but as mentioned by the nipype warnings, FSL needs to be configured in the study config, otherwise the pipeline will not run. As we have done so above, we can run it:

[53]:
if betpipe:
    study_config.reset_process_counter()
    study_config.run(betpipe, verbose=1)
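
The prerequisites described above (nipype importable, FSL commands in the PATH) can also be checked with the standard library alone. The following is a minimal sketch, not part of Capsul: the helper name `missing_prerequisites` is hypothetical, and it relies on `shutil.which`, which requires Python 3.

```python
import importlib.util
import shutil


def missing_prerequisites(module_name, commands):
    """Return a list describing which prerequisites are missing.

    Hypothetical helper for illustration; module_name is expected to be
    a top-level module name such as 'nipype'.
    """
    missing = []
    # find_spec returns None when a top-level module cannot be found
    if importlib.util.find_spec(module_name) is None:
        missing.append('Python module "%s"' % module_name)
    for command in commands:
        # shutil.which returns None when the command is not in the PATH
        if shutil.which(command) is None:
            missing.append('command "%s"' % command)
    return missing


problems = missing_prerequisites('nipype', ['bet'])
if problems:
    print('Cannot use FSL via nipype, missing:', ', '.join(problems))
else:
    print('nipype and the FSL "bet" command are available.')
```

Reporting each missing item separately makes it clear whether nipype itself or the underlying software needs to be installed.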

Parameters completion using FOM (File Organization Model)

FOMs make it possible to complete file names in large pipelines with many parameters from a small set of attributes. To illustrate this feature, we will first create a pipeline with several such parameters, from an XML description.

[54]:
from capsul.pipeline.xml import create_xml_pipeline

xmlstr = '''<pipeline capsul_xml="2.0">
        <process name="proc1"
            module="capsul.pipeline.test.test_pipeline.DummyProcess">
        </process>
        <process name="proc2"
            module="capsul.pipeline.test.test_pipeline.DummyProcess">
        </process>
        <process name="proc3"
            module="capsul.pipeline.test.test_pipeline.DummyProcess">
        </process>
        <process name="proc4"
            module="capsul.pipeline.test.test_pipeline.DummyProcess">
        </process>
        <switch name="proc_select" switch_value="proc1">
            <input name="proc1"/>
            <input name="proc2"/>
            <output name="image"/>
        </switch>
        <switch name="proc_select2" switch_value="proc3">
            <input name="proc3"/>
            <input name="proc4"/>
            <output name="image"/>
        </switch>
        <link source="input_image1" dest="proc1.input_image"/>
        <link source="input_image2" dest="proc2.input_image"/>
        <link source="proc_select" dest="proc_select.switch"/>
        <link source="proc_select2" dest="proc_select2.switch"/>
        <link source="input_image4" dest="proc4.input_image"/>
        <link source="input_image3" dest="proc3.input_image"/>

        <link dest="image_out1" source="proc_select.image" weak_link="true"/>
        <link dest="image_out3" source="proc3.output_image" weak_link="true"/>
        <link dest="image_out5" source="proc_select2.image" weak_link="true"/>

        <link source="proc1.output_image" dest="proc_select.proc1_switch_image"></link>
        <link source="proc2.output_image" dest="proc_select.proc2_switch_image"></link>
        <link source="proc3.output_image" dest="proc_select2.proc3_switch_image"></link>
        <link source="proc4.output_image" dest="proc_select2.proc4_switch_image"></link>
    <gui>
        <position name="inputs" x="-151.0" y="227.4447"></position>
        <position name="proc_select" x="381.6498" y="160.1012"></position>
        <position name="outputs" x="668.1498" y="319.9886"></position>
        <position name="proc_select2" x="381.6498" y="507.1947"></position>
        <position name="proc4" x="144.2624" y="589.7949"></position>
        <position name="proc1" x="123.2624" y="-4.0"></position>
        <position name="proc3" x="146.2624" y="391.9886"></position>
        <position name="proc2" x="132.2624" y="171.8197"></position>
    </gui>
</pipeline>'''

DemoPipeline = create_xml_pipeline('__main__', 'DemoPipeline', xmlstr)
pipeline = DemoPipeline()
[55]:
if globals().get('use_gui', True):
    from capsul.qt_gui.widgets import PipelineDevelopperView

    pv = PipelineDevelopperView(pipeline, allow_open_controller=True, show_sub_pipelines=True)
    pv.show()

    if run_qt_loop:
        app.exec_()

[Figure: pipeline]
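
To get a feel for the structure of such an XML description without a GUI, it can be inspected with the standard library. The sketch below parses an abbreviated, hand-written copy of the description (the variable `short_xml` is an assumption made for this example). It only lists elements and attributes, and does not reflect how Capsul itself builds the pipeline.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the pipeline description, for illustration only
short_xml = '''<pipeline capsul_xml="2.0">
    <process name="proc1"
        module="capsul.pipeline.test.test_pipeline.DummyProcess"/>
    <process name="proc2"
        module="capsul.pipeline.test.test_pipeline.DummyProcess"/>
    <switch name="proc_select" switch_value="proc1">
        <input name="proc1"/>
        <input name="proc2"/>
        <output name="image"/>
    </switch>
    <link source="input_image1" dest="proc1.input_image"/>
    <link source="proc1.output_image" dest="proc_select.proc1_switch_image"/>
    <link source="proc_select.image" dest="image_out1" weak_link="true"/>
</pipeline>'''

root = ET.fromstring(short_xml)

# Names of the process nodes
processes = [p.get('name') for p in root.findall('process')]
# For each switch, the names of its selectable inputs
switches = {s.get('name'): [i.get('name') for i in s.findall('input')]
            for s in root.findall('switch')}
# Links as (source, destination) pairs
links = [(link.get('source'), link.get('dest')) for link in root.findall('link')]

print('processes:', processes)
print('switches:', switches)
for source, dest in links:
    print('link: %s -> %s' % (source, dest))
```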

FOM definition

[56]:
fom_content = '''{
    "fom_name": "demo_fom",

    "formats": {
        "NIFTI": "nii",
        "NIFTI gz": "nii.gz"
    },
    "format_lists": {
        "images": ["NIFTI gz", "NIFTI"]
    },

    "shared_patterns": {
      "subject_dir": "<center>_<subject>"
    },

    "processes": {
        "DemoPipeline": {
            "input_image1":
                [["input:<center>_<subject>/<subject>", "images"]],
            "input_image2":
                [["input:{subject_dir}/alt2_<subject>", "images"]],
            "input_image3":
                [["input:{subject_dir}/alt2_<subject>", "images"]],
            "input_image4":
                [["input:{subject_dir}/alt4_<subject>", "images"]],
            "image_out1":
                [["output:{subject_dir}/out_image_<subject>_1", "images"]],
            "image_out3":
                [["output:{subject_dir}/out_image_<subject>_3", "images"]]
        },
        "DemoPipeline.proc4": {
            "output_image": [["output:{subject_dir}/out_image_<subject>_4", "images"]]
        }
    }

}
'''

try:
    os.makedirs('/tmp/capsul_demo')
except OSError:
    pass  # the directory already exists
open('/tmp/capsul_demo/demo_fom.json', 'w').write(fom_content)

[56]:
1035
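
To make the role of the patterns in this FOM more concrete, here is a simplified, hypothetical re-implementation of how one rule could be expanded into a file name. It is a sketch for illustration only: the function `expand_fom_pattern` is not part of Capsul or soma, real FOM resolution handles more cases, and the attribute values and directory used here are sample values.

```python
import re


def expand_fom_pattern(rule, attributes, shared_patterns, directories, extension):
    """Expand a rule such as 'output:{subject_dir}/out_image_<subject>_3'.

    Hypothetical helper: {name} refers to a shared pattern, <name> to an
    attribute, and the prefix before ':' selects a base directory.
    """
    directory_key, pattern = rule.split(':', 1)
    # Replace {name} with the shared pattern of that name
    pattern = re.sub(r'\{(\w+)\}', lambda m: shared_patterns[m.group(1)], pattern)
    # Replace <name> with the value of the attribute of that name
    pattern = re.sub(r'<(\w+)>', lambda m: attributes[m.group(1)], pattern)
    return '{}/{}.{}'.format(directories[directory_key], pattern, extension)


path = expand_fom_pattern(
    'output:{subject_dir}/out_image_<subject>_3',
    attributes={'center': 'subjects', 'subject': 'irm2'},
    shared_patterns={'subject_dir': '<center>_<subject>'},
    directories={'input': '/data/capsul_demo', 'output': '/data/capsul_demo'},
    extension='nii.gz',
)
print(path)
```

Two attributes (center and subject) are enough to derive every file name declared in the FOM, which is what makes completion practical for pipelines with many parameters.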

File names completion using FOM

FOM completion is used through the attribute completion system, ProcessCompletionEngine.

[57]:
import sys
import os
from capsul.api import StudyConfig
from capsul.attributes.completion_engine import ProcessCompletionEngine
if globals().get('use_gui', True):
    from soma.qt_gui.controller_widget import ScrollControllerWidget
    from capsul.qt_gui.widgets.attributed_process_widget import AttributedProcessWidget

# the following 4 lines are a hack to add /tmp/capsul_demo to the FOM search path
# before it is used by StudyConfig
from soma.application import Application
soma_app = Application('capsul', plugin_modules=['soma.fom'])
soma_app.initialize()
soma_app.fom_manager.paths.append('/tmp/capsul_demo')

config = {
  "name" : "morphologist_fom",
  "input_directory" : "/data/capsul_demo",
  "output_directory" : "/data/capsul_demo",
  "input_fom" : "demo_fom",
  "output_fom" : "demo_fom",
  "use_soma_workflow" : True,
  "use_fom" : True,
  "volumes_format" : "nii.gz",
  "meshes_format" : "gii",
}

study_config = StudyConfig(init_config=config,
                           modules=StudyConfig.default_modules + ['FomConfig', 'BrainVISAConfig'])
soma_app.fom_manager._cache = None # while debugging

mp = study_config.get_process_instance(DemoPipeline)

ce = ProcessCompletionEngine.get_completion_engine(mp)
print('completion engine type:', type(ce).__name__)
attributes = ce.get_attribute_values()
ce.install_auto_completion()
attributes.center = 'subjects'
attributes.subject = 'irm2'

if globals().get('use_gui', True):
    # pipeline controller GUI
    p_view = ScrollControllerWidget(mp, live=True)
    p_view.show()
    # attributed pipeline controller GUI
    ap_view = AttributedProcessWidget(mp)
    ap_view.show()

    pv = PipelineDevelopperView(mp, allow_open_controller=True, show_sub_pipelines=True)
    pv.show()

    if run_qt_loop:
        app.exec_()

completion engine type: FomProcessCompletionEngine

[Figures: pipeline with FOM, completion controller, controller]

Note how the output “image_out5” depends on the proc_select2 switch value:

“image_out1” is fixed by the FOM completion, and its value “back-propagates” to both “proc1.output_image” and “proc2.output_image”. For “image_out5”, the FOM does not impose a value: it is deduced from either “proc3.output_image” (in turn set via the global “image_out3”) or “proc4.output_image”, depending on the proc_select2 switch value.

[58]:
mp.proc_select2 = "proc3"
print("switch proc_select2 value:", mp.proc_select2)
print("output image_out5:", mp.image_out5)
mp.proc_select2 = "proc4"
print("switch proc_select2 value:", mp.proc_select2)
print("output image_out5:", mp.image_out5)
switch proc_select2 value: proc3
output image_out5: /data/capsul_demo/subjects_irm2/out_image_irm2_3.nii.gz
switch proc_select2 value: proc4
output image_out5: /data/capsul_demo/subjects_irm2/out_image_irm2_4.nii.gz

[Figures: changed pipeline, changed completion controller]
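
The behaviour observed above can be summarised with a toy model: the switch output mirrors whichever input is currently selected. The class `ToySwitch` below is a hypothetical stand-in written for this explanation. It is not the Capsul switch node and ignores links, weak links and node activation.

```python
class ToySwitch(object):
    """Hypothetical stand-in for a pipeline switch (not the Capsul class)."""

    def __init__(self, inputs, value):
        self.inputs = dict(inputs)  # maps each option name to its input value
        self.value = value          # name of the currently selected option

    @property
    def output(self):
        # The output simply mirrors the selected input
        return self.inputs[self.value]


switch = ToySwitch(
    inputs={
        'proc3': '/data/capsul_demo/subjects_irm2/out_image_irm2_3.nii.gz',
        'proc4': '/data/capsul_demo/subjects_irm2/out_image_irm2_4.nii.gz',
    },
    value='proc3',
)
print('proc3 selected:', switch.output)
switch.value = 'proc4'
print('proc4 selected:', switch.output)
```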

Now clean up the files we have created:

[59]:
import shutil
shutil.rmtree('/tmp/capsul_demo')