Source code for cooltools.api.directionality

import warnings
import numpy as np
import pandas as pd

def _dirscore(
    pixels, bins, window=10, ignore_diags=2, balanced=True, signed_chi2=False
):
    lo_bin_id = bins.index.min()
    hi_bin_id = bins.index.max() + 1
    N = hi_bin_id - lo_bin_id

    bad_bin_mask = (
        bins["weight"].isnull().values if balanced else np.zeros(N, dtype=bool)
    )

    diag_pixels = pixels[pixels["bin2_id"] - pixels["bin1_id"] <= (window - 1) * 2]
    if balanced:
        diag_pixels = diag_pixels[~diag_pixels["balanced"].isnull()]

    i = diag_pixels["bin1_id"].values - lo_bin_id
    j = diag_pixels["bin2_id"].values - lo_bin_id
    val = diag_pixels["balanced"].values if balanced else diag_pixels["count"].values

    sum_pixels_left = np.zeros(N)
    n_pixels_left = np.zeros(N)
    for i_shift in range(0, window):
        if i_shift < ignore_diags:
            continue

        mask = (i + i_shift == j) & (i + i_shift < N) & (j >= 0)
        sum_pixels_left += np.bincount(i[mask] + i_shift, val[mask], minlength=N)

        loc_bad_bin_mask = np.zeros(N, dtype=bool)
        if i_shift == 0:
            loc_bad_bin_mask |= bad_bin_mask
        else:
            loc_bad_bin_mask[i_shift:] |= bad_bin_mask[:-i_shift]
            loc_bad_bin_mask |= bad_bin_mask
        n_pixels_left[i_shift:] += 1 - loc_bad_bin_mask[i_shift:]

    sum_pixels_right = np.zeros(N)
    n_pixels_right = np.zeros(N)
    for j_shift in range(0, window):
        if j_shift < ignore_diags:
            continue

        mask = (i == j - j_shift) & (i < N) & (j - j_shift >= 0)

        sum_pixels_right += np.bincount(i[mask], val[mask], minlength=N)

        loc_bad_bin_mask = np.zeros(N, dtype=bool)
        loc_bad_bin_mask |= bad_bin_mask
        if j_shift == 0:
            loc_bad_bin_mask |= bad_bin_mask
        else:
            loc_bad_bin_mask[:-j_shift] |= bad_bin_mask[j_shift:]

        n_pixels_right[: (-j_shift if j_shift else None)] += (
            1 - loc_bad_bin_mask[: (-j_shift if j_shift else None)]
        )

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        a = sum_pixels_left
        b = sum_pixels_right
        if signed_chi2:
            e = (a + b) / 2.0
            score = np.sign(b - a) * ((a - e) ** 2 + (b - e) ** 2) / e
        else:
            score = (b - a) / (a + b)

    return score


def _dirscore_dense(A, window=10, signed_chi2=False):
    N = A.shape[0]
    di = np.zeros(N)
    for i in range(0, N):
        lo = max(0, i - window)
        hi = min((i + window) + 1, N)
        b, a = np.nansum(A[i, i:hi]), np.nansum(A[i, lo : i + 1])
        if signed_chi2:
            e = (a + b) / 2.0
            if e:
                di[i] = np.sign(b - a) * ((a - e) ** 2 + (b - e) ** 2) / e
        else:
            di[i] = (b - a) / (a + b)
    mask = np.nansum(A, axis=0) == 0
    di[mask] = np.nan
    return di


[docs]def directionality( clr, window_bp=100000, balance="weight", min_dist_bad_bin=2, ignore_diags=None, chromosomes=None, ): """Calculate the diamond insulation scores and call insulating boundaries. Parameters ---------- clr : cooler.Cooler A cooler with balanced Hi-C data. window_bp : int The size of the sliding diamond window used to calculate the insulation score. min_dist_bad_bin : int The minimal allowed distance to a bad bin. Do not calculate insulation scores for bins having a bad bin closer than this distance. ignore_diags : int The number of diagonals to ignore. If None, equals the number of diagonals ignored during IC balancing. Returns ------- ins_table : pandas.DataFrame A table containing the insulation scores of the genomic bins and the insulating boundary strengths. """ if chromosomes is None: chromosomes = clr.chromnames bin_size = clr.info["bin-size"] ignore_diags = ( ignore_diags if ignore_diags is not None else clr._load_attrs(clr.root.rstrip("/") + "/bins/weight")["ignore_diags"] ) window_bins = window_bp // bin_size if window_bp % bin_size != 0: raise Exception( "The window size ({}) has to be a multiple of the bin size {}".format( window_bp, bin_size ) ) dir_chrom_tables = [] for chrom in chromosomes: chrom_bins = clr.bins().fetch(chrom) chrom_pixels = clr.matrix(as_pixels=True, balance=balance).fetch(chrom) # mask neighbors of bad bins is_bad_bin = np.isnan(chrom_bins["weight"].values) bad_bin_neighbor = np.zeros_like(is_bad_bin) for i in range(0, min_dist_bad_bin): if i == 0: bad_bin_neighbor = bad_bin_neighbor | is_bad_bin else: bad_bin_neighbor = bad_bin_neighbor | np.r_[[True] * i, is_bad_bin[:-i]] bad_bin_neighbor = bad_bin_neighbor | np.r_[is_bad_bin[i:], [True] * i] dir_chrom = chrom_bins[["chrom", "start", "end"]].copy() dir_chrom["bad_bin_masked"] = bad_bin_neighbor with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) dir_track = _dirscore( chrom_pixels, chrom_bins, window=window_bins, ignore_diags=ignore_diags ) dir_track[bad_bin_neighbor] = np.nan dir_track[~np.isfinite(dir_track)] = np.nan dir_chrom["directionality_ratio_{}".format(window_bp)] = dir_track dir_track = _dirscore( chrom_pixels, chrom_bins, window=window_bins, ignore_diags=ignore_diags, signed_chi2=True, ) dir_track[bad_bin_neighbor] = np.nan dir_track[~np.isfinite(dir_track)] = np.nan dir_chrom["directionality_index_{}".format(window_bp)] = dir_track dir_chrom_tables.append(dir_chrom) dir_table = pd.concat(dir_chrom_tables) return dir_table