# Source code for cooltools.lib.common

import warnings
import numpy as np
import pandas as pd
import bioframe
from multiprocess import Pool
from functools import wraps
import logging

def assign_view_paired(
    features,
    view_df,
    cols_paired=["chrom1", "start1", "end1", "chrom2", "start2", "end2"],
    cols_view=["chrom", "start", "end"],
    features_view_cols=["region1", "region2"],
    view_name_col="name",
    drop_unassigned=False,
):
    """Assign region names from the view to each feature

    Assigns a regular 1D view independently to each side of a bedpe-style
    dataframe. Will add two columns with region names (`features_view_cols`).

    Parameters
    ----------
    features : pd.DataFrame
        bedpe-style dataframe
    view_df : pandas.DataFrame
        ViewFrame specifying region start and ends for assignment. Attempts to
        convert dictionary and pd.Series formats to viewFrames.
    cols_paired : list of str
        The names of columns containing the chromosome, start and end of the
        genomic intervals. The default values are
        `"chrom1", "start1", "end1", "chrom2", "start2", "end2"`.
    cols_view : list of str
        The names of columns containing the chromosome, start and end of the
        genomic intervals in the view. The default values are
        `"chrom", "start", "end"`.
    features_view_cols : list of str
        Names of the columns where to save the assigned region names
    view_name_col : str
        Column of ``view_df`` with region names. Default "name".
    drop_unassigned : bool
        If True, drop intervals in df that do not overlap a region in the
        view. Default False.
    """
    left_triplet, right_triplet = cols_paired[:3], cols_paired[3:]

    features = features.copy().reset_index(drop=True)

    # Both anchors must be valid bedframes before any assignment is attempted.
    for triplet in (left_triplet, right_triplet):
        bioframe.core.checks.is_bedframe(features, raise_errors=True, cols=triplet)

    view_df = bioframe.core.construction.make_viewframe(
        view_df, view_name_col=view_name_col, cols=cols_view
    )

    # First side.
    features = bioframe.assign_view(
        features,
        view_df,
        drop_unassigned=drop_unassigned,
        df_view_col=features_view_cols[0],
        view_name_col=view_name_col,
        cols=left_triplet,
        cols_view=cols_view,
    )
    # The assignment above casts the untouched side's start/end to float;
    # restore integer dtype before assigning the second side.
    features[right_triplet[1:]] = features[right_triplet[1:]].astype(int)

    # Second side.
    features = bioframe.assign_view(
        features,
        view_df,
        drop_unassigned=drop_unassigned,
        df_view_col=features_view_cols[1],
        view_name_col=view_name_col,
        cols=right_triplet,
        cols_view=cols_view,
    )
    return features
