""" Classes for representing BIDS variables. """

import math
import warnings
from copy import deepcopy
from abc import abstractmethod, ABCMeta
from itertools import chain
from functools import reduce

import numpy as np
import pandas as pd

from bids.utils import listify

class BIDSVariable(metaclass=ABCMeta):
    """Base representation of a column in a BIDS project. """

    # Columns that define special properties (e.g., onset, duration). These
    # will be stored separately from the main data object, and are accessible
    # as properties on the BIDSVariable instance.
    _property_columns = set()

    def __init__(self, name, values, source):
        self.name = name
        self.values = values
        self.source = source
        self.entities = self._extract_entities()

    def __repr__(self):
        return f"<{self.__class__.__name__}(name='{self.name}', source='{self.source}')>"

    def clone(self, data=None, **kwargs):
        """Clone (deep copy) the current column, optionally replacing its
        data and/or any other attributes.

        Parameters
        ----------
        data : :obj:`pandas.DataFrame` or array_like
            Optional new data to substitute into
            the cloned column. Must have same dimensionality as the
            original.
        kwargs : dict
            Optional keyword arguments containing new attribute
            values to set in the copy. E.g., passing `name='my_name'`
            would set the `.name` attribute on the cloned instance to the
            passed value.
        """
        result = deepcopy(self)
        if data is not None:
            if data.shape != self.values.shape:
                # If data can be re-shaped safely, do so
                if data.squeeze().shape == self.values.squeeze().shape:
                    data = np.asarray(data).reshape(self.values.shape)
                else:
                    raise ValueError("Replacement data has shape %s; must have"
                                     " same shape as existing data %s." %
                                     (data.shape, self.values.shape))
            result.values = pd.DataFrame(data)

        if kwargs:
            for k, v in kwargs.items():
                setattr(result, k, v)

        # Need to update name on Series as well
        # result.values.name = kwargs.get('name', self.name)
        return result
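
    # A minimal usage sketch (hypothetical `rt` variable, not defined in
    # this module): clone the column under a new name, leaving the
    # original intact.
    #
    #     >>> rt2 = rt.clone(name='rt_copy')
    #     >>> (rt2.name, rt.name)
    #     ('rt_copy', 'rt')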

    def filter(self, filters=None, query=None, strict=False, inplace=False):
        """Returns a copy of the current Variable with only rows that match
        the filters retained.

        Parameters
        ----------
        filters : dict
            Dictionary of filters to apply. Keys can be either
            'amplitude' or any named entity. Values must be single values
            or lists.
        query : str
            Optional query string to pass to df.query(). Will not
            be validated in any way, so must have valid column names. Takes
            precedence over filters in the event that both are passed.
        strict : bool
            By default, keys in 'filters' that cannot be found
            in the Variable will be silently ignored. If strict=True, None
            will be returned in such cases.
        inplace : bool
            If True, filtering is performed in place. If False,
            a filtered copy of the Variable is returned.

        Returns
        -------
        BIDSVariable or None
            The filtered Variable. None is returned if ``inplace=True``, or
            if ``strict=True`` and at least one filter key is not found in
            the Variable.
        """

        if filters is None and query is None:
            raise ValueError("Either the 'filters' or the 'query' argument "
                             "must be provided!")

        if filters is not None and query is None:
            query = []
            for name, val in filters.items():
                if name != 'amplitude' and name not in self.index.columns:
                    if strict:
                        return None
                    continue
                oper = 'in' if isinstance(val, (list, tuple)) else '=='
                q = '{name} {oper} {val}'.format(name=name, oper=oper,
                                                 val=repr(val))
                query.append(q)
            query = ' and '.join(query)

        var = self if inplace else self.clone()

        if query:
            inds = self.to_df().query(query).index

            var.values = var.values.loc[inds]
            var.index = var.index.loc[inds]
            if hasattr(self, '_build_entity_index'):
                var._build_entity_index()

        if not inplace:
            return var
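
    # Illustrative sketch of the two filtering modes (hypothetical `var`;
    # assumes the Variable's entity index has a 'subject' column):
    #
    #     >>> var.filter(filters={'subject': ['01', '02']})  # entity filter
    #     >>> var.filter(query='amplitude > 0.5')            # raw df.query()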

    @classmethod
    def merge(cls, variables, name=None, **kwargs):
        """Merge/concatenate a list of variables along the row axis.

        Parameters
        ----------
        variables : list
            A list of Variables to merge.
        name : str
            Optional name to assign to the output Variable. By
            default, uses the same name as the input variables.
        kwargs : dict
            Optional keyword arguments to pass onto the class-specific
            merge() call. See merge_variables docstring for details.

        Returns
        -------
        A single BIDSVariable of the same class as the input variables.

        See also
        --------
        merge_variables
        """

        variables = listify(variables)
        if len(variables) == 1:
            return variables[0]

        var_names = set([v.name for v in variables])
        if len(var_names) > 1:
            raise ValueError("Columns with different names cannot be merged. "
                             "Column names provided: %s" % var_names)

        if name is None:
            name = variables[0].name

        return cls._merge(variables, name, **kwargs)

    @classmethod
    @abstractmethod
    def _merge(cls, variables, name, **kwargs):
        pass

    def get_grouper(self, groupby='run'):
        """Return a list suitable for use in groupby calls.

        Parameters
        ----------
        groupby : str or list
            Name(s) of column(s) defining the grouper
            object. Anything that would be valid inside a .groupby() call
            on a pandas structure.

        Returns
        -------
        list
            A list defining the groups.
        """
        # listify ensures we always index a DataFrame (not a Series), so the
        # row-wise apply below is valid even for a single grouping column
        grouper = self.index.loc[:, listify(groupby)]
        return grouper.apply(lambda x: '@@@'.join(x.astype(str).values),
                             axis=1)

    def apply(self, func, groupby='run', *args, **kwargs):
        """Applies the passed function to the groups defined by the groupby
        argument. Works identically to the standard pandas df.groupby() call.

        Parameters
        ----------
        func : callable
            The function to apply to each group.
        groupby : str or list
            Name(s) of column(s) defining the grouping.
        args, kwargs : dict
            Optional positional and keyword arguments to pass
            onto the function call.
        """
        grouper = self.get_grouper(groupby)
        return self.values.groupby(grouper, group_keys=False).apply(func, *args, **kwargs)
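
    # Illustrative sketch (hypothetical `var`; assumes a 'run' column in the
    # entity index): z-score amplitudes separately within each run.
    #
    #     >>> var.apply(lambda x: (x - x.mean()) / x.std(), groupby='run')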

    def to_df(self, condition=True, entities=True, **kwargs):
        """Convert to a DataFrame, with columns for name and entities.

        Parameters
        ----------
        condition : bool
            If True, adds a column for condition name, and
            names the amplitude column 'amplitude'. If False, returns just
            onset, duration, and amplitude, and gives the amplitude column
            the current column name.
        entities : bool
            If True, adds extra columns for all entities.
        """
        amp = 'amplitude' if condition else self.name
        data = pd.DataFrame({amp: self.values.values.ravel()})

        for sc in self._property_columns:
            data[sc] = getattr(self, sc)

        if condition:
            data['condition'] = self.name

        if entities:
            ent_data = self.index.reset_index(drop=True)
            data = pd.concat([data, ent_data], axis=1, sort=True)

        return data.reset_index(drop=True)
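
    # Sketch of the long-format output (entity columns depend on the data;
    # names shown are illustrative):
    #
    #     >>> var.to_df().columns.tolist()
    #     ['amplitude', 'condition', 'subject', 'run']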

    def _extract_entities(self):
        """Returns a dict of all non-varying entities for the current Variable.

        Notes
        -----
        Only entity key/value pairs common to all rows in the Variable
        are returned. E.g., if a Variable contains events extracted from
        runs 1, 2 and 3 from subject '01', the returned dict will be
        {'subject': '01'}; the runs will be excluded as they vary across
        the Variable contents.
        """
        def is_unique(s):
            a = s.to_numpy()
            return (a[0] == a).all()

        constant = self.index.apply(is_unique)
        if constant.empty:
            return {}
        else:
            keep = self.index.columns[constant]
            first_row_ix = self.index.index[0]
            res = {}
            for k in keep:
                v = self.index.loc[first_row_ix, k]
                # If the first row holds a NaN, fall back to the first
                # non-NaN value in the column
                if pd.isna(v):
                    v = self.index[k].dropna().iloc[0]
                res[k] = v
            return res


class SimpleVariable(BIDSVariable):
    """Represents a simple design matrix column that has no timing
    information.

    Parameters
    ----------
    name : str
        Name of the column.
    data : :obj:`pandas.DataFrame`
        A pandas DataFrame minimally containing a column named 'amplitude'
        as well as any identifying entities.
    source : str
        The type of BIDS variable file the data were extracted from. Must be
        one of: 'events', 'physio', 'stim', 'regressors', 'scans',
        'sessions', 'participants', or 'beh'.
    kwargs : dict
        Optional keyword arguments passed onto superclass.
    """

    _entity_columns = {'condition', 'amplitude'}

    def __init__(self, name, data, source, **kwargs):
        ent_cols = list(set(data.columns) - self._entity_columns)
        self.index = data.loc[:, ent_cols]

        values = data['amplitude'].reset_index(drop=True)
        values.name = name

        super(SimpleVariable, self).__init__(name, values, source)
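
    # A minimal construction sketch (hypothetical data): any column other
    # than 'condition' and 'amplitude' is treated as an entity column and
    # moved into the .index DataFrame.
    #
    #     >>> df = pd.DataFrame({'amplitude': [1.0, 2.0, 3.0, 4.0],
    #     ...                    'subject': ['01', '01', '02', '02']})
    #     >>> var = SimpleVariable('rt', df, source='events')
    #     >>> var.values.tolist()
    #     [1.0, 2.0, 3.0, 4.0]
    #     >>> var.index.columns.tolist()
    #     ['subject']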

    def split(self, grouper):
        """Split the current SimpleVariable into multiple columns.

        Parameters
        ----------
        grouper : :obj:`pandas.DataFrame`
            Binary DF specifying the design matrix to use for splitting.
            Number of rows must match current ``SimpleVariable``; a new
            ``SimpleVariable`` will be generated for each column in the
            grouper.

        Returns
        -------
        A list of SimpleVariables, one per column in the grouper DF.
        """
        data = self.to_df(condition=True, entities=True)
        data = data.drop('condition', axis=1)

        subsets = []
        for col_name in grouper.columns:
            col_data = data.loc[grouper[col_name].astype(bool), :]
            name = '{}.{}'.format(self.name, col_name)
            col = self.__class__(name=name, data=col_data, source=self.source,
                                 run_info=getattr(self, 'run_info', None))
            subsets.append(col)
        return subsets

    @classmethod
    def _merge(cls, variables, name, **kwargs):
        dfs = [v.to_df() for v in variables]
        data = pd.concat(dfs, axis=0, sort=True).reset_index(drop=True)
        data = data.rename(columns={name: 'amplitude'})
        return cls(name, data, source=variables[0].source, **kwargs)

    def select_rows(self, rows):
        """Truncate internal arrays to keep only the specified rows.

        Parameters
        ----------
        rows : array_like
            An integer or boolean array identifying the indices of rows to
            keep.
        """
        self.values = self.values.iloc[rows]
        self.index = self.index.iloc[rows, :]
        for prop in self._property_columns:
            vals = getattr(self, prop)[rows]
            setattr(self, prop, vals)
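

# Sketch of SimpleVariable.split() (continuing the hypothetical `var` above):
# each grouper column yields one new variable containing only the rows where
# that column is nonzero.
#
#     >>> grouper = pd.DataFrame({'face': [1, 0, 1, 0],
#     ...                         'house': [0, 1, 0, 1]})
#     >>> [v.name for v in var.split(grouper)]
#     ['rt.face', 'rt.house']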


class SparseRunVariable(SimpleVariable):
    """A sparse representation of a single column of events.

    Parameters
    ----------
    name : str
        Name of the column.
    data : :obj:`pandas.DataFrame`
        A pandas DataFrame minimally containing the columns 'onset',
        'duration', and 'amplitude'.
    run_info : list
        A list of RunInfo objects carrying information about all runs
        represented in the Variable.
    source : str
        The type of BIDS variable file the data were extracted from. Must be
        one of: 'events', 'physio', 'stim', 'regressors', 'scans',
        'sessions', 'participants', or 'beh'.
    kwargs : dict
        Optional keyword arguments passed onto superclass.
    """

    _property_columns = {'onset', 'duration'}

    def __init__(self, name, data, run_info, source, **kwargs):
        if hasattr(run_info, 'duration'):
            run_info = [run_info]
        if not isinstance(run_info, list):
            raise TypeError("Expected a list of RunInfo objects, got %s" %
                            repr(run_info))
        self.run_info = run_info
        for sc in self._property_columns:
            setattr(self, sc, data.pop(sc).values)
        super(SparseRunVariable, self).__init__(name, data, source, **kwargs)

    def get_duration(self):
        """Return the total duration of the Variable's run(s)."""
        return sum(r.duration for r in self.run_info)

    def to_dense(self, sampling_rate=None):
        """Convert the current sparse column to a dense representation.

        If sampling_rate is not provided, the largest interval able to
        faithfully represent all onsets and durations will be determined.
        The sampling rate is the reciprocal of that interval.

        Parameters
        ----------
        sampling_rate : float or None
            Sampling rate (in Hz) to use when constructing the
            DenseRunVariable.

        Returns
        -------
        DenseRunVariable
        """
        # Cast onsets and durations to milliseconds
        onsets = np.round(self.onset * 1000).astype(int)
        durations = np.round(self.duration * 1000).astype(int)

        # The GCD of all onsets and durations gives the coarsest bin width
        # that still represents every event boundary exactly
        gcd = np.gcd.reduce(np.r_[onsets, durations])
        bin_sr = 1000. / gcd

        # Never use a computed SR smaller than the requested one, because
        # when events are widely-spaced and timing is very regular, this can
        # result in a nasty loss of precision in the resampling step.
        if sampling_rate is not None:
            bin_sr = max(bin_sr, sampling_rate)
        else:
            sampling_rate = bin_sr

        duration = int(math.ceil(bin_sr * self.get_duration()))
        ts = np.zeros(duration, dtype=self.values.dtype)

        onsets = np.round(self.onset * bin_sr).astype(int)
        durations = np.round(self.duration * bin_sr).astype(int)

        run_i, start, last_ind = 0, 0, 0
        for i, val in enumerate(self.values.values):
            # Onsets resetting to a lower value signal a run boundary
            if onsets[i] < last_ind:
                start += self.run_info[run_i].duration * bin_sr
                run_i += 1
            _onset = int(start + onsets[i])
            _offset = int(_onset + durations[i])
            if _onset >= duration:
                warnings.warn("An event onset falls outside the total "
                              "duration of the run(s), so it will be dropped "
                              "from the dense output. This can happen if run "
                              "boundaries (detected when onsets reset to a "
                              "lower value) were inferred incorrectly.")
            ts[_onset:_offset] = val
            last_ind = onsets[i]

        if bin_sr != sampling_rate:
            # Resample the time series to the requested sampling rate
            num = int(math.ceil(duration * sampling_rate / bin_sr))
            ts = _resample(ts, sampling_rate, bin_sr, num)

        run_info = list(self.run_info)
        return DenseRunVariable(
            name=self.name, values=ts, run_info=run_info,
            source=self.source, sampling_rate=sampling_rate)

    def _extract_entities(self):
        # Get all entities common to all runs. The super method already does
        # this for entities that show up in filenames, so we just add the
        # ones that show up in the RunInfo tuples, as those include metadata.
        ent_items = [run.entities.items() for run in self.run_info]
        entities = reduce(lambda x, y: x & y, ent_items, ent_items[0])
        base_ents = super()._extract_entities()
        return dict(entities, **base_ents)

    @classmethod
    def _merge(cls, variables, name, **kwargs):
        run_info = list(chain(*[v.run_info for v in variables]))
        return super(SparseRunVariable, cls)._merge(variables, name,
                                                    run_info=run_info,
                                                    **kwargs)
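

# Worked sketch of the sampling-rate selection in to_dense(): onsets and
# durations are cast to milliseconds, and their collective GCD is the
# coarsest interval that still represents every event boundary exactly.
# E.g., onsets of [0, 1.5, 3.0] s with durations of [0.5, 0.5, 0.5] s become
# [0, 1500, 3000] and [500, 500, 500] ms; the GCD is 500 ms, so a bin_sr of
# 1000 / 500 = 2 Hz suffices for exact binning prior to any resampling.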


class DenseRunVariable(BIDSVariable):
    """A dense representation of a single column.

    Parameters
    ----------
    name : :obj:`str`
        The name of the column.
    values : :obj:`numpy.ndarray`
        The values/amplitudes to store.
    run_info : :obj:`list`
        A list of RunInfo objects carrying information about all runs
        represented in the Variable.
    source : {'events', 'physio', 'stim', 'regressors', 'scans', 'sessions', 'participants', 'beh'}
        The type of BIDS variable file the data were extracted from.
    sampling_rate : :obj:`float`
        Mandatory sampling rate (in Hz) to use. Must match the sampling rate
        used to generate the values.
    """

    def __init__(self, name, values, run_info, source, sampling_rate):
        values = pd.DataFrame(values)

        if not isinstance(sampling_rate, (float, int)):
            raise TypeError("sampling_rate must be a float or integer, "
                            "not %s" % type(sampling_rate))

        if hasattr(run_info, 'duration'):
            run_info = [run_info]
        self.run_info = run_info
        self.sampling_rate = sampling_rate
        self.index = self._build_entity_index(run_info, sampling_rate)

        super(DenseRunVariable, self).__init__(name, values, source)

    def split(self, grouper):
        """Split the current DenseRunVariable into multiple columns.

        Parameters
        ----------
        grouper : :obj:`pandas.DataFrame`
            Binary DF specifying the design matrix to use for splitting.
            Number of rows must match current ``DenseRunVariable``; a new
            ``DenseRunVariable`` will be generated for each column in the
            grouper.

        Returns
        -------
        A list of DenseRunVariables, one per column in the grouper DF.
        """
        values = grouper.values * self.values.values
        df = pd.DataFrame(values, columns=grouper.columns)
        return [DenseRunVariable(name='%s.%s' % (self.name, name),
                                 values=df[name].values,
                                 run_info=self.run_info,
                                 source=self.source,
                                 sampling_rate=self.sampling_rate)
                for name in df.columns]

    def _build_entity_index(self, run_info, sampling_rate, match_vol=False):
        """Build the entity index from run information."""
        interval = int(round(1000. / sampling_rate))

        def _create_index(all_keys, all_reps, all_ents):
            all_keys = np.array(sorted(all_keys))
            df = pd.DataFrame(np.zeros((sum(all_reps), len(all_keys))),
                              columns=all_keys)
            prev_ix = 0
            for i, reps in enumerate(all_reps):
                for k, v in all_ents[i].items():
                    col_ix = np.where(all_keys == k)[0][0]
                    df.iloc[prev_ix:prev_ix + reps, col_ix] = v
                prev_ix += reps
            return df

        all_reps = []
        all_ents = []
        all_keys = set()
        for run in run_info:
            if match_vol:
                # If resampling to TR, fix reps to n_vols to ensure a match
                all_reps.append(run.n_vols)
            else:
                all_reps.append(int(math.ceil(run.duration * sampling_rate)))
            all_ents.append(run.entities)
            all_keys.update(run.entities.keys())

        self.timestamps = pd.date_range(0, periods=sum(all_reps),
                                        freq='%sms' % interval)
        return _create_index(all_keys, all_reps, all_ents)

    def resample(self, sampling_rate, inplace=False, kind='linear'):
        """Resample the Variable to the specified sampling rate.

        Parameters
        ----------
        sampling_rate : :obj:`int`, :obj:`float`
            Target sampling rate (in Hz).
        inplace : :obj:`bool`, optional
            If True, performs resampling in-place. If False, returns
            a resampled copy of the current Variable. Default is False.
        kind : {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'}
            Argument to pass to :obj:`scipy.interpolate.interp1d`; indicates
            the kind of interpolation approach to use. See interp1d docs for
            valid values. Default is 'linear'.
        """
        if not inplace:
            var = self.clone()
            var.resample(sampling_rate, inplace=True, kind=kind)
            return var

        match_vol = False
        if sampling_rate == 'TR':
            match_vol = True
            sampling_rate = 1. / self.run_info[0].tr

        if sampling_rate == self.sampling_rate:
            return

        self.index = self._build_entity_index(self.run_info, sampling_rate,
                                              match_vol)

        self.values = pd.DataFrame(
            _resample(
                self.values.values.ravel(),
                sampling_rate,
                self.sampling_rate,
                len(self.index),
                kind=kind)
        )
        assert len(self.values) == len(self.index)
        self.sampling_rate = sampling_rate

    def to_df(self, condition=True, entities=True, timing=True,
              sampling_rate=None):
        """Convert to a DataFrame, with columns for name and entities.

        Parameters
        ----------
        condition : :obj:`bool`
            If True, adds a column for condition name, and names the
            amplitude column 'amplitude'. If False, returns just onset,
            duration, and amplitude, and gives the amplitude column the
            current column name.
        entities : :obj:`bool`
            If True, adds extra columns for all entities.
        timing : :obj:`bool`
            If True, includes onset and duration columns (even though events
            are sampled uniformly). If False, omits them.
        sampling_rate : :obj:`float`, optional
            If provided and different from the current sampling rate, the
            Variable is resampled to this rate before conversion.
        """
        if sampling_rate not in (None, self.sampling_rate):
            return self.resample(sampling_rate).to_df(condition, entities,
                                                      timing)

        df = super(DenseRunVariable, self).to_df(condition, entities)

        if timing:
            df['onset'] = self.timestamps.values.astype(float) / 1e+9
            df['duration'] = 1. / self.sampling_rate

        return df

    @classmethod
    def _merge(cls, variables, name, sampling_rate=None, **kwargs):
        if not isinstance(sampling_rate, (int, float)):
            rates = set([v.sampling_rate for v in variables])
            if len(rates) == 1:
                sampling_rate = list(rates)[0]
            elif sampling_rate == 'highest':
                sampling_rate = max(rates)
            else:
                msg = ("Cannot merge DenseRunVariables (%s) with different"
                       " sampling rates (%s). Either specify an explicit "
                       "sampling rate to use for all variables, or set "
                       "sampling_rate='highest' to use the highest sampling"
                       " rate found." % (name, rates))
                raise ValueError(msg)

        variables = [v.resample(sampling_rate) for v in variables]
        values = pd.concat([v.values for v in variables], axis=0, sort=True)
        run_info = list(chain(*[v.run_info for v in variables]))
        source = variables[0].source

        return DenseRunVariable(
            name=name, values=values, run_info=run_info, source=source,
            sampling_rate=sampling_rate)


def merge_variables(variables, **kwargs):
    """Merge/concatenate a list of variables along the row axis.

    Parameters
    ----------
    variables : :obj:`list`
        A list of Variables to merge.
    kwargs
        Optional keyword arguments to pass onto the class-specific merge()
        call. Possible args:

        - sampling_rate (int, str): The sampling rate to use if resampling
          of DenseRunVariables is necessary for harmonization. If 'highest',
          the highest sampling rate found will be used. This argument is
          only used when passing DenseRunVariables in the variables list.

    Returns
    -------
    A single BIDSVariable of the same class as the input variables.

    Notes
    -----
    - Currently, this function only supports homogeneously-typed lists. In
      the future, it may be extended to support implicit conversion.
    - Variables in the list must all share the same name (i.e., it is not
      possible to merge two different variables into a single variable).
    """
    classes = set([v.__class__ for v in variables])
    if len(classes) > 1:
        raise ValueError("Variables of different classes cannot be merged. "
                         "Variables passed are of classes: %s" % classes)

    sources = set([v.source for v in variables])
    if len(sources) > 1:
        raise ValueError("Variables extracted from different types of files "
                         "cannot be merged. Sources found: %s" % sources)

    return list(classes)[0].merge(variables, **kwargs)
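

# Usage sketch (hypothetical same-named variables from two runs):
#
#     >>> merged = merge_variables([var_run1, var_run2])
#     >>> merged = merge_variables(dense_vars, sampling_rate='highest')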


def _resample(y, new_sr, old_sr, new_num, kind='linear'):
    n = len(y)
    x = np.arange(n)

    if new_sr < old_sr:
        # Downsampling, so low-pass filter the signal first to avoid
        # aliasing. Cutoff = new Nyquist / old Nyquist.
        from scipy.signal import butter, filtfilt
        b, a = butter(5, (new_sr / 2.0) / (old_sr / 2.0), btype='low',
                      output='ba', analog=False)
        y = filtfilt(b, a, y)

    from scipy.interpolate import interp1d
    f = interp1d(x, y, kind=kind)
    x_new = np.linspace(0, n - 1, num=new_num)
    return f(x_new)