Source code for cooltools.api.insulation

import re
import logging

logging.basicConfig(level=logging.INFO)

import warnings
from functools import partial

import numpy as np
import pandas as pd
import cooler
from skimage.filters import threshold_li, threshold_otsu

from ..lib._query import CSRSelector
from ..lib import peaks, numutils

from ..lib.checks import is_compatible_viewframe, is_cooler_balanced
from ..lib.common import make_cooler_view, pool_decorator


def get_n_pixels(bad_bin_mask, window=10, ignore_diags=2):
    """
    Calculate the number of "good" pixels in a diamond at each bin.
    """
    N = len(bad_bin_mask)
    n_pixels = np.zeros(N)
    loc_bad_bin_mask = np.zeros(N, dtype=bool)
    for i_shift in range(0, window):
        for j_shift in range(0, window):
            if i_shift + j_shift < ignore_diags:
                continue

            loc_bad_bin_mask[:] = False
            if i_shift == 0:
                loc_bad_bin_mask |= bad_bin_mask
            else:
                loc_bad_bin_mask[i_shift:] |= bad_bin_mask[:-i_shift]
            if j_shift == 0:
                loc_bad_bin_mask |= bad_bin_mask
            else:
                loc_bad_bin_mask[:-j_shift] |= bad_bin_mask[j_shift:]

            n_pixels[i_shift : (-j_shift if j_shift else None)] += (
                1 - loc_bad_bin_mask[i_shift : (-j_shift if j_shift else None)]
            )
    return n_pixels
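
# Illustrative usage sketch (not part of the original module): counting "good"
# pixels per diamond on a small synthetic bad-bin mask. Only numpy is needed;
# the mask and window below are made up for demonstration.
def _example_get_n_pixels():
    bad_bin_mask = np.array([False, False, True, False, False, False])
    n_pixels = get_n_pixels(bad_bin_mask, window=3, ignore_diags=2)
    # Bins next to the masked bin end up with fewer valid pixels in their diamond.
    return n_pixels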
def insul_diamond(
    pixel_query,
    bins,
    window=10,
    ignore_diags=2,
    norm_by_median=True,
    clr_weight_name="weight",
):
    """
    Calculates the insulation score of a Hi-C interaction matrix.

    Parameters
    ----------
    pixel_query : RangeQuery object <TODO:update description>
        A table of Hi-C interactions. Must follow the Cooler columnar format:
        bin1_id, bin2_id, count, balanced (optional).
    bins : pandas.DataFrame
        A table of bins; used to determine the span of the matrix
        and the locations of bad bins.
    window : int
        The width (in bins) of the diamond window to calculate the insulation
        score.
    ignore_diags : int
        If > 0, the interactions at separations < `ignore_diags` are ignored
        when calculating the insulation score. Typically, a few first diagonals
        of the Hi-C map should be ignored due to contamination with Hi-C
        artifacts.
    norm_by_median : bool
        If True, normalize the insulation score by its NaN-median.
    clr_weight_name : str or None
        Name of balancing weight column from the cooler to use.
        Using raw unbalanced data is not supported for insulation.
    """
    lo_bin_id = bins.index.min()
    hi_bin_id = bins.index.max() + 1
    N = hi_bin_id - lo_bin_id
    sum_counts = np.zeros(N)
    sum_balanced = np.zeros(N)

    if clr_weight_name is None:
        # define n_pixels
        n_pixels = get_n_pixels(
            np.repeat(False, len(bins)), window=window, ignore_diags=ignore_diags
        )
    else:
        # calculate n_pixels
        n_pixels = get_n_pixels(
            bins[clr_weight_name].isnull().values,
            window=window,
            ignore_diags=ignore_diags,
        )
        # define transform - balanced and raw ('count') for now
        weight1 = clr_weight_name + "1"
        weight2 = clr_weight_name + "2"
        transform = lambda p: p["count"] * p[weight1] * p[weight2]

    for chunk_dict in pixel_query.read_chunked():
        chunk = pd.DataFrame(chunk_dict, columns=["bin1_id", "bin2_id", "count"])
        diag_pixels = chunk[chunk.bin2_id - chunk.bin1_id <= (window - 1) * 2]

        if clr_weight_name:
            diag_pixels = cooler.annotate(diag_pixels, bins[[clr_weight_name]])
            diag_pixels["balanced"] = transform(diag_pixels)
            valid_pixel_mask = ~diag_pixels["balanced"].isnull().values

        i = diag_pixels.bin1_id.values - lo_bin_id
        j = diag_pixels.bin2_id.values - lo_bin_id

        for i_shift in range(0, window):
            for j_shift in range(0, window):
                if i_shift + j_shift < ignore_diags:
                    continue

                mask = (
                    (i + i_shift == j - j_shift)
                    & (i + i_shift < N)
                    & (j - j_shift >= 0)
                )

                sum_counts += np.bincount(
                    i[mask] + i_shift, diag_pixels["count"].values[mask], minlength=N
                )

                if clr_weight_name:
                    sum_balanced += np.bincount(
                        i[mask & valid_pixel_mask] + i_shift,
                        diag_pixels["balanced"].values[mask & valid_pixel_mask],
                        minlength=N,
                    )

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        if clr_weight_name:
            score = sum_balanced / n_pixels
        else:
            score = sum_counts / n_pixels

        if norm_by_median:
            score /= np.nanmedian(score)

    return score, n_pixels, sum_balanced, sum_counts
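
# Illustrative usage sketch (not part of the original module): computing the
# insulation profile of one chromosome directly via a CSRSelector, mirroring the
# pattern used in _get_region_insulation below. The cooler URI
# "test.mcool::resolutions/10000" and the chromosome name "chr1" are hypothetical.
def _example_insul_diamond():
    clr = cooler.Cooler("test.mcool::resolutions/10000")  # hypothetical balanced cooler
    nbins = len(clr.bins())
    selector = CSRSelector(
        clr.open("r"), shape=(nbins, nbins), field="count", chunksize=10_000_000
    )
    c0, c1 = clr.extent("chr1")
    region_bins = clr.bins().fetch("chr1")
    # 20-bin diamond (200 kb at 10 kb resolution), skipping the first 2 diagonals.
    score, n_pixels, sum_balanced, sum_counts = insul_diamond(
        selector[c0:c1, c0:c1],
        region_bins,
        window=20,
        ignore_diags=2,
        clr_weight_name="weight",
    )
    return score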
@pool_decorator
def calculate_insulation_score(
    clr,
    window_bp,
    view_df=None,
    ignore_diags=None,
    min_dist_bad_bin=0,
    is_bad_bin_key="is_bad_bin",
    append_raw_scores=False,
    chunksize=20000000,
    clr_weight_name="weight",
    verbose=False,
    nproc=1,
    map_functor=map,
):
    """Calculate the diamond insulation scores for all bins in a cooler.

    Parameters
    ----------
    clr : cooler.Cooler
        A cooler with balanced Hi-C data.
    window_bp : int or list of integers
        The size of the sliding diamond window used to calculate the insulation
        score. If a list is provided, an insulation score is calculated for each
        value of window_bp.
    view_df : bioframe.viewframe or None
        Viewframe for independent calculation of insulation scores for regions
    ignore_diags : int | None
        The number of diagonals to ignore. If None, equals the number of
        diagonals ignored during IC balancing.
    min_dist_bad_bin : int
        The minimal allowed distance to a bad bin to report insulation score.
        Fills bins that have a bad bin closer than this distance by nans.
    is_bad_bin_key : str
        Name of the output column to store bad bins
    append_raw_scores : bool
        If True, append columns with raw scores (sum_counts, sum_balanced,
        n_pixels) to the output table.
    clr_weight_name : str or None
        Name of the column in the bin table with weight.
        Using unbalanced data with `None` will avoid masking "bad" pixels.
    verbose : bool
        If True, report real-time progress.
    nproc : int, optional
        How many processes to use for calculation. Ignored if map_functor is passed.
    map_functor : callable, optional
        Map function to dispatch the matrix chunks to workers.
        If left unspecified, pool_decorator applies the following defaults:
        if nproc>1 this defaults to multiprocess.Pool;
        if nproc=1 this defaults to the built-in map.

    Returns
    -------
    ins_table : pandas.DataFrame
        A table containing the insulation scores of the genomic bins
    """
    if view_df is None:
        view_df = make_cooler_view(clr)
    else:
        # Make sure view_df is a proper viewframe
        try:
            _ = is_compatible_viewframe(
                view_df,
                clr,
                check_sorting=True,
                raise_errors=True,
            )
        except Exception as e:
            raise ValueError("view_df is not a valid viewframe or incompatible") from e

    # check if cooler is balanced
    if clr_weight_name:
        try:
            _ = is_cooler_balanced(clr, clr_weight_name, raise_errors=True)
        except Exception as e:
            raise ValueError(
                f"provided cooler is not balanced or {clr_weight_name} is missing"
            ) from e

    bin_size = clr.info["bin-size"]

    # check if ignore_diags is valid
    if ignore_diags is None:
        try:
            ignore_diags = clr._load_attrs(
                clr.root.rstrip("/") + f"/bins/{clr_weight_name}"
            )["ignore_diags"]
        except Exception:
            raise ValueError(
                f"ignore_diags not provided, and not found in cooler balancing weights {clr_weight_name}"
            )
    elif isinstance(ignore_diags, int):
        pass  # keep it as is
    else:
        raise ValueError(f"ignore_diags must be int or None, got {ignore_diags}")

    if np.isscalar(window_bp):
        window_bp = [window_bp]
    window_bp = np.array(window_bp, dtype=int)
    bad_win_sizes = window_bp % bin_size != 0
    if np.any(bad_win_sizes):
        raise ValueError(
            f"The window sizes {window_bp[bad_win_sizes]} have to be multiples of the bin size {bin_size}"
        )

    # Calculate insulation score for each region separately.
    # Apply _get_region_insulation:
    job = partial(
        _get_region_insulation,
        clr,
        is_bad_bin_key,
        clr_weight_name,
        chunksize,
        window_bp,
        min_dist_bad_bin,
        ignore_diags,
        append_raw_scores,
        verbose,
    )
    ins_region_tables = map_functor(
        job, view_df[["chrom", "start", "end", "name"]].values
    )
    ins_table = pd.concat(ins_region_tables)

    return ins_table
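
# Illustrative usage sketch (not part of the original module): insulation tracks
# for two window sizes. The cooler URI is hypothetical; window sizes must be
# multiples of the bin size (here assumed to be 10 kb).
def _example_calculate_insulation_score():
    clr = cooler.Cooler("test.mcool::resolutions/10000")  # hypothetical balanced cooler
    ins_table = calculate_insulation_score(
        clr,
        window_bp=[100_000, 200_000],
        ignore_diags=2,
        nproc=1,
    )
    # Output has one log2_insulation_score_{window} and one
    # n_valid_pixels_{window} column per requested window size.
    return ins_table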
def _get_region_insulation(
    clr,
    is_bad_bin_key,
    clr_weight_name,
    chunksize,
    window_bp,
    min_dist_bad_bin,
    ignore_diags,
    append_raw_scores,
    verbose,
    region,
):
    """
    Auxiliary function to make calculate_insulation_score parallel.
    """
    # XXX -- Use a delayed query executor
    nbins = len(clr.bins())
    selector = CSRSelector(
        clr.open("r"), shape=(nbins, nbins), field="count", chunksize=chunksize
    )

    # Convert window sizes to bins:
    bin_size = clr.info["bin-size"]
    window_bins = window_bp // bin_size

    # Parse region and set up insulation table for the region:
    chrom, start, end, name = region
    region = [chrom, start, end]
    region_bins = clr.bins().fetch(region)
    ins_region = region_bins[["chrom", "start", "end"]].copy()
    ins_region.loc[:, "region"] = name
    ins_region[is_bad_bin_key] = (
        region_bins[clr_weight_name].isnull() if clr_weight_name else False
    )

    if verbose:
        logging.info(f"Processing region {name}")

    if min_dist_bad_bin:
        ins_region = ins_region.assign(
            dist_bad_bin=numutils.dist_to_mask(ins_region[is_bad_bin_key])
        )

    # XXX --- Create a delayed selection
    c0, c1 = clr.extent(region)
    region_query = selector[c0:c1, c0:c1]

    for j, win_bin in enumerate(window_bins):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RuntimeWarning)
            # XXX -- updated insul_diamond
            ins_track, n_pixels, sum_balanced, sum_counts = insul_diamond(
                region_query,
                region_bins,
                window=win_bin,
                ignore_diags=ignore_diags,
                clr_weight_name=clr_weight_name,
            )
            ins_track[ins_track == 0] = np.nan
            ins_track = np.log2(ins_track)
            ins_track[~np.isfinite(ins_track)] = np.nan

        ins_region[f"log2_insulation_score_{window_bp[j]}"] = ins_track
        ins_region[f"n_valid_pixels_{window_bp[j]}"] = n_pixels

        if min_dist_bad_bin:
            mask_bad = ins_region.dist_bad_bin.values < min_dist_bad_bin
            ins_region.loc[mask_bad, f"log2_insulation_score_{window_bp[j]}"] = np.nan

        if append_raw_scores:
            ins_region[f"sum_counts_{window_bp[j]}"] = sum_counts
            ins_region[f"sum_balanced_{window_bp[j]}"] = sum_balanced

    return ins_region
def find_boundaries(
    ins_table,
    min_frac_valid_pixels=0.66,
    min_dist_bad_bin=0,
    log2_ins_key="log2_insulation_score_{WINDOW}",
    n_valid_pixels_key="n_valid_pixels_{WINDOW}",
    is_bad_bin_key="is_bad_bin",
):
    """Call insulating boundaries.

    Find all local minima of the log2(insulation score) and calculate their
    chromosome-wide topographic prominence.

    Parameters
    ----------
    ins_table : pandas.DataFrame
        A bin table with columns containing log2(insulation score),
        annotation of regions (required),
        the number of valid pixels per diamond and (optionally) the mask
        of bad bins. Normally, this should be an output of
        calculate_insulation_score.
    min_frac_valid_pixels : float
        The minimal fraction of valid pixels in a diamond to be used in
        boundary picking and prominence calculation.
    min_dist_bad_bin : int
        The minimal allowed distance to a bad bin to be used in boundary picking.
        Ignore bins that have a bad bin closer than this distance.
    log2_ins_key, n_valid_pixels_key : str
        The names of the columns containing log2_insulation_score and
        the number of valid pixels per diamond. When a template
        containing `{WINDOW}` is provided, the calculation is repeated
        for all pairs of columns matching the template.

    Returns
    -------
    ins_table : pandas.DataFrame
        A bin table with appended columns with boundary prominences.
    """
    if min_dist_bad_bin:
        ins_table = pd.concat(
            [
                df.assign(dist_bad_bin=numutils.dist_to_mask(df[is_bad_bin_key]))
                for region, df in ins_table.groupby("region")
            ]
        )

    if "{WINDOW}" in log2_ins_key:
        windows = set()
        for col in ins_table.columns:
            m = re.match(log2_ins_key.format(WINDOW=r"(\d+)"), col)
            if m:
                windows.add(int(m.groups()[0]))
    else:
        windows = set([None])

    min_valid_pixels = {
        win: ins_table[n_valid_pixels_key.format(WINDOW=win)].max()
        * min_frac_valid_pixels
        for win in windows
    }

    dfs = []
    index_name = ins_table.index.name  # Store the name of the index and sorting order
    sorting_order = ins_table.index.values
    ins_table.index.name = "sorting_index"
    ins_table.reset_index(drop=False, inplace=True)

    for region, df in ins_table.groupby("region"):
        df = df.sort_values(["start"])  # Force sorting by the bin start coordinate
        for win in windows:
            mask = (
                df[n_valid_pixels_key.format(WINDOW=win)].values
                >= min_valid_pixels[win]
            )
            if min_dist_bad_bin:
                mask &= df.dist_bad_bin.values >= min_dist_bad_bin

            ins_track = df[log2_ins_key.format(WINDOW=win)].values[mask]
            poss, proms = peaks.find_peak_prominence(-ins_track)
            ins_prom_track = np.zeros_like(ins_track) * np.nan
            ins_prom_track[poss] = proms

            if win is not None:
                bs_key = f"boundary_strength_{win}"
            else:
                bs_key = "boundary_strength"

            df[bs_key] = np.nan
            df.loc[mask, bs_key] = ins_prom_track

        dfs.append(df)

    df = pd.concat(dfs)
    df = df.set_index("sorting_index")  # Restore original sorting order and name
    df.index.name = index_name
    df = df.loc[sorting_order, :]

    return df
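
# Illustrative usage sketch (not part of the original module): calling boundaries
# on the output of calculate_insulation_score. The cooler URI is hypothetical and
# the column names assume window_bp=100_000.
def _example_find_boundaries():
    clr = cooler.Cooler("test.mcool::resolutions/10000")  # hypothetical balanced cooler
    ins_table = calculate_insulation_score(clr, window_bp=100_000, ignore_diags=2)
    boundaries = find_boundaries(ins_table, min_frac_valid_pixels=0.66)
    # Local minima of the insulation track receive a finite prominence value.
    return boundaries[boundaries["boundary_strength_100000"].notnull()]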
def _insul_diamond_dense(mat, window=10, ignore_diags=2, norm_by_median=True):
    """
    Calculates the insulation score of a Hi-C interaction matrix.

    Parameters
    ----------
    mat : numpy.array
        A dense square matrix of Hi-C interaction frequencies.
        May contain nans, e.g. in rows/columns excluded from the analysis.
    window : int
        The width of the window to calculate the insulation score.
    ignore_diags : int
        If > 0, the interactions at separations < `ignore_diags` are ignored
        when calculating the insulation score. Typically, a few first diagonals
        of the Hi-C map should be ignored due to contamination with Hi-C
        artifacts.
    norm_by_median : bool
        If True, normalize the insulation score by its NaN-median.

    Returns
    -------
    score : ndarray
        An array with normalized insulation scores for the provided matrix.
    """
    if ignore_diags:
        mat = mat.copy()
        for i in range(-ignore_diags + 1, ignore_diags):
            numutils.set_diag(mat, np.nan, i)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        N = mat.shape[0]
        score = np.nan * np.ones(N)
        for i in range(0, N):
            lo = max(0, i + 1 - window)
            hi = min(i + window, N)
            # nanmean of interactions to reduce the effect of bad bins
            score[i] = np.nanmean(mat[lo : i + 1, i:hi])

        if norm_by_median:
            score /= np.nanmedian(score)

    return score


def _find_insulating_boundaries_dense(
    clr,
    window_bp=100000,
    view_df=None,
    clr_weight_name="weight",
    min_dist_bad_bin=2,
    ignore_diags=None,
):
    """Calculate the diamond insulation scores and call insulating boundaries.

    Parameters
    ----------
    clr : cooler.Cooler
        A cooler with balanced Hi-C data.
        Balancing weights are required for the detection of bad_bins.
    window_bp : int
        The size of the sliding diamond window used to calculate the insulation
        score.
    view_df : bioframe.viewframe or None
        Viewframe for independent calculation of insulation scores for regions
    clr_weight_name : str
        Name of the column in the bin table that stores the balancing weights.
    min_dist_bad_bin : int
        The minimal allowed distance to a bad bin. Do not calculate insulation
        scores for bins having a bad bin closer than this distance.
    ignore_diags : int
        The number of diagonals to ignore. If None, equals the number of
        diagonals ignored during IC balancing.

    Returns
    -------
    ins_table : pandas.DataFrame
        A table containing the insulation scores of the genomic bins and
        the insulating boundary strengths.
    """
    if view_df is None:
        view_df = make_cooler_view(clr)
    else:
        # Make sure view_df is a proper viewframe
        try:
            _ = is_compatible_viewframe(
                view_df,
                clr,
                check_sorting=True,
                raise_errors=True,
            )
        except Exception as e:
            raise ValueError("view_df is not a valid viewframe or incompatible") from e

    bin_size = clr.info["bin-size"]

    # check if cooler is balanced
    if clr_weight_name:
        try:
            _ = is_cooler_balanced(clr, clr_weight_name, raise_errors=True)
        except Exception as e:
            raise ValueError(
                f"provided cooler is not balanced or {clr_weight_name} is missing"
            ) from e

    # check if ignore_diags is valid
    if ignore_diags is None:
        ignore_diags = clr._load_attrs(
            clr.root.rstrip("/") + f"/bins/{clr_weight_name}"
        )["ignore_diags"]
    elif isinstance(ignore_diags, int):
        pass  # keep it as is
    else:
        raise ValueError(f"provided ignore_diags {ignore_diags} is not int or None")

    window_bins = window_bp // bin_size
    if window_bp % bin_size != 0:
        raise ValueError(
            f"The window size ({window_bp}) has to be a multiple of the bin size {bin_size}"
        )

    ins_region_tables = []
    for chrom, start, end, name in view_df[["chrom", "start", "end", "name"]].values:
        region = [chrom, start, end]
        ins_region = clr.bins().fetch(region)[["chrom", "start", "end"]]
        is_bad_bin = np.isnan(clr.bins().fetch(region)[clr_weight_name].values)
        # extract dense Hi-C heatmap for a given "region"
        m = clr.matrix(balance=clr_weight_name).fetch(region)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RuntimeWarning)
            ins_track = _insul_diamond_dense(m, window_bins, ignore_diags)
            ins_track[ins_track == 0] = np.nan
            ins_track = np.log2(ins_track)

        bad_bin_neighbor = np.zeros_like(is_bad_bin)
        for i in range(0, min_dist_bad_bin):
            if i == 0:
                bad_bin_neighbor = bad_bin_neighbor | is_bad_bin
            else:
                bad_bin_neighbor = bad_bin_neighbor | np.r_[[True] * i, is_bad_bin[:-i]]
                bad_bin_neighbor = bad_bin_neighbor | np.r_[is_bad_bin[i:], [True] * i]

        ins_track[bad_bin_neighbor] = np.nan
        ins_region["bad_bin_masked"] = bad_bin_neighbor

        ins_track[~np.isfinite(ins_track)] = np.nan
        ins_region[f"log2_insulation_score_{window_bp}"] = ins_track

        poss, proms = peaks.find_peak_prominence(-ins_track)
        ins_prom_track = np.zeros_like(ins_track) * np.nan
        ins_prom_track[poss] = proms
        ins_region[f"boundary_strength_{window_bp}"] = ins_prom_track

        ins_region_tables.append(ins_region)

    ins_table = pd.concat(ins_region_tables)
    return ins_table
def insulation(
    clr,
    window_bp,
    view_df=None,
    ignore_diags=None,
    clr_weight_name="weight",
    min_frac_valid_pixels=0.66,
    min_dist_bad_bin=0,
    threshold="Li",
    append_raw_scores=False,
    chunksize=20000000,
    verbose=False,
    nproc=1,
):
    """Find insulating boundaries in a contact map via the diamond insulation score.

    For a given cooler, this function (a) calculates the diamond insulation score
    track, (b) detects all insulating boundaries, and (c) removes weak boundaries
    via an automated thresholding algorithm.

    Parameters
    ----------
    clr : cooler.Cooler
        A cooler with balanced Hi-C data.
    window_bp : int or list of integers
        The size of the sliding diamond window used to calculate the insulation
        score. If a list is provided, an insulation score is calculated for each
        value of window_bp.
    view_df : bioframe.viewframe or None
        Viewframe for independent calculation of insulation scores for regions
    ignore_diags : int | None
        The number of diagonals to ignore. If None, equals the number of
        diagonals ignored during IC balancing.
    clr_weight_name : str
        Name of the column in the bin table with weight
    min_frac_valid_pixels : float
        The minimal fraction of valid pixels in a diamond to be used in
        boundary picking and prominence calculation.
    min_dist_bad_bin : int
        The minimal allowed distance to a bad bin to report insulation score.
        Fills bins that have a bad bin closer than this distance by nans.
    threshold : "Li", "Otsu" or float
        Rule used to threshold the histogram of boundary strengths to exclude
        weak boundaries. "Li" or "Otsu" use the corresponding methods from
        skimage.filters. Providing a float value will filter by a fixed threshold.
    append_raw_scores : bool
        If True, append columns with raw scores (sum_counts, sum_balanced,
        n_pixels) to the output table.
    verbose : bool
        If True, report real-time progress.
    nproc : int, optional
        How many processes to use for calculation

    Returns
    -------
    ins_table : pandas.DataFrame
        A table containing the insulation scores of the genomic bins
    """
    # Create view:
    if view_df is None:
        # full chromosomes:
        view_df = make_cooler_view(clr)
    else:
        # Make sure view_df is a proper viewframe
        try:
            _ = is_compatible_viewframe(
                view_df,
                clr,
                # must be sorted for pairwise regions combinations
                # to be in the upper right of the heatmap
                check_sorting=True,
                raise_errors=True,
            )
        except Exception as e:
            raise ValueError("view_df is not a valid viewframe or incompatible") from e

    if threshold == "Li":
        thresholding_func = lambda x: x >= threshold_li(x)
    elif threshold == "Otsu":
        thresholding_func = lambda x: x >= threshold_otsu(x)
    else:
        try:
            thr = float(threshold)
            thresholding_func = lambda x: x >= thr
        except ValueError:
            raise ValueError(
                "Insulating boundary strength threshold can be Li, Otsu or a float"
            )

    # Calculate insulation score:
    ins_table = calculate_insulation_score(
        clr,
        view_df=view_df,
        window_bp=window_bp,
        ignore_diags=ignore_diags,
        min_dist_bad_bin=min_dist_bad_bin,
        append_raw_scores=append_raw_scores,
        clr_weight_name=clr_weight_name,
        chunksize=chunksize,
        verbose=verbose,
        nproc=nproc,
    )

    # Find boundaries:
    ins_table = find_boundaries(
        ins_table,
        min_frac_valid_pixels=min_frac_valid_pixels,
        min_dist_bad_bin=min_dist_bad_bin,
    )

    # Flag strong boundaries; ensure window_bp is iterable for a scalar input:
    if np.isscalar(window_bp):
        window_bp = [window_bp]
    for win in window_bp:
        strong_boundaries = thresholding_func(
            ins_table[f"boundary_strength_{win}"].values
        )
        ins_table[f"is_boundary_{win}"] = strong_boundaries

    return ins_table
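
# Illustrative usage sketch (not part of the original module): the end-to-end
# boundary caller with Li thresholding. The cooler URI is hypothetical and the
# column name assumes window_bp includes 100_000.
def _example_insulation():
    clr = cooler.Cooler("test.mcool::resolutions/10000")  # hypothetical balanced cooler
    ins_table = insulation(clr, window_bp=[100_000, 200_000], threshold="Li", nproc=1)
    # Strong boundaries are flagged in the boolean is_boundary_{window} columns.
    return ins_table[ins_table["is_boundary_100000"]]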