def assign_view_auto(
    features,
    view_df,
    cols_unpaired=["chrom", "start", "end"],
    cols_paired=["chrom1", "start1", "end1", "chrom2", "start2", "end2"],
    cols_view=["chrom", "start", "end"],
    features_view_col_unpaired="region",
    features_view_cols_paired=["region1", "region2"],
    view_name_col="name",
    drop_unassigned=False,
    combined_assignments_column="region",
    force=True,
):
    """Assign region names from the view to each feature

    Determines whether the `features` are unpaired (1D, bed-like) or paired
    (2D, bedpe-like) based on presence of column names (`cols_unpaired` vs
    `cols_paired`)

    Assigns a regular 1D view, independently to each side in case of paired
    features.

    Will add one or two columns with region names
    (`features_view_col_unpaired` or `features_view_cols_paired`)
    respectively, in case of unpaired and paired features.

    Parameters
    ----------
    features : pd.DataFrame
        bed- or bedpe-style dataframe
    view_df : pandas.DataFrame
        ViewFrame specifying region start and ends for assignment. Attempts to
        convert dictionary and pd.Series formats to viewFrames.
    cols_unpaired : list of str
        The names of columns containing the chromosome, start and end of the
        genomic intervals for unpaired features. The default values are
        `"chrom", "start", "end"`.
    cols_paired : list of str
        The names of columns containing the chromosome, start and end of the
        genomic intervals for paired features. The default values are
        `"chrom1", "start1", "end1", "chrom2", "start2", "end2"`.
    cols_view : list of str
        The names of columns containing the chromosome, start and end of the
        genomic intervals in the view. The default values are
        `"chrom", "start", "end"`.
    features_view_col_unpaired : str
        Name of the column where to save the assigned region name for
        unpaired features
    features_view_cols_paired : list of str
        Names of the columns where to save the assigned region names for
        paired features
    view_name_col : str
        Column of ``view_df`` with region names. Default "name".
    drop_unassigned : bool
        If True, drop intervals in `features` that do not overlap a region in
        the view. Default False.
    combined_assignments_column : str or None
        If set to a string value, will combine assignments from two sides of
        paired features when they match into column with this name: region
        name when regions assigned to both sides match, np.nan if not.
        Default "region"
    force : bool, True or False
        if features already have features_view_col (paired or not, depending
        on the feature types), should we re-write region columns or keep them.

    Returns
    -------
    features : pd.DataFrame
        Input features with the region-assignment column(s) added. The input
        dataframe itself is never modified.

    Raises
    ------
    ValueError
        If neither `cols_unpaired` nor `cols_paired` are all present in
        `features`.
    """
    columns = features.columns.astype(str)

    if set(cols_unpaired).issubset(columns):
        # unpaired (bed-like) features
        if features_view_col_unpaired in set(columns):
            if not force:
                return features
            # Drop the stale assignment on a new frame: never mutate the
            # caller's dataframe (the previous implementation used
            # drop(..., inplace=True) and did exactly that).
            features = features.drop(columns=[features_view_col_unpaired])
        features = bioframe.assign_view(
            features,
            view_df,
            df_view_col=features_view_col_unpaired,
            view_name_col=view_name_col,
            cols=cols_unpaired,
            cols_view=cols_view,
            drop_unassigned=drop_unassigned,
        )
    elif set(cols_paired).issubset(columns):
        # paired (bedpe-like) features
        if set(features_view_cols_paired).issubset(columns):
            if not force:
                return features
            # Non-inplace drop for the same reason as above.
            features = features.drop(columns=features_view_cols_paired)
        features = assign_view_paired(
            features=features,
            view_df=view_df,
            cols_paired=cols_paired,
            cols_view=cols_view,
            features_view_cols=features_view_cols_paired,
            view_name_col=view_name_col,
            drop_unassigned=drop_unassigned,
        )
        if combined_assignments_column:
            # Region name where both sides agree, np.nan otherwise.
            features[combined_assignments_column] = np.where(
                features[features_view_cols_paired[0]]
                == features[features_view_cols_paired[1]],
                features[features_view_cols_paired[0]],
                np.nan,
            )
    else:
        raise ValueError(
            "Could not determine whether features are paired using provided column names"
        )
    return features
