Source code for cellmaps_vnn.rlipp_calculator

import os

import numpy as np
import pandas as pd
import time
from cellmaps_vnn import constants
from scipy import stats
from multiprocessing import Pool
from joblib import Parallel, delayed
from sklearn.decomposition import PCA
from sklearn.linear_model import RidgeCV
import logging
from cellmaps_vnn.exceptions import CellmapsvnnError
from cellmaps_vnn.importance_score import ImportanceScoreCalculator

logger = logging.getLogger(__name__)


[docs] class RLIPPCalculator(ImportanceScoreCalculator): """ A calculator for Relative Importance of Predictor Performance (RLIPP) scores. Parameters: outdir (str): Output directory for the RLIPP scores and gene correlations. hierarchy (CX2Network): A hierarchy in HCX format. test_data (str): predicted_data (str): Path to the file containing predicted values. gene2idfile (str): Path to the file mapping genes to IDs. cell2idfile (str): Path to the file mapping cells to IDs. hidden_dir (str): Directory containing hidden layer outputs. rlipp_file (str): Path of the output file where results of rlipp algorithm will be saved gene_rho_file (str): Path of the output file where gene rho scores will be saved cpu_count (int): No of available cores num_hiddens_genotype (int): Mapping for the number of neurons in each term in genotype parts drug_count (int): No of top performing drugs """ def __init__(self, outdir, hierarchy, test_data, predicted_data, gene2idfile, cell2idfile, hidden_dir, cpu_count, num_hiddens_genotype, drug_count, excluded_terms=[]): super().__init__(hierarchy=hierarchy, outdir=outdir) all_terms = list(hierarchy.get_nodes().keys()) self.terms = [term for term in all_terms if term not in list(excluded_terms)] try: test_df = pd.read_csv(test_data, sep='\t', header=None, names=['C', 'D', 'AUC', 'DS']) except Exception as e: raise CellmapsvnnError(f"Failed to read test data from {test_data}: {e}") try: self.predicted_vals = np.loadtxt(predicted_data) except Exception as e: raise CellmapsvnnError(f"Failed to load predicted values from {predicted_data}: {e}") try: self.genes = pd.read_csv(gene2idfile, sep='\t', header=None, names=['I', 'G'])['G'] except Exception as e: raise CellmapsvnnError(f"Failed to read gene ID file from {gene2idfile}: {e}") try: cell_index = pd.read_csv(cell2idfile, sep="\t", header=None, names=['I', 'C']) except Exception as e: raise CellmapsvnnError(f"Failed to read cell ID file from {cell2idfile}: {e}") self.test_df = test_df[test_df['C'].isin(list(cell_index['C']))].reset_index(drop=True) self.hidden_dir = hidden_dir self.rlipp_file = os.path.join(self._outdir, constants.RLIPP_OUTPUT_FILE) self.gene_rho_file = os.path.join(self._outdir, constants.GENE_RHO_FILE) self.cpu_count = cpu_count self.num_hiddens_genotype = num_hiddens_genotype self.drug_count = drug_count self.drugs = list(set(self.test_df['D'])) if self.drug_count == 0: self.drug_count = len(self.drugs)
[docs] def create_drug_pos_map(self): """ Creates a mapping from drugs to their positions in the test data file. :return: A dictionary where keys are drugs and values are lists of positions in the test data. :rtype: dict """ drug_pos_map = {} for i, row in self.test_df.iterrows(): if row['D'] not in drug_pos_map: drug_pos_map[row['D']] = [] drug_pos_map[row['D']].append(i) return drug_pos_map
[docs] def create_drug_corr_map_sorted(self, drug_pos_map): """ Creates a sorted mapping of drugs to their Spearman correlation values. :param drug_pos_map: A dictionary mapping drugs to their positions in the test data. :type drug_pos_map: dict :return: A dictionary of drugs sorted by their Spearman correlation values in descending order. :rtype: dict """ test_auc = np.array(self.test_df['AUC']) drug_corr_map = {} for drug, positions in drug_pos_map.items(): if not positions: drug_corr_map[drug] = 0.0 continue try: test_vals = np.take(test_auc, positions) pred_vals = np.take(self.predicted_vals, positions) drug_corr_map[drug] = stats.spearmanr(test_vals, pred_vals)[0] except IndexError: drug_corr_map[drug] = 0.0 return {drug: corr for drug, corr in sorted(drug_corr_map.items(), key=lambda item: item[1], reverse=True)}
[docs] def load_feature(self, element, size): """ Loads hidden features for a given element. :param element: The element (term or gene) whose features are to be loaded. :type element: str :param size: The number of columns (features) to load. :type size: int :return: A numpy array of the hidden features for the given element. :rtype: numpy.ndarray """ file_name = self.hidden_dir + "/" + str(element) + '.hidden' return np.loadtxt(file_name, usecols=range(size))
[docs] def load_term_features(self, term): """ Loads hidden features for a given term. :param term: The term whose features are to be loaded. :type term: str :return: A numpy array of the hidden features for the given term. :rtype: numpy.ndarray """ return self.load_feature(term, self.num_hiddens_genotype)
[docs] def load_gene_features(self, gene): """ Loads hidden features for a given gene. :param gene: The gene whose features are to be loaded. :type gene: str :return: A numpy array of the hidden features for the given gene. :rtype: numpy.ndarray """ return self.load_feature(gene, 1)
[docs] def create_child_feature_map(self, feature_map, term): """ Creates a map of child features for a given term. :param feature_map: A dictionary mapping terms/genes to their features. :type feature_map: dict :param term: The term for which child features are to be created. :type term: str :return: A list of child features for the given term. :rtype: list """ child_features = [term] child_features.extend( feature_map[edge_data['t']] for _, edge_data in self._hierarchy.get_edges().items() if edge_data['s'] == term) return child_features
[docs] def load_all_features(self): """ Loads hidden features for all terms and genes. :return: A tuple containing two dictionaries, one mapping terms/genes to their features and the other mapping terms to their child features. :rtype: (dict, dict) """ feature_map = {} # Load term and gene features with Pool(self.cpu_count) as p: term_features = p.map(self.load_term_features, self.terms) gene_features = p.map(self.load_gene_features, self.genes) # Merge results into the feature map for i, term in enumerate(self.terms): feature_map[term] = term_features[i] for i, gene in enumerate(self.genes): feature_map[gene] = gene_features[i] # Build child feature map child_feature_map = {} for term in self.terms: children = [edge_data['t'] for edge_id, edge_data in self._hierarchy.get_edges().items() if edge_data['s'] == term] child_feature_map[term] = [feature_map[child] for child in children if child in feature_map] return feature_map, child_feature_map
[docs] @staticmethod def get_child_features(term_child_features, position_map): """ Gets a matrix of hidden features for a given term's children. :param term_child_features: A list of features for the children of a term. :type term_child_features: list :param position_map: A list of positions for which features are to be extracted. :type position_map: list :return: A matrix of hidden features for the children of the given term. :rtype: numpy.ndarray """ child_features = [] for f in term_child_features: child_features.append(np.take(f, position_map, axis=0)) return np.column_stack([f for f in child_features])
[docs] def exec_lm(self, X, y): """ Executes 5-fold cross-validated Ridge regression for a given hidden features matrix and returns the Spearman correlation value of the predicted output. :param X: The input matrix for regression. :type X: numpy.ndarray :param y: The target variable. :type y: numpy.ndarray :return: A tuple containing the Spearman correlation coefficient and p-value. :rtype: (float, float) """ pca = PCA(n_components=self.num_hiddens_genotype) X_pca = pca.fit_transform(X) regr = RidgeCV(cv=5) regr.fit(X_pca, y) y_pred = regr.predict(X_pca) return stats.spearmanr(y_pred, y)
[docs] def calc_term_rlipp(self, term_features, term_child_features, position_map, term, drug): """ Calculates the RLIPP score for a given term and drug. :param term_features: The features for the parent term. :type term_features: numpy.ndarray :param term_child_features: The features for the children of the term. :type term_child_features: list :param position_map: A list of positions for which RLIPP is to be calculated. :type position_map: list :param term: The term for which RLIPP is calculated. :type term: str :param drug: The drug for which RLIPP is calculated. :type drug: str :return: A formatted string containing the term, Spearman correlations, p-values, and RLIPP score. :rtype: str """ if not term_child_features: return '' X_parent = np.take(term_features, position_map, axis=0) X_child = self.get_child_features(term_child_features, position_map) y = np.take(self.predicted_vals, position_map) p_rho, p_pval = self.exec_lm(X_parent, y) c_rho, c_pval = self.exec_lm(X_child, y) message = (f"The model was not sufficiently trained - the system importance scores cannot be calculated " f"correctly. ") if c_rho == 0: reason = f"Reason: Division by zero error: c_rho is zero for term '{term}' and drug '{drug}'" raise CellmapsvnnError(message + reason) rlipp = p_rho / c_rho if np.isnan(rlipp) or np.isinf(rlipp): reason = f"Reason: Invalid RLIPP value: {rlipp} for term '{term}' and drug '{drug}'" raise CellmapsvnnError(message + reason) rlipp = p_rho / c_rho result = '{}\t{:.3e}\t{:.3e}\t{:.3e}\t{:.3e}\t{:.3e}\n'.format(term, p_rho, p_pval, c_rho, c_pval, rlipp) return result
[docs] def calc_gene_rho(self, gene_features, position_map, gene, drug): """ Calculates Spearman correlation between gene embeddings and predicted AUC. :param gene_features: The features for the gene. :type gene_features: numpy.ndarray :param position_map: A list of positions for which correlation is to be calculated. :type position_map: list :param gene: The gene for which correlation is calculated. :type gene: str :param drug: The drug for which correlation is calculated. :type drug: str :return: A formatted string containing the gene, Spearman correlation, and p-value. :rtype: str """ pred = np.take(self.predicted_vals, position_map) gene_embeddings = np.take(gene_features, position_map) rho, p_val = stats.spearmanr(pred, gene_embeddings) result = '{}\t{:.3e}\t{:.3e}\n'.format(gene, rho, p_val) return result
[docs] def calc_scores(self): """ Calculates RLIPP scores for top n drugs (n = drug_count), and prints the result in "Drug Term P_rho C_rho RLIPP" format. This method runs the calculation in parallel for efficiency. """ logger.info('Starting prediction process') print('Starting score calculation') drug_pos_map = self.create_drug_pos_map() sorted_drugs = list(self.create_drug_corr_map_sorted(drug_pos_map).keys())[0:self.drug_count] start = time.time() feature_map, child_feature_map = self.load_all_features() time_passed = time.time() - start logger.info('Time taken to load features: {:.4f}'.format(time_passed)) with open(self.rlipp_file, "w") as rlipp_file, open(self.gene_rho_file, "w") as gene_rho_file: rlipp_file.write('Term\tP_rho\tP_pval\tC_rho\tC_pval\tRLIPP\n') gene_rho_file.write('Gene\tRho\tP_val\n') with Parallel(backend="multiprocessing", n_jobs=self.cpu_count) as parallel: for i, drug in enumerate(sorted_drugs): try: start = time.time() # Parallel computation of RLIPP and gene correlation results rlipp_results = parallel( delayed(self.calc_term_rlipp)(feature_map[term], child_feature_map[term], drug_pos_map[drug], term, drug) for term in self.terms) gene_rho_results = parallel( delayed(self.calc_gene_rho)(feature_map[gene], drug_pos_map[drug], gene, drug) for gene in self.genes) # After collecting all results, write them to files for result in rlipp_results: rlipp_file.write(result) for result in gene_rho_results: gene_rho_file.write(result) time_passed = time.time() - start logger.info('Drug {} completed in {:.4f} seconds'.format((i + 1), time_passed)) except CellmapsvnnError as e: print(e) raise CellmapsvnnError(e) except Exception as e: logger.warning(f"Error during processing for drug {drug}: {e}") print(f"Error during processing for drug {drug}: {e}")