Source code for capsul.process.nipype_process

# -*- coding: utf-8 -*-
##########################################################################
# CAPSUL - Copyright (C) CEA, 2013
# Distributed under the terms of the CeCILL-B license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
# for details.
##########################################################################

'''
Utilities to link Capsul and NiPype interfaces

Functions
---------
:func:`nipype_factory`
++++++++++++++++++++++
'''

# System import
from __future__ import print_function
from __future__ import absolute_import
from soma.controller.trait_utils import relax_exists_constraint
from soma.controller import trait_ids
import sys
import os
import types
import logging
import six

# Define the logger
logger = logging.getLogger(__name__)

# Trait import
from traits.api import Directory, File, List, CTrait, Undefined, TraitError

# Capsul import
from .process import NipypeProcess


def nipype_factory(nipype_instance, base_class=NipypeProcess):
    """ From a nipype class instance generate dynamically a process
    instance that encapsulates the nipype instance.

    This function clones the nipype input and output traits onto the
    process instance and connects both sides with trait-change callbacks.

    Since nipype inputs and outputs are separated and thus can have
    the same names, the process output traits are prefixed with '_'
    (unless the process declares another name in its
    '_nipype_trait_mapping' attribute). An input whose name is already an
    attribute of the process instance is prefixed with 'nipype_'.

    When the process has an 'output_directory' value, file and directory
    outputs reported by nipype are relocated into that directory.

    For the SPM 'Level1Design' interface, the '_make_matlab_command' method
    is monkey patched on the instance so that it is invoked with
    postscript=None (the original notes mention Nipype versions '0.9.2' and
    '0.10.0').

    Parameters
    ----------
    nipype_instance : instance (mandatory)
        a nipype interface instance.
    base_class : class (optional, default NipypeProcess)
        the class used to build the process instance. If it has a
        '_nipype_class_type' attribute it is instantiated without argument,
        otherwise it is instantiated with 'nipype_instance' as argument.

    Returns
    -------
    process_instance : instance
        a process instance.

    See Also
    --------
    _run_interface
    _list_outputs
    _gen_filename
    _parse_inputs
    sync_nypipe_traits
    sync_process_output_traits
    clone_nipype_trait
    """

    ####################################################################
    # Monkey patching for Nipype version '0.9.2'.
    ####################################################################

    # Change first level masking explicit in postscript
    def _make_matlab_command(self, content):
        """ Replacement for Level1Design._make_matlab_command that calls
        the parent implementation with postscript=None.
        """
        from nipype.interfaces.spm import Level1Design
        return super(Level1Design, self)._make_matlab_command(
            content, postscript=None)

    interface_class = nipype_instance.__class__
    is_spm_interface = interface_class.__module__.startswith(
        'nipype.interfaces.spm.')
    if is_spm_interface and interface_class.__name__ == "Level1Design":
        nipype_instance._make_matlab_command = types.MethodType(
            _make_matlab_command, nipype_instance)

    # Create new instance derived from Process
    if hasattr(base_class, '_nipype_class_type'):
        process_instance = base_class()
    else:
        process_instance = base_class(nipype_instance)

    ####################################################################
    # Define functions to synchronize the process and interface traits
    ####################################################################

    # NOTE: Traits selects the arguments passed to a change handler from the
    # number of parameters it declares, so the arity of the two handlers
    # below (4 and 3 parameters) must not be changed.

    def sync_nypipe_traits(process_instance, name, old, value):
        """ Event handler function to update the nipype interface traits

        Parameters
        ----------
        process_instance: process instance (mandatory)
            the process instance that contains the nipype interface we want
            to update.
        name: str (mandatory)
            the name of the process trait that has changed.
        old: type (mandatory)
            the old trait value
        value: type (mandatory)
            the new trait value
        """
        # Translate the process trait name back to the interface trait name
        trait_map = getattr(process_instance, '_nipype_trait_mapping', {})
        inames = [iname for iname, pname in trait_map.items()
                  if pname == name]
        capsul_name = name
        if inames:
            name = inames[0]

        if name.startswith("nipype_"):
            # NOTE(review): unlike the branch below, a failed assignment
            # here does not restore the previous process value - confirm
            # whether this asymmetry is intended.
            setattr(process_instance._nipype_interface.inputs, name[7:],
                    value)
        else:
            try:
                setattr(process_instance._nipype_interface.inputs, name,
                        value)
            except Exception:
                # reset old value as the assignment has failed
                setattr(process_instance, capsul_name, old)
                raise

    def _replace_dir(value, directory):
        """ Replace directory in filename(s) in value.

        value may be a string, or a list (processed recursively). None,
        Undefined and the empty string are returned unchanged.
        """
        if value in (None, Undefined, ""):
            return value
        if isinstance(value, list):
            return [_replace_dir(item, directory) for item in value]
        return os.path.join(directory, os.path.basename(value))

    def _log_sync_error():
        """ Print the traceback of the exception being handled and record
        it in the module logger at debug level.

        Must be called from within an 'except' block.
        """
        # when called during exit, the traceback module might have
        # already disappeared: import it as late as possible
        import traceback
        traceback.print_exc()
        ex_type, ex, tb = sys.exc_info()
        logger.debug(
            "Something wrong in the nipype output trait "
            "synchronization:\n\n\tError: {0} - {1}\n"
            "\tTraceback:\n{2}".format(
                ex_type, ex, "".join(traceback.format_tb(tb))))

    def sync_process_output_traits(process_instance, name, value):
        """ Event handler function to update the process instance outputs

        This callback is registered without a trait name; it only acts when
        'name' is an input trait of the process instance, 'synchronize' or
        'output_directory'.

        Parameters
        ----------
        process_instance: process instance (mandatory)
            the process instance that contains the nipype interface we want
            to update.
        name: str (mandatory)
            the name of the trait that has changed.
        value: type (mandatory)
            the new trait value (unused)
        """
        # Get all the input traits
        input_traits = process_instance.traits(output=False)
        output_directory = getattr(process_instance, 'output_directory',
                                   Undefined)
        # mapping between interface trait names and process trait names
        trait_map = getattr(process_instance, '_nipype_trait_mapping', {})

        # Only update the output process instance traits values when a
        # process instance input trait is modified or when the dedicated
        # 'synchronize' / 'output_directory' trait value is modified
        if (name not in input_traits
                and name not in ('synchronize', 'output_directory')):
            return

        has_output_directory = output_directory not in (Undefined, None)

        # Try to set all the process instance output traits values from
        # the nipype autocompleted traits values
        try:
            nipype_outputs = (
                process_instance._nipype_interface._list_outputs())
        except Exception as e:
            # don't make it all crash because of a nipype trait assign
            # error
            print('EXCEPTION:', e, file=sys.stderr)
            print('while syncing nipype parameter', name,
                  'on', process_instance.name, file=sys.stderr)
            _log_sync_error()
            nipype_outputs = {}

        # Synchronize traits
        for out_name, out_value in six.iteritems(nipype_outputs):
            pname = trait_map.get(out_name, '_' + out_name)
            try:
                # if we have an output directory, replace it in file and
                # directory values
                if has_output_directory and any(
                        'File' in x or 'Directory' in x
                        for x in trait_ids(process_instance.trait(pname))):
                    out_value = _replace_dir(out_value, output_directory)
                # Set the output process trait value
                process_instance.set_parameter(pname, out_value)
            # If we can't update the output process instance traits values,
            # report it but carry on with the other outputs.
            except Exception as e:
                print('EXCEPTION:', e, file=sys.stderr)
                print('while setting nipype output parameter', pname,
                      'on', process_instance.name, 'with value:', out_value,
                      file=sys.stderr)
                _log_sync_error()

        # A change of output directory affects every duplicated input
        if name == 'output_directory':
            changed_names = list(input_traits.keys())
        else:
            changed_names = [name]
        for in_name in changed_names:
            # check if the input trait is duplicated as an output
            trait = process_instance.trait(in_name)
            if trait is None or not trait.copyfile:
                continue
            modified_name = '_modified_%s' % in_name
            if not process_instance.trait(modified_name):
                continue
            new_value = getattr(process_instance, in_name)
            if has_output_directory:
                new_value = _replace_dir(new_value, output_directory)
            try:
                process_instance.set_parameter(modified_name, new_value)
            # If we can't update the output process instance traits
            # values, report it but carry on.
            except Exception as e:
                print('EXCEPTION:', e, file=sys.stderr)
                _log_sync_error()

    ####################################################################
    # Clone nipype traits
    ####################################################################

    # The following function is not shared since it is too specific
    def clone_nipype_trait(process_instance, nipype_trait):
        """ Create a new trait (cloned and converted if necessary)
        from a nipype trait.

        Parameters
        ----------
        process_instance: process instance
            the process instance whose '_clone_trait' method performs the
            clone.
        nipype_trait: trait
            the nipype trait we want to clone and convert if necessary.

        Returns
        -------
        process_trait: trait
            the cloned/converted trait that will be used in the process
            instance.
        """
        # Clone the nipype trait
        process_trait = process_instance._clone_trait(nipype_trait)

        # Copy some information from the nipype trait
        process_trait.desc = nipype_trait.desc
        process_trait.optional = not nipype_trait.mandatory
        process_trait._metadata = {}

        return process_trait

    # beginning of nipype_factory function

    try:
        import nipype.interfaces.base.traits_extension as npe
    except (AttributeError, ImportError):
        # In some situations an AttributeError is raised, with the message:
        # module 'nipype.interfaces' has no attribute 'base'
        # but the module is actually here. Maybe it has not finished loading
        # (how can that happen?)
        if 'nipype.interfaces.base.traits_extension' in sys.modules:
            npe = sys.modules['nipype.interfaces.base.traits_extension']
        else:
            # no nipype, or problem loading it. Give up, use regular traits.
            import traits.api as npe

    # Add nipype traits to the process instance
    # > input traits
    trait_map = getattr(process_instance, '_nipype_trait_mapping', {})
    for nipype_name, trait in nipype_instance.input_spec().items():

        # Check if trait name already used in class attributes:
        # For instance nipype.interfaces.fsl.FLIRT has a save_log bool input
        # trait.
        trait_name = trait_map.get(nipype_name, nipype_name)
        if hasattr(process_instance, trait_name):
            trait_name = "nipype_" + trait_name

        # Relax nipype exists trait constraint
        relax_exists_constraint(trait)

        # Clone the nipype trait
        process_trait = clone_nipype_trait(process_instance, trait)

        # Add the cloned trait to the process instance
        process_instance.add_trait(trait_name, process_trait)

        # Need to copy all the nipype trait information
        added_trait = process_instance.trait(trait_name)
        added_trait.optional = not trait.mandatory
        added_trait.desc = trait.desc
        added_trait.output = False

        # initialize value with nipype interface initial value, (if we
        # can...). The value is read through the interface's own trait
        # name: the process trait name may have been remapped or prefixed
        # above and would then not exist on the interface inputs.
        try:
            setattr(process_instance, trait_name,
                    getattr(nipype_instance.inputs, nipype_name))
        except TraitError:
            # the value in the nipype trait is actually invalid...
            pass

        # Add the callback to update nipype traits when a process input
        # trait is modified
        process_instance.on_trait_change(sync_nypipe_traits, name=trait_name)

        ## if copyfile is True, we assume the input will be modified, thus it
        ## also becomes an output
        #if trait.copyfile:
            #process_trait = clone_nipype_trait(process_instance, trait)
            #process_trait.output = True
            #process_trait.input_filename = False
            #process_trait.optional = True
            #private_name = "_" + trait_name + '_out'
            #process_instance.add_trait(private_name, process_trait)

    # Add callback to synchronize output process instance traits with nipype
    # autocompleted output traits
    process_instance.on_trait_change(sync_process_output_traits)

    # > output traits
    file_trait_types = (File, Directory, npe.File, npe.Directory)
    for trait_name, trait in nipype_instance.output_spec().items():

        # Relax nipype exists trait constraint
        relax_exists_constraint(trait)

        # Clone the nipype trait
        process_trait = clone_nipype_trait(process_instance, trait)

        # Create the output process trait name: nipype trait name prefixed
        # by '_'
        private_name = trait_map.get(trait_name, '_' + trait_name)

        # Add the cloned trait to the process instance
        process_instance.add_trait(private_name, process_trait)
        process_trait = process_instance.trait(private_name)

        # Need to copy all the nipype trait information
        process_trait.optional = not trait.mandatory
        process_trait.desc = trait.desc
        process_trait.output = True
        process_trait.enabled = False

        # SPM output File traits and lists of File should have the
        # metadata input_filename=False
        if process_instance._nipype_interface_name == 'spm':
            if isinstance(process_trait.trait_type, file_trait_types):
                process_trait.input_filename = False
            elif isinstance(process_trait.trait_type, List) \
                    and isinstance(process_trait.inner_traits[0].trait_type,
                                   file_trait_types):
                process_trait.inner_traits[0].output = True
                process_trait.inner_traits[0].input_filename = False
                process_trait.input_filename = False

    # allow to save the SPM .m script
    if is_spm_interface:
        script_name = trait_map.get('_spm_script_file', '_spm_script_file')
        process_instance.add_trait(script_name,
                                   File(output=True, optional=True))

    return process_instance