Source code for capsul.study_config.process_instance

# -*- coding: utf-8 -*-
'''
Process instance factory

Functions
=========
:func:`is_process`
------------------
:func:`is_pipeline_node`
------------------------
:func:`get_process_instance`
----------------------------
:func:`get_node_class`
----------------------
:func:`get_node_instance`
-------------------------
'''

# System import
from __future__ import absolute_import
import sys
import os.path as osp
import importlib
import types
import re
import six
import os
import inspect

# Capsul import
from capsul.process.process import Process
from capsul.pipeline.pipeline import Pipeline
from capsul.process.nipype_process import nipype_factory
from capsul.process.xml import create_xml_process
from capsul.pipeline.xml import create_xml_pipeline
from capsul.pipeline.json_io import create_json_pipeline
from capsul.pipeline.pipeline_nodes import Node
from soma.controller import Controller

# Matches an embedded XML process description ('<process ...> ... </process>').
# re.DOTALL lets '.' span newlines so a multi-line description is matched.
# It is searched in function docstrings to decide whether a plain function
# can be turned into a process.
process_xml_re = re.compile(r'<process.*</process>', re.DOTALL)


# Cached class used for "is this a nipype Interface?" checks: either the real
# nipype Interface class, or a placeholder class while nipype is not loaded.
_interface = None
# True once _interface holds the real nipype Interface class.
_nipype_loaded = False

def _get_interface_class():
    '''
    Return the nipype Interface type, or a placeholder type if nipype has not
    been imported yet.

    We use this function on demand because importing nipype is long (it
    sometimes takes several seconds) and it's not always needed.

    We don't really import nipype, but use sys.modules to get it instead,
    because here we only use Interface to check if a given object is an
    instance (or subclass) of Interface. If nipype is not loaded, no object
    can be a nipype interface, so a placeholder class that nothing inherits
    from gives the right answer.

    The placeholder is created once and reused until nipype shows up in
    sys.modules; from then on the real class is cached and returned.

    Returns
    -------
    interface: type
        the nipype ``Interface`` class if ``nipype.interfaces.base`` is
        loaded, otherwise a placeholder class.
    '''
    global _interface, _nipype_loaded
    if _nipype_loaded:
        return _interface
    # Look nipype up without importing it
    nipype_base = sys.modules.get('nipype.interfaces.base')
    if nipype_base is not None:
        _interface = getattr(nipype_base, 'Interface')
        _nipype_loaded = True
    elif _interface is None:
        # Build the placeholder only once instead of on every call
        _interface = type("Interface", (object, ), {})
    return _interface


def is_process(item):
    """
    Tell whether the input item can be seen as a process.

    An item qualifies when it is either:

    * a class deriving from Process or from the nipype Interface class,
      excluding the Pipeline and Process base classes themselves;
    * a function carrying a ``capsul_xml`` attribute;
    * a function whose docstring embeds a ``<process>...</process>`` XML
      description.

    Parameters
    ----------
    item: object
        the object to test.

    Returns
    -------
    result: bool
        True if the item is a process class or a process function.
    """
    Interface = _get_interface_class()

    if inspect.isclass(item):
        # The base classes themselves are not usable processes
        if item in (Pipeline, Process):
            return False
        return issubclass(item, (Process, Interface))

    if not inspect.isfunction(item):
        return False

    # Function marked as a process (capsul_xml attribute)
    if hasattr(item, 'capsul_xml'):
        return True

    # Function documented with an XML process description
    docstring = item.__doc__
    return bool(docstring) and process_xml_re.search(docstring) is not None
def is_pipeline_node(item):
    """
    Check if the input is an instance of pipeline Node.

    The Node class itself is explicitly rejected.

    Parameters
    ----------
    item: object
        the object to test.

    Returns
    -------
    result: bool
        True if the item is a Node instance.
    """
    if item is Node:
        return False
    return isinstance(item, Node)