def assign_regions(features, supports):
    """
    DEPRECATED. Will be removed in the future versions and replaced with bioframe.overlap()
    For each feature in features dataframe assign the genomic region (support)
    that overlaps with it. In case if feature overlaps multiple supports, the
    region with largest overlap will be reported.

    Parameters
    ----------
    features : pd.DataFrame
        bed-like (chrom/start/end) or bedpe-like
        (chrom1/start1/end1/chrom2/start2/end2) dataframe; the branch taken
        depends on which coordinate columns are present.
    supports : pd.DataFrame
        bed-like dataframe of regions; must carry a "name" column, which is
        read back as "name_2" after the overlap suffixing below.

    Returns
    -------
    features : pd.DataFrame
        Copy of `features` with a "region" column added (np.nan where no
        single region could be assigned); the original index is restored.
    """
    index_name = features.index.name  # Store the name of index
    features = (
        features.copy()
        .reset_index()
        .rename({"index" if index_name is None else index_name: "native_order"}, axis=1)
    )  # Store the original features' order as a column with original index
    if "chrom" in features.columns:
        # 1D (bed-like) branch: a single overlap pass.
        overlap = bioframe.overlap(
            features,
            supports,
            how="left",
            cols1=["chrom", "start", "end"],
            cols2=["chrom", "start", "end"],
            keep_order=True,
            return_overlap=True,
            suffixes=("_1", "_2"),
        )
        overlap_columns = overlap.columns  # To filter out duplicates later
        overlap["overlap_length"] = overlap["overlap_end"] - overlap["overlap_start"]
        # Filter out overlaps with multiple regions: keep only the largest
        # overlap per feature, then restore the original row order.
        overlap = (
            overlap.sort_values("overlap_length", ascending=False)
            .drop_duplicates(overlap_columns, keep="first")
            .sort_index()
        )
        # Copy single column with overlapping region name:
        features["region"] = overlap["name_2"]
    if "chrom1" in features.columns:
        # 2D (bedpe-like) branch: assign each side independently, then combine.
        for idx in ("1", "2"):
            overlap = bioframe.overlap(
                features,
                supports,
                how="left",
                cols1=[f"chrom{idx}", f"start{idx}", f"end{idx}"],
                cols2=[f"chrom", f"start", f"end"],
                keep_order=True,
                return_overlap=True,
                suffixes=("_1", "_2"),
            )
            overlap_columns = overlap.columns  # To filter out duplicates later
            overlap[f"overlap_length{idx}"] = (
                overlap[f"overlap_end{idx}"] - overlap[f"overlap_start{idx}"]
            )
            # Filter out overlaps with multiple regions:
            overlap = (
                overlap.sort_values(f"overlap_length{idx}", ascending=False)
                .drop_duplicates(overlap_columns, keep="first")
                .sort_index()
            )
            # Copy single column with overlapping region name:
            features[f"region{idx}"] = overlap["name_2"]
        # Form a single column with region names where region1 == region2,
        # and np.nan in other cases:
        features["region"] = np.where(
            features["region1"] == features["region2"], features["region1"], np.nan
        )
        features = features.drop(
            ["region1", "region2"], axis=1
        )  # Remove unnecessary columns
    features = features.set_index("native_order")  # Restore the original index
    features.index.name = index_name  # Restore original index title
    return features
def assign_supports(features, supports, labels=False, suffix=""):
    """
    Assign support regions to a table of genomic intervals.
    Obsolete, replaced by assign_regions now.

    Parameters
    ----------
    features : DataFrame
        Dataframe with columns `chrom`, `start`, `end`
        or `chrom1`, `start1`, `end1`, `chrom2`, `start2`, `end2`
    supports : array-like
        Support areas
    labels : bool
        If True, return the support region itself instead of its index.
    suffix : str
        Suffix appended to the coordinate column names (e.g. "1" or "2").
    """
    chrom_col = "chrom" + suffix
    start_col = "start" + suffix
    end_col = "end" + suffix

    for required in (chrom_col, start_col, end_col):
        if required not in features.columns:
            raise ValueError(
                'Column "{}" not found in features data frame.'.format(required)
            )

    features = features.copy()
    assignment = pd.Series(index=features.index, data=np.nan)

    def region_mask(reg):
        # Overlap test: feature must end past the region start and, when the
        # region end is given (not None), start before the region end.
        mask = (features[chrom_col] == reg[0]) & (features[end_col] > reg[1])
        if reg[2] is not None:
            mask &= features[start_col] < reg[2]
        return mask

    for rank, region in enumerate(supports):
        if len(region) in (3, 4):
            # single-region support
            hit = region_mask(region)
        elif len(region) == 2:
            # paired-region support: a feature may fall in either half
            first_half, second_half = region
            hit = region_mask(first_half) | region_mask(second_half)
        assignment.loc[hit] = rank

    if labels:
        # Replace numeric indices with the support regions themselves.
        assignment = assignment.map(
            lambda r: supports[int(r)], na_action="ignore"
        )

    return assignment
def assign_regions_to_bins(bin_ids, regions_span):
    """
    Assign a region name to each bin id.

    Parameters
    ----------
    bin_ids : array-like of int
        Bin indices to assign.
    regions_span : pd.DataFrame
        Table with "bin_start", "bin_end" and "name" columns describing the
        bin-index span of each region. Assumes rows are sorted by
        "bin_start"/"bin_end" and spans do not overlap — ``searchsorted``
        requires monotonic input (TODO confirm callers guarantee this; the
        previous implementation built a sorted copy but never used it).

    Returns
    -------
    region_ids : pd.array
        For each bin id, the name of the region whose [bin_start, bin_end)
        span contains it, or pd.NA when no region does.
    """
    # A bin falls inside a region iff both searches land on the same row:
    # the last region starting at-or-before the bin must also end after it.
    bin_reg_idx_lo = regions_span["bin_start"].searchsorted(bin_ids, "right") - 1
    bin_reg_idx_hi = regions_span["bin_end"].searchsorted(bin_ids, "right")

    mask_assigned = (bin_reg_idx_lo == bin_reg_idx_hi) & (bin_reg_idx_lo >= 0)

    region_ids = pd.array([pd.NA] * len(bin_ids))
    region_ids[mask_assigned] = regions_span["name"][bin_reg_idx_lo[mask_assigned]]

    return region_ids
