Source code for cellmaps_vnn.annotate

import copy
import json
import logging
import os
import shutil
from datetime import date
import getpass

import ndex2
import numpy as np
import pandas as pd
from cellmaps_utils.ndexupload import NDExHierarchyUploader
from cellmaps_utils import constants
from cellmaps_utils.provenance import ProvenanceUtil

from cellmaps_vnn.util import copy_and_register_gene2id_file
import ndex2.constants as ndexconstants

import cellmaps_vnn
import cellmaps_vnn.constants as vnnconstants
from ndex2.cx2 import RawCX2NetworkFactory, CX2Network

from cellmaps_vnn.exceptions import CellmapsvnnError

logger = logging.getLogger(__name__)


class VNNAnnotate:
    """
    Implements the 'annotate' command: aggregates RLIPP scores written by the
    predict step, annotates a CX2 hierarchy (nodes and edges) with them, builds
    per-system interactome subnetworks from the parent network and optionally
    uploads the results to NDEx.
    """
    COMMAND = 'annotate'
    DEFAULT_NDEX_SERVER = 'ndexbio.org'
    DEFAULT_PASSWORD = '-'
    # Minimum node importance score that gets propagated onto hierarchy edges
    DEFAULT_EDGE_IMPORTANCE_THRESHOLD = 0.7

    def __init__(self, outdir, model_predictions, disease=None, hierarchy=None, parent_network=None,
                 ndexserver=DEFAULT_NDEX_SERVER, ndexuser=None, ndexpassword=DEFAULT_PASSWORD,
                 visibility=False, slurm=False, slurm_partition=None, slurm_account=None):
        """
        Constructor. Only stores the arguments; the hierarchy and parent
        network paths are resolved later, when :py:meth:`run` calls
        ``_process_input_hierarchy_and_parent`` (which is also where
        :py:class:`CellmapsvnnError` is raised if no hierarchy can be found).

        :param outdir: Directory to write results to (stored as absolute path)
        :type outdir: str
        :param model_predictions: Paths to one or more prediction RO-Crates
        :type model_predictions: list
        :param disease: Disease to select scores for; if ``None`` scores are
                        averaged across diseases
        :type disease: str
        :param hierarchy: Path to hierarchy in CX2 format (optional)
        :type hierarchy: str
        :param parent_network: Path to, or NDEx UUID of, the parent network (optional)
        :type parent_network: str
        :param ndexserver: NDEx server to upload to
        :type ndexserver: str
        :param ndexuser: NDEx user account
        :type ndexuser: str
        :param ndexpassword: NDEx password, ``'-'`` to prompt interactively
        :type ndexpassword: str
        :param visibility: Passed to the NDEx uploader as the visibility flag
        :type visibility: bool
        :param slurm: Stored as-is; not used by the methods of this class
        :param slurm_partition: Stored as-is; not used by the methods of this class
        :param slurm_account: Stored as-is; not used by the methods of this class
        """
        self._outdir = os.path.abspath(outdir)
        self.original_hierarchy = None
        self.hierarchy = hierarchy
        self.parent_network = parent_network
        self._model_predictions = model_predictions
        self._disease = disease
        self._ndexserver = ndexserver
        self._ndexuser = ndexuser
        self._ndexpassword = ndexpassword
        self._visibility = visibility
        self._slurm = slurm
        self._slurm_partition = slurm_partition
        self._slurm_account = slurm_account
        # Set by _save_hierarchy_to_file(); initialised here so that reading
        # it before a save yields None instead of an AttributeError
        self.styled_hierarchy = None

    @staticmethod
    def add_subparser(subparsers):
        """
        Adds a subparser for the 'annotate' command.

        :param subparsers: Object returned by ``ArgumentParser.add_subparsers()``
        """
        # TODO: modify description later
        desc = f"""

        Version: {cellmaps_vnn.__version__}

        The 'annotate' command reads model prediction outputs (RLIPP scores) from one or more
        RO-Crate directories and uses them to annotate a given hierarchy network in CX2 format.
        The command can optionally filter scores by disease, generate SLURM scripts for batch
        processing, and upload annotated networks to NDEx for visualization and sharing.

        """
        parser = subparsers.add_parser(VNNAnnotate.COMMAND,
                                       help='Annotate hierarchy with prediction results',
                                       description=desc,
                                       formatter_class=constants.ArgParseFormatter)
        parser.add_argument('outdir', help='Directory to write results to')
        parser.add_argument('--model_predictions', nargs='+', required=True,
                            help='Path to one or multiple RO-Crate with the predictions and interpretations '
                                 'obtained from predict step',
                            type=str)
        parser.add_argument('--disease',
                            help='Specify the disease or cancer type for which the annotations will be '
                                 'performed. This allows the annotation process to tailor the results '
                                 'according to the particular disease or cancer type. If not set, '
                                 'prediction importance scores for all diseases will be aggregated.',
                            type=str)
        parser.add_argument('--hierarchy',
                            help='Path to hierarchy (optional). If not set, the process will search for '
                                 'hierarchy.cx2 in first RO-Crate (passed in --model_predictions).',
                            type=str)
        parser.add_argument('--parent_network',
                            help='Path to interactome (parent network, optional) of the annotated '
                                 'hierarchy or NDEx UUID of parent network. If not set, the '
                                 'process will search for hierarchy_parent.cx2 in first RO-Crate '
                                 '(passed in --model_predictions).',
                            type=str)
        parser.add_argument('--ndexserver', default=VNNAnnotate.DEFAULT_NDEX_SERVER,
                            help='Server where annotated hierarchy will be uploaded to')
        parser.add_argument('--ndexuser',
                            help='NDEx user account. Required if uploading to NDEx.')
        parser.add_argument('--ndexpassword', default=VNNAnnotate.DEFAULT_PASSWORD,
                            help='NDEx password. Enter "-" to input password interactively, or provide a file '
                                 'containing the password. Required if uploading to NDEx.')
        parser.add_argument('--visibility', action='store_true',
                            help='If set, makes Hierarchy and interactome network loaded onto '
                                 'NDEx publicly visible')
        parser.add_argument('--slurm',
                            help='If set, slurm script for annotation will be generated.',
                            action='store_true')
        parser.add_argument('--slurm_partition', help='Slurm partition', type=str)
        parser.add_argument('--slurm_account', help='Slurm account', type=str)

    def run(self):
        """
        The logic for annotating hierarchy with prediction results from cellmaps_vnn.
        It aggregates prediction scores from models, optionally filters them for a
        specific disease, and annotates the hierarchy with these scores.

        :raises CellmapsvnnError: If no hierarchy is found or no scores are
                                  available for annotation
        """
        self._process_rocrates()
        self._process_input_hierarchy_and_parent()
        hierarchy_cx, original_hierarchy_cx = self._get_hierarchy_cx()
        parent_cx = self._get_parent_cx()
        interactome_dict, root_id = self._annotate_interactomes_of_systems(parent_cx, hierarchy_cx)
        self._process_scores_and_annotate_hierarchy(hierarchy_cx, original_hierarchy_cx)
        self._upload_to_ndex_if_credentials_provided(interactome_dict, root_id)

    def register_outputs(self, outdir, description, keywords, provenance_utils):
        """
        Registers the output files of the annotation process with the FAIRSCAPE
        service for data provenance. This always includes the annotated hierarchy
        and the aggregated RLIPP file; gene2ind.txt, the original hierarchy and
        the parent network file are registered only when they are available.

        :param outdir: The output directory where the files are stored.
        :type outdir: str
        :param description: A description of the files for provenance registration.
        :type description: str
        :param keywords: A list of keywords associated with the files.
        :type keywords: list
        :param provenance_utils: The utility class for provenance registration.
        :type provenance_utils: ProvenanceUtil
        :return: A list of dataset IDs assigned to the registered files.
        :rtype: list
        """
        return_ids = [self._register_hierarchy(outdir, description, keywords, provenance_utils),
                      self._register_rlipp_file(outdir, description, keywords, provenance_utils)]

        gene2ind_path = os.path.join(self._model_predictions[0], 'gene2ind.txt')
        if os.path.exists(gene2ind_path):
            return_ids.append(copy_and_register_gene2id_file(gene2ind_path, outdir, description,
                                                             keywords, provenance_utils))

        if self.original_hierarchy is not None:
            return_ids.append(self._register_original_hierarchy(outdir, description, keywords,
                                                                provenance_utils))

        # A parent network given as NDEx UUID is not a local file and cannot be copied
        if self.parent_network is not None and os.path.isfile(self.parent_network):
            return_ids.append(self._copy_and_register_hierarchy_parent(outdir, description, keywords,
                                                                       provenance_utils))
        return return_ids

    def _process_scores_and_annotate_hierarchy(self, hierarchy_cx, original_hierarchy_cx):
        """
        Orchestrates the score aggregation and annotation of the hierarchy network.

        This includes:
        - Aggregating RLIPP importance scores from prediction files
        - Selecting scores for a specific disease or averaging across diseases
        - Annotating nodes with scores
        - Propagating importance to hierarchy edges
        - Writing updated networks to file

        :param hierarchy_cx: Loaded CX2 network representing the hierarchy
        :param original_hierarchy_cx: Loaded original CX2 hierarchy (if available)
        :raises CellmapsvnnError: If there are no scores to annotate with
        """
        self._aggregate_importance_scores_from_models()
        if self._disease is None:
            annotation_dict = self._aggregate_scores_from_diseases()
        else:
            annotation_dict = self._get_scores_for_disease(self._disease)

        if not annotation_dict:
            print("No system importance scores available for annotation. Training was not sufficient. "
                  "Increase number of epochs and run train and predict again.")
            raise CellmapsvnnError("No system importance scores available for annotation. "
                                   "Please ensure valid data is provided for the hierarchy annotation.")

        hierarchy, original_hierarchy = self._annotate_hierarchy(annotation_dict, hierarchy_cx,
                                                                 original_hierarchy_cx)
        hierarchy = self._annotate_hierarchy_edges(hierarchy)
        hierarchy = self._add_provenance_info(hierarchy)
        self._save_hierarchy_to_file(hierarchy, original_hierarchy)

    def _get_rlipp_out_dest_file(self):
        """
        Constructs the file path for the RLIPP output file within the specified output directory.

        :return: The file path for the RLIPP output file.
        :rtype: str
        """
        return os.path.join(self._outdir, vnnconstants.RLIPP_OUTPUT_FILE)

    def _get_hierarchy_dest_file(self):
        """
        Constructs the file path for the hierarchy output file within the specified output directory.

        :return: The file path for the hierarchy output file.
        :rtype: str
        """
        return os.path.join(self._outdir, vnnconstants.HIERARCHY_FILENAME)

    def _get_original_hierarchy_dest_file(self):
        """
        Constructs the file path for the original hierarchy output file within the
        specified output directory.

        :return: The file path for the original hierarchy output file.
        :rtype: str
        """
        return os.path.join(self._outdir, vnnconstants.ORIGINAL_HIERARCHY_FILENAME)

    @staticmethod
    def _score_columns():
        """
        Names of the five RLIPP score columns, in the order in which they are
        stored in the per-term score lists used for annotation.

        :return: Column names
        :rtype: tuple
        """
        return (vnnconstants.PRHO_SCORE, vnnconstants.P_PVAL_SCORE, vnnconstants.CRHO_SCORE,
                vnnconstants.C_PVAL_SCORE, vnnconstants.RLIPP_SCORE)

    @staticmethod
    def _score_dict_from_frame(frame):
        """
        Converts a data frame indexed by term into ``{term: [scores...]}`` with
        scores ordered as in :py:meth:`_score_columns`.

        :param frame: Data frame indexed by term containing the score columns
        :type frame: :py:class:`pandas.DataFrame`
        :return: Mapping of term to list of five scores
        :rtype: dict
        """
        columns = VNNAnnotate._score_columns()
        return {term: [row[column] for column in columns] for term, row in frame.iterrows()}

    def _aggregate_importance_scores_from_models(self):
        """
        Aggregates prediction scores from multiple models' outputs by averaging them
        per (Term, Disease) pair. The aggregated scores are then saved to the RLIPP
        output destination file.

        Files whose header contains a ``Disease`` column take the disease from the
        last column; for other files the disease is recorded as ``unspecified``.
        """
        data = {}
        for directory in self._model_predictions:
            filepath = os.path.join(directory, vnnconstants.RLIPP_OUTPUT_FILE)
            has_disease = False
            with open(filepath, 'r') as file:
                for line in file:
                    # Header and blank lines carry no scores; the header tells
                    # whether the last column holds the disease
                    if line.startswith('Term') or not line.strip():
                        if 'Disease' in line:
                            has_disease = True
                        continue

                    parts = line.strip().split('\t')
                    if has_disease:
                        key = (parts[0], parts[-1])  # (Term, Disease)
                        raw_values = parts[1:-1]
                    else:
                        key = (parts[0], 'unspecified')
                        raw_values = parts[1:]
                    data.setdefault(key, []).append(np.array([float(v) for v in raw_values]))

        averaged_data = {key: np.mean(values, axis=0) for key, values in data.items()}

        with open(self._get_rlipp_out_dest_file(), 'w') as outfile:
            outfile.write("Term\tP_rho\tP_pval\tC_rho\tC_pval\tRLIPP\tDisease\n")
            for (term, disease), values in averaged_data.items():
                outfile.write(f"{term}\t" + "\t".join([f"{v:.5e}" for v in values]) + f"\t{disease}\n")

    def _aggregate_scores_from_diseases(self):
        """
        Aggregates the prediction scores for all diseases by averaging each of the
        five RLIPP scores per term.

        :return: A dictionary mapping each term to a list
                 ``[P_rho, P_pval, C_rho, C_pval, RLIPP]`` averaged across all diseases.
        :rtype: dict
        """
        data = pd.read_csv(self._get_rlipp_out_dest_file(), sep='\t')
        aggregated_data = data.groupby('Term').agg({column: 'mean' for column in self._score_columns()})
        return self._score_dict_from_frame(aggregated_data)

    def _get_scores_for_disease(self, disease):
        """
        Retrieves prediction scores for a specific disease.

        :param disease: The disease or cancer type for which scores are requested.
        :type disease: str
        :return: A dictionary mapping each term to a list
                 ``[P_rho, P_pval, C_rho, C_pval, RLIPP]`` for the given disease;
                 empty if the disease is not present.
        :rtype: dict
        """
        data = pd.read_csv(self._get_rlipp_out_dest_file(), sep='\t')
        filtered_data = data[data['Disease'] == disease]
        if filtered_data.empty:
            return {}
        return self._score_dict_from_frame(filtered_data.set_index('Term'))

    def _upload_to_ndex_if_credentials_provided(self, interactome_list, root_id):
        """
        Uploads hierarchy and parent network to NDEx if credentials are provided.

        Nothing is done unless NDEx server, user and password are all set. If the
        password is ``'-'`` the user is prompted for it interactively. If no parent
        network is set, a warning is logged and only the hierarchy is uploaded.
        When system interactomes are given, each one is uploaded and its UUID is
        stored on the corresponding hierarchy node before the hierarchy is uploaded.

        :param interactome_list: Mapping of system node id to
                                 ``(CX2Network, file path)`` or ``None``
        :type interactome_list: dict
        :param root_id: Id of the root system node or ``None``
        """
        if not (self._ndexserver and self._ndexuser and self._ndexpassword):
            return

        if self._ndexpassword == '-':
            self._ndexpassword = getpass.getpass(prompt="Enter NDEx Password: ")
        ndex_uploader = NDExHierarchyUploader(self._ndexserver, self._ndexuser, self._ndexpassword,
                                              self._visibility)

        if self.parent_network is None:
            logger.warning("Parent network was not specified. Hierarchy will not be in cell view.")
            cx_factory = RawCX2NetworkFactory()
            hierarchy_network = cx_factory.get_cx2network(self._get_hierarchy_dest_file())
            _, hierarchyurl = ndex_uploader._save_network(hierarchy_network)
        elif interactome_list is not None:
            for system_id, (system_interactome, _) in interactome_list.items():
                system_interactome_uuid, _ = ndex_uploader._save_network(system_interactome)
                self.styled_hierarchy.add_node_attribute(system_id, key='HCX::interactionNetworkUUID',
                                                         value=system_interactome_uuid)
                if system_id == root_id:
                    # The root system's interactome serves as the interaction
                    # network of the hierarchy as a whole
                    self.styled_hierarchy = ndex_uploader._update_hcx_annotations(self.styled_hierarchy,
                                                                                  system_interactome_uuid)
            if root_id is not None:
                _, hierarchyurl = ndex_uploader._save_network(self.styled_hierarchy)
            else:
                _, _, _, hierarchyurl = ndex_uploader.save_hierarchy_and_parent_network(self.styled_hierarchy,
                                                                                        self.parent_network)
        elif os.path.isfile(self.parent_network):
            _, _, _, hierarchyurl = ndex_uploader.upload_hierarchy_and_parent_network_from_files(
                hierarchy_path=self._get_hierarchy_dest_file(), parent_path=self.parent_network)
        else:
            cx_factory = RawCX2NetworkFactory()
            hierarchy_network = cx_factory.get_cx2network(self._get_hierarchy_dest_file())
            _, _, _, hierarchyurl = ndex_uploader.save_hierarchy_and_parent_network(hierarchy_network,
                                                                                    self.parent_network)

        print(f'Hierarchy uploaded. To view hierarchy on NDEx please paste this URL in your '
              f'browser {hierarchyurl}. To view Hierarchy on new experimental Cytoscape on the Web, go to '
              f'{ndex_uploader.get_cytoscape_url(hierarchyurl)}')

    @staticmethod
    def _annotate_with_score(hierarchy, original_hierarchy, node_id, score_name, score):
        """
        Adds a score attribute to a node in both the hierarchy and the original hierarchy (if provided).

        :param hierarchy: The primary hierarchy network to annotate
        :param original_hierarchy: The original hierarchy network (optional)
        :param node_id: Node identifier to which the attribute is added
        :param score_name: Name of the attribute (e.g., P_rho)
        :param score: Score value to be annotated (float)
        """
        hierarchy.add_node_attribute(node_id, score_name, score, datatype='double')
        if original_hierarchy is not None:
            original_hierarchy.add_node_attribute(node_id, score_name, score, datatype='double')

    def _annotate_hierarchy(self, annotation_dict, hierarchy, original_hierarchy):
        """
        Annotates hierarchy nodes with the five RLIPP scores from the given
        annotation dictionary. The P_rho score is additionally stored under the
        importance score attribute.

        Terms that are not ``int`` are resolved to node ids by name; terms that
        cannot be resolved are skipped.

        :param annotation_dict: A dictionary mapping terms to a list
                                ``[P_rho, P_pval, C_rho, C_pval, RLIPP]``.
        :type annotation_dict: dict
        :param hierarchy: Hierarchy network to annotate
        :param original_hierarchy: Original hierarchy network or ``None``
        :return: Tuple ``(hierarchy, original_hierarchy)`` with annotations added
        :rtype: tuple
        """
        # (attribute name, index into score list); importance reuses P_rho
        attributes = [(name, idx) for idx, name in enumerate(self._score_columns())]
        attributes.append((vnnconstants.IMPORTANCE_SCORE, 0))

        for term, score in annotation_dict.items():
            node_id = term
            if not isinstance(term, int):
                node_id = hierarchy.lookup_node_id_by_name(term)
            if node_id is None:
                continue
            for score_name, idx in attributes:
                self._annotate_with_score(hierarchy, original_hierarchy, node_id, score_name, score[idx])
        return hierarchy, original_hierarchy

    def _save_hierarchy_to_file(self, hierarchy, original_hierarchy):
        """
        Saves the annotated hierarchy and original hierarchy (if provided) to CX2 files.

        Visual properties are taken from ``nest_style.cx2`` shipped in the
        cellmaps_vnn package directory. The styled hierarchy is also stored in
        ``self.styled_hierarchy`` for further use (e.g., NDEx upload).

        :param hierarchy: Annotated hierarchy network
        :param original_hierarchy: Original version of the hierarchy (if available)
        """
        factory = RawCX2NetworkFactory()
        path_to_style_network = os.path.join(os.path.dirname(cellmaps_vnn.__file__), 'nest_style.cx2')
        vis_prop = factory.get_cx2network(path_to_style_network).get_visual_properties()

        hierarchy.set_visual_properties(vis_prop)
        hierarchy.write_as_raw_cx2(self._get_hierarchy_dest_file())
        if original_hierarchy is not None:
            original_hierarchy.set_visual_properties(vis_prop)
            original_hierarchy.write_as_raw_cx2(self._get_original_hierarchy_dest_file())
        self.styled_hierarchy = hierarchy

    def _average_gene_scores(self, system_id):
        """
        Reads the gene score file of the given system from every prediction
        directory that has one and averages each score per gene.

        :param system_id: Id of the system node; used to build the score file name
        :return: Mapping ``{gene: {score name: averaged score}}``
        :rtype: dict
        """
        score_columns = (vnnconstants.MUTATION_IMPORTANCE_SCORE,
                         vnnconstants.DELETION_IMPORTANCE_SCORE,
                         vnnconstants.AMPLIFICATION_IMPORTANCE_SCORE,
                         vnnconstants.GENE_IMPORTANCE_SCORE)
        totals = {}
        counts = {}
        for directory in self._model_predictions:
            scores_file = os.path.join(directory, str(system_id) + vnnconstants.SCORE_FILE_NAME_SUFFIX)
            if not os.path.isfile(scores_file):
                continue
            df = pd.read_csv(scores_file, sep='\t')
            for _, row in df.iterrows():
                gene = row['gene']
                if gene not in totals:
                    totals[gene] = {column: 0.0 for column in score_columns}
                    counts[gene] = 0
                for column in score_columns:
                    totals[gene][column] += row[column]
                counts[gene] += 1

        return {gene: {column: total / counts[gene] for column, total in sums.items()}
                for gene, sums in totals.items()}

    def _annotate_interactomes_of_systems(self, parent_cx, hierarchy_cx):
        """
        Creates and annotates interactome subnetworks for each system node in the hierarchy.

        For each system, gene-level importance scores are read from each prediction directory,
        averaged, and added as node attributes in a new subnetwork extracted from the parent
        interactome. Every subnetwork is written to the output directory.

        :param parent_cx: CX2 parent network (interactome) used as source for subnetworks
        :param hierarchy_cx: CX2 hierarchy network containing system nodes
        :return: A tuple (interactome_dict, root_id) where:
                 - interactome_dict maps system node IDs to (CX2Network, output file path)
                 - root_id is the ID of the root system node (if identified)

                 ``(None, None)`` if ``parent_cx`` is ``None``
        :rtype: tuple
        """
        if parent_cx is None:
            return None, None

        hierarchy_net_attrs = copy.deepcopy(hierarchy_cx.get_network_attributes())
        for attr_to_remove in ['ndexSchema', 'HCX::modelFileCount',
                               'HCX::interactionNetworkUUID', 'HCX::interactionNetworkName']:
            hierarchy_net_attrs.pop(attr_to_remove, None)

        # The style file is the same for every system, so read it only once
        interactome_style = RawCX2NetworkFactory().get_cx2network(
            os.path.join(os.path.dirname(cellmaps_vnn.__file__), 'interactome_style.cx2'))
        style_visual_properties = interactome_style.get_visual_properties()

        interactome_dict = dict()
        root_id = None
        for system_id, node_obj in hierarchy_cx.get_nodes().items():
            node_values = node_obj.get(ndexconstants.ASPECT_VALUES, {})
            system_name = node_values.get(ndexconstants.NODE_NAME, system_id)

            new_subnet = CX2Network()
            # Each subnetwork gets its own copies so that later changes to
            # one network cannot leak into the others
            new_subnet.set_visual_properties(copy.deepcopy(style_visual_properties))
            subnet_attrs = copy.deepcopy(hierarchy_net_attrs)
            subnet_attrs['description'] = 'RESULTS FOR SYSTEM ' + str(system_name)
            new_subnet.set_network_attributes(subnet_attrs)
            new_subnet.set_name(str(system_name) + ' assembly')

            # Aggregate gene scores from all model_prediction directories
            member_ids = set()
            for gene, scores in self._average_gene_scores(system_id).items():
                member_node_id = parent_cx.lookup_node_id_by_name(gene)
                if member_node_id is None:
                    continue
                interactome_node = copy.deepcopy(parent_cx.get_node(member_node_id))
                for key, val in scores.items():
                    interactome_node[ndexconstants.ASPECT_VALUES][key] = val
                new_subnet.add_node(node_id=member_node_id,
                                    attributes=interactome_node[ndexconstants.ASPECT_VALUES],
                                    x=interactome_node['x'], y=interactome_node['y'])
                member_ids.add(member_node_id)

            # Keep only parent edges with both end points among the members
            for edge_id, edge_obj in parent_cx.get_edges().items():
                if edge_obj[ndexconstants.EDGE_SOURCE] in member_ids \
                        and edge_obj[ndexconstants.EDGE_TARGET] in member_ids:
                    new_subnet.add_edge(edge_id=edge_id,
                                        source=edge_obj[ndexconstants.EDGE_SOURCE],
                                        target=edge_obj[ndexconstants.EDGE_TARGET],
                                        attributes=edge_obj[ndexconstants.ASPECT_VALUES])

            if node_values.get('HCX::isRoot') is True:
                root_id = system_id

            subnet_file_name = os.path.join(self._outdir,
                                            str(system_name) + vnnconstants.SYSTEM_INTERACTOME_FILE_SUFFIX)
            new_subnet.write_as_raw_cx2(subnet_file_name)
            interactome_dict[system_id] = (new_subnet, subnet_file_name)

        return interactome_dict, root_id

    @staticmethod
    def _register_cx2_dataset(outdir, source_file, label, description, keywords, provenance_utils):
        """
        Registers a CX2 file with the FAIRSCAPE service.

        :param outdir: The output directory where the outputs are stored.
        :param source_file: Path of the file to register
        :param label: Text appended to both the file's base name (to form the
                      dataset name) and to ``description``
        :param description: Description of the file for provenance registration.
        :param keywords: List of keywords associated with the file.
        :param provenance_utils: The utility class for provenance registration.
        :return: The dataset ID assigned to the registered file.
        """
        data_dict = {'name': os.path.basename(source_file) + label,
                     'description': description + label,
                     'keywords': keywords,
                     'data-format': 'CX2',
                     'author': cellmaps_vnn.__name__,
                     'version': cellmaps_vnn.__version__,
                     'date-published': date.today().strftime('%m-%d-%Y')}
        return provenance_utils.register_dataset(outdir, source_file=source_file, data_dict=data_dict)

    def _register_hierarchy(self, outdir, description, keywords, provenance_utils):
        """
        Register annotated hierarchy file with the FAIRSCAPE service for data provenance.

        This is the file written by ``_save_hierarchy_to_file`` to
        ``_get_hierarchy_dest_file()``, which exists after every successful run.

        :param outdir: The output directory where the outputs are stored.
        :param description: Description of the file for provenance registration.
        :param keywords: List of keywords associated with the file.
        :param provenance_utils: The utility class for provenance registration.
        :return: The dataset ID assigned to the registered file.
        """
        return self._register_cx2_dataset(outdir, self._get_hierarchy_dest_file(),
                                          ' Annotated hierarchy file that was used to build VNN',
                                          description, keywords, provenance_utils)

    def _register_original_hierarchy(self, outdir, description, keywords, provenance_utils):
        """
        Register annotated original hierarchy file with the FAIRSCAPE service for
        data provenance.

        This is the file at ``_get_original_hierarchy_dest_file()``, which is only
        written when an original hierarchy was found, so callers must check
        ``self.original_hierarchy`` first.

        :param outdir: The output directory where the outputs are stored.
        :param description: Description of the file for provenance registration.
        :param keywords: List of keywords associated with the file.
        :param provenance_utils: The utility class for provenance registration.
        :return: The dataset ID assigned to the registered file.
        """
        return self._register_cx2_dataset(outdir, self._get_original_hierarchy_dest_file(),
                                          ' Annotated hierarchy file',
                                          description, keywords, provenance_utils)

    def _copy_and_register_hierarchy_parent(self, outdir, description, keywords, provenance_utils):
        """
        Copies the parent network file to the output directory and registers it with FAIRSCAPE.

        :param outdir: Directory where outputs are stored
        :param description: Description for provenance metadata
        :param keywords: Keywords to associate with the registered dataset
        :param provenance_utils: Provenance utility for dataset registration
        :return: The dataset ID assigned to the registered parent network file
        """
        hierarchy_parent_out_file = os.path.join(outdir, 'hierarchy_parent.cx2')
        shutil.copy(self.parent_network, hierarchy_parent_out_file)
        return self._register_cx2_dataset(outdir, hierarchy_parent_out_file,
                                          ' Hierarchy parent network file',
                                          description, keywords, provenance_utils)

    def _register_rlipp_file(self, outdir, description, keywords, provenance_utils):
        """
        Registers the rlipp aggregated file with the FAIRSCAPE service for data provenance.

        :param outdir: The output directory where the outputs are stored.
        :param description: Description of the file for provenance registration.
        :param keywords: List of keywords associated with the file. Not modified.
        :param provenance_utils: The utility class for provenance registration.
        :return: The dataset ID assigned to the registered file.
        """
        dest_path = self._get_rlipp_out_dest_file()
        # Build a new list: extending the caller's list in place would leak
        # 'file' into every registration made afterwards with the same list
        rlipp_keywords = list(keywords) + ['file']
        data_dict = {'name': os.path.basename(dest_path) + ' rlipp aggregated file',
                     'description': description + ' rlipp results file averaged with multiple models',
                     'keywords': rlipp_keywords,
                     'data-format': 'txt',
                     'author': cellmaps_vnn.__name__,
                     'version': cellmaps_vnn.__version__,
                     'date-published': date.today().strftime(provenance_utils.get_default_date_format_str())}
        return provenance_utils.register_dataset(outdir, source_file=dest_path, data_dict=data_dict)

    def _process_input_hierarchy_and_parent(self):
        """
        Initializes file paths for hierarchy, original hierarchy, and parent network.
        If paths are not explicitly provided, they are looked up in the first
        prediction RO-Crate directory.

        :raises CellmapsvnnError: If no hierarchy was given and none is found
                                  in the first RO-Crate
        """
        first_rocrate = self._model_predictions[0]

        # HIERARCHY: Check first ro-crate for hierarchy if not specified by hierarchy flag
        if self.hierarchy is None:
            self.hierarchy = os.path.join(first_rocrate, vnnconstants.HIERARCHY_FILENAME)
            if not os.path.exists(self.hierarchy):
                raise CellmapsvnnError("No hierarchy was specified or found in first ro-crate")
        self.hierarchy = os.path.abspath(self.hierarchy)

        # ORIGINAL HIERARCHY: Check first ro-crate for original hierarchy
        original_hierarchy_path = os.path.join(first_rocrate, vnnconstants.ORIGINAL_HIERARCHY_FILENAME)
        if os.path.exists(original_hierarchy_path):
            self.original_hierarchy = os.path.abspath(original_hierarchy_path)

        # PARENT (INTERACTOME): Check first ro-crate for parent network if not specified by
        # parent_network flag
        if self.parent_network is None:
            parent_network_path = os.path.join(first_rocrate, vnnconstants.PARENT_NETWORK_NAME)
            if os.path.exists(parent_network_path):
                self.parent_network = parent_network_path
        # Anything that is not a file is left untouched; _get_parent_cx()
        # passes such values to NDEx as network id
        if self.parent_network is not None and os.path.isfile(self.parent_network):
            self.parent_network = os.path.abspath(self.parent_network)

    def _get_hierarchy_cx(self):
        """
        Loads the CX2 hierarchy and, if available, the original hierarchy from file paths.

        :return: A tuple containing (hierarchy, original_hierarchy) CX2Network objects;
                 ``original_hierarchy`` is ``None`` if no original hierarchy path is set
        :rtype: tuple
        """
        factory = RawCX2NetworkFactory()
        hierarchy = factory.get_cx2network(self.hierarchy)
        original_hierarchy = None
        if self.original_hierarchy:
            original_hierarchy = factory.get_cx2network(self.original_hierarchy)
        return hierarchy, original_hierarchy

    def _get_parent_cx(self):
        """
        Loads the parent network (interactome) from file or, if
        ``self.parent_network`` is not a file, from NDEx using it as network id.

        :return: CX2Network object representing the parent network, or None if
                 no parent network is set
        """
        if self.parent_network is None:
            return None

        factory = RawCX2NetworkFactory()
        if os.path.isfile(self.parent_network):
            return factory.get_cx2network(self.parent_network)

        # NOTE(review): the client is created with default arguments, so
        # self._ndexserver is not used here - confirm this is intended
        client = ndex2.client.Ndex2()
        client_resp = client.get_network_as_cx2_stream(self.parent_network)
        return factory.get_cx2network(json.loads(client_resp.content))

    @staticmethod
    def _annotate_hierarchy_edges(hierarchy, threshold=DEFAULT_EDGE_IMPORTANCE_THRESHOLD):
        """
        Propagates node importance scores to the edges on the path towards the root node.

        Every edge first gets an edge importance score of 0.0. Then, for each node
        whose importance score is at least ``threshold``, the edges on the path from
        that node up to the root are given the node's score unless they already
        carry a score greater than 0.0.

        :param hierarchy: The CX2 hierarchy network to annotate
        :param threshold: Minimum node importance score to propagate
        :type threshold: float
        :return: The modified hierarchy with updated edge attributes
        """
        node_id_score_map = {
            node_id: node_obj[ndexconstants.ASPECT_VALUES].get(vnnconstants.IMPORTANCE_SCORE, 0)
            for node_id, node_obj in hierarchy.get_nodes().items()
        }

        # Maps a node to the edge pointing at it; if a node is target of
        # several edges only the last one seen is kept
        edge_target_map = {}
        for edge_id, edge_obj in hierarchy.get_edges().items():
            edge_target_map[edge_obj[ndexconstants.EDGE_TARGET]] = edge_obj
            hierarchy.add_edge_attribute(edge_id, key=vnnconstants.EDGE_IMPORTANCE_SCORE, value=0.0,
                                         datatype=ndexconstants.DOUBLE_DATATYPE)

        for node_id, score in node_id_score_map.items():
            if score < threshold:
                continue
            visited = set()
            cur_node_id = node_id
            is_root = False
            while is_root is False:
                # The visited check guarantees termination should the edges
                # contain a cycle
                if cur_node_id in visited or cur_node_id not in edge_target_map:
                    break
                visited.add(cur_node_id)
                edge_values = edge_target_map[cur_node_id][ndexconstants.ASPECT_VALUES]
                if not edge_values[vnnconstants.EDGE_IMPORTANCE_SCORE] > 0.0:
                    edge_values[vnnconstants.EDGE_IMPORTANCE_SCORE] = score
                cur_node_id = edge_target_map[cur_node_id][ndexconstants.EDGE_SOURCE]
                parent_node = hierarchy.get_node(cur_node_id)
                is_root = parent_node[ndexconstants.ASPECT_VALUES].get('HCX::isRoot', False)

        return hierarchy

    def _process_rocrates(self):
        """
        Replaces every prediction directory that does not directly contain the
        RLIPP output file with its ``out_predict`` subdirectory.

        The list in ``self._model_predictions`` is updated in place.
        """
        for idx, directory in enumerate(self._model_predictions):
            if not os.path.exists(os.path.join(directory, vnnconstants.RLIPP_OUTPUT_FILE)):
                self._model_predictions[idx] = os.path.join(directory, 'out_predict')

    def _add_provenance_info(self, hierarchy, provenance_utils=None, author='cellmaps_vnn',
                             version=None):
        """
        Adds ``prov:wasGeneratedBy`` and, if the directory containing the input
        hierarchy has an RO-Crate id, ``prov:wasDerivedFrom`` network attributes
        to the hierarchy.

        :param hierarchy: Hierarchy network to add the attributes to
        :param provenance_utils: Provenance utility; a new
                                 :py:class:`ProvenanceUtil` is created if ``None``
        :param author: Name put into ``prov:wasGeneratedBy``
        :type author: str
        :param version: Version put into ``prov:wasGeneratedBy``; defaults to
                        the version of cellmaps_vnn
        :type version: str
        :return: The hierarchy passed in
        """
        # Defaults are resolved per call; a ProvenanceUtil() in the signature
        # would be created once at definition time and shared by all calls
        if provenance_utils is None:
            provenance_utils = ProvenanceUtil()
        if version is None:
            version = cellmaps_vnn.__version__

        hierarchy.add_network_attribute(key='prov:wasGeneratedBy', value=author + ' ' + version)
        rocrate_id = provenance_utils.get_id_of_rocrate(os.path.dirname(self.hierarchy))
        if rocrate_id is not None:
            hierarchy.add_network_attribute(key='prov:wasDerivedFrom', value='RO-crate: ' + str(rocrate_id))
        return hierarchy