def get_process_instance(process_or_id, study_config=None, **kwargs):
    """ Return a Process instance given an identifier.

    Note that it is convenient to create a process from a StudyConfig
    instance: StudyConfig.get_process_instance()

    The identifier is either:

        * a derived Process class.
        * a derived Process class instance.
        * a Nipype Interface instance.
        * a Nipype Interface class.
        * a string description of the class `<module>.<class>`.
        * a string description of a function to wrap `<module>.<function>`.
        * a string description of a module containing a single process
          `<module>`
        * a string description of a pipeline `<module>.<fname>.xml`.
        * an XML filename for a pipeline.
        * a JSON filename for a pipeline.
        * a Python (.py) filename with process name in it:
          `/path/process_source.py#ProcessName`.
        * a Python (.py) filename for a file containing a single process.

    Default values of the process instance are passed as additional
    parameters.

    .. note:

        If no process is found an ImportError is raised.

    .. note:

        If the 'process_or_id' parameter is not valid a ValueError is raised.

    .. note:

        If the function to wrap does not contain a process description in its
        docstring ('<process>...</process>') a ValueError is raised.

    Parameters
    ----------
    process_or_id: instance or class description (mandatory)
        a process/nipype interface instance/class or a string description.
    study_config: StudyConfig instance (optional)
        A Process instance belongs to a StudyConfig framework. If not
        specified the study_config can be set afterwards.
    kwargs:
        default values of the process instance parameters.

    Returns
    -------
    result: Process
        an initialized process instance.
    """
    # NOTE
    # here we use a hack to make study_config accessible from processes
    # constructors. It is used for instance in ProcessIteration.
    # This is not elegant, not thread-safe, and forbids to have one pipeline
    # build a second one in a different study_config context.
    # I don't have a better solution, however.
    import capsul.study_config.study_config as study_cmod

    # Nothing to swap: no study_config given, or it is already the default
    if study_config is None \
            or study_cmod._default_study_config is study_config:
        return _get_process_instance(process_or_id,
                                     study_config=study_config, **kwargs)

    # Temporarily install study_config as the module-wide default, and put
    # the previous one back whatever happens during instantiation
    previous_default = study_cmod._default_study_config
    study_cmod._default_study_config = study_config
    try:
        return _get_process_instance(process_or_id,
                                     study_config=study_config, **kwargs)
    finally:
        study_cmod._default_study_config = previous_default
def _execfile(filename):
    ''' Execute a python source file and return the resulting globals dict.

    Parameters
    ----------
    filename: str
        path of the python source file to execute.

    Returns
    -------
    glob_dict: dict
        the namespace in which the file has been executed.
    '''
    # This chunk of code cannot be put inline in python 2.6
    glob_dict = {}
    # read through a context manager so that the file is always closed
    with open(filename, "rb") as source_file:
        source = source_file.read()
    exec(compile(source, filename, 'exec'), glob_dict, glob_dict)
    return glob_dict


def _load_module(filename, modname=None):
    ''' Load a python source file as a module and register it in sys.modules.

    Parameters
    ----------
    filename: str
        path of the python source file.
    modname: str (optional)
        name given to the module. If not specified, it is built from the
        file base name, dropping what follows its last dots
        (``rsplit('.', 2)[0]``).

    Returns
    -------
    mod: module
        the loaded module.
    '''
    if not modname:
        modname = os.path.basename(filename).rsplit('.', 2)[0]
    if sys.version_info >= (3, 5):
        import importlib.util
        spec = importlib.util.spec_from_file_location(modname, filename)
        mod = importlib.util.module_from_spec(spec)
        # registered before execution, so the module is visible in
        # sys.modules while its code runs
        sys.modules[modname] = mod
        spec.loader.exec_module(mod)
        return mod
    elif sys.version_info[0] >= 3:
        from importlib.machinery import SourceFileLoader
        mod = SourceFileLoader(modname, filename).load_module()
    else:
        import imp
        mod = imp.load_source(modname, filename)
    if mod is not None:
        sys.modules[modname] = mod
    return mod


