Source code for capsul.study_config.run

# -*- coding: utf-8 -*-
'''
Process and pipeline execution management

Functions
=========
:func:`run_process`
-------------------
'''

# System import
from __future__ import absolute_import
from __future__ import print_function
import errno
import os
import logging
import six

# CAPSUL import
from capsul.study_config.memory import Memory
from capsul.process.process import Process

# TRAIT import
from traits.api import Undefined, File, Directory

# Define the logger
logger = logging.getLogger(__name__)


def run_process(output_dir, process_instance, generate_logging=False,
                verbose=0, configuration_dict=None, cachedir=None, **kwargs):
    """ Execute a capsul process in a specific directory.

    Parameters
    ----------
    output_dir: str (mandatory)
        the folder where the process will write results.
    process_instance: Process (mandatory)
        the capsul process we want to execute.
    cachedir: str (optional, default None)
        save in the cache the current process execution.
        If None, no caching is done.
    generate_logging: bool (optional, default False)
        if True, save the log stored in the process after its execution.
    verbose: int
        if different from zero, print console messages.
    configuration_dict: dict (optional)
        configuration dictionary

    Returns
    -------
    returncode: ProcessResult
        contains all execution information.
    output_log_file: str
        the path to the process execution log file.
    """
    # Message
    logger.info("Study Config: executing process '{0}'...".format(
        process_instance.id))

    study_config = process_instance.get_study_config()

    if configuration_dict is None:
        configuration_dict \
            = process_instance.check_requirements('global')

    # Create directories for outputs
    if study_config.create_output_directories:
        for name, trait in process_instance.user_traits().items():
            if trait.output and isinstance(trait.handler,
                                           (File, Directory)):
                value = getattr(process_instance, name)
                if value is not Undefined and value:
                    base = os.path.dirname(value)
                    if base and not os.path.exists(base):
                        try:
                            os.makedirs(base)
                        except OSError as err:
                            if err.errno != errno.EEXIST:
                                raise
                            # We have a race condition?
                            pass

    if configuration_dict is None:
        configuration_dict = {}
    # Clear activations for now.
    from capsul import engine
    engine.activated_modules = set()
    #print('activate config:', configuration_dict)
    engine.activate_configuration(configuration_dict)

    # Run
    if study_config.get_trait_value("use_smart_caching") in [None, False]:
        cachedir = None
    elif cachedir is None:
        cachedir = output_dir

    # Update the output directory folder if necessary
    if output_dir not in (None, Undefined) and output_dir:
        if study_config.process_output_directory:
            output_dir = os.path.join(
                output_dir,
                '%s-%s' % (study_config.process_counter,
                           process_instance.name))
        # Guarantee that the output directory exists
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        if study_config.process_output_directory:
            if 'output_directory' in process_instance.user_traits():
                if (process_instance.output_directory is Undefined
                        or not(process_instance.output_directory)):
                    process_instance.output_directory = output_dir

    # Set the current directory if necessary
    if hasattr(process_instance, "_nipype_interface"):
        if "spm" in process_instance._nipype_interface_name:
            process_instance._nipype_interface.mlab.inputs.prescript += [
                "cd('{0}');".format(output_dir)]

    # Setup the process log file
    output_log_file = None
    if generate_logging and output_dir not in (None, Undefined):
        output_log_file = os.path.join(
            os.path.basename(output_dir),
            os.path.dirname(output_dir) + ".json")
        process_instance.log_file = output_log_file

    # Check extra parameter names
    for arg_name in kwargs:
        # If the extra parameter name does not match a user trait
        # parameter name, raise an AttributeError
        if arg_name not in process_instance.user_traits():
            raise AttributeError(
                "execution of process {0} got an unexpected keyword "
                "argument '{1}'".format(process_instance, arg_name))

    # Information message
    if verbose:
        input_parameters = {}
        for name, trait in six.iteritems(process_instance.user_traits()):
            value = process_instance.get_parameter(name)
            # Skip undefined trait attributes and outputs
            if not trait.output and value is not Undefined:
                # Store the input parameter
                input_parameters[name] = value
        input_parameters = ["{0}={1}".format(n, v)
                            for n, v in six.iteritems(input_parameters)]
        call_with_inputs = "{0}({1})".format(
            process_instance.id, ", ".join(input_parameters))
        print("{0}\n[Process] Calling {1}...\n{2}".format(
            80 * "_", process_instance.id, call_with_inputs))

    if cachedir:
        # Create a memory object
        mem = Memory(cachedir)
        proxy_instance = mem.cache(process_instance, verbose=verbose)

        # Execute the proxy process
        returncode = proxy_instance(**kwargs)
    else:
        for k, v in six.iteritems(kwargs):
            setattr(process_instance, k, v)
        missing = process_instance.get_missing_mandatory_parameters()
        if len(missing) != 0:
            raise ValueError(
                'In process %s: missing mandatory parameters: %s'
                % (process_instance.name, ', '.join(missing)))
        process_instance._before_run_process()
        returncode = process_instance._run_process()
        returncode = process_instance._after_run_process(returncode)

    # Save the process log
    if generate_logging:
        process_instance.save_log(returncode)

    # Increment the executed process counter
    study_config.process_counter += 1

    return returncode, output_log_file
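
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module): a minimal example of
# how run_process() might be called, assuming a user-defined Process subclass.
# ``my_package.MyProcess`` and the ``input_image`` trait are hypothetical
# placeholders; only run_process() itself is defined above.
#
#     from capsul.api import StudyConfig
#     from capsul.study_config.run import run_process
#
#     study_config = StudyConfig()
#     process = study_config.get_process_instance('my_package.MyProcess')
#     returncode, log_file = run_process(
#         '/tmp/outputs',             # output_dir: where results are written
#         process,                    # the Process instance to execute
#         generate_logging=True,      # save the execution log after running
#         input_image='/tmp/in.nii')  # extra kwargs are set on process traits
# ---------------------------------------------------------------------------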