Source code for cooltools.api.coverage

import numpy as np
import cooler.parallel
from ..lib.checks import is_cooler_balanced
from ..lib.common import pool_decorator



def _apply_balancing(chunk, bias, balanced_column_name='balanced'):
    """
    Multiply raw pixel counts by the balancing bias and return a modified
    chunk with an additional column named balanced_column_name
    """
    pixels = chunk["pixels"]
    chunk['pixels'][balanced_column_name] = bias[pixels["bin1_id"]] * bias[pixels["bin2_id"]] * pixels["count"]
    # returning modified chunks with an additional column
    return chunk


def _zero_diags(chunk, n_diags):
    if n_diags > 0:
        mask = np.abs(chunk["pixels"]["bin1_id"] - chunk["pixels"]["bin2_id"]) < n_diags
        chunk["pixels"]["count"][mask] = 0

    return chunk


def _get_chunk_coverage(chunk, pixel_weight_key="count"):
    """
    Compute cis and total coverages of a cooler chunk.
    Every interaction is contributing to the "coverage" twice:
    at its row coordinate bin1_id, and at its column coordinate bin2_id

    Parameters
    ----------
    chunk : dict of dict/pd.DataFrame
        A cooler chunk produced by the cooler split-apply-combine pipeline.
    pixel_weight_key: str
        The key of a pixel chunk to retrieve pixel weights.

    Returns
    -------
    covs : np.array 2 x n_bins
        A numpy array with cis (the first row) and total (the 4nd) coverages.
    """

    bins = chunk["bins"]
    pixels = chunk["pixels"]
    n_bins = len(bins["chrom"])
    covs = np.zeros((2, n_bins))
    pixel_weights = pixels[pixel_weight_key]

    cis_mask = bins["chrom"][pixels["bin1_id"]] == bins["chrom"][pixels["bin2_id"]]
    covs[0] += np.bincount(
        pixels["bin1_id"], weights=pixel_weights * cis_mask, minlength=n_bins
    )
    covs[0] += np.bincount(
        pixels["bin2_id"], weights=pixel_weights * cis_mask, minlength=n_bins
    )

    covs[1] += np.bincount(pixels["bin1_id"], weights=pixel_weights, minlength=n_bins)
    covs[1] += np.bincount(pixels["bin2_id"], weights=pixel_weights, minlength=n_bins)

    return covs

[docs]@pool_decorator def coverage( clr, ignore_diags=None, chunksize=int(1e7), use_lock=False, clr_weight_name=None, store=False, store_prefix="cov", nproc=1, map_functor=map, ): """ Calculate the sums of cis and genome-wide contacts (aka coverage aka marginals) for a sparse Hi-C contact map in Cooler HDF5 format. Note that for raw coverage (i.e. clr_weight_name=None) the sum(tot_cov) from this function is two times the number of reads contributing to the cooler, as each side contributes to the coverage. Parameters ---------- clr : cooler.Cooler Cooler object ignore_diags : int, optional Drop elements occurring on the first ``ignore_diags`` diagonals of the matrix (including the main diagonal). If None, equals the number of diagonals ignored during IC balancing. chunksize : int, optional Split the contact matrix pixel records into equally sized chunks to save memory and/or parallelize. Default is 10^7 clr_weight_name : str Name of the weight column. Specify to calculate coverage of balanced cooler. store : bool, optional If True, store the results in the input cooler file when finished. If clr_weight_name=None, also stores total cis counts in the cooler info. Default is False. store_prefix : str, optional Name prefix of the columns of the bin table to save cis and total coverages. Will add suffixes _cis and _tot, as well as _raw in the default case or _clr_weight_name if specified. nproc : int, optional How many processes to use for calculation. Ignored if map_functor is passed. map_functor : callable, optional Map function to dispatch the matrix chunks to workers. If left unspecified, pool_decorator applies the following defaults: if nproc>1 this defaults to multiprocess.Pool; If nproc=1 this defaults the builtin map. Returns ------- cis_cov : 1D array, whose shape is the number of bins in ``h5``. Vector of bin sums in cis. tot_cov : 1D array, whose shape is the number of bins in ``h5``. Vector of bin sums. """ try: ignore_diags = ( ignore_diags if ignore_diags is not None else clr._load_attrs(clr.root.rstrip("/") + "/bins/weight")["ignore_diags"] ) except: raise ValueError( "Please, specify ignore_diags and/or IC balance this cooler! Cannot access the value used in IC balancing. " ) if clr_weight_name is None: # summing raw pixel counts pixel_weight_key = "count" elif is_cooler_balanced(clr, clr_weight_name): # initialize balancing bias and masking bias = clr.bins()[clr_weight_name][:] bias_na_mask = np.isnan(bias) # remember masked bins bias = np.nan_to_num(bias) # summing balanced pixels pixel_weight_key = "balanced" else: raise ValueError( "cooler is not balanced, or" f"balancing weight {clr_weight_name} is not available in the cooler." ) chunks = cooler.parallel.split(clr, chunksize=chunksize, map=map_functor, use_lock=use_lock) if ignore_diags: chunks = chunks.pipe(_zero_diags, n_diags=ignore_diags) if clr_weight_name is not None: chunks = chunks.pipe(_apply_balancing, bias=bias, balanced_column_name=pixel_weight_key) n_bins = clr.info["nbins"] covs = chunks.pipe(_get_chunk_coverage, pixel_weight_key=pixel_weight_key).reduce(np.add, np.zeros(n_bins)) if clr_weight_name is None: covs = covs.astype(int) store_names = [f"{store_prefix}_cis_raw", f"{store_prefix}_tot_raw" ] else: covs[:, bias_na_mask] = np.nan store_names = [f"{store_prefix}_cis_{clr_weight_name}", f"{store_prefix}_tot_{clr_weight_name}" ] if store: with clr.open("r+") as grp: dtype = int if clr_weight_name is None else float for store_name, cov_arr in zip(store_names, covs): if store_name in grp["bins"]: del grp["bins"][store_name] h5opts = dict(compression="gzip", compression_opts=6) grp["bins"].create_dataset( store_name, data=cov_arr, **h5opts, dtype=dtype ) if clr_weight_name is None: grp.attrs.create("cis", np.sum(covs[0]) // 2, dtype=int) return covs