def _get_process_instance(process_or_id, study_config=None, **kwargs):
    ''' Build a Process instance from an identifier.

    The identifier may be a Process instance or class, a nipype Interface
    instance or class, a function with an XML process description, a string
    (module / class / function path, python file name optionally followed
    by ``#ProcessName``, XML or JSON pipeline file name), a file-like object
    or an XML string starting with ``<pipeline``.

    Parameters
    ----------
    process_or_id: instance, class, function, string or file-like object
        the process identifier.
    study_config: StudyConfig instance (optional)
        if specified, it is set on the resulting process.
    kwargs:
        default values of the process instance parameters, set using
        ``set_parameter()``.

    Returns
    -------
    result: Process
        the process instance.

    Raises
    ------
    ValueError
        if no process could be built from the identifier, if a function has
        no XML description, or if the process already belongs to a different
        study_config.
    KeyError
        if a module or file contains several processes and none is
        explicitly designated.
    '''

    def _find_single_process(module_dict, filename):
        ''' Scan objects in module_dict and find out if a single one of them is
        a process
        '''
        object_name = None
        for name, item in six.iteritems(module_dict):
            if is_process(item):
                if object_name is not None:
                    raise KeyError(
                        'file %s contains several processes. Please '
                        'specify which one should be used using '
                        'filename.py#ProcessName or '
                        'module.submodule.ProcessName' % filename)
                object_name = name
        return object_name

    result = None
    Interface = _get_interface_class()

    # If the function 'process_or_id' parameter is already a Process
    # instance.
    if isinstance(process_or_id, Process):
        result = process_or_id

    # If the function 'process_or_id' parameter is a Process class.
    elif (isinstance(process_or_id, type) and
          issubclass(process_or_id, Process)):
        result = process_or_id()

    # If the function 'process_or_id' parameter is already a Nipype
    # interface instance, wrap this structure in a Process class
    elif isinstance(process_or_id, Interface):
        result = nipype_factory(process_or_id)

    # If the function 'process_or_id' parameter is an Interface class.
    elif (isinstance(process_or_id, type) and
          issubclass(process_or_id, Interface)):
        result = nipype_factory(process_or_id())

    # If the function 'process_or_id' parameter is a function.
    elif isinstance(process_or_id, types.FunctionType):
        xml = getattr(process_or_id, 'capsul_xml', None)
        if xml is None:
            # Check docstring
            if process_or_id.__doc__:
                match = process_xml_re.search(
                    process_or_id.__doc__)
                if match:
                    xml = match.group(0)
        if xml:
            result = create_xml_process(process_or_id.__module__,
                                        process_or_id.__name__,
                                        process_or_id, xml)()
        else:
            raise ValueError('Cannot find XML description to make function {0} a process'.format(process_or_id))

    # If the function 'process_or_id' parameter is a class string
    # description
    elif isinstance(process_or_id, six.string_types) \
            and not process_or_id.startswith('<pipeline'):
        py_url = os.path.basename(process_or_id).split('#')
        object_name = None
        as_xml = False
        as_py = False
        module_dict = None
        module = None
        if len(py_url) >= 2 and py_url[-2].endswith('.py') \
                or len(py_url) == 1 and py_url[0].endswith('.py'):
            # python file + process name: something.py#ProcessName
            # or just something.py if it contains only one process class
            if len(py_url) >= 2:
                filename = process_or_id[:-len(py_url[-1]) - 1]
                object_name = py_url[-1]
            else:
                filename = process_or_id
                object_name = None
            module = _load_module(filename)
            module_name = module.__name__
            module_dict = module.__dict__
            if object_name is None:
                object_name = _find_single_process(
                    module_dict, module_name)
                if object_name is not None:
                    module_name = process_or_id
                    as_py = True
            elif object_name in module_dict:
                as_py = True
        if object_name is None:
            elements = process_or_id.rsplit('.', 1)
            if len(elements) < 2:
                module_name, object_name = '__main__', elements[0]
            else:
                module_name, object_name = elements
            try:
                module = importlib.import_module(module_name)
                # update the Interface class since we may have loaded nipype
                # during import
                Interface = _get_interface_class()
                if object_name not in module.__dict__ \
                        or not is_process(getattr(module, object_name)):
                    # maybe a module with a single process in it
                    module = importlib.import_module(process_or_id)
                    module_dict = module.__dict__
                    # update the Interface class since we may have loaded
                    # nipype during import
                    Interface = _get_interface_class()
                    object_name = _find_single_process(
                        module_dict, module_name)
                    if object_name is not None:
                        module_name = process_or_id
                        as_py = True
                else:
                    as_py = True
            except ImportError:
                # not an importable python object: the identifier may still
                # designate an XML or JSON file, which is tried below
                pass
        if not as_py:
            # maybe XML or JSON filename or URL
            for ext in ('.xml', '.json'):
                xml_url = process_or_id + ext
                if osp.exists(xml_url):
                    object_name = None
                    as_xml = True
                elif process_or_id.endswith(ext) \
                        and osp.exists(process_or_id):
                    xml_url = process_or_id
                    object_name = None
                    as_xml = True
                else:
                    # maybe XML or JSON file with pipeline name in it
                    xml_url = module_name + ext
                    if not osp.exists(xml_url) \
                            and module_name.endswith(ext) \
                            and osp.exists(module_name):
                        xml_url = module_name
                    if not osp.exists(xml_url):
                        # try XML file in a module directory + class name
                        basename = None
                        module_name2 = None
                        if module_name in sys.modules:
                            basename = object_name
                            module_name2 = module_name
                            # to allow unmatching class / xml
                            object_name = None
                            # basename is None when no single process has
                            # been found in the module
                            if basename is not None \
                                    and basename.endswith(ext):
                                # strip the actual extension length
                                # ('.json' is longer than '.xml')
                                basename = basename[:-len(ext)]
                        else:
                            elements = module_name.rsplit('.', 1)
                            if len(elements) == 2:
                                module_name2, basename = elements
                        if module_name2 and basename:
                            try:
                                importlib.import_module(module_name2)
                                mod_dirname = osp.dirname(
                                    sys.modules[module_name2].__file__)
                                xml_url = osp.join(mod_dirname,
                                                   basename + ext)
                                if not osp.exists(xml_url):
                                    # if basename includes .xml extension
                                    xml_url = osp.join(mod_dirname,
                                                       basename)
                                as_xml = True
                            except ImportError as e:
                                raise ImportError('Cannot import %s: %s'
                                                  % (module_name, str(e)))
                if as_xml:
                    break
            if osp.exists(xml_url):
                loaders = {'.xml': create_xml_pipeline,
                           '.json': create_json_pipeline}
                result = loaders[ext](module_name, object_name, xml_url)()

        if result is None and not as_xml:
            if module_dict is not None:
                module_object = module_dict.get(object_name)
            else:
                module = sys.modules[module_name]
                module_object = getattr(module, object_name, None)
            if module_object is not None:
                if (isinstance(module_object, type) and
                        issubclass(module_object, Process)):
                    result = module_object()
                elif isinstance(module_object, Interface):
                    # If we have a Nipype interface, wrap this structure in a
                    # Process class. The interface instance is module_object
                    # (result is still None at this point).
                    result = nipype_factory(module_object)
                elif (isinstance(module_object, type) and
                      issubclass(module_object, Interface)):
                    result = nipype_factory(module_object())
                elif isinstance(module_object, types.FunctionType):
                    xml = getattr(module_object, 'capsul_xml', None)
                    if xml is None:
                        # Check docstring
                        if module_object.__doc__:
                            match = process_xml_re.search(
                                module_object.__doc__)
                            if match:
                                xml = match.group(0)
                    if xml:
                        result = create_xml_process(module_name, object_name,
                                                    module_object, xml)()
            # last chance: an XML file named after the object, located in the
            # module directory. Skipped when there is no object name, so that
            # the ValueError below is raised instead of a TypeError.
            if result is None and module is not None \
                    and object_name is not None:
                xml_file = osp.join(osp.dirname(module.__file__),
                                    object_name + '.xml')
                if osp.exists(xml_file):
                    result = create_xml_pipeline(module_name, None,
                                                 xml_file)()

    elif hasattr(process_or_id, 'read') \
            or (isinstance(process_or_id, bytes)
                and process_or_id.startswith(b'<pipeline')) \
            or (isinstance(process_or_id, str)
                and process_or_id.startswith('<pipeline')):
        # file-like object or xml string
        module_name = __name__
        result = create_xml_pipeline(module_name, None, process_or_id)()

    if result is None:
        raise ValueError("Invalid process_or_id argument. "
                         "Got '{0}' and expect a Process instance/string "
                         "description or an Interface instance/string "
                         "description".format(process_or_id))

    # Set the instance default parameters
    for name, value in six.iteritems(kwargs):
        result.set_parameter(name, value)

    if study_config is not None:
        if result.study_config is not None \
                and result.study_config is not study_config:
            raise ValueError("StudyConfig mismatch in get_process_instance "
                             "for process %s" % result)
        result.set_study_config(study_config)
    return result
