Source code for capsul.engine

# -*- coding: utf-8 -*-
'''
This module defines the main API to interact with Capsul processes.
In order to execute a process, it is mandatory to have an instance of
:py:class:`CapsulEngine`. Such an instance can be created with the factory
function :py:func:`capsul_engine`.

Classes
=======
:class:`CapsulEngine`
---------------------

Functions
=========
:func:`database_factory`
------------------------
:func:`capsul_engine`
---------------------
:func:`activate_configuration`
------------------------------
'''

from __future__ import absolute_import
from __future__ import print_function
import importlib
import json
import os
import os.path as osp
import re
import tempfile
import subprocess
import sys

from traits.api import Dict, String, Undefined

from soma.controller import Controller, controller_to_dict
from soma.serialization import to_json, from_json
from soma.sorted_dictionary import SortedDictionary
from soma.utils.weak_proxy import get_ref

from .database_json import JSONDBEngine
from .database_populse import PopulseDBEngine

from .settings import Settings
from .module import default_modules
from . import run
from .run import WorkflowExecutionError

# FIXME TODO: OBSOLETE

#Questions about API/implementation:

#* execution:
  #* workflows are not exposed, they are running a possibly different pipeline (single process case), thus we need to keep track of it
  #* logging / history / provenance, databasing
  #* retrieving output files with transfers: when ? currently in wait(), should it be a separate method ? should it be asynchronous ?
  #* setting output parameters: currently in wait(), should it be a separate method ?
  #* disconnections / reconnections client / server
  #* actually connect computing resource[s]
#* settings / config:
  #* see comments in settings.py
  #* GUI and constraints on parameters ?
  #* how to handle optional dependencies: i.e. nipype depends on spm if spm is installed / configured, otherwise we can run other nipype interfaces, but no spm ones
  #* integrate soma-workflow config + CE.computing_resource



class CapsulEngine(Controller):
    '''
    A CapsulEngine is the mandatory entry point of all software using Capsul.
    It contains objects to store configuration and metadata, defines execution
    environment(s) (possibly remote) and performs pipelines execution.

    A CapsulEngine must be created using the capsul.engine.capsul_engine
    function. For instance::

        from capsul.engine import capsul_engine
        ce = capsul_engine()

    Or::

        from capsul.api import capsul_engine
        ce = capsul_engine()

    By default, CapsulEngine only stores the necessary configuration. But it
    may be necessary to modify the Python environment globally to apply this
    configuration. For instance, Nipype must be configured globally. If SPM
    is configured in CapsulEngine, it is necessary to explicitly activate the
    configuration in order to modify the global configuration of Nipype for
    SPM. This activation is done by explicitly activating the execution
    context of the capsul engine with the following code, inside a running
    process::

        from capsul.engine import capsul_engine, activate_configuration

        ce = capsul_engine()
        # Nipype is not configured here
        config = ce.settings.select_configurations(
            'global', {'nipype': 'any'})
        activate_configuration(config)
        # Nipype is configured here

    .. note::

        CapsulEngine is the replacement for the older
        :class:`~capsul.study_config.study_config.StudyConfig`, which is
        still present in Capsul 2.2 for backward compatibility, but will
        disappear in later versions. In Capsul 2.2 both objects exist and are
        synchronized internally, which means that a StudyConfig object will
        also create a CapsulEngine and conversely, and that modifications in
        the StudyConfig object will change the corresponding item in
        CapsulEngine and vice versa. Functionalities of StudyConfig are
        moving internally to CapsulEngine, StudyConfig being merely a
        wrapper.

    **Using CapsulEngine**

    It is used to store configuration variables and to handle execution
    within the configured context. The configuration has 2 independent axes:
    configuration modules, which provide additional configuration variables,
    and "environments", which typically represent computing resources.

    *Computing resources*

    Capsul uses :somaworkflow:`Soma-Workflow <index.html>` to run processes,
    and is thus able to connect to and execute on a remote computing server.
    The remote computing resource may have a different configuration from the
    client one (paths for software or data, available external software,
    etc.). So configurations specific to different computing resources should
    be handled in CapsulEngine. For this, the configuration section is split
    into several configuration entries, one for each computing resource.

    As this is a little bit complex to handle at first, a "global"
    configuration (what we call "environment") is used to maintain all common
    configuration options. It is typically used to work on the local machine,
    especially for users who only work locally.

    Configuration is stored in a database (either internal or persistent),
    through the :class:`~capsul.engine.settings.Settings` object found in
    ``CapsulEngine.settings``. Access and modification of settings should
    occur within a session block using ``with capsul_engine.settings as
    session``. See the :class:`~capsul.engine.settings.Settings` class for
    details.

    ::

        >>> from capsul.api import capsul_engine
        >>> ce = capsul_engine()
        >>> config = ce.settings.select_configurations('global')
        >>> print(config)
        {'capsul_engine': {'uses': {'capsul.engine.module.fsl': 'ALL',
            'capsul.engine.module.matlab': 'ALL',
            'capsul.engine.module.spm': 'ALL'}}}

    Whenever a new computing resource is used, it can be added as a new
    environment key to all configuration operations.

    Note that the settings store all possible configurations for all
    environments (or computing resources), but they are not "activated": this
    is only done at runtime in specific process execution functions: each
    process may need to select and use a different configuration from other
    ones, and activate it individually.

    :class:`~capsul.process.process.Process` subclasses or instances may
    provide their configuration requirements via their
    :meth:`~capsul.process.process.Process.requirements` method. This method
    returns a dictionary of request strings (one element per needed module)
    that will be used to select one configuration amongst the available
    settings entries of each required module.

    *Configuration modules*

    The configuration is handled through a set of configuration modules.
    Each is dedicated to a topic (for instance handling the paths of a
    specific external software, or managing process parameters completion,
    etc.). A module adds a settings table in the database, with its own
    variables, and is able to manage runtime configuration of programs, if
    needed, through its ``activate_configurations`` function. Capsul comes
    with a set of predefined modules:
    :class:`~capsul.engine.module.attributes`,
    :class:`~capsul.engine.module.axon`,
    :class:`~capsul.engine.module.fom`,
    :class:`~capsul.engine.module.fsl`,
    :class:`~capsul.engine.module.matlab`,
    :class:`~capsul.engine.module.spm`

    **Methods**
    '''

    def __init__(self, database_location, database, require):
        '''
        CapsulEngine.__init__(self, database_location, database, require)

        The CapsulEngine constructor should not be called directly.
        Use the :func:`capsul_engine` factory function instead.
        '''
        super(CapsulEngine, self).__init__()

        self._settings = None

        self._database_location = database_location
        self._database = database

        self._loaded_modules = set()
        self.load_modules(require)

        from capsul.study_config.study_config import StudyConfig
        self.study_config = StudyConfig(engine=self)

        self._metadata_engine = from_json(
            database.json_value('metadata_engine'))

        self._connected_resource = ''

    @property
    def settings(self):
        if self._settings is None:
            self._settings = Settings(self.database.db)
        return self._settings

    @property
    def database(self):
        return self._database

    @property
    def database_location(self):
        return self._database_location

    @property
    def metadata_engine(self):
        return self._metadata_engine

    @metadata_engine.setter
    def metadata_engine(self, metadata_engine):
        self._metadata_engine = metadata_engine
        self.database.set_json_value('metadata_engine',
                                     to_json(self._metadata_engine))
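
    # Editor's note: a minimal usage sketch, not part of the original
    # source. It assumes a local FSL installation path; ``new_config`` is
    # the Settings session method for adding a module configuration (see
    # capsul.engine.settings), and the request string 'any' accepts any
    # stored configuration:
    #
    #     ce = capsul_engine()
    #     with ce.settings as session:
    #         # store an FSL configuration for the "global" environment
    #         session.new_config('fsl', 'global',
    #                            {'directory': '/usr/local/fsl'})
    #     config = ce.settings.select_configurations('global', {'fsl': 'any'})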

    def load_modules(self, require):
        '''
        Call self.load_module for each required module. The list of modules
        to load is given by the ``require`` parameter (if it is None,
        ``capsul.engine.module.default_modules`` is used).
        '''
        if require is None:
            require = default_modules

        for module in require:
            self.load_module(module)

    def load_module(self, module_name):
        '''
        Load a module if it has not already been loaded (in this case,
        nothing is done).

        A module is a fully qualified name of a Python module (as accepted
        by the Python import statement). Such a module must define the two
        following functions (and may define two others, see below)::

            def load_module(capsul_engine, module_name):
            def set_environ(config, environ):

        load_module of each module is called once before reading and
        applying the configuration. It can be used to add traits to the
        CapsulEngine in order to define the configuration options that are
        used by the module. Values of these traits are automatically stored
        in the database configuration when self.save() is used, and they are
        retrieved from the database before initializing modules.

        set_environ is called in the context of the processing (i.e. on the,
        possibly remote, machine that runs the pipelines). It receives the
        configuration as a JSON compatible dictionary (for instance a
        CapsulEngine attribute ``capsul_engine.spm.directory`` would be
        ``config['spm']['directory']``). The function must modify the
        environ dictionary to set the environment variables that must be
        defined for pipeline configuration. These variables are typically
        used by modules in the capsul.in_context module to manage running
        external software with appropriate configuration.
        '''
        module_name = self.settings.module_name(module_name)
        if module_name not in self._loaded_modules:
            self._loaded_modules.add(module_name)
            python_module = importlib.import_module(module_name)
            init_settings = getattr(python_module, 'init_settings', None)
            if init_settings is not None:
                init_settings(self)
            return True
        return False
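
    # Editor's note: illustrative sketch, not in the original source. Short
    # module names are expanded through Settings.module_name(), so the
    # following two calls are assumed to be equivalent:
    #
    #     ce.load_module('fsl')
    #     ce.load_module('capsul.engine.module.fsl')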

    #
    # Methods imported from self.database
    #

    # TODO: take computing resource into account in the following methods

    def set_named_directory(self, name, path):
        return self.database.set_named_directory(name, path)

    def named_directory(self, name):
        return self.database.named_directory(name)

    def named_directories(self):
        return self.database.named_directories()

    def set_json_value(self, name, json_value):
        return self.database.set_json_value(name, json_value)

    def json_value(self, name):
        return self.database.json_value(name)

    def set_path_metadata(self, path, metadata, named_directory=None):
        return self.database.set_path_metadata(path, metadata,
                                               named_directory)

    def path_metadata(self, path, named_directory=None):
        return self.database.path_metadata(path, named_directory)

    def import_configs(self, environment, config_dict, cont_on_error=False):
        '''
        Import config values from a dictionary as given by
        :meth:`Settings.select_configurations`.

        Compared to :meth:`Settings.import_configs` this method (at
        :class:`CapsulEngine` level) also loads the required modules.
        '''
        modules = config_dict.get('capsul_engine', {}).get('uses', {})
        for module in modules:
            self.load_module(module)
        self.settings.import_configs(environment, config_dict, cont_on_error)
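
    # Editor's note: hedged example, not in the original source. It sketches
    # copying the settings selected for one engine (ce1) into another one
    # (ce2) for the 'global' environment, using only the methods documented
    # above:
    #
    #     config = ce1.settings.select_configurations('global')
    #     ce2.import_configs('global', config)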

    #
    # Processes and pipelines related methods
    #

    def get_process_instance(self, process_or_id, **kwargs):
        '''
        The only official way to get a process instance is to use this
        method. For now, it simply calls
        self.study_config.get_process_instance, but it will change in the
        future.
        '''
        instance = self.study_config.get_process_instance(process_or_id,
                                                          **kwargs)
        return instance
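
    # Editor's note: illustrative sketch, not in the original source.
    # ``process_or_id`` may be a process class, an instance, or a dotted
    # module/class identifier; the identifier below is purely hypothetical:
    #
    #     process = ce.get_process_instance(
    #         'mypackage.processes.MyProcess')  # hypothetical process id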

    def get_iteration_pipeline(self, pipeline_name, node_name, process_or_id,
                               iterative_plugs=None, do_not_export=None,
                               make_optional=None, **kwargs):
        """
        Create a pipeline with an iteration node iterating the given process.

        Parameters
        ----------
        pipeline_name: str
            pipeline name
        node_name: str
            iteration node name in the pipeline
        process_or_id: process description
            as in :meth:`get_process_instance`
        iterative_plugs: list (optional)
            passed to :meth:`Pipeline.add_iterative_process`
        do_not_export: list (optional)
            passed to :meth:`Pipeline.add_iterative_process`
        make_optional: list (optional)
            passed to :meth:`Pipeline.add_iterative_process`

        Returns
        -------
        pipeline: :class:`Pipeline` instance
        """
        from capsul.pipeline.pipeline import Pipeline

        pipeline = Pipeline()
        pipeline.name = pipeline_name
        pipeline.set_study_config(get_ref(self.study_config))
        pipeline.add_iterative_process(node_name, process_or_id,
                                       iterative_plugs, do_not_export,
                                       make_optional=make_optional, **kwargs)
        pipeline.autoexport_nodes_parameters(include_optional=True)
        return pipeline
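
    # Editor's note: hedged example, not in the original source, building an
    # iteration over a process; the process identifier and plug names are
    # hypothetical:
    #
    #     iter_pipeline = ce.get_iteration_pipeline(
    #         'iteration_pipeline', 'iteration',
    #         'mypackage.processes.MyProcess',     # hypothetical process id
    #         iterative_plugs=['input_image'])     # hypothetical plug name
    #     iter_pipeline.input_image = ['/data/im1.nii', '/data/im2.nii']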

    def start(self, process, workflow=None, history=True, get_pipeline=False,
              **kwargs):
        '''
        Asynchronously start the execution of a process or pipeline in the
        connected computing environment. Returns an identifier of the
        process execution that can be used to get the status of the
        execution or wait for its termination.

        TODO: if history is True, an entry of the process execution is
        stored in the database. The content of this entry is to be defined
        but it will contain the process parameters (to restart the process)
        and will be updated on process termination (for instance to store
        execution time if possible).

        Parameters
        ----------
        process: Process or Pipeline instance
        workflow: Workflow instance (optional - if already defined before
            call)
        history: bool (optional)
            TODO: not implemented yet.
        get_pipeline: bool (optional)
            if True, start() will return a tuple (execution_id, pipeline).
            The pipeline is normally the input pipeline (process) if it is
            actually a pipeline. But if the input process is a "single
            process", it will be inserted into a small pipeline for
            execution. This pipeline will be the one actually run, and may
            be passed to :meth:`wait` to set output parameters.

        Returns
        -------
        execution_id: int
            execution identifier (actually a soma-workflow id)
        pipeline: Pipeline instance (optional)
            only returned if get_pipeline is True.
        '''
        return run.start(self, process, workflow, history, get_pipeline,
                         **kwargs)
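
    # Editor's note: hedged example, not in the original source, of an
    # asynchronous run combining start(), status(), wait() and
    # raise_for_status(); the process instance is assumed to be already
    # created and parameterized:
    #
    #     execution_id, pipeline = ce.start(process, get_pipeline=True)
    #     print(ce.status(execution_id))           # e.g. 'running'
    #     ce.wait(execution_id, pipeline=pipeline)
    #     status = ce.status(execution_id)
    #     ce.raise_for_status(status, execution_id)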

    def connect(self, computing_resource):
        '''
        Connect the capsul engine to a computing resource.
        '''
        self._connected_resource = computing_resource

    def connected_to(self):
        '''
        Return the name of the computing resource this capsul engine is
        connected to, or None if it is not connected.
        '''
        return self._connected_resource

    def disconnect(self):
        '''
        Disconnect from a computing resource.
        '''
        self._connected_resource = None

    def executions(self):
        '''
        List the execution identifiers of all processes that have been
        started but not disposed in the connected computing resource.
        Raises an exception if the computing resource is not connected.
        '''
        raise NotImplementedError()

    def dispose(self, execution_id, conditional=False):
        '''
        Update the database with the current state of a process execution
        and free the resources used in the computing resource (i.e. remove
        the workflow from SomaWorkflow).

        If ``conditional`` is set to True, then dispose is only done if the
        configuration does not specify to keep succeeded / failed workflows.
        '''
        run.dispose(self, execution_id, conditional=conditional)

    def interrupt(self, execution_id):
        '''
        Try to stop the execution of a process. Does not wait for the
        process to be terminated.
        '''
        return run.interrupt(self, execution_id)

    def wait(self, execution_id, timeout=-1, pipeline=None):
        '''
        Wait for the end of a process execution (either normal termination,
        interruption or error).
        '''
        return run.wait(self, execution_id, timeout=timeout,
                        pipeline=pipeline)

    def status(self, execution_id):
        '''
        Return a simple value with the status of an execution (queued,
        running, terminated, error, etc.)
        '''
        return run.status(self, execution_id)

    def detailed_information(self, execution_id):
        '''
        Return complete (and possibly big) information about a process
        execution.
        '''
        return run.detailed_information(self, execution_id)

    def call(self, process, history=True, **kwargs):
        return run.call(self, process, history=history, **kwargs)

    def check_call(self, process, history=True, **kwargs):
        return run.check_call(self, process, history=history, **kwargs)
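
    # Editor's note: hedged sketch, not in the original source. call() and
    # check_call() are assumed here to mirror the subprocess naming
    # convention: call() returns a status value, while check_call() raises
    # on failure:
    #
    #     status = ce.call(process)
    #     ce.check_call(process)  # assumed to raise WorkflowExecutionError
    #                             # on failure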

    def raise_for_status(self, status, execution_id=None):
        '''
        Raise an exception if a process execution failed.
        '''
        run.raise_for_status(self, status, execution_id)

_populsedb_url_re = re.compile(r'^\w+(\+\w+)?://(.*)')

def database_factory(database_location):
    '''
    Create a DatabaseEngine from its location string. This location can be
    either a sqlite file path (ending with '.sqlite', or ':memory:' for an
    in-memory database for testing), or a populse_db URL, or None.
    '''
    global _populsedb_url_re

    engine_directory = None

    if database_location is None:
        database_location = ':memory:'
    match = _populsedb_url_re.match(database_location)
    if match:
        path = match.group(2)
        _, path = osp.splitdrive(path)
        if path.startswith(os.path.sep):
            engine_directory = osp.abspath(osp.dirname(path))
        populse_db = database_location
    elif database_location.endswith('.sqlite'):
        populse_db = 'sqlite:///%s' % database_location
        engine_directory = osp.abspath(osp.dirname(database_location))
    elif database_location == ':memory:':
        populse_db = 'sqlite:///:memory:'
    else:
        raise ValueError('Invalid database location: %s' % database_location)

    engine = PopulseDBEngine(populse_db)
    if engine_directory:
        engine.set_named_directory('capsul_engine', engine_directory)
    return engine
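
# Editor's note: hedged example, not in the original source, of the accepted
# location forms; the file path is hypothetical:
#
#     engine_db = database_factory('/tmp/capsul_engine.sqlite')  # persistent
#     mem_db = database_factory(None)                            # in-memory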

def capsul_engine(database_location=None, require=None):
    '''
    User factory for creating capsul engines.

    If no database_location is given, it will default to an internal
    (in-memory) database with no persistent settings or history values.

    Configuration is read from a dictionary stored in two database entries.
    The first entry has the key 'global_config' (i.e.
    database.json_value('global_config')); it contains the configuration
    values that are shared by all processing engines. The second entry is
    'computing_config'. It contains a dictionary with one item per computing
    resource, where the key is the resource name and the value is the
    configuration values that are specific to this computing resource.

    Before initialization of the CapsulEngine, modules are loaded. The list
    of loaded modules is searched in the 'modules' value in the database
    (i.e. in database.json_value('modules')); if no list is defined in the
    database, capsul.engine.module.default_modules is used.
    '''
    # if database_location is None:
    #     database_location = osp.expanduser(
    #         '~/.config/capsul/capsul_engine.sqlite')
    database = database_factory(database_location)
    capsul_engine = CapsulEngine(database_location, database, require=require)
    return capsul_engine
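
# Editor's note: hedged example, not in the original source, of creating an
# engine with an explicit list of configuration modules; short module names
# are assumed to be accepted as in CapsulEngine.load_module():
#
#     ce = capsul_engine(require=['capsul.engine.module.fsl',
#                                 'capsul.engine.module.spm'])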

configurations = None
activated_modules = set()

def activate_configuration(selected_configurations):
    '''
    Activate a selected configuration (set of modules) for runtime.
    '''
    global configurations

    configurations = selected_configurations
    modules = configurations.get('capsul_engine', {}).get('uses', {}).keys()
    for m in modules:
        activate_module(m)

def activate_module(module_name):
    '''
    Activate a module configuration for runtime. This function is called by
    activate_configuration() and assumes that the global variable
    ``capsul.engine.configurations`` is properly set up.
    '''
    global activated_modules

    if module_name not in activated_modules:
        activated_modules.add(module_name)
        module = importlib.import_module(module_name)
        check_configurations = getattr(module, 'check_configurations', None)
        complete_configurations = getattr(module, 'complete_configurations',
                                          None)
        if check_configurations:
            error = check_configurations()
            if error:
                if complete_configurations:
                    complete_configurations()
                    error = check_configurations()
                if error:
                    raise EnvironmentError(error)
        activate_configurations = getattr(module, 'activate_configurations',
                                          None)
        if activate_configurations:
            activate_configurations()