def make_cooler_view(clr, ucsc_names=False):
    """
    Generate a full chromosome viewframe using cooler's chromsizes

    Parameters
    ----------
    clr : cooler
        cooler-object to extract chromsizes
    ucsc_names : bool
        Use full UCSC formatted names instead of short chromosome names.

    Returns
    -------
    cooler_view : viewframe
        full chromosome viewframe
    """
    view = bioframe.make_viewframe(clr.chromsizes)
    if not ucsc_names:
        # Replace the UCSC-formatted region names with short chromosome names.
        view["name"] = view["chrom"]
    return view
def view_from_track(track_df):
    """Construct a viewframe from a track: one region per chromosome,
    spanning from the first interval start to the last interval end."""
    bioframe.core.checks._verify_columns(track_df, ["chrom", "start", "end"])
    spans = [
        (chrom, chrom_df.start.min(), chrom_df.end.max())
        for chrom, chrom_df in track_df.groupby("chrom")
    ]
    return bioframe.make_viewframe(spans)
def mask_cooler_bad_bins(track, bintable):
    """
    Mask (set to NaN) values in track where bin is masked in bintable.

    Currently used in `cli.get_saddle()`. TODO: determine if this should be
    used elsewhere.

    Parameters
    ----------
    track : tuple of (DataFrame, str)
        bedGraph-like dataframe along with the name of the value column.
    bintable : tuple of (DataFrame, str)
        bedGraph-like dataframe along with the name of the weight column.

    Returns
    -------
    track : DataFrame
        New bedGraph-like dataframe with bad bins masked in the value column
    """
    # TODO: update to new track format
    track_df, value_col = track
    bins_df, weight_col = bintable

    merged = pd.merge(
        track_df[["chrom", "start", "end", value_col]],
        bins_df,
        on=["chrom", "start", "end"],
    )
    # A bin is "bad" when its balancing weight is NaN/inf.
    bad_bins = ~np.isfinite(merged[weight_col])
    merged.loc[bad_bins, value_col] = np.nan

    return merged[["chrom", "start", "end", value_col]]