def get_node_class(node_type):
    """
    Get a custom node class from module + class string.
    The class name is optional if the module contains only one node class.
    It is OK to pass a Node subclass or a Node instance also.

    Parameters
    ----------
    node_type: str or Node subclass or Node instance
        either a module name (the first Node subclass found in it, other
        than classes named 'Node', is used), a `<module>.<class>` string, a
        Node subclass, or a Node instance.

    Returns
    -------
    name_and_class: tuple (name, class) or None
        the node name and the node class. For a module string, the name is
        the one under which the class is found in the module. None is
        returned if no Node class could be found, including when a class
        which is not a Node subclass is given.
    """
    if inspect.isclass(node_type):
        if issubclass(node_type, Node):
            return node_type.__name__, node_type  # already a Node class
        # not a Node class: report it as "not found", like the other failure
        # cases, so that callers can unpack the result or test it for None
        return None
    if isinstance(node_type, Node):
        return node_type.__class__.__name__, node_type.__class__
    cls = None
    try:
        # node_type is a module: look for a Node subclass inside it
        mod = importlib.import_module(node_type)
        for name, val in six.iteritems(mod.__dict__):
            if inspect.isclass(val) and val.__name__ != 'Node' \
                    and issubclass(val, Node):
                cls = val
                break
        else:
            return None
    except ImportError:
        # node_type is <module>.<class>: split the class name from the module
        name = node_type.split('.')[-1]
        modname = node_type[:-len(name) - 1]
        mod = importlib.import_module(modname)
        cls = getattr(mod, name)
    if cls is None:
        return None
    return name, cls
