Source code for cellmaps_vnn.runner

#! /usr/bin/env python

import os
import time
import logging
from cellmaps_utils import logutils, constants
from cellmaps_utils.provenance import ProvenanceUtil
from cellmaps_vnn.annotate import VNNAnnotate
import cellmaps_vnn.constants as vnnconstants

from cellmaps_vnn.predict import VNNPredict

from cellmaps_vnn.train import VNNTrain

import cellmaps_vnn
from cellmaps_vnn.exceptions import CellmapsvnnError

logger = logging.getLogger(__name__)


[docs] class VnnRunner(object): def __init__(self, outdir): if outdir is None: raise CellmapsvnnError('outdir is None') self._outdir = os.path.abspath(outdir)
[docs] def run(self): """ Runs VNN :raises NotImplementedError: Always raised cause subclasses need to implement """ raise NotImplementedError('subclasses need to implement')
[docs] class SLURMCellmapsvnnRunner(VnnRunner): def __init__(self, outdir=None, command=None, inputdir=None, gene_attribute_name=vnnconstants.GENE_SET_COLUMN_NAME, gene2id=None, cell2id=None, mutations=None, cn_deletions=None, cn_amplifications=None, training_data=None, batchsize=vnnconstants.DEFAULT_BATCHSIZE, cuda=vnnconstants.DEFAULT_CUDA, zscore_method=vnnconstants.DEFAULT_ZSCORE_METHOD, epoch=VNNTrain.DEFAULT_EPOCH, lr=VNNTrain.DEFAULT_LR, wd=VNNTrain.DEFAULT_WD, alpha=VNNTrain.DEFAULT_ALPHA, genotype_hiddens=vnnconstants.DEFAULT_GENOTYPE_HIDDENS, optimize=VNNTrain.DEFAULT_OPTIMIZE, n_trials=VNNTrain.DEFAULT_N_TRIALS, patience=VNNTrain.DEFAULT_PATIENCE, delta=VNNTrain.DEFAULT_DELTA, min_dropout_layer=VNNTrain.DEFAULT_MIN_DROPOUT_LAYER, dropout_fraction=VNNTrain.DEFAULT_DROPOUT_FRACTION, skip_parent_copy=False, cpu_count=VNNPredict.DEFAULT_CPU_COUNT, drug_count=VNNPredict.DEFAULT_DRUG_COUNT, predict_data=None, std=None, model_predictions=None, disease=None, hierarchy=None, parent_network=None, ndexserver=None, ndexuser=None, ndexpassword=None, visibility=False, gpu=False, slurm_partition=None, slurm_account=None, input_data_dict=None ): super().__init__(outdir) self._start_time = int(time.time()) self._command = command self._inputdir = inputdir self._gene_attribute_name = gene_attribute_name self._gene2id = gene2id self._cell2id = cell2id self._mutations = mutations self._cn_deletions = cn_deletions self._cn_amplifications = cn_amplifications self._training_data = training_data self._batchsize = batchsize self._cuda = cuda self._zscore_method = zscore_method self._epoch = epoch self._lr = lr self._wd = wd self._alpha = alpha self._genotype_hiddens = genotype_hiddens self._optimize = optimize self._n_trials = n_trials self._patience = patience self._delta = delta self._min_dropout_layer = min_dropout_layer self._dropout_fraction = dropout_fraction self._skip_parent_copy = skip_parent_copy self._cpu_count = cpu_count self._drug_count = drug_count self._predict_data = predict_data self._std = std self._model_predictions = model_predictions self._disease = disease self._hierarchy = hierarchy self._parent_network = parent_network self._ndexserver = ndexserver self._ndexuser = ndexuser self._ndexpassword = ndexpassword self._visibility = visibility self._gpu = gpu self._slurm_partition = slurm_partition self._slurm_account = slurm_account self._input_data_dict = input_data_dict def _write_slurm_directives(self, out, allocated_time='96:00:00', mem='32G', cpus_per_task='4', job_name='cellmaps_vnn'): """ Writes slurm directives :param allocated_time: :param mem: :param cpus_per_task: :param job_name: :return: """ out.write('#!/bin/bash\n\n') out.write('#SBATCH --job-name=' + str(job_name) + '\n') out.write('#SBATCH --chdir=' + self._outdir + '\n') out.write('#SBATCH --output=%x.%j.out\n') if self._slurm_partition is not None: out.write('#SBATCH --partition=' + self._slurm_partition + '\n') if self._slurm_account is not None: out.write('#SBATCH --account=' + self._slurm_account + '\n') if self._gpu: out.write('#SBATCH --gres=gpu:1\n') out.write('#SBATCH --dependency=singleton\n') else: out.write('#SBATCH --ntasks=1\n') out.write('#SBATCH --cpus-per-task=' + str(cpus_per_task) + '\n') out.write('#SBATCH --mem=' + str(mem) + '\n') out.write('#SBATCH --time=' + str(allocated_time) + '\n\n') out.write('echo $SLURM_JOB_ID\n') out.write('echo $HOSTNAME\n') def _write_job_for_command(self): """ Runs VNN :raises NotImplementedError: Always raised cause subclasses need to implement """ if isinstance(self._command, VNNTrain): if self._inputdir is None: raise CellmapsvnnError("Input dir with hierarchy is required for training.") filename = 'cellmapvnntrainjob.sh' with open(os.path.join(self._outdir, filename), 'w') as f: self._write_slurm_directives(out=f, job_name='cellmapvnntrain') f.write( 'cellmaps_vnncmd.py train ' + os.path.join(self._outdir, 'out_train') + ' --inputdir ' + os.path.abspath(self._inputdir) + ' --gene_attribute_name ' + self._gene_attribute_name) if self._gene2id: f.write(' --gene2id ' + os.path.abspath(self._gene2id)) if self._cell2id: f.write(' --cell2id ' + os.path.abspath(self._cell2id)) if self._mutations: f.write(' --mutations ' + os.path.abspath(self._mutations)) if self._cn_deletions: f.write(' --cn_deletions ' + os.path.abspath(self._cn_deletions)) if self._cn_amplifications: f.write(' --cn_amplifications ' + os.path.abspath(self._cn_amplifications)) if self._training_data: f.write(' --training_data ' + os.path.abspath(self._training_data)) if self._hierarchy: f.write(' --hierarchy ' + os.path.abspath(self._hierarchy)) if self._parent_network: f.write(' --parent_network ' + os.path.abspath(self._parent_network)) f.write( ' --batchsize ' + str(self._batchsize) + ' --cuda ' + str(self._cuda) + ' --zscore_method ' + self._zscore_method + ' --epoch ' + str(self._epoch) + ' --lr ' + str(self._lr) + ' --wd ' + str(self._wd) + ' --alpha ' + str(self._alpha) + ' --genotype_hiddens ' + str(self._genotype_hiddens) + ' --optimize ' + str(self._optimize) + ' --n_trials ' + str(self._n_trials) + ' --patience ' + str(self._patience) + ' --delta ' + str(self._delta) + ' --min_dropout_layer ' + str(self._min_dropout_layer) + ' --dropout_fraction ' + str(self._dropout_fraction) ) if self._skip_parent_copy: f.write(' --skip_parent_copy') f.write('\n') f.write('exit $?\n') os.chmod(os.path.join(self._outdir, filename), 0o755) return filename elif isinstance(self._command, VNNPredict): if self._inputdir is None: raise CellmapsvnnError("Input dir with trained model is required for prediction.") filename = 'cellmapvnnpredictjob.sh' with open(os.path.join(self._outdir, filename), 'w') as f: self._write_slurm_directives(out=f, job_name='cellmapvnnpredict') f.write( 'cellmaps_vnncmd.py predict ' + os.path.join(self._outdir, 'out_predict') + ' --inputdir ' + os.path.abspath(self._inputdir)) if self._gene2id: f.write(' --gene2id ' + os.path.abspath(self._gene2id)) if self._cell2id: f.write(' --cell2id ' + os.path.abspath(self._cell2id)) if self._mutations: f.write(' --mutations ' + os.path.abspath(self._mutations)) if self._cn_deletions: f.write(' --cn_deletions ' + os.path.abspath(self._cn_deletions)) if self._cn_amplifications: f.write(' --cn_amplifications ' + os.path.abspath(self._cn_amplifications)) if self._predict_data: f.write(' --predict_data ' + os.path.abspath(self._predict_data)) if self._hierarchy: f.write(' --hierarchy ' + os.path.abspath(self._hierarchy)) if self._parent_network: f.write(' --parent_network ' + os.path.abspath(self._parent_network)) f.write( ' --batchsize ' + str(self._batchsize) + ' --cuda ' + str(self._cuda) + ' --zscore_method ' + self._zscore_method + ' --genotype_hiddens ' + str(self._genotype_hiddens) + ' --cpu_count ' + str(self._cpu_count) + ' --drug_count ' + str(self._drug_count) ) if self._std: f.write(' --std ' + os.path.abspath(self._std)) f.write('\n') f.write('exit $?\n') os.chmod(os.path.join(self._outdir, filename), 0o755) return filename elif isinstance(self._command, VNNAnnotate): self._gpu = False filename = 'cellmapvnnannotatejob.sh' with open(os.path.join(self._outdir, filename), 'w') as f: self._write_slurm_directives(out=f, job_name='cellmapvnnannotate') f.write( 'cellmaps_vnncmd.py annotate ' + os.path.join(self._outdir, 'out_annotate') + ' --model_predictions ' + ' '.join(self._model_predictions)) if self._disease: f.write(' --disease ' + self._disease) if self._hierarchy: f.write(' --hierarchy ' + os.path.abspath(self._hierarchy)) if self._parent_network: f.write(' --parent_network ' + os.path.abspath(self._parent_network)) if self._ndexserver: f.write(' --ndexserver ' + self._ndexserver) if self._ndexuser: f.write(' --ndexuser ' + self._ndexuser) if self._ndexpassword: f.write(' --ndexpassword ' + self._ndexpassword) if self._visibility: f.write(' --visibility') f.write('\n') f.write('exit $?\n') os.chmod(os.path.join(self._outdir, filename), 0o755) return filename else: raise CellmapsvnnError("Command not recognized")
[docs] def run(self): """ Runs CM4AI Pipeline :return: """ logger.debug('In run method') if not os.path.isdir(self._outdir): os.makedirs(self._outdir, mode=0o755) logutils.write_task_start_json(outdir=self._outdir, start_time=self._start_time, data={ 'commandlineargs': self._input_data_dict}, version=cellmaps_vnn.__version__) exitcode = 99 try: self._write_job_for_command() exitcode = 0 finally: logutils.write_task_finish_json(outdir=self._outdir, start_time=self._start_time, status=exitcode) return exitcode
[docs] class CellmapsvnnRunner(VnnRunner): """ Class to run algorithm """ def __init__(self, outdir=None, command=None, inputdir=None, name=None, organization_name=None, project_name=None, exitcode=None, skip_logging=True, input_data_dict=None, provenance_utils=ProvenanceUtil()): """ Constructor :param outdir: Directory to create and put results in :type outdir: str :param skip_logging: If ``True`` skip logging, if ``None`` or ``False`` do NOT skip logging :type skip_logging: bool :param exitcode: value to return via :py:meth:`.CellmapsvnnRunner.run` method :type int: :param input_data_dict: Command line arguments used to invoke this :type input_data_dict: dict :param provenance_utils: Wrapper for `fairscape-cli <https://pypi.org/project/fairscape-cli>`__ which is used for `RO-Crate <https://www.researchobject.org/ro-crate>`__ creation and population :type provenance_utils: :py:class:`~cellmaps_utils.provenance.ProvenanceUtil` """ super().__init__(outdir) self._command = command self._inputdir = inputdir self._name = name self._project_name = project_name self._organization_name = organization_name self._keywords = None self._description = None self._exitcode = exitcode self._start_time = int(time.time()) if skip_logging is None: self._skip_logging = False else: self._skip_logging = skip_logging self._input_data_dict = input_data_dict self._provenance_utils = provenance_utils logger.debug('In constructor') def _update_provenance_fields(self): """ Updates the provenance attributes by merging the ROCrate provenance attributes from the input directory with optional overrides for the name, project name, and organization name and additional keywords. """ rocrate_dirs = [] dirs = [] if isinstance(self._inputdir, str): dirs = [self._inputdir] elif isinstance(self._inputdir, list): dirs = self._inputdir for entry in dirs: if os.path.exists(os.path.join(entry, constants.RO_CRATE_METADATA_FILE)): rocrate_dirs.append(entry) if len(rocrate_dirs) > 0: prov_attrs = self._provenance_utils.get_merged_rocrate_provenance_attrs(rocrate_dirs, override_name=self._name, override_project_name= self._project_name, override_organization_name= self._organization_name, extra_keywords=[ 'VNN', 'Visible Neural Network', self._command.COMMAND ]) if self._name is None: self._name = prov_attrs.get_name() if self._organization_name is None: self._organization_name = prov_attrs.get_organization_name() if self._project_name is None: self._project_name = prov_attrs.get_project_name() self._keywords = prov_attrs.get_keywords() self._description = prov_attrs.get_description() else: self._name = 'VNN tool' self._organization_name = 'Example' self._project_name = 'Example' self._keywords = ['vnn'] self._description = 'Example input dataset VNN' def _create_rocrate(self): """ Creates rocrate for output directory :raises CellMapsProvenanceError: If there is an error """ logger.debug('Registering rocrate with FAIRSCAPE') try: self._provenance_utils.register_rocrate(self._outdir, name=self._name, organization_name=self._organization_name, project_name=self._project_name, description=self._description, keywords=self._keywords) except TypeError as te: raise CellmapsvnnError('Invalid provenance: ' + str(te)) except KeyError as ke: raise CellmapsvnnError('Key missing in provenance: ' + str(ke)) def _register_software(self): """ Registers this tool :raises CellMapsImageEmbeddingError: If fairscape call fails """ software_keywords = self._keywords software_keywords.extend(['tools', cellmaps_vnn.__name__]) software_description = self._description + ' ' + cellmaps_vnn.__description__ self._softwareid = self._provenance_utils.register_software(self._outdir, name=cellmaps_vnn.__name__, description=software_description, author=cellmaps_vnn.__author__, version=cellmaps_vnn.__version__, file_format='py', keywords=software_keywords, url=cellmaps_vnn.__repo_url__) def _register_computation(self, generated_dataset_ids=[]): """ Registers the computation run details with the FAIRSCAPE platform. :param generated_dataset_ids: List of IDs for the datasets generated during the computation. :type generated_dataset_ids: list # Todo: added in used dataset, software and what is being generated :return: """ logger.debug('Getting id of input rocrate') used_dataset = [] if isinstance(self._inputdir, str): if os.path.exists(os.path.join(self._inputdir, constants.RO_CRATE_METADATA_FILE)): used_dataset = [self._provenance_utils.get_id_of_rocrate(self._inputdir)] elif isinstance(self._inputdir, list): for entry in self._inputdir: if os.path.exists(os.path.join(entry, constants.RO_CRATE_METADATA_FILE)): used_dataset.append(self._provenance_utils.get_id_of_rocrate(entry)) self._provenance_utils.register_computation(self._outdir, name=cellmaps_vnn.__name__ + ' computation', run_by=str(self._provenance_utils.get_login()), command=str(self._input_data_dict), description='run of ' + cellmaps_vnn.__name__, used_software=[self._softwareid], used_dataset=used_dataset, generated=generated_dataset_ids)
[docs] def run(self): """ Runs cellmaps_vnn :return: """ exitcode = 99 try: logger.debug('In run method') if os.path.isdir(self._outdir): raise CellmapsvnnError(self._outdir + ' already exists') if not os.path.isdir(self._outdir): os.makedirs(self._outdir, mode=0o755) if self._skip_logging is False: logutils.setup_filelogger(outdir=self._outdir, handlerprefix='cellmaps_vnn') logutils.write_task_start_json(outdir=self._outdir, start_time=self._start_time, data={'commandlineargs': self._input_data_dict}, version=cellmaps_vnn.__version__) self._update_provenance_fields() self._create_rocrate() self._register_software() generated_dataset_ids = [] if self._command: self._command.run() generated_dataset_ids.extend(self._command.register_outputs( self._outdir, self._description, self._keywords, self._provenance_utils)) else: raise CellmapsvnnError("No command provided to CellmapsvnnRunner") # register generated datasets self._register_computation(generated_dataset_ids=generated_dataset_ids) # set exit code to value passed in via constructor exitcode = self._exitcode if self._exitcode is not None else 0 finally: # write a task finish file logutils.write_task_finish_json(outdir=self._outdir, start_time=self._start_time, status=exitcode) return exitcode