# -*- coding: utf-8 -*-
'''
File Organization Model (FOM)
=============================
A FOM is a JSON file (dictionary) describing how to make filenames from a set of attributes.
FOM definition
--------------
Ex::

    {
        "fom_name": "morphologist-auto-nonoverlap-1.0",

        "fom_import": ["formats-brainvisa-1.0", "brainvisa-formats-3.2.0",
                       "shared-brainvisa-1.0"],

        "attribute_definitions": {
            "acquisition": {"default_value": "default_acquisition"},
            "analysis": {"default_value": "default_analysis"},
            "sulci_recognition_session": {"default_value": "default_session"},
            "graph_version": {"default_value": "3.1"}
        },

        "shared_patterns": {
            "acquisition": "<center>/<subject>/t1mri/<acquisition>",
            "analysis": "{acquisition}/<analysis>",
            "recognition_analysis": "{analysis}/folds/<graph_version>/<sulci_recognition_session>_auto"
        },

        "processes": {
            "Morphologist": {
                "t1mri":
                    [["input:{acquisition}/<subject>", "images"]],
                "imported_t1mri":
                    [["output:{acquisition}/<subject>", "images"]],
                "t1mri_referential":
                    [["output:{acquisition}/registration/RawT1-<subject>_<acquisition>",
                      "Referential"]],
                "reoriented_t1mri":
                    [["output:{acquisition}/<subject>", "images"]],
                "t1mri_nobias":
                    [["output:{analysis}/nobias_<subject>", "images"]],
                "split_brain":
                    [["output:{analysis}/segmentation/voronoi_<subject>",
                      "images"]],
                "left_graph":
                    [["output:{analysis}/folds/<graph_version>/<side><subject>",
                      "Graph and data",
                      {"side": "L", "labelled": "No"}]],
                "left_labelled_graph":
                    [["output:{recognition_analysis}/<side><subject>_<sulci_recognition_session>_auto",
                      "Graph and data", {"side": "L"}]],
                "right_graph":
                    [["output:{analysis}/folds/<graph_version>/<side><subject>",
                      "Graph and data", {"side": "R", "labelled": "No"}]],
                "right_labelled_graph":
                    [["output:{recognition_analysis}/<side><subject>_<sulci_recognition_session>_auto",
                      "Graph and data", {"side": "R"}]],
                "Talairach_transform":
                    [["output:{acquisition}/registration/RawT1-<subject>_<acquisition>_TO_Talairach-ACPC",
                      "Transformation matrix"]]
            }
        }
    }
The dictionary may contain:
**fom_name**: string
identifier of the FOM model. Several FOMs may coexist under different
identifiers.
**fom_import**: list
dependencies between FOMs. A FOM may import other FOMs to benefit from
their formats, patterns, etc.
**attribute_definitions**: dict
a dict of predefined attributes. It is generally used to define default
values for some attributes. Each attribute defined here is itself a
dict. In this sub-dict, the key "default_value" provides the attribute's
default value.
Attributes don't *need* to be defined here; they are automatically defined
as they are used in path patterns. But here we can assign them additional
information (typically default values).
**shared_patterns**: dict
mapping of reusable patterns. Such patterns will be replaced with their
contents when used in file path patterns. To use such a pattern, place
its name between curly brackets {} in file path patterns::

    "input:{acquisition}/<subject>"

Here ``{acquisition}`` is the *shared pattern* "acquisition", and
``<subject>`` is the *attribute* "subject".
A pattern definition may also use attributes between angle brackets
(``<attribute_name>``), and/or reuse other patterns between curly
brackets (``{pattern}``).
**processes**: dict
dictionary of processes supported by the FOM, and path definitions for
their parameters. The dict is organized by process, then in each process a
sub-dict contains its parameter definitions. For a given parameter, a
list of FOM rules is given; several rules are allowed.
Process name keys may be a fully qualified module/class name, a short
identifier, or a "contextual" name: the name a process has in the context
of a pipeline.
A FOM rule is a list of 2 or 3 elements::

    [pattern, formats, attributes]
*pattern*: string
the FOM pattern for the file path of the given parameter. A pattern
starts with a directory identifier (``input``, ``output``,
``shared``), followed by a colon (``:``), and a file pattern which
may contain attributes (``<attrib>``) and/or shared patterns
(``{pattern}``). Ex::

    "input:{acquisition}/<subject>"
*formats*: string or list
the name of a format, the name of a format list, or a list of allowed
format names. The formats determine the possible file extensions.
*attributes*: dict, optional
An optional dict locally assigning values to some attributes. Ex::

    {"side": "left"}
The attribute values given here are used both to substitute attribute
values in the current file path pattern, and to select the matching rule
when attribute values are given externally (this second use remains to
be double-checked).
A FOM file is a JSON file (actually an extended JSON/YAML file, so that
comments are allowed within it), which should be placed in a common
directory where the FOM manager will look for it (``share/foms/``).
How to use FOMs
---------------
At a higher level, they are used for instance in `CAPSUL <http://brainvisa.info/capsul/>`_.
At a lower level, they are used through several classes:
* :class:`FileOrganizationModelManager` manages a set of FOMs, looks for them in a search path, reads them.
* :class:`FileOrganizationModels` represents a FOM rules set.
* :class:`AttributesToPaths` is used to convert a set of attributes into filenames (which is the main use of FOMs).
* :class:`PathToAttributes` performs the reverse operation: it matches attributes and determines their values from a given filename. A minimal usage sketch follows this list.
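
A minimal usage sketch (the FOM name matches the example above; the
directory values are illustrative)::

    from soma.fom import FileOrganizationModelManager, AttributesToPaths

    fom_manager = FileOrganizationModelManager()
    foms = fom_manager.load_foms('morphologist-auto-nonoverlap-1.0')
    atp = AttributesToPaths(
        foms,
        selection={'fom_process': 'Morphologist'},
        directories={'input': '/data/in', 'output': '/data/out'})
    attributes = {'center': 'my_center', 'subject': 'subject01',
                  'fom_parameter': 't1mri_nobias',
                  'fom_format': 'fom_first'}
    for path, path_attributes in atp.find_paths(attributes):
        print(path, path_attributes)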
'''
from __future__ import absolute_import
from __future__ import print_function
import sys
import os
import os.path as osp
import stat
import time
import re
import pprint
import sqlite3
import json
import six
from six.moves import range
try:
import bz2
except ImportError:
bz2 = None
from collections import OrderedDict
try:
import yaml
class json_reader(object):
'''
This class has a single static method, load, that loads a
JSON file with two features not provided by all JSON readers:
- JSON syntax is extended. For instance comments are allowed.
- The order of elements in dictionaries can be preserved by
using parameter object_pairs_hook=OrderedDict (as in Python
2.7 JSON reader).
'''
@staticmethod
def load(stream, object_pairs_hook=dict):
class OrderedLoader(yaml.Loader):
pass
def construct_mapping(loader, node):
loader.flatten_mapping(node)
return object_pairs_hook(loader.construct_pairs(node))
OrderedLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
construct_mapping)
return yaml.load(stream, OrderedLoader)
except ImportError:
import json as json_reader
from soma.path import split_path
def deep_update(update, original):
'''
Recursively update a dict.
Sub-dicts won't be overwritten but are updated recursively as well.
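Ex::

    >>> d = {'a': 1, 'b': {'c': 2}}
    >>> deep_update(d, {'b': {'d': 3}, 'e': 4})
    >>> d == {'a': 1, 'b': {'c': 2, 'd': 3}, 'e': 4}
    True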
'''
for key, value in six.iteritems(original):
if key not in update:
update[key] = value
elif isinstance(value, dict):
deep_update(update[key], value)
elif value != update[key]:
raise ValueError('In deep_update, for key %s, cannot merge %s and %s' %
(repr(key), repr(update[key]), repr(value)))
def read_json(file_name):
''' Read a json-like file using yaml or json.
In case of failure, issue a clearer message with filename, and when
appropriate a warning about yaml not being installed.
'''
try:
with open(file_name, 'r') as f:
return json_reader.load(f, object_pairs_hook=OrderedDict)
except ValueError as e:
if json_reader.__name__ != 'yaml':
extra_msg = ' Check your python installation, and perhaps run a "pip install PyYAML" or "easy_install PyYAML"'
else:
extra_msg = ''
raise ValueError('%s: %s. This may be due to the yaml module not being installed.%s' %
(file_name, str(e), extra_msg))
class DirectoryAsDict(object):
def __new__(cls, directory, cache=None):
if osp.isdir(directory):
# object.__new__() takes no extra arguments; directory and cache
# are handled by __init__
return super(DirectoryAsDict, cls).__new__(cls)
else:
with open(directory) as f:
return json.load(f)
def __init__(self, directory, cache=None):
self.directory = directory
if cache is None:
self.cache = DirectoriesCache()
else:
self.cache = cache
def __repr__(self):
return '<DirectoryAsDict( %s )>' % repr(self.directory)
def iteritems(self):
st_content = self.cache.get_directory(self.directory)
if st_content is not None:
st, content = st_content
for i in six.iteritems(content):
yield i
else:
try:
listdir = os.listdir(self.directory)
except OSError:
yield '', [None, None]
return
for name in listdir:
full_path = osp.join(self.directory, name)
st_content = self.cache.get_directory(full_path)
if st_content is not None:
yield name, st_content  # keep the (name, [stat, content]) contract
else:
st = os.stat(full_path)
if stat.S_ISDIR(st.st_mode):
yield (name, [tuple(st), DirectoryAsDict(full_path)])
else:
yield (name, [tuple(st), None])
@staticmethod
def get_directory(directory, debug=None):
return DirectoryAsDict._get_directory(
directory, debug, 0, 0, 0, 0, 0, 0, 0)[0]
@staticmethod
def _get_directory(directory, debug, directories, files, links,
files_size, path_size, errors, count):
try:
listdir = os.listdir(directory)
result = {}
except OSError:
errors += 1
result = None
if result is not None:
for name in listdir:
if debug and count % 100 == 0:
debug.info('%s files=%d, directories=%d, size=%d'
% (time.asctime(), files + links, directories,
files_size))
path_size += len(name)
count += 1
full_path = osp.join(directory, name)
st = os.lstat(full_path)
if stat.S_ISREG(st.st_mode):
files += 1
files_size += st.st_size
result[name] = [tuple(st), None]
elif stat.S_ISDIR(st.st_mode):
content, directories, files, links, files_size, \
path_size, errors, count = \
DirectoryAsDict._get_directory(full_path, debug,
directories +
1, files, links, files_size,
path_size, errors, count)
result[name] = [tuple(st), content]
else:
links += 1
result[name] = [tuple(st), None]
return result, directories, files, links, files_size, path_size, \
errors, count
@staticmethod
def paths_to_dict(*paths):
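# Build the nested {name: [stat_tuple, content_dict]} mapping used by
# DirectoryAsDict from plain file paths; stat entries are left as None
# because only the tree structure is known here.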
result = {}
for path in paths:
current_dir = result
path_list = split_path(path)
for name in path_list[:-1]:
st_content = current_dir.setdefault(name, [None, {}])
if st_content[1] is None:
st_content[1] = {}
current_dir = st_content[1]
current_dir.setdefault(path_list[-1], [None, None])
return result
@staticmethod
def get_statistics(dirdict, debug=None):
return DirectoryAsDict._get_statistics(
dirdict, debug, 0, 0, 0, 0, 0, 0, 0)[:-1]
@staticmethod
def _get_statistics(dirdict, debug, directories, files, links, files_size,
path_size, errors, count):
if debug and count % 100 == 0:
debug.info('%s files=%d, directories=%d, size=%d'
% (time.asctime(), files + links, directories, files_size))
count += 1
for name, content in six.iteritems(dirdict):
path_size += len(name)
st, content = content
if st:
st = os.stat_result(st)  # rebuild a stat result from the stored tuple
if stat.S_ISREG(st.st_mode):
files += 1
files_size += st.st_size
elif stat.S_ISDIR(st.st_mode):
if content is None:
directories += 1
errors += 1
else:
directories, files, links, files_size, path_size, \
errors, count \
= DirectoryAsDict._get_statistics(
content, debug, directories + 1, files, links,
files_size, path_size, errors, count)
else:
links += 1
else:
errors += 1
return (directories, files, links, files_size, path_size, errors,
count)
class DirectoriesCache(object):
def __init__(self):
self.directories = {}
def add_directory(self, directory, content=None, debug=None):
if content is None:
st = tuple(os.stat(directory))
content = DirectoryAsDict.get_directory(directory, debug=debug)
else:
st = None
self.directories[directory] = [st, content]
def remove_directory(self, directory):
del self.directories[directory]
def has_directory(self, directory):
return directory in self.directories
def get_directory(self, directory):
return self.directories.get(directory)
def save(self, path):
if bz2:
f = bz2.BZ2File(path, 'w')
else:
f = open(path, 'w')
with f:
json.dump(self.directories, f)
@classmethod
def load(cls, path):
result = cls()
if bz2:
try:
with bz2.BZ2File(path, 'r') as f:
result.directories = json.load(f)
except IOError:
with open(path, 'r') as f:
result.directories = json.load(f)
else:
with open(path, 'r') as f:
result.directories = json.load(f)
return result
class FileOrganizationModelManager(object):
'''
Manage the discovery and instantiation of available
FileOrganizationModels (FOM). A FOM can be represented as a YAML/JSON
file (or a series of YAML/JSON files in a directory). This class makes
it possible to identify these files contained in a predefined set of
directories (see the find_foms method) and to instantiate a
FileOrganizationModels for each identified file (see the load_foms
method).
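
A minimal sketch (the FOM name comes from the module docstring example
and may not exist in a given installation)::

    manager = FileOrganizationModelManager()
    available = manager.find_foms()
    foms = manager.load_foms('morphologist-auto-nonoverlap-1.0')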
'''
def __init__(self, paths=None):
'''
Create a FOM manager that will use the given paths to find available FOMs.
'''
if paths is None:
paths = [osp.join(osp.dirname(osp.dirname(osp.dirname(__file__))),
'share', 'foms')]
self.paths = paths
self._cache = None
def find_foms(self):
'''Return a list of file organisation model (FOM) names.
These FOMs can be loaded with load_foms. FOM files (or directories) are
looked for in self.paths.'''
#print('*** find_foms ***')
#import time
#t0 = time.time()
self._cache = {}
for path in self.paths:
# print(' ', path)
if os.path.isdir(path):
for i in os.listdir(path):
full_path = osp.join(path, i)
if osp.isdir(full_path):
for ext in ('.json', '.yaml'):
main_file = osp.join(full_path, i + ext)
if osp.exists(main_file):
d = read_json(main_file)
name = d.get('fom_name')
if not name:
raise ValueError(
'file %s does not contain fom_name'
% main_file)
self._cache[name] = full_path
elif i.endswith('.json') or i.endswith('.yaml'):
d = read_json(full_path)
if d:
name = d.get('fom_name')
if not name:
raise ValueError(
'file %s does not contain fom_name'
% full_path)
self._cache[name] = full_path
#print(' find_foms done: %f s' % (time.time() - t0))
return list(self._cache.keys())
def fom_files(self):
'''Return a list of file organisation model (FOM) names, as in
:meth:`find_foms`, but does not clear and reload the cache.
These FOMs can be loaded with load_foms. FOM files (or directories) are
looked for in self.paths.'''
if not self._cache:
self.find_foms()
return list(self._cache.keys())
def clear_cache(self):
self._cache = None
def load_foms(self, *names):
if self._cache is None:
self.find_foms()
foms = FileOrganizationModels()
for name in names:
foms.import_file(self._cache[name], foms_manager=self)
return foms
def file_name(self, fom):
if self._cache is None:
self.find_foms()
return self._cache[fom]
def read_definition(self, fom_name, done=None):
jsons = OrderedDict()
stack = [fom_name]
while stack:
fom_name = stack.pop(0)
if fom_name not in jsons:
json = jsons[fom_name] = read_json(self.file_name(fom_name))
stack.extend(json.get('fom_import', []))
jsons = list(jsons.values())
result = jsons.pop(0)
for json in jsons:
for n in ('attribute_definitions', 'formats', 'format_lists', 'shared_patterns', 'patterns', 'processes'):
d = json.get(n)
if d:
deep_update(d, result.get(n, {}))
result[n] = d
r = json.get('rules', [])
if r:
result.setdefault('rules', []).extend(r)
return result
class FileOrganizationModels(object):
def __init__(self):
self._directories_regex = re.compile(r'{([A-Za-z][A-Za-z0-9_]*)}')
self._attributes_regex = re.compile('<([^>]+)>')
self.fom_names = []
self.attribute_definitions = {
"fom_name": {
"descr": "File Organization Model (FOM) in which a pattern is defined.",
"values": set(self.fom_names),
},
"fom_format": {
"descr": "Format of a file.",
"values": set(),
}
}
self.formats = {}
self.format_lists = {}
self.shared_patterns = {}
self.patterns = {}
self.rules = []
def _expand_shared_pattern(self, pattern):
expanded_pattern = []
last_end = 0
for match in self._directories_regex.finditer(pattern):
c = pattern[last_end: match.start()]
if c:
expanded_pattern.append(c)
attribute = match.group(1)
expanded_pattern.append(self.shared_patterns[attribute])
last_end = match.end()
if expanded_pattern:
last = pattern[last_end:]
if last:
expanded_pattern.append(last)
return ''.join(expanded_pattern)
else:
return pattern
def import_file(self, file_or_dict, foms_manager=None):
if not isinstance(file_or_dict, dict):
json_dict = read_json(file_or_dict)
else:
json_dict = file_or_dict
foms = json_dict.get('fom_import', [])
if foms and foms_manager is None:
raise RuntimeError(
'Cannot import FOM because no FileOrganizationModelManager has been provided')
for fom in foms:
self.import_file(
foms_manager.file_name(fom), foms_manager=foms_manager)
fom_name = json_dict['fom_name']
if fom_name in self.fom_names:
return
self.fom_names.append(fom_name)
# Update attribute definitions
attribute_definitions = json_dict.get('attribute_definitions')
if attribute_definitions:
for attribute, definition in six.iteritems(attribute_definitions):
existing_definition = self.attribute_definitions.get(
attribute)
values = definition.get('values')
if existing_definition:
existing_values = existing_definition.get('values')
if (existing_values is None) != (values is None):
raise ValueError(
'Incompatible values redefinition for attribute %s' % attribute)
if (definition.get('default_value') is None) != (existing_definition.get('default_value') is None):
raise ValueError(
'Incompatible default value redefinition of attribute %s' % attribute)
if values:
existing_values.update(values)  # 'values' entries are sets
else:
definition = definition.copy()
if values is not None:
definition['values'] = set(values)
self.attribute_definitions[attribute] = definition
# Process shared patterns to expand the ones that reference other shared
# patterns
self.formats.update(json_dict.get('formats', {}))
self.format_lists.update(json_dict.get('format_lists', {}))
self.shared_patterns.update(json_dict.get('shared_patterns', {}))
if self.shared_patterns:
stack = list(self.shared_patterns.items())
while stack:
name, pattern = stack.pop()
if isinstance(pattern, list):
if pattern and isinstance(pattern[0], six.string_types):
pattern[0] = self._expand_shared_pattern(pattern[0])
else:
for i in pattern:
i[0] = self._expand_shared_pattern(i[0])
else:
expanded_pattern = self._expand_shared_pattern(pattern)
if expanded_pattern != pattern:
stack.append((name, expanded_pattern))
else:
self.shared_patterns[name] = pattern
rules = json_dict.get('rules')
patterns = json_dict.get('patterns', {}).copy()
processes = json_dict.get('processes')
if rules:
patterns['fom_dummy'] = rules
new_patterns = {}
self._expand_json_patterns(
patterns, new_patterns, {'fom_name': fom_name})
self._parse_patterns(new_patterns, self.patterns)
if processes:
process_patterns = OrderedDict()
for process, parameters in six.iteritems(processes):
process_dict = OrderedDict()
process_patterns[process] = process_dict
for parameter, rules in six.iteritems(parameters):
if isinstance(rules, six.string_types):
rules = self.shared_patterns[rules[1:-1]]
parameter_rules = []
process_dict[parameter] = parameter_rules
for rule in rules:
if len(rule) == 2:
pattern, formats = rule
rule_attributes = {}
else:
try:
pattern, formats, rule_attributes = rule
except Exception as e:
print('error in FOM: %s, process: %s, param: '
'%s, rule:'
% (fom_name, process, parameter), rule)
raise
rule_attributes['fom_process'] = process
rule_attributes['fom_parameter'] = parameter
parameter_rules.append(
[pattern, formats, rule_attributes])
new_patterns = OrderedDict()
self._expand_json_patterns(
process_patterns, new_patterns, {'fom_name': fom_name})
self._parse_patterns(new_patterns, self.patterns)
def get_attributes_without_value(self):
att_no_value = {}
for att in self.shared_patterns:
if not att.startswith('shared.'):
for attrec in self._attributes_regex.findall(self.shared_patterns[att]):
if attrec not in att_no_value:
att_no_value[attrec] = ''
return att_no_value
def selected_rules(self, selection, debug=None):
if selection:
format = selection.get('format')
for rule_pattern, rule_attributes in self.rules:
if debug:
debug.debug('selected_rules: %s, %s' %
(repr(rule_pattern), repr(rule_attributes)))
rule_formats = rule_attributes.get('fom_formats', [])
if format:
if format in ('fom_first', 'fom_preferred'):
if not rule_formats:
if debug:
debug.debug(
'selected_rules: -- no format in rule')
continue
elif format not in rule_formats:
if debug:
debug.debug('selected_rules: -- format %s not in %s' % (repr(
format), repr(rule_formats)))
continue
keep = True
for attribute, selection_value in six.iteritems(selection):
if attribute == 'format':
continue
rule_value = rule_attributes.get(attribute)
if rule_value is None or rule_value != selection_value:
if debug:
debug.debug('selected_rules: -- selection value %s != rule value %s' %
(repr(selection_value), repr(rule_value)))
keep = False
break
if keep:
if debug:
debug.debug('selected_rules: ++')
yield (rule_pattern, rule_attributes)
else:
for rule in self.rules:
yield rule
def _expand_json_patterns(self, json_patterns, parent, parent_attributes):
attributes = parent_attributes.copy()
attributes.update(json_patterns.get('fom_attributes', {}))
for attribute, value in six.iteritems(attributes):
if attribute not in self.attribute_definitions:
self.attribute_definitions[
attribute] = {'values': set((value,))}
else:
values = self.attribute_definitions[
attribute].setdefault('values', set())
values.add(value)
key_attribute = json_patterns.get('fom_key_attribute', None)
if key_attribute:
self.attribute_definitions.setdefault(key_attribute, {})
# raise ValueError( 'Attribute "%s" must be declared in
# attribute_definitions' % key_attribute )
for key, value in six.iteritems(json_patterns):
if key.startswith('fom_') and key != 'fom_dummy':
continue
if key_attribute:
attributes[key_attribute] = key
self.attribute_definitions[key_attribute].setdefault(
'values', set()).add(key)
if isinstance(value, dict):
self._expand_json_patterns(
value, parent.setdefault(key, OrderedDict()), attributes)
else:
rules = []
parent[key] = rules
for rule in value:
if len(rule) == 2:
pattern, format_list = rule
rule_attributes = attributes.copy()
else:
pattern, format_list, rule_attributes = rule
for attribute, value in six.iteritems(rule_attributes):
definition = self.attribute_definitions.setdefault(
attribute, {})
values = definition.setdefault('values', set())
values.add(value)
if attributes:
new_attributes = attributes.copy()
new_attributes.update(rule_attributes)
rule_attributes = new_attributes
# Expand format_list
rule_formats = []
if isinstance(format_list, six.string_types):
format_list = [format_list]
if format_list:
for format in format_list:
formats = self.format_lists.get(format)
if formats is not None:
for f in formats:
rule_formats.append(f)
else:
rule_formats.append(format)
rule_attributes['fom_formats'] = rule_formats
# Expand patterns in rules
while True:
expanded_pattern = []
last_end = 0
for match in self._directories_regex.finditer(pattern):
c = pattern[last_end: match.start()]
if c:
expanded_pattern.append(c)
attribute = match.group(1)
expanded_pattern.append(
self.shared_patterns[attribute])
last_end = match.end()
if expanded_pattern:
last = pattern[last_end:]
if last:
expanded_pattern.append(last)
pattern = ''.join(expanded_pattern)
else:
break
rules.append([pattern, rule_attributes])
def _parse_patterns(self, patterns, dest_patterns):
for key, value in six.iteritems(patterns):
if isinstance(value, dict):
self._parse_patterns(
value, dest_patterns.setdefault(key, OrderedDict()))
else:
pattern_rules = dest_patterns.setdefault(key, [])
for rule in value:
pattern, rule_attributes = rule
for attribute in self._attributes_regex.findall(pattern):
s = attribute.find('|')
if s > 0:
attribute = attribute[:s]
definition = self.attribute_definitions.setdefault(
attribute, {})
value = rule_attributes.get(attribute)
if value is not None:
definition.setdefault('values', set()).add(value)
elif 'fom_open_value' not in definition:
definition['fom_open_value'] = True
# raise ValueError( 'Attribute "%s" must be
# declared in attribute_definitions' % attribute )
if attribute in rule_attributes:
pattern = pattern.replace(
'<' + attribute + '>', rule_attributes[attribute])
i = pattern.find(':')
if i > 0:
rule_attributes['fom_directory'] = pattern[:i]
pattern = pattern[i + 1:]
pattern_rules.append([pattern, rule_attributes])
self.rules.append([pattern, rule_attributes])
def pprint(self, out=sys.stdout):
for i in ('fom_names', 'attribute_definitions', 'formats', 'format_lists', 'shared_patterns', 'patterns', 'rules'):
print('-' * 20, i, '-' * 20, file=out)
pprint.pprint(getattr(self, i), out)
class PathToAttributes(object):
'''
Utility class for file paths -> attributes set transformation.
Part of the FOM engine.
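
A minimal sketch (``foms`` is a loaded :class:`FileOrganizationModels`;
the path is illustrative)::

    pta = PathToAttributes(foms)
    for path, st, attributes in pta.parse_path(
            'my_center/subject01/t1mri/default_acquisition'):
        print(path, attributes)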
'''
def __init__(self, foms, selection=None):
self._attributes_regex = re.compile('<([^>]+)>')
self.hierarchical_patterns = OrderedDict()
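# hierarchical_patterns is a tree with one regex per path element:
# each node maps a regex to [extension_rules, sub_patterns], where
# extension_rules maps a file extension to the attribute dicts of the
# matching rules, and sub_patterns descends one directory level.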
for rule_pattern, rule_attributes in foms.selected_rules(selection):
rule_formats = rule_attributes.get('fom_formats', [])
parent = self.hierarchical_patterns
attributes_found = set()
splited_pattern = rule_pattern.split('/')
count = 0
for pattern in splited_pattern:
count += 1
regex = ['^']
last_end = 0
for match in self._attributes_regex.finditer(pattern):
c = pattern[last_end: match.start()]
if c:
regex.append(re.escape(c))
attribute = match.group(1)
s = attribute.find('|')
if s > 0:
attribute_re = attribute[s + 1:]
attribute = attribute[:s]
else:
attribute_re = '[^/]*'
if attribute in attributes_found:
regex.append('%(' + attribute + ')s')
else:
attribute_type = foms.attribute_definitions[attribute]
values = attribute_type.get('values')
if values and not attribute_type.get('fom_open_value', True):
regex.append(
'(?P<%s>%s)' % (attribute, '|'.join('(?:' + re.escape(i) + ')' for i in values)))
else:
regex.append(
'(?P<%s>%s)' % (attribute, attribute_re))
attributes_found.add(attribute)
last_end = match.end()
last = pattern[last_end:]
if last:
regex.append(re.escape(last))
if count == len(splited_pattern):
if rule_formats:
for format in rule_formats:
if format not in foms.formats:
print('format "%s" not in FOM "%s"'
% (format, foms.fom_names))
extension = foms.formats[format]
d = rule_attributes.copy()
d['fom_format'] = format
d.pop('fom_formats', None)
parent.setdefault(''.join(regex) + '$', [OrderedDict(), OrderedDict()])[
0].setdefault(extension, []).append(d)
else:
parent.setdefault(''.join(regex) + '$', [OrderedDict(), OrderedDict()])[
0].setdefault('', []).append(rule_attributes)
else:
parent = parent.setdefault(
''.join(regex) + '$', [OrderedDict(), OrderedDict()])[1]
def pprint(self, file=sys.stdout):
self._pprint(file, self.hierarchical_patterns, 0)
def _pprint(self, file, node, indent):
if node:
print(' ' * indent + '{', file=file)
for pattern, rules_subpattern in six.iteritems(node):
ext_rules, subpattern = rules_subpattern
print(' ' * (indent + 1) + repr(pattern) + ': { (', file=file)
if ext_rules:
print(' ' * (indent + 1) + '{', file=file)
for ext, rules in six.iteritems(ext_rules):
print(' ' * \
(indent + 2) + repr(ext) + ': ', repr(rules),
file=file)
print(' ' * (indent + 1) + '},', file=file)
else:
print(' ' * (indent + 1) + '{},', file=file)
self._pprint(file, subpattern, indent + 1)
print('),', file=file)
print(' ' * indent + '}', file=file, end=' ')
else:
print(' ' * indent + '{}', file=file, end=' ')
def parse_directory(self, dirdict, single_match=False, all_unknown=False, log=None):
if isinstance(dirdict, six.string_types):
dirdict = DirectoryAsDict.paths_to_dict(dirdict)
return self._parse_directory(dirdict, [([], self.hierarchical_patterns, {})], single_match, all_unknown, log)
def _parse_directory(self, dirdict, parsing_list, single_match, all_unknown, log):
for name, content in six.iteritems(dirdict):
st, content = content
# Generate every possible (name, extension) split at each dot
l = name.split('.')
possible_extension_split = [('.'.join(l[:i]), '.'.join(l[i:]))
for i in range(1, len(l) + 1)]
# split = name.split('.', 1)
# name_no_ext = split[0]
# if len(split) == 2:
# ext = split[1]
# else:
# ext = ''
matched_directories = []
matched = False
sent = False
recurse_parsing_list = []
for path, hierarchical_patterns, pattern_attributes in parsing_list:
if log:
log.debug('?? ' + name + ' ' + repr(
pattern_attributes) + ' ' + repr(list(hierarchical_patterns.keys())))
branch_matched = False
for pattern, rules_subpattern \
in six.iteritems(hierarchical_patterns):
stop_parsing = False
for name_no_ext, ext in possible_extension_split:
ext_rules, subpattern = rules_subpattern
pattern = pattern % pattern_attributes
match = re.match(pattern, name_no_ext)
if log:
log.debug(
'try %s for %s' % (repr(pattern), repr(name_no_ext)))
if match:
if log:
log.debug('match ' + pattern)
new_attributes = match.groupdict()
new_attributes.update(pattern_attributes)
rules = ext_rules.get(ext)
if (subpattern and
not ext and
(st is None or
stat.S_ISDIR(st[0])) and
content is not None):
matched = branch_matched = True
stop_parsing = single_match
full_path = path + [name]
if log:
log.debug('directory matched: %s %s' % (
repr(full_path), (repr([i[0] for i in six.iteritems(content)]) if content else None)))
matched_directories.append(
(full_path, subpattern, new_attributes))
else:
if log:
log.debug(
'no directory matched for %s' % repr(name))
if rules is not None and ext:
matched = branch_matched = True
if log:
log.debug(
'extension matched: ' + repr(ext))
for rule_attributes in rules:
yield_attributes = new_attributes.copy()
yield_attributes.update(rule_attributes)
stop_parsing = single_match or yield_attributes.pop(
'fom_stop_parsing', False)
if log:
log.debug(
'-> ' + '/'.join(path + [name]) + ' ' + repr(yield_attributes))
sent = True
yield path + [name], st, yield_attributes
break
else:
if log:
log.debug(
'no extension matched: ' + repr(ext))
if stop_parsing:
break
if stop_parsing:
break
if branch_matched:
for full_path, subpattern, new_attributes in matched_directories:
if content:
recurse_parsing_list.append(
(full_path, subpattern, new_attributes))
if recurse_parsing_list:
for i in self._parse_directory(content, recurse_parsing_list, single_match, all_unknown, log):
yield i
if not matched and all_unknown:
if log:
log.debug('-> ' + '/'.join(path + [name]) + ' None')
sent = True
yield path + [name], st, None
if content:
for i in self._parse_unknown_directory(content, path + [name], log):
yield i
if not sent and all_unknown:
if log:
log.debug('-> ' + '/'.join(path + [name]) + ' None')
yield path + [name], st, None
def _parse_unknown_directory(self, dirdict, path, log):
for name, content in six.iteritems(dirdict):
st, content = content
if log:
log.debug('?-> ' + '/'.join(path + [name]) + ' None')
yield path + [name], st, None
if content is not None:
for i in self._parse_unknown_directory(content, path + [name], log):
yield i
def parse_path(self, path, single_match=False, log=None):
dirdict = DirectoryAsDict.paths_to_dict(path)
spath = split_path(path)
for p, s, a in self.parse_directory(dirdict, single_match=single_match, log=log):
if spath == p:
yield (p, s, a)
class AttributesToPaths(object):
'''
Utility class for attributes set -> file paths transformation.
Part of the FOM engine.
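
A minimal sketch (``foms`` is a loaded :class:`FileOrganizationModels`;
attribute and directory values are illustrative)::

    atp = AttributesToPaths(
        foms, selection={'fom_process': 'Morphologist'},
        directories={'input': '/data/in', 'output': '/data/out'})
    for path, attributes in atp.find_paths({'center': 'my_center',
                                            'subject': 'subject01',
                                            'fom_parameter': 't1mri'}):
        print(path, attributes)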
'''
def __init__(self, foms, selection=None, directories={}, preferred_formats=set(), debug=None):
self.foms = foms
self.selection = selection or {}
self.directories = directories
self._db = sqlite3.connect(':memory:', check_same_thread=False)
self._db.execute('PRAGMA journal_mode = OFF;')
self._db.execute('PRAGMA synchronous = OFF;')
self.all_attributes = tuple(
i for i in self.foms.attribute_definitions if i != 'fom_formats')
self.default_values = dict(
(i, self.foms.attribute_definitions[i]['default_value']) for i in self.all_attributes if 'default_value' in self.foms.attribute_definitions[i])
self.non_discriminant_attributes = set(
i for i in self.all_attributes if not self.foms.attribute_definitions[i].get('discriminant', True))
fom_format_index = self.all_attributes.index('fom_format')
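# Rules are stored in an in-memory SQLite table with one column per
# attribute (prefixed with '_'), plus bookkeeping columns flagging the
# first and preferred formats and recording the rule index in
# self.rules.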
sql = 'CREATE TABLE rules ( %s, _fom_first, _fom_preferred_format, _fom_rule )' % ','.join(repr('_' + str(i))
for i in self.all_attributes)
if debug:
debug.debug(sql)
self._db.execute(sql)
columns = ['_%s' %
i for i in self.all_attributes + ('fom_first', 'fom_preferred_format')]
sql = 'CREATE INDEX rules_index ON rules (%s)' % ','.join(columns)
self._db.execute(sql)
for i in columns:
sql = 'CREATE INDEX rules%s_index ON rules (%s)' % (i, i)
self._db.execute(sql)
sql_insert = 'INSERT INTO rules VALUES ( %s )' % ','.join(
'?' for i in range(len(self.all_attributes) + 3))
self.rules = []
for pattern, rule_attributes in foms.selected_rules(self.selection, debug=debug):
if debug:
debug.debug(
'pattern: ' + pattern + ' ' + repr(rule_attributes))
pattern_attributes = set((i if '|' not in i else i[: i.find('|')])
for i in self.foms._attributes_regex.findall(pattern))
values = []
for attribute in self.all_attributes:
value = rule_attributes.get(attribute)
if not value and attribute in pattern_attributes:
value = ''
values.append(value)
values.append(True)
values.append(False)
values.append(len(self.rules))
self.rules.append(
(re.sub(r'<([^>|]*)(\|[^>]*)?>', r'%(\1)s', pattern), rule_attributes))
fom_formats = rule_attributes.get('fom_formats')
if fom_formats and 'fom_format' not in rule_attributes:
first = True
for format in fom_formats:
if format in preferred_formats:
preferred_format = format
break
else:
preferred_format = fom_formats[0]
sys.stdout.flush()
for format in fom_formats:
values[fom_format_index] = format
values[-3] = first
values[-2] = bool(format == preferred_format)
first = False
if debug:
debug.debug(sql_insert + ' ' + repr(values))
self._db.execute(sql_insert, values)
else:
if debug:
debug.debug(sql_insert + ' ' + repr(values))
self._db.execute(sql_insert, values)
self._db.commit()
def find_paths(self, attributes={}, debug=None):
if debug:
debug.debug('!find_path! %s' % repr(attributes))
d = self.selection.copy()
d.update(attributes)
attributes = d
select = []
values = []
selection_attributes = {}
default_values = []
for attribute in self.all_attributes:
value = attributes.get(attribute)
if value is None:
value = self.selection.get(attribute)
if value is None:
default_value = self.default_values.get(attribute)
if default_value is not None:
default_values.append((attribute, default_value))
if attribute not in self.non_discriminant_attributes:
select.append(
'(_' + attribute + " IN ('','%s') OR _" % default_value + attribute + ' IS NULL )')
else:
if attribute not in self.non_discriminant_attributes:
select.append(
'(_' + attribute + " != '' OR _" + attribute + ' IS NULL )')
elif attribute == 'fom_format':
selected_format = attributes.get('fom_format')
if selected_format == 'fom_first':
select.append('_fom_first = 1')
elif selected_format == 'fom_preferred':
select.append('_fom_preferred_format = 1')
elif isinstance(value, list):
select.append('_' + attribute + " IN (%s)" %
','.join('?' for i in value))
values.extend(value)
else:
select.append('_' + attribute + " = ?")
values.append(value)
elif isinstance(value, list):
if attribute not in self.non_discriminant_attributes:
select.append('_' + attribute + " IN ( %s, '' )" %
','.join('?' for i in value))
values.extend(value)
else:
if attribute not in self.non_discriminant_attributes:
select.append('_' + attribute + " IN ( ?, '' )")
values.append(value)
selection_attributes[attribute] = value
columns = ['_fom_rule', '_fom_format'] + ['_' + i[0]
for i in default_values]
sql = 'SELECT %s FROM rules WHERE %s' % (','.join(columns), ' AND '.join(
select))
if debug:
debug.debug('!sql! %s' %
(sql.replace('?', '%s') % tuple(repr(i) for i in values)))
for row in self._db.execute(sql, values):
rule_index, format = row[:2]
row = row[2:]
# bool_output = False
rule, rule_attributes = self.rules[rule_index]
rule_attributes = rule_attributes.copy()
default_attributes = {}
for i in range(len(default_values)):
if not row[i]:
rule_attributes[
default_values[i][0]] = default_values[i][1]
default_attributes[
default_values[i][0]] = default_values[i][1]
# rule_attributes = self.foms.rules[ rule_index ][ 1 ].copy()
fom_formats = rule_attributes.pop('fom_formats', [])
# if rule_attributes.get( 'fom_directory' ) == 'output':
# bool_output=True
if debug:
debug.debug('!rule matching! %s' %
repr((rule, fom_formats, rule_attributes)))
if format:
ext = self.foms.formats[format]
if ext != '':
ext = '.' + ext
rule_attributes['fom_format'] = format
default_attributes.update(attributes)
try:
path = rule % default_attributes + ext
except KeyError:
continue
if debug:
debug.debug('!single format! %s: %s' % (
format, path))
r = self._join_directory(
path, rule_attributes,
selection_attributes)
if r:
if debug:
debug.debug('!-->! %s' % repr(r))
yield r
else:
if fom_formats:
for f in fom_formats:
ext = self.foms.formats[f]
if ext != '':
ext = '.' + ext
rule_attributes['fom_format'] = f
default_attributes.update(attributes)
try:
path = rule % default_attributes + ext
except KeyError:
continue
if debug:
debug.debug('!format from fom_formats! %s: %s' %
(f, path))
r = self._join_directory(
path, rule_attributes,
selection_attributes)
if r:
if debug:
debug.debug('!-->! %s' % repr(r))
yield r
else:
default_attributes.update(attributes)
try:
path = rule % default_attributes
except KeyError:
continue
if debug:
debug.debug('!no format! %s' % path)
r = self._join_directory(
path, rule_attributes,
selection_attributes)
if r:
if debug:
debug.debug('!-->! %s' % repr(r))
yield r
def find_discriminant_attributes(self, **selection):
result = []
if self.rules:
for attribute in self.all_attributes:
sql = 'SELECT DISTINCT "%s" FROM rules' % ('_' + attribute)
if selection:
sql += ' WHERE ' + \
' AND '.join('_' + i + ' = ?' for i in selection)
values = list(self._db.execute(sql,
list(selection.values())))
else:
values = list(self._db.execute(sql))
if values and (len(values) > 1 or ('',) in values):
result.append(attribute)
return result
def find_attributes_values(self, **selection):
result = {}
if self.rules:
for attribute in self.all_attributes:
sql = 'SELECT DISTINCT "%s" FROM rules' % ('_' + attribute)
if selection:
sql += ' WHERE ' + \
' AND '.join('_' + i + ' = ?' for i in selection)
values = list(self._db.execute(sql,
list(selection.values())))
else:
values = list(self._db.execute(sql))
result[attribute] = values
return result
def _join_directory(self, path, rule_attributes, selection_attributes):
attributes = selection_attributes.copy()
attributes.update(rule_attributes)
fom_directory = rule_attributes.get('fom_directory')
if fom_directory:
directory = self.directories.get(fom_directory)
if directory:
return (osp.join(directory, *path.split('/')), attributes)
#return (osp.join(directory, path), attributes)
return (osp.join(*path.split('/')), attributes)
#return (path, attributes)
def allowed_formats_for_parameter(self, process_name, param):
formats = []
for rule, attributes in self.rules:
if attributes.get('fom_process') == process_name \
and attributes.get('fom_parameter') == param:
rformats = attributes.get('fom_formats', [])
for format in rformats:
if format not in formats:
formats.append(format)
return formats
def allowed_extensions_for_parameter(self, **kwargs):
'''
Either formats or (process_name and param) should be passed.
kwargs
------
formats: list of str
if provided, only this parameter is used
process_name: str
name of the process
param: str
name of the process parameter
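
A minimal sketch (process and parameter names come from the module
docstring example)::

    exts = atp.allowed_extensions_for_parameter(
        process_name='Morphologist', param='t1mri')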
'''
if 'formats' in kwargs:
formats = list(kwargs['formats'])
elif 'process_name' in kwargs and 'param' in kwargs:
process_name = kwargs['process_name']
param = kwargs['param']
formats = self.allowed_formats_for_parameter(process_name, param)
else:
raise KeyError('Either formats or (process_name and param) should '
'be passed')
exts = set()
while formats:
format = formats.pop(0)
if format not in self.foms.formats \
and format in self.foms.format_lists:
formats += self.foms.format_lists[format]
continue
exts.add(self.foms.formats[format])
return sorted(exts)
def call_before_application_initialization(application):
try:
from traits.api import ListStr
except ImportError:
from enthought.traits.api import ListStr
application.add_trait(
'fom_path',
ListStr(descr='Path for finding file organization models'))
if application.install_directory:
application.fom_path = [osp.join(application.install_directory,
'share', 'foms')]
def call_after_application_initialization(application):
application.fom_manager = FileOrganizationModelManager(
application.fom_path)
if __name__ == '__main__':
from soma.application import Application
# First thing to do is to create an Application with name and version
app = Application('soma.fom', '1.0')
# Register module to load and call functions before and/or after
# initialization
app.plugin_modules.append('soma.fom')
# Application initialization (e.g. configuration file may be read here)
app.initialize()
# process_completion( sys.argv[1], {'spm' : '/here/is/spm', 'shared' :
# '/volatile/bouin/build/trunk/share/brainvisa-share-4.4' })
import logging
# logging.root.setLevel( logging.DEBUG )
fom = app.fom_manager.load_foms('morphologist-brainvisa-pipeline-1.0')
# atp = AttributesToPaths( fom, selection={ 'fom_process':'morphologistSimp.SimplifiedMorphologist' },
# preferred_formats=set( ('NIFTI',) ),
# debug=logging )
# form='MINC'
# form=','+'MESH'
# print('form',form)
directories = {"input_directory": "/input",
"output_directory": "/output",
"shared_directory": "/shared"}
fomr = ['NIFTI', 'MESH']
atp = AttributesToPaths(
fom, selection={'fom_process': 'morphologistPipeline.HeadMesh'},
preferred_formats=fomr, directories=directories,
debug=logging)
d = {
'protocol': u'subjects', 'analysis': 'default_analysis', 'fom_parameter': 'head_mesh',
'acquisition': 'default_acquisition', 'subject': u'002_S_0816_S18402_I40732', 'fom_format': 'fom_preferred'}
for p, a in atp.find_paths(d, debug=logging):
print('->', repr(p), a)
# for parameter in fom.patterns[ 'morphologistSimp.SimplifiedMorphologist' ]:
# print('- %s' % parameter)
# for p, a in atp.find_paths( { 'fom_parameter': parameter,
#'protocol': 'c',
#'subject': 's',
#'analysis': 'p',
#'acquisition': 'a',
#'fom_format': 'fom_preferred',
#} ):
# print(' ', repr( p ), a)