def get_node_instance(node_type, pipeline, conf_dict=None, name=None,
                      **kwargs):
    """
    Build a custom node instance from a node type (see
    :func:`get_node_class`) and an optional configuration.

    The configuration holds the parameters needed to instantiate the node
    type. When the node class has a `configure_controller` attribute, it is
    called to get the configuration controller; when it has a `build_node`
    attribute, it is called to build the node.

    Parameters
    ----------
    node_type: str or Node subclass or Node instance
        node type to be built. Either a class (Node subclass) or a Node
        instance (the node will be re-instantiated), or a string
        describing a module and class.
    pipeline: Pipeline
        pipeline in which the node will be inserted.
    conf_dict: dict or Controller
        configuration dict or Controller defining parameters needed to build
        the node. A Controller is used as is; a dict is imported into the
        controller returned by the node class `configure_controller()`. If
        not given the node is supposed to be built with no parameters, which
        will not work for every node type.
    name: str (optional)
        node name. Defaults to the name returned by :func:`get_node_class`.
    kwargs:
        default values of the node instance parameters, set as attributes
        on the node.

    Returns
    -------
    node: Node
        the node instance.

    Raises
    ------
    ValueError
        if the node class cannot be found, or if a configuration dict is
        given and `configure_controller()` returns None.
    """
    resolved = get_node_class(node_type)
    if resolved is None:
        raise ValueError("Could not find node class %s" % node_type)
    default_name, node_cls = resolved
    name = name or default_name

    # Select the configuration controller
    if isinstance(conf_dict, Controller):
        controller = conf_dict
    elif not hasattr(node_cls, 'configure_controller'):
        controller = Controller()
    else:
        controller = node_cls.configure_controller()
        if conf_dict is not None:
            if controller is None:
                raise ValueError("node type %s has a configuration controller "
                                 "problem (see %s.configure_controller()"
                                 % (node_type, node_type))
            controller.import_from_dict(conf_dict)

    # Build the node
    if hasattr(node_cls, 'build_node'):
        node = node_cls.build_node(pipeline, name, controller)
    else:
        # probably bound to fail...
        node = node_cls(pipeline, name, [], [])

    # Set the instance default parameters
    for param_name, param_value in six.iteritems(kwargs):
        setattr(node, param_name, param_value)
    return node