def align_track_with_cooler(
    track,
    clr,
    view_df=None,
    clr_weight_name="weight",
    mask_clr_bad_bins=True,
    drop_track_na=True,
):
    """
    Sync a track dataframe with a cooler bintable.

    Checks that bin sizes match between a track and a cooler, merges the
    cooler bintable with the track, and propagates masked regions from a
    cooler bintable to a track.

    Parameters
    ----------
    track : pd.DataFrame
        bedGraph-like track DataFrame to check
    clr : cooler
        cooler object to check against
    view_df : bioframe.viewframe or None
        Optional viewframe of regions to check for their number of bins with
        assigned track values. If None, constructs a view_df from cooler
        chromsizes.
    clr_weight_name : str
        Name of the column in the bin table with weight
    mask_clr_bad_bins : bool
        Whether to propagate null bins from cooler bintable column
        clr_weight_name to the 'value' column of the output clr_track.
        Default True.
    drop_track_na : bool
        Whether to ignore missing values in the track (as if they are
        absent). Important for raising errors for unassigned regions and
        warnings for partial assignment. Default True, so NaN values are
        treated as not assigned. False means that NaN values are treated as
        assigned.

    Returns
    -------
    clr_track
        track dataframe that has been aligned with the cooler bintable and
        has columns ['chrom','start','end','value']
    """
    from .checks import is_track, is_cooler_balanced

    try:
        is_track(track, raise_errors=True)
    except Exception as e:
        raise ValueError("invalid input track") from e

    # since tracks are currently allowed to have flexible column names:
    # the first four columns are taken as chrom/start/end/value.
    c, s, e, v = track.columns[:4]
    # using median to allow for shorter / longer last bin on any chromosome
    track_bin_width = int((track[e] - track[s]).median())
    if not (track_bin_width == clr.binsize):
        raise ValueError(
            "mismatch between track and cooler bin size, check track resolution"
        )

    # Left-merge the cooler bintable with the renamed track on chrom+start
    # only (bin width already verified above); `indicator=True` adds a
    # "_merge" column used below to count assigned bins.
    clr_track = (
        (clr.bins()[:])
        .copy()
        .merge(
            track.rename(columns={c: "chrom", s: "start", e: "end", v: "value"}),
            how="left",
            on=["chrom", "start"],
            suffixes=("", "_"),
            indicator=True,
        )
    )
    if clr_weight_name:
        try:
            is_cooler_balanced(clr, clr_weight_name=clr_weight_name, raise_errors=True)
        except Exception as e:
            raise ValueError(
                f"no column {clr_weight_name} detected in input cooler bintable"
            ) from e
    else:
        # NOTE(review): with a falsy clr_weight_name (e.g. None) this writes a
        # column literally keyed by that falsy value and treats every bin as
        # valid below — looks intentional as an "unweighted" mode, but confirm.
        clr_track[clr_weight_name] = 1.0

    # Bins with a finite weight are considered valid.
    valid_bins = clr_track[clr_weight_name].notna()
    num_valid_bins = valid_bins.sum()
    if drop_track_na:
        # NaN track values count as unassigned.
        num_assigned_bins = (clr_track["value"][valid_bins].notna()).sum()
    else:
        # Any merged row counts as assigned, even if its value is NaN.
        num_assigned_bins = len(clr_track.query("_merge=='both'")["value"][valid_bins])
    if num_assigned_bins == 0:
        raise ValueError("no track values assigned to cooler bintable")
    elif num_assigned_bins < 0.5 * num_valid_bins:
        warnings.warn("less than 50% of valid bins have been assigned a value")

    # Verify that every region of the view received at least one value.
    view_df = make_cooler_view(clr) if view_df is None else view_df
    for region in view_df.itertuples(index=False):
        track_region = bioframe.select(clr_track, region)
        if drop_track_na:
            num_assigned_region_bins = track_region["value"].notna().sum()
        else:
            num_assigned_region_bins = len(track_region["value"])
        if num_assigned_region_bins == 0:
            raise ValueError(
                f"no track values assigned to region {bioframe.to_ucsc_string(region)}"
            )

    if mask_clr_bad_bins:
        # Propagate cooler's bad-bin mask into the track values.
        clr_track.loc[~valid_bins, "value"] = np.nan

    return clr_track[["chrom", "start", "end", "value"]]
def pool_decorator(func):
    """
    A decorator function that enables multiprocessing for a given function.

    The function must have a ``map_functor`` keyword argument.

    It works by hijacking the map_functor argument and substituting it with
    the parallel one: multiprocess.Pool(nproc).imap, when nproc > 1

    Parameters
    ----------
    func : callable
        The function to be decorated.

    Returns
    -------
    A wrapper function that enables multiprocessing for the given function.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # If alternative or third party map functors are provided, pass them
        # straight through. BUGFIX: kwargs already contains "map_functor", so
        # re-passing it explicitly (map_functor=kwargs["map_functor"]) raised
        # TypeError "got multiple values for keyword argument 'map_functor'".
        if "map_functor" in kwargs:
            logging.info(f"using an alternative map functor: {kwargs['map_functor']}")
            return func(*args, **kwargs)

        if "nproc" in kwargs:
            pool = None
            if kwargs["nproc"] > 1:
                logging.info(f"creating a Pool of {kwargs['nproc']} workers")
                pool = Pool(kwargs["nproc"])
                mymap = pool.imap
            else:
                logging.info("fallback to serial implementation.")
                mymap = map
            try:
                return func(*args, **kwargs, map_functor=mymap)
            finally:
                # Always release the workers, even if func raised.
                if pool is not None:
                    pool.close()
                    pool.terminate()
        else:
            logging.warning("nproc is not specified, single thread is used")
            return func(*args, **kwargs, map_functor=map)

    return wrapper