# Source code for capsul.pipeline.custom_nodes.strcat_node

# -*- coding: utf-8 -*-
'''
:class:`StrCatNode`
-------------------
'''

from __future__ import absolute_import
from capsul.pipeline.pipeline_nodes import Node
from soma.controller import Controller
import traits.api as traits
import sys
import six
from six.moves import zip


class StrCatNode(Node):
    '''
    This "inert" node concatenates its inputs (as strings) and generates the
    concatenation on one of its plugs. All plugs may be inputs or outputs.
    '''

    _doc_path = 'api/pipeline.html#strcatnode'

    def __init__(self, pipeline, name, params, concat_plug, outputs,
                 make_optional=(), param_types=None):
        '''
        Parameters
        ----------
        pipeline: Pipeline
            pipeline which will hold the node
        name: str
            node name
        params: list
            names of parameters to be concatenated
        concat_plug: str
            name of the concatenated plug (should not be part of params)
        outputs: list
            list of parameters names which are outputs. May include elements
            from params, and/or concat_plug
        make_optional: list
            list of plug names which should be optional.
        param_types: dict
            parameters types dict: {param_name: trait_type_as_string}.
            ``None`` (the default) is equivalent to an empty dict.
        '''
        node_inputs = [dict(name=i, optional=(i in make_optional))
                       for i in params if i not in outputs]
        # Every name listed in ``outputs`` becomes an output plug. When
        # concat_plug is one of them it is already described here, so it must
        # not be appended a second time.
        node_outputs = [dict(name=i, optional=(i in make_optional))
                        for i in outputs]
        if concat_plug not in outputs:
            node_inputs.append({'name': concat_plug,
                                'optional': concat_plug in make_optional})
        super(StrCatNode, self).__init__(pipeline, name, node_inputs,
                                         node_outputs)
        self._concat_sequence = params
        self._concat_plug = concat_plug
        self.add_parameters(param_types)
        # initialize the concatenated value before listening to changes
        self.cat_callback()
        self.set_callbacks()

    def add_parameters(self, param_types=None):
        '''
        Add one trait per concatenated parameter, plus one for the
        concatenation plug.

        Parameters
        ----------
        param_types: dict
            {param_name: trait}. Names which are missing from the dict (or
            mapped to None) get a ``traits.Any(traits.Undefined)`` trait.
            ``None`` (the default) is equivalent to an empty dict.
        '''
        if param_types is None:
            param_types = {}
        added_traits = [self._concat_plug]
        for name in self._concat_sequence + added_traits:
            plug = self.plugs[name]
            ptype = param_types.get(name)
            if ptype is None:
                ptype = traits.Any(traits.Undefined)
            self.add_trait(name, ptype)
            # mirror the plug state on the trait metadata
            self.trait(name).output = plug.output
            self.trait(name).optional = plug.optional

    def set_callbacks(self, update_callback=None):
        '''
        Register a callback fired when any concatenated parameter changes.

        Parameters
        ----------
        update_callback: callable
            callback to register; defaults to :meth:`cat_callback`.
        '''
        if update_callback is None:
            update_callback = self.cat_callback
        self.on_trait_change(update_callback, self._concat_sequence)

    def cat_callback(self):
        '''
        Convert each concatenated parameter value to text, join them in
        sequence order, and assign the result to the concatenation plug.
        '''
        result = ''.join(six.text_type(getattr(self, name))
                         for name in self._concat_sequence)
        setattr(self, self._concat_plug, result)

    def configured_controller(self):
        '''
        Return a configuration controller (see :meth:`configure_controller`)
        filled with the current state of this node.
        '''
        c = self.configure_controller()
        c.parameters = self._concat_sequence
        c.concat_plug = self._concat_plug
        names = c.parameters + [c.concat_plug]
        # trait types are stored by class name, which is what build_node
        # looks up in the traits module
        c.param_types = [self.trait(x).trait_type.__class__.__name__
                         for x in names]
        c.outputs = [x for x in names if self.trait(x).output]
        return c

    @classmethod
    def configure_controller(cls):
        '''
        Return an empty Controller declaring the configuration traits
        ``parameters``, ``concat_plug``, ``outputs`` and ``param_types``.
        '''
        c = Controller()
        c.add_trait('parameters', traits.List(traits.Str()))
        c.add_trait('concat_plug', traits.Str())
        c.add_trait('outputs', traits.List(traits.Str()))
        c.add_trait('param_types', traits.List(traits.Str('Str')))
        return c

    @classmethod
    def build_node(cls, pipeline, name, conf_controller):
        '''
        Build a StrCatNode from a configuration controller.

        Parameters
        ----------
        pipeline: Pipeline
            pipeline which will hold the node
        name: str
            node name
        conf_controller: Controller
            controller with the traits declared in
            :meth:`configure_controller`

        Returns
        -------
        node: StrCatNode
        '''
        t = {}
        if conf_controller.param_types:
            # param_types is ordered like parameters + [concat_plug]; each
            # entry is the name of a trait class in the traits module
            for pname, ptype in zip(conf_controller.parameters
                                    + [conf_controller.concat_plug],
                                    conf_controller.param_types):
                t[pname] = getattr(traits, ptype)()
        node = StrCatNode(pipeline, name, conf_controller.parameters,
                          conf_controller.concat_plug,
                          conf_controller.outputs, param_types=t)
        return node

    def params_to_command(self):
        '''
        Return the command description of this node: ``['custom_job']``.
        '''
        return ['custom_job']

    def build_job(self, name=None, referenced_input_files=None,
                  referenced_output_files=None, param_dict=None):
        '''
        Build a soma_workflow ``StrCatJob`` carrying the node parameters.

        Parameters
        ----------
        name: str
            job name
        referenced_input_files: list
            passed to the job; ``None`` (the default) means an empty list
        referenced_output_files: list
            passed to the job; ``None`` (the default) means an empty list
        param_dict: dict
            job parameters. It is updated in place with the parameter names
            and values of this node. ``None`` (the default) means a new
            empty dict.

        Returns
        -------
        job: StrCatJob
        '''
        from soma_workflow.custom_jobs import StrCatJob

        # fresh containers on each call: avoids indexing None and sharing
        # mutable default values between calls
        if referenced_input_files is None:
            referenced_input_files = []
        if referenced_output_files is None:
            referenced_output_files = []
        if param_dict is None:
            param_dict = {}

        param_dict['input_names'] = self._concat_sequence
        param_dict['output_name'] = self._concat_plug
        # transmit values
        for param in self._concat_sequence:
            param_dict[param] = getattr(self, param)
        # [re] build the concatenated output
        self.cat_callback()
        param_dict[self._concat_plug] = getattr(self, self._concat_plug)
        job = StrCatJob(name=name,
                        referenced_input_files=referenced_input_files,
                        referenced_output_files=referenced_output_files,
                        param_dict=param_dict)
        return job