# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""The spm module provides basic functions for interfacing with matlab
and spm to access spm tools.
"""
# Standard library imports
import os
from glob import glob
# Third-party imports
import numpy as np
# Local imports
from ... import logging
from ...utils.filemanip import ensure_list, simplify_list, split_filename, load_spm_mat
from ..base import (
Bunch,
traits,
Tuple,
TraitedSpec,
File,
Directory,
OutputMultiPath,
InputMultiPath,
isdefined,
)
from .base import SPMCommand, SPMCommandInputSpec, scans_for_fnames, ImageFileSPM
__docformat__ = "restructuredtext"
iflogger = logging.getLogger("nipype.interface")
class Level1DesignInputSpec(SPMCommandInputSpec):
spm_mat_dir = Directory(
exists=True, field="dir", desc="directory to store SPM.mat file (opt)"
)
timing_units = traits.Enum(
"secs",
"scans",
field="timing.units",
desc="units for specification of onsets",
mandatory=True,
)
interscan_interval = traits.Float(
field="timing.RT", desc="Interscan interval in secs", mandatory=True
)
microtime_resolution = traits.Int(
field="timing.fmri_t", desc=("Number of time-bins per scan in secs (opt)")
)
microtime_onset = traits.Float(
field="timing.fmri_t0",
desc=("The onset/time-bin in seconds for alignment (opt)"),
)
session_info = traits.Any(
field="sess",
desc=("Session specific information generated by ``modelgen.SpecifyModel``"),
mandatory=True,
)
factor_info = traits.List(
traits.Dict(traits.Enum("name", "levels")),
field="fact",
desc=("Factor specific information file (opt)"),
)
bases = traits.Dict(
traits.Enum("hrf", "fourier", "fourier_han", "gamma", "fir"),
field="bases",
desc="""\
Dictionary names of the basis function to parameters:
* hrf
* derivs -- (2-element list) Model HRF Derivatives. No derivatives: [0,0],
Time derivatives : [1,0], Time and Dispersion derivatives: [1,1]
* fourier, fourier_han, gamma, or fir:
* length -- (int) Post-stimulus window length (in seconds)
* order -- (int) Number of basis functions
""",
mandatory=True,
)
volterra_expansion_order = traits.Enum(
1, 2, field="volt", desc=("Model interactions - no:1, yes:2")
)
global_intensity_normalization = traits.Enum(
"none",
"scaling",
field="global",
desc=("Global intensity normalization - scaling or none"),
)
mask_image = File(
exists=True, field="mask", desc="Image for explicitly masking the analysis"
)
mask_threshold = traits.Either(
traits.Enum("-Inf"),
traits.Float(),
desc="Thresholding for the mask",
default="-Inf",
usedefault=True,
)
model_serial_correlations = traits.Enum(
"AR(1)",
"FAST",
"none",
field="cvi",
desc=(
"Model serial correlations "
"AR(1), FAST or none. FAST "
"is available in SPM12"
),
)
flags = traits.Dict(
desc="Additional arguments to the job, e.g., a common SPM operation is to "
"modify the default masking threshold (mthresh)"
)
class Level1DesignOutputSpec(TraitedSpec):
spm_mat_file = File(exists=True, desc="SPM mat file")
class Level1Design(SPMCommand):
"""Generate an SPM design matrix
http://www.fil.ion.ucl.ac.uk/spm/doc/manual.pdf#page=59
Examples
--------
>>> level1design = Level1Design()
>>> level1design.inputs.timing_units = 'secs'
>>> level1design.inputs.interscan_interval = 2.5
>>> level1design.inputs.bases = {'hrf':{'derivs': [0,0]}}
>>> level1design.inputs.session_info = 'session_info.npz'
>>> level1design.inputs.flags = {'mthresh': 0.4}
>>> level1design.run() # doctest: +SKIP
"""
input_spec = Level1DesignInputSpec
output_spec = Level1DesignOutputSpec
_jobtype = "stats"
_jobname = "fmri_spec"
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["spm_mat_dir", "mask_image"]:
return np.array([str(val)], dtype=object)
if opt in ["session_info"]: # , 'factor_info']:
if isinstance(val, dict):
return [val]
else:
return val
return super()._format_arg(opt, spec, val)
def _parse_inputs(self):
"""validate spm realign options if set to None ignore"""
einputs = super()._parse_inputs(skip=("mask_threshold", "flags"))
if isdefined(self.inputs.flags):
einputs[0].update(self.inputs.flags)
for sessinfo in einputs[0]["sess"]:
sessinfo["scans"] = scans_for_fnames(
ensure_list(sessinfo["scans"]), keep4d=False
)
if not isdefined(self.inputs.spm_mat_dir):
einputs[0]["dir"] = np.array([str(os.getcwd())], dtype=object)
return einputs
def _make_matlab_command(self, content):
"""validates spm options and generates job structure
if mfile is True uses matlab .m file
else generates a job structure and saves in .mat
"""
if isdefined(self.inputs.mask_image):
# SPM doesn't handle explicit masking properly, especially
# when you want to use the entire mask image
postscript = "load SPM;\n"
postscript += "SPM.xM.VM = spm_vol('%s');\n" % simplify_list(
self.inputs.mask_image
)
postscript += "SPM.xM.I = 0;\n"
postscript += "SPM.xM.T = [];\n"
postscript += (
"SPM.xM.TH = ones(size(SPM.xM.TH))*(%s);\n" % self.inputs.mask_threshold
)
postscript += "SPM.xM.xs = struct('Masking', 'explicit masking only');\n"
postscript += "save SPM SPM;\n"
else:
postscript = None
return super()._make_matlab_command(content, postscript=postscript)
def _list_outputs(self):
outputs = self._outputs().get()
spm = os.path.join(os.getcwd(), "SPM.mat")
outputs["spm_mat_file"] = spm
return outputs
class EstimateModelInputSpec(SPMCommandInputSpec):
spm_mat_file = File(
exists=True,
field="spmmat",
copyfile=True,
mandatory=True,
desc="Absolute path to SPM.mat",
)
estimation_method = traits.Dict(
traits.Enum("Classical", "Bayesian2", "Bayesian"),
field="method",
mandatory=True,
desc=("Dictionary of either Classical: 1, Bayesian: 1, or Bayesian2: 1 (dict)"),
)
write_residuals = traits.Bool(
field="write_residuals", desc="Write individual residual images"
)
flags = traits.Dict(desc="Additional arguments")
class EstimateModelOutputSpec(TraitedSpec):
mask_image = ImageFileSPM(exists=True, desc="binary mask to constrain estimation")
beta_images = OutputMultiPath(
ImageFileSPM(exists=True), desc="design parameter estimates"
)
residual_image = ImageFileSPM(
exists=True, desc="Mean-squared image of the residuals"
)
residual_images = OutputMultiPath(
ImageFileSPM(exists=True),
desc="individual residual images (requires `write_residuals`",
)
RPVimage = ImageFileSPM(exists=True, desc="Resels per voxel image")
spm_mat_file = File(exists=True, desc="Updated SPM mat file")
labels = ImageFileSPM(exists=True, desc="label file")
SDerror = OutputMultiPath(
ImageFileSPM(exists=True), desc="Images of the standard deviation of the error"
)
ARcoef = OutputMultiPath(
ImageFileSPM(exists=True), desc="Images of the AR coefficient"
)
Cbetas = OutputMultiPath(
ImageFileSPM(exists=True), desc="Images of the parameter posteriors"
)
SDbetas = OutputMultiPath(
ImageFileSPM(exists=True),
desc="Images of the standard deviation of parameter posteriors",
)
con_images = OutputMultiPath(
File(exists=True),
desc=(
"contrast images from a t-contrast "
"(created if factor_info used in Level1Design)"
),
)
spmT_images = OutputMultiPath(
File(exists=True),
desc=(
"stat images from a t-contrast"
"(created if factor_info used in Level1Design)"
),
)
ess_images = OutputMultiPath(
File(exists=True),
desc=(
"contrast images from an F-contrast"
"(created if factor_info used in Level1Design)"
),
)
spmF_images = OutputMultiPath(
File(exists=True),
desc=(
"stat images from an F-contrast"
"(created if factor_info used in Level1Design)"
),
)
class EstimateModel(SPMCommand):
"""Use spm_spm to estimate the parameters of a model
http://www.fil.ion.ucl.ac.uk/spm/doc/manual.pdf#page=69
Examples
--------
>>> est = EstimateModel()
>>> est.inputs.spm_mat_file = 'SPM.mat'
>>> est.inputs.estimation_method = {'Classical': 1}
>>> est.run() # doctest: +SKIP
"""
input_spec = EstimateModelInputSpec
output_spec = EstimateModelOutputSpec
_jobtype = "stats"
_jobname = "fmri_est"
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt == "spm_mat_file":
return np.array([str(val)], dtype=object)
if opt == "estimation_method":
if isinstance(val, (str, bytes)):
return {f"{val}": 1}
else:
return val
return super()._format_arg(opt, spec, val)
def _parse_inputs(self):
"""validate spm realign options if set to None ignore"""
einputs = super()._parse_inputs(skip=("flags"))
if isdefined(self.inputs.flags):
einputs[0].update(self.inputs.flags)
return einputs
def _list_outputs(self):
outputs = self._outputs().get()
pth = os.path.dirname(self.inputs.spm_mat_file)
outtype = "nii" if self.version.split(".")[0] in ["12", "25"] else "img"
spm = load_spm_mat(self.inputs.spm_mat_file, struct_as_record=False)
betas = [vbeta.fname[0] for vbeta in spm["SPM"][0, 0].Vbeta[0]]
if (
"Bayesian" in self.inputs.estimation_method
or "Bayesian2" in self.inputs.estimation_method
):
outputs["labels"] = os.path.join(pth, f"labels.{outtype}")
outputs["SDerror"] = glob(os.path.join(pth, "Sess*_SDerror*"))
outputs["ARcoef"] = glob(os.path.join(pth, "Sess*_AR_*"))
if betas:
outputs["Cbetas"] = [os.path.join(pth, f"C{beta}") for beta in betas]
outputs["SDbetas"] = [os.path.join(pth, f"SD{beta}") for beta in betas]
if "Classical" in self.inputs.estimation_method:
outputs["residual_image"] = os.path.join(pth, f"ResMS.{outtype}")
outputs["RPVimage"] = os.path.join(pth, f"RPV.{outtype}")
if self.inputs.write_residuals:
outputs["residual_images"] = glob(os.path.join(pth, "Res_*"))
if betas:
outputs["beta_images"] = [os.path.join(pth, beta) for beta in betas]
# When 'factor_info' is used in Level1Design
# spm automatically creates contrast
try:
contrast = [c.Vcon[0][0].fname[0] for c in spm["SPM"][0, 0].xCon[0]]
contrast_spm = [c.Vspm[0][0].fname[0] for c in spm["SPM"][0, 0].xCon[0]]
except Exception:
contrast = []
contrast_spm = []
if contrast:
outputs["con_images"] = [
os.path.join(pth, cont) for cont in contrast if 'con' in cont
]
outputs["ess_images"] = [
os.path.join(pth, cont) for cont in contrast if 'ess' in cont
]
if contrast_spm:
outputs["spmT_images"] = [
os.path.join(pth, cont) for cont in contrast_spm if 'spmT' in cont
]
outputs["spmF_images"] = [
os.path.join(pth, cont) for cont in contrast_spm if 'spmF' in cont
]
outputs["mask_image"] = os.path.join(pth, f"mask.{outtype}")
outputs["spm_mat_file"] = os.path.join(pth, "SPM.mat")
return outputs
class EstimateContrastInputSpec(SPMCommandInputSpec):
spm_mat_file = File(
exists=True,
field="spmmat",
desc="Absolute path to SPM.mat",
copyfile=True,
mandatory=True,
)
contrasts = traits.List(
traits.Either(
Tuple(
traits.Str,
traits.Enum("T"),
traits.List(traits.Str),
traits.List(traits.Float),
),
Tuple(
traits.Str,
traits.Enum("T"),
traits.List(traits.Str),
traits.List(traits.Float),
traits.List(traits.Float),
),
Tuple(
traits.Str,
traits.Enum("F"),
traits.List(
traits.Either(
Tuple(
traits.Str,
traits.Enum("T"),
traits.List(traits.Str),
traits.List(traits.Float),
),
Tuple(
traits.Str,
traits.Enum("T"),
traits.List(traits.Str),
traits.List(traits.Float),
traits.List(traits.Float),
),
)
),
),
),
desc="""List of contrasts with each contrast being a list of the form:
[('name', 'stat', [condition list], [weight list], [session list])]
If session list is None or not provided, all sessions are used. For
F contrasts, the condition list should contain previously defined
T-contrasts.""",
mandatory=True,
)
beta_images = InputMultiPath(
File(exists=True),
desc=("Parameter estimates of the design matrix"),
copyfile=False,
mandatory=True,
)
residual_image = File(
exists=True,
desc="Mean-squared image of the residuals",
copyfile=False,
mandatory=True,
)
use_derivs = traits.Bool(
desc="use derivatives for estimation", xor=["group_contrast"]
)
group_contrast = traits.Bool(desc="higher level contrast", xor=["use_derivs"])
class EstimateContrastOutputSpec(TraitedSpec):
con_images = OutputMultiPath(
File(exists=True), desc="contrast images from a t-contrast"
)
spmT_images = OutputMultiPath(
File(exists=True), desc="stat images from a t-contrast"
)
ess_images = OutputMultiPath(
File(exists=True), desc="contrast images from an F-contrast"
)
spmF_images = OutputMultiPath(
File(exists=True), desc="stat images from an F-contrast"
)
spm_mat_file = File(exists=True, desc="Updated SPM mat file")
class EstimateContrast(SPMCommand):
"""Use spm_contrasts to estimate contrasts of interest
Examples
--------
>>> import nipype.interfaces.spm as spm
>>> est = spm.EstimateContrast()
>>> est.inputs.spm_mat_file = 'SPM.mat'
>>> cont1 = ('Task>Baseline','T', ['Task-Odd','Task-Even'],[0.5,0.5])
>>> cont2 = ('Task-Odd>Task-Even','T', ['Task-Odd','Task-Even'],[1,-1])
>>> contrasts = [cont1,cont2]
>>> est.inputs.contrasts = contrasts
>>> est.run() # doctest: +SKIP
"""
input_spec = EstimateContrastInputSpec
output_spec = EstimateContrastOutputSpec
_jobtype = "stats"
_jobname = "con"
def _make_matlab_command(self, _):
"""Validate spm options and generate job structure."""
contrasts = []
cname = []
for i, cont in enumerate(self.inputs.contrasts):
cname.insert(i, cont[0])
contrasts.insert(
i,
Bunch(
name=cont[0],
stat=cont[1],
conditions=cont[2],
weights=None,
sessions=None,
),
)
if len(cont) >= 4:
contrasts[i].weights = cont[3]
if len(cont) >= 5:
contrasts[i].sessions = cont[4]
script = ["""\
%% generated by nipype.interfaces.spm
spm_defaults;
jobs{1}.stats{1}.con.spmmat = {'%s'};
load(jobs{1}.stats{1}.con.spmmat{:});
SPM.swd = '%s';
save(jobs{1}.stats{1}.con.spmmat{:},'SPM');
[msg,id] = lastwarn('');
if strcmp(id,'MATLAB:save:sizeTooBigForMATFile')
save(jobs{1}.stats{1}.con.spmmat{:},'SPM','-v7.3');
end
names = SPM.xX.name;""" % (self.inputs.spm_mat_file, os.getcwd())]
# get names for columns
if isdefined(self.inputs.group_contrast) and self.inputs.group_contrast:
script += ["condnames=names;"]
else:
if self.inputs.use_derivs:
script += [r"pat = 'Sn\([0-9]*\) (.*)';"]
else:
script += [
r"pat = 'Sn\([0-9]*\) (.*)\*bf\(1\)|Sn\([0-9]*\) "
r".*\*bf\([2-9]\)|Sn\([0-9]*\) (.*)';"
]
script += ["t = regexp(names,pat,'tokens');"]
# get sessidx for columns
script += [r"pat1 = 'Sn\(([0-9].*)\)\s.*';"]
script += ["t1 = regexp(names,pat1,'tokens');"]
script += ["""\
for i0=1:numel(t)
condnames{i0}='';
condsess(i0)=0;
if ~isempty(t{i0}{1})
condnames{i0} = t{i0}{1}{1};
condsess(i0)=str2num(t1{i0}{1}{1});
end;
end;"""]
# BUILD CONTRAST SESSION STRUCTURE
for i, contrast in enumerate(contrasts):
if contrast.stat == "T":
script += ["consess{%d}.tcon.name = '%s';" % (i + 1, contrast.name)]
script += ["consess{%d}.tcon.convec = zeros(1,numel(names));" % (i + 1)]
for c0, cond in enumerate(contrast.conditions):
script += ["idx = strmatch('%s',condnames,'exact');" % cond]
script += ["""\
if isempty(idx)
throw(MException('CondName:Chk', sprintf('Condition %%s not found in design','%s')));
end;""" % cond]
if contrast.sessions:
for sno, sw in enumerate(contrast.sessions):
script += ["sidx = find(condsess(idx)==%d);" % (sno + 1)]
script += [
"consess{%d}.tcon.convec(idx(sidx)) = %f;"
% (i + 1, sw * contrast.weights[c0])
]
else:
script += [
"consess{%d}.tcon.convec(idx) = %f;"
% (i + 1, contrast.weights[c0])
]
for i, contrast in enumerate(contrasts):
if contrast.stat == "F":
script += ["consess{%d}.fcon.name = '%s';" % (i + 1, contrast.name)]
for cl0, fcont in enumerate(contrast.conditions):
tidx = cname.index(fcont[0])
script += [
"consess{%d}.fcon.convec{%d} = consess{%d}.tcon.convec;"
% (i + 1, cl0 + 1, tidx + 1)
]
script += ["jobs{1}.stats{1}.con.consess = consess;"]
script += ["""\
if strcmp(spm('ver'),'SPM8')
spm_jobman('initcfg');
jobs=spm_jobman('spm5tospm8',{jobs});
end;"""]
script += ["spm_jobman('run',jobs);"]
return "\n".join(script)
def _list_outputs(self):
outputs = self._outputs().get()
pth, _ = os.path.split(self.inputs.spm_mat_file)
spm = load_spm_mat(self.inputs.spm_mat_file, struct_as_record=False)
con_images = []
spmT_images = []
for con in spm["SPM"][0, 0].xCon[0]:
con_images.append(str(os.path.join(pth, con.Vcon[0, 0].fname[0])))
spmT_images.append(str(os.path.join(pth, con.Vspm[0, 0].fname[0])))
if con_images:
outputs["con_images"] = con_images
outputs["spmT_images"] = spmT_images
spm12 = "12" in self.version.split(".")[0]
if spm12:
ess = glob(os.path.join(pth, "ess*.nii"))
else:
ess = glob(os.path.join(pth, "ess*.img"))
if len(ess) > 0:
outputs["ess_images"] = sorted(ess)
if spm12:
spmf = glob(os.path.join(pth, "spmF*.nii"))
else:
spmf = glob(os.path.join(pth, "spmF*.img"))
if len(spmf) > 0:
outputs["spmF_images"] = sorted(spmf)
outputs["spm_mat_file"] = self.inputs.spm_mat_file
return outputs
class ThresholdInputSpec(SPMCommandInputSpec):
spm_mat_file = File(
exists=True, desc="absolute path to SPM.mat", copyfile=True, mandatory=True
)
stat_image = File(exists=True, desc="stat image", copyfile=False, mandatory=True)
contrast_index = traits.Int(
mandatory=True, desc="which contrast in the SPM.mat to use"
)
use_fwe_correction = traits.Bool(
True,
usedefault=True,
desc=(
"whether to use FWE (Bonferroni) "
"correction for initial threshold "
"(height_threshold_type has to be "
"set to p-value)"
),
)
use_vox_fdr_correction = traits.Bool(
False,
usedefault=True,
desc=(
"whether to use voxel-based FDR "
"correction for initial threshold "
"(height_threshold_type has to be "
"set to q-value)"
),
)
use_topo_fdr = traits.Bool(
True,
usedefault=True,
desc=("whether to use FDR over cluster extent probabilities"),
)
height_threshold = traits.Float(
0.05,
usedefault=True,
desc=("value for initial thresholding (defining clusters)"),
)
height_threshold_type = traits.Enum(
"p-value",
"stat",
usedefault=True,
desc=("Is the cluster forming threshold a stat value or p-value?"),
)
extent_fdr_p_threshold = traits.Float(
0.05,
usedefault=True,
desc=("p threshold on FDR corrected cluster size probabilities"),
)
extent_threshold = traits.Int(
0, usedefault=True, desc="Minimum cluster size in voxels"
)
force_activation = traits.Bool(
False,
usedefault=True,
desc=(
"In case no clusters survive the "
"topological inference step this "
"will pick a culster with the highest "
"sum of t-values. Use with care."
),
)
class ThresholdOutputSpec(TraitedSpec):
thresholded_map = File(exists=True)
n_clusters = traits.Int()
pre_topo_fdr_map = File(exists=True)
pre_topo_n_clusters = traits.Int()
activation_forced = traits.Bool()
cluster_forming_thr = traits.Float()
[docs]
class Threshold(SPMCommand):
"""Topological FDR thresholding based on cluster extent/size. Smoothness is
estimated from GLM residuals but is assumed to be the same for all of the
voxels.
Examples
--------
>>> thresh = Threshold()
>>> thresh.inputs.spm_mat_file = 'SPM.mat'
>>> thresh.inputs.stat_image = 'spmT_0001.img'
>>> thresh.inputs.contrast_index = 1
>>> thresh.inputs.extent_fdr_p_threshold = 0.05
>>> thresh.run() # doctest: +SKIP
"""
input_spec = ThresholdInputSpec
output_spec = ThresholdOutputSpec
[docs]
def _gen_thresholded_map_filename(self):
_, fname, ext = split_filename(self.inputs.stat_image)
return os.path.abspath(fname + "_thr" + ext)
[docs]
def _gen_pre_topo_map_filename(self):
_, fname, ext = split_filename(self.inputs.stat_image)
return os.path.abspath(fname + "_pre_topo_thr" + ext)
[docs]
def _make_matlab_command(self, _):
script = "con_index = %d;\n" % self.inputs.contrast_index
script += "cluster_forming_thr = %f;\n" % self.inputs.height_threshold
if self.inputs.use_fwe_correction and self.inputs.use_vox_fdr_correction:
raise ValueError(
"'use_fwe_correction' and 'use_vox_fdr_correction' can't both be True"
)
if self.inputs.use_fwe_correction and not self.inputs.use_vox_fdr_correction:
script += "thresDesc = 'FWE';\n"
elif self.inputs.use_vox_fdr_correction and not self.inputs.use_fwe_correction:
script += "thresDesc = 'FDR';\n"
else:
script += "thresDesc = 'none';\n"
if self.inputs.use_topo_fdr:
script += "use_topo_fdr = 1;\n"
else:
script += "use_topo_fdr = 0;\n"
if self.inputs.force_activation:
script += "force_activation = 1;\n"
else:
script += "force_activation = 0;\n"
script += (
"cluster_extent_p_fdr_thr = %f;\n" % self.inputs.extent_fdr_p_threshold
)
script += "stat_filename = '%s';\n" % self.inputs.stat_image
script += "height_threshold_type = '%s';\n" % self.inputs.height_threshold_type
script += "extent_threshold = %d;\n" % self.inputs.extent_threshold
script += "load %s;\n" % self.inputs.spm_mat_file
script += """
FWHM = SPM.xVol.FWHM;
df = [SPM.xCon(con_index).eidf SPM.xX.erdf];
STAT = SPM.xCon(con_index).STAT;
VspmSv = cat(1,SPM.xCon(con_index).Vspm);
R = SPM.xVol.R;
S = SPM.xVol.S;
n = 1;
switch thresDesc
case 'FWE'
cluster_forming_thr = spm_uc(cluster_forming_thr,df,STAT,R,n,S);
case 'FDR'
cluster_forming_thr = spm_uc_FDR(cluster_forming_thr,df,STAT,n,VspmSv,0);
case 'none'
if strcmp(height_threshold_type, 'p-value')
cluster_forming_thr = spm_u(cluster_forming_thr^(1/n),df,STAT);
end
end
stat_map_vol = spm_vol(stat_filename);
[stat_map_data, stat_map_XYZmm] = spm_read_vols(stat_map_vol);
Z = stat_map_data(:)';
[x,y,z] = ind2sub(size(stat_map_data),(1:numel(stat_map_data))');
XYZ = cat(1, x', y', z');
XYZth = XYZ(:, Z >= cluster_forming_thr);
Zth = Z(Z >= cluster_forming_thr);
"""
script += (
"spm_write_filtered(Zth,XYZth,stat_map_vol.dim',"
"stat_map_vol.mat,'thresholded map', '%s');\n"
) % self._gen_pre_topo_map_filename()
script += """
max_size = 0;
max_size_index = 0;
th_nclusters = 0;
nclusters = 0;
if isempty(XYZth)
thresholded_XYZ = [];
thresholded_Z = [];
else
if use_topo_fdr
V2R = 1/prod(FWHM(stat_map_vol.dim > 1));
[uc,Pc,ue] = spm_uc_clusterFDR(cluster_extent_p_fdr_thr,df,STAT,R,n,Z,XYZ,V2R,cluster_forming_thr);
end
voxel_labels = spm_clusters(XYZth);
nclusters = max(voxel_labels);
thresholded_XYZ = [];
thresholded_Z = [];
for i = 1:nclusters
cluster_size = sum(voxel_labels==i);
if cluster_size > extent_threshold && (~use_topo_fdr || (cluster_size - uc) > -1)
thresholded_XYZ = cat(2, thresholded_XYZ, XYZth(:,voxel_labels == i));
thresholded_Z = cat(2, thresholded_Z, Zth(voxel_labels == i));
th_nclusters = th_nclusters + 1;
end
if force_activation
cluster_sum = sum(Zth(voxel_labels == i));
if cluster_sum > max_size
max_size = cluster_sum;
max_size_index = i;
end
end
end
end
activation_forced = 0;
if isempty(thresholded_XYZ)
if force_activation && max_size ~= 0
thresholded_XYZ = XYZth(:,voxel_labels == max_size_index);
thresholded_Z = Zth(voxel_labels == max_size_index);
th_nclusters = 1;
activation_forced = 1;
else
thresholded_Z = [0];
thresholded_XYZ = [1 1 1]';
th_nclusters = 0;
end
end
fprintf('activation_forced = %d\\n',activation_forced);
fprintf('pre_topo_n_clusters = %d\\n',nclusters);
fprintf('n_clusters = %d\\n',th_nclusters);
fprintf('cluster_forming_thr = %f\\n',cluster_forming_thr);
"""
script += (
"spm_write_filtered(thresholded_Z,thresholded_XYZ,"
"stat_map_vol.dim',stat_map_vol.mat,'thresholded map',"
" '%s');\n"
) % self._gen_thresholded_map_filename()
return script
[docs]
def aggregate_outputs(self, runtime=None):
outputs = self._outputs()
outputs.thresholded_map = self._gen_thresholded_map_filename()
outputs.pre_topo_fdr_map = self._gen_pre_topo_map_filename()
for line in runtime.stdout.split("\n"):
if line.startswith("activation_forced = "):
outputs.activation_forced = (
line[len("activation_forced = ") :].strip() == "1"
)
elif line.startswith("n_clusters = "):
outputs.n_clusters = int(line[len("n_clusters = ") :].strip())
elif line.startswith("pre_topo_n_clusters = "):
outputs.pre_topo_n_clusters = int(
line[len("pre_topo_n_clusters = ") :].strip()
)
elif line.startswith("cluster_forming_thr = "):
outputs.cluster_forming_thr = float(
line[len("cluster_forming_thr = ") :].strip()
)
return outputs
[docs]
def _list_outputs(self):
outputs = self._outputs().get()
outputs["thresholded_map"] = self._gen_thresholded_map_filename()
outputs["pre_topo_fdr_map"] = self._gen_pre_topo_map_filename()
return outputs
class ThresholdStatisticsInputSpec(SPMCommandInputSpec):
spm_mat_file = File(
exists=True, desc="absolute path to SPM.mat", copyfile=True, mandatory=True
)
stat_image = File(exists=True, desc="stat image", copyfile=False, mandatory=True)
contrast_index = traits.Int(
mandatory=True, desc="which contrast in the SPM.mat to use"
)
height_threshold = traits.Float(
desc=("stat value for initial thresholding (defining clusters)"), mandatory=True
)
extent_threshold = traits.Int(
0, usedefault=True, desc="Minimum cluster size in voxels"
)
class ThresholdStatisticsOutputSpec(TraitedSpec):
voxelwise_P_Bonf = traits.Float()
voxelwise_P_RF = traits.Float()
voxelwise_P_uncor = traits.Float()
voxelwise_P_FDR = traits.Float()
clusterwise_P_RF = traits.Float()
clusterwise_P_FDR = traits.Float()
class ThresholdStatistics(SPMCommand):
"""Given height and cluster size threshold calculate theoretical
probabilities concerning false positives
Examples
--------
>>> thresh = ThresholdStatistics()
>>> thresh.inputs.spm_mat_file = 'SPM.mat'
>>> thresh.inputs.stat_image = 'spmT_0001.img'
>>> thresh.inputs.contrast_index = 1
>>> thresh.inputs.height_threshold = 4.56
>>> thresh.run() # doctest: +SKIP
"""
input_spec = ThresholdStatisticsInputSpec
output_spec = ThresholdStatisticsOutputSpec
def _make_matlab_command(self, _):
script = "con_index = %d;\n" % self.inputs.contrast_index
script += "cluster_forming_thr = %f;\n" % self.inputs.height_threshold
script += "stat_filename = '%s';\n" % self.inputs.stat_image
script += "extent_threshold = %d;\n" % self.inputs.extent_threshold
script += "load '%s'\n" % self.inputs.spm_mat_file
script += """
FWHM = SPM.xVol.FWHM;
df = [SPM.xCon(con_index).eidf SPM.xX.erdf];
STAT = SPM.xCon(con_index).STAT;
R = SPM.xVol.R;
S = SPM.xVol.S;
n = 1;
voxelwise_P_Bonf = spm_P_Bonf(cluster_forming_thr,df,STAT,S,n)
voxelwise_P_RF = spm_P_RF(1,0,cluster_forming_thr,df,STAT,R,n)
stat_map_vol = spm_vol(stat_filename);
[stat_map_data, stat_map_XYZmm] = spm_read_vols(stat_map_vol);
Z = stat_map_data(:);
Zum = Z;
switch STAT
case 'Z'
VPs = (1-spm_Ncdf(Zum)).^n;
voxelwise_P_uncor = (1-spm_Ncdf(cluster_forming_thr)).^n
case 'T'
VPs = (1 - spm_Tcdf(Zum,df(2))).^n;
voxelwise_P_uncor = (1 - spm_Tcdf(cluster_forming_thr,df(2))).^n
case 'X'
VPs = (1-spm_Xcdf(Zum,df(2))).^n;
voxelwise_P_uncor = (1-spm_Xcdf(cluster_forming_thr,df(2))).^n
case 'F'
VPs = (1 - spm_Fcdf(Zum,df)).^n;
voxelwise_P_uncor = (1 - spm_Fcdf(cluster_forming_thr,df)).^n
end
VPs = sort(VPs);
voxelwise_P_FDR = spm_P_FDR(cluster_forming_thr,df,STAT,n,VPs)
V2R = 1/prod(FWHM(stat_map_vol.dim > 1));
clusterwise_P_RF = spm_P_RF(1,extent_threshold*V2R,cluster_forming_thr,df,STAT,R,n)
[x,y,z] = ind2sub(size(stat_map_data),(1:numel(stat_map_data))');
XYZ = cat(1, x', y', z');
[u, CPs, ue] = spm_uc_clusterFDR(0.05,df,STAT,R,n,Z,XYZ,V2R,cluster_forming_thr);
clusterwise_P_FDR = spm_P_clusterFDR(extent_threshold*V2R,df,STAT,R,n,cluster_forming_thr,CPs')
"""
return script
def aggregate_outputs(self, runtime=None, needed_outputs=None):
outputs = self._outputs()
cur_output = ""
for line in runtime.stdout.split("\n"):
if cur_output != "" and len(line.split()) != 0:
setattr(outputs, cur_output, float(line))
cur_output = ""
continue
if len(line.split()) != 0 and line.split()[0] in [
"clusterwise_P_FDR",
"clusterwise_P_RF",
"voxelwise_P_Bonf",
"voxelwise_P_FDR",
"voxelwise_P_RF",
"voxelwise_P_uncor",
]:
cur_output = line.split()[0]
continue
return outputs
class FactorialDesignInputSpec(SPMCommandInputSpec):
spm_mat_dir = Directory(
exists=True, field="dir", desc="directory to store SPM.mat file (opt)"
)
# Need to make an alias of InputMultiPath; the inputs below are not Path
covariates = InputMultiPath(
traits.Dict(
key_trait=traits.Enum("vector", "name", "interaction", "centering")
),
field="cov",
desc=("covariate dictionary {vector, name, interaction, centering}"),
)
threshold_mask_none = traits.Bool(
field="masking.tm.tm_none",
xor=["threshold_mask_absolute", "threshold_mask_relative"],
desc="do not use threshold masking",
)
threshold_mask_absolute = traits.Float(
field="masking.tm.tma.athresh",
xor=["threshold_mask_none", "threshold_mask_relative"],
desc="use an absolute threshold",
)
threshold_mask_relative = traits.Float(
field="masking.tm.tmr.rthresh",
xor=["threshold_mask_absolute", "threshold_mask_none"],
desc=("threshold using a proportion of the global value"),
)
use_implicit_threshold = traits.Bool(
field="masking.im", desc=("use implicit mask NaNs or zeros to threshold")
)
explicit_mask_file = File(
field="masking.em", # requires cell
desc="use an implicit mask file to threshold",
)
global_calc_omit = traits.Bool(
field="globalc.g_omit",
xor=["global_calc_mean", "global_calc_values"],
desc="omit global calculation",
)
global_calc_mean = traits.Bool(
field="globalc.g_mean",
xor=["global_calc_omit", "global_calc_values"],
desc="use mean for global calculation",
)
global_calc_values = traits.List(
traits.Float,
field="globalc.g_user.global_uval",
xor=["global_calc_mean", "global_calc_omit"],
desc="omit global calculation",
)
no_grand_mean_scaling = traits.Bool(
field="globalm.gmsca.gmsca_no", desc=("do not perform grand mean scaling")
)
global_normalization = traits.Enum(
1,
2,
3,
field="globalm.glonorm",
desc=("global normalization None-1, Proportional-2, ANCOVA-3"),
)
class FactorialDesignOutputSpec(TraitedSpec):
spm_mat_file = File(exists=True, desc="SPM mat file")
class FactorialDesign(SPMCommand):
"""Base class for factorial designs
http://www.fil.ion.ucl.ac.uk/spm/doc/manual.pdf#page=77
"""
input_spec = FactorialDesignInputSpec
output_spec = FactorialDesignOutputSpec
_jobtype = "stats"
_jobname = "factorial_design"
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["spm_mat_dir", "explicit_mask_file"]:
return np.array([str(val)], dtype=object)
if opt in ["covariates"]:
outlist = []
mapping = {
"name": "cname",
"vector": "c",
"interaction": "iCFI",
"centering": "iCC",
}
for dictitem in val:
outdict = {}
for key, keyval in list(dictitem.items()):
outdict[mapping[key]] = keyval
outlist.append(outdict)
return outlist
return super()._format_arg(opt, spec, val)
def _parse_inputs(self):
"""validate spm realign options if set to None ignore"""
einputs = super()._parse_inputs()
if not isdefined(self.inputs.spm_mat_dir):
einputs[0]["dir"] = np.array([str(os.getcwd())], dtype=object)
return einputs
def _list_outputs(self):
outputs = self._outputs().get()
spm = os.path.join(os.getcwd(), "SPM.mat")
outputs["spm_mat_file"] = spm
return outputs
class OneSampleTTestDesignInputSpec(FactorialDesignInputSpec):
in_files = traits.List(
File(exists=True),
field="des.t1.scans",
mandatory=True,
minlen=2,
desc="input files",
)
class OneSampleTTestDesign(FactorialDesign):
"""Create SPM design for one sample t-test
Examples
--------
>>> ttest = OneSampleTTestDesign()
>>> ttest.inputs.in_files = ['cont1.nii', 'cont2.nii']
>>> ttest.run() # doctest: +SKIP
"""
input_spec = OneSampleTTestDesignInputSpec
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["in_files"]:
return np.array(val, dtype=object)
return super()._format_arg(opt, spec, val)
class TwoSampleTTestDesignInputSpec(FactorialDesignInputSpec):
# very unlikely that you will have a single image in one group, so setting
# parameters to require at least two files in each group [SG]
group1_files = traits.List(
File(exists=True),
field="des.t2.scans1",
mandatory=True,
minlen=2,
desc="Group 1 input files",
)
group2_files = traits.List(
File(exists=True),
field="des.t2.scans2",
mandatory=True,
minlen=2,
desc="Group 2 input files",
)
dependent = traits.Bool(
field="des.t2.dept", desc=("Are the measurements dependent between levels")
)
unequal_variance = traits.Bool(
field="des.t2.variance",
desc=("Are the variances equal or unequal between groups"),
)
class TwoSampleTTestDesign(FactorialDesign):
"""Create SPM design for two sample t-test
Examples
--------
>>> ttest = TwoSampleTTestDesign()
>>> ttest.inputs.group1_files = ['cont1.nii', 'cont2.nii']
>>> ttest.inputs.group2_files = ['cont1a.nii', 'cont2a.nii']
>>> ttest.run() # doctest: +SKIP
"""
input_spec = TwoSampleTTestDesignInputSpec
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["group1_files", "group2_files"]:
return np.array(val, dtype=object)
return super()._format_arg(opt, spec, val)
class PairedTTestDesignInputSpec(FactorialDesignInputSpec):
paired_files = traits.List(
traits.List(File(exists=True), minlen=2, maxlen=2),
field="des.pt.pair",
mandatory=True,
minlen=2,
desc="List of paired files",
)
grand_mean_scaling = traits.Bool(
field="des.pt.gmsca", desc="Perform grand mean scaling"
)
ancova = traits.Bool(
field="des.pt.ancova", desc="Specify ancova-by-factor regressors"
)
class PairedTTestDesign(FactorialDesign):
"""Create SPM design for paired t-test
Examples
--------
>>> pttest = PairedTTestDesign()
>>> pttest.inputs.paired_files = [['cont1.nii','cont1a.nii'],['cont2.nii','cont2a.nii']]
>>> pttest.run() # doctest: +SKIP
"""
input_spec = PairedTTestDesignInputSpec
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["paired_files"]:
return [dict(scans=np.array(files, dtype=object)) for files in val]
return super()._format_arg(opt, spec, val)
class MultipleRegressionDesignInputSpec(FactorialDesignInputSpec):
in_files = traits.List(
File(exists=True),
field="des.mreg.scans",
mandatory=True,
minlen=2,
desc="List of files",
)
include_intercept = traits.Bool(
True,
field="des.mreg.incint",
usedefault=True,
desc="Include intercept in design",
)
user_covariates = InputMultiPath(
traits.Dict(key_trait=traits.Enum("vector", "name", "centering")),
field="des.mreg.mcov",
desc=("covariate dictionary {vector, name, centering}"),
)
class MultipleRegressionDesign(FactorialDesign):
"""Create SPM design for multiple regression
Examples
--------
>>> mreg = MultipleRegressionDesign()
>>> mreg.inputs.in_files = ['cont1.nii','cont2.nii']
>>> mreg.run() # doctest: +SKIP
"""
input_spec = MultipleRegressionDesignInputSpec
def _format_arg(self, opt, spec, val):
"""Convert input to appropriate format for spm"""
if opt in ["in_files"]:
return np.array(val, dtype=object)
if opt in ["user_covariates"]:
outlist = []
mapping = {"name": "cname", "vector": "c", "centering": "iCC"}
for dictitem in val:
outdict = {}
for key, keyval in list(dictitem.items()):
outdict[mapping[key]] = keyval
outlist.append(outdict)
return outlist
return super()._format_arg(opt, spec, val)