Adding new --use-gpus option and adding support for deepsomatic with GPUs and CPUs
skchronicles committed Nov 12, 2024
1 parent 1267ed4 commit 1be576d
Showing 4 changed files with 214 additions and 81 deletions.
2 changes: 1 addition & 1 deletion config/cluster/slurm.json
@@ -99,7 +99,7 @@
},
"deepsomatic_postprocess_variants": {
"threads": "6",
"mem": "72G",
"mem": "120G",
"time": "1-00:00:00",
"gres": "lscratch:450"
},
20 changes: 19 additions & 1 deletion genome-seek
@@ -388,7 +388,7 @@ def parsed_arguments(name, description):
[--open-cravat] [--oc-annotators OC_ANNOTATORS] [--oc-modules OC_MODULES] \\
[--pairs PAIRS] [--pon PANEL_OF_NORMALS] [--wes-mode] [--wes-bed WES_BED] \\
[--skip-qc] [--tmp-dir TMP_DIR] [--silent] [--sif-cache SIF_CACHE] \\
[--singularity-cache SINGULARITY_CACHE] \\
[--singularity-cache SINGULARITY_CACHE] [--use-gpus] \\
[--resource-bundle RESOURCE_BUNDLE] \\
[--dry-run] [--threads THREADS] \\
--input INPUT [INPUT ...] \\
@@ -621,6 +621,12 @@ def parsed_arguments(name, description):
recommended setting this value to the maximum number
of CPUs available on the host machine, default: 2.
Example: --threads: 16
--use-gpus
Utilize GPU computing for any supported tools.
Several steps in the pipeline can be accelerated
with GPUs. This feature is only supported on
Biowulf at the NIH.
Example: --use-gpus
{3}{4}Misc Options:{5}
-h, --help Show usage information, help message, and exit.
@@ -942,6 +948,18 @@ def parsed_arguments(name, description):
help = argparse.SUPPRESS
)

# Use GPU computing to speed
# up any time-consuming steps;
# this is only supported for
# a few tools.
subparser_run.add_argument(
'--use-gpus',
action = 'store_true',
required = False,
default = False,
help = argparse.SUPPRESS
)
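Since --use-gpus is a store_true flag, its behavior reduces to presence or absence on the command line. A minimal, self-contained sketch of those semantics (the prog name and argv values below are illustrative, not from this commit):

# Sketch of store_true semantics for the new flag; names are illustrative.
import argparse

parser = argparse.ArgumentParser(prog="genome-seek run")
parser.add_argument("--use-gpus", action="store_true", default=False)

args = parser.parse_args(["--use-gpus"])
assert args.use_gpus is True    # flag present -> True
args = parser.parse_args([])
assert args.use_gpus is False   # flag omitted -> False (the default)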

# Sub-parser for the "unlock" sub-command
# Grouped sub-parser arguments are currently
# not supported: https://bugs.python.org/issue9341
5 changes: 5 additions & 0 deletions workflow/Snakefile
@@ -22,6 +22,7 @@ today = str(datetime.datetime.today()).split()[0].replace('-', '')
# Global workflow variables
configfile: 'config.json' # Generated from user input and config/*.json
run_mode = config['options']['mode'] # Run mode: local, slurm, or uge
bindpath = config['bindpaths'] # Singularity paths to bind to the container
samples = config['samples'] # Base name of input samples
workpath = config['project']['workpath'] # Pipeline's output directory
filetype = config['project']['filetype'] # 'paired-end' or 'single-end' (not supported)
@@ -36,6 +37,10 @@ normals = [n for n in normals if n] # Remove tumor-onlys, i.e. empty
tumor2normal = config['pairs'] # Dict to map a tumor to its normal
# List of tumor samples with a paired normal
tumorWnormal = [t for t in tumors if tumor2normal[t]]
# Use GPU-compute for any supported tools
use_gpus = str_bool(
config['options']['use_gpus']
)
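Note: str_bool comes from the pipeline's helper module (scripts/common.py), which this diff does not touch. A minimal sketch of such a helper, assuming it only needs to coerce common truthy/falsy config strings (the actual implementation may differ):

# Hypothetical str_bool sketch; the real helper lives in scripts/common.py.
def str_bool(value):
    """Coerce a config value like 'True'/'false' (or a bool) to a bool."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1", "yes", "y")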

# Analysis Options
call_cnv = str_bool( # Call copy number variation (CNVs),
268 changes: 189 additions & 79 deletions workflow/rules/somatic.smk
@@ -5,6 +5,7 @@ from scripts.common import (
joint_option
)


# Helper functions for tumor, normal pairs
def get_normal_recal_bam(wildcards):
"""
@@ -797,7 +798,6 @@ rule deepsomatic_make_examples:
normal_name_option = lambda w: "--sample_name_normal {0}".format(
tumor2normal[w.name]
) if tumor2normal[w.name] else "",
message: "Running DeepSomatic make_examples on '{input.tumor}' input file"
threads: int(allocated("threads", "deepsomatic_make_examples", cluster))
container: config['images']['deepsomatic']
envmodules: config['tools']['deepsomatic']
@@ -849,76 +849,176 @@ rule deepsomatic_make_examples:
&& touch {output.success}
"""


rule deepsomatic_call_variants:
"""
Data processing step to call somatic variants using a deep neural
network. The make_examples step prepares the input data for
DeepSomatic's CNN. DeepSomatic is an extension of the deep learning-
based variant caller DeepVariant. It is composed of multiple steps
that take aligned reads (in BAM or CRAM format), produce pileup
image tensors from them, classify each tensor using a convolutional
neural network, and finally report the results in a standard VCF or
gVCF file. This rule is the second step in the deepsomatic pipeline:
1. make_examples (CPU, parallelizable with gnu-parallel)
* 2. call_variants (GPU, use a GPU node)
3. postprocess_variants (CPU, single-threaded)
Running deepsomatic in a single step using run_deepsomatic is not
optimal for large-scale projects as it will consume resources very
inefficiently. As such, it is better to run the 1st/3rd steps on a
compute node and run the 2nd step on a GPU node.
@Input:
Flag file to indicate success of make_examples (scatter)
@Output:
Per sample call_variants tensorflow records file
"""
input:
success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
output:
callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
params:
rname = "ds_callvars",
genome = config['references']['GENOME'],
tmpdir = tmpdir,
# NOTE: There BE dragons here!
# We need allocation info from the make_examples rule
# to determine the number of shards that were
# used in the make_examples step. This is used
# to resolve a dependency file of this rule:
# the examples tfrecord file produced by
# make_examples, which gets passed to the
# --examples option of call_variants.
example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
w.name,
int(allocated("threads", "deepsomatic_make_examples", cluster))
)),
# TODO: add an --ffpe option to the pipeline that
# selects either the ffpe_wgs or ffpe_wes checkpoints.
# Building option for checkpoint file (assumes TN-pairs and
# non-FFPE samples), where:
# @WES = "/opt/models/deepsomatic/wes"
# @WGS = "/opt/models/deepsomatic/wgs"
ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
message: "Running deepsomatic call_variants on '{wildcards.name}' sample"
threads: int(allocated("threads", "deepsomatic_call_variants", cluster))
container: config['images']['deepsomatic']
envmodules: config['tools']['deepsomatic']
shell: """
# Set up a temporary directory for
# intermediate files with a built-in
# mechanism for deletion on exit
if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
tmp=$(mktemp -d -p "{params.tmpdir}")
trap 'rm -rf "${{tmp}}"' EXIT
echo "Using tmpdir: ${{tmp}}"
export TMPDIR="${{tmp}}"
# Run DeepSomatic call_variants
time call_variants \\
--outfile {output.callvar} \\
--examples {params.example} \\
--checkpoint {params.ckpt}
"""
if use_gpus:
# Use GPU-acceleration to speed up
# the second step in deepsomatic
rule deepsomatic_call_variants_gpu:
"""
Data processing step to call somatic variants using a deep neural
network. The make_examples step prepares the input data for
DeepSomatic's CNN. DeepSomatic is an extension of the deep learning-
based variant caller DeepVariant. It is composed of multiple steps
that take aligned reads (in BAM or CRAM format), produce pileup
image tensors from them, classify each tensor using a convolutional
neural network, and finally report the results in a standard VCF or
gVCF file. This rule is the second step in the deepsomatic pipeline:
1. make_examples (CPU, parallelizable with gnu-parallel)
* 2. call_variants (GPU, use a GPU node)
3. postprocess_variants (CPU, single-threaded)
Running deepsomatic in a single step using run_deepsomatic is not
optimal for large-scale projects as it will consume resources very
inefficiently. As such, it is better to run the 1st/3rd steps on a
compute node and run the 2nd step on a GPU node.
NOTE: When deepsomatic is run on a GPU/TPU, it will scatter the
writing of the output *.call_variants.tfrecord.gz across a pool
of processes (by default, --writer_threads 16). This causes the
final output files to differ depending on whether DeepSomatic is
run on a CPU or a GPU.
@Input:
Flag file to indicate success of make_examples (scatter)
@Output:
Flag file to indicate success of call_variants,
actually produces (given 16 writer threads):
{name}.call_variants-00000-of-00016.tfrecord.gz, ...
{name}.call_variants-00015-of-00016.tfrecord.gz
"""
input:
success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
output:
success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
params:
rname = "ds_callvars_gpu",
genome = config['references']['GENOME'],
# Singularity options
sif = config['images']['deepsomatic_gpu'],
bindpaths = ','.join(bindpath),
tmpdir = tmpdir,
# NOTE: There BE dragons here!
# We need allocation info from the make_examples rule
# to determine the number of shards that were
# used in the make_examples step. This is used
# to resolve a dependency file of this rule:
# the examples tfrecord file produced by
# make_examples, which gets passed to the
# --examples option of call_variants.
example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
w.name,
int(allocated("threads", "deepsomatic_make_examples", cluster))
)),
callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
# TODO: add an --ffpe option to the pipeline that
# selects either the ffpe_wgs or ffpe_wes checkpoints.
# Building option for checkpoint file (assumes TN-pairs and
# non-FFPE samples), where:
# @WES = "/opt/models/deepsomatic/wes"
# @WGS = "/opt/models/deepsomatic/wgs"
ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
threads: max(int(allocated("threads", "deepsomatic_call_variants_gpu", cluster)) - 2, 4)
envmodules: config['tools']['deepsomatic']
shell: """
# Set up a temporary directory for
# intermediate files with a built-in
# mechanism for deletion on exit
if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
tmp=$(mktemp -d -p "{params.tmpdir}")
trap 'rm -rf "${{tmp}}"' EXIT
echo "Using tmpdir: ${{tmp}}"
export TMPDIR="${{tmp}}"
# Run DeepSomatic call_variants
# using a GPU acceleration
singularity exec \\
-c \\
--nv \\
-B {params.bindpaths},${{tmp}}:/tmp \\
{params.sif} /bin/bash -c \\
'time call_variants \\
--outfile {params.callvar} \\
--examples {params.example} \\
--checkpoint {params.ckpt} \\
--writer_threads {threads}'
touch "{output.success}"
"""
else:
# Use CPU accelerated version for the
# second step in deepsomatic
rule deepsomatic_call_variants_cpu:
"""
Data processing step to call somatic variants using a deep neural
network. The make_examples step prepares the input data for
DeepSomatic's CNN. DeepSomatic is an extension of the deep learning-
based variant caller DeepVariant. It is composed of multiple steps
that take aligned reads (in BAM or CRAM format), produce pileup
image tensors from them, classify each tensor using a convolutional
neural network, and finally report the results in a standard VCF or
gVCF file. This rule is the second step in the deepsomatic pipeline:
1. make_examples (CPU, parallelizable with gnu-parallel)
* 2. call_variants (CPU, multi-threaded)
3. postprocess_variants (CPU, single-threaded)
Running deepsomatic in a single step using run_deepsomatic is not
optimal for large-scale projects as it will consume resources very
inefficiently. As such, it is better to run the 1st/3rd steps on a
compute node and run the 2nd step on a GPU node.
NOTE: There be dragons here! When deepsomatic is run on a GPU/TPU,
it will scatter the writing of the output *.call_variants.tfrecord.gz
across a pool of processes (by default, --writer_threads 16). This
causes the final output files to differ depending on whether you are
running DeepSomatic on a CPU or a GPU.
@Input:
Flag file to indicate success of make_examples (scatter)
@Output:
Flag file to indicate success of call_variants,
actually produces:
{name}.call_variants.tfrecord.gz
"""
input:
success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"),
output:
success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
params:
rname = "ds_callvars_cpu",
genome = config['references']['GENOME'],
tmpdir = tmpdir,
# NOTE: There BE dragons here!
# We need allocation info from the make_examples rule
# to determine the number of shards that were
# used in the make_examples step. This is used
# to resolve a dependency file of this rule:
# the examples tfrecord file produced by
# make_examples, which gets passed to the
# --examples option of call_variants.
example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format(
w.name,
int(allocated("threads", "deepsomatic_make_examples", cluster))
)),
callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
# TODO: add an --ffpe option to the pipeline that
# selects either the ffpe_wgs or ffpe_wes checkpoints.
# Building option for checkpoint file (assumes TN-pairs and
# non-FFPE samples), where:
# @WES = "/opt/models/deepsomatic/wes"
# @WGS = "/opt/models/deepsomatic/wgs"
ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs",
threads: int(allocated("threads", "deepsomatic_call_variants_cpu", cluster))
container: config['images']['deepsomatic']
envmodules: config['tools']['deepsomatic']
shell: """
# Set up a temporary directory for
# intermediate files with a built-in
# mechanism for deletion on exit
if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi
tmp=$(mktemp -d -p "{params.tmpdir}")
trap 'rm -rf "${{tmp}}"' EXIT
echo "Using tmpdir: ${{tmp}}"
export TMPDIR="${{tmp}}"
# Run CPU DeepSomatic call_variants
time call_variants \\
--outfile {params.callvar} \\
--examples {params.example} \\
--checkpoint {params.ckpt}
touch "{output.success}"
"""


rule deepsomatic_postprocess_variants:
@@ -938,20 +1038,29 @@ rule deepsomatic_postprocess_variants:
optimal for large-scale projects as it will consume resources very
inefficiently. As such, it is better to run the 1st/3rd steps on a
compute node and run the 2nd step on a GPU node.
NOTE: There be dragons here! Deepsomatic will generate a different
set of output files at the call_variants step if it is run on a
CPU versus a GPU. A flag file is used to indicate this step was
successful, and the actual sharded/non-sharded file (which is input
to this step) is resolved in the params section. Please note this
file will not actually exist if call_variants was run with a GPU.
Looking at their source code, it appears deepsomatic has some logic
to detect if a sharded writer was used in the previous step, and it
will read in the set of sharded call_variants files without issues.
@Input:
Per-sample call_variants tensorflow records file (scatter)
@Output:
Single-sample gVCF file with called variants
Single-sample VCF file with called variants
"""
input:
callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
success = join(workpath, "deepsomatic", "call_variants", "{name}.cv.success"),
output:
vcf = join(workpath, "deepsomatic", "somatic", "{name}.deepsomatic.vcf"),
params:
rname = "ds_postprovars",
genome = config['references']['GENOME'],
tmpdir = tmpdir,
message: "Running DeepSomatic postprocess_variants on '{input.callvar}' input file"
rname = "ds_postprovars",
genome = config['references']['GENOME'],
tmpdir = tmpdir,
callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"),
threads: int(allocated("threads", "deepsomatic_postprocess_variants", cluster))
container: config['images']['deepsomatic']
envmodules: config['tools']['deepsomatic']
@@ -968,9 +1077,10 @@ rule deepsomatic_postprocess_variants:
# Run DeepSomatic postprocess_variants
time postprocess_variants \\
--ref {params.genome} \\
--infile {input.callvar} \\
--infile {params.callvar} \\
--outfile {output.vcf} \\
--process_somatic=true
--process_somatic=true \\
--cpus={threads}
"""


