""" Model classes used in BIDSLayouts. """
import re
import os
from pathlib import Path
import warnings
import json
from copy import deepcopy
from itertools import chain
from functools import lru_cache
from collections import UserDict
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy import Column, String, Boolean, ForeignKey, Table
from sqlalchemy.orm import reconstructor, relationship, backref, object_session
try:
from sqlalchemy.orm import declarative_base
except ImportError: # sqlalchemy < 1.4
from sqlalchemy.ext.declarative import declarative_base
from ..utils import listify
from .writing import build_path, write_to_file
from ..config import get_option
from .utils import BIDSMetadata, PaddedInt
from ..exceptions import BIDSChildDatasetError
Base = declarative_base()
class LayoutInfo(Base):
""" Contains information about a BIDSLayout's initialization parameters."""
__tablename__ = 'layout_info'
root = Column(String, primary_key=True)
absolute_paths = Column(Boolean)
_derivatives = Column(String)
_config = Column(String)
def __init__(self, **kwargs):
init_args = self._sanitize_init_args(kwargs)
raw_cols = ['root', 'absolute_paths']
json_cols = ['derivatives', 'config']
all_cols = raw_cols + json_cols
missing_cols = set(all_cols) - set(init_args.keys())
if missing_cols:
raise ValueError("Missing mandatory initialization args: {}"
.format(missing_cols))
for col in all_cols:
setattr(self, col, init_args[col])
if col in json_cols:
json_data = json.dumps(init_args[col])
setattr(self, '_' + col, json_data)
@reconstructor
def _init_on_load(self):
for col in ['derivatives', 'config']:
db_val = getattr(self, '_' + col)
setattr(self, col, json.loads(db_val))
def _sanitize_init_args(self, kwargs):
""" Prepare initialization arguments for serialization """
if 'root' in kwargs:
kwargs['root'] = str(Path(kwargs['root']).absolute())
if 'config' in kwargs and isinstance(kwargs['config'], list):
kwargs['config'] = [
str(Path(config).absolute())
if isinstance(config, os.PathLike) else config
for config in kwargs['config']
]
# Get abspaths
if kwargs.get('derivatives') not in (None, True, False):
kwargs['derivatives'] = [
str(Path(der).absolute())
for der in listify(kwargs['derivatives'])
]
return kwargs
def __repr__(self):
return f"<LayoutInfo {self.root}>"
class Config(Base):
"""Container for BIDS configuration information.
Parameters
----------
name : str
The name to give the Config (e.g., 'bids').
entities : list
A list of dictionaries containing entity configuration
information.
default_path_patterns : list
Optional list of patterns used to build new paths.
session : :obj:`sqlalchemy.orm.session.Session` or None
An optional SQLAlchemy session. If passed,
the session is used to update the database with any newly created
Entity objects. If None, no database update occurs.
"""
__tablename__ = 'configs'
name = Column(String, primary_key=True)
_default_path_patterns = Column(String)
entities = relationship(
"Entity", secondary="config_to_entity_map",
collection_class=attribute_mapped_collection('name'))
def __init__(self, name, entities=None, default_path_patterns=None,
session=None):
self.name = name
self.default_path_patterns = default_path_patterns
self._default_path_patterns = json.dumps(default_path_patterns)
if entities:
for ent in entities:
if session is not None:
                    existing = (session.query(Entity)
                                .filter_by(name=ent['name']).first())
else:
existing = None
ent = existing or Entity(**ent)
self.entities[ent.name] = ent
if session is not None:
session.add_all(list(self.entities.values()))
session.commit()
@reconstructor
def _init_on_load(self):
self.default_path_patterns = json.loads(self._default_path_patterns)
@classmethod
    def load(cls, config, session=None):
"""Load a Config instance from the passed configuration data.
Parameters
----------
config : str or dict
A string or dict containing configuration information.
Must be one of:
* A string giving the name of a predefined config file
(e.g., 'bids' or 'derivatives')
* A path to a JSON file containing config information
* A dictionary containing config information
session : :obj:`sqlalchemy.orm.session.Session` or None
An optional SQLAlchemy Session instance.
If passed, the session is used to check the database for (and
return) an existing Config with name defined in config['name'].
Returns
-------
A Config instance.
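
        Examples
        --------
        Illustrative sketch; loads the predefined 'bids' configuration::

            from bids.layout.models import Config
            config = Config.load('bids')
            'subject' in config.entities  # True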
"""
if isinstance(config, (str, Path)):
config_paths = get_option('config_paths')
if config in config_paths:
config = config_paths[config]
if not Path(config).exists():
raise ValueError("{} is not a valid path.".format(config))
else:
with open(config, 'r') as f:
config = json.load(f)
# Return existing Config record if one exists
if session is not None:
result = session.query(Config).filter_by(name=config['name']).first()
if result:
return result
return Config(session=session, **config)
def __repr__(self):
return f"<Config {self.name}>"
class BIDSFile(Base):
"""Represents a single file or directory in a BIDS dataset.
Parameters
----------
filename : str
The path to the corresponding file.
"""
__tablename__ = 'files'
path = Column(String, primary_key=True)
filename = Column(String)
dirname = Column(String)
entities = association_proxy("tags", "value")
is_dir = Column(Boolean, index=True)
class_ = Column(String(20))
_associations = relationship('BIDSFile', secondary='associations',
primaryjoin='FileAssociation.dst == BIDSFile.path',
secondaryjoin='FileAssociation.src == BIDSFile.path')
__mapper_args__ = {
'polymorphic_on': class_,
'polymorphic_identity': 'file'
}
def __init__(self, filename):
self.path = str(filename)
self.filename = self._path.name
self.dirname = str(self._path.parent)
self.is_dir = not self.filename
@property
def _path(self):
return Path(self.path)
@property
def _dirname(self):
return Path(self.dirname)
def __getattr__(self, attr):
        # Ensures backwards compatibility with the old File namedtuple,
        # which was deprecated as of 0.7.
        # Check for a leading '_' first so we don't mask access to
        # __setstate__ etc. No entity name is allowed to start with '_'
        # anyway, so this check is more general than checking for '__'.
        if not attr.startswith('_') and attr in self.entities:
            warnings.warn("Accessing entities as attributes is deprecated as "
                          "of 0.7. Please use the .entities dictionary instead"
                          " (i.e., .entities['%s'] instead of .%s)."
                          % (attr, attr))
return self.entities[attr]
raise AttributeError("%s object has no attribute named %r" %
(self.__class__.__name__, attr))
def __repr__(self):
return "<{} filename='{}'>".format(self.__class__.__name__, self.path)
def __fspath__(self):
return self.path
@property
@lru_cache()
def relpath(self):
"""Return path relative to layout root"""
root = object_session(self).query(LayoutInfo).first().root
return str(Path(self.path).relative_to(root))
def get_associations(self, kind=None, include_parents=False):
"""Get associated files, optionally limiting by association kind.
Parameters
----------
kind : str
The kind of association to return (e.g., "Child").
By default, all associations are returned.
include_parents : bool
If True, files related through inheritance
are included in the returned list. If False, only directly
associated files are returned. For example, a file's JSON
sidecar will always be returned, but other JSON files from
which the sidecar inherits will only be returned if
include_parents=True.
Returns
-------
list
A list of BIDSFile instances.
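
        Examples
        --------
        A sketch, assuming ``layout`` is an initialized BIDSLayout and the
        dataset contains BOLD runs with JSON sidecars (hypothetical names)::

            bold = layout.get(subject='01', suffix='bold',
                              extension='.nii.gz')[0]
            sidecars = bold.get_associations(kind='Metadata')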
"""
if kind is None and not include_parents:
return self._associations
session = object_session(self)
q = (session.query(BIDSFile)
.join(FileAssociation, BIDSFile.path == FileAssociation.dst)
.filter_by(src=self.path))
if kind is not None:
q = q.filter_by(kind=kind)
associations = q.all()
if not include_parents:
return associations
def collect_associations(results, bidsfile):
results.append(bidsfile)
for p in bidsfile.get_associations('Child'):
results = collect_associations(results, p)
return results
return list(chain(*[collect_associations([], bf) for bf in associations]))
def get_metadata(self):
"""Return all metadata associated with the current file. """
md = BIDSMetadata(self.path)
md.update(self.get_entities(metadata=True))
return md
def get_entities(self, metadata=False, values='tags'):
"""Return entity information for the current file.
Parameters
----------
metadata : bool or None
If False (default), only entities defined
for filenames (and not those found in the JSON sidecar) are
returned. If True, only entities found in metadata files (and not
defined for filenames) are returned. If None, all available
entities are returned.
values : str
The kind of object to return in the dict's values.
Must be one of:
* 'tags': Returns only the tagged value--e.g., if the key
is "subject", the value might be "01".
* 'objects': Returns the corresponding Entity instance.
Returns
-------
        dict
            A dict whose keys are entity names and whose values are either
            tag values or Entity instances, depending on the ``values``
            argument.
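
        Examples
        --------
        A sketch, assuming ``f`` is the BIDSFile for a hypothetical
        ``sub-01_task-rest_bold.nii.gz``::

            f.get_entities()
            # {'subject': '01', 'task': 'rest', 'suffix': 'bold', ...}
            f.get_entities(metadata=True)
            # sidecar fields only, e.g. {'RepetitionTime': 2.0, ...}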
"""
if metadata is None and values == 'tags':
return self.entities
session = object_session(self)
query = (session.query(Tag)
.filter_by(file_path=self.path)
.join(Entity))
if metadata not in (None, 'all'):
query = query.filter(Tag.is_metadata == metadata)
results = query.all()
if values.startswith('obj'):
return {t.entity_name: t.entity for t in results}
return {t.entity_name: t.value for t in results}
def copy(self, path_patterns, symbolic_link=False, root=None,
conflicts='fail'):
"""Copy the contents of a file to a new location.
Parameters
----------
path_patterns : list
List of patterns used to construct the new
filename. See :obj:`build_path` documentation for details.
symbolic_link : bool
If True, use a symbolic link to point to the
existing file. If False, creates a new file.
root : str
Optional path to prepend to the constructed filename.
conflicts : str
Defines the desired action when the output path already exists.
Must be one of:
'fail': raises an exception
                'skip': does nothing
'overwrite': overwrites the existing file
'append': adds a suffix to each file copy, starting with 1
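
        Examples
        --------
        A sketch, assuming ``f`` is a BIDSFile and ``/tmp/out`` exists;
        the path pattern below is hypothetical::

            pattern = 'sub-{subject}/sub-{subject}_{suffix}{extension}'
            f.copy([pattern], root='/tmp/out', conflicts='overwrite')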
"""
new_filename = build_path(self.entities, path_patterns)
if not new_filename:
return None
if new_filename[-1] == os.sep:
new_filename += self.filename
if self._path.is_absolute() or root is None:
path = self._path
else:
path = Path(root) / self._path
if not path.exists():
raise ValueError("Target filename to copy/symlink (%s) doesn't "
"exist." % path)
kwargs = dict(path=new_filename, root=root, conflicts=conflicts)
if symbolic_link:
kwargs['link_to'] = path
else:
kwargs['copy_from'] = path
write_to_file(**kwargs)
class BIDSDataFile(BIDSFile):
"""Represents a single data file in a BIDS dataset.
Derived from `BIDSFile` and provides additional functionality such as
obtaining pandas DataFrame data representation (via `get_df`).
"""
__mapper_args__ = {
'polymorphic_identity': 'data_file'
}
def get_df(self, include_timing=True, adjust_onset=False,
enforce_dtypes=True, **pd_args):
"""Return the contents of a tsv file as a pandas DataFrame.
Parameters
----------
include_timing : bool
If True, adds an "onset" column to dense
timeseries files (e.g., *_physio.tsv.gz).
adjust_onset : bool
If True, the onset of each sample in a dense
timeseries file is shifted to reflect the "StartTime" value in
the JSON sidecar. If False, the first sample starts at 0 secs.
Ignored if include_timing=False.
enforce_dtypes : bool
If True, enforces the data types defined in
the BIDS spec on core columns (e.g., subject_id and session_id
must be represented as strings).
pd_args : dict
Optional keyword arguments to pass onto pd.read_csv().
Returns
-------
:obj:`pandas.DataFrame`
A pandas DataFrame.
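
        Examples
        --------
        A sketch, assuming ``events`` is the BIDSDataFile for a hypothetical
        ``sub-01_task-rest_events.tsv``::

            df = events.get_df()
            list(df.columns)  # e.g., ['onset', 'duration', 'trial_type']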
"""
import pandas as pd
import numpy as np
if enforce_dtypes:
dtype = {
'subject_id': str,
'session_id': str,
'participant_id': str
}
else:
dtype = None
# TODO: memoize this for efficiency. (Note: caching is insufficient,
# because the dtype enforcement will break if we ignore the value of
# enforce_dtypes).
suffix = self.entities['suffix']
header = None if suffix in {'physio', 'stim'} else 'infer'
self.data = pd.read_csv(self.path, sep='\t', na_values='n/a',
dtype=dtype, header=header, **pd_args)
data = self.data.copy()
if self.entities['extension'] == '.tsv.gz':
md = self.get_metadata()
# We could potentially include some validation here, but that seems
# like a job for the BIDS Validator.
data.columns = md['Columns']
if include_timing:
onsets = np.arange(len(data)) / md['SamplingFrequency']
if adjust_onset:
onsets += md['StartTime']
data.insert(0, 'onset', onsets)
return data
class BIDSImageFile(BIDSFile):
"""Represents a single neuroimaging data file in a BIDS dataset.
Derived from `BIDSFile` and provides additional functionality such as
obtaining nibabel's image file representation (via `get_image`).
"""
__mapper_args__ = {
'polymorphic_identity': 'image_file'
}
def get_image(self, **kwargs):
"""Return the associated image file (if it exists) as a NiBabel object
Any keyword arguments are passed to ``nibabel.load``.
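
        Examples
        --------
        A sketch, assuming ``f`` is a BIDSImageFile wrapping a 4D BOLD run::

            img = f.get_image()
            img.shape  # e.g., (64, 64, 36, 200)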
"""
try:
import nibabel as nb
return nb.load(self.path, **kwargs)
except Exception as e:
raise ValueError("'{}' does not appear to be an image format "
"NiBabel can read.".format(self.path)) from e
class BIDSJSONFile(BIDSFile):
"""Represents a single JSON metadata file in a BIDS dataset.
Derived from `BIDSFile` and provides additional functionality for reading
the contents of JSON files as either dicts or strings.
"""
__mapper_args__ = {
'polymorphic_identity': 'json_file'
}
def get_dict(self):
"""Return the contents of the current file as a dictionary. """
d = json.loads(self.get_json())
        if not isinstance(d, dict):
            raise ValueError(
                "File %s is a JSON file containing a %s, not a dict as "
                "expected" % (self.path, type(d).__name__))
return d
def get_json(self):
"""Return the contents of the current file as a JSON string. """
with open(self.path, 'r') as f:
return f.read()
class Entity(Base):
"""
Represents a single entity defined in the JSON config.
Parameters
----------
name : str
The name of the entity (e.g., 'subject', 'run', etc.)
pattern : str
A regex pattern used to match against file names.
Must define at least one group, and only the first group is
kept as the match.
mandatory : bool
If True, every File _must_ match this entity.
directory : str
Optional pattern defining a directory associated
with the entity.
dtype : str
The optional data type of the Entity values. Must be
one of 'int', 'float', 'bool', or 'str'. If None, no type
enforcement will be attempted, which means the dtype of the
value may be unpredictable.
"""
__tablename__ = 'entities'
name = Column(String, primary_key=True)
mandatory = Column(Boolean, default=False)
pattern = Column(String)
directory = Column(String, nullable=True)
_dtype = Column(String, default='str')
files = association_proxy("tags", "value")
def __init__(self, name, pattern=None, mandatory=False, directory=None,
dtype='str'):
self.name = name
self.pattern = pattern
self.mandatory = mandatory
self.directory = directory
if not isinstance(dtype, str):
dtype = dtype.__name__
self._dtype = dtype
self._init_on_load()
def __repr__(self):
return f"<Entity {self.name} (pattern={self.pattern}, dtype={self.dtype})>"
@reconstructor
def _init_on_load(self):
        if self._dtype not in ('str', 'float', 'int', 'bool'):
            raise ValueError("Invalid dtype '{}'. Must be one of 'int', "
                             "'float', 'bool', or 'str'.".format(self._dtype))
        # Map the stored dtype name to a callable; 'int' maps to PaddedInt so
        # zero-padded values (e.g., "01") compare equal to plain ints.
        self.dtype = {'str': str, 'float': float, 'bool': bool,
                      'int': PaddedInt}[self._dtype]
        self.regex = re.compile(self.pattern) if self.pattern is not None else None
def __iter__(self):
yield from self.unique()
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
# Directly copy the SQLAlchemy connection before any setattr calls,
# otherwise failures occur sporadically on Python 3.5 when the
# _sa_instance_state attribute (randomly!) disappears.
result._sa_instance_state = self._sa_instance_state
memo[id(self)] = result
for k, v in self.__dict__.items():
if k == '_sa_instance_state':
continue
new_val = getattr(self, k) if k == 'regex' else deepcopy(v, memo)
setattr(result, k, new_val)
return result
def match_file(self, f):
"""
Determine whether the passed file matches the Entity.
Parameters
----------
f : BIDSFile
The BIDSFile instance to match against.
Returns
-------
the matched value if a match was found, otherwise None.
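
        Examples
        --------
        Illustrative only; a standalone Entity matched against a hypothetical
        BIDSFile ``f`` whose path contains ``sub-01``::

            ent = Entity('subject', pattern='sub-([a-zA-Z0-9]+)')
            ent.match_file(f)  # returns '01'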
"""
if self.regex is None:
return None
m = self.regex.search(f.path)
val = m.group(1) if m is not None else None
return self._astype(val)
def unique(self):
"""Return all unique values/levels for the current entity.
"""
return list(set(self.files.values()))
def count(self, files=False):
"""Return a count of unique values or files.
Parameters
----------
files : bool
When True, counts all files mapped to the Entity.
When False, counts all unique values.
Returns
-------
int
Count of unique values or files.
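
        Examples
        --------
        Illustrative only; assumes ``subject`` is an Entity tagged on four
        files spanning two subjects::

            subject.count()            # 2 (unique values)
            subject.count(files=True)  # 4 (all tagged files)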
"""
return len(self.files) if files else len(self.unique())
def _astype(self, val):
if val is not None and self.dtype is not None:
val = self.dtype(val)
return val
type_map = {
'str': str,
'int': PaddedInt,
'float': float,
'bool': bool,
'json': 'json',
}
class Tag(Base):
"""Represents an association between a File and an Entity.
Parameters
----------
file : BIDSFile
The associated BIDSFile.
entity : Entity
The associated Entity.
value : json-serializable type
The value to store for this file/entity pair. Must be of type
str, int, float, bool, or any json-serializable structure.
dtype : str
Optional type for the value field. If None, inferred from
value. If passed, must be one of str, int, float, bool, or json.
Any other value will be treated as json (and will fail if the
value can't be serialized to json).
is_metadata : bool
Indicates whether or not the Entity is derived
from JSON sidecars (True) or is a predefined Entity from a
config (False).
"""
__tablename__ = 'tags'
file_path = Column(String, ForeignKey('files.path'), primary_key=True)
entity_name = Column(String, ForeignKey('entities.name'), primary_key=True)
_value = Column(String, nullable=False)
_dtype = Column(String, default='str')
is_metadata = Column(Boolean, default=False)
file = relationship('BIDSFile', backref=backref(
"tags", collection_class=attribute_mapped_collection("entity_name")))
entity = relationship('Entity', backref=backref(
"tags", collection_class=attribute_mapped_collection("file_path")))
def __init__(self, file, entity, value, dtype=None, is_metadata=False):
data = _create_tag_dict(file, entity, value, dtype, is_metadata)
self.file_path = data['file_path']
self.entity_name = data['entity_name']
self._dtype = data['_dtype']
self._value = data['_value']
self.is_metadata = data['is_metadata']
self.dtype = type_map[self._dtype]
if self._dtype != 'json':
self.value = self.dtype(value)
else:
self.value = value
def __repr__(self):
msg = "<Tag file:{!r} entity:{!r} value:{!r}>"
return msg.format(self.file_path, self.entity_name, self.value)
@reconstructor
def _init_on_load(self):
if self._dtype == 'json':
self.value = json.loads(self._value)
self.dtype = 'json'
elif self._dtype == 'bool':
self.value = self._value == 'True'
self.dtype = bool
else:
self.dtype = type_map[self._dtype]
self.value = self.dtype(self._value)
def _create_tag_dict(file, entity, value, dtype=None, is_metadata=False):
data = {}
if dtype is None:
dtype = type(value)
if not isinstance(dtype, str):
dtype = dtype.__name__
if dtype in ['list', 'dict']:
_dtype = 'json'
_value = json.dumps(value)
else:
_dtype = dtype
_value = str(value)
    if _dtype not in ('str', 'float', 'int', 'bool', 'json'):
        raise ValueError(
            f"Passed value has an invalid dtype ({dtype}). Must be one of "
            "int, float, bool, str, or json.")
data['is_metadata'] = is_metadata
data['file_path'] = file.path
data['entity_name'] = entity.name
data['_dtype'] = _dtype
data['_value'] = _value
return data
class FileAssociation(Base):
__tablename__ = 'associations'
src = Column(String, ForeignKey('files.path'), primary_key=True)
dst = Column(String, ForeignKey('files.path'), primary_key=True)
kind = Column(String, primary_key=True)
# Association objects
config_to_entity_map = Table('config_to_entity_map', Base.metadata,
Column('config', String, ForeignKey('configs.name')),
Column('entity', String, ForeignKey('entities.name'))
)
class DerivativeDatasets(UserDict):
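    """Dict-like container of derivative datasets, keyed by dataset folder
    name. Datasets can also be retrieved by the name of the pipeline that
    generated them via :meth:`get_pipeline`; selecting by pipeline name
    through ``[]`` still works but is deprecated.

    A sketch, assuming ``layout`` has a derivatives folder indexed::

        layout.derivatives['fmriprep']               # by folder name
        layout.derivatives.get_pipeline('fmriprep')  # by pipeline name
    """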
def __getitem__(self, key):
try:
return super().__getitem__(key)
except KeyError:
pass
try:
result = self.get_pipeline(key)
warnings.warn(
"Directly selecting derivative datasets using "
"pipeline name (i.e. dataset.derivatives[<pipeline_name>] will be "
"phased out in an upcoming release. Select instead using the folder "
"name of the dataset (i.e. dataset.derivatives[<folder_name>]), or use "
"dataset.derivatives.get_pipeline(<pipeline_name>).",
DeprecationWarning,
)
return result
except KeyError as err:
raise KeyError(
f"No datasets found matching {key} either as a pipeline name or as "
"a dataset file name."
) from err
def get_pipeline(self, pipeline):
matches = {
(name, dataset) for name, dataset in self.data.items()
if dataset.source_pipeline == pipeline
}
if len(matches) > 1:
datasets = "\n\t- ".join(match[0] for match in matches)
raise BIDSChildDatasetError(
f"Multiple datasets generated by {pipeline} were found:\n"
f"\t- {datasets}\n\n"
"Select a specific dataset by using "
"dataset.derivatives[<dataset_folder_name>]."
)
if not matches:
raise KeyError(f"No match found for {pipeline}")
return next(iter(matches))[1]