Source code for capsul.study_config.memory

# -*- coding: utf-8 -*-
'''
Memory caching for processes. This module is probably mostly obsolete; it is
not much used nowadays.

Classes
=======
:class:`UnMemorizedProcess`
---------------------------
:class:`MemorizedProcess`
-------------------------
:class:`CapsulResultEncoder`
----------------------------
:class:`Memory`
---------------

Functions
=========
:func:`get_process_signature`
-----------------------------
:func:`has_attribute`
---------------------
:func:`file_fingerprint`
------------------------
'''
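
# A minimal usage sketch (the cache path, process and parameter names below
# are hypothetical; see the Memory.cache docstring for a complete example):
#
#     >>> mem = Memory("/tmp/capsul_cache")
#     >>> cached = mem.cache(process)       # process: a capsul Process
#     >>> result = cached(param=value)      # computed once, then read back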

# System import
from __future__ import with_statement
from __future__ import absolute_import
from __future__ import print_function
import os
import hashlib
import time
import shutil
import json
import logging
import six
import sys

# CAPSUL import
from capsul.process.process import Process, ProcessResult

# TRAITS import
from traits.api import Undefined


# Define the logger
logger = logging.getLogger(__name__)


###########################################################################
# Proxy process objects
###########################################################################

class UnMemorizedProcess(object):
    """ This class replaces MemorizedProcess when there is no cache.
    It provides an identical API but does not write anything on disk.
    """
    def __init__(self, process, verbose=1):
        """ Initialize the UnMemorizedProcess class.

        Parameters
        ----------
        process: capsul process
            the process instance to wrap.
        verbose: int
            if different from zero, print console messages.
        """
        # Store the wrapped process and its class (the class is needed by
        # __setattr__ below)
        self.process_class = process.__class__
        self.process = process
        self.verbose = verbose

    def __call__(self, **kwargs):
        """ Call the process.

        .. note:: matlab process input image headers are saved since
            matlab tools may modify image headers.

        Parameters
        ----------
        kwargs: dict (optional)
            should correspond to the declared process parameters.
        """
        # Set the process inputs early to get some argument checking
        for name, value in six.iteritems(kwargs):
            self.process.set_parameter(name, value)
        input_parameters = self._get_process_arguments()

        # Information message
        if self.verbose != 0:
            print("{0}\n[Process] Calling {1}...\n{2}".format(
                80 * "_", self.process.id,
                get_process_signature(self.process, input_parameters)))

        # Start a timer
        start_time = time.time()

        # Execute the process
        result = self.process()
        duration = time.time() - start_time

        # Information message
        if self.verbose != 0:
            msg = "{0:.1f}s, {1:.1f}min".format(duration, duration / 60.)
            print(max(0, (80 - len(msg))) * '_' + msg)

        return result

    def _get_process_arguments(self):
        """ Get the process arguments.

        The user process traits are accessed through the user_traits()
        method that returns a sorted dictionary.

        Returns
        -------
        input_parameters: dict
            the process input parameters.
        """
        # Store for input parameters
        input_parameters = {}

        # Go through all the user traits
        for name, trait in six.iteritems(self.process.user_traits()):
            # Get the trait value
            value = self.process.get_parameter(name)

            # Split input and output traits
            is_input = True
            if "output" in trait.__dict__ and trait.output:
                is_input = False

            # Skip undefined trait attributes and outputs
            if is_input and value is not Undefined:
                # Store the input parameter
                input_parameters[name] = value

        return input_parameters

    def __getattr__(self, name):
        """ Define behavior for when a user attempts to access an attribute
        of an UnMemorizedProcess instance.

        First check the UnMemorizedProcess object and then the Process
        object.

        Parameters
        ----------
        name: string
            the name of the parameter we want to access.
        """
        if name in self.__dict__:
            super(UnMemorizedProcess, self).__getattr__(name)
        elif name in self.process.user_traits():
            return self.process.get_parameter(name)
        elif name in dir(self.process):
            return getattr(self.process, name)
        else:
            raise AttributeError(
                "'UnMemorizedProcess' and 'Process' objects have no "
                "attribute '{0}'".format(name))

    def __setattr__(self, name, value):
        """ Define behavior for when a user attempts to set an attribute
        of an UnMemorizedProcess instance.

        First check the UnMemorizedProcess object and then the Process
        object.

        Parameters
        ----------
        name: string
            the name of the parameter we want to set.
        value: object
            the parameter value.
        """
        if ("process" in self.__dict__ and
                name in self.__dict__["process"].__dict__):
            super(self.__dict__["process_class"],
                  self.__dict__["process"]).__setattr__(name, value)
        else:
            super(UnMemorizedProcess, self).__setattr__(name, value)

    def __repr__(self):
        """ UnMemorizedProcess class representation.
        """
        return "{0}({1})".format(self.__class__.__name__, self.process.id)
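
# A sketch of the pass-through behavior (the process and parameter names are
# hypothetical):
#
#     >>> proxy = UnMemorizedProcess(process, verbose=0)
#     >>> result = proxy(in_file="a.nii")  # executed on every call, no cache
#     >>> proxy.in_file                    # attribute access is forwarded
#     'a.nii'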
class MemorizedProcess(object):
    """ Callable object decorating a capsul process for caching its return
    values each time it is called.

    All values are cached on the filesystem, in a deep directory structure.
    Methods are provided to inspect the cache or clean it.
    """
    def __init__(self, process, cachedir, timestamp=None, verbose=1):
        """ Initialize the MemorizedProcess class.

        Parameters
        ----------
        process: capsul process
            the process instance to wrap.
        cachedir: string
            the directory in which the computation will be stored.
        timestamp: float (optional)
            the reference time from which times in tracing messages are
            reported.
        verbose: int
            if different from zero, print console messages.
        """
        # Check that a process is passed
        self.process_class = process.__class__
        self.process = process
        if not (isinstance(self.process_class, object) and
                issubclass(self.process_class, Process)):
            raise ValueError(
                "The 'process' argument should be a capsul process class, "
                "but '{0}' (type '{1}') was passed.".format(
                    self.process_class, type(self.process_class)))

        # Check the memory directory
        cachedir = os.path.abspath(cachedir)
        if not os.path.isdir(cachedir):
            raise ValueError("'cachedir' should be an existing directory.")
        self.cachedir = cachedir

        # Define the cache time
        if timestamp is None:
            timestamp = time.time()
        self.timestamp = timestamp

        # Set the documentation of the class
        self.__doc__ = self.process.get_help(returnhelp=True)

        # Store if some messages have to be displayed
        self.verbose = verbose

    def __call__(self, **kwargs):
        """ Call the wrapped process and cache the result, or read the cache
        if it is available.

        This function returns the wrapped function output and some metadata.

        .. note:: matlab process input image headers are saved since
            matlab tools may modify image headers.

        Parameters
        ----------
        kwargs: dict (optional)
            should correspond to the declared process parameters.
        """
        # Set the process inputs early to get some argument checking
        for name, value in six.iteritems(kwargs):
            self.process.set_parameter(name, value)

        # Create the destination folder and a unique id for the current
        # process
        process_dir, process_hash, input_parameters = self._get_process_id()

        # Execute the process
        if not os.path.isdir(process_dir):
            # Create the destination memory folder
            os.makedirs(process_dir)

            # Try to execute the process and if an error occurred remove the
            # cache folder
            try:
                # Run
                result = self._call_process(process_dir, input_parameters)

                # Save the result files in the memory with the corresponding
                # mapping
                output_parameters = {}
                for name, trait in self.process.traits(output=True).items():
                    # Get the trait value
                    value = self.process.get_parameter(name)
                    output_parameters[name] = value
                file_mapping = []
                self._copy_files_to_memory(output_parameters, process_dir,
                                           file_mapping)
                map_fname = os.path.join(process_dir, "file_mapping.json")
                with open(map_fname, "w") as open_file:
                    open_file.write(json.dumps(file_mapping))
            except Exception as e:
                print('error in MemorizedProcess.__call__:', e)
                shutil.rmtree(process_dir)
                raise

        # Restore the process results from the cache folder
        else:
            # Restore the memorized files
            map_fname = os.path.join(process_dir, "file_mapping.json")
            with open(map_fname, "r") as json_data:
                file_mapping = json.load(json_data)

            # Go through all mapping files
            for workspace_file, memory_file in file_mapping:
                # Determine if the workspace directory is writeable
                if os.access(os.path.dirname(workspace_file), os.W_OK):
                    shutil.copy2(memory_file, workspace_file)
                else:
                    logger.debug("Can't restore file '{0}', access rights "
                                 "are not sufficient.".format(workspace_file))

            # Update the process output traits
            result = self._load_process_result(process_dir, input_parameters)

        return result

    def _copy_files_to_memory(self, python_object, process_dir, file_mapping):
        """ Copy file items inside the memory.

        Parameters
        ----------
        python_object: object
            a generic python object.
        process_dir: str
            the process memory path.
        file_mapping: list of 2-tuples
            store in this structure the mapping between the workspace and
            the memory (workspace_file, memory_file).
        """
        # Deal with dictionary
        if isinstance(python_object, dict):
            for val in python_object.values():
                if val is not Undefined:
                    self._copy_files_to_memory(val, process_dir, file_mapping)

        # Deal with tuple and list
        elif isinstance(python_object, (list, tuple)):
            for val in python_object:
                if val is not Undefined:
                    self._copy_files_to_memory(val, process_dir, file_mapping)

        # Otherwise start the copy if the object is a file
        else:
            if (python_object is not Undefined and
                    isinstance(python_object, six.string_types) and
                    os.path.isfile(python_object)):
                fname = os.path.basename(python_object)
                out = os.path.join(process_dir, fname)
                shutil.copy2(python_object, out)
                file_mapping.append((python_object, out))

    def _call_process(self, process_dir, input_parameters):
        """ Call a process.

        Parameters
        ----------
        process_dir: string
            the directory where the cache has been written.
        input_parameters: dict
            the process input parameters.

        Returns
        -------
        result: ProcessResult
            the process results.
        """
        # Information message
        if self.verbose != 0:
            print("{0}\n[Memory] Calling {1}...\n{2}".format(
                80 * "_", self.process.id,
                get_process_signature(self.process, input_parameters)))

        # Start a timer
        start_time = time.time()

        # Execute the process
        study_config = self.process.get_study_config()
        caching = getattr(study_config, 'use_smart_caching', None)
        # avoid recursion
        study_config.use_smart_caching = False
        result = self.process()
        study_config.use_smart_caching = caching
        duration = time.time() - start_time

        # Save the result in json format
        cache = {'parameters': dict((i, getattr(self.process, i))
                                    for i in self.process.user_traits()),
                 'result': result}
        json_data = json.dumps(cache, sort_keys=True, check_circular=True,
                               indent=4, cls=CapsulResultEncoder)
        result_fname = os.path.join(process_dir, "result.json")
        with open(result_fname, "w") as open_file:
            open_file.write(json_data)

        # Information message
        if self.verbose != 0:
            msg = "{0:.1f}s, {1:.1f}min".format(duration, duration / 60.)
            print(max(0, (80 - len(msg))) * '_' + msg)

        return result

    def _load_process_result(self, process_dir, input_parameters):
        """ Load the result of a process.

        Parameters
        ----------
        process_dir: string
            the directory where the cache has been written.
        input_parameters: dict
            the process input parameters.

        Returns
        -------
        result: ProcessResult
            the process cached results.
        """
        # Display an information message
        if self.verbose != 0:
            print("[Memory]: Loading {0}...".format(
                get_process_signature(self.process, input_parameters)))

        # Load the process result
        result_fname = os.path.join(process_dir, "result.json")
        if not os.path.isfile(result_fname):
            raise KeyError(
                "Non-existing cache value (may have been cleared).\n"
                "File {0} does not exist.".format(result_fname))
        with open(result_fname, "r") as json_data:
            result_dict = json.load(json_data, cls=CapsulResultDecoder)

        # Update the process output traits
        for name, value in six.iteritems(result_dict['parameters']):
            self.process.set_parameter(name, value)

        return result_dict['result']

    def _get_process_id(self, **kwargs):
        """ Return the directory in which the results of the process called
        with the given arguments are persisted.

        Returns
        -------
        process_dir: string
            the directory where the cache should be written.
        process_hash: string
            the process md5 hash.
        input_parameters: dict
            the process input parameters.
        """
        # Get the process id
        process_hash, input_parameters = self._get_argument_hash()
        process_dir = os.path.join(self._get_process_dir(), process_hash)
        return process_dir, process_hash, input_parameters

    def _get_argument_hash(self):
        """ Get a hash of the process arguments.

        The user process traits are accessed through the user_traits()
        method that returns a sorted dictionary.

        Some parameters are not considered during the hash computation:

        * if the parameter value is not defined
        * if the corresponding trait has an attribute 'nohash'

        The tool versions are added to the hash to check roughly whether the
        running code has changed.

        Returns
        -------
        process_hash: string
            the process md5 hash.
        input_parameters: dict
            the process input parameters.
        """
        # Store for input parameters
        input_parameters = {}

        # Go through all the user traits
        for name, trait in six.iteritems(self.process.user_traits()):
            # Get the trait value
            value = self.process.get_parameter(name)

            # Split input and output traits
            is_input = True
            if "output" in trait.__dict__ and trait.output:
                is_input = False

            # Skip undefined trait attributes and outputs
            if is_input and value is not Undefined:
                # Check specific flags before hash
                if has_attribute(trait, "nohash", attribute_value=True,
                                 recursive=True):
                    continue

                # Store the input parameter
                input_parameters[name] = value

        # Add the tool versions to check roughly if the running codes have
        # changed and add file path fingerprints
        process_parameters = input_parameters.copy()
        process_parameters = self._add_fingerprints(process_parameters)
        process_parameters["versions"] = self.process.versions

        # Generate the process hash
        hasher = hashlib.new("md5")
        hasher.update(json.dumps(process_parameters, sort_keys=True).encode())
        process_hash = hasher.hexdigest()

        return process_hash, input_parameters

    def _add_fingerprints(self, python_object):
        """ Add file path fingerprints.

        Parameters
        ----------
        python_object: object
            a generic python object.

        Returns
        -------
        out: object
            the input object with a fingerprint-file representation.
        """
        # Deal with dictionary
        out = {}
        if isinstance(python_object, dict):
            for key, val in six.iteritems(python_object):
                if val is not Undefined:
                    out[key] = self._add_fingerprints(val)

        # Deal with tuple and list
        elif isinstance(python_object, (list, tuple)):
            out = []
            for val in python_object:
                if val is not Undefined:
                    out.append(self._add_fingerprints(val))
            if isinstance(python_object, tuple):
                out = tuple(out)

        # Otherwise replace the object with its fingerprint if it is a file
        else:
            out = python_object
            if (python_object is not Undefined and
                    isinstance(python_object, six.string_types) and
                    os.path.isfile(python_object)):
                out = file_fingerprint(python_object)

        return out

    def _get_process_dir(self):
        """ Get the directory corresponding to the cache for the current
        process.

        Returns
        -------
        process_dir: string
            the directory where the cache should be written.
        """
        # Build the memory path from the process id
        path = [self.cachedir]
        path.extend(self.process.id.split("."))
        process_dir = os.path.join(*path)

        # Guarantee the path exists on the disk
        if not os.path.exists(process_dir):
            os.makedirs(process_dir)

        return process_dir

    def __repr__(self):
        """ MemorizedProcess class representation.
        """
        return "{0}({1}, base_dir={2})".format(
            self.__class__.__name__, self.process.id, self.cachedir)

    def __getattr__(self, name):
        """ Define behavior for when a user attempts to access an attribute
        of a MemorizedProcess instance.

        First check the MemorizedProcess object and then the Process object.

        Parameters
        ----------
        name: string
            the name of the parameter we want to access.
        """
        if name in self.__dict__:
            super(MemorizedProcess, self).__getattr__(name)
        elif name in self.process.user_traits():
            return self.process.get_parameter(name)
        elif name in dir(self.process):
            return getattr(self.process, name)
        else:
            raise AttributeError(
                "'MemorizedProcess' and 'Process' objects have no attribute "
                "'{0}'".format(name))

    def __setattr__(self, name, value):
        """ Define behavior for when a user attempts to set an attribute
        of a MemorizedProcess instance.

        First check the MemorizedProcess object and then the Process object.

        Parameters
        ----------
        name: string
            the name of the parameter we want to set.
        value: object
            the parameter value.
        """
        if ("process" in self.__dict__ and
                name in self.__dict__["process"].__dict__):
            super(self.__dict__["process_class"],
                  self.__dict__["process"]).__setattr__(name, value)
        else:
            super(MemorizedProcess, self).__setattr__(name, value)
def get_process_signature(process, input_parameters):
    """ Generate the process signature.

    Parameters
    ----------
    process: Process
        a capsul process object.
    input_parameters: dict
        the process input parameters.

    Returns
    -------
    signature: string
        the process signature.
    """
    kwargs = ["{0}={1}".format(name, value)
              for name, value in six.iteritems(input_parameters)]
    return "{0}({1})".format(process.id, ", ".join(kwargs))
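
# For instance (the process id is hypothetical; the format comes from the
# function above):
#
#     >>> get_process_signature(process, {"in_file": "a.nii", "sigma": 2})
#     'my.module.Smooth(in_file=a.nii, sigma=2)'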
def has_attribute(trait, attribute_name, attribute_value=None,
                  recursive=True):
    """ Check whether a given trait has an attribute and, optionally,
    whether it is set to a particular value.

    Parameters
    ----------
    trait: Trait
        the input trait object.
    attribute_name: string
        the trait attribute name that will be checked.
    attribute_value: object (optional)
        the trait attribute expected value.
    recursive: bool (optional, default True)
        check for the attribute in the inner traits.

    Returns
    -------
    res: bool
        True if the given trait has the attribute and, optionally, if it
        is set to the expected value.
    """
    # Count the number of traits having the target attribute
    count = 0

    # Check the current trait
    if (attribute_name in trait.__dict__ and
            (trait.__dict__[attribute_name] == attribute_value or
             attribute_value is None)):
        count += 1

    # Check the inner traits (inner_traits is a method on trait objects)
    if recursive:
        inner_traits = trait.inner_traits()
        if len(inner_traits) > 0:
            for inner_trait in inner_traits:
                count += has_attribute(inner_trait, attribute_name,
                                       attribute_value, recursive)

    return count > 0
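
# For instance, checking the 'nohash' metadata that _get_argument_hash skips
# (a sketch, assuming a trait built with the Traits metadata mechanism):
#
#     >>> from traits.api import Str
#     >>> trait = Str(nohash=True).as_ctrait()
#     >>> has_attribute(trait, "nohash", attribute_value=True)  # -> True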
def file_fingerprint(a_file):
    """ Compute the file fingerprint.

    The file content is not considered, just the fingerprint (i.e. the
    file location, mtime and size).

    Parameters
    ----------
    a_file: string
        the file to process.

    Returns
    -------
    fingerprint: dict
        the file location, mtime and size.
    """
    fingerprint = {
        "name": a_file,
        "mtime": None,
        "size": None
    }
    if os.path.isfile(a_file):
        stat = os.stat(a_file)
        fingerprint["size"] = str(stat.st_size)
        fingerprint["mtime"] = str(stat.st_mtime)
    return fingerprint
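
# For instance (the path and stat values below are purely illustrative):
#
#     >>> file_fingerprint("/tmp/a.nii")
#     {'name': '/tmp/a.nii', 'mtime': '1700000000.0', 'size': '348'}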
class CapsulResultEncoder(json.JSONEncoder):
    """ Deal with ProcessResult in json.
    """
    def default(self, obj):
        try:
            import numpy
        except ImportError:
            # numpy is not available
            numpy = None

        # ProcessResult special case
        if isinstance(obj, ProcessResult):
            result_dict = {}
            for name in ["runtime", "returncode", "inputs", "outputs"]:
                result_dict[name] = tuple_json_encoder(getattr(obj, name))
            return result_dict

        # Undefined parameter special case
        if isinstance(obj, Undefined.__class__):
            return "<undefined_trait_value>"

        # InterfaceResult special case
        # avoid explicitly loading nipype: it takes much time...
        nipype = sys.modules.get('nipype.interfaces.base')
        if nipype:
            InterfaceResult = getattr(nipype, 'InterfaceResult')
        else:
            class InterfaceResult(object):
                pass
        if isinstance(obj, InterfaceResult):
            return "<skip_nipype_interface_result>"

        # Array special case
        if numpy is not None and isinstance(obj, numpy.ndarray):
            return obj.tolist()

        # Call the base class default method
        return json.JSONEncoder.default(self, obj)
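
# For instance, Undefined values survive a dump as a sentinel string:
#
#     >>> json.dumps({"x": Undefined}, cls=CapsulResultEncoder)
#     '{"x": "<undefined_trait_value>"}'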
def tuple_json_encoder(obj):
    """ Encode a tuple in order to save it in json format.

    Parameters
    ----------
    obj: object
        a python object to encode.

    Returns
    -------
    encobj: object
        the encoded object.
    """
    if isinstance(obj, tuple):
        return {
            "__tuple__": True,
            "items": [tuple_json_encoder(item) for item in obj]
        }
    elif isinstance(obj, list):
        return [tuple_json_encoder(item) for item in obj]
    elif isinstance(obj, dict):
        return dict((tuple_json_encoder(key), tuple_json_encoder(value))
                    for key, value in six.iteritems(obj))
    else:
        return obj
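
# For instance, nested tuples become tagged dictionaries that
# CapsulResultDecoder can turn back into tuples:
#
#     >>> tuple_json_encoder((1, [2, (3,)]))
#     {'__tuple__': True, 'items': [1, [2, {'__tuple__': True, 'items': [3]}]]}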
class CapsulResultDecoder(json.JSONDecoder):
    """ Deal with ProcessResult in json.
    """
    def __init__(self, *args, **kargs):
        json.JSONDecoder.__init__(self, object_hook=self.object_object,
                                  *args, **kargs)

    def object_object(self, obj):
        # Tuple special case
        if "__tuple__" in obj:
            return tuple(obj["items"])
        # Undefined parameter special case
        elif obj == "<undefined_trait_value>":
            return Undefined
        elif isinstance(obj, dict):
            for key, value in six.iteritems(obj):
                if value == "<undefined_trait_value>":
                    obj[key] = Undefined
            return obj
        # Default
        else:
            return obj
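
# Round-trip sketch: pre-encode the tuples, dump to json, then decode:
#
#     >>> encoded = json.dumps(tuple_json_encoder({"point": (1, 2)}))
#     >>> json.loads(encoded, cls=CapsulResultDecoder)
#     {'point': (1, 2)}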
############################################################################
# Memory manager: provide some tracking about what is computed when, to
# be able to flush the disk
############################################################################
class Memory(object):
    """ Memory context to provide caching for processes.

    Attributes
    ----------
    `cachedir`: string
        the location for the caching. If None is given, no caching is done.

    Methods
    -------
    cache
    clear
    """
    def __init__(self, cachedir):
        """ Initialize the Memory class.

        Parameters
        ----------
        cachedir: string
            the directory name of the location for the caching.
        """
        # Build the capsul memory folder
        if cachedir is not None:
            cachedir = os.path.join(
                os.path.abspath(cachedir), "capsul_memory")
            if not os.path.exists(cachedir):
                os.makedirs(cachedir)
            elif not os.path.isdir(cachedir):
                raise ValueError("'cachedir' should be a directory")

        # Define class parameters
        self.cachedir = cachedir
        self.timestamp = time.time()
    def cache(self, process, verbose=1):
        """ Create a proxy of the given process in order to only execute
        the process for input parameters not cached on disk.

        Parameters
        ----------
        process: capsul process
            the capsul Process to be wrapped and cached.
        verbose: int
            if different from zero, print console messages.

        Returns
        -------
        proxy_process: MemorizedProcess object
            the returned object is a MemorizedProcess object, that behaves
            as a process object, but offers extra methods for cache lookup
            and management.

        Examples
        --------
        Create a temporary memory folder:

        >>> from tempfile import mkdtemp
        >>> mem = Memory(mkdtemp())

        Here we create a callable that can be used to apply an fsl.Merge
        interface to files:

        >>> from capsul.process import get_process_instance
        >>> nipype_fsl_merge = get_process_instance(
        ...     "nipype.interfaces.fsl.Merge")
        >>> fsl_merge = mem.cache(nipype_fsl_merge)

        Now we apply it to a list of files. We need to specify the list
        of input files and the dimension along which the files should be
        merged:

        >>> results = fsl_merge(in_files=['a.nii', 'b.nii'], dimension='t')

        We can retrieve the resulting file from the outputs:

        >>> results.outputs._merged_file
        """
        # If a proxy process is found, get the encapsulated process
        if isinstance(process, (MemorizedProcess, UnMemorizedProcess)):
            process = process.process

        # If the cachedir is None no caching is done
        if self.cachedir is None:
            return UnMemorizedProcess(process, verbose)
        # Otherwise a proxy process is created
        else:
            return MemorizedProcess(process, self.cachedir, self.timestamp,
                                    verbose)
    def clear(self, skips=None):
        """ Remove all the cache apart from the directories given to the
        method input.

        Parameters
        ----------
        skips: list
            a list of paths to keep during the cache deletion.
        """
        # Get all memory directories to remove
        to_remove_folders = []
        skips = skips or []
        for root, dirs, files in os.walk(self.cachedir):
            if "result.json" in files and dirs == [] and root not in skips:
                to_remove_folders.append(root)

        # Delete memory directories
        for folder in to_remove_folders:
            shutil.rmtree(folder)
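
    # For instance, flushing everything but one cached result (the paths
    # below are hypothetical):
    #
    #     >>> mem = Memory("/tmp/cache")
    #     >>> keep = "/tmp/cache/capsul_memory/my/module/Smooth/0123abcd"
    #     >>> mem.clear(skips=[keep])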
    def __repr__(self):
        """ Memory class representation.
        """
        return "{0}(cachedir={1})".format(self.__class__.__name__,
                                          self.cachedir)