Source code for bids.variables.collections

"""Classes and functions related to the management of sets of BIDSVariables."""
from copy import copy
import warnings
import re
from collections import OrderedDict
from itertools import chain
import fnmatch

import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype

from .variables import (
    SparseRunVariable,
    SimpleVariable,
    DenseRunVariable,
    merge_variables,
    BIDSVariable,
)
from bids.utils import listify


class BIDSVariableCollection:
    """A container for one or more variables extracted from variable files
    at a single level of analysis.

    Parameters
    ----------
    variables : list
        A list of BIDSVariables or SimpleVariables.
    name : str
        Optional name to assign to the collection.

    Notes
    -----
    Variables in the list must all share the same analysis level, which
    must be one of 'session', 'subject', or 'dataset' level. For
    run-level Variables, use the BIDSRunVariableCollection.
    """

    def __init__(self, variables, name=None):
        self.name = name

        if not variables:
            raise ValueError("No variables were provided")

        # Maps the source a variable was read from onto its analysis level.
        # Sources not listed here are used verbatim as the level name.
        SOURCE_TO_LEVEL = {
            "events": "run",
            "physio": "run",
            "stim": "run",
            "regressors": "run",
            "scans": "session",
            "sessions": "subject",
            "participants": "dataset",
        }

        var_levels = {SOURCE_TO_LEVEL.get(v.source, v.source) for v in variables}

        # TODO: relax this requirement & allow implicit merging between levels
        if len(var_levels) > 1:
            raise ValueError(
                "A Collection cannot be initialized from "
                "variables at more than one level of analysis. "
                "Levels found in input variables: %s" % var_levels
            )
        elif not var_levels:
            # Only reachable for truthy-but-empty iterables (e.g. an
            # exhausted iterator), which slip past the check above.
            raise ValueError(
                "None of the provided variables matched any of the known levels, which are: %s"
                % (", ".join(sorted(SOURCE_TO_LEVEL.values())))
            )

        self.level = next(iter(var_levels))
        variables = self.merge_variables(variables)
        self.variables = {v.name: v for v in variables}
        self._index_entities()

        # Container for variable groups (see BIDS-StatsModels spec)--maps from
        # group names to lists of variables.
        self.groups = {}

    @staticmethod
    def merge_variables(variables, **kwargs):
        """Concatenates Variables along row axis.

        Parameters
        ----------
        variables : list
            List of Variables to merge. Variables can have different names
            (and all Variables that share a name will be concatenated
            together).
        **kwargs
            Passed through unchanged to the module-level ``merge_variables``
            function for every group of same-named variables.

        Returns
        -------
        list
            A list of Variables, one per distinct name, in order of first
            appearance.
        """
        by_name = OrderedDict()
        for var in variables:
            by_name.setdefault(var.name, []).append(var)
        return [merge_variables(group, **kwargs) for group in by_name.values()]

    def to_df(
        self, variables=None, format="wide", fillna=np.nan, entities=True, timing=True
    ):
        """Merge BIDSVariables in the collection into a single pandas DataFrame.

        Parameters
        ----------
        variables : list of str or BIDSVariable
            Optional list of variables or variable names to retain. If strings
            are passed, each one gives the name of a variable in the current
            collection. If BIDSVariables are passed, they will be used as-is.
            If None, all variables are returned. Strings and BIDSVariables
            cannot be mixed in the list.
        format : {'wide', 'long'}
            Whether to return a DataFrame in 'wide' or 'long' format. In 'wide'
            format, each row is defined by a unique entity combination, and
            each variable is in a separate column. In 'long' format, each row
            is a unique combination of entities and variable names, and a
            single 'amplitude' column provides the value.
        fillna : value
            Replace missing values with the specified value.
        entities : bool
            Whether or not to include a column for each entity.
        timing : bool
            Whether or not to include onset and duration columns.

        Returns
        -------
        :obj:`pandas.DataFrame`
            A pandas DataFrame.

        Raises
        ------
        ValueError
            If the selection is empty (an empty list was passed, or none of
            the requested names exist in the collection).
        """
        if variables is None:
            variables = list(self.variables.keys())

        # Can receive already-selected Variables from sub-classes. Only the
        # first element is inspected, hence "cannot be mixed" above. Guard the
        # index so an empty selection is reported clearly further down.
        if len(variables) > 0 and not isinstance(variables[0], BIDSVariable):
            variables = [v for v in self.variables.values() if v.name in variables]

        if len(variables) == 0:
            raise ValueError("No variables were selected for output.")

        # Convert all variables to separate DFs.
        # Note: bad things can happen if we pass the conditions, entities, and
        # timing flags through to the individual variables and then do
        # concat/reshaping operations. So instead, we set them all to True
        # temporarily, do what we need to, then drop them later if needed.
        dfs = [v.to_df(True, True, timing=True) for v in variables]

        # Always concatenate along row axis (for format='wide', we'll pivot).
        df = pd.concat(dfs, axis=0, sort=True)

        all_cols = set(df.columns)
        ent_cols = list(all_cols - {"condition", "amplitude", "onset", "duration"})

        if format == "long":
            df = df.reset_index(drop=True).fillna(fillna)
        else:
            # Rows in wide format can only be defined by combinations of level
            # entities plus (for run-level variables) onset and duration.
            valid_vars = {"run", "session", "subject", "dataset", "onset", "duration"}
            idx_cols = list(valid_vars & all_cols)

            # Missing amplitudes are swapped for an "n/a" placeholder before
            # pivoting and swapped back to `fillna` afterwards.
            df["amplitude"] = df["amplitude"].fillna("n/a")
            wide_df = df.pivot_table(
                index=idx_cols, columns="condition", values="amplitude", aggfunc="first"
            )

            select_cols = list(set(ent_cols) - set(idx_cols))
            if entities and select_cols:
                ent_df = df.groupby(idx_cols)[select_cols].first()
                df = pd.concat([wide_df, ent_df], axis=1)
            else:
                df = wide_df

            df = df.reset_index().replace("n/a", fillna)
            df.columns.name = None

        # Drop any columns we don't want. Timing columns only exist for
        # run-level variables, so tolerate their absence just like we do for
        # entity columns.
        if not timing:
            df.drop(["onset", "duration"], axis=1, inplace=True, errors="ignore")
        if not entities:
            df.drop(ent_cols, axis=1, inplace=True, errors="ignore")

        return df

    @classmethod
    def from_df(cls, data, entities=None, source="contrast"):
        """Create a Collection from a pandas DataFrame.

        Parameters
        ----------
        data : :obj:`pandas.DataFrame`
            The DataFrame to convert to a Collection. Each column will be
            converted to a SimpleVariable.
        entities : :obj:`pandas.DataFrame`
            An optional second DataFrame containing entity information. It is
            concatenated column-wise onto each variable's data.
        source : str
            The value to set as the source for all Variables.

        Returns
        -------
        BIDSVariableCollection
            Always an instance of the base class, regardless of `cls`.
        """
        variables = []
        for col in data.columns:
            _data = pd.DataFrame(data[col].values, columns=["amplitude"])
            if entities is not None:
                _data = pd.concat([_data, entities], axis=1, sort=True)
            variables.append(SimpleVariable(name=col, data=_data, source=source))
        return BIDSVariableCollection(variables)

    def clone(self):
        """Returns a copy of the current instance."""
        # We can't simply deepcopy, because variables have non-serializable
        # attributes. So we shallow copy then explicitly clone collections and
        # more complex objects.
        clone = copy(self)
        clone.entities = self.entities.copy()
        clone.variables = {k: v.clone() for (k, v) in self.variables.items()}
        return clone

    def _index_entities(self):
        """Sets current instance's entities based on the existing index.

        Notes
        -----
        Only entity key/value pairs common to all rows in all contained
        Variables are returned. E.g., if a Collection contains Variables
        extracted from runs 1, 2 and 3 from subject '01', the returned dict
        will be {'subject': '01'}; the runs will be excluded as they vary
        across the Collection contents.
        """
        all_ents = pd.DataFrame.from_records(
            [v.entities for v in self.variables.values()]
        )
        # One boolean per entity column: True when exactly one distinct
        # non-null value is present across all variables.
        constant = all_ents.apply(lambda x: x.nunique() == 1)
        if constant.empty:
            self.entities = {}
        else:
            keep = all_ents.columns[constant]
            ents = {k: all_ents[k].dropna().iloc[0] for k in keep}
            self.entities = {k: v for k, v in ents.items() if pd.notnull(v)}

    def __getitem__(self, var):
        """Return the variable stored under `var`; raise ValueError if absent."""
        if var in self.variables:
            return self.variables[var]
        keys = list(self.variables.keys())
        raise ValueError(
            "No variable named '{}' found in this collection. "
            "Available names are {}.".format(var, keys)
        )

    def __setitem__(self, var, obj):
        """Store `obj` under key `var`, renaming `obj` to match the key."""
        # Ensure name matches collection key, but raise warning if needed.
        if obj.name != var:
            warnings.warn(
                "The provided key to use in the collection ('%s') "
                "does not match the passed Column object's existing "
                "name ('%s'). The Column name will be set to match "
                "the provided key." % (var, obj.name)
            )
            obj.name = var
        self.variables[var] = obj

    def match_variables(self, pattern, return_type="name", match_type="unix"):
        """Return columns whose names match the provided pattern.

        Parameters
        ----------
        pattern : str, list
            One or more patterns to match all variable names against. How
            they are interpreted depends on `match_type`.
        return_type : {'name', 'variable'}
            What to return. Must be one of:
            'name': Returns a list of names of matching variables.
            'variable': Returns a list of Variable objects whose names
            match.
        match_type : str
            Matching approach to use. Either 'regex' (full-blown regular
            expression matching) or 'unix' (unix-style pattern matching
            via the fnmatch module).

        Returns
        -------
        A list of all matching variables or variable names. Results for each
        pattern are appended in turn, so a variable matched by several
        patterns appears once per matching pattern.
        """
        # Anything starting with "re" selects regex mode; everything else
        # falls back to fnmatch-style matching.
        use_regex = match_type.lower().startswith("re")
        want_objects = return_type.startswith("var")
        names = list(self.variables.keys())

        results = []
        for patt in listify(pattern):
            if use_regex:
                compiled = re.compile(patt)
                matched = [name for name in names if compiled.search(name)]
            else:
                matched = fnmatch.filter(names, patt)
            if want_objects:
                matched = [self.variables[name] for name in matched]
            results.extend(matched)
        return results

    def __repr__(self):
        return f"<{self.__class__.__name__}{sorted(self.variables)}>"
