# -*- coding: utf-8 -*-
"""
Config module for :mod:`File Organization models (FOMs) <capsul.attributes.fom_completion_engine>`
Classes
=======
:class:`FomConfig`
------------------
"""
from __future__ import absolute_import
import os
import six
import traits.api as traits
from soma.fom import AttributesToPaths, PathToAttributes
from soma.application import Application
from soma.sorted_dictionary import SortedDictionary
import weakref
import capsul.engine
from functools import partial
def init_settings(capsul_engine):
with capsul_engine.settings as session:
session.ensure_module_fields(
"fom",
[
dict(name="input_fom", type="str", description="input FOM"),
dict(name="output_fom", type="str", description="output FOM"),
dict(name="shared_fom", type="str", description="shared data FOM"),
dict(
name="volumes_format",
type="str",
description="Format used for volumes",
),
dict(
name="meshes_format",
type="str",
description="Format used for meshes",
),
dict(
name="auto_fom",
type="bool",
description="Look in all FOMs when a process is not found "
"(in addition to the standard share/foms). Note that "
"auto_fom looks for the first FOM matching the process to "
"get completion for, and does not handle ambiguities. "
"Moreover it brings an overhead (typically 6-7 seconds) the "
"first time it is used since it has to parse all available "
"FOMs.",
),
dict(
name="fom_path",
type="list[str]",
description="list of additional directories where to look "
"for FOMs",
),
# FIXME: until directories are included in another config module
dict(
name="input_directory",
type="str",
description="input study data directory",
),
dict(
name="output_directory",
type="str",
description="output study data directory",
),
],
)
capsul_engine.load_module("capsul.engine.module.axon")
capsul_engine.load_module("capsul.engine.module.spm")
capsul_engine.load_module("capsul.engine.module.attributes")
with capsul_engine.settings as session:
config = session.config("fom", "global")
if not config:
values = {
capsul_engine.settings.config_id_field: "fom",
"auto_fom": True,
"fom_path": [],
}
session.new_config("fom", "global", values)
if not hasattr(capsul_engine, "_modules_data"):
capsul_engine._modules_data = {}
store = capsul_engine._modules_data.setdefault("fom", {})
store["foms"] = {}
store["all_foms"] = SortedDictionary()
store["fom_atp"] = {"all": {}}
store["fom_pta"] = {"all": {}}
capsul_engine.settings.module_notifiers["capsul.engine.module.fom"] = [
partial(fom_config_updated, weakref.proxy(capsul_engine), "global")
]
capsul_engine.settings.module_notifiers.setdefault(
"capsul.engine.module.axon", []
).append(partial(config_updated, weakref.proxy(capsul_engine), "global"))
capsul_engine.settings.module_notifiers.setdefault(
"capsul.engine.module.spm", []
).append(partial(config_updated, weakref.proxy(capsul_engine), "global"))
# link with StudyConfig
if (
hasattr(capsul_engine, "study_config")
and "FomConfig" not in capsul_engine.study_config.modules
):
scmod = capsul_engine.study_config.load_module("FomConfig", {})
scmod.initialize_module()
scmod.initialize_callbacks()
update_fom(capsul_engine, "global")
def config_dependencies(config):
return {
"axon": "any",
#'spm': 'any',
"attributes": "any",
}
def config_updated(capsul_engine, environment, param=None, value=None):
if param in (None, "directory", "shared_directory"):
update_fom(capsul_engine, environment)
def spm_config_updated(capsul_engine, environment, param=None, value=None):
if param in (None, "directory"):
update_fom(capsul_engine, environment)
def fom_config_updated(capsul_engine, environment="global", param=None, value=None):
if param in ("volumes_format", "meshes_format"):
update_formats(capsul_engine, environment)
return
if param in ("fom_path",):
reset_foms(capsul_engine, environment)
return
# otherwise use update_fom()
update_fom(capsul_engine, environment, param, value)
[docs]
def update_fom(capsul_engine, environment="global", param=None, value=None):
"""Load configured FOMs and create FOM completion data"""
# print('***update_fom ***')
with capsul_engine.settings as session:
config = session.config("fom", environment)
if config is None:
return
soma_app = Application("capsul", plugin_modules=["soma.fom"])
if "soma.fom" not in soma_app.loaded_plugin_modules:
# WARNING: this is unsafe, may erase configured things, and
# probably not thread-safe.
soma_app.initialize()
if config.fom_path:
fom_path = [
p for p in config.fom_path if p not in soma_app.fom_path
] + soma_app.fom_path
else:
fom_path = list(soma_app.fom_path)
soma_app.fom_manager.paths = fom_path
soma_app.fom_manager.fom_files()
store = capsul_engine._modules_data["fom"]
if config.auto_fom and len(store["all_foms"]) <= 3:
for schema in soma_app.fom_manager.fom_files():
if schema not in store["all_foms"]:
store["all_foms"][schema] = None # not loaded yet.
foms = (
("input", config.input_fom),
("output", config.output_fom),
("shared", config.shared_fom),
)
for fom_type, fom_filename in foms:
if fom_filename not in ("", None, traits.Undefined):
fom = store["all_foms"].get(fom_filename)
if fom is None:
fom, atp, pta = load_fom(
capsul_engine, fom_filename, config, session, environment
)
else:
atp = store["fom_atp"]["all"][fom_filename]
pta = store["fom_pta"]["all"][fom_filename]
store["foms"][fom_type] = fom
store["fom_atp"][fom_type] = atp
store["fom_pta"][fom_type] = pta
# update directories
directories = {}
spm = session.config("spm", environment)
if spm:
directories["spm"] = spm.directory
axon = session.config("axon", environment)
if axon:
directories["shared"] = axon.shared_directory
directories["input"] = config.input_directory
directories["output"] = config.output_directory
for atp in store["fom_atp"]["all"].values():
atp.directories = directories
# backward compatibility for StudyConfig
capsul_engine.study_config.modules_data.foms = store["foms"]
capsul_engine.study_config.modules_data.all_foms = store["all_foms"]
capsul_engine.study_config.modules_data.fom_atp = store["fom_atp"]
capsul_engine.study_config.modules_data.fom_pta = store["fom_pta"]
def update_formats(capsul_engine, environment):
with capsul_engine.settings as session:
config = session.config("fom", environment)
if config is None:
return
directories = {}
spm = session.config("spm", environment)
if spm:
directories["spm"] = spm.directory
axon = session.config("axon", environment)
if axon:
directories["shared"] = axon.shared_directory
directories["input"] = config.input_directory
directories["output"] = config.output_directory
with config._storage.data() as data:
fields = data[config._collection].keys()
formats = tuple(
getattr(config, key)
for key in fields
if key.endswith("_format") and getattr(config, key) is not None
)
store = capsul_engine._modules_data["fom"]
for schema, fom in store["all_foms"].items():
if fom is None:
continue
atp = AttributesToPaths(
fom,
selection={},
directories=directories,
preferred_formats=set((formats)),
)
old_atp = store["fom_atp"]["all"].get(schema)
store["fom_atp"]["all"][schema] = atp
if old_atp is not None:
for t in ("input", "output", "shared"):
if store["fom_atp"].get(t) is old_atp:
store["fom_atp"][t] = atp
def load_fom(capsul_engine, schema, config, session, environment="global"):
# print('=== load fom', schema, '===')
# import time
# t0 = time.time()
soma_app = Application("capsul", plugin_modules=["soma.fom"])
if "soma.fom" not in soma_app.loaded_plugin_modules:
# WARNING: this is unsafe, may erase configured things, and
# probably not thread-safe.
soma_app.initialize()
old_fom_path = soma_app.fom_path
if config.fom_path:
soma_app.fom_path = [
six.ensure_str(p) for p in config.fom_path if p not in soma_app.fom_path
] + soma_app.fom_path
else:
soma_app.fom_path = list(soma_app.fom_path)
try:
fom = soma_app.fom_manager.load_foms(schema)
except KeyError:
return None, None, None
soma_app.fom_path = old_fom_path
store = capsul_engine._modules_data["fom"]
store["all_foms"][schema] = fom
# Create FOM completion data
with config._storage.data() as data:
fields = data[config._collection].keys()
formats = tuple(
getattr(config, key)
for key in fields
if key.endswith("_format") and getattr(config, key) is not None
)
directories = {}
spm = session.config("spm", environment)
if spm:
directories["spm"] = spm.directory
axon = session.config("axon", environment)
if axon:
directories["shared"] = axon.shared_directory
directories["input"] = config.input_directory
directories["output"] = config.output_directory
atp = AttributesToPaths(
fom, selection={}, directories=directories, preferred_formats=set((formats))
)
store["fom_atp"]["all"][schema] = atp
pta = PathToAttributes(fom, selection={})
store["fom_pta"]["all"][schema] = pta
# print(' load fom done:', time.time() - t0, 's')
return fom, atp, pta
def reset_foms(capsul_engine, environment):
soma_app = Application("capsul", plugin_modules=["soma.fom"])
if "soma.fom" not in soma_app.loaded_plugin_modules:
# WARNING: this is unsafe, may erase configured things, and
# probably not thread-safe.
soma_app.initialize()
soma_app.fom_manager.clear_cache()
update_fom(capsul_engine, environment)