Source code for capsul.process.runprocess

# -*- coding: utf-8 -*-
#
#  This software and supporting documentation were developed by
#      CEA/DSV/SHFJ and IFR 49
#      4 place du General Leclerc
#      91401 Orsay cedex
#      France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can  use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.

"""
capsul.process.runprocess is not a real python module, but rather an
executable script with commandline arguments and options parsing. It is
provided as a module just to be easily called via the python command in a
portable way::

    python -m capsul.process.runprocess <process name> <process arguments>
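
For instance, with the ``threshold`` process used as an example in
:func:`main`'s usage text (file paths are illustrative)::

    python -m capsul.process.runprocess threshold ~/data/irm.ima /tmp/th.nii threshold1=80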

Classes
-------
:class:`ProcessParamError`
++++++++++++++++++++++++++

Functions
---------
:func:`set_process_param_from_str`
++++++++++++++++++++++++++++++++++
:func:`get_process_with_params`
+++++++++++++++++++++++++++++++
:func:`run_process_with_distribution`
+++++++++++++++++++++++++++++++++++++
:func:`convert_commandline_parameter`
+++++++++++++++++++++++++++++++++++++
:func:`main`
++++++++++++


"""

from __future__ import print_function

from capsul.api import StudyConfig
from capsul.api import capsul_engine
from capsul.api import Pipeline
from capsul.attributes.completion_engine import ProcessCompletionEngine
import os
import logging
import sys
import re
from optparse import OptionParser, OptionGroup
from traits.api import Undefined, List
try:
    import yaml
except ImportError:
    yaml = None
    import json
import six

# Define the logger
logger = logging.getLogger(__name__)


class ProcessParamError(Exception):
    ''' Exception used in the ``runprocess`` module
    '''
    pass


def set_process_param_from_str(process, k, arg):
    """Set a process parameter from a string representation."""
    if not process.trait(k):
        raise ProcessParamError("Unknown parameter {0} for process {1}"
                                .format(k, process.name))
    try:
        evaluate = process.trait(k).trait_type.evaluate
    except AttributeError:
        evaluate = None
    # debug trace of the parameter assignment
    logger.debug('set_process_param_from_str: %s %s %s'
                 % (process, k, repr(arg)))
    if evaluate:
        arg = evaluate(arg)
    setattr(process, k, arg)
    process.trait(k).forbid_completion = True
    if isinstance(process, Pipeline):
        process.propagate_metadata(process.pipeline_node, k,
                                   {'forbid_completion': True})
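

# A minimal usage sketch for set_process_param_from_str, not exercised at
# import time. The process name "threshold" and parameter "threshold1"
# follow the example in main()'s usage text and are otherwise hypothetical:
# the string value is evaluated by the trait type (e.g. to a float), then
# completion is disabled for this parameter so it is not overwritten.
def _example_set_process_param_from_str(study_config):
    # assumed: a process exposing a numeric trait named "threshold1"
    process = study_config.get_process_instance('threshold')
    set_process_param_from_str(process, 'threshold1', '80')
    return process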


def get_process_with_params(process_name, study_config, iterated_params=[],
                            attributes={}, *args, **kwargs):
    ''' Instantiate a process, or an iteration over processes, and fill in
    its parameters.

    Parameters
    ----------
    process_name: string
        name (module and class) of the process to instantiate
    study_config: StudyConfig instance
    iterated_params: list (optional)
        parameter names which should be iterated over. If this list is not
        empty, an iteration process is built. All parameter values
        corresponding to the selected names should be lists with the same
        size.
    attributes: dict (optional)
        dictionary of attributes for the completion system.
    *args:
        sequential parameters for the process. In an iteration, "normal"
        parameters are set with the same value for all iterations, and
        iterated parameters dispatch their values to each iteration.
    **kwargs:
        named parameters for the process. Same as above for iterations.

    Returns
    -------
    process: Process instance
    '''
    process = study_config.get_process_instance(process_name)
    signature = process.user_traits()
    params = list(signature.keys())

    # check for iterations
    if iterated_params:
        pipeline = study_config.get_process_instance(Pipeline)
        pipeline.add_iterative_process('iteration', process, iterated_params)
        pipeline.autoexport_nodes_parameters(include_optional=True)
        process = pipeline

        # transform iterated attributes into lists if needed
        for param, value in attributes.items():
            if not isinstance(value, (list, tuple)):
                attributes[param] = [value]

    for i, arg in enumerate(args):
        set_process_param_from_str(process, params[i], arg)
    for k, arg in six.iteritems(kwargs):
        set_process_param_from_str(process, k, arg)

    completion_engine = ProcessCompletionEngine.get_completion_engine(process)
    completion_engine.get_attribute_values().import_from_dict(attributes)
    completion_engine.complete_parameters()

    return process
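

# A minimal usage sketch for get_process_with_params: build an iteration of
# a process over two input images. The process name follows the example in
# main()'s usage text; the "image" parameter, attribute names and values
# are hypothetical. Iterated parameters take lists of equal size, as the
# docstring above states.
def _example_get_process_with_params(study_config):
    process = get_process_with_params(
        'threshold',                                # hypothetical process
        study_config,
        iterated_params=['image'],                  # iterate over "image"
        attributes={'subject': ['s1', 's2']},       # completion attributes
        image=['/tmp/im1.nii', '/tmp/im2.nii'],     # one value per iteration
        threshold1=80)                              # shared by all iterations
    return process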


def run_process_with_distribution(
        study_config, process, use_soma_workflow=False, resource_id=None,
        password=None, config=None, rsa_key_pass=None, queue=None,
        input_file_processing=None, output_file_processing=None,
        keep_workflow=False, keep_failed_workflow=False,
        write_workflow_only=None):
    ''' Run the given process, either sequentially or distributed through
    Soma-Workflow.

    Parameters
    ----------
    study_config: StudyConfig instance
    process: Process instance
        the process to execute (or pipeline, or iteration...)
    use_soma_workflow: bool or None (default=False)
        if False, run sequentially; otherwise use Soma-Workflow. Its
        configuration has to be setup and valid for non-local execution,
        and additional file transfer options may be used. If None, the
        current StudyConfig setting is left unchanged.
    resource_id: string (default=None)
        soma-workflow resource ID, defaults to localhost
    password: string
        password to access the remote computing resource. Do not specify
        it if using a ssh key.
    config: dict (optional)
        Soma-Workflow config: not used for now...
    rsa_key_pass: string
        RSA key password, for ssh key access
    queue: string
        queue to use on the computing resource. If not specified, use the
        default queue.
    input_file_processing: brainvisa.workflow.ProcessToSomaWorkflow
        processing code
        Input files processing: local_path (NO_FILE_PROCESSING),
        transfer (FILE_TRANSFER), translate (SHARED_RESOURCE_PATH),
        or translate_shared (BV_DB_SHARED_PATH).
    output_file_processing: same as for input_file_processing
        Output files processing: local_path (NO_FILE_PROCESSING),
        transfer (FILE_TRANSFER), or translate (SHARED_RESOURCE_PATH).
        The default is local_path.
    keep_workflow: bool
        keep the workflow in the computing resource database after
        execution. By default it is removed.
    keep_failed_workflow: bool
        keep the workflow in the computing resource database after
        execution, if it has failed. By default it is removed.
    write_workflow_only: str
        if specified, this is an output filename where the workflow file
        will be written. The workflow will not actually be run, because in
        this situation the user probably wants to use the workflow on
        their own.
    '''
    if write_workflow_only:
        use_soma_workflow = True

    if use_soma_workflow is not None:
        study_config.use_soma_workflow = use_soma_workflow

    if study_config.use_soma_workflow:
        if write_workflow_only:
            # only serialize the soma workflow, do not run it
            from capsul.pipeline.pipeline_workflow \
                import workflow_from_pipeline
            import soma_workflow.client as swclient

            workflow = workflow_from_pipeline(process)
            swclient.Helper.serialize(write_workflow_only, workflow)
            return

        swm = study_config.modules['SomaWorkflowConfig']
        resource_id = swm.get_resource_id(resource_id, set_it=True)
        if password is not None or rsa_key_pass is not None:
            swm.set_computing_resource_password(resource_id, password,
                                                rsa_key_pass)
        if queue is not None:
            if not hasattr(
                    study_config.somaworkflow_computing_resources_config,
                    resource_id):
                setattr(
                    study_config.somaworkflow_computing_resources_config,
                    resource_id, {})
            getattr(study_config.somaworkflow_computing_resources_config,
                    resource_id).queue = queue

    res = study_config.run(process)
    return res
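

# A minimal usage sketch for run_process_with_distribution: first write the
# Soma-Workflow workflow file for a process without running it, then run the
# same process sequentially. The output path is illustrative.
def _example_run_process_with_distribution(study_config, process):
    # only serialize the workflow (implies use_soma_workflow=True)
    run_process_with_distribution(
        study_config, process,
        write_workflow_only='/tmp/workflow.workflow')
    # run sequentially, without Soma-Workflow
    return run_process_with_distribution(
        study_config, process, use_soma_workflow=False)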


def convert_commandline_parameter(i):
    if len(i) > 0 and (i[0] in '[({' or i in ('None', 'True', 'False')):
        try:
            res = eval(i)
        except Exception:
            res = i
    else:
        res = i
    return res
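

# convert_commandline_parameter in action: strings starting with a list,
# tuple or dict delimiter, or equal to None/True/False, are eval'ed as
# Python literals; anything else is left as a string.
def _example_convert_commandline_parameter():
    assert convert_commandline_parameter('[1, 2]') == [1, 2]
    assert convert_commandline_parameter('None') is None
    assert convert_commandline_parameter('/tmp/th.nii') == '/tmp/th.nii'
    # invalid python inside brackets falls back to the raw string
    assert convert_commandline_parameter('[not python') == '[not python'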


# main
def main():
    ''' Run the :mod:`capsul.process.runprocess` module as a commandline
    '''
    usage = '''Usage: python -m capsul [options] processname [arg1] [arg2] ...
    [argx=valuex] [argy=valuey] ...

    Example::

        python -m capsul threshold ~/data/irm.ima /tmp/th.nii threshold1=80

    Named arguments (in the shape argx=valuex) may address sub-processes of a
    pipeline, using the dot separator::

        PrepareSubject.t1mri=/home/myself/mymri.nii

    For a more precise description, please look at the web documentation:
    https://brainvisa.info/capsul/user_guide_tree/index.html

    Configuration:

    The ``--config`` option allows to pass a Capsul configuration file
    (a JSON dict). If you have a configuration in Brainvisa/Axon and want to
    convert it, use the following::

        axon-runprocess capsul://capsul.engine.write_engine_config engine.json

    Then the file ``engine.json`` will be OK.
    '''

    # Set up logging on stderr. This must be called before any logging takes
    # place, to avoid "No handlers could be found for logger" errors.
    logging.basicConfig()

    parser = OptionParser(description='Run a single CAPSUL process',
                          usage=usage)
    group1 = OptionGroup(
        parser, 'Config',
        description='Processing configuration, database options')
    group1.add_option('--studyconfig', dest='studyconfig',
                      help='load StudyConfig configuration from the given '
                      'file (JSON)')
    group1.add_option('--config', dest='config',
                      help='load Capsul engine configuration from the given '
                      'file (JSON) (CapsulEngine shape, not StudyConfig -- '
                      'use --studyconfig otherwise)')
    group1.add_option('-i', '--input', dest='input_directory',
                      help='input data directory (if not specified in the '
                      'studyconfig file). If specified neither on the '
                      'commandline nor in the study config file, it is '
                      'taken as the same as output.')
    group1.add_option('-o', '--output', dest='output_directory',
                      help='output data directory (if not specified in the '
                      'studyconfig file). If specified neither on the '
                      'commandline nor in the study config file, it is '
                      'taken as the same as input.')
    group1.add_option('--params', dest='paramsfile', default=None,
                      help='specify a file containing commandline '
                      'parameters. The file will contain arguments for this '
                      'commandline (argv): it is an alternative to '
                      'providing them here. It can be useful to reuse '
                      'parameters, or when the parameters are too long (in '
                      'a large iteration, typically). The file syntax is '
                      'one line per parameter, with no further parsing. It '
                      'will be processed after all the current commandline '
                      'arguments, not at the point where the argument '
                      'appears. But if a parameter has already been set '
                      '(via commandline), it will not be replaced: first '
                      'set arguments have priority. If the params file '
                      'itself contains a --params parameter, then another '
                      'file will be read afterwards, and so on.')
    parser.add_option_group(group1)

    group2 = OptionGroup(parser, 'Processing',
                         description='Processing options, distributed '
                         'execution')
    group2.add_option('--swf', '--soma_workflow', dest='soma_workflow',
                      default=False, action='store_true',
                      help='use soma_workflow. Soma-Workflow configuration '
                      'has to be setup and valid for non-local execution, '
                      'and additional file transfer options may be used. '
                      'The default is *not* to use SWF, and to run in '
                      'mono-processor, sequential execution.')
    group2.add_option('-r', '--resource_id', dest='resource_id', default=None,
                      help='soma-workflow resource ID, defaults to localhost')
    group2.add_option('-w', '--write-workflow-only', dest='write_workflow',
                      default=None,
                      help='if specified, this is an output filename where '
                      'the workflow file will be written. The workflow will '
                      'not actually be run, because in this situation the '
                      'user probably wants to use the workflow on their '
                      'own.')
    group2.add_option('-p', '--password', dest='password', default=None,
                      help='password to access the remote computing '
                      'resource. Do not specify it if using a ssh key')
    group2.add_option('--rsa-pass', dest='rsa_key_pass', default=None,
                      help='RSA key password, for ssh key access')
    group2.add_option('--queue', dest='queue', default=None,
                      help='Queue to use on the computing resource. If not '
                      'specified, use the default queue.')
    # group2.add_option('--input-processing', dest='input_file_processing',
    #                   default=None,
    #                   help='Input files processing: local_path, transfer, '
    #                   'translate, or translate_shared. The default is '
    #                   'local_path if the computing resource is the '
    #                   'localhost, or translate_shared otherwise.')
    # group2.add_option('--output-processing', dest='output_file_processing',
    #                   default=None,
    #                   help='Output files processing: local_path, transfer, '
    #                   'or translate. The default is local_path.')
    group2.add_option('--keep-succeeded-workflow',
                      dest='keep_succeeded_workflow',
                      action='store_true', default=False,
                      help='keep the workflow in the computing resource '
                      'database after execution. By default it is removed.')
    group2.add_option('--delete-failed-workflow',
                      dest='delete_failed_workflow',
                      action='store_true', default=False,
                      help='delete the workflow in the computing resource '
                      'database after execution, if it has failed. By '
                      'default it is kept.')
    parser.add_option_group(group2)

    group3 = OptionGroup(parser, 'Iteration', description='Iteration')
    group3.add_option('-I', '--iterate', dest='iterate_on', action='append',
                      help='Iterate the given process, iterating over the '
                      'given parameter(s). Multiple parameters may be '
                      'iterated jointly using several -I options. In the '
                      'process parameters, values are replaced by lists, '
                      'all iterated lists should have the same size.\n'
                      'Ex:\n'
                      'python -m capsul -I par_a -I par_c a_process '
                      'par_a="[1, 2]" par_b="something" '
                      'par_c="[\\"one\\", \\"two\\"]"')
    parser.add_option_group(group3)

    group4 = OptionGroup(parser, 'Attributes completion')
    group4.add_option('-a', '--attribute', dest='attributes',
                      action='append', default=[],
                      help='set completion (including FOM) attribute. '
                      'Syntax: attribute=value, where value has the same '
                      'syntax as process parameters (python syntax for '
                      'lists, for instance), with proper quotes if needed '
                      'for shell escaping.\n'
                      'Ex: -a acquisition="default" '
                      '-a subject=\'["s1", "s2"]\'')
    parser.add_option_group(group4)

    group5 = OptionGroup(parser, 'Help',
                         description='Help and documentation options')
    group5.add_option('--process-help', dest='process_help',
                      action='store_true', default=False,
                      help='display specified process help')
    parser.add_option_group(group5)

    parser.disable_interspersed_args()
    (options, args) = parser.parse_args()

    while options.paramsfile:
        pfile = options.paramsfile
        options.paramsfile = None
        with open(pfile) as f:
            new_argv = [l.strip() for l in f.readlines()]
        new_options, new_args = parser.parse_args(new_argv)
        for k, v in new_options.__dict__.items():
            if not getattr(options, k, None):
                setattr(options, k, v)
        args += new_args

    if options.config:
        engine = capsul_engine()
        with open(options.config) as f:
            if yaml:
                conf = yaml.load(f, Loader=yaml.SafeLoader)
            else:
                conf = json.load(f)
        for env, c in conf.items():
            engine.import_configs(env, c)
        study_config = engine.study_config
    elif options.studyconfig:
        study_config = StudyConfig(
            modules=StudyConfig.default_modules + ['FomConfig',
                                                   'BrainVISAConfig'])
        with open(options.studyconfig) as f:
            if yaml:
                scdict = yaml.load(f, Loader=yaml.SafeLoader)
            else:
                scdict = json.load(f)
        study_config.set_study_configuration(scdict)
    else:
        study_config = StudyConfig(
            modules=StudyConfig.default_modules + ['FomConfig'])
        study_config.read_configuration()
        study_config.use_fom = True

    if options.input_directory:
        study_config.input_directory = options.input_directory
    if options.output_directory:
        study_config.output_directory = options.output_directory
    if study_config.output_directory in (None, Undefined, '') \
            and study_config.input_directory not in (None, Undefined, ''):
        study_config.output_directory = study_config.input_directory
    if study_config.input_directory in (None, Undefined, '') \
            and study_config.output_directory not in (None, Undefined, ''):
        study_config.input_directory = study_config.output_directory

    study_config.somaworkflow_keep_succeeded_workflows \
        = options.keep_succeeded_workflow
    study_config.somaworkflow_keep_failed_workflows \
        = not options.delete_failed_workflow

    kwre = re.compile(r'([a-zA-Z_](\.?[a-zA-Z0-9_])*)\s*=\s*(.*)$')

    attributes = {}
    for att in options.attributes:
        m = kwre.match(att)
        if m is None:
            raise SyntaxError('syntax error in attribute definition: %s'
                              % att)
        attributes[m.group(1)] = convert_commandline_parameter(m.group(3))

    args = tuple((convert_commandline_parameter(i) for i in args))
    kwargs = {}
    todel = []
    for arg in args:
        if isinstance(arg, six.string_types):
            m = kwre.match(arg)
            if m is not None:
                kwargs[m.group(1)] = convert_commandline_parameter(m.group(3))
                todel.append(arg)
    args = [arg for arg in args if arg not in todel]

    if not args:
        parser.print_usage()
        sys.exit(2)

    # get the main process
    process_name = args[0]
    args = args[1:]

    iterated = options.iterate_on
    try:
        process = get_process_with_params(process_name, study_config,
                                          iterated, attributes,
                                          *args, **kwargs)
    except ProcessParamError as e:
        print("error: {0}".format(e), file=sys.stderr)
        sys.exit(1)

    if options.process_help:
        process.help()

        print()

        completion_engine \
            = ProcessCompletionEngine.get_completion_engine(process)
        attribs = completion_engine.get_attribute_values()
        aval = attribs.export_to_dict()
        print('Completion attributes:')
        print('----------------------')
        print()
        print('(note: may differ depending on study config file contents, '
              'completion rules (FOM)...)')
        print()

        skipped = set(['generated_by_parameter', 'generated_by_process'])
        for name, value in six.iteritems(aval):
            if name in skipped:
                continue
            ttype = attribs.trait(name).trait_type.__class__.__name__
            if isinstance(attribs.trait(name).trait_type, List):
                ttype += '(%s)' \
                    % attribs.trait(name).inner_traits[
                        0].trait_type.__class__.__name__
            print('%s:' % name, ttype)
            if value not in (None, Undefined):
                print('   ', value)

        print()
        del aval, attribs, completion_engine, process
        sys.exit(0)

    resource_id = options.resource_id
    password = options.password
    rsa_key_pass = options.rsa_key_pass
    queue = options.queue
    # input/output file processing is not used for now: both branches of the
    # former if options.soma_workflow test set the same value.
    file_processing = [None, None]

    study_config.use_soma_workflow = options.soma_workflow

    res = run_process_with_distribution(
        study_config, process, options.soma_workflow,
        resource_id=resource_id, password=password,
        rsa_key_pass=rsa_key_pass, queue=queue,
        input_file_processing=file_processing[0],
        output_file_processing=file_processing[1],
        write_workflow_only=options.write_workflow)

    # if there was no exception, we assume the process has succeeded.
    # sys.exit(0)
    # no error, do a dirty exit, but avoid cleanup crashes after the process
    # has succeeded...
    os._exit(0)
    # otherwise it has raised an exception, exit "normally"


if __name__ == '__main__':
    main()