class BIDSRunVariableCollection(BIDSVariableCollection):
    """A container for one or more RunVariables--i.e., Variables that have a
    temporal dimension.

    Parameters
    ----------
    variables : list
        A list of SparseRunVariable and/or DenseRunVariable.
    sampling_rate : float
        Sampling rate (in Hz) to use when working with dense
        representations of variables. If None, defaults to 10.

    Notes
    -----
    Variables in the list must all be at the 'run' level. For other
    levels (session, subject, or dataset), use the
    BIDSVariableCollection.
    """

    def __init__(self, variables, sampling_rate=None):
        # Don't put the default value in signature because None is passed from
        # several places and we don't want multiple conflicting defaults.
        if sampling_rate and isinstance(sampling_rate, str):
            raise ValueError("Sampling rate must be numeric.")
        self.sampling_rate = sampling_rate or 10
        super().__init__(variables)

    def get_dense_variables(self, variables=None):
        """Returns a list of all stored DenseRunVariables.

        If `variables` is given, only variables whose names it contains are
        returned.
        """
        if variables is None:
            variables = set(self.variables.keys())
        return [
            v
            for v in self.variables.values()
            if isinstance(v, DenseRunVariable) and v.name in variables
        ]

    def get_sparse_variables(self, variables=None):
        """Returns a list of all stored SparseRunVariables.

        If `variables` is given, only variables whose names it contains are
        returned.
        """
        if variables is None:
            variables = set(self.variables.keys())
        return [
            v
            for v in self.variables.values()
            if isinstance(v, SparseRunVariable) and v.name in variables
        ]

    def all_dense(self):
        """Return True if every stored variable is a DenseRunVariable."""
        return len(self.get_dense_variables()) == len(self.variables)

    def all_sparse(self):
        """Return True if every stored variable is a SparseRunVariable."""
        return len(self.get_sparse_variables()) == len(self.variables)

    def _get_sampling_rate(self, sampling_rate):
        """Parse sampling rate argument and return appropriate value.

        Parameters
        ----------
        sampling_rate : None, number, 'TR' or 'highest'
            None returns the instance sampling rate. A number (Python or
            NumPy scalar) is returned unchanged. 'TR' returns 1 / TR, where
            TR is read from the first run_info entry of every variable and
            must be unique. 'highest' (case-insensitive) returns the largest
            sampling rate among the stored dense variables, falling back to
            the instance sampling rate when there are none.

        Raises
        ------
        ValueError
            If the TR is unavailable or not unique, or if `sampling_rate`
            is none of the accepted values.
        """
        if sampling_rate is None:
            return self.sampling_rate

        if isinstance(sampling_rate, (float, int, np.number)):
            return sampling_rate

        # Only strings can name a strategy; anything else falls through to the
        # ValueError below instead of failing on a missing `.lower()`.
        if isinstance(sampling_rate, str):
            if sampling_rate == "TR":
                trs = {var.run_info[0].tr for var in self.variables.values()}
                if not trs:
                    raise ValueError(
                        "Repetition time unavailable; specify "
                        "sampling_rate in Hz explicitly or set to"
                        " 'highest'."
                    )
                elif len(trs) > 1:
                    raise ValueError(
                        "Non-unique Repetition times found "
                        "({!r}); specify sampling_rate explicitly".format(trs)
                    )
                return 1.0 / trs.pop()

            if sampling_rate.lower() == "highest":
                dense_vars = self.get_dense_variables()
                # If no dense variables are available, fall back on instance SR
                if not dense_vars:
                    return self.sampling_rate
                return max(v.sampling_rate for v in dense_vars)

        raise ValueError(
            "Invalid sampling_rate value '{}' provided. Must be "
            "a float, None, 'TR', or 'highest'.".format(sampling_rate)
        )

    def _densify_and_resample(
        self,
        sampling_rate=None,
        variables=None,
        resample_dense=False,
        force_dense=False,
        in_place=False,
        kind="linear",
    ):
        """Shared implementation behind `to_dense()` and `resample()`.

        Parameters
        ----------
        sampling_rate : None, {'TR', 'highest'}, float
            Resolved via `_get_sampling_rate`.
        variables : list
            Optional list of variable names to process. If None, all
            variables are processed.
        resample_dense : bool
            If True, dense variables are resampled to the target rate;
            otherwise they are passed through unchanged.
        force_dense : bool
            If True, sparse variables with numeric values are converted to
            dense. Non-numeric sparse variables are never converted.
        in_place : bool
            If True, the processed variables overwrite their counterparts in
            this instance and all other variables are left untouched. If
            False, a clone is returned whose variables are *only* the
            processed ones (unconverted sparse variables are not included).
        kind : str
            Interpolation kind passed on to each dense variable's
            `resample()`.

        Returns
        -------
        The affected collection (this instance when in_place is True),
        with its `sampling_rate` set to the resolved rate.
        """
        sr = self._get_sampling_rate(sampling_rate)

        _dense, _sparse = [], []

        # Filter variables and sort by class
        for name, var in self.variables.items():
            if variables is not None and name not in variables:
                continue
            if isinstance(var, DenseRunVariable):
                _dense.append(var)
            else:
                _sparse.append(var)

        _variables = {}

        if force_dense:
            for v in _sparse:
                if is_numeric_dtype(v.values):
                    _variables[v.name] = v.to_dense(sr)

        if resample_dense:
            # Propagate 'TR' if exact match to TR is required
            sr_arg = sampling_rate if sampling_rate == "TR" else sr
            for v in _dense:
                _variables[v.name] = v.resample(sr_arg, kind=kind)

        # Dense variables that were not resampled are carried over as-is.
        for v in _dense:
            if v.name not in _variables:
                _variables[v.name] = v

        coll = self if in_place else self.clone()

        if in_place:
            coll.variables.update(_variables)
        else:
            coll.variables = _variables

        coll.sampling_rate = sr
        return coll

    def to_dense(
        self,
        sampling_rate=None,
        variables=None,
        in_place=False,
        resample_dense=False,
        kind="linear",
    ):
        """Convert all contained SparseRunVariables to DenseRunVariables.

        Parameters
        ----------
        sampling_rate : None, {'TR', 'highest'}, float
            Sampling rate to use when densifying sparse variables. If None,
            uses the currently stored instance value. If 'TR', the repetition
            time is used, if available, to select the sampling rate (1/TR).
            If 'highest', all variables are resampled to the highest sampling
            rate of any of the existing dense variables. The sampling rate
            may also be specified explicitly in Hz as a float.
        variables : list
            Optional list of names of Variables to resample. If None, all
            variables are resampled.
        in_place : bool
            When True, all variables are overwritten in-place.
            When False, returns resampled versions of all variables.
        resample_dense : bool
            When True, variables that are already dense are also resampled
            to the target sampling rate. When False, they are left as they
            are.
        kind : str
            Argument to pass to scipy's interp1d; indicates the kind of
            interpolation approach to use. See interp1d docs for valid values.

        Returns
        -------
        A BIDSVariableCollection. When in_place is True this is the current
        instance; otherwise it is a new collection.

        Notes
        -----
        Categorical variables are ignored.
        """
        return self._densify_and_resample(
            sampling_rate,
            variables,
            resample_dense=resample_dense,
            in_place=in_place,
            kind=kind,
            force_dense=True,
        )

    def resample(
        self,
        sampling_rate=None,
        variables=None,
        force_dense=False,
        in_place=False,
        kind="linear",
    ):
        """Resample all dense variables (and optionally, sparse ones) to the
        specified sampling rate.

        Parameters
        ----------
        sampling_rate : int or float
            Target sampling rate (in Hz). If None, uses the instance value.
        variables : list
            Optional list of names of Variables to resample. If None, all
            variables are resampled.
        force_dense : bool
            if True, all sparse variables will be forced to dense.
        in_place : bool
            When True, all variables are overwritten in-place.
            When False, returns resampled versions of all variables.
        kind : str
            Argument to pass to scipy's interp1d; indicates the kind of
            interpolation approach to use. See interp1d docs for valid values.

        Returns
        -------
        A BIDSVariableCollection. When in_place is True this is the current
        instance; otherwise it is a new collection.
        """
        return self._densify_and_resample(
            sampling_rate,
            variables,
            force_dense=force_dense,
            in_place=in_place,
            kind=kind,
            resample_dense=True,
        )

    def to_df(
        self,
        variables=None,
        format="wide",
        fillna=np.nan,
        sampling_rate="highest",
        include_sparse=True,
        include_dense=True,
        entities=True,
        timing=True,
    ):
        """Merge variables into a single pandas DataFrame.

        Parameters
        ----------
        variables : list
            Optional list of variable names to retain; if None, all variables
            are written out.
        format : str
            Whether to return a DataFrame in 'wide' or 'long' format. In 'wide'
            format, each row is defined by a unique onset/duration, and each
            variable is in a separate column. In 'long' format, each row is a
            unique combination of onset, duration, and variable name, and a
            single 'amplitude' column provides the value.
        fillna : value
            Replace missing values with the specified value.
        sampling_rate : float
            Specifies the sampling rate to use for all variables in the event
            that resampling needs to be performed (i.e., if some variables are
            sparse, or if dense variables have different sampling rates). Must
            be one of 'TR', 'highest', None, or a float (specifying the rate in
            Hz). If None, uses the instance sampling rate (10 Hz by default).
        include_sparse : bool
            Whether or not to include sparse variables in the output.
        include_dense : bool
            Whether or not to include dense variables in the output.
        entities : bool
            Whether or not to include a column for each entity.
        timing : bool
            Whether or not to include onset and duration columns.

        Returns
        -------
        :obj:`pandas.DataFrame`
            A pandas DataFrame.

        Raises
        ------
        ValueError
            If both include_sparse and include_dense are False, or if no
            variables remain after filtering.

        Notes
        -----
        The precise format of the resulting DataFrame depends on the variables
        contained in the current instance. If all variables are sparse, the
        output will also be sparse--i.e., the events in the DataFrame may have
        non-uniform timing. If at least one dense variable is present, and the
        user has not explicitly excluded dense variables (by setting
        include_dense=False), all selected variables will be implicitly
        converted to dense using the specified `sampling_rate` (if provided).

        To avoid unexpected behavior, we recommend converting mixed collections
        to all-dense form explicitly via the `to_dense()` or `resample()`
        methods before calling `to_df()`.
        """
        if not include_sparse and not include_dense:
            raise ValueError(
                "You can't exclude both dense and sparse "
                "variables! That leaves nothing!"
            )

        _vars = []

        if include_sparse:
            _vars += self.get_sparse_variables(variables)

        if include_dense:
            _vars += self.get_dense_variables(variables)

        if not _vars:
            raise ValueError("No variables were selected for output.")

        # If all variables are sparse/simple, we can pass them as-is. Otherwise
        # we first force all variables to dense via .resample().
        if all(isinstance(v, SimpleVariable) for v in _vars):
            variables = _vars
        else:
            sampling_rate = sampling_rate or self.sampling_rate
            var_names = [v.name for v in _vars]
            collection = self.resample(
                sampling_rate, variables=var_names, force_dense=include_sparse
            )
            variables = list(collection.variables.values())

        return super().to_df(
            variables, format, fillna, entities=entities, timing=timing
        )
