Source code for cooltools.api.eigdecomp

import numpy as np
import scipy
import scipy.stats

import pandas as pd
from ..lib import numutils
from ..lib.checks import is_compatible_viewframe, is_cooler_balanced
from ..lib.common import make_cooler_view, align_track_with_cooler

import bioframe


def _phase_eigs(eigvals, eigvecs, phasing_track, sort_metric=None):
    """
    Flip `eigvecs` to achieve a positive correlation with `phasing_track`.

    Parameters
    ----------
    sort_metric : str
        If provided, re-sort `eigenvecs` and `eigvals` in the order of
        decreasing correlation between phasing_track and eigenvector, using the
        specified measure of correlation. Possible values:
        'pearsonr' - sort by decreasing Pearson correlation.
        'var_explained' - sort by decreasing absolute amount of variation in
        `eigvecs` explained by `phasing_track` (i.e. R^2 * var(eigvec)).
        'MAD_explained' - sort by decreasing absolute amount of Median Absolute
        Deviation from the median of `eigvecs` explained by `phasing_track`
        (i.e. COMED(eigvec, phasing_track) * MAD(eigvec)).
        'spearmanr' - sort by decreasing Spearman correlation.
    """

    corrs = []
    for eigvec in eigvecs:
        mask = np.isfinite(eigvec) & np.isfinite(phasing_track)
        if sort_metric is None or sort_metric == "spearmanr":
            corr = scipy.stats.spearmanr(phasing_track[mask], eigvec[mask])[0]
        elif sort_metric == "pearsonr":
            corr = scipy.stats.pearsonr(phasing_track[mask], eigvec[mask])[0]
        elif sort_metric == "var_explained":
            corr = scipy.stats.pearsonr(phasing_track[mask], eigvec[mask])[0]
            # multiply by the sign to keep the phasing information
            corr = np.sign(corr) * corr * corr * np.var(eigvec[mask])
        elif sort_metric == "MAD_explained":
            corr = numutils.COMED(phasing_track[mask], eigvec[mask]) * numutils.MAD(
                eigvec[mask]
            )
        else:
            raise ValueError("Unknown sorting metric: {}".format(sort_metric))

        corrs.append(corr)

    # flip eigvecs
    for i in range(len(eigvecs)):
        eigvecs[i] = np.sign(corrs[i]) * eigvecs[i]

    # sort eigvecs
    if sort_metric is not None:
        idx = np.argsort(-np.abs(corrs))
        eigvals, eigvecs = eigvals[idx], eigvecs[idx]

    return eigvals, eigvecs


