# -*- coding: utf-8 -*-
"""Capsul Pipeline conversion into soma-workflow workflow.
Standard use case::
workflow = workflow_from_pipeline(pipeline)
controller, wf_id = workflow_run(workflow_name, workflow, study_config)
Functions
=========
:func:`workflow_from_pipeline`
------------------------------
:func:`workflow_run`
--------------------
"""
from __future__ import print_function
from __future__ import absolute_import
import os
import socket
import sys
import six
import weakref
import soma_workflow.client as swclient
import soma_workflow.info as swinfo
from capsul.pipeline.pipeline import Pipeline, Switch, PipelineNode
from capsul.pipeline import pipeline_tools
from capsul.process.process import Process
from capsul.pipeline.topological_sort import Graph
from traits.api import Directory, Undefined, File, Str, Any, List
from soma.sorted_dictionary import OrderedDict, SortedDictionary
from .process_iteration import ProcessIteration
from capsul.attributes import completion_engine_iteration
from capsul.attributes.completion_engine import ProcessCompletionEngine
from capsul.pipeline.pipeline_nodes import ProcessNode
from soma_workflow.custom_jobs import MapJob, ReduceJob
from six.moves import range
class TempFile(str):
# class temporarily needed to identify temporary paths in the pipeline.
# It must inherit a string type since it is used as a trait value.
def __init__(self, string=''):
# in python3 super(..).__init__() cannot take an argument
# moreover the str value is assigned anyway.
super(TempFile, self).__init__()
if isinstance(string, TempFile):
self.pattern = string.pattern
self.value = string.value
self.ref = string.ref if string.ref else string
else:
self.pattern = '%s'
self.value = string
self.ref = None
def referent(self):
return self.ref if self.ref else self
def get_value(self):
return self.referent().value
def __add__(self, other):
res = TempFile(str(self) + str(other))
res.pattern = self.pattern + str(other)
res.value = self.value
res.ref = self.referent()
return res
def __radd__(self, other):
res = TempFile(str(other) + str(self))
res.pattern = str(other) + self.pattern
res.value = self.value
res.ref = self.referent()
return res
def __iadd__(self, other):
self.pattern += str(other)
# str is immutable: only the pattern is updated. Return self so that
# "+=" does not rebind the variable to None.
return self
def __str__(self):
return self.pattern % self.get_value()
def __hash__(self):
if self.ref:
return self.referent().__hash__()
return super(TempFile, self).__hash__()
def __eq__(self, other):
if not isinstance(other, TempFile):
return False
return self.referent() is other.referent()
def __lt__(self, other):
return str(self) < other
def __gt__(self, other):
return str(self) > other
def __le__(self, other):
return str(self) <= other
def __ge__(self, other):
return str(self) >= other
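# Intended TempFile semantics, as a small illustrative sketch (not executed):
#
#     t = TempFile('1')        # value '1', pattern '%s'
#     t2 = t + '.nii'          # new TempFile sharing t as referent,
#                              # pattern '%s.nii', same value '1'
#     str(t2)                  # '1.nii' (pattern applied to the value)
#     t == t2                  # True: equality and hashing follow the referent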
def workflow_from_pipeline(pipeline, study_config=None, disabled_nodes=None,
jobs_priority=0, create_directories=True,
environment='global', check_requirements=True,
complete_parameters=False):
""" Create a soma-workflow workflow from a Capsul Pipeline
Parameters
----------
pipeline: Pipeline (mandatory)
a CAPSUL pipeline
study_config: StudyConfig (optional), or dict
holds information about file transfers and shared resource paths.
If not specified, it will be accessed through the pipeline.
disabled_nodes: sequence of pipeline nodes (Node instances) (optional)
such nodes will be disabled on-the-fly in the pipeline, file transfers
will be adapted accordingly (outputs may become inputs in the resulting
workflow), and temporary files will be checked. If a disabled node was
to produce a temporary file which is still used in an enabled node,
then a ValueError exception will be raised.
If disabled_nodes is not passed, they will possibly be taken from the
pipeline (if available) using disabled steps:
see Pipeline.define_steps()
jobs_priority: int (optional, default: 0)
set this priority on soma-workflow jobs.
create_directories: bool (optional, default: True)
if set, needed output directories (which will contain output files)
will be created in a first job, which all other ones depend on.
environment: str (default: "global")
configuration environment name (default: "global"). See
:class:`capsul.engine.CapsulEngine` and
:class:`capsul.engine.settings.Settings`.
check_requirements: bool (default: True)
if True, check the pipeline nodes requirements in the capsul engine
settings, and issue an exception if they are not met instead of
proceeding with the workflow.
complete_parameters: bool (default: False)
Perform parameters completion on the input pipeline while building
the workflow. The default is False because we should avoid doing things
several times when they have already been done, but in iteration nodes,
completion needs to be done anyway for each iteration, so this option
offers to do the rest of the "parent" pipeline completion.
Returns
-------
workflow: Workflow
a soma-workflow workflow
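Examples
--------
A minimal, illustrative call (assuming ``pipeline`` is a Capsul Pipeline
instance and ``study_config`` holds a configured engine; names are
illustrative)::
    workflow = workflow_from_pipeline(pipeline,
                                      study_config=study_config,
                                      jobs_priority=1)
    controller, wf_id = workflow_run('my_workflow', workflow,
                                     study_config)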
"""
def _files_group(path, merged_formats):
bname = os.path.basename(path)
l0 = len(path) - len(bname)
p0 = 0
paths = [path]
while True:
p = bname.find('.', p0)
if p < 0:
break
ext = bname[p:]
p0 = p + 1
format_def = merged_formats.get(ext)
if format_def:
path0 = path[:l0 + p]
paths += [path0 + e[0] for e in format_def]
break
paths.append(path + '.minf')
return paths
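# Example (using the NIFTI-1 rule of the "formats" table defined below):
# _files_group('/data/img.img', merged_formats) returns
# ['/data/img.img', '/data/img.hdr', '/data/img.img.minf'], i.e. the path
# itself, its mandatory companion file(s) and the potential .minf header.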
def _translated_path(path, shared_map, shared_paths, trait=None):
if path is None or path is Undefined \
or not shared_paths \
or (trait is not None
and not isinstance(trait.trait_type, File) \
and not isinstance(trait.trait_type, Directory)):
return None # not a path
item = shared_map.get(path)
if item is not None:
# already in map
return item
for base_dir, (namespace, uuid) in six.iteritems(shared_paths):
if path.startswith(base_dir + os.sep):
rel_path = path[len(base_dir)+1:]
#uuid = path
item = swclient.SharedResourcePath(
rel_path, namespace, uuid=uuid)
shared_map[path] = item
return item
return None
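# Example with an illustrative translation config:
#     shared_paths = {'/shared/db': ('brainvisa', 'db_uuid')}
# _translated_path('/shared/db/subject/t1.nii', {}, shared_paths) returns
# swclient.SharedResourcePath('subject/t1.nii', 'brainvisa', uuid='db_uuid')
# and records it in the shared_map dict passed as 2nd argument.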
def _replace_in_list(rlist, temp_map):
for i, item in enumerate(rlist):
if isinstance(item, (list, tuple, set)):
deeperlist = list(item)
_replace_in_list(deeperlist, temp_map)
if isinstance(item, tuple):
deeperlist = tuple(deeperlist)
elif isinstance(item, set):
deeperlist = set(deeperlist)
rlist[i] = deeperlist
#elif item is Undefined:
#rlist[i] = ''
elif isinstance(item, (dict, OrderedDict, SortedDictionary)):
_replace_in_dict(item, temp_map)
elif item in temp_map:
value = temp_map[item]
value = value.__class__(value)
if hasattr(item, 'pattern'):
# temp case (differs from shared case)
value.pattern = item.pattern
rlist[i] = value
def _replace_in_dict(rdict, temp_map):
for name, item in six.iteritems(rdict):
if isinstance(item, (list, tuple, set)):
deeperlist = list(item)
_replace_in_list(deeperlist, temp_map)
if isinstance(item, tuple):
deeperlist = tuple(deeperlist)
elif isinstance(item, set):
deeperlist = set(deeperlist)
rdict[name] = deeperlist
#elif item is Undefined:
#rdict[name] = ''
elif isinstance(item, (dict, OrderedDict, SortedDictionary)):
_replace_in_dict(item, temp_map)
elif item in temp_map:
value = temp_map[item]
value = value.__class__(value)
if hasattr(item, 'pattern'):
# temp case (differs from shared case)
value.pattern = item.pattern
rdict[name] = value
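# The two helpers above walk lists / dicts recursively (including nested
# containers) and replace in place every value found in temp_map by a copy
# of the mapped soma-workflow special path, keeping the TempFile suffix
# pattern when there is one. E.g. with temp_map = {tmp: swf_tmp} (tmp being
# a TempFile), _replace_in_list(['-o', tmp], temp_map) turns the list into
# ['-o', <copy of swf_tmp>].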
def _get_replaced(rlist, temp_map):
if isinstance(rlist, (dict, OrderedDict, SortedDictionary)):
return _get_replaced(list(rlist.values()), temp_map)
if isinstance(rlist, (list, tuple)):
l = [x for x in [_get_replaced(y, temp_map) for y in rlist]
if x is not None]
out = []
while l:
item = l.pop(0)
if isinstance(item, list):
l = item + l
else:
out.append(item)
return out
if rlist not in temp_map:
if isinstance(rlist, swclient.SpecialPath):
return rlist
return None
value = temp_map[rlist]
if hasattr(rlist, 'pattern'):
# temp case (differs from shared case)
value.pattern = rlist.pattern
return [value]
def _replace_transfers(rlist, process, itransfers, otransfers):
param_name = None
i = 3
for item in rlist[3:]:
if param_name is None:
param_name = item
else:
transfer = itransfers.get(param_name)
if transfer is None:
transfer = otransfers.get(param_name)
if transfer is not None:
value = transfer[0]
if isinstance(item, (list, tuple)):
# TODO: handle lists of files [transfers]
#deeperlist = list(item)
#_replace_in_list(deeperlist, transfers)
#rlist[i] = deeperlist
print('*** LIST! ***')
else:
rlist[i] = value
param_name = None
i += 1
def _replace_dict_transfers(rdict, process, itransfers, otransfers):
for param_name, item in six.iteritems(rdict):
transfer = itransfers.get(param_name)
if transfer is None:
transfer = otransfers.get(param_name)
if transfer is not None:
value = transfer[0]
if isinstance(item, (list, tuple)):
# TODO: handle lists of files [transfers]
#deeperlist = list(item)
#_replace_in_list(deeperlist, transfers)
#rdict[param_name] = deeperlist
print('*** LIST! ***')
else:
rdict[param_name] = value
def build_job(process, temp_map={}, shared_map={}, transfers=[{}, {}],
shared_paths={}, forbidden_temp=set(), name='', priority=0,
step_name='', engine=None, environment='global'):
""" Create a soma-workflow Job from a Capsul Process
Parameters
----------
process: Process (mandatory) or custom Node
a CAPSUL process instance or custom Node instance
temp_map: dict (optional)
temporary paths map.
shared_map: dict (optional)
file shared translated paths, global pipeline dict.
This dict is updated when needed during the process.
transfers: list of 2 dicts (optional)
input / output file transfers dicts, indexed by process, then by
parameter name; values are (FileTransfer, path) tuples.
shared_paths: dict (optional)
holds information about shared resource paths base dirs for
soma-workflow.
If not specified, no translation will be used.
forbidden_temp: set (optional)
temporary values which cannot be produced within the workflow
(typically outputs of disabled nodes): using one of them as an input
raises a ValueError.
name: string (optional)
job name. If empty, use the process name.
priority: int (optional)
priority assigned to the job
step_name: str (optional)
the step name will be stored in the job user_storage variable
engine: CapsulEngine (optional)
used to check configuration requirements for non-process nodes
environment: str (default: "global")
configuration environment name (default: "global"). See
:class:`capsul.engine.CapsulEngine` and
:class:`capsul.engine.settings.Settings`.
Returns
-------
job: Job
a soma-workflow Job instance that will execute the CAPSUL process
"""
job_name = name
if not job_name:
job_name = process.name
# check for special modified paths in parameters
input_replaced_paths = []
output_replaced_paths = []
param_dict = {}
has_outputs = False
forbidden_traits = ('nodes_activation', 'selection_changed',
'activated', 'enabled', 'name', 'node_type', )
for param_name, parameter in six.iteritems(process.user_traits()):
if param_name not in forbidden_traits:
if parameter.output \
and (parameter.input_filename is False
or not (isinstance(parameter.trait_type,
(File, Directory))
or (isinstance(parameter.trait_type,
List)
and isinstance(
parameter.inner_traits[0].trait_type,
(File, Directory))))):
has_outputs = True
value = getattr(process, param_name)
if isinstance(value, list):
values = value
else:
values = [value]
for value in values:
if isinstance(value, TempFile):
# duplicate swf temp and copy pattern into it
tval = temp_map[value]
tval = tval.__class__(tval)
tval.pattern = value.pattern
if parameter.output:
output_replaced_paths.append(tval)
else:
if value in forbidden_temp:
raise ValueError(
'Temporary value used cannot be generated '
'in the workflow: %s.%s'
% (job_name, param_name))
input_replaced_paths.append(tval)
else:
_translated_path(value, shared_map, shared_paths,
parameter)
# Get the process command line
#process_cmdline = process.get_commandline()
process_cmdline = process.params_to_command()
# and replace in commandline
iproc_transfers = transfers[0].get(process, {})
oproc_transfers = transfers[1].get(process, {})
#proc_transfers = dict(iproc_transfers)
#proc_transfers.update(oproc_transfers)
_replace_in_list(process_cmdline, temp_map)
_replace_in_list(process_cmdline, shared_map)
_replace_transfers(
process_cmdline, process, iproc_transfers, oproc_transfers)
config = {}
config = process.check_requirements(environment)
if config is None:
# here we bypass unmet requirements, it's not our job here.
config = {}
use_input_params_file = False
if process_cmdline[0] == 'capsul_job':
# use python executable from config, if any
pconf = config.get('capsul.engine.module.python')
if not pconf:
pconf = process.get_study_config().engine.settings. \
select_configurations(environment, {'python': 'any'})
if pconf:
if not config:
config = pconf
else:
if 'capsul.engine.module.python' in pconf:
config['capsul.engine.module.python'] \
= pconf['capsul.engine.module.python']
uses = pconf.get('capsul_engine', {}).get('uses', {})
if uses:
config.setdefault('capsul_engine', {}).setdefault(
'uses', {}).update(uses)
python_command = pconf.get(
'capsul.engine.module.python', {}).get('executable')
if not python_command:
python_command = os.path.basename(sys.executable)
path_trick = ''
# the python path cannot be set up from inside a library, since
# access to this library (typically the capsul module itself) may be
# conditioned by this path. We cannot use the PYTHONPATH env variable
# either because it would completely override any user settings
# (.bashrc). So we prepend it to sys.path here.
ppath = pconf.get(
'capsul.engine.module.python', {}).get('path')
if ppath:
path_trick = 'import sys; sys.path = %s + sys.path; ' \
% repr(ppath)
process_cmdline = [
'capsul_job', python_command, '-c',
'%sfrom capsul.api import Process; '
'Process.run_from_commandline("%s")'
% (path_trick, process_cmdline[1])]
use_input_params_file = True
param_dict = process.export_to_dict(exclude_undefined=True)
elif process_cmdline[0] in ('json_job', 'custom_job'):
use_input_params_file = True
param_dict = process.export_to_dict(exclude_undefined=True)
for name in forbidden_traits:
if name in param_dict:
del param_dict[name]
_replace_in_dict(param_dict, temp_map)
_replace_in_dict(param_dict, shared_map)
_replace_dict_transfers(
param_dict, process, iproc_transfers, oproc_transfers)
# handle native specification (cluster-specific specs as in
# soma-workflow)
native_spec = getattr(process, 'native_specification', None)
# Return the soma-workflow job
if process_cmdline[0] == 'custom_job':
job = build_custom_job(
process, process_cmdline, name=job_name,
referenced_input_files
=input_replaced_paths \
+ [x[0] for x in iproc_transfers.values()],
referenced_output_files
=output_replaced_paths \
+ [x[0] for x in oproc_transfers.values()],
param_dict=param_dict)
else:
job = swclient.Job(
name=job_name,
command=process_cmdline[1:],
referenced_input_files
=input_replaced_paths \
+ [x[0] for x in iproc_transfers.values()],
referenced_output_files
=output_replaced_paths \
+ [x[0] for x in oproc_transfers.values()],
priority=priority,
native_specification=native_spec,
param_dict=param_dict,
use_input_params_file=use_input_params_file,
has_outputs=has_outputs)
# print('job command:', job.command)
# handle parallel job info (as in soma-workflow)
parallel_job_info = getattr(process, 'parallel_job_info', None)
if parallel_job_info:
job.parallel_job_info = parallel_job_info
if step_name:
job.user_storage = step_name
if hasattr(process, 'uuid'):
# propagate the process uuid, if any, to maintain link with it
job.uuid = process.uuid
job.configuration = config
# associate job with process
job.process = weakref.ref(process)
job._do_not_pickle = ['process']
job.process_hash = id(process)
return job
def build_custom_job(node, process_cmdline, name,
referenced_input_files, referenced_output_files,
param_dict):
''' Build a custom job (generally running on engine side) from a
custom pipeline node.
'''
if hasattr(node, 'build_job'):
job = node.build_job(
name=name, referenced_input_files=referenced_input_files,
referenced_output_files=referenced_output_files,
param_dict=param_dict)
return job
return None
def build_group(name, jobs):
""" Create a group of jobs
Parameters
----------
name: str (mandatory)
the group name
jobs: list of Job (mandatory)
the jobs we want to insert in the group
Returns
-------
group: Group
the soma-workflow Group instance
"""
return swclient.Group(jobs, name=name)
def get_jobs(group, groups):
gqueue = list(group.elements)
jobs = []
while gqueue:
group_or_job = gqueue.pop(0)
if group_or_job in groups:
gqueue += group_or_job.elements
else:
jobs.append(group_or_job)
return jobs
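# Example: with groups = [g_inner] and a hierarchy
#     g_outer.elements = [g_inner, job_a], g_inner.elements = [job_b],
# get_jobs(g_outer, groups) returns [job_a, job_b]: groups are expanded
# recursively and only the leaf jobs are kept.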
def assign_temporary_filenames(pipeline, count_start=0):
''' Find and temporarily assign necessary temporary file names'''
temp_filenames = pipeline.find_empty_parameters()
temp_map = {}
count = count_start
for node, plug_name, optional in temp_filenames:
if hasattr(node, 'process'):
process = node.process
else:
process = node
trait = process.trait(plug_name)
if trait.input_filename is False:
# filename is explicitly an output: not a temporary.
continue
is_list = isinstance(trait.trait_type, List)
values = []
if is_list:
todo = getattr(process, plug_name)
trait = trait.inner_traits[0]
if trait.input_filename is False:
# filename is explicitly an output: not a temporary.
continue
else:
todo = [Undefined]
for item in todo:
if item not in (Undefined, '', None):
# non-empty list element
values.append(item)
continue
is_directory = isinstance(trait.trait_type, Directory)
if trait.allowed_extensions:
suffix = trait.allowed_extensions[0]
else:
suffix = ''
swf_tmp = swclient.TemporaryPath(is_directory=is_directory,
suffix=suffix, name='temporary_%d' % count)
tmp_file = TempFile('%d' % count)
count += 1
temp_map[tmp_file] = (swf_tmp, node, plug_name, optional)
values.append(tmp_file)
# set a TempFile value to identify the params / value
if is_list:
setattr(process, plug_name, values)
else:
setattr(process, plug_name, values[0])
return temp_map
def restore_empty_filenames(temporary_map):
''' Set back Undefined (or empty) values on parameters which were
temporarily assigned file names by assign_temporary_filenames().
'''
for tmp_file, item in six.iteritems(temporary_map):
node, plug_name = item[1:3]
if hasattr(node, 'process'):
process = node.process
else:
process = node
value = getattr(process, plug_name)
if isinstance(value, list):
# FIXME TODO: only restore values in list which correspond to
# a temporary.
# Problem: they are sometimes transformed into strings
# FIXME: several temp items can be part of the same list,
# so this assignment is likely to be done several times.
# It could probably be optimized.
# WARNING: we set "" values instead of Undefined because they may
# be mandatory
setattr(process, plug_name, [''] * len(value))
else:
setattr(process, plug_name, Undefined)
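# The two helpers above are meant to be used as a pair, typically
# (sketch of the pattern used later in this module):
#
#     temp_map = assign_temporary_filenames(pipeline)
#     try:
#         ...  # build the workflow using temp_map
#     finally:
#         restore_empty_filenames(temp_map)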
def _get_swf_paths(engine, environment):
resource_conf = engine.settings.select_configurations(
environment, {'somaworkflow': 'config_id=="somaworkflow"'})
if 'capsul.engine.module.somaworkflow' in resource_conf:
resource_conf = resource_conf['capsul.engine.module.somaworkflow']
return (resource_conf.get('transfer_paths', []),
resource_conf.get('path_translations', {}))
# otherwise fallback to old StudyConfig config -- OBSOLETE...
study_config = engine.study_config
computing_resource = getattr(
study_config, 'somaworkflow_computing_resource', None)
if computing_resource in (None, Undefined):
return [], {}
resources_conf = getattr(
study_config, 'somaworkflow_computing_resources_config', None)
if resources_conf in (None, Undefined):
return [], {}
resource_conf = getattr(resources_conf, computing_resource, None)
if resource_conf in (None, Undefined):
return [], {}
return (resource_conf.transfer_paths,
resource_conf.path_translations.export_to_dict())
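# _get_swf_paths() returns a (transfer_paths, path_translations) tuple, for
# instance (illustrative values):
#     (['/home/user/data'], {'/shared/db': ('brainvisa', 'db_uuid')})
# i.e. base directories handled through file transfers, and base directories
# translated into SharedResourcePath (namespace, uuid) pairs.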
def _propagate_transfer(node, param, path, output, transfers,
transfer_item):
todo_plugs = [(node, param, output)]
done_plugs = set()
while todo_plugs:
node, param, output = todo_plugs.pop()
plug = node.plugs[param]
is_pipeline = False
if plug is None or not plug.enabled or not plug.activated \
or plug in done_plugs:
continue
done_plugs.add(plug)
if isinstance(node, Switch):
if output:
# propagate to active input
other_param = node.switch + '_switch_' + param
#plug = node.plugs[input_param]
else:
other_param = param[len(node.switch + '_switch_'):]
#other_plug = node.plugs[other_param]
todo_plugs.append((node, other_param, not output))
else:
process = node.process
if hasattr(process, 'nodes'):
is_pipeline = True
#plug = process.nodes[''].plugs.get(param)
else:
# process: replace its param
# check trait type (must be File or Directory, not Any)
trait = process.user_traits()[param]
#plug = node.plugs[param]
if isinstance(trait.trait_type, File) \
or isinstance(trait.trait_type, Directory):
transfers[bool(trait.output)].setdefault(process, {})[
param] = (transfer_item, path)
if trait.output:
# it's an output of a "real" process: the transfer
# will be as output only
transfer_item.initial_status \
= swclient.constants.FILES_DO_NOT_EXIST
#output = not output # invert IO status
#if plug is None or not plug.enabled or not plug.activated \
#or plug in done_plugs:
#continue
if output:
links = plug.links_to
else:
links = plug.links_from
for proc_name, param_name, node, other_plug, act in links:
if not node.activated or not node.enabled \
or not other_plug.activated or not other_plug.enabled \
or other_plug in done_plugs:
continue
todo_plugs.append((node, param_name, not output))
if is_pipeline:
# in a pipeline node, go both directions
if output:
links = plug.links_from
else:
links = plug.links_to
for proc_name, param_name, node, plug, act in links:
if not node.activated or not node.enabled \
or not plug.activated or not plug.enabled \
or plug in done_plugs:
continue
todo_plugs.append((node, param_name, output))
def _get_transfers(pipeline, transfer_paths, merged_formats):
""" Create and list FileTransfer objects needed in the pipeline.
Parameters
----------
pipeline: Pipeline
pipeline to build workflow for
transfer_paths: list
base directories for file transfers, from the soma-workflow config
Returns
-------
[in_transfers, out_transfers]
each of which is a dict: { Process: proc_dict }
proc_dict is a dict: { file_path : FileTransfer object }
FileTransfer objects are reused when referring to the same path
used from different processes within the pipeline.
"""
in_transfers = {}
out_transfers = {}
transfers = [in_transfers, out_transfers]
todo_nodes = [pipeline.pipeline_node]
while todo_nodes:
node = todo_nodes.pop(0)
if hasattr(node, 'process'):
process = node.process
else:
process = node
for param, trait in six.iteritems(process.user_traits()):
if isinstance(trait.trait_type, File) \
or isinstance(trait.trait_type, Directory) \
or type(trait.trait_type) is Any:
# is value in paths
path = getattr(process, param)
if path is None or path is Undefined \
or isinstance(path, TempFile) or path == '':
continue
output = bool(trait.output)
existing_transfers = transfers[output].get(process, {})
existing_transfer = existing_transfers.get(param)
if existing_transfer:
continue
for tpath in transfer_paths:
if path.startswith(os.path.join(tpath, '')):
transfer_item = swclient.FileTransfer(
is_input=not output,
client_path=path,
client_paths=_files_group(path,
merged_formats))
_propagate_transfer(node, param,
path, not output, transfers,
transfer_item)
break
if hasattr(process, 'nodes'):
todo_nodes += [sub_node
for name, sub_node
in six.iteritems(process.nodes)
if name != ''
and not isinstance(sub_node, Switch)]
return transfers
def _expand_nodes(nodes):
'''Expands the nodes list or set to leaf nodes by replacing pipeline
nodes by their children list.
Returns
-------
set of leaf nodes.
'''
nodes_list = list(nodes)
expanded_nodes = set()
while nodes_list:
node = nodes_list.pop(0)
if not hasattr(node, 'process'):
continue # switch or something
if isinstance(node.process, Pipeline):
nodes_list.extend([p for p
in six.itervalues(node.process.nodes)
if p is not node])
else:
expanded_nodes.add(node)
return expanded_nodes
def _handle_disable_nodes(pipeline, temp_map, transfers, disabled_nodes):
'''Take into account disabled nodes by changing FileTransfer outputs
for such nodes to inputs, and recording output temporary files, so as
to ensure that missing temporary outputs will not be used later in
the workflow.
disabled_nodes should be a list, or preferably a set, of leaf process
nodes. Use _expand_nodes() if needed before calling
_handle_disable_nodes().
'''
move_to_input = {}
remove_temp = set()
for node in disabled_nodes:
if not hasattr(node, 'process'):
continue # switch or something else
process = node.process
otrans = transfers[1].get(process, None)
for param, trait in six.iteritems(process.user_traits()):
if trait.output and (isinstance(trait.trait_type, File) \
or isinstance(trait.trait_type, Directory) \
or type(trait.trait_type) is Any):
path = getattr(process, param)
if otrans is not None:
transfer, path2 = otrans.get(param, (None, None))
else:
transfer = None
if transfer is not None:
print('transfered output path:', path,
'from: %s.%s changes to input.'
% (node.name, param))
move_to_input[path] = transfer
transfer.initial_status \
= swclient.constants.FILES_ON_CLIENT
elif path in temp_map:
print('temp path in: %s.%s will not be produced.'
% (node.name, param))
remove_temp.add(path)
return move_to_input, remove_temp
def iter_to_workflow(process, node_name, step_name, temp_map,
shared_map, transfers, shared_paths,
disabled_nodes, remove_temp,
steps, study_config, iteration, map_job=None,
reduce_job=None, environment='global'):
'''
Build a workflow for a single iteration step of a process / sub-pipeline.
This function is called for each iteration by build_iteration().
Jobs are indexed by (process, iteration) tuples because each process
will be converted into as many jobs as there are iterations.
Returns
-------
(jobs, dependencies, groups, root_jobs, links)
'''
if isinstance(process, Pipeline):
temp_map2 = assign_temporary_filenames(process, len(temp_map))
temp_subst_list = [(x1, x2[0]) for x1, x2
in six.iteritems(temp_map2)]
temp_subst_map = dict(temp_subst_list)
temp_subst_map.update(temp_map)
try:
(jobs1, dependencies, groups, sub_root_jobs, plinks, nodes) = \
workflow_from_pipeline_structure(
process, temp_subst_map, shared_map, transfers,
shared_paths, disabled_nodes=disabled_nodes,
forbidden_temp=remove_temp, steps=steps,
study_config=study_config, environment=environment)
jobs = {}
for proc, job in six.iteritems(jobs1):
jobs[(proc, iteration)] = job
groot = []
for j in list(sub_root_jobs.values()):
if isinstance(j, list):
groot += j
else:
groot.append(j)
links = {}
for dproc, dplink in six.iteritems(plinks):
dlink = {}
links[(dproc, iteration)] = dlink
for param, linkl in six.iteritems(dplink):
dlink[param] = [((link[0], iteration), link[1])
for link in linkl]
group = build_group(node_name, groot)
groups[(process, iteration)] = group
root_jobs = {(process, iteration): group}
finally:
restore_empty_filenames(temp_map2)
elif isinstance(process, ProcessIteration):
# sub-iteration
pipeline = Pipeline()
pipeline.set_study_config(study_config)
pipeline.add_process('iter', process.process)
return build_iteration(pipeline, step_name, temp_map=temp_map,
shared_map=shared_map,
transfers=transfers,
shared_paths=shared_paths,
disabled_nodes=disabled_nodes,
remove_temp=remove_temp,
steps=steps,
study_config=study_config,
environment=environment)
else:
# single process
job = build_job(process, temp_map, shared_map,
transfers, shared_paths,
forbidden_temp=remove_temp,
name=node_name,
priority=jobs_priority,
step_name=step_name,
engine=getattr(study_config, 'engine', None),
environment=environment)
jobs = {(process, iteration): job}
groups = {}
dependencies = {}
links = {}
root_jobs = {(process, iteration): job}
nodes = []
return (jobs, dependencies, groups, root_jobs, links, []) # nodes)
def build_iteration(it_node, step_name, temp_map,
shared_map, transfers, shared_paths, disabled_nodes,
remove_temp, steps, study_config={},
environment='global'):
'''
Build workflow for an iterative process: the process / sub-pipeline is
filled with appropriate parameters for each iteration, and its
workflow is generated.
Returns
-------
(jobs, dependencies, groups, root_jobs, links, nodes)
'''
it_process = it_node.process
completion_engine = ProcessCompletionEngine.get_completion_engine(
it_process)
# # check if it is an iterative completion engine
# if hasattr(completion_engine, 'complete_iteration_step'):
size = completion_engine.iteration_size()
#no_output_value = None
#size = None
#size_error = False
## calculate the number of iterations
#for parameter in it_process.iterative_parameters:
#trait = it_process.trait(parameter)
#psize = len(getattr(it_process, parameter))
#if psize:
#if size is None:
#size = psize
#elif size != psize:
#size_error = True
#break
#if trait.output:
#if no_output_value is None:
#no_output_value = False
#elif no_output_value:
#size_error = True
#break
#else:
#if trait.output:
#if no_output_value is None:
#no_output_value = True
#elif not no_output_value:
#size_error = True
#break
#if size_error:
#raise ValueError('Iterative parameter values must be lists of the '
#'same size: %s' % ','.join('%s=%d'
#% (n, len(getattr(it_process, n)))
#for n in it_process.iterative_parameters))
jobs = {}
workflows = []
for parameter in it_process.regular_parameters:
setattr(it_process.process, parameter,
getattr(it_process, parameter))
jobs = {}
dependencies = set()
groups = {}
root_jobs = {}
links = {}
nodes = []
if size == 0:
return (jobs, dependencies, groups, root_jobs, links, nodes)
#if no_output_value:
## this case is a "really" dynamic iteration, the number of
## iterations and parameters are determined in runtime, so we
## cannot handle it at the moment.
#raise ValueError('Dynamic iteration is not handled in this '
#'version of CAPSUL / Soma-Workflow')
#for parameter in it_process.iterative_parameters:
#trait = it_process.trait(parameter)
#if trait.output:
#setattr(it_process, parameter, [])
#outputs = {}
#for iteration in range(size):
#for parameter in it_process.iterative_parameters:
#if not no_output_value \
#or not it_process.trait(parameter).output:
#values = getattr(it_process, parameter)
#if len(values) != 0:
#if len(values) > iteration:
#setattr(it_process.process, parameter,
#values[iteration])
#else:
#setattr(it_process.process, parameter,
#values[-1])
## operate completion
#complete_iteration(it_process, iteration)
##workflow = workflow_from_pipeline(it_process.process,
##study_config=study_config)
##workflows.append(workflow)
#for parameter in it_process.iterative_parameters:
#trait = it_process.trait(parameter)
#if trait.output:
#outputs.setdefault(
#parameter,[]).append(getattr(it_process.process,
#parameter))
#for parameter, value in six.iteritems(outputs):
#setattr(it_process, parameter, value)
#else:
if True:
# iterations are built using
# * a map job to dispatch input lists
# * the iterated process or pipeline jobs, duplicated for each
# iteration
# * a reduce job to gather outputs into lists
# dependencies and parameters links have to be built.
# links to processes outside the iteration are made, in order to
# connect the iteration node to its neighbors.
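# Schematically, an iteration of size N over a process P becomes:
#
#     map_job --> P_0, P_1, ..., P_(N-1) --> reduce_job
#
# the map job splits each iterative input list into per-iteration
# parameters (named "<param>_<i>"), and the reduce job gathers the
# per-iteration outputs (also "<param>_<i>") back into output lists.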
# collect iterated inputs / outputs
map_param_dict = {}
forbidden_traits = ('nodes_activation', 'selection_changed',
'pipeline_steps', 'visible_groups',
'protected_parameters', )
# copy non-iterative inputs
for param, trait in six.iteritems(it_process.user_traits()):
if param in forbidden_traits:
continue # skip
value = getattr(it_process, param)
if not trait.output:
map_param_dict[param] = value
in_params = [p for p in it_process.iterative_parameters
if not it_process.trait(p).output]
out_params = [p for p in it_process.iterative_parameters
if it_process.trait(p).output]
in_values = [getattr(it_process, param) for param in in_params]
out_values = [getattr(it_process, param) for param in out_params]
# build map and reduce nodes
map_param_dict.update({
'input_names': in_params,
'output_names': [p + '_%d' for p in in_params],
})
reduce_param_dict = {}
for param, trait in six.iteritems(it_process.user_traits()):
if trait.output and param not in forbidden_traits:
value = getattr(it_process, param)
reduce_param_dict[param] = value
if param in out_params:
# set input params of reduce node for a non-dynamic
# reduce case (outputs are set from outside)
for i, p in enumerate(value):
reduce_param_dict['%s_%d' % (param, i)] = p
reduce_param_dict.update({
'input_names': [p + '_%d' for p in out_params],
'output_names': out_params,
'lengths': [size] * len(out_params),
})
# replace special paths in map/reduce parameters
_replace_in_list(in_values, temp_map)
in_values = _get_replaced(in_values, shared_map)
_replace_in_list(out_values, temp_map)
out_values = _get_replaced(out_values, shared_map)
_replace_in_dict(map_param_dict, temp_map)
_replace_in_dict(map_param_dict, shared_map)
_replace_dict_transfers(
map_param_dict, it_process, transfers[0], transfers[1])
_replace_in_dict(reduce_param_dict, temp_map)
_replace_in_dict(reduce_param_dict, shared_map)
_replace_dict_transfers(
reduce_param_dict, it_process, transfers[0], transfers[1])
map_job = MapJob(
referenced_input_files=in_values,
referenced_output_files=in_values,
name=it_process.process.name + '_map',
param_dict=map_param_dict)
reduce_job = ReduceJob(
referenced_input_files=out_values,
referenced_output_files=out_values,
name=it_process.process.name + '_reduce',
param_dict=reduce_param_dict)
map_job.process_hash = id(it_process)
reduce_job.process_hash = id(it_process)
# connect inputs of the map node, outputs to reduce node,
# and record connections to iterated jobs
map_links = {}
map_iter_links = {}
reduce_iter_links = {}
red_iter_links = {}
for param, plug in six.iteritems(it_node.plugs):
if param in forbidden_traits:
continue
if not plug.output:
# connect inputs of the map node
sources = pipeline_tools.find_plug_connection_sources(
it_node.plugs[param], it_node)
for pnode, pparam, pparent in sources:
pproc = pnode
if hasattr(pnode, 'process'):
pproc = pnode.process
map_links.setdefault(param, []).append((pproc, pparam))
# record dest of links in iterated nodes
if isinstance(it_process.process, Pipeline):
dest = \
pipeline_tools.find_plug_connection_destinations(
it_process.process.pipeline_node.plugs[param],
it_process.process.pipeline_node)
for pnode, pparam, pparent in dest:
pproc = pnode
if hasattr(pnode, 'process'):
pproc = pnode.process
map_iter_links.setdefault(pproc, {}) \
.setdefault(pparam, []).append(
(map_job, param))
else:
map_iter_links.setdefault(it_process.process, {}) \
.setdefault(param, []).append(
(map_job, param))
else:
# connect outputs of the reduce node
dest = pipeline_tools.find_plug_connection_destinations(
it_node.plugs[param], it_node)
for pnode, pparam, pparent in dest:
pproc = pnode
if hasattr(pnode, 'process'):
pproc = pnode.process
links.setdefault(pproc, {}).setdefault(pparam, []) \
.append((reduce_job, param))
# record source of links in iterated nodes
if isinstance(it_process.process, Pipeline):
#print('reduce from pipeline', param)
sources = \
pipeline_tools.find_plug_connection_sources(
it_process.process.pipeline_node.plugs[param],
it_process.process.pipeline_node)
#print('sources:', sources)
for pnode, pparam, pparent in sources:
pproc = pnode
if hasattr(pnode, 'process'):
pproc = pnode.process
red_iter_links.setdefault(param, []).append(
(pproc, pparam))
else:
red_iter_links.setdefault(param, []).append(
(it_process.process, param))
links[map_job] = map_links
reduce_iter_links[reduce_job] = red_iter_links
jobs[map_job] = map_job
jobs[reduce_job] = reduce_job
root_jobs[map_job] = map_job
root_jobs[reduce_job] = reduce_job
# iterate the iterated process / pipeline
iter_values = {}
for iteration in range(size):
for parameter in it_process.iterative_parameters:
if it_process.process.trait(parameter).input_filename \
is False:
# dynamic output has no forced value
continue
values = getattr(it_process, parameter)
if len(values) != 0:
if len(values) > iteration:
setattr(it_process.process, parameter,
values[iteration])
else:
setattr(it_process.process, parameter, values[-1])
# operate completion
complete_iteration(it_process, iteration)
# get iteration values to set on the parent iter node
for parameter in it_process.iterative_parameters:
if it_process.process.trait(parameter).input_filename \
is False:
# dynamic output has no forced value
continue
value = getattr(it_process.process, parameter)
iter_values.setdefault(parameter, []).append(value)
# build a workflow for the job / pipeline iteration
process_name = it_process.process.name + '_%d' % iteration
(sub_jobs, sub_dependencies, sub_groups, sub_root_jobs,
sub_links, sub_nodes) = \
iter_to_workflow(it_process.process, process_name,
step_name,
temp_map, shared_map, transfers,
shared_paths, disabled_nodes, remove_temp, steps,
study_config, iteration, map_job=map_job,
reduce_job=reduce_job, environment=environment)
nodes += sub_nodes
jobs.update(sub_jobs)
dependencies.update(sub_dependencies)
groups.update(sub_groups)
root_jobs.update(sub_root_jobs)
links.update(sub_links)
# connect map / reduce nodes to iterated jobs
for proc, dlink in six.iteritems(map_iter_links):
slink = links.setdefault((proc, iteration), {})
for dparam, linkl in six.iteritems(dlink):
l = slink.setdefault(dparam, [])
for link in linkl:
if link[1] in in_params:
# iterative param
l.append((link[0],
'%s_%d' % (link[1], iteration)))
else:
l.append(link)
for proc, dlink in six.iteritems(reduce_iter_links):
for dparam, linkl in six.iteritems(dlink):
if dparam in out_params:
# iterative param
dparam = '%s_%d' % (dparam, iteration)
for link in linkl:
links.setdefault(proc, {}) \
.setdefault(dparam, []) \
.append(((link[0], iteration), link[1]))
# set values on the iter node (what a full completion does, but we're
# doing it only partially here)
for param, value in iter_values.items():
setattr(it_process, param, value)
# the iteration process is not a single job, but can be reached
# (for links) through the map and reduce nodes. So we record a tuple
# (map, reduce) here for this special node.
jobs[it_process] = (map_job, reduce_job) # special job(s)
return (jobs, dependencies, groups, root_jobs, links, nodes)
def complete_iteration(it_process, iteration):
completion_engine = ProcessCompletionEngine.get_completion_engine(
it_process)
# check if it is an iterative completion engine
if hasattr(completion_engine, 'complete_iteration_step'):
completion_engine.complete_iteration_step(iteration)
def workflow_from_pipeline_structure(
pipeline, temp_map={}, shared_map={},
transfers=[{}, {}], shared_paths={},
disabled_nodes=set(), forbidden_temp=set(),
jobs_priority=0, steps={}, current_step='',
study_config={}, with_links=True, environment='global'):
""" Convert a CAPSUL pipeline into a soma-workflow workflow
Parameters
----------
pipeline: Pipeline (mandatory)
a CAPSUL pipeline
temp_map: dict (optional)
temporary files to replace by soma_workflow TemporaryPath objects
shared_map: dict (optional)
shared translated paths maps (global to pipeline).
This dict is updated when needed during the process.
transfers: list of 2 dicts (optional)
File transfers dicts (input / output), indexed by process, then by
file path.
shared_paths: dict (optional)
holds information about shared resource paths from the soma-workflow
section in the study config.
If not specified, no translation will be used.
jobs_priority: int (optional, default: 0)
set this priority on soma-workflow jobs.
steps: dict (optional)
node name -> step name dict
current_step: str (optional)
the parent node step name
study_config: StudyConfig instance (optional)
used only for iterative nodes, to be passed to create sub-workflows
with_links: bool (optional)
follow links to include other dependencies
environment: str (default: "global")
configuration environment name (default: "global"). See
:class:`capsul.engine.CapsulEngine` and
:class:`capsul.engine.settings.Settings`.
Returns
-------
workflow: tuple (jobs, dependencies, groups, root_jobs, links, nodes)
the corresponding soma-workflow workflow definition (to be passed
to Workflow constructor)
"""
def _update_links(links1, links2):
for dest_node, slink in six.iteritems(links2):
nlink = links1.setdefault(dest_node, {})
nlink.update(slink)
jobs = {}
groups = {}
root_jobs = {}
dependencies = set()
group_nodes = {}
links = {}
nodes = [(pipeline, node_name, node)
for node_name, node in six.iteritems(pipeline.nodes)
if node.activated and node.enabled
and node is not pipeline.pipeline_node]
all_nodes = []
engine = None
if study_config:
engine = getattr(study_config, 'engine', None)
# Go through all graph nodes
for node_desc in nodes:
n_pipeline, node_name, node = node_desc
if node in disabled_nodes \
or not pipeline_tools.is_node_enabled(n_pipeline,
node_name=node_name):
continue
if not node.is_job():
continue
all_nodes.append(node_desc)
if isinstance(node, PipelineNode):
# sub-pipeline
group_nodes[node_name] = node
step_name = current_step or steps.get(node_name, '')
(sub_jobs, sub_deps, sub_groups, sub_root_jobs, sub_links,
sub_nodes) \
= workflow_from_pipeline_structure(
node.process, temp_map, shared_map, transfers,
shared_paths, disabled_nodes,
jobs_priority=jobs_priority,
steps=steps, current_step=step_name, with_links=False,
environment=environment)
group = build_group(node_name,
sum(list(sub_root_jobs.values()), []))
groups[node] = group
root_jobs[node] = [group]
jobs.update(sub_jobs)
groups.update(sub_groups)
dependencies.update(sub_deps)
all_nodes += sub_nodes
#_update_links(links, sub_links)
# Otherwise convert all the processes to jobs
else:
sub_jobs = {}
process = None
if isinstance(node, Process):
process = node
elif hasattr(node, 'process'):
# process node
process = node.process
else:
process = node # custom node
step_name = current_step or steps.get(node.name)
if isinstance(process, ProcessIteration):
# iterative node
group_nodes.setdefault(
node_name, []).append(node)
sub_workflows = build_iteration(
node, step_name, temp_map,
shared_map, transfers, shared_paths,
disabled_nodes,
{}, steps, study_config={})
(sub_jobs, sub_deps, sub_groups, sub_root_jobs,
sub_links, sub_nodes) = sub_workflows
group = build_group(node_name, list(sub_root_jobs.values()))
groups.setdefault(process, []).append(group)
root_jobs.setdefault(process, []).append(group)
groups.update(sub_groups)
jobs.update(sub_jobs)
dependencies.update(sub_deps)
all_nodes += sub_nodes
_update_links(links, sub_links)
else:
job = build_job(process, temp_map, shared_map,
transfers, shared_paths,
forbidden_temp=forbidden_temp,
name=node_name,
priority=jobs_priority,
step_name=step_name,
engine=engine, environment=environment)
if job:
sub_jobs[process] = job
root_jobs[process] = [job]
jobs.update(sub_jobs)
# links / dependencies
if with_links:
for node_desc in all_nodes:
sub_pipeline, node_name, node = node_desc
dproc = getattr(node, 'process', node)
if isinstance(dproc, Pipeline):
continue # pipeline nodes are virtual
for param, plug in six.iteritems(node.plugs):
sources = pipeline_tools.find_plug_connection_sources(
plug, True)
for source in sources:
snode, param_name, parent = source
if node in disabled_nodes \
or not pipeline_tools.is_node_enabled(
pipeline, node=snode):
continue
process = getattr(snode, 'process', snode)
if not isinstance(snode, ProcessNode) \
and snode not in jobs:
# the node is a phantom node (switch). Add deps to
# all upstream nodes
new_nodes = [snode]
while new_nodes:
mnode = new_nodes.pop(0)
moredep = [
pipeline_tools.find_plug_connection_sources
(mlink[3])
for mplug in mnode.plugs.values()
for mlink in mplug.links_from
if not mplug.output
]
dependencies.update(
[(x[0].process, node.process)
for x in moredep
if isinstance(x[0], ProcessNode)])
new_nodes += [x[0] for x in moredep
if x[0] is not None
and not isinstance(
x[0], ProcessNode)]
elif isinstance(snode, PipelineNode):
# either link from main input, or from an
# unconnected optional input in a sub-pipeline:
# no dependency link
continue
else: # ProcessNode
sjob = jobs[process]
if isinstance(sjob, tuple): # iteration
sjob = sjob[1] # source
djob = jobs[dproc]
if isinstance(djob, tuple): # iteration
djob = djob[0] # destination
dependencies.add((sjob, djob))
trait = process.trait(param_name)
if trait.input_filename is False \
or (not isinstance(trait.trait_type,
(File, Directory)) \
and (not isinstance(trait.trait_type,
List)
or not isinstance(
trait.inner_traits[0],
(File, Directory)))):
links.setdefault(dproc, {}).setdefault(
param, []).append((process, param_name))
return jobs, dependencies, groups, root_jobs, links, all_nodes
def _create_directories_job(pipeline, shared_map={}, shared_paths={},
priority=0, transfer_paths=[]):
def _is_transfer(d, transfer_paths):
for path in transfer_paths:
if d.startswith(os.path.join(path, '')):
return True
return False
directories = [d
for d in pipeline_tools.get_output_directories(
pipeline)[1]
if not _is_transfer(d, transfer_paths)]
if len(directories) == 0:
return None # no dirs to create.
paths = []
# check for path translations
for path in directories:
new_path = _translated_path(path, shared_map, shared_paths)
paths.append(new_path or path)
# use a python command to avoid the shell command mkdir
cmdline = ['python', '-c',
'import sys, os; [os.makedirs(p) if not os.path.exists(p) '
'else None '
'for p in sys.argv[1:]]'] \
+ paths
job = swclient.Job(
name='output directories creation',
command=cmdline,
priority=priority)
return job
# TODO: handle formats in a separate, centralized place
# formats: {name: ext_props}
# ext_props: {ext: [dependent_exts]}
# dependent_exts: (ext, mandatory)
formats = {
'NIFTI-1': {'.nii': [], '.img': [('.hdr', True)], '.nii.gz': []},
'GIS': {'.ima': [('.dim', True)]},
'GIFTI': {'.gii': []},
'MESH': {'.mesh': []},
'ARG': {'.arg': [('.data', False)]},
}
# transform it to an ext-based dict
# merged_formats: {ext: [dependent_exts]}
# (formats names are lost here)
merged_formats = {}
for format, values in six.iteritems(formats):
merged_formats.update(values)
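# e.g. merged_formats['.img'] == [('.hdr', True)]: a .img file implies a
# mandatory .hdr companion file which has to be transferred along with it
# (see _files_group() above).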
if study_config is None:
study_config = pipeline.get_study_config()
engine = study_config.engine
if check_requirements:
ml = []
if pipeline.check_requirements(environment, message_list=ml) is None:
raise ValueError(
'The pipeline requirements are not met in the current '
'configuration settings: %s' % str(ml))
temp_pipeline = False
if not isinstance(pipeline, Pipeline):
# "pipeline" is actually a single process (or should, if it is not a
# pipeline). Get it into a pipeline (with a single node) to make the
# workflow.
new_pipeline = Pipeline()
new_pipeline.set_study_config(study_config)
# "pipeline" is actually a single process, the name is ClassName_1
new_pipeline.add_process(pipeline.name.lower() + "_1", pipeline)
new_pipeline.autoexport_nodes_parameters(include_optional=True)
pipeline = new_pipeline
temp_pipeline = True
if complete_parameters:
completion = ProcessCompletionEngine.get_completion_engine(pipeline)
if completion:
attributes = completion.get_attribute_values()
completion.complete_parameters(complete_iterations=False)
temp_map = assign_temporary_filenames(pipeline)
temp_subst_list = [(x1, x2[0]) for x1, x2 in six.iteritems(temp_map)]
temp_subst_map = dict(temp_subst_list)
shared_map = {}
swf_paths = _get_swf_paths(engine, environment)
transfers = _get_transfers(pipeline, swf_paths[0], merged_formats)
# get complete list of disabled leaf nodes
if disabled_nodes is None:
disabled_nodes = pipeline.disabled_pipeline_steps_nodes()
disabled_nodes = disabled_nodes \
+ [name for name, node in six.iteritems(pipeline.nodes)
if node.node_type != 'processing_node'
and name not in disabled_nodes]
disabled_nodes = _expand_nodes(disabled_nodes)
move_to_input, remove_temp = _handle_disable_nodes(
pipeline, temp_subst_map, transfers, disabled_nodes)
#print('changed transfers:', move_to_input)
#print('removed temp:', remove_temp)
#print('temp_map:', temp_map, '\n')
#print('SWF transfers:', swf_paths[0])
#print('shared paths:', swf_paths[1])
if create_directories:
# create job
dirs_job = _create_directories_job(
pipeline, shared_map=shared_map, shared_paths=swf_paths[1],
transfer_paths=swf_paths[0])
# build steps map
steps = {}
if hasattr(pipeline, 'pipeline_steps'):
for step_name, step \
in six.iteritems(pipeline.pipeline_steps.user_traits()):
nodes = step.nodes
steps.update(dict([(node, step_name) for node in nodes]))
# actually build the workflow, recursively if needed.
try:
(jobs, dependencies, groups, root_jobs, links, nodes) \
= workflow_from_pipeline_structure(
pipeline, temp_subst_map, shared_map, transfers,
swf_paths[1],
disabled_nodes=disabled_nodes, forbidden_temp=remove_temp,
steps=steps, study_config=study_config,
environment=environment)
finally:
restore_empty_filenames(temp_map)
# post-process links to replace nodes with jobs
param_links = {}
# expand jobs map because jobs keys may be tuples (for iterations)
jobs_map = {}
for process, job in six.iteritems(jobs):
while isinstance(process, tuple):
process = process[0]
jobs_map.setdefault(process, []).append(job)
for dnode, dlinks in six.iteritems(links):
if isinstance(dnode, (Pipeline, ProcessIteration)) \
or (isinstance(dnode, tuple)
and isinstance(dnode[0], (Pipeline, ProcessIteration))):
continue # FIXME handle this
djlinks = {}
for param, linkl in six.iteritems(dlinks):
for link in linkl:
if link[0] is not pipeline \
and not isinstance(link[0],
(Pipeline, ProcessIteration)):
# FIXME handle ProcessIteration cases
if isinstance(link[0], tuple):
if not isinstance(link[0][0],
(Pipeline, ProcessIteration)):
djlinks.setdefault(param, []) \
.append((jobs[link[0]], link[1]))
else:
for job in jobs_map[link[0]]:
djlinks.setdefault(param, []) \
.append((job, link[1]))
if isinstance(dnode, tuple):
param_links[jobs[dnode]] = djlinks
else:
for job in jobs_map[dnode]:
param_links[job] = djlinks
all_jobs = [job for job in list(jobs.values()) if not isinstance(job, tuple)]
root_jobs = sum(list(root_jobs.values()), [])
# if directories have to be created, all other primary jobs will depend
# on this first one
if create_directories and dirs_job is not None:
dependend_jobs = set()
for dependency in dependencies:
dependend_jobs.add(dependency[1])
new_deps = [(dirs_job, job) for job in all_jobs
if job not in dependend_jobs]
dependencies.update(new_deps)
all_jobs.insert(0, dirs_job)
root_jobs.insert(0, dirs_job)
workflow = swclient.Workflow(jobs=all_jobs,
dependencies=dependencies,
root_group=root_jobs,
name=pipeline.name,
param_links=param_links)
# mark workflow with pipeline
workflow.pipeline = weakref.ref(pipeline)
if temp_pipeline:
# the pipeline is temporary - it will be deleted if we don't save a ref
# to it somewhere else in the workflow
workflow._pipeline = pipeline
workflow._do_not_pickle = ['pipeline', '_pipeline']
if hasattr(pipeline, 'uuid'):
workflow.uuid = pipeline.uuid
return workflow
def workflow_run(workflow_name, workflow, study_config):
""" Create a soma-workflow controller and submit a workflow
Parameters
----------
workflow_name: str (mandatory)
the name of the workflow
workflow: Workflow (mandatory)
the soma-workflow workflow
study_config: StudyConfig (mandatory)
contains needed configuration through the SomaWorkflowConfig module
Returns
-------
tuple (controller, wf_id)
the soma-workflow WorkflowController instance and the identifier of
the submitted workflow
"""
swm = study_config.modules['SomaWorkflowConfig']
swm.connect_resource()
controller = swm.get_workflow_controller()
resource_id = swm.get_resource_id()
queue = None
if hasattr(study_config.somaworkflow_computing_resources_config,
resource_id):
res_conf = getattr(
study_config.somaworkflow_computing_resources_config, resource_id)
queue = res_conf.queue
if queue is Undefined:
queue = None
wf_id = controller.submit_workflow(workflow=workflow, name=workflow_name,
queue=queue)
swclient.Helper.transfer_input_files(wf_id, controller)
swclient.Helper.wait_workflow(wf_id, controller)
# get output values
pipeline = getattr(workflow, 'pipeline', None)
if pipeline:
pipeline = pipeline() # dereference the weakref
proc_map = {}
todo = [pipeline]
while todo:
process = todo.pop(0)
if isinstance(process, Pipeline):
todo += [n.process for n in process.nodes.values()
if n is not process.pipeline_node
and isinstance(n, ProcessNode)]
else:
proc_map[id(process)] = process
eng_wf = controller.workflow(wf_id)
for job in eng_wf.jobs:
if job.has_outputs:
out_params = controller.get_job_output_params(
eng_wf.job_mapping[job].job_id)
if out_params:
process = proc_map.get(job.process_hash)
if process is None:
# iteration or non-process job
continue
for param in list(out_params.keys()):
if process.trait(param) is None:
del out_params[param]
process.import_from_dict(out_params)
# TODO: should we transfer if the WF fails ?
swclient.Helper.transfer_output_files(wf_id, controller)
return controller, wf_id