def merge_collections(collections, sampling_rate="highest", output_level=None, variables=None):
    """Merge two or more collections at the same level of analysis.

    Parameters
    ----------
    collections : list
        List of Collections to merge.
    sampling_rate : int or str
        Sampling rate to use if it becomes necessary to resample
        DenseRunVariables. Either an integer or 'highest' (see
        merge_variables docstring for further explanation).
    output_level : str, optional
        Assign a new level (e.g., 'run', 'subject', etc.) to the merged
        collection. If None, the current level is retained. Only applied
        when the inputs are not run collections.
    variables : list
        Optional list of names of variables to keep. If None, all
        are retained.

    Returns
    -------
    BIDSVariableCollection or BIDSRunVariableCollection
        Result type depends on the type of the input collections. When a
        single collection is passed and `variables` is None, that same
        collection object is returned unchanged.

    Raises
    ------
    ValueError
        If the collections are not all at the same level of analysis.
    """
    collections = listify(collections)
    if len(collections) == 1 and variables is None:
        return collections[0]

    levels = {c.level for c in collections}
    if len(levels) > 1:
        raise ValueError(
            "At the moment, it's only possible to merge "
            "Collections at the same level of analysis. You "
            "passed collections at levels: %s." % levels
        )

    # The first collection decides the class of the merged result.
    cls = collections[0].__class__

    # Flatten all variables from all collections into a single list
    keep_vars = list(chain.from_iterable(c.variables.values() for c in collections))
    if variables is not None:
        keep_vars = [var for var in keep_vars if var.name in variables]

    # merge_variables will automatically merge all variables that share name
    merged = cls.merge_variables(keep_vars, sampling_rate=sampling_rate)

    if isinstance(collections[0], BIDSRunVariableCollection):
        if sampling_rate == "highest":
            rates = [
                var.sampling_rate for var in merged if isinstance(var, DenseRunVariable)
            ]
            # Differently-named dense variables may still differ in rate
            # after merging, so take the maximum rather than the first one.
            # With no dense variables, None lets the collection use its
            # default.
            sampling_rate = max(rates) if rates else None
        return cls(merged, sampling_rate)

    # For non-run collections, we may need to set a different output level
    coll = cls(merged)
    if output_level is not None:
        coll.level = output_level
    return coll