[docs]def cis_eig( A, n_eigs=3, phasing_track=None, ignore_diags=2, clip_percentile=0, sort_metric=None ): """ Compute compartment eigenvector on a dense cis matrix. Note that the amplitude of compartment eigenvectors is weighted by their corresponding eigenvalue Parameters ---------- A : 2D array balanced dense contact matrix n_eigs : int number of eigenvectors to compute phasing_track : 1D array, optional if provided, eigenvectors are flipped to achieve a positive correlation with `phasing_track`. ignore_diags : int the number of diagonals to ignore clip_percentile : float if >0 and <100, clip pixels with diagonal-normalized values higher than the specified percentile of matrix-wide values. sort_metric : str If provided, re-sort `eigenvecs` and `eigvals` in the order of decreasing correlation between phasing_track and eigenvector, using the specified measure of correlation. Possible values: 'pearsonr' - sort by decreasing Pearson correlation. 'var_explained' - sort by decreasing absolute amount of variation in `eigvecs` explained by `phasing_track` (i.e. R^2 * var(eigvec)) 'MAD_explained' - sort by decreasing absolute amount of Median Absolute Deviation from the median of `eigvecs` explained by `phasing_track` (i.e. COMED(eigvec, phasing_track) * MAD(eigvec)). 'spearmanr' - sort by decreasing Spearman correlation. This option is designed to report the most "biologically" informative eigenvectors first, and prevent eigenvector swapping caused by translocations. In reality, however, sometimes it shows poor performance and may lead to reporting of non-informative eigenvectors. Off by default. Returns ------- eigenvalues, eigenvectors .. note:: ALWAYS check your EVs by eye. The first one occasionally does not reflect the compartment structure, but instead describes chromosomal arms or translocation blowouts. """ A = np.array(A) A[~np.isfinite(A)] = 0 mask = A.sum(axis=0) > 0 if A.shape[0] <= ignore_diags + 3 or mask.sum() <= ignore_diags + 3: return ( np.array([np.nan for i in range(n_eigs)]), np.array([np.ones(A.shape[0]) * np.nan for i in range(n_eigs)]), ) if ignore_diags: for d in range(-ignore_diags + 1, ignore_diags): numutils.set_diag(A, 1.0, d) OE, _, _, _ = numutils.observed_over_expected(A, mask) if clip_percentile and clip_percentile < 100: OE = np.clip(OE, 0, np.percentile(OE[mask, :][:, mask], clip_percentile)) # subtract 1.0 OE -= 1.0 # empty invalid rows, so that get_eig can find them OE[~mask, :] = 0 OE[:, ~mask] = 0 eigvecs, eigvals = numutils.get_eig(OE, n_eigs, mask_zero_rows=True) eigvecs /= np.sqrt(np.nansum(eigvecs ** 2, axis=1))[:, None] eigvecs *= np.sqrt(np.abs(eigvals))[:, None] # Orient and reorder if phasing_track is not None: eigvals, eigvecs = _phase_eigs(eigvals, eigvecs, phasing_track, sort_metric) return eigvals, eigvecs
def _filter_heatmap(A, transmask, perc_top, perc_bottom): # Truncate trans blowouts lim = np.percentile(A[transmask], perc_top) tdata = A[transmask] tdata[tdata > lim] = lim A[transmask] = tdata # Remove bins with poor coverage in trans marg = np.sum(A, axis=0) marg_nz = marg[np.sum(A, axis=0) > 0] min_cutoff = np.percentile(marg_nz, perc_bottom) dropmask = (marg > 0) & (marg < min_cutoff) A[dropmask, :] = 0 A[:, dropmask] = 0 return A def _fake_cis(A, cismask): cismask = cismask.astype(np.uint8) s = np.abs(np.sum(A, axis=0)) <= 1e-10 cismask[:, s] = 2 cismask[s, :] = 2 numutils.fake_cis(A, cismask) return A
[docs]def trans_eig( A, partition, n_eigs=3, perc_top=99.95, perc_bottom=1, phasing_track=None, sort_metric=False, ): """ Compute compartmentalization eigenvectors on trans contact data Parameters ---------- A : 2D array balanced whole genome contact matrix partition : sequence of int bin offset of each contiguous region to treat separately (e.g., chromosomes or chromosome arms) n_eigs : int number of eigenvectors to compute; default = 3 perc_top : float (percentile) filter - clip trans blowout contacts above this cutoff; default = 99.95 perc_bottom : float (percentile) filter - remove bins with trans coverage below this cutoff; default=1 phasing_track : 1D array, optional if provided, eigenvectors are flipped to achieve a positive correlation with `phasing_track`. sort_metric : str If provided, re-sort `eigenvecs` and `eigvals` in the order of decreasing correlation between phasing_track and eigenvector, using the specified measure of correlation. Possible values: 'pearsonr' - sort by decreasing Pearson correlation. 'var_explained' - sort by decreasing absolute amount of variation in `eigvecs` explained by `phasing_track` (i.e. R^2 * var(eigvec)) 'MAD_explained' - sort by decreasing absolute amount of Median Absolute Deviation from the median of `eigvecs` explained by `phasing_track` (i.e. COMED(eigvec, phasing_track) * MAD(eigvec)). 'spearmanr' - sort by decreasing Spearman correlation. This option is designed to report the most "biologically" informative eigenvectors first, and prevent eigenvector swapping caused by translocations. In reality, however, sometimes it shows poor performance and may lead to reporting of non-informative eigenvectors. Off by default. Returns ------- eigenvalues, eigenvectors .. note:: ALWAYS check your EVs by eye. The first one occasionally does not reflect the compartment structure, but instead describes chromosomal arms or translocation blowouts. """ A = np.array(A) if A.shape[0] != A.shape[1]: raise ValueError("A is not symmetric") n_bins = A.shape[0] if not ( partition[0] == 0 and partition[-1] == n_bins and np.all(np.diff(partition) > 0) ): raise ValueError( "Not a valid partition. Must be a monotonic sequence " "from 0 to {}.".format(n_bins) ) # Delete cis data and create trans mask extents = zip(partition[:-1], partition[1:]) part_ids = [] for n, (i0, i1) in enumerate(extents): A[i0:i1, i0:i1] = 0 part_ids.extend([n] * (i1 - i0)) part_ids = np.array(part_ids) is_trans = part_ids[:, None] != part_ids[None, :] # Filter heatmap is_bad_bin = np.nansum(A, axis=0) == 0 is_good_bin = ~is_bad_bin is_valid = np.logical_and.outer(is_good_bin, is_good_bin) A[is_bad_bin, :] = 0 A[:, is_bad_bin] = 0 A = _filter_heatmap(A, is_trans & is_valid, perc_top, perc_bottom) is_bad_bin = np.nansum(A, axis=0) == 0 is_good_bin = ~is_bad_bin is_valid = np.logical_and.outer(is_good_bin, is_good_bin) A[is_bad_bin, :] = 0 A[:, is_bad_bin] = 0 # Fake cis and re-balance A = _fake_cis(A, ~is_trans) A = numutils.iterative_correction_symmetric(A)[0] A = _fake_cis(A, ~is_trans) A = numutils.iterative_correction_symmetric(A)[0] # Compute eig Abar = np.mean(A[is_valid]) # center by scalar mean O = (A - Abar) / Abar O[is_bad_bin, :] = 0 O[:, is_bad_bin] = 0 eigvecs, eigvals = numutils.get_eig(O, n_eigs, mask_zero_rows=True) eigvecs /= np.sqrt(np.nansum(eigvecs ** 2, axis=1))[:, None] eigvecs *= np.sqrt(np.abs(eigvals))[:, None] if phasing_track is not None: eigvals, eigvecs = _phase_eigs(eigvals, eigvecs, phasing_track, sort_metric) return eigvals, eigvecs
[docs]def eigs_cis( clr, phasing_track=None, view_df=None, n_eigs=3, clr_weight_name="weight", ignore_diags=None, clip_percentile=99.9, sort_metric=None, map=map, ): """ Compute compartment eigenvector for a given cooler `clr` in a number of symmetric intra chromosomal regions defined in view_df (cis-regions), or for each chromosome. Note that the amplitude of compartment eigenvectors is weighted by their corresponding eigenvalue. Eigenvectors can be oriented by passing a binned `phasing_track` with the same resolution as the cooler. Parameters ---------- clr : cooler cooler object to fetch data from phasing_track : DataFrame binned track with the same resolution as cooler bins, the fourth column is used to phase the eigenvectors, flipping them to achieve a positive correlation. view_df : iterable or DataFrame, optional if provided, eigenvectors are calculated for the regions of the view only, otherwise chromosome-wide eigenvectors are computed, for chromosomes specified in phasing_track. n_eigs : int number of eigenvectors to compute clr_weight_name : str name of the column with balancing weights to be used. ignore_diags : int, optional the number of diagonals to ignore. Derived from cooler metadata if not specified. clip_percentile : float if >0 and <100, clip pixels with diagonal-normalized values higher than the specified percentile of matrix-wide values. sort_metric : str If provided, re-sort `eigenvecs` and `eigvals` in the order of decreasing correlation between phasing_track and eigenvector, using the specified measure of correlation. Possible values: 'pearsonr' - sort by decreasing Pearson correlation. 'var_explained' - sort by decreasing absolute amount of variation in `eigvecs` explained by `phasing_track` (i.e. R^2 * var(eigvec)) 'MAD_explained' - sort by decreasing absolute amount of Median Absolute Deviation from the median of `eigvecs` explained by `phasing_track` (i.e. COMED(eigvec, phasing_track) * MAD(eigvec)). 'spearmanr' - sort by decreasing Spearman correlation. This option is designed to report the most "biologically" informative eigenvectors first, and prevent eigenvector swapping caused by translocations. In reality, however, sometimes it shows poor performance and may lead to reporting of non-informative eigenvectors. Off by default. map : callable, optional Map functor implementation. Returns ------- eigvals, eigvec_table -> DataFrames with eigenvalues for each region and a table of eigenvectors filled in the `bins` table. .. note:: ALWAYS check your EVs by eye. The first one occasionally does not reflect the compartment structure, but instead describes chromosomal arms or translocation blowouts. Possible mitigations: employ `view_df` (e.g. arms) to avoid issues with chromosomal arms, consider blacklisting regions with translocations during balancing. """ # get chromosomes from cooler, if view_df not specified: if view_df is None: view_df = make_cooler_view(clr) else: # Make sure view_df is a proper viewframe try: _ = is_compatible_viewframe( view_df, clr, check_sorting=True, raise_errors=True, ) except Exception as e: raise ValueError("view_df is not a valid viewframe or incompatible") from e # check if cooler is balanced try: _ = is_cooler_balanced(clr, clr_weight_name, raise_errors=True) except Exception as e: raise ValueError( f"provided cooler is not balanced or {clr_weight_name} is missing" ) from e # ignore diags as in cooler unless specified ignore_diags = ( clr._load_attrs(f"bins/{clr_weight_name}").get("ignore_diags", 2) if ignore_diags is None else ignore_diags ) bins = clr.bins()[:] if phasing_track is not None: phasing_track = align_track_with_cooler( phasing_track, clr, view_df=view_df, clr_weight_name=clr_weight_name, mask_clr_bad_bins=True, drop_track_na=True # this adds check for chromosomes that have all missing values ) # prepare output table for eigen vectors eigvec_table = bioframe.assign_view(bins, view_df).dropna(subset=["view_region"], axis=0) eigvec_table = eigvec_table.loc[:, bins.columns] eigvec_columns = [f"E{i + 1}" for i in range(n_eigs)] for ev_col in eigvec_columns: eigvec_table[ev_col] = np.nan # prepare output table for eigenvalues eigvals_table = view_df.copy() eigval_columns = [f"eigval{i + 1}" for i in range(n_eigs)] for eval_col in eigval_columns: eigvals_table[eval_col] = np.nan def _each(region): """ perform eigen decomposition for a given region assuming safety checks are done outside of this function. Parameters ---------- region: tuple-like tuple of the form (chroms,start,end,*) Returns ------- _region, eigvals, eigvecs -> ndarrays array of eigenvalues and an array eigenvectors """ _region = region[:3] # take only (chrom, start, end) A = clr.matrix(balance=clr_weight_name).fetch(_region) # extract phasing track relevant for the _region if phasing_track is not None: phasing_track_region = bioframe.select(phasing_track, _region) phasing_track_region_values = phasing_track_region["value"].values else: phasing_track_region_values = None eigvals, eigvecs = cis_eig( A, n_eigs=n_eigs, ignore_diags=ignore_diags, phasing_track=phasing_track_region_values, clip_percentile=clip_percentile, sort_metric=sort_metric, ) return _region, eigvals, eigvecs # eigendecompose matrix per region (can be multiprocessed) # output assumes that the order of results matches regions results = map(_each, view_df.values) # go through eigendecomposition results and fill in # output table eigvec_table and eigvals_table for _region, _eigvals, _eigvecs in results: idx = bioframe.select(eigvec_table, _region).index eigvec_table.loc[idx, eigvec_columns] = _eigvecs.T idx = bioframe.select(eigvals_table, _region).index eigvals_table.loc[idx, eigval_columns] = _eigvals return eigvals_table, eigvec_table
[docs]def eigs_trans( clr, phasing_track=None, n_eigs=3, partition=None, clr_weight_name="weight", sort_metric=None, **kwargs, ): # check if cooler is balanced try: _ = is_cooler_balanced(clr, clr_weight_name, raise_errors=True) except Exception as e: raise ValueError( f"provided cooler is not balanced or {clr_weight_name} is missing" ) from e # TODO: implement usage of view for eigs_trans view_df = None if view_df is None: view_df = make_cooler_view(clr) else: raise NotImplementedError("views are not currently implemented for eigs_trans") if partition is None: partition = np.r_[ [clr.offset(chrom) for chrom in clr.chromnames], len(clr.bins()) ] lo = partition[0] hi = partition[-1] A = clr.matrix(balance=clr_weight_name)[lo:hi, lo:hi] bins = clr.bins()[lo:hi] if phasing_track is not None: phasing_track = align_track_with_cooler( phasing_track, clr, view_df=view_df, clr_weight_name=clr_weight_name, mask_clr_bad_bins=True, drop_track_na=True # this adds check for chromosomes that have all missing values ) phasing_track_values = phasing_track["value"].values[lo:hi] else: phasing_track_values = None eigvals, eigvecs = trans_eig( A, partition, n_eigs=n_eigs, phasing_track=phasing_track_values, sort_metric=sort_metric, **kwargs, ) eigvec_table = bins.copy() for i, eigvec in enumerate(eigvecs): eigvec_table[f"E{i + 1}"] = eigvec eigvals = pd.DataFrame( data=np.atleast_2d(eigvals), columns=[f"eigval{i + 1}" for i in range(n_eigs)], ) return eigvals, eigvec_table