Source code for easydecon.easydecon

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import scanpy as sc
import numpy as np
try:
    import fireducks.pandas as pd
    #print("Using fireducks.pandas for enhanced functionality.")
except ImportError:
    import pandas as pd
    #print("fireducks.pandas not found. Falling back to standard pandas.")

try:
    import spatialdata as sd
    import spatialdata_io
    from spatialdata import get_extent
    from spatialdata import bounding_box_query
    from spatialdata import match_element_to_table
    import spatialdata_plot
except ImportError:
    print("SpatialData not found. You may want to install SpatialData (https://spatialdata.readthedocs.io/en/latest/)")
    pass

from scipy.stats import spearmanr
from scipy.spatial.distance import cosine
from scipy.spatial.distance import euclidean
from scipy.stats import gamma
from scipy.stats import expon
from scipy.optimize import nnls
from scipy.stats import zscore
from scipy.stats import genpareto
from tqdm.auto import tqdm
from scipy.sparse import issparse

import logging

from sklearn.linear_model import Ridge, Lasso, ElasticNet


def _suppress_warnings_in_worker():
    """ Suppress warnings and logging inside joblib workers. """
    logging.getLogger().setLevel(logging.CRITICAL)
    warnings.simplefilter("ignore")

[docs] def process_row_with_suppression(row, func, **kwargs): """ Wrapper around `process_row` to suppress warnings in each worker. """ _suppress_warnings_in_worker() # Suppress inside the worker return process_row(row, func, **kwargs)
from .config import config from joblib import Parallel, delayed from spatialdata import polygon_query # Ensure that the progress_apply method is available #tqdm.pandas() logger = logging.getLogger(__name__)
[docs] def sparse_var(matrix, axis=0): if issparse(matrix): # Sparse matrix mean_sq = matrix.mean(axis=axis).A1 ** 2 sq_mean = matrix.multiply(matrix).mean(axis=axis).A1 else: # Dense numpy array mean_sq = np.mean(matrix, axis=axis) ** 2 sq_mean = np.mean(matrix * matrix, axis=axis) return sq_mean - mean_sq
[docs] def common_markers_gene_expression_and_filter( sdata: object, marker_genes, # can be list, dict, or DataFrame common_group_name: str = "MarkerGroup", # will create this column in the spatial data table, used if marker_genes is a list celltype: str = "group", # DF column holding group IDs gene_id_column: str = "names", # DF column holding marker gene names exclude_group_names: list[str] = [], bin_size: int = 8, aggregation_method: str = "sum", add_to_obs: bool = True, filtering_algorithm: str = "permutation", # or "quantile" num_permutations: int = 5000, # permutation param total number of permutations alpha: float = 0.01, #significance level for the permutation-based cutoff subsample_size: int = 25000, #permutation param subsample_signal_quantile: float = 0.1, #permutation param, between 0 and 1, if 0.1, 10% of the bins with the lowest and highest expression will be discarded permutation_gene_pool_fraction: float = 0.3, # top fraction of genes to be used for the null distribution parametric: bool = True, #if parametric, gamma or exponential distribution is used n_subs: int = 5, # number of subsamples quantile: float = 0.7, #if quantile selected, output_stat: str = "expression", # NEW: {"expression", "minus_log10_p"} **kwargs ) -> pd.DataFrame: """ Extended version allowing marker_genes as a list, dict, or DataFrame, with customizable column names for the DataFrame. If marker_genes is: 1) list[str]: Single group of markers -> create one column named `common_group_name`. 2) dict[str, list[str]]: Multiple groups -> each dict key becomes a column in table.obs. 3) pd.DataFrame: Must contain columns for groups and gene names (by default 'group' and 'names'), but these can be overridden by `celltype` and `gene_id_column`. Steps (for each group): 1) Compute aggregator (sum, mean, median, cs) for all bins over that group's marker genes. 2) If filtering_algorithm="permutation", subsample bins (subsample_size) and build a null distribution by randomly picking genes of size=len(marker_genes). 3) If filtering_algorithm="quantile", compute threshold from (1 - quantile). 4) Apply cutoff to all bins (values below threshold become 0). 5) Merge results back into `table.obs` if `add_to_obs=True`. Parameters ---------- sdata : object Spatial data container or AnnData object. Expected to have sdata.tables[f"square_{bin_size:03}um"]. marker_genes : list, dict, or pd.DataFrame Marker genes to use: - list[str]: Single group of marker genes (assigned to `common_group_name`). - dict[str, list[str]]: Mapping of group names to marker gene lists. - pd.DataFrame: Must have columns for group and gene names (default: 'group' and 'names'), customizable via `celltype` and `gene_id_column`. common_group_name : str, optional Column name assigned if marker_genes is a list. Default is "MarkerGroup". celltype : str, optional Column name in marker_genes DataFrame for group identifier. Default is "group". gene_id_column : str, optional Column name in marker_genes DataFrame for gene names. Default is "names". exclude_group_names : list[str], optional Names of groups to exclude bins where those groups are nonzero. Default is empty. bin_size : int, optional Spatial bin size in microns (e.g., 8 means "square_008um" table). Default is 8. aggregation_method : str, optional How to aggregate gene expression across marker genes. One of "sum", "mean", "median", or "cs" (composite score). Default is "sum". add_to_obs : bool, optional Whether to add the results into the `obs` of the spatial data table. Default is True. filtering_algorithm : str, optional Method to determine expression cutoff. Options: "permutation" or "quantile". Default is "permutation". num_permutations : int, optional Number of permutations for null distribution (used if filtering_algorithm="permutation"). Default is 5000. alpha : float, optional Significance level (1 - alpha quantile) for thresholding using permutation. Default is 0.01. subsample_size : int, optional Number of bins used in permutation subsampling. Default is 25000. subsample_signal_quantile : float, optional Quantile range for selecting moderate-expression bins before permutation. Default is 0.1. permutation_gene_pool_fraction : float, optional Fraction of most variable genes used as the background gene pool for permutation. Default is 0.3. parametric : bool, optional If True, fit a parametric distribution (Gamma or Exponential) to null scores for thresholding. Otherwise use empirical quantile. Default is True. n_subs : int, optional Number of smaller subsets to split the permutation into. Default is 5. quantile : float, optional Quantile cutoff if filtering_algorithm="quantile". Default is 0.7. **kwargs Additional keyword arguments (currently unused but reserved for future extensions). Returns ------- pd.DataFrame The final DataFrame with aggregated + thresholded expression for each group. Columns = one per group, indexed by bin. """ # ----------------------------------------------------------- # 0) Convert marker_genes input to a dictionary: group -> list of genes # ----------------------------------------------------------- if isinstance(marker_genes, list): # Single group: user gave a plain list of gene names group_dict = {common_group_name: marker_genes} elif isinstance(marker_genes, dict): # Already in dict form: group_name -> [list_of_genes] group_dict = marker_genes elif isinstance(marker_genes, pd.DataFrame): # Expect columns group_col and gene_col required_cols = {celltype, gene_id_column} if not required_cols.issubset(marker_genes.columns): raise ValueError( f"DataFrame for marker_genes must have columns: {celltype}, {gene_id_column}." ) # Build a dict: group_name -> list_of_genes group_dict = {} marker_genes_tmp = marker_genes.copy() try: marker_genes_tmp.index.names=[""] except: pass for gname, sub_df in marker_genes_tmp.groupby(celltype): # Extract unique gene names in this group genes_for_gname = sub_df[gene_id_column].unique().tolist() group_dict[gname] = genes_for_gname else: raise TypeError( "marker_genes must be a list, dict, or DataFrame with the appropriate columns." ) # 1) Retrieve the table try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata gene_variability = sparse_var(table.X,axis=0) gene_pool = table.var_names[np.argsort(gene_variability)[-int(permutation_gene_pool_fraction * len(table.var_names)):]] # 2) Exclude spots spots_to_be_used = table.obs.index if exclude_group_names: for g in exclude_group_names: if g in table.obs.columns: spots_excluded = table.obs[table.obs[g] != 0].index spots_to_be_used = spots_to_be_used.difference(spots_excluded) #patch for Xenium, may be deleted if isinstance(spots_to_be_used, (list, tuple, set)): spots_to_be_used = pd.Index(spots_to_be_used) elif isinstance(spots_to_be_used, np.ndarray): spots_to_be_used = pd.Index(spots_to_be_used) # If it's a boolean array/Series, use it directly as a mask if getattr(spots_to_be_used, "dtype", None) is not None and spots_to_be_used.dtype == bool: obs_mask = np.asarray(spots_to_be_used, dtype=bool) else: # treat as labels; if you actually meant positions, change to `.iloc` below obs_mask = table.obs_names.isin(spots_to_be_used) shape_hat = loc_hat = scale_hat = None nonzero_null_vals = None # Prepare a final DataFrame to collect group results result_df = pd.DataFrame(index=spots_to_be_used) # Aggregation functions aggregation_funcs = { "sum": "sum", "mean": "mean", "median": "median", "cs": composite_score, # or any custom aggregator } if aggregation_method not in aggregation_funcs: raise ValueError("aggregation_method must be one of: sum, mean, median or cs.") aggregator = aggregation_funcs[aggregation_method] tqdm.pandas() # ----------------------------------------------------------- # Loop over each group in the dictionary # ----------------------------------------------------------- for group_name, gene_list in group_dict.items(): # Intersect gene_list with table.var_names filtered_genes = set(gene_list).intersection(table.var_names) filtered_genes = list(filtered_genes) if not filtered_genes: print(f"Warning: No valid marker genes found for group '{group_name}'.") # We'll create a column of all zeros result_df[group_name] = 0 continue var_mask = table.var_names.isin(filtered_genes) # Retrieve expression for the selected spots & genes #expr_matrix = table[spots_to_be_used, filtered_genes].to_df() expr_matrix = table[obs_mask, var_mask].to_df() if isinstance(aggregator, str): aggregated_vals = expr_matrix.agg(aggregator, axis=1) else: aggregated_vals = expr_matrix.apply(aggregator, axis=1) group_expression = aggregated_vals.to_frame(name=group_name) # Apply the chosen filtering algorithm if filtering_algorithm == "quantile": # Threshold from non-zero aggregator values non_zero_vals = group_expression[group_expression[group_name] != 0][group_name] threshold = non_zero_vals.quantile(quantile) elif filtering_algorithm == "permutation": marker_expr = table[:, filtered_genes].to_df().agg(aggregator, axis=1) signal_low, signal_high = marker_expr.quantile([subsample_signal_quantile, 1-subsample_signal_quantile]) candidate_spots = marker_expr[(marker_expr >= signal_low) & (marker_expr <= signal_high)].index if len(candidate_spots) == 0: # Edge case: if everything was below cutoff print(f"Warning: no bins passed the total_counts quantile filter for {group_name}.") threshold = 0 else: all_null_scores = [] marker_set_size = len(filtered_genes) # Determine each subset size subset_size_each = subsample_size // n_subs remainder = subsample_size % n_subs # if not divisible np.random.seed(10) # n_subs loops for i in range(n_subs): # For remainder distribution, you can let the first few subsets be bigger or smaller current_subset_size = subset_size_each if remainder > 0: current_subset_size += 1 remainder -= 1 if len(candidate_spots) > current_subset_size: subset_spots = np.random.choice(candidate_spots, size=current_subset_size, replace=False) else: subset_spots = candidate_spots # Build null distribution for this subset all_expr_df = table[subset_spots, :].to_df() np.random.seed(10) for _ in tqdm( range(int(num_permutations/n_subs)), #desc=f"Perm sub {i+1}/{n_subs} of {current_subset_size} for {group_name}", desc=f"Subsample {current_subset_size*(i+1)}/{subsample_size} for {group_name}", leave=True, position=0 ): random_genes = np.random.choice(gene_pool, size=marker_set_size, replace=False) if isinstance(aggregator, str): random_vals = all_expr_df[random_genes].agg(aggregator, axis=1) else: random_vals = all_expr_df[random_genes].apply(aggregator, axis=1) all_null_scores.append(random_vals.values) # Concatenate results null_scores_concat = np.concatenate(all_null_scores) nonzero_null_vals = null_scores_concat[null_scores_concat > 0] if len(nonzero_null_vals) == 0: print("Warning: no positive values in null distribution, threshold set to 0.") threshold = 0 else: if not parametric: threshold = np.quantile(nonzero_null_vals, 1 - alpha) else: shape_hat, loc_hat, scale_hat = gamma.fit(nonzero_null_vals,floc=0) threshold = gamma.ppf(1 - alpha, shape_hat, loc=loc_hat, scale=scale_hat) elif filtering_algorithm == "nb": if "counts" not in table.layers: raise ValueError("NB filtering requires raw counts in table.layers['counts'].") if aggregation_method != "sum": raise ValueError("NB filtering currently supports aggregation_method='sum' only.") # Observed marker-sum per bin (raw counts) Xg = table[obs_mask, var_mask].layers["counts"] # sparse is fine S = np.asarray(Xg.sum(axis=1)).ravel().astype(float) # Library size per bin (raw counts) Xall = table[obs_mask, :].layers["counts"] total = np.asarray(Xall.sum(axis=1)).ravel().astype(float) median_total = np.median(total) + 1e-8 sf = total / median_total # Baseline composition for these marker genes (q_g) # Use all bins in obs_mask for stability gene_sum = np.asarray(Xg.sum(axis=0)).ravel().astype(float) total_sum = total.sum() + 1e-8 q = gene_sum / total_sum # length = n_marker_genes # Expected mu_bg and variance under global NB dispersion theta = kwargs.get("nb_global_theta", 50.0) # minimal: read from kwargs if provided mu = (sf[:, None] * q[None, :] * median_total) mu = np.maximum(mu, 1e-8) var = mu + (mu ** 2) / float(theta) ES = mu.sum(axis=1) VS = var.sum(axis=1) + 1e-8 from scipy.stats import norm Z = (S - ES) / np.sqrt(VS) pvals = norm.sf(Z) # right-tail # Fill group_expression in the same shape/index you already use group_expression = pd.DataFrame({group_name: S}, index=table.obs_names[obs_mask]) # Then reuse your existing output_stat block, but with: # - threshold as bin-specific (expression mode) # - pvals from NB (minus_log10_p mode) if output_stat == "expression": zcrit = norm.isf(alpha) thresh = ES + zcrit * np.sqrt(VS) kept = np.where(S >= thresh, S, 0.0) result_df[group_name] = pd.Series(kept, index=group_expression.index).fillna(0.0) elif output_stat == "minus_log10_p": pclip = np.clip(pvals, 1e-300, 1.0) score = -np.log10(pclip) score[pvals > alpha] = 0.0 result_df[group_name] = pd.Series(score, index=group_expression.index).fillna(0.0) else: raise ValueError(f"Unsupported output_stat: {output_stat}") else: raise ValueError("Invalid filtering_algorithm. Use 'quantile' or 'permutation'.") if filtering_algorithm != "nb": # --- NEW: choose what to output based on output_stat --- if output_stat == "expression": # Original behavior: threshold on expression, keep values above threshold group_expression[group_name] = np.where( group_expression[group_name] >= threshold, group_expression[group_name], 0, ) result_df[group_name] = group_expression[group_name].fillna(0) elif output_stat == "minus_log10_p": # Compute p-values relative to the null distribution and output -log10(p) vals = group_expression[group_name].values # p-values: P(null >= observed) if parametric: pvals = gamma.sf(vals, shape_hat, loc=loc_hat, scale=scale_hat) else: # empirical right-tail p-value sorted_null = np.sort(nonzero_null_vals) M = len(sorted_null) # for each v: position of v in null; P(null >= v) = (M - idx) / M idx = np.searchsorted(sorted_null, vals, side="left") pvals = (M - idx) / float(M) # Clip to avoid log(0) pvals_clipped = np.clip(pvals, 1e-300, 1.0) minus_log10_p = -np.log10(pvals_clipped) # Zero out non-significant entries (p > alpha) minus_log10_p[pvals > alpha] = 0.0 result_df[group_name] = pd.Series( minus_log10_p, index=group_expression.index, ).fillna(0.0) else: raise ValueError(f"Unsupported output_stat: {output_stat}") # ----------------------------------------------------------- # Merge results back into obs if requested # ----------------------------------------------------------- if add_to_obs: print("Adding results to table.obs of sdata object") # Drop existing columns of same names if present for col in result_df.columns: if col in table.obs.columns: table.obs.drop(columns=[col], inplace=True, errors='ignore') # Merge table.obs = pd.merge(table.obs, result_df, left_index=True, right_index=True, how='left') for col in result_df.columns: table.obs[col] = table.obs[col].fillna(0) return result_df
[docs] def get_clusters_by_similarity_on_tissue( sdata, markers_df, common_group_name=None, bin_size=8, gene_id_column="names", #similarity_by_column="logfoldchanges", method="wjaccard", add_to_obs=False, **kwargs, ): """ Compute cluster assignments based on a chosen similarity method. Parameters ---------- sdata : AnnData-like object Spatial (or single-cell) data containing expression matrices. It is expected to have 'tables' attribute with keys like "square_00Xum", or simply be treated as a table if the key doesn't exist. markers_df : pd.DataFrame DataFrame containing marker genes for each cluster. Rows typically represent clusters, columns represent information about each gene (e.g., logfoldchanges, names, etc.). common_group_name : str, optional Name of a column in `table.obs` specifying spots to process. If found, only spots where `common_group_name != 0` are processed. Otherwise, all spots are processed. Default is None. bin_size : int, optional Determines the bin size (like "square_008um") for looking up the table in `sdata.tables`. Default is 8. gene_id_column : str, optional Name of the column in `markers_df` that contains gene IDs. Default is "names". similarity_by_column : str, optional Column in `markers_df` used to measure similarity or weight. Default is "logfoldchanges". method : str, optional Method to use for computing similarity. Supported methods include: "correlation", "cosine", "jaccard", "overlap", "wjaccard", "diagnostic", "sum", "mean", "median". Default is "wjaccard". add_to_obs : bool, optional If True, adds the resulting assignment columns to `table.obs`. Default is True. **method_kwargs : Additional, method-specific parameters. For example: - For method="wjaccard": supply ``lambda_param``, etc. Returns ------- pd.DataFrame A DataFrame whose index matches `table.obs.index` with cluster assignment columns (or other metrics) computed by the specified method. """ # Try to get the appropriate table from sdata; if not present, treat sdata as the table try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata # Enable tqdm progress bar in pandas tqdm.pandas() # Determine which spots to process if common_group_name in table.obs.columns: print(f"Processing spots with {common_group_name} != 0") spots_with_expression = table.obs[table.obs[common_group_name] != 0].index else: print("common_group_name column not found in the table, processing all spots.") spots_with_expression = table.obs.index # Select similarity function based on method similarity_methods = { "correlation": function_row_spearman, "cosine": function_row_cosine, "jaccard": function_row_jaccard, "overlap": function_row_overlap, "wjaccard": function_row_weighted_jaccard, "diagnostic": function_row_diagnostic, "sum": function_row_sum, "mean": function_row_mean, "median": function_row_median, "euclidean": function_row_euclidean, "auc": function_row_auc_specific } if method not in similarity_methods: raise ValueError( "Invalid method. Choose from: correlation, cosine, jaccard, overlap, " "wjaccard, diagnostic, sum, mean, median, euclidean, auc" ) func = similarity_methods[method] # Show parallelization info #from .config import config # Import inside function to prevent issues with joblib reloading print("Number of threads used:", config.n_jobs) print("Batch size:", config.batch_size) # Run computations in parallel results = Parallel( n_jobs=config.n_jobs, backend="loky", # Ensure workers do not inherit unnecessary imports )( delayed(process_row_with_suppression)(row, func, markers_df=markers_df, gene_id_column=gene_id_column, **kwargs) for _, row in tqdm(table[spots_with_expression,].to_df().iterrows(), total=len(spots_with_expression),leave=True, position=0) ) # Convert results to a DataFrame result_df = pd.DataFrame(results) result_df.set_index("Index", inplace=True) result_df = result_df["assigned_cluster"].apply(pd.Series) # For spots not processed (e.g., excluded by common_group_name != 0) # fill with zeros or NaNs, depending on your needs others_df = pd.DataFrame( 0, index=list(set(table.obs.index) - set(spots_with_expression)), columns=result_df.columns ) df = pd.concat([result_df, others_df]) # Optionally merge back into table.obs if method != "diagnostic" and add_to_obs: print("Adding results to table.obs of sdata object") table.obs.drop(columns=df.columns, inplace=True, errors='ignore') table.obs = pd.merge(table.obs, df, left_index=True, right_index=True) return df
#this function is used to read the markers from a file or from an single-cell anndata object and return a dataframe
[docs] def read_markers_dataframe(sdata, filename=None, adata=None, exclude_celltype=[], bin_size=8, top_n_genes=60, sort_by_column="scores", ascending=False, gene_id_column="names", celltype="group", key="rank_genes_groups", log2fc_min=0.25, pval_cutoff=0.05, drop_ribosomal=False, drop_mitochondrial=False): """ Reads and processes marker genes data for spatial transcriptomics analysis. This function can read marker genes data either from a file or from an AnnData object, and processes it to create a filtered and sorted DataFrame of marker genes. Parameters: ----------- sdata : SpatialData object The spatial data object containing the spatial transcriptomics data. filename : str, optional Path to the input file containing marker genes data (CSV or Excel format). Required if `adata` is not provided. adata : AnnData object, optional AnnData object containing the marker genes data. Required if `filename` is not provided. exclude_celltype : list, optional List of cell types to exclude from the analysis. Default: [] bin_size : int, optional Size of the spatial bin in micrometers. Default: 8 top_n_genes : int, optional Number of top genes to keep per cell type. Default: 60 sort_by_column : str, optional Column name to sort the genes by. Default: "scores" ascending : bool, optional Whether to sort in ascending order. Default: False gene_id_column : str, optional Column name containing gene IDs. Default: "names" celltype : str, optional Column name containing cell type information. Default: "group" key : str, optional Key in adata.uns where marker genes are stored. Default: "rank_genes_groups" log2fc_min : float, optional Minimum log2 fold change threshold for gene selection. Default: 0.25 pval_cutoff : float, optional Maximum adjusted p-value threshold for gene selection. Default: 0.05 drop_ribosomal : bool, optional Whether to remove ribosomal genes before final selection. Removes genes starting with RPS or RPL (case-insensitive). Default: False drop_mitochondrial : bool, optional Whether to remove mitochondrial genes before final selection. Removes genes starting with MT- or mt-. Default: False Returns: -------- pandas.DataFrame Processed DataFrame containing filtered and sorted marker genes data. The DataFrame includes columns for cell types, gene IDs, and scores. Raises: ------ ValueError If neither `filename` nor `adata` is provided. If invalid `adata` object is provided. Notes: ----- - The function automatically handles both CSV and Excel file formats. - Genes are filtered based on log2 fold change and adjusted p-value thresholds. - The resulting DataFrame is sorted by the specified column and limited to the top N genes per cell type. """ try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata if adata is None: if filename is None: raise ValueError("Please provide a filename or an adata object") else: try: df=pd.read_csv(filename,dtype={gene_id_column:str,celltype:str}) except: df=pd.read_excel(filename,dtype={gene_id_column:str,celltype:str}) else: try: df=sc.get.rank_genes_groups_df(adata,group=None, key=key, pval_cutoff=pval_cutoff, log2fc_min=log2fc_min) except: raise ValueError("Please provide a valid adata object with rank_genes_groups key") if "logfoldchanges" in df.columns: df=df[df["logfoldchanges"] >= log2fc_min] if "pvals_adj" in df.columns: df=df[df["pvals_adj"] <= pval_cutoff] # Optional gene family filtering if drop_ribosomal: gene_upper = df[gene_id_column].str.upper() df = df[~gene_upper.str.startswith(("RPS", "RPL"))] if drop_mitochondrial: gene_upper = df[gene_id_column].str.upper() df = df[~gene_upper.str.startswith("MT-")] df = df[df[gene_id_column].isin(table.var_names)] #check if the var_names are present in the spatial data df = df[~df[celltype].isin(exclude_celltype)] df = df.sort_values(by=sort_by_column, ascending=ascending) df = df.drop_duplicates(subset=[celltype, gene_id_column], keep='first') df = df.groupby(celltype).head(top_n_genes) print("Unique cell types detected in the dataframe:") print(df[celltype].unique()) df.set_index(celltype,inplace=True,drop=False) return df
[docs] def assign_clusters_from_df(sdata, df, bin_size=8, results_column="easydecon", method="max", allow_multiple=False, diagnostic=None, fold_change_threshold=2.0, add_to_obs=True): """ Assigns cell clusters to spatial spots based on deconvolution results. This function takes deconvolution results and assigns cell type clusters to each spatial spot using different methods (max, zmax, or hybrid). The results are stored in the spatial data table. Parameters: ----------- sdata : SpatialData object The spatial data object containing the spatial transcriptomics data. df : pandas.DataFrame DataFrame containing deconvolution results with cell type proportions. Rows should correspond to spatial spots, columns to cell types. bin_size : int, optional Size of the spatial bin in micrometers. Default: 8 results_column : str, optional Name of the column in the table.obs where results will be stored. Default: "easydecon" method : str, optional Method to use for cluster assignment: - "max": Assigns the cell type with the highest proportion - "zmax": Uses z-score normalization before finding the maximum - "hybrid": Combines similarity scores and adaptive probabilities Default: "max" allow_multiple : bool, optional Whether to allow multiple cell type assignments per spot. Default: False diagnostic : str, optional Path to save diagnostic information (if needed). Default: None fold_change_threshold : float, optional Threshold for fold change filtering. Default: 2.0 Returns: -------- None The function modifies the input sdata object in place by adding cluster assignments to the table.obs DataFrame under the specified results_column. Raises: ------ ValueError If an invalid method is specified. Notes: ----- - The function automatically handles missing values and ensures proper indexing. - Results are stored as categorical variables in the table.obs DataFrame. - The function supports three different methods for cluster assignment, each with its own characteristics and use cases. """ try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata table.obs.drop(columns=[results_column], inplace=True, errors='ignore') df_filtered = df.loc[table.obs.index] def softmax(row, **kwargs): v = row.to_numpy(dtype=float) # treat non-finite as very negative so they don't contribute v = np.where(np.isfinite(v), v, -np.inf) # if row has <2 finite or variance ~0 → uninformative finite = np.isfinite(v) if finite.sum() < 2 or (np.nanmax(v[finite]) - np.nanmin(v[finite]) < 1e-12): return pd.Series(np.zeros_like(v, dtype=float), index=row.index) m = np.nanmax(v) ex = np.exp(v - m) ex[~np.isfinite(ex)] = 0.0 s = ex.sum() probs = ex / s if s > 0 else np.zeros_like(ex) return pd.Series(probs, index=row.index) if method == "max" and not allow_multiple: df_reindexed = df_filtered[~(df_filtered == 0).all(axis=1)].idxmax(axis=1).to_frame(results_column).astype('category').reindex(table.obs.index, fill_value=np.nan) elif method == "zmax" and not allow_multiple: tmp = df_filtered.replace(0, np.nan).apply(lambda x: zscore(x, nan_policy='omit'), axis=0).idxmax(axis=1) df_reindexed = tmp.to_frame(results_column).astype('category').reindex(table.obs.index, fill_value=np.nan) elif method == "hybrid": # row-wise zscore if allow_multiple: print("Multiple assignments per spot allowed, so hybrid assignment method is selected...") row_is_all_zero_or_nan = ~np.isfinite(df_filtered).any(axis=1) | (df_filtered.replace(0, np.nan).isna().all(axis=1)) informative_mask = ~row_is_all_zero_or_nan similarity_zscores = df_filtered[informative_mask].apply( lambda r: zscore(r.to_numpy(dtype=float), nan_policy='omit'), axis=1 ) similarity_zscores = pd.DataFrame( np.vstack(similarity_zscores.values), index=df_filtered[informative_mask].index, columns=df_filtered[informative_mask].columns ).replace([np.inf, -np.inf], np.nan).fillna(0.0) adaptive_probs = similarity_zscores.apply(softmax, axis=1) def adaptive_assign(row): sorted_probs = row.sort_values(ascending=False) min_probability = 1.0 / len(row) if len(sorted_probs) < 2: if sorted_probs.iloc[0] >= min_probability: return sorted_probs.index[0] else: return np.nan top_prob = sorted_probs.iloc[0] second_prob = sorted_probs.iloc[1] if (top_prob >= min_probability) and (top_prob >= fold_change_threshold * second_prob): return sorted_probs.index[0] elif allow_multiple: eligible = sorted_probs[sorted_probs >= min_probability] if not eligible.empty: return ';'.join(eligible.index.tolist()) else: return np.nan else: return np.nan assigned_clusters = [] for _, row in tqdm(adaptive_probs.iterrows(), total=adaptive_probs.shape[0], desc="Assigning clusters", leave=True,position=0): assigned_clusters.append(adaptive_assign(row)) df_reindexed = pd.DataFrame(assigned_clusters, index=adaptive_probs.index, columns=[results_column]).astype('category').reindex(table.obs.index, fill_value=np.nan) else: raise ValueError("Please provide a valid method: max, zmax, or hybrid") if allow_multiple and method == "hybrid": tmp = pd.DataFrame() tmp[[f"{results_column}", f"{results_column}_second", f"{results_column}_third"]] = df_reindexed[results_column].str.split(";",n=2,expand=True) df_reindexed = tmp.astype('category').reindex(table.obs.index) if add_to_obs: print("Adding results to table.obs of sdata object") table.obs.drop( columns=df_reindexed.columns, inplace=True, errors='ignore' ) table.obs = pd.merge(table.obs, df_reindexed, left_index=True, right_index=True, how="left") if diagnostic is not None: for r in table.obs[[results_column]].itertuples(index=True, name='Pandas'): if not pd.isna(getattr(r, results_column)): table.obs.at[r.Index, results_column] = getattr(r, results_column) return df_reindexed
[docs] def visualize_only_selected_clusters(sdata,clusters,bin_size=8,results_column="easydecon",temp_column="tmp"): try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata table.obs.drop(columns=[temp_column],inplace=True,errors='ignore') #table.obs=pd.merge(table.obs, df.idxmax(axis=1).to_frame(results_column).astype('category'), left_index=True, right_index=True) table.obs[temp_column]=table.obs[results_column].apply(lambda x: x if x in clusters else np.nan) return
[docs] def plot_assigned_clusters_from_dataframe(sdata,dataframe,sample_id,bin_size=8,title="Assigned Clusters",cmap="tab20",legend_fontsize=8,figsize=(5,5),dpi=200,method="matplotlib",scale=1): assign_clusters_from_df(sdata,df=dataframe,bin_size=8,results_column="plotted_clusters") sdata.pl.render_images("queried_cytassist").pl.render_shapes( f"{sample_id}_square_{bin_size:03}um", color="plotted_clusters",cmap=cmap,method=method,scale=scale ).pl.show(coordinate_systems="global", title=title, legend_fontsize=legend_fontsize,figsize=figsize,dpi=dpi) return
[docs] def napari_region_assignment(sdata,key="Shapes",bin_size=8,column="napari",target_coordinate_system="global"): try: sdata[key] except: raise ValueError("Please provide a valid key for the shapes in the spatial data object that assigned via Napari") sdata.tables[f"square_{bin_size:03}um"].obs.drop(columns=column,inplace=True,errors='ignore') indices_list = [] for g in sdata[key].geometry: indices=polygon_query(sdata,polygon=g,target_coordinate_system=target_coordinate_system).tables[f"square_{bin_size:03}um"].obs.index indices_list.extend(indices) df=pd.DataFrame("No", index=sdata.tables[f"square_{bin_size:03}um"].obs.index, columns=[column]) df[column]=np.where(df.index.isin(set(indices_list)), 'Yes', 'No') sdata.tables[f"square_{bin_size:03}um"].obs=pd.merge(sdata.tables[f"square_{bin_size:03}um"].obs, df, left_index=True, right_index=True) return
[docs] def process_row(row,func, **kwargs): return pd.Series({ 'Index': row.name, #'assigned_cluster': func(row, markers_df, gene_id_column=gene_id_column, similarity_by_column=similarity_by_column,threshold=threshold) 'assigned_cluster': func(row, **kwargs) })
[docs] def function_row_spearman(row, markers_df,**kwargs): gene_id_column=kwargs.get("gene_id_column","names") similarity_by_column=kwargs.get("similarity_by_column","logfoldchanges") #penalty_param=kwargs.get("penalty_param",0.5) a = {} for c in markers_df.index.unique(): vector_series = pd.Series(markers_df[[gene_id_column,similarity_by_column]].loc[[c]][similarity_by_column].values, index=markers_df[[gene_id_column, similarity_by_column]].loc[[c]][gene_id_column].values) l = len(vector_series) vector_series = vector_series.reindex(row.index, fill_value=np.nan) valid_mask = ~vector_series.isna() & ~row.isna() t = (row[valid_mask] != 0).sum() if t == 0: # No valid pairs a[c] = 0.0 else: #sp = (spearmanr(row[valid_mask], vector_series[valid_mask], nan_policy="omit")[0])*((t/l)**penalty_param) sp = (spearmanr(row[valid_mask], vector_series[valid_mask], nan_policy="omit")[0])*((t/l)) a[c] = sp if sp > 0 else 0.0 # Assign 0 if correlation is negative return a
[docs] def function_row_cosine(row, markers_df,**kwargs): gene_id_column=kwargs.get("gene_id_column","names") similarity_by_column=kwargs.get("similarity_by_column","logfoldchanges") #penalty_param=kwargs.get("penalty_param",0) a = {} for c in markers_df.index.unique(): vector_series = pd.Series(markers_df[[gene_id_column,similarity_by_column]].loc[[c]][similarity_by_column].values, index=markers_df[[gene_id_column, similarity_by_column]].loc[[c]][gene_id_column].values) l = len(vector_series) vector_series = vector_series.reindex(row.index, fill_value=np.nan) vector_series = min_max_scale(vector_series) valid_mask = ~vector_series.isna() & ~row.isna() t = (row[valid_mask] != 0).sum() if t == 0: # No valid pairs a[c] = 0.0 else: #a[c] = (1 - cosine(row[valid_mask], vector_series[valid_mask]))*((t/l)**penalty_param) #penalize the cosine similarity by the fraction of valid pairs a[c] = (1 - cosine(row[valid_mask], vector_series[valid_mask])) return a
[docs] def function_row_euclidean(row, markers_df, **kwargs): gene_id_column = kwargs.get("gene_id_column", "names") similarity_by_column = kwargs.get("similarity_by_column", "logfoldchanges") #penalty_param = kwargs.get("penalty_param", 0) a = {} for c in markers_df.index.unique(): vector_series = pd.Series( markers_df[[gene_id_column, similarity_by_column]].loc[[c]][similarity_by_column].values, index=markers_df[[gene_id_column, similarity_by_column]].loc[[c]][gene_id_column].values ) l = len(vector_series) vector_series = vector_series.reindex(row.index, fill_value=np.nan) vector_series = min_max_scale(vector_series) valid_mask = ~vector_series.isna() & ~row.isna() # Number of non-zero valid entries t = (row[valid_mask] != 0).sum() if t == 0: # No valid pairs a[c] = 0.0 else: distance_val = euclidean(row[valid_mask], vector_series[valid_mask]) # Convert Euclidean distance to similarity in [0,1]: higher distance -> lower similarity similarity_val = 1 / (1 + distance_val) # Apply the penalty factor #a[c] = similarity_val * ((t / l) ** penalty_param) a[c] = similarity_val return a
[docs] def min_max_scale(series): series = series.fillna(0)# fill nan values with 0 min_val = series.min() max_val = series.max() if min_val == max_val: #return series.apply(lambda x: 0.0) # what if all values are the same? for now, return 0 #warnings.warn("All values in the series are identical; returning zeros.") return pd.Series(0.0, index=series.index) return (series - min_val) / (max_val - min_val)
[docs] def function_row_auc(row, markers_df, **kwargs): """ AUROC-style similarity between a spot (row) and each cluster's marker set. For each cluster c: - positives = marker genes of c present in row.index - negatives = all other genes in row.index - score = AUROC that positives have higher expression than negatives Parameters ---------- row : pandas.Series Expression values for one spot. Index must be gene IDs. markers_df : pandas.DataFrame DataFrame with markers. Index = cluster/cell type, and a column with gene IDs (e.g. 'names'). gene_id_column : str, in kwargs Name of the column in markers_df that holds gene IDs. min_markers : int, in kwargs, optional Minimum number of markers that must be present in the spot to compute a score. Otherwise returns fallback (default 0.5). fallback_auc : float, in kwargs, optional Value to use when AUC is undefined (e.g. too few markers or no negatives). Returns ------- dict {cluster_label: auc_score} """ gene_id_column = kwargs.get("gene_id_column", "names") min_markers = kwargs.get("min_markers", 3) fallback_auc = kwargs.get("fallback_auc", 0.5) # Rank genes once per row (1 = lowest expression, N = highest) ranks = row.rank(method="average", ascending=True) N = len(ranks) # Pre-compute for safety if N < 2: # Degenerate case: only one gene return {c: fallback_auc for c in markers_df.index.unique()} scores = {} # Iterate clusters / groups in markers_df for c in markers_df.index.unique(): # Get marker genes for this cluster gene_list = markers_df.loc[[c], gene_id_column].astype(str) # Intersect with genes present in this spot genes_pos = pd.Index(gene_list).intersection(ranks.index) n_pos = len(genes_pos) n_neg = N - n_pos # Not enough markers or no negatives -> undefined AUC if (n_pos < min_markers) or (n_neg <= 0): scores[c] = fallback_auc continue # Sum of ranks of positives sum_ranks = ranks[genes_pos].sum() # Wilcoxon U statistic U = sum_ranks - n_pos * (n_pos + 1) / 2.0 # Normalize to AUC auc = U / (n_pos * n_neg) # Numerical safety: clamp to [0,1] if not np.isfinite(auc): auc = fallback_auc else: auc = max(0.0, min(1.0, auc)) scores[c] = auc return scores
import pandas as pd import numpy as np
[docs] def function_row_auc_specific(row, markers_df, **kwargs): """ High-specificity AUROC scoring. Filters out noise and focuses on top markers to reduce false positives. """ gene_id_column = kwargs.get("gene_id_column", "names") min_markers = kwargs.get("min_markers", 3) fallback_auc = kwargs.get("fallback_auc", 0.5) # NEW 1: Expression Threshold # Only rank genes that have biologically relevant expression. # Everything below this is treated as 'negative' background. expr_threshold = kwargs.get("expression_threshold", 0.1) # NEW 2: Limit to Top N Markers # Only look for the top N most specific markers per cluster. top_n = kwargs.get("top_n_markers", None) # Filter the row first valid_genes = row[row > expr_threshold] # If the spot is empty after filtering, return fallback if len(valid_genes) < 2: return {c: fallback_auc for c in markers_df.index.unique()} # Rank only the expressed genes # Genes not in valid_genes are implicitly rank 0 (or below these ranks) ranks = valid_genes.rank(method="average", ascending=True) N = len(row) # The universe is still the total gene space of the spot scores = {} for c in markers_df.index.unique(): # Get marker genes gene_list = markers_df.loc[[c], gene_id_column].astype(str) # NEW 2 Implementation: Slice the top N markers if top_n is not None: gene_list = gene_list.iloc[:top_n] # Intersect with valid (above threshold) genes genes_pos = pd.Index(gene_list).intersection(ranks.index) n_pos_found = len(genes_pos) n_total_markers = len(gene_list) # STRICTER CHECK: # You might want to fail if we found fewer than X% of the top markers if n_pos_found < min_markers: scores[c] = fallback_auc continue # Calculate AUC components # Note: We treat genes below threshold as having rank 0. # However, for standard AUC calculation in this specific context (Genes vs Background), # we usually only care about the ranks of the DETECTED markers relative to DETECTED background. # Rank Sum of detected markers sum_ranks = ranks[genes_pos].sum() # The number of negatives is the number of valid genes minus the markers found n_neg = len(valid_genes) - n_pos_found if n_neg <= 0: scores[c] = fallback_auc continue # Wilcoxon U U = sum_ranks - n_pos_found * (n_pos_found + 1) / 2.0 auc = U / (n_pos_found * n_neg) # NEW 3: Recovery Penalty (Optional) # If you want to penalize spots that only have 3 out of 50 markers, # multiply the AUC by the fraction of markers recovered. # Uncomment the line below to enable: # auc = auc * (n_pos_found / n_total_markers) if not np.isfinite(auc): auc = fallback_auc else: auc = max(0.0, min(1.0, auc)) scores[c] = auc return scores
[docs] def function_row_jaccard(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") #threshold=kwargs.get("threshold") for c in markers_df.index.unique(): #row_set = set(row[row > threshold].sort_values(ascending=False).index) row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = set(markers_df.loc[[c]][gene_id_column].values) # Calculate intersection and union i = row_set.intersection(vector_set) union = len(row_set.union(vector_set)) if union == 0: jaccard_sim = 0.0 # If both sets are empty, define similarity as 0 else: jaccard_sim = len(i) / union a[c] = jaccard_sim return a
#Szymkiewicz–Simpson
[docs] def function_row_overlap(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") #threshold=kwargs.get("threshold") for c in markers_df.index.unique(): row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = set(markers_df.loc[[c]][gene_id_column].values) # Calculate intersection and union i = row_set.intersection(vector_set) union = min(len(row_set),len(vector_set)) if union == 0: overlap_sim = 0.0 # If both sets are empty, define similarity as 0 else: overlap_sim = len(i) / union #what if min len differs, investigate! a[c] = overlap_sim #return the intersection as well so we can use it later for common genes return a
[docs] def function_row_diagnostic(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") for c in markers_df.index.unique(): row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = set(markers_df.loc[[c]][gene_id_column].values) # Calculate intersection a[c] = row_set.intersection(vector_set) return a
[docs] def function_row_sum(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") for c in markers_df.index.unique(): #row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = markers_df.loc[[c]][gene_id_column].values # Calculate intersection and union #a[c] = row_set.intersection(vector_set) a[c] = row[vector_set].sum() return a
[docs] def function_row_mean(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") for c in markers_df.index.unique(): #row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = markers_df.loc[[c]][gene_id_column].values # Calculate intersection and union #a[c] = row_set.intersection(vector_set) a[c] = row[vector_set].mean() return a
[docs] def function_row_median(row, markers_df, **kwargs): a = {} gene_id_column=kwargs.get("gene_id_column") for c in markers_df.index.unique(): #row_set = set(row[row > 0].sort_values(ascending=False).index) #non-zero values vector_set = markers_df.loc[[c]][gene_id_column].values # Calculate intersection and union #a[c] = row_set.intersection(vector_set) a[c] = row[vector_set].median() return a
[docs] def function_row_weighted_jaccard(row, markers_df, **kwargs): gene_id_column = kwargs.get("gene_id_column","names") weight_column = kwargs.get("weight_column", None) # Name of the weight column in markers_df lambda_param = kwargs.get("lambda_param", 0.25) # Default lambda for exponential decay a = {} # Get the genes and their expression levels from 'row' (target set) # Normalize the expression levels to range between 0 and 1 target_genes = row[row > 0] # Select genes with expression > 0 max_expr = target_genes.max() if max_expr > 0: target_weights = target_genes / max_expr #target_weights = target_genes / target_genes.sum() #L1 normalization else: target_weights = target_genes # Will be an empty Series # Determine if pre-calculated weights are to be used use_precalculated_weights = weight_column is not None and weight_column in markers_df.columns # Iterate over each cluster for c in markers_df.index.unique(): # Extract genes and weights for the current cluster cluster_df = markers_df.loc[[c]] cluster_genes = cluster_df[gene_id_column].reset_index(drop=True) if use_precalculated_weights: # Use pre-calculated weights cluster_weight_values = cluster_df[weight_column].reset_index(drop=True) # Normalize cluster weights to range between 0 and 1 max_weight = cluster_weight_values.max() if max_weight > 0: cluster_weights = cluster_weight_values / max_weight #cluster_weights = cluster_weight_values / cluster_weight_values.sum() #L1 normalization else: cluster_weights = cluster_weight_values # Create a pandas Series with genes as index and weights as values cluster_weights = pd.Series(cluster_weights.values, index=cluster_genes) else: # Assign exponential rank-based weights N = len(cluster_genes) if N > 0: ranks = np.arange(N) # Rank positions starting from 0 weights = np.exp(-lambda_param * ranks) #weights /= weights.sum() # Normalize weights to sum to 1 L1 normalization else: weights = np.array([]) # Create a pandas Series with weights assigned to genes cluster_weights = pd.Series(weights, index=cluster_genes) # Union of genes in 'cluster_weights' and 'target_weights' all_genes = set(cluster_weights.index).union(target_weights.index) # Initialize numerator and denominator for Weighted Jaccard Index numerator = 0.0 denominator = 0.0 for gene in all_genes: # Weight in 'cluster_weights', 0 if gene not present a_i = cluster_weights.get(gene, 0.0) # Weight in 'target_weights' (normalized expression level), 0 if gene not present b_i = target_weights.get(gene, 0.0) numerator += min(a_i, b_i) denominator += max(a_i, b_i) # Compute the Weighted Jaccard Index if denominator == 0.0: jaccard_sim = 0.0 else: jaccard_sim = numerator / denominator a[c] = jaccard_sim return a
[docs] def add_df_to_spatialdata(sdata,df,bin_size=8): try: table_name = f"square_{bin_size:03}um" table = sdata.tables[table_name] except (AttributeError, KeyError): try: table = sdata.tables["table"] except (AttributeError, KeyError): table = sdata table.obs.drop(columns=df.columns,inplace=True,errors='ignore') table.obs=pd.merge(table.obs, df, left_index=True, right_index=True) print("DataFrame added to SpatialData object") print(df.columns) return
[docs] def test_function(): print("Easydecon loaded!") print("Test function executed!")
[docs] def composite_score(row): nonzero = row[row > 0] return nonzero.sum() * (len(nonzero) / len(row)) if not nonzero.empty else 0