From 1be576dd7f361678c64914543a792288d55dcf27 Mon Sep 17 00:00:00 2001
From: skchronicles
Date: Tue, 12 Nov 2024 18:21:58 -0500
Subject: [PATCH] Add new --use-gpus option and support for running
 deepsomatic with GPUs or CPUs

---
 config/cluster/slurm.json  |   2 +-
 genome-seek                |  20 ++-
 workflow/Snakefile         |   5 +
 workflow/rules/somatic.smk | 268 ++++++++++++++++++++++++++-----------
 4 files changed, 214 insertions(+), 81 deletions(-)

diff --git a/config/cluster/slurm.json b/config/cluster/slurm.json
index 15db48a..9d6b5f5 100644
--- a/config/cluster/slurm.json
+++ b/config/cluster/slurm.json
@@ -99,7 +99,7 @@
     },
     "deepsomatic_postprocess_variants": {
         "threads": "6",
-        "mem": "72G",
+        "mem": "120G",
         "time": "1-00:00:00",
         "gres": "lscratch:450"
     },
diff --git a/genome-seek b/genome-seek
index d2c6747..94dcd78 100755
--- a/genome-seek
+++ b/genome-seek
@@ -388,7 +388,7 @@ def parsed_arguments(name, description):
           [--open-cravat] [--oc-annotators OC_ANNOTATORS] [--oc-modules OC_MODULES] \\
           [--pairs PAIRS] [--pon PANEL_OF_NORMALS] [--wes-mode] [--wes-bed WES_BED] \\
           [--skip-qc] [--tmp-dir TMP_DIR] [--silent] [--sif-cache SIF_CACHE] \\
-          [--singularity-cache SINGULARITY_CACHE] \\
+          [--singularity-cache SINGULARITY_CACHE] [--use-gpus] \\
           [--resource-bundle RESOURCE_BUNDLE] \\
           [--dry-run] [--threads THREADS] \\
           --input INPUT [INPUT ...] \\
@@ -621,6 +621,12 @@ def parsed_arguments(name, description):
                         recommended setting this vaule to the
                         maximum number of CPUs available on the
                         host machine, default: 2.
                           Example: --threads: 16
+        --use-gpus
+                        Utilize GPU computing for any supported tools.
+                        Several steps in the pipeline can be accelerated
+                        with GPUs. This feature is only supported on
+                        Biowulf at the NIH.
+                          Example: --use-gpus
         {3}{4}Misc Options:{5}
           -h, --help            Show usage information, help message, and exit.
@@ -942,6 +948,18 @@ def parsed_arguments(name, description):
         help = argparse.SUPPRESS
     )
 
+    # Use GPU computing to speed
+    # up any time-consuming steps;
+    # this is only supported for
+    # a few tools.
+    subparser_run.add_argument(
+        '--use-gpus',
+        action = 'store_true',
+        required = False,
+        default = False,
+        help = argparse.SUPPRESS
+    )
+
     # Sub-parser for the "unlock" sub-command
     # Grouped sub-parser arguments are currently
     # not supported: https://bugs.python.org/issue9341
diff --git a/workflow/Snakefile b/workflow/Snakefile
index 87618b6..353629c 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -22,6 +22,7 @@ today = str(datetime.datetime.today()).split()[0].replace('-', '')
 # Global workflow variables
 configfile: 'config.json'                  # Generated from user input and config/*.json
 run_mode = config['options']['mode']       # Run mode: local, slurm, or uge
+bindpath = config['bindpaths']             # Singularity paths to bind to the container
 samples  = config['samples']               # Base name of input samples
 workpath = config['project']['workpath']   # Pipeline's output directory
 filetype = config['project']['filetype']   # 'paired-end' or 'single-end' (not supported)
@@ -36,6 +37,10 @@ normals = [n for n in normals if n]        # Remove tumor-onlys, i.e. empty
 tumor2normal = config['pairs']             # Dict to map a tumor to its normal
 # List of tumor samples with a paired normal
 tumorWnormal = [t for t in tumors if tumor2normal[t]]
+# Use GPU-compute for any supported tools
+use_gpus = str_bool(
+    config['options']['use_gpus']
+)
 
 # Analysis Options
 call_cnv = str_bool(                       # Call copy number variation (CNVs),
diff --git a/workflow/rules/somatic.smk b/workflow/rules/somatic.smk
index 0564b95..79ce855 100644
--- a/workflow/rules/somatic.smk
+++ b/workflow/rules/somatic.smk
@@ -5,6 +5,7 @@ from scripts.common import (
     joint_option
 )
 
+
 # Helper functions for tumor, normal pairs
 def get_normal_recal_bam(wildcards):
     """
@@ -797,7 +798,6 @@ rule deepsomatic_make_examples:
         normal_name_option = lambda w: "--sample_name_normal {0}".format(
             tumor2normal[w.name]
         ) if tumor2normal[w.name] else "",
-    message: "Running DeepSomatic make_examples on '{input.tumor}' input file"
     threads: int(allocated("threads", "deepsomatic_make_examples", cluster))
     container: config['images']['deepsomatic']
     envmodules: config['tools']['deepsomatic']
     shell: """
@@ -849,76 +849,176 @@ rule deepsomatic_make_examples:
         && touch {output.success}
     """
-
-rule deepsomatic_call_variants:
-    """
-    Data processing step to call somatic variants using deep neural
-    network. The make_examples step prepares the input data for the
-    deepsomatic's CNN. DeepSomatic is an extension of deep learning-
-    based variant caller Deepvariant. It is composed of multiple steps
-    that takes aligned reads (in BAM or CRAM format), produces pileup
-    image tensors from them, classifies each tensor using a convolutional
-    neural network, and finally reports the results in a standard VCF or
-    gVCF file. This rule is the first step in the deepsomatic pipeline:
-      1. make_examples (CPU, parallelizable with gnu-parallel)
-    * 2. call_variants (GPU, use a GPU node)
-      3. postprocess_variants (CPU, single-threaded)
-    Running deepsomatic in a single step using run_deepsomatic is not
-    optimal for large-scale projects as it will consume resources very
-    inefficently. As so, it is better to run the 1st/3rd step on a
-    compute node and run the 2nd step on a GPU node.
-    @Input:
-        Flag file to indicate success of make_examples (scatter)
-    @Output:
-        Per sample call_variants tensorflow records file
-    """
-    input:
-        success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
-    output:
-        callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
-    params:
-        rname  = "ds_callvars",
-        genome = config['references']['GENOME'],
-        tmpdir = tmpdir,
-        # NOTE: There BE dragons here!
-        # We need allocation info from make_examples rule
-        # to determine the number of shards that were
-        # used in the make_examples step, this is used
-        # to resolve a dependency file of this rule,
-        # which is the examples tf record file produced by
-        # make_examples. This file gets passed to the
-        # --examples option of call_variants.
-        example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
-            w.name,
-            int(allocated("threads", "deepsomatic_make_examples", cluster))
-        )),
-        # TODO: add option --ffpe option to pipeline, that
-        # selects either the ffpe_wgs or ffpe_wes checkpoints.
-        # Building option for checkpoint file (assumes TN-pairs and
-        # non-FFPE samples), where:
-        #   @WES = "/opt/models/deepsomatic/wes"
-        #   @WGS = "/opt/models/deepsomatic/wgs"
-        ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
-    message: "Running deepsomatic call_variants on '{wildcards.name}' sample"
-    threads: int(allocated("threads", "deepsomatic_call_variants", cluster))
-    container: config['images']['deepsomatic']
-    envmodules: config['tools']['deepsomatic']
-    shell: """
-    # Setups temporary directory for
-    # intermediate files with built-in
-    # mechanism for deletion on exit
-    if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
-    tmp=$(mktemp -d -p "{params.tmpdir}")
-    trap 'rm -rf "${{tmp}}"' EXIT
-    echo "Using tmpdir: ${{tmp}}"
-    export TMPDIR="${{tmp}}"
-
-    # Run DeepSomatic call_variants
-    time call_variants \\
-        --outfile {output.callvar} \\
-        --examples {params.example} \\
-        --checkpoint {params.ckpt}
-    """
+if use_gpus:
+    # Use GPU acceleration to speed up
+    # the second step in deepsomatic
+    rule deepsomatic_call_variants_gpu:
+        """
+        Data processing step to call somatic variants using a deep neural
+        network. The make_examples step prepares the input data for the
+        deepsomatic CNN. DeepSomatic is an extension of the deep learning-
+        based variant caller DeepVariant. It is composed of multiple steps
+        that take aligned reads (in BAM or CRAM format), produce pileup
+        image tensors from them, classify each tensor using a convolutional
+        neural network, and finally report the results in a standard VCF or
+        gVCF file. This rule is the second step in the deepsomatic pipeline:
+          1. make_examples (CPU, parallelizable with gnu-parallel)
+        * 2. call_variants (GPU, use a GPU node)
+          3. postprocess_variants (CPU, single-threaded)
+        Running deepsomatic in a single step using run_deepsomatic is not
+        optimal for large-scale projects as it will consume resources very
+        inefficiently. As such, it is better to run the 1st/3rd steps on a
+        compute node and run the 2nd step on a GPU node.
+        NOTE: When deepsomatic is run on a GPU/TPU, it will scatter the
+        writing of the output *.call_variants.tfrecord.gz across a pool
+        of processes (by default, --writer_threads 16). This causes
+        the final output files to differ if you are running DeepSomatic
+        on a CPU versus a GPU.
+        @Input:
+            Flag file to indicate success of make_examples (scatter)
+        @Output:
+            Flag file to indicate success of call_variants,
+            actually produces (given 16 writer threads):
+              {name}.call_variants-00000-of-00016.tfrecord.gz, ...
+              {name}.call_variants-00015-of-00016.tfrecord.gz
+        """
+        input:
+            success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
+        output:
+            success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
+        params:
+            rname  = "ds_callvars_gpu",
+            genome = config['references']['GENOME'],
+            # Singularity options
+            sif = config['images']['deepsomatic_gpu'],
+            bindpaths = ','.join(bindpath),
+            tmpdir = tmpdir,
+            # NOTE: There BE dragons here!
+            # We need allocation info from the make_examples rule
+            # to determine the number of shards that were
+            # used in the make_examples step; this is used
+            # to resolve a dependency file of this rule,
+            # which is the examples tf record file produced by
+            # make_examples. This file gets passed to the
+            # --examples option of call_variants.
+            example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
+                w.name,
+                int(allocated("threads", "deepsomatic_make_examples", cluster))
+            )),
+            callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
+            # TODO: add an --ffpe option to the pipeline, which
+            # selects either the ffpe_wgs or ffpe_wes checkpoints.
+            # Building option for checkpoint file (assumes TN-pairs and
+            # non-FFPE samples), where:
+            #   @WES = "/opt/models/deepsomatic/wes"
+            #   @WGS = "/opt/models/deepsomatic/wgs"
+            ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
+        threads: max(int(allocated("threads", "deepsomatic_call_variants_gpu", cluster)) - 2, 4)
+        envmodules: config['tools']['deepsomatic']
+        shell: """
+        # Sets up a temporary directory for
+        # intermediate files with a built-in
+        # mechanism for deletion on exit
+        if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
+        tmp=$(mktemp -d -p "{params.tmpdir}")
+        trap 'rm -rf "${{tmp}}"' EXIT
+        echo "Using tmpdir: ${{tmp}}"
+        export TMPDIR="${{tmp}}"
+
+        # Run DeepSomatic call_variants
+        # with GPU acceleration
+        singularity exec \\
+            -c \\
+            --nv \\
+            -B {params.bindpaths},${{tmp}}:/tmp \\
+            {params.sif} /bin/bash -c \\
+            'time call_variants \\
+                --outfile {params.callvar} \\
+                --examples {params.example} \\
+                --checkpoint {params.ckpt} \\
+                --writer_threads {threads}'
+        touch "{output.success}"
+        """
+else:
+    # Use the CPU-only version for the
+    # second step in deepsomatic
+    rule deepsomatic_call_variants_cpu:
+        """
+        Data processing step to call somatic variants using a deep neural
+        network. The make_examples step prepares the input data for the
+        deepsomatic CNN. DeepSomatic is an extension of the deep learning-
+        based variant caller DeepVariant. It is composed of multiple steps
+        that take aligned reads (in BAM or CRAM format), produce pileup
+        image tensors from them, classify each tensor using a convolutional
+        neural network, and finally report the results in a standard VCF or
+        gVCF file. This rule is the second step in the deepsomatic pipeline:
+          1. make_examples (CPU, parallelizable with gnu-parallel)
+        * 2. call_variants (CPU, multi-threaded)
+          3. postprocess_variants (CPU, single-threaded)
+        Running deepsomatic in a single step using run_deepsomatic is not
+        optimal for large-scale projects as it will consume resources very
+        inefficiently. As such, it is better to run the 1st/3rd steps on a
+        compute node and run the 2nd step on its own compute node.
+        NOTE: There be dragons here! When deepsomatic is run on a GPU/TPU,
+        it will scatter the writing of the output *.call_variants.tfrecord.gz
+        across a pool of processes (by default, --writer_threads 16). This
+        causes the final output files to differ if you are
+        running DeepSomatic on a CPU versus a GPU.
+        @Input:
+            Flag file to indicate success of make_examples (scatter)
+        @Output:
+            Flag file to indicate success of call_variants,
+            actually produces:
+              {name}.call_variants.tfrecord.gz
+        """
+        input:
+            success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
+        output:
+            success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
+        params:
+            rname  = "ds_callvars_cpu",
+            genome = config['references']['GENOME'],
+            tmpdir = tmpdir,
+            # NOTE: There BE dragons here!
+            # We need allocation info from the make_examples rule
+            # to determine the number of shards that were
+            # used in the make_examples step; this is used
+            # to resolve a dependency file of this rule,
+            # which is the examples tf record file produced by
+            # make_examples. This file gets passed to the
+            # --examples option of call_variants.
+            example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
+                w.name,
+                int(allocated("threads", "deepsomatic_make_examples", cluster))
+            )),
+            callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
+            # TODO: add an --ffpe option to the pipeline, which
+            # selects either the ffpe_wgs or ffpe_wes checkpoints.
+            # Building option for checkpoint file (assumes TN-pairs and
+            # non-FFPE samples), where:
+            #   @WES = "/opt/models/deepsomatic/wes"
+            #   @WGS = "/opt/models/deepsomatic/wgs"
+            ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
+        threads: int(allocated("threads", "deepsomatic_call_variants_cpu", cluster))
+        container: config['images']['deepsomatic']
+        envmodules: config['tools']['deepsomatic']
+        shell: """
+        # Sets up a temporary directory for
+        # intermediate files with a built-in
+        # mechanism for deletion on exit
+        if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
+        tmp=$(mktemp -d -p "{params.tmpdir}")
+        trap 'rm -rf "${{tmp}}"' EXIT
+        echo "Using tmpdir: ${{tmp}}"
+        export TMPDIR="${{tmp}}"
+
+        # Run CPU DeepSomatic call_variants
+        time call_variants \\
+            --outfile {params.callvar} \\
+            --examples {params.example} \\
+            --checkpoint {params.ckpt}
+        touch "{output.success}"
+        """
 
 
 rule deepsomatic_postprocess_variants:
@@ -938,20 +1038,29 @@ rule deepsomatic_postprocess_variants:
     optimal for large-scale projects as it will consume resources very
     inefficently. As so, it is better to run the 1st/3rd step on a
     compute node and run the 2nd step on a GPU node.
+    NOTE: There be dragons here! Deepsomatic will generate a different
+    set of output files at the call_variants step if it is run on a
+    CPU versus a GPU. A flag file is used to indicate this step was
+    successful and the actual sharded/non-sharded file (which is input
+    to this step) is resolved in the params section. Please note this
+    file will not actually exist if call_variants was run with a GPU.
+    Looking at their source code, it appears deepsomatic has some logic
+    to detect if a sharded writer was used in the previous step, and it
+    will read in the set of sharded call_variants files without issues.
     @Input:
         Per-sample call_variants tensorflow records file (scatter)
     @Output:
-        Single-sample gVCF file with called variants
+        Single-sample VCF file with called variants
     """
     input:
-        callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
+        success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
     output:
         vcf = join(workpath, "deepsomatic", "somatic", "{name}.deepsomatic.vcf"),
     params:
-        rname  = "ds_postprovars",
-        genome = config['references']['GENOME'],
-        tmpdir = tmpdir,
-    message: "Running DeepSomatic postprocess_variants on '{input.callvar}' input file"
+        rname   = "ds_postprovars",
+        genome  = config['references']['GENOME'],
+        tmpdir  = tmpdir,
+        callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
     threads: int(allocated("threads", "deepsomatic_postprocess_variants", cluster))
     container: config['images']['deepsomatic']
     envmodules: config['tools']['deepsomatic']
     shell: """
@@ -968,9 +1077,10 @@ rule deepsomatic_postprocess_variants:
 
     # Run DeepSomatic postprocess_variants
     time postprocess_variants \\
         --ref {params.genome} \\
-        --infile {input.callvar} \\
+        --infile {params.callvar} \\
        --outfile {output.vcf} \\
-        --process_somatic=true
+        --process_somatic=true \\
+        --cpus={threads}
     """
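
A note on the use_gpus toggle above: the Snakefile gates rule selection on
str_bool(config['options']['use_gpus']), which coerces the JSON-serialized
option into a boolean. str_bool lives in the pipeline's shared helpers
(scripts/common.py) and is not part of this diff; a minimal sketch of the
expected coercion, assumed here rather than copied from the repo, looks like:

    def str_bool(s):
        """Coerce a config value like 'True'/'False' or a bool into a bool.
        Sketch only: the real helper in scripts/common.py may accept
        more spellings than the ones listed below.
        """
        if isinstance(s, bool):
            return s
        return str(s).strip().lower() in ('true', '1', 'yes', 'y')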
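The dragons NOTEs are also why both call_variants rules declare only a
.cv.success flag as their Snakemake output: on GPUs the writer pool shards
{name}.call_variants.tfrecord.gz into {name}.call_variants-00000-of-00016.tfrecord.gz
and so on, leaving no single fixed path for Snakemake to track. Below is a
small illustrative sketch of the kind of sharded-versus-single resolution
the docstrings describe postprocess_variants doing internally; this is
assumed logic for illustration, not deepsomatic's actual code:

    import glob
    import os

    def resolve_call_variants_output(path):
        """Return the call_variants output file(s) for a sample: a single
        tfrecord on CPU runs, or the sharded set written by --writer_threads
        on GPU runs. Illustrative only; deepsomatic's detection may differ.
        """
        if os.path.exists(path):
            # CPU run: the single tfrecord exists as-is
            return [path]
        prefix = path[:-len('.tfrecord.gz')]
        shards = sorted(glob.glob(prefix + '-*-of-*.tfrecord.gz'))
        if not shards:
            # Neither layout found: call_variants did not finish
            raise FileNotFoundError(path)
        # GPU run: return the sharded set, e.g. *-00000-of-00016.tfrecord.gz
        return shards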