"""File-indexing functionality. """
import os
import json
import re
from collections import defaultdict
from pathlib import Path
from functools import partial, lru_cache
from bids_validator import BIDSValidator
from ..utils import listify, make_bidsfile
from ..exceptions import BIDSConflictingValuesError
from .models import Config, Entity, Tag, FileAssociation, _create_tag_dict
from .validation import validate_indexing_args
def _regexfy(patt, root=None):
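    """Return ``patt`` as a compiled regex anchored at the dataset root.

    Already-compiled patterns are returned unchanged; strings and paths are
    interpreted relative to ``root`` (or '/') and converted to a regex.
    """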
if hasattr(patt, "search"):
return patt
patt = Path(patt)
if patt.is_absolute():
patt = str(patt.relative_to(root or "/"))
return re.compile(r"^/" + str(patt) + r".*")
def _extract_entities(bidsfile, entities):
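    """Match ``entities`` against ``bidsfile``, returning {name: (Entity, value)}."""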
match_vals = {}
for e in entities.values():
m = e.match_file(bidsfile)
if m is None and e.mandatory:
break
if m is not None:
match_vals[e.name] = (e, m)
return match_vals
def _check_path_matches_patterns(path, patterns, root=None):
"""Check if the path matches at least one of the provided patterns. """
if not patterns:
return False
path = path.absolute()
if root is not None:
path = Path("/") / path.relative_to(root)
    # The path can now be converted to a plain string
path = str(path)
for patt in patterns:
if patt.search(path):
return True
return False
def _validate_path(path, incl_patt=None, excl_patt=None, root=None):
    """Return True if forcibly included, False if excluded, None otherwise."""
    if _check_path_matches_patterns(path, incl_patt, root=root):
        return True
    if _check_path_matches_patterns(path, excl_patt, root=root):
        return False
    return None
class BIDSLayoutIndexer:
""" Indexer class for BIDSLayout.
Parameters
----------
validate : bool, optional
If True, all files are checked for BIDS compliance when first indexed,
and non-compliant files are ignored. This provides a convenient way to
restrict file indexing to only those files defined in the "core" BIDS
        spec; setting ``validate=True`` will cause noncompliant files such as
        ``sub-01/nonbidsfile.txt`` to be ignored.
ignore : str or SRE_Pattern or list
Path(s) to exclude from indexing. Each path is either a string or a
SRE_Pattern object (i.e., compiled regular expression). If a string is
passed, it must be either an absolute path, or be relative to the BIDS
project root. If an SRE_Pattern is passed, the contained regular
expression will be matched against the full (absolute) path of all
files and directories. By default, indexing ignores all files in
'code/', 'stimuli/', 'sourcedata/', 'models/', and any hidden
files/dirs beginning with '.' at root level.
force_index : str or SRE_Pattern or list
Path(s) to forcibly index in the BIDSLayout, even if they would
otherwise fail validation. See the documentation for the ignore
        argument for input format details. Note that paths in force_index take
        precedence over those in ignore (i.e., if a file matches both ignore
        and force_index, it *will* be indexed).
Note: NEVER include 'derivatives' here; use the derivatives argument
(or :obj:`bids.layout.BIDSLayout.add_derivatives`) for that.
index_metadata : bool
If True, all metadata files are indexed. If False, metadata will not be
available (but indexing will be faster).
config_filename : str
        Optional name of the configuration file to look for within each
        indexed directory.
**filters
keyword arguments passed to the .get() method of a
:obj:`bids.layout.BIDSLayout` object. These keyword arguments define
what files get selected for metadata indexing.
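
    Examples
    --------
    A minimal usage sketch (the dataset path below is hypothetical; the
    indexer is typically passed to :obj:`bids.layout.BIDSLayout` via its
    ``indexer`` argument)::

        from bids.layout import BIDSLayout, BIDSLayoutIndexer

        indexer = BIDSLayoutIndexer(validate=True, ignore=['code'])
        layout = BIDSLayout('/path/to/dataset', indexer=indexer)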
"""
def __init__(
self,
validate=False,
ignore=None,
force_index=None,
index_metadata=True,
config_filename='layout_config.json',
**filters,
):
self.ignore = ignore
self.force_index = force_index
self.index_metadata = index_metadata
self.config_filename = config_filename
self.filters = filters
self.validator = None
if validate:
self.validator = BIDSValidator(index_associated=True)
# Layout-dependent attributes to be set in __call__()
self._layout = None
self._config = None
self._include_patterns = None
self._exclude_patterns = None
def __call__(self, layout):
self._layout = layout
self._config = list(layout.config.values())
ignore, force = validate_indexing_args(self.ignore, self.force_index,
self._layout._root)
        # Normalize string/path patterns to compiled regular expressions
self._include_patterns = [
_regexfy(patt, root=self._layout._root) for patt in listify(force)
]
self._exclude_patterns = [
_regexfy(patt, root=self._layout._root) for patt in listify(ignore)
]
all_bfs, all_tag_dicts = self._index_dir(self._layout._root, self._config)
self.session.bulk_save_objects(all_bfs)
self.session.bulk_insert_mappings(Tag, all_tag_dicts)
self.session.commit()
if self.index_metadata:
self._index_metadata()
@property
def session(self):
return self._layout.connection_manager.session
def _validate_file(self, f):
matched_patt = _validate_path(
f,
incl_patt=self._include_patterns,
excl_patt=self._exclude_patterns,
root=self._layout._root
)
if matched_patt is not None:
return matched_patt
if self.validator is None:
return True
        # The BIDS validator expects paths that look absolute but are
        # actually relative to the BIDS project root.
to_check = f.relative_to(self._layout._root)
# Pretend the path is an absolute path
to_check = Path('/') / to_check
# bids-validator works with posix paths only
to_check = to_check.as_posix()
return self.validator.is_bids(to_check)
def _index_dir(self, path, config, force=None):
abs_path = self._layout._root / path
# Derivative directories must always be added separately
if self._layout._root.joinpath('derivatives') in abs_path.parents:
return [], []
config = list(config) # Shallow copy
# Check for additional config file in directory
layout_file = self.config_filename
config_file = abs_path / layout_file
if config_file.exists():
cfg = Config.load(config_file, session=self.session)
config.append(cfg)
# Track which entities are valid in filenames for this directory
config_entities = {}
for c in config:
config_entities.update(c.entities)
# Get lists of 1st-level subdirectories and files in the path directory
_, dirnames, filenames = next(os.walk(path))
        # Exclude the layout configuration file itself from indexing
if self.config_filename in filenames:
filenames.remove(self.config_filename)
all_bfs = []
all_tag_dicts = []
for f in filenames:
abs_fn = path / f
# Skip files that fail validation, unless forcibly indexing
if force or self._validate_file(abs_fn):
bf, tag_dicts = self._index_file(abs_fn, config_entities)
all_tag_dicts += tag_dicts
all_bfs.append(bf)
# Recursively index subdirectories
for d in dirnames:
d = path / d
force = _validate_path(
d,
incl_patt=self._include_patterns,
excl_patt=self._exclude_patterns,
root=self._layout._root,
)
if force is not False:
dir_bfs, dir_tag_dicts = self._index_dir(d, config, force=force)
all_bfs += dir_bfs
all_tag_dicts += dir_tag_dicts
return all_bfs, all_tag_dicts
def _index_file(self, abs_fn, entities):
"""Create DB record for file and its tags. """
bf = make_bidsfile(abs_fn)
        # Extract entity values
        match_vals = _extract_entities(bf, entities)
# Create Entity <=> BIDSFile mappings
tag_dicts = [
_create_tag_dict(bf, ent, val, ent._dtype)
for ent, val in match_vals.values()
]
return bf, tag_dicts
def _index_metadata(self):
"""Index metadata for all files in the BIDS dataset.
"""
filters = self.filters
if filters:
# ensure we are returning objects
            filters['return_type'] = 'object'
            ext_key = 'extension'
            if filters.get(ext_key):
filters[ext_key] = listify(filters[ext_key])
# ensure json files are being indexed
if '.json' not in filters[ext_key]:
filters[ext_key].append('.json')
# Process JSON files first if we're indexing metadata
all_files = self._layout.get(absolute_paths=True, **filters)
        # Track ALL entities we've seen in file names or metadata
all_entities = {}
for c in self._config:
all_entities.update(c.entities)
# If key/value pairs in JSON files duplicate ones extracted from files,
# we can end up with Tag collisions in the DB. To prevent this, we
# store all filename/entity pairs and the value, and then check against
# that before adding each new Tag.
all_tags = {}
for t in self.session.query(Tag).all():
key = '{}_{}'.format(t.file_path, t.entity_name)
all_tags[key] = str(t.value)
        # We build up a store of all file data as we iterate files. It looks
        # like: {extension/suffix: {dirname: [(entities, payload, path)]}}.
        # The payload is left empty (None) for non-JSON files.
file_data = {}
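        # For example (hypothetical entries), after scanning one subject this
        # might hold:
        #   {'.json/bold': {<dirname>: [({'subject': '01', 'task': 'rest'},
        #                                <partial(load_json, ...)>, <json path>)]},
        #    '.nii.gz/bold': {<dirname>: [({'subject': '01', 'task': 'rest'},
        #                                  None, <nifti path>)]}}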
        # Memoizing JSON loader
        # Wrapping the loader in a function allows lazy loading, so a JSON
        # file is only read if it corresponds to a data file that is indexed
@lru_cache(maxsize=None)
def load_json(path):
with open(path, 'r', encoding='utf-8') as handle:
try:
return json.load(handle)
except (UnicodeDecodeError, json.JSONDecodeError) as e:
raise OSError(
"Error occurred while trying to decode JSON "
f"from file {path}"
) from e
filenames = []
for bf in all_files:
if 'suffix' in bf.entities and 'extension' in bf.entities:
file_ents = bf.entities.copy()
suffix = file_ents.pop('suffix')
ext = file_ents.pop('extension')
key = "{}/{}".format(ext, suffix)
if key not in file_data:
file_data[key] = defaultdict(list)
payload = None
if ext == '.json':
payload = partial(load_json, bf.path)
else:
filenames.append(bf)
to_store = (file_ents, payload, bf.path)
file_data[key][bf._dirname].append(to_store)
# To avoid integrity errors, track primary keys we've seen
seen_assocs = set()
def create_association_pair(src, dst, kind, kind2=None):
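            # Create reciprocal FileAssociation records (src -> dst and
            # dst -> src), skipping pairs already recorded in seen_assocs to
            # avoid primary-key integrity errors.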
objs = []
kind2 = kind2 or kind
pk1 = '#'.join([src, dst, kind])
if pk1 not in seen_assocs:
objs.append(FileAssociation(src=src, dst=dst, kind=kind))
seen_assocs.add(pk1)
pk2 = '#'.join([dst, src, kind2])
if pk2 not in seen_assocs:
                objs.append(FileAssociation(src=dst, dst=src, kind=kind2))
seen_assocs.add(pk2)
return objs
all_objs = []
all_tag_dicts = []
for bf in filenames:
file_ents = bf.entities.copy()
suffix = file_ents.pop('suffix', None)
ext = file_ents.pop('extension', None)
file_ent_keys = set(file_ents.keys())
if suffix is None or ext is None:
continue
# Extract metadata associated with the file. The idea is
# that we loop over parent directories, and if we find
# payloads in the file_data store (indexing by directory
# and current file suffix), we check to see if the
# candidate JS file's entities are entirely consumed by
# the current file. If so, it's a valid candidate, and we
# add the payload to the stack. Finally, we invert the
# stack and merge the payloads in order.
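            # For example (hypothetical layout): for sub-01/func/sub-01_task-rest_bold.nii.gz
            # the walk collects sub-01/func/sub-01_task-rest_bold.json (if present), then
            # sub-01/sub-01_task-rest_bold.json, then task-rest_bold.json at the dataset
            # root; merging the reversed stack lets lower-level (closer) sidecars override
            # values from higher-level ones.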
ext_key = "{}/{}".format(ext, suffix)
json_key = ".json/{}".format(suffix)
dirname = bf._dirname
payloads = []
ancestors = []
while True:
# Get JSON payloads
json_data = file_data.get(json_key, {}).get(dirname, [])
for js_ents, js_md, js_path in json_data:
js_keys = set(js_ents.keys())
if js_keys - file_ent_keys:
continue
matches = [js_ents[name] == file_ents[name]
for name in js_keys]
if all(matches):
payloads.append((js_md, js_path))
# Get all files this file inherits from
candidates = file_data.get(ext_key, {}).get(dirname, [])
for ents, _, path in candidates:
keys = set(ents.keys())
if keys - file_ent_keys:
continue
matches = [ents[name] == file_ents[name] for name in keys]
if all(matches):
ancestors.append(path)
parent = dirname.parent
if parent == dirname:
break
dirname = parent
if not payloads:
continue
            # Data files that are missing on disk can tolerate absent metadata
            # files, but we still attempt to load the metadata anyway
virtual_datafile = not bf._path.exists()
# Create DB records for metadata associations
js_file = payloads[0][1]
all_objs += create_association_pair(js_file, bf.path, 'Metadata')
# Consolidate metadata by looping over inherited JSON files
file_md = {}
for pl, js_file in payloads[::-1]:
try:
file_md.update(pl())
except FileNotFoundError:
if not virtual_datafile:
raise
# Drop metadata if any files are missing
# Otherwise missing overrides could give misleading metadata
file_md = {}
break
# Create FileAssociation records for JSON inheritance
n_pl = len(payloads)
for i, (pl, js_file) in enumerate(payloads):
if (i + 1) < n_pl:
other = payloads[i + 1][1]
all_objs += create_association_pair(js_file, other, 'Child', 'Parent')
# Inheritance for current file
n_pl = len(ancestors)
for i, src in enumerate(ancestors):
if (i + 1) < n_pl:
dst = ancestors[i + 1]
all_objs += create_association_pair(src, dst, 'Child', 'Parent')
# Files with IntendedFor field always get mapped to targets
intended = listify(file_md.get('IntendedFor', []))
for target in intended:
                # Per spec, IntendedFor paths are relative to the subject directory.
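                # e.g. (hypothetical) an IntendedFor value of
                # 'func/sub-01_task-rest_bold.nii.gz' resolves to
                # <root>/sub-01/func/sub-01_task-rest_bold.nii.gz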
target = self._layout._root.joinpath(
'sub-{}'.format(bf.entities['subject']),
target)
all_objs += create_association_pair(bf.path, str(target), 'IntendedFor',
'InformedBy')
# Link files to BOLD runs
if suffix in ['physio', 'stim', 'events', 'sbref']:
images = self._layout.get(
extension=['.nii', '.nii.gz'], suffix='bold',
return_type='filename', **file_ents)
for img in images:
all_objs += create_association_pair(bf.path, img, 'IntendedFor',
'InformedBy')
# Link files to DWI runs
            if suffix == 'sbref' or ext in ['.bvec', '.bval']:
images = self._layout.get(
extension=['.nii', '.nii.gz'], suffix='dwi',
return_type='filename', **file_ents)
for img in images:
all_objs += create_association_pair(bf.path, img, 'IntendedFor',
'InformedBy')
# Create Tag <-> Entity mappings, and any newly discovered Entities
for md_key, md_val in file_md.items():
# Treat null entries (deserialized to None) as absent
# Alternative is to cast None to null in layout.models._create_tag_dict
if md_val is None:
continue
tag_string = '{}_{}'.format(bf.path, md_key)
# Skip pairs that were already found in the filenames
if tag_string in all_tags:
file_val = all_tags[tag_string]
if str(md_val) != file_val:
msg = (
"Conflicting values found for entity '{}' in "
"filename {} (value='{}') versus its JSON sidecar "
"(value='{}'). Please reconcile this discrepancy."
)
raise BIDSConflictingValuesError(
msg.format(md_key, bf.path, file_val, md_val))
continue
if md_key not in all_entities:
all_entities[md_key] = Entity(md_key)
self.session.add(all_entities[md_key])
tag = _create_tag_dict(bf, all_entities[md_key], md_val, is_metadata=True)
all_tag_dicts.append(tag)
self.session.bulk_save_objects(all_objs)
self.session.bulk_insert_mappings(Tag, all_tag_dicts)
self.session.commit()