Source code for cooltools.api.saddle

from itertools import combinations
from functools import partial
from scipy.linalg import toeplitz
import numpy as np
import pandas as pd
from ..lib import numutils
from ..lib.checks import (
    is_compatible_viewframe,
    is_valid_expected,
    is_cooler_balanced,
    is_track,
)
from ..lib.common import view_from_track, align_track_with_cooler

import warnings
import bioframe


def _merge_dict(a, b):
    return {**a, **b}


def _ecdf(x, v, side="left"):
    """mask_bad_bins
    Return array `x`'s empirical CDF value(s) at the points in `v`.
    This is based on the :func:`statsmodels.distributions.ECDF` step function.
    This is the inverse of `quantile`.

    """
    x = np.asarray(x)
    ind = np.searchsorted(np.sort(x), v, side=side) - 1
    y = np.linspace(1.0 / len(x), 1.0, len(x))
    return y[ind]


def _quantile(x, q, **kwargs):
    """
    Return the values of the quantile cut points specified by fractions `q` of
    a sequence of data given by `x`.

    """
    x = np.asarray(x)
    p = np.asarray(q) * 100
    return np.nanpercentile(x, p, **kwargs)
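

# Illustrative sketch (not part of the library): `_quantile` and `_ecdf` are
# approximate inverses of each other on a sampled vector. The helper below is
# hypothetical and exists only to demonstrate the round trip.
def _demo_ecdf_quantile_roundtrip():
    rng = np.random.default_rng(0)
    x = rng.normal(size=1000)
    q = np.array([0.1, 0.5, 0.9])
    cutpoints = _quantile(x, q)  # values at the 10th/50th/90th percentiles
    recovered = _ecdf(x, cutpoints)  # ECDF maps the cut points back near q
    return q, cutpoints, recovered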


def _make_cis_obsexp_fetcher(
    clr,
    expected,
    view_df,
    clr_weight_name="weight",
    expected_value_col="balanced.avg",
    view_name_col="name",
):
    """
    Construct a function that returns intra-chromosomal OBS/EXP for symmetrical regions
    defined in view_df.

    Used in `get_saddle()`.

    Parameters
    ----------
    clr : cooler.Cooler
        Observed matrix.
    expected : DataFrame
        Diagonal summary statistics for a number of regions.
    view_df: viewframe
        Viewframe with genomic regions.
    clr_weight_name : str
        Name of the column in the clr.bins to use as balancing weights.
    expected_value_col : str
        Name of the column in expected used for normalizing.
    view_name_col : str
        Name of column in view_df with region names.

    Returns
    -------
    getexpected(reg1, reg2) : callable
        Returns the cis OBS/EXP matrix for a pair of symmetrical regions.

    """
    expected = {
        k: x.values
        for k, x in expected.groupby(["region1", "region2"])[expected_value_col]
    }
    view_df = view_df.set_index(view_name_col)

    def _fetch_cis_oe(reg1, reg2):
        reg1_coords = tuple(view_df.loc[reg1])
        reg2_coords = tuple(view_df.loc[reg2])
        obs_mat = clr.matrix(balance=clr_weight_name).fetch(reg1_coords, reg2_coords)
        exp_mat = toeplitz(expected[reg1, reg2][: obs_mat.shape[0]])
        return obs_mat / exp_mat

    return _fetch_cis_oe
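

# Illustrative usage sketch (not part of the library): wiring the cis fetcher
# by hand. The cooler URI and the expected table are hypothetical; `expected`
# must follow the cooltools expected format, with 'region1'/'region2' columns
# and an expected-value column (default "balanced.avg").
#
#   import cooler
#   clr = cooler.Cooler("test.mcool::/resolutions/100000")  # hypothetical path
#   view_df = bioframe.make_viewframe(
#       [("chr1", 0, int(clr.chromsizes["chr1"]), "chr1")]
#   )
#   fetch_oe = _make_cis_obsexp_fetcher(clr, expected_df, view_df)
#   oe_mat = fetch_oe("chr1", "chr1")  # observed/expected matrix for chr1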


def _make_trans_obsexp_fetcher(
    clr,
    expected,
    view_df,
    clr_weight_name="weight",
    expected_value_col="balanced.avg",
    view_name_col="name",
):

    """
    Construct a function that returns OBS/EXP for any pair of chromosomes.

    Used in `get_saddle()`.

    Parameters
    ----------
    clr : cooler.Cooler
        Observed matrix.
    expected : DataFrame or scalar
        Average trans values. If a scalar, it is assumed to be a global trans
        expected value. If a DataFrame, it must have columns 'region1' and
        'region2', and a column named by ``expected_value_col`` holding the
        values of expected.
    view_df: viewframe
        Viewframe with genomic regions.
    clr_weight_name : str
        Name of the column in the clr.bins to use as balancing weights
    expected_value_col : str
        Name of the column in expected used for normalizing.
    view_name_col : str
        Name of column in view_df with region names.

    Returns
    -------
    getexpected(reg1, reg2) : callable
        Returns the trans OBS/EXP matrix for a pair of regions.

    """

    view_df = view_df.set_index(view_name_col)

    if np.isscalar(expected):

        def _fetch_trans_oe(reg1, reg2):
            reg1_coords = tuple(view_df.loc[reg1])
            reg2_coords = tuple(view_df.loc[reg2])
            obs_mat = clr.matrix(balance=clr_weight_name).fetch(
                reg1_coords, reg2_coords
            )
            return obs_mat / expected

        return _fetch_trans_oe

    elif isinstance(expected, pd.DataFrame):

        expected = {
            k: x.values
            for k, x in expected.groupby(["region1", "region2"])[expected_value_col]
        }

        def _fetch_trans_exp(region1, region2):
            # Handle region flipping
            if (region1, region2) in expected.keys():
                return expected[region1, region2]
            elif (region2, region1) in expected.keys():
                return expected[region2, region1]
            else:
                raise KeyError(
                    "trans-exp index is missing a pair of chromosomes: "
                    "{}, {}".format(region1, region2)
                )

        def _fetch_trans_oe(reg1, reg2):
            reg1_coords = tuple(view_df.loc[reg1])
            reg2_coords = tuple(view_df.loc[reg2])
            obs_mat = clr.matrix(balance=clr_weight_name).fetch(
                reg1_coords, reg2_coords
            )
            exp = _fetch_trans_exp(reg1, reg2)
            return obs_mat / exp

        return _fetch_trans_oe

    else:
        raise ValueError("Unknown type of expected")


def _accumulate(
    S, C, getmatrix, digitized, reg1, reg2, min_diag=3, max_diag=-1, verbose=False
):
    """
    Helper function to aggregate across region pairs.
    If regions are identical, also masks returned matrices below min_diag and above max_diag.

    Used in `get_saddle()`.
    """

    n_bins = S.shape[0]
    matrix = getmatrix(reg1, reg2)

    if reg1 == reg2:
        for d in np.arange(-min_diag + 1, min_diag):
            numutils.set_diag(matrix, np.nan, d)
        if max_diag >= 0:
            for d in np.append(
                np.arange(-matrix.shape[0], -max_diag),
                np.arange(max_diag + 1, matrix.shape[0]),
            ):
                numutils.set_diag(matrix, np.nan, d)

    if verbose:
        print("regions {} vs {}".format(reg1, reg2))

    for i in range(n_bins):
        row_mask = digitized[reg1] == i
        for j in range(n_bins):
            col_mask = digitized[reg2] == j
            data = matrix[row_mask, :][:, col_mask]
            data = data[np.isfinite(data)]
            S[i, j] += np.sum(data)
            C[i, j] += float(len(data))
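

# Illustrative sketch (not part of the library): how `_accumulate` pools
# pixels by track category. A hypothetical 4-bin "region" digitized into
# categories 0..2 is aggregated into sum and count matrices.
def _demo_accumulate():
    digitized = {"regA": np.array([0, 1, 1, 2])}  # per-bin category labels
    mat = np.arange(16, dtype=float).reshape(4, 4)  # toy obs/exp matrix

    def getmatrix(reg1, reg2):
        return mat

    S = np.zeros((3, 3))
    C = np.zeros((3, 3))
    # min_diag=0 keeps all diagonals of the toy matrix
    _accumulate(S, C, getmatrix, digitized, "regA", "regA", min_diag=0)
    return S / np.where(C > 0, C, np.nan)  # mean interaction per category pair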


def _make_binedges(track_values, n_bins, vrange=None, qrange=None):
    """
    Helper function to make bins for `get_digitized()`.

    Makes binedges in real space from value limits provided by vrange,
    or in quantile space from quantile limits provided by qrange.

    """

    if qrange is not None and vrange is not None:
        raise ValueError("only one of vrange or qrange can be supplied")

    elif vrange is not None:
        lo, hi = vrange
        if lo > hi:
            raise ValueError("vrange does not satisfy vrange[0]<vrange[1]")
        binedges = np.linspace(lo, hi, n_bins + 1)
        return binedges, lo, hi

    elif qrange is not None:
        qlo, qhi = qrange
        if qlo < 0.0 or qhi > 1.0:
            raise ValueError("qrange must specify quantiles in (0.0,1.0)")
        if qlo > qhi:
            raise ValueError("qrange does not satisfy qrange[0]<qrange[1]")
        q_edges = np.linspace(qlo, qhi, n_bins + 1)
        binedges = _quantile(track_values, q_edges)
        return binedges, qlo, qhi

    else:
        raise ValueError("either vrange or qrange must be supplied")




def digitize(
    track,
    n_bins,
    vrange=None,
    qrange=None,
    digitized_suffix=".d",
):
    """
    Digitize genomic signal tracks into integers between `1` and `n`.

    Parameters
    ----------
    track : 4-column DataFrame
        bedGraph-like dataframe with columns understood as (chrom,start,end,value).
    n_bins : int
        number of bins for signal quantization.
    vrange : tuple
        Low and high values used for binning track values.
        E.g. if `vrange`=(-0.05, 0.05), equal width bins would be generated
        between the value -0.05 and 0.05.
    qrange : tuple
        Low and high values for quantile binning track values.
        E.g., if `qrange`=(0.02, 0.98) the lower bin would start at the 2nd
        percentile and the upper bin would end at the 98th percentile of the
        track value range.
        Low must be 0.0 or more, high must be 1.0 or less.
    digitized_suffix : str
        suffix to append to the track value name in the fourth column.

    Returns
    -------
    digitized : DataFrame
        New track dataframe (bedGraph-like) with digitized value column,
        with name suffixed by '.d'. The digitized column is returned as a
        categorical.
    binedges : 1D array (length n + 1)
        Bin edges used in quantization of track. For `n` bins, there are
        `n + 1` edges. See encoding details in Notes.

    Notes
    -----
    The digital encoding is as follows:

    - `1..n` <-> values assigned to bins defined by vrange or qrange
    - `0` <-> left outlier values
    - `n+1` <-> right outlier values
    - `-1` <-> missing data (NaNs)

    """
    if type(n_bins) is not int:
        raise ValueError("n_bins must be provided as an int")
    is_track(track, raise_errors=True)

    digitized = track.copy()
    track_value_col = track.columns[3]
    digitized_col = track_value_col + digitized_suffix

    track_values = track[track_value_col].copy()
    track_values = track_values.astype({track_value_col: np.float64}).values

    binedges, lo, hi = _make_binedges(
        track_values, n_bins, vrange=vrange, qrange=qrange
    )
    digitized[digitized_col] = np.digitize(track_values, binedges, right=False)
    # re-assign values equal to the max value to bin n
    digitized.loc[
        digitized[track_value_col] == np.max(binedges), digitized_col
    ] = n_bins

    mask = track[track_value_col].isnull()
    digitized.loc[mask, digitized_col] = -1

    digitized_cats = pd.CategoricalDtype(
        categories=np.arange(-1, n_bins + 2), ordered=True
    )
    digitized = digitized.astype({digitized_col: digitized_cats})

    # return a 4-column digitized track
    digitized = digitized[list(track.columns[:3]) + [digitized_col]]
    return digitized, binedges
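

# Illustrative sketch (not part of the library): digitizing a tiny
# bedGraph-like track. Column names and values are hypothetical.
def _demo_digitize():
    track = pd.DataFrame(
        {
            "chrom": ["chr1"] * 5,
            "start": [0, 10, 20, 30, 40],
            "end": [10, 20, 30, 40, 50],
            "value": [0.1, np.nan, -0.3, 0.5, 0.2],
        }
    )
    digitized, edges = digitize(track, 3, vrange=(-0.5, 0.5))
    # 'value.d' holds categories: -1 for the NaN, 1..3 for in-range values
    return digitized, edges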


def saddle(
    clr,
    expected,
    track,
    contact_type,
    n_bins,
    vrange=None,
    qrange=None,
    view_df=None,
    clr_weight_name="weight",
    expected_value_col="balanced.avg",
    view_name_col="name",
    min_diag=3,
    max_diag=-1,
    trim_outliers=False,
    verbose=False,
    drop_track_na=False,
):
    """
    Get a matrix of average interactions between genomic bin pairs as a
    function of a specified genomic track.

    The provided genomic track is either:
    (a) digitized inside this function by passing 'n_bins', and one of
        'vrange' or 'qrange', or
    (b) passed as a pre-digitized track with a categorical value column,
        as generated by `digitize()`.

    Parameters
    ----------
    clr : cooler.Cooler
        Observed matrix.
    expected : DataFrame in expected format
        Diagonal summary statistics for each region pair; the column given by
        ``expected_value_col`` holds the values of expected to use.
    track : DataFrame
        A track, i.e. BedGraph-like dataframe, which is digitized with
        the options n_bins, vrange and qrange. Can optionally be passed
        as a pre-digitized DataFrame with a categorical value column,
        as generated by digitize(), also passing n_bins as None.
    contact_type : str
        If 'cis' then only cis interactions are used to build the matrix.
        If 'trans', only trans interactions are used.
    n_bins : int or None
        number of bins for signal quantization. If None, then track must
        be passed as a pre-digitized track.
    vrange : tuple
        Low and high values used for binning track values. See digitize().
    qrange : tuple
        Low and high values for quantile binning track values.
        Low must be 0.0 or more, high must be 1.0 or less.
        Only one of vrange or qrange can be passed. See digitize().
    view_df : viewframe
        Viewframe with genomic regions. If None, generated from track
        chromosomes.
    clr_weight_name : str
        Name of the column in the clr.bins to use as balancing weights.
        Using raw unbalanced data is not supported for saddles.
    expected_value_col : str
        Name of the column in expected used for normalizing.
    view_name_col : str
        Name of column in view_df with region names.
    min_diag : int
        Smallest diagonal to include in computation. Ignored with
        contact_type=trans.
    max_diag : int
        Biggest diagonal to include in computation. Ignored with
        contact_type=trans.
    trim_outliers : bool, optional
        Remove first and last row and column from the output matrix.
    verbose : bool, optional
        If True then reports progress.
    drop_track_na : bool, optional
        If True then drops NaNs in input track (as if they were missing),
        If False then counts NaNs as present in dataframe.
        In general, this only adds a check for chromosomes that have all
        missing values, but does not affect the results.

    Returns
    -------
    interaction_sum : 2D array
        The matrix of summed interaction probability between two genomic bins
        given their values of the provided genomic track.
    interaction_count : 2D array
        The matrix of the number of genomic bin pairs that contributed to the
        corresponding pixel of ``interaction_sum``.

    """
    if type(n_bins) is int:
        # perform digitization
        track = align_track_with_cooler(
            track,
            clr,
            view_df=view_df,
            clr_weight_name=clr_weight_name,
            mask_clr_bad_bins=True,
            drop_track_na=drop_track_na,  # adds a check for chromosomes with all missing values
        )
        digitized_track, binedges = digitize(
            track.iloc[:, :4],
            n_bins,
            vrange=vrange,
            qrange=qrange,
            digitized_suffix=".d",
        )
        digitized_col = digitized_track.columns[3]

    elif n_bins is None:
        # assume and test if track is pre-digitized
        digitized_track = track
        digitized_col = digitized_track.columns[3]
        is_track(track.astype({digitized_col: "float"}), raise_errors=True)
        if type(digitized_track.dtypes.iloc[3]) is not pd.CategoricalDtype:
            raise ValueError(
                "when n_bins=None, saddle assumes the track has been "
                + "pre-digitized and the value column is a "
                + "pandas categorical. See digitize()."
            )
        cats = digitized_track[digitized_col].dtype.categories.values
        # cats has two additional categories, 0 and n_bins+1, for values
        # falling outside range, as well as -1 for NAs.
        n_bins = len(cats[cats > -1]) - 2
    else:
        raise ValueError("n_bins must be provided as int or None")

    if view_df is None:
        view_df = view_from_track(digitized_track)
    else:
        # Make sure view_df is a proper viewframe
        try:
            _ = is_compatible_viewframe(
                view_df,
                clr,
                check_sorting=True,  # just in case
                raise_errors=True,
            )
        except Exception as e:
            raise ValueError("view_df is not a valid viewframe or incompatible") from e

    # make sure provided expected is compatible
    try:
        _ = is_valid_expected(
            expected,
            contact_type,
            view_df,
            verify_cooler=clr,
            expected_value_cols=[
                expected_value_col,
            ],
            raise_errors=True,
        )
    except Exception as e:
        raise ValueError("provided expected is not compatible") from e

    # check if cooler is balanced
    if clr_weight_name:
        try:
            _ = is_cooler_balanced(clr, clr_weight_name, raise_errors=True)
        except Exception as e:
            raise ValueError(
                f"provided cooler is not balanced or {clr_weight_name} is missing"
            ) from e

    digitized_tracks = {}
    for num, reg in view_df.iterrows():
        digitized_reg = bioframe.select(digitized_track, reg)
        digitized_tracks[reg[view_name_col]] = digitized_reg[digitized_col]

    # set "cis" or "trans" for supports (regions to iterate over) and matrix fetcher
    if contact_type == "cis":
        # only symmetric intra-chromosomal regions :
        supports = list(zip(view_df[view_name_col], view_df[view_name_col]))
        getmatrix = _make_cis_obsexp_fetcher(
            clr,
            expected,
            view_df,
            view_name_col=view_name_col,
            expected_value_col=expected_value_col,
            clr_weight_name=clr_weight_name,
        )
    elif contact_type == "trans":
        # asymmetric inter-chromosomal regions :
        supports = list(combinations(view_df[view_name_col], 2))
        supports = [
            i
            for i in supports
            if (
                view_df["chrom"].loc[view_df[view_name_col] == i[0]].values
                != view_df["chrom"].loc[view_df[view_name_col] == i[1]].values
            )
        ]
        getmatrix = _make_trans_obsexp_fetcher(
            clr,
            expected,
            view_df,
            view_name_col=view_name_col,
            expected_value_col=expected_value_col,
            clr_weight_name=clr_weight_name,
        )
    else:
        raise ValueError("Allowed values for contact_type are 'cis' or 'trans'.")

    # n_bins here includes 2 open bins for values <lo and >hi.
    interaction_sum = np.zeros((n_bins + 2, n_bins + 2))
    interaction_count = np.zeros((n_bins + 2, n_bins + 2))

    for reg1, reg2 in supports:
        _accumulate(
            interaction_sum,
            interaction_count,
            getmatrix,
            digitized_tracks,
            reg1,
            reg2,
            min_diag=min_diag,
            max_diag=max_diag,
            verbose=verbose,
        )

    interaction_sum += interaction_sum.T
    interaction_count += interaction_count.T

    if trim_outliers:
        interaction_sum = interaction_sum[1:-1, 1:-1]
        interaction_count = interaction_count[1:-1, 1:-1]

    return interaction_sum, interaction_count
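

# Illustrative usage sketch (not part of the library): a typical cis saddle
# call. The cooler URI, expected table and E1 track are hypothetical; they are
# usually produced with cooltools.expected_cis and cooltools.eigs_cis.
#
#   import cooler
#   clr = cooler.Cooler("test.mcool::/resolutions/100000")  # hypothetical path
#   S, C = saddle(
#       clr, expected_df, e1_track, "cis",
#       n_bins=38, qrange=(0.025, 0.975),
#   )
#   saddledata = S / C  # average obs/exp per pair of track categories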


def saddleplot(
    track,
    saddledata,
    n_bins,
    vrange=None,
    qrange=(0.0, 1.0),
    cmap="coolwarm",
    scale="log",
    vmin=0.5,
    vmax=2,
    color=None,
    title=None,
    xlabel=None,
    ylabel=None,
    clabel=None,
    fig=None,
    fig_kws=None,
    heatmap_kws=None,
    margin_kws=None,
    cbar_kws=None,
    subplot_spec=None,
):
    """
    Generate a saddle plot.

    Parameters
    ----------
    track : pd.DataFrame
        See digitize() for details.
    saddledata : 2D array-like
        Saddle matrix produced by `saddle()`. It will include 2 flanking
        rows/columns for outlier signal values, thus the shape should be
        `(n+2, n+2)`.
    cmap : str or matplotlib colormap
        Colormap to use for plotting the saddle heatmap
    scale : str
        Color scaling to use for plotting the saddle heatmap: log or linear
    vmin, vmax : float
        Value limits for coloring the saddle heatmap
    color : matplotlib color value
        Face color for margin bar plots
    fig : matplotlib Figure, optional
        Specified figure to plot on. A new figure is created if none is
        provided.
    fig_kws : dict, optional
        Passed on to `plt.Figure()`
    heatmap_kws : dict, optional
        Passed on to `ax.imshow()`
    margin_kws : dict, optional
        Passed on to `ax.bar()` and `ax.barh()`
    cbar_kws : dict, optional
        Passed on to `plt.colorbar()`
    subplot_spec : GridSpec object
        Specify a subregion of a figure to plot to using a GridSpec.

    Returns
    -------
    Dictionary of axes objects.

    """
    warnings.warn(
        "Generating a saddleplot will be deprecated in future versions, "
        + "please see https://github.com/open2c_examples for examples on how to plot saddles.",
        DeprecationWarning,
    )

    from matplotlib.gridspec import GridSpec, GridSpecFromSubplotSpec
    from matplotlib.colors import Normalize, LogNorm
    from matplotlib import ticker
    import matplotlib.pyplot as plt

    class MinOneMaxFormatter(ticker.LogFormatter):
        def set_locs(self, locs=None):
            self._sublabels = set([vmin % 10 * 10, vmax % 10, 1])

        def __call__(self, x, pos=None):
            if x not in [vmin, 1, vmax]:
                return ""
            else:
                return "{x:g}".format(x=x)

    track_value_col = track.columns[3]
    track_values = track[track_value_col].values

    # Digitize the track and calculate histogram
    digitized_track, binedges = digitize(track, n_bins, vrange=vrange, qrange=qrange)
    x = digitized_track[digitized_track.columns[3]].values.astype(int).copy()
    x = x[(x > -1) & (x < len(binedges) + 1)]
    hist = np.bincount(x, minlength=len(binedges) + 1)

    if vrange is not None:
        lo, hi = vrange
    if qrange is not None:
        lo, hi = qrange
        # Reset the binedges for plotting
        binedges = np.linspace(lo, hi, n_bins + 1)

    # Histogram and saddledata are flanked by outlier bins
    n = saddledata.shape[0]
    X, Y = np.meshgrid(binedges, binedges)
    C = saddledata
    if (n - n_bins) == 2:
        C = C[1:-1, 1:-1]
        hist = hist[1:-1]

    # Layout
    if subplot_spec is not None:
        GridSpec = partial(GridSpecFromSubplotSpec, subplot_spec=subplot_spec)
    grid = {}
    gs = GridSpec(
        nrows=3,
        ncols=3,
        width_ratios=[0.2, 1, 0.1],
        height_ratios=[0.2, 1, 0.1],
        wspace=0.05,
        hspace=0.05,
    )

    # Figure
    if fig is None:
        fig_kws_default = dict(figsize=(5, 5))
        fig_kws = _merge_dict(fig_kws_default, fig_kws if fig_kws is not None else {})
        fig = plt.figure(**fig_kws)

    # Heatmap
    if scale == "log":
        norm = LogNorm(vmin=vmin, vmax=vmax)
    elif scale == "linear":
        norm = Normalize(vmin=vmin, vmax=vmax)
    else:
        raise ValueError("Only linear and log color scaling is supported")

    grid["ax_heatmap"] = ax = plt.subplot(gs[4])
    heatmap_kws_default = dict(cmap=cmap, rasterized=True)
    heatmap_kws = _merge_dict(
        heatmap_kws_default, heatmap_kws if heatmap_kws is not None else {}
    )
    img = ax.pcolormesh(X, Y, C, norm=norm, **heatmap_kws)
    plt.gca().yaxis.set_visible(False)

    # Margins
    margin_kws_default = dict(edgecolor="k", facecolor=color, linewidth=1)
    margin_kws = _merge_dict(
        margin_kws_default, margin_kws if margin_kws is not None else {}
    )
    # left margin hist
    grid["ax_margin_y"] = plt.subplot(gs[3], sharey=grid["ax_heatmap"])
    plt.barh(
        binedges[:-1], height=np.diff(binedges), width=hist, align="edge", **margin_kws
    )
    plt.xlim(plt.xlim()[1], plt.xlim()[0])  # fliplr
    plt.ylim(hi, lo)
    plt.gca().spines["top"].set_visible(False)
    plt.gca().spines["bottom"].set_visible(False)
    plt.gca().spines["left"].set_visible(False)
    plt.gca().xaxis.set_visible(False)
    # top margin hist
    grid["ax_margin_x"] = plt.subplot(gs[1], sharex=grid["ax_heatmap"])
    plt.bar(
        binedges[:-1], width=np.diff(binedges), height=hist, align="edge", **margin_kws
    )
    plt.xlim(lo, hi)
    # plt.ylim(plt.ylim())  # correct
    plt.gca().spines["top"].set_visible(False)
    plt.gca().spines["right"].set_visible(False)
    plt.gca().spines["left"].set_visible(False)
    plt.gca().xaxis.set_visible(False)
    plt.gca().yaxis.set_visible(False)

    # Colorbar
    grid["ax_cbar"] = plt.subplot(gs[5])
    cbar_kws_default = dict(fraction=0.8, label=clabel or "")
    cbar_kws = _merge_dict(cbar_kws_default, cbar_kws if cbar_kws is not None else {})
    if scale == "linear" and vmin is not None and vmax is not None:
        grid["cbar"] = cb = plt.colorbar(img, **cbar_kws)
        # cb.set_ticks(np.arange(vmin, vmax + 0.001, 0.5))
        # do linspace between vmin and vmax of 5 segments and trunc to 1 decimal:
        decimal = 10
        nsegments = 5
        cd_ticks = np.trunc(np.linspace(vmin, vmax, nsegments) * decimal) / decimal
        cb.set_ticks(cd_ticks)
    else:
        grid["cbar"] = cb = plt.colorbar(img, format=MinOneMaxFormatter(), **cbar_kws)
        cb.ax.yaxis.set_minor_formatter(MinOneMaxFormatter())

    # extra settings
    grid["ax_heatmap"].set_xlim(lo, hi)
    grid["ax_heatmap"].set_ylim(hi, lo)
    plt.grid(False)
    plt.axis("off")
    if title is not None:
        grid["ax_margin_x"].set_title(title)
    if xlabel is not None:
        grid["ax_heatmap"].set_xlabel(xlabel)
    if ylabel is not None:
        grid["ax_margin_y"].set_ylabel(ylabel)

    return grid


def saddle_strength(S, C):
    """
    Compute the ratio of corner (AA+BB) to cross (AB+BA) saddle interactions
    as a function of corner extent.

    Parameters
    ----------
    S, C : 2D arrays, square, same shape
        Saddle sums and counts, respectively

    Returns
    -------
    1D array
        Ratios of cumulative corner interaction scores, where the saddle data
        is grouped over the AA+BB corners and AB+BA corners with increasing
        extent.

    """
    m, n = S.shape
    if m != n:
        raise ValueError("`saddledata` should be square.")

    ratios = np.zeros(n)
    for k in range(1, n):
        intra_sum = np.nansum(S[0:k, 0:k]) + np.nansum(S[n - k : n, n - k : n])
        intra_count = np.nansum(C[0:k, 0:k]) + np.nansum(C[n - k : n, n - k : n])
        intra = intra_sum / intra_count

        inter_sum = np.nansum(S[0:k, n - k : n]) + np.nansum(S[n - k : n, 0:k])
        inter_count = np.nansum(C[0:k, n - k : n]) + np.nansum(C[n - k : n, 0:k])
        inter = inter_sum / inter_count

        ratios[k] = intra / inter
    return ratios
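

# Illustrative sketch (not part of the library): `saddle_strength` on a toy
# 4x4 saddle where same-type (corner) interactions are enriched.
def _demo_saddle_strength():
    S = np.array(
        [
            [2.0, 1.0, 1.0, 0.5],
            [1.0, 1.0, 1.0, 1.0],
            [1.0, 1.0, 1.0, 1.0],
            [0.5, 1.0, 1.0, 2.0],
        ]
    )
    C = np.ones_like(S)
    # ratios[1] == 4.0: the 1-bin AA/BB corners are 4x stronger than AB/BA
    return saddle_strength(S, C)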