# Source code for capsul.pipeline.python_export

# -*- coding: utf-8 -*-
'''
Pipeline exportation function as a python source code file.

Functions
=========
:func:`save_py_pipeline`
------------------------
'''

from __future__ import print_function
from __future__ import absolute_import

from soma.controller import Controller
import traits.api as traits
import os
import six
import sys


[docs] def save_py_pipeline(pipeline, py_file): ''' Save a pipeline in an Python source file Parameters ---------- pipeline: Pipeline instance pipeline to save py_file: str or file-like object .py file to save the pipeline in ''' # imports are done locally to avoid circular imports from capsul.api import Process, Pipeline from capsul.pipeline.pipeline_nodes import ProcessNode, Switch, \ OptionalOutputSwitch, PipelineNode from capsul.pipeline.process_iteration import ProcessIteration from capsul.process.process import NipypeProcess from capsul.study_config.process_instance import get_process_instance from traits.api import Undefined def get_repr_value(value): # TODO: handle None/Undefined in lists/dicts etc if value is Undefined: repvalue = 'traits.Undefined' elif value is None: repvalue = 'None' elif isinstance(value, Controller): repvalue = repr(dict(value.export_to_dict())) else: repvalue = repr(value) return repvalue def _write_process(node, pyf, name, enabled, skip_invalid): process = node.process if isinstance(process, NipypeProcess): mod = process._nipype_interface.__module__ classname = process._nipype_interface.__class__.__name__ else: mod = process.__module__ # if process is a function with XML decorator, we need to # retrieve the original function name. 
func = getattr(process, '_function', None) if func: classname = func.__name__ else: classname = process.__class__.__name__ procname = '.'.join((mod, classname)) proc_copy = get_process_instance(procname) make_opt = [] for tname, trait in six.iteritems(proc_copy.user_traits()): ntrait = process.trait(tname) if ntrait.optional and not trait.optional: make_opt.append(tname) node_options = '' if len(make_opt) != 0: node_options += ', make_optional=%s' % repr(make_opt) if skip_invalid: node_options += ', skip_invalid=True' print(' self.add_process("%s", "%s"%s)' % (name, procname, node_options), file=pyf) # check that sub-nodes enable and plugs optional states are the # expected ones todo = [('self.nodes["%s"]' % name, node, ProcessNode(node.pipeline, node.name, proc_copy))] while todo: self_str, snode, cnode = todo.pop(0) if not snode.enabled: print(' %s.enabled = False' % self_str, file=pyf) # if the node is a (sub)pipeline, and this pipeline has additional # exported traits compared to the its base module/class instance # (proc_copy), then we must use explicit exports/links inside it if isinstance(snode, PipelineNode): sproc = snode.process cproc = cnode.process for param_name, trait in sproc.user_traits().items(): optional = None if param_name not in cproc.traits(): # param added, not in the original process is_input = not trait.output if (is_input and sproc.pipeline_node.plugs[ param_name].links_to) \ or (not is_input and sproc.pipeline_node.plugs[ param_name].links_from): if is_input: for link in sproc.pipeline_node.plugs[ param_name].links_to: print(f' {self_str}.process.add_link("{param_name}->{link[0]}.{link[1]}", allow_export=True)\n', file=pyf) else: for link in sproc.pipeline_node.plugs[ param_name].links_from: print(f' {self_str}.process.add_link("{link[0]}.{link[1]}->{param_name}", allow_export=True)\n', file=pyf) for param_name, plug in snode.plugs.items(): trait = snode.get_trait(param_name) ctrait = cnode.get_trait(param_name) optional = None if 
param_name not in cnode.plugs \ or trait.optional != ctrait.optional: optional = trait.optional if optional is not None: if hasattr(cnode, 'process'): print(f' {self_str}.process.trait("{param_name}").optional = {optional}\n', file=pyf) print(f' {self_str}.plugs["{param_name}"].optional = {optional}\n', file=pyf) value = snode.get_plug_value(param_name) if value != cnode.get_plug_value(param_name): #splug = snode.plugs[param_name] #if splug.output and len(splug.links_to) == 0 \ #or not splug.output and len(splug.links_from) == 0: ## unconnected with non-default value print( f' {self_str}.set_plug_value("{param_name}", %s)' % get_repr_value(value), file=pyf) if isinstance(snode, PipelineNode): sself_str = '%s.process.nodes["%s"]' % (self_str, '%s') for node_name, snode in snode.process.nodes.items(): scnode = cnode.process.nodes[node_name] if node_name == '': continue todo.append((sself_str % node_name, snode, scnode)) for pname in process.user_traits(): value = getattr(process, pname) init_value = getattr(proc_copy, pname, Undefined) if value != init_value \ and not (value is Undefined and init_value == ''): repvalue = get_repr_value(value) print(' self.nodes["%s"].process.%s = %s' % (name, pname, repvalue), file=pyf) #if isinstance(process, NipypeProcess): ## WARNING: not sure I'm doing the right things for nipype. To be ## fixed if needed. #for param in process.inputs_to_copy: #elem = ET.SubElement(procnode, 'nipype') #elem.set('name', param) #if param in process.inputs_to_clean: #elem.set('copyfile', 'discard') #else: #elem.set('copyfile', 'true') #np_input = getattr(process._nipype_interface.inputs, param) #if np_input: #use_default = getattr(np_input, 'usedefault', False) # is it that? #if use_default: #elem.set('use_default', 'true') #for param, np_input in \ #six.iteritems(process._nipype_interface.inputs.__dict__): #use_default = getattr(np_input, 'usedefault', False) # is it that? 
#if use_default and param not in process.inputs_to_copy: #elem = ET.SubElement(procnode, 'nipype') #elem.set('name', param) #elem.set('use_default', 'true') def _write_custom_node(node, pyf, name, enabled): mod = node.__module__ classname = node.__class__.__name__ nodename = '.'.join((mod, classname)) if hasattr(node, 'configured_controller'): c = node.configured_controller() params = dict((p, v) for p, v in six.iteritems(c.export_to_dict()) if v not in (None, traits.Undefined)) print( ' self.add_custom_node("%s", "%s", %s)' % (name, nodename, get_repr_value(params)), file=pyf) else: print(' self.add_custom_node("%s", "%s")' % (name, nodename), file=pyf) # optional plugs for plug_name, plug in six.iteritems(node.plugs): if plug.optional: print(' self.nodes["%s"].plugs["%s"].optional = True' % (name, plug_name), file=pyf) # non-default: values of unconnected plugs for plug_name, plug in six.iteritems(node.plugs): if len(plug.links_from) == 0 and len(plug.links_to) == 0 \ and plug_name in node.traits() \ and getattr(node, plug_name) \ != node.trait(plug_name).default: value = getattr(node, plug_name) print(' self.nodes["%s"].%s = %s' % (name, plug_name, get_repr_value(value)), file=pyf) def _write_iteration(process_iter, pyf, name, enabled): process = process_iter.process if isinstance(process, NipypeProcess): mod = process._nipype_interface.__module__ classname = process._nipype_interface.__class__.__name__ else: mod = process.__module__ # if process is a function with XML decorator, we need to # retrieve the original function name. func = getattr(process, '_function', None) if func: classname = func.__name__ else: classname = process.__class__.__name__ procname = '.'.join((mod, classname)) iteration_params = ', '.join(process_iter.iterative_parameters) # TODO: optional plugs, non-exported plugs... 
print(' self.add_iterative_process("%s", "%s", ' 'iterative_plugs=%s)' % (name, procname, process_iter.iterative_parameters), file=pyf) def _write_switch(switch, pyf, name, enabled): inputs_set = set() inputs = [] outputs = [] optional = [] opt_in = [] options = '' for plug_name, plug in six.iteritems(switch.plugs): if plug.output: outputs.append(plug_name) if plug.optional: optional.append(plug_name) else: name_parts = plug_name.split("_switch_") if len(name_parts) == 2 \ and name_parts[0] not in inputs_set: inputs_set.add(name_parts[0]) inputs.append(name_parts[0]) if plug.optional: opt_in.append(name_parts[0]) optional_p = '' if len(optional) != 0: optional_p = ', make_optional=%s' % repr(optional) # output types from .pipeline_tools import trait_str out_types = [] for p in switch._outputs: out_types.append(trait_str(switch.trait(p))) output_types = '[' + ', '.join(out_types) + ']' opt_inputs = getattr(switch, '_optional_input_nodes', None) if opt_inputs: opt_inputs = [i[1] for i in opt_inputs if i[0] in inputs] if opt_inputs == inputs: opt_inputs = True options += ', opt_nodes=%s' % repr(opt_inputs) value_p = '' if switch.switch != inputs[0]: value_p = ', switch_value=%s' % repr(switch.switch) print(' self.add_switch("%s", %s, %s, output_types=%s%s%s%s, export_switch=False)' % (name, repr(inputs), repr(outputs), output_types, optional_p, value_p, options), file=pyf) def _write_optional_output_switch(switch, pyf, name, enabled): output = None input = None for plug_name, plug in six.iteritems(switch.plugs): if plug.output: output = plug_name else: name_parts = plug_name.split("_switch_") if len(name_parts) == 2 and name_parts[0] != '_none': input = name_parts[0] if not name and output: name = output if not output or output == name: print(' self.add_optional_output_switch("%s", "%s")' % (name, input), file=pyf) else: print(' self.add_optional_output_switch("%s", "%s", "%s")' % (name, input, output), file=pyf) def _write_processes(pipeline, pyf): print(' # nodes', 
file=pyf) nodes = [] # sort nodes, processes first for node_name, node in six.iteritems(pipeline.nodes): if node_name == "": continue nodes.append((node_name, node)) for node_name, node in nodes: if isinstance(node, OptionalOutputSwitch): _write_optional_output_switch(node, pyf, node_name, node.enabled) elif isinstance(node, Switch): _write_switch(node, pyf, node_name, node.enabled) elif isinstance(node, ProcessNode) \ and isinstance(node.process, ProcessIteration): _write_iteration(node.process, pyf, node_name, node.enabled) elif isinstance(node, ProcessNode): _write_process(node, pyf, node_name, node.enabled, node_name in pipeline._skip_invalid_nodes) else: # custom node _write_custom_node(node, pyf, node_name, node.enabled) def _write_processes_selections(pipeline, pyf): selection_parameters = [] if hasattr(pipeline, 'processes_selection'): print('\n # processes selection', file=pyf) for selector_name, groups \ in six.iteritems(pipeline.processes_selection): print(' self.add_processes_selection("%s", %s)' % (selector_name, repr(groups)), file=pyf) return selection_parameters def _write_export(pipeline, pyf, param_name): plug = pipeline.pipeline_node.plugs[param_name] if plug.output: link = list(plug.links_from)[0] else: link = list(plug.links_to)[0] trait = pipeline.trait(param_name) node_name, plug_name = link[:2] if param_name == plug_name: param_name = '' else: param_name = ', "%s"' % param_name weak_link = '' if link[-1]: weak_link = ', weak_link=True' if trait is None: print('missing tait', param_name, 'on:', pipeline) #if pipeline.trait(param_name).optional: is_optional = ', is_optional=%s' % repr(trait.optional) print(' self.export_parameter("%s", "%s"%s%s%s)' % (node_name, plug_name, param_name, weak_link, is_optional), file=pyf) return node_name, plug_name def _write_links(pipeline, pyf): exported = set() print('\n # links', file=pyf) for node_name, node in six.iteritems(pipeline.nodes): for plug_name, plug in six.iteritems(node.plugs): if (node_name == 
"" and not plug.output) \ or (node_name != "" and plug.output): links = plug.links_to for link in links: exported_plug = None if node_name == "": src = plug_name if src not in exported: exported_plug = _write_export(pipeline, pyf, src) exported.add(src) else: src = "%s.%s" % (node_name, plug_name) if link[0] == "": dst = link[1] if dst not in exported: exported_plug = _write_export(pipeline, pyf, dst) exported.add(dst) else: dst = "%s.%s" % (link[0], link[1]) if not exported_plug \ or '.'.join(exported_plug) not in (src, dst): weak_link = '' if link[-1]: weak_link = ', weak_link=True' print(' self.add_link("%s->%s"%s)' % (src, dst, weak_link), file=pyf) def _write_param_order(pipeline, pyf): if len(pipeline.user_traits()) == 0: return print('\n # parameters order', file=pyf) names = ['"%s"' % n for n in pipeline.user_traits().keys() if n not in ('nodes_activation', 'pipeline_steps', 'visible_groups')] print('\n self.reorder_traits((%s))' % ', '.join(names), file=pyf) def _write_steps(pipeline, pyf): steps = pipeline.trait('pipeline_steps') if steps and getattr(pipeline, 'pipeline_steps', None): print('\n # pipeline steps', file=pyf) for step_name, step \ in six.iteritems(pipeline.pipeline_steps.user_traits()): enabled = getattr(pipeline.pipeline_steps, step_name) enabled_str = '' if not enabled: enabled_str = ', enabled=false' nodes = step.nodes print(' self.add_pipeline_step("%s", %s%s)' % (step_name, repr(nodes), enabled_str), file=pyf) def _write_nodes_positions(pipeline, pyf): node_position = getattr(pipeline, "node_position", None) if node_position: print('\n # nodes positions', file=pyf) print(' self.node_position = {', file=pyf) for node_name, pos in six.iteritems(pipeline.node_position): if not isinstance(pos, (list, tuple)): # pos is probably a QPointF pos = (pos.x(), pos.y()) print(' "%s": %s,' % (node_name, repr(pos)), file=pyf) print(' }', file=pyf) if hasattr(pipeline, "scene_scale_factor"): print(' self.scene_scale_factor = %s' % 
repr(pipeline.scene_scale_factor), file=pyf) ########### add by Irmage OM ######################### def _write_nodes_dimensions(pipeline, pyf): node_dimension = getattr(pipeline, "node_dimension", None) if node_dimension: print('\n # nodes dimensions', file=pyf) print(' self.node_dimension = {', file=pyf) for node_name, dim in six.iteritems(pipeline.node_dimension): if not isinstance(dim, (list, tuple)): dim = (dim.width(), dim.height()) print(' "%s": %s,' % (node_name, repr(dim)), file=pyf) print(' }', file=pyf) ###################################################### def _write_doc(pipeline, pyf): if hasattr(pipeline, "__doc__"): docstr = pipeline.__doc__ if docstr == Pipeline.__doc__: docstr = "" # don't use the builtin Pipeline help else: # remove automatically added doc splitdoc = docstr.split('\n') notepos = [i for i, x in enumerate(splitdoc[:-2]) if x.endswith('.. note::')] autodocpos = None if notepos: for i in notepos: if splitdoc[i+2].find( "* Type '{0}.help()'".format( pipeline.__class__.__name__)) != -1: autodocpos = i if autodocpos is not None: # strip empty trailing lines while autodocpos >= 1 \ and splitdoc[autodocpos - 1].strip() == '': autodocpos -= 1 docstr = '\n'.join(splitdoc[:autodocpos]) + '\n' if docstr.strip() == '': docstr = '' if docstr: doc = docstr.split('\n') docstr = '\n'.join([repr(x)[1:-1] for x in doc]) print(' \"\"\"%s \"\"\"' % docstr, file=pyf) def _write_values(pipeline, pyf): first = True for param_name, trait in six.iteritems(pipeline.user_traits()): if param_name not in pipeline.pipeline_node.plugs: continue # default cannot be set after trait creation #if trait.default: #if first: #first = False #print('\n # default and initial values', file=pyf) #print(' self.trait("%s").default = %s' #% (param_name, repr(trait.default)), file=pyf) value = getattr(pipeline, param_name) if value != trait.default and value not in (None, '', Undefined): if isinstance(value, Controller): value_repr = repr(dict(value.export_to_dict())) else: 
value_repr = repr(value) try: eval(value_repr) except Exception: print('warning, value of parameter %s cannot be saved' % param_name) continue if first: first = False print('\n # default and initial values', file=pyf) print(' self.%s = %s' % (param_name, value_repr), file=pyf) class_name = type(pipeline).__name__ if class_name == 'Pipeline': # don't accept the base Pipeline class if isinstance(py_file, str): class_name = os.path.basename(py_file) elif hasattr(py_file, 'name'): class_name = os.path.basename(py_file.name) else: class_name = 'CustomPipeline' if '.' in class_name: class_name = class_name[:class_name.index('.')] class_name = class_name[0].upper() + class_name[1:] if isinstance(py_file, str): pyf = open(py_file, 'w') else: pyf = py_file print('# -*- coding: utf-8 -*-\n', file=pyf) print('from capsul.api import Pipeline', file=pyf) print('import traits.api as traits', file=pyf) print(file=pyf) print(file=pyf) print('class %s(Pipeline):' % class_name, file=pyf) _write_doc(pipeline, pyf) print(file=pyf) print(' def pipeline_definition(self):', file=pyf) _write_processes(pipeline, pyf) _write_links(pipeline, pyf) _write_param_order(pipeline, pyf) _write_processes_selections(pipeline, pyf) _write_steps(pipeline, pyf) _write_values(pipeline, pyf) _write_nodes_positions(pipeline, pyf) _write_nodes_dimensions(pipeline, pyf) #add by Irmage OM print('\n self.do_autoexport_nodes_parameters = False', file=pyf) if pyf is not py_file: pyf.close()