Merge pull request #504 from broadinstitute/reconstructr
add reconstructR workflow
dpark01 authored Dec 19, 2023
2 parents d659825 + 9bedb51 commit e95b378
Showing 6 changed files with 300 additions and 54 deletions.
5 changes: 5 additions & 0 deletions .dockstore.yml
@@ -259,6 +259,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/nextclade_single.wdl
testParameterFiles:
- empty.json
- name: reconstruct_from_alignments
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/reconstruct_from_alignments.wdl
testParameterFiles:
- empty.json
- name: sarscov2_batch_relineage
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/sarscov2_batch_relineage.wdl
178 changes: 124 additions & 54 deletions pipes/WDL/tasks/tasks_interhost.wdl
@@ -166,29 +166,29 @@ task multi_align_mafft_ref {
String fasta_basename = basename(reference_fasta, '.fasta')
Int disk_size = 200

command {
command <<<
interhost.py --version | tee VERSION
interhost.py multichr_mafft \
${reference_fasta} ${sep=' ' assemblies_fasta} \
"~{reference_fasta}" ~{sep=' ' assemblies_fasta} \
. \
${'--ep=' + mafft_ep} \
${'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
${'--maxiters=' + mafft_maxIters} \
--outFilePrefix align_mafft-${fasta_basename} \
~{'--ep=' + mafft_ep} \
~{'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
~{'--maxiters=' + mafft_maxIters} \
--outFilePrefix "align_mafft-~{fasta_basename}" \
--preservecase \
--localpair \
--sampleNameListFile align_mafft-${fasta_basename}-sample_names.txt \
--sampleNameListFile "align_mafft-~{fasta_basename}-sample_names.txt" \
--loglevel DEBUG
}
>>>

output {
#File sampleNamesFile = "align_mafft-${fasta_basename}-sample_names.txt"
Array[File]+ alignments_by_chr = glob("align_mafft-${fasta_basename}*.fasta")
#File sampleNamesFile = "align_mafft-~{fasta_basename}-sample_names.txt"
Array[File]+ alignments_by_chr = glob("align_mafft-~{fasta_basename}*.fasta")
String viralngs_version = read_string("VERSION")
}

runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 60]) + " GB"
cpu: 8
disks: "local-disk " + disk_size + " HDD"
Expand All @@ -212,29 +212,29 @@ task multi_align_mafft {

Int disk_size = 200

command {
command <<<
interhost.py --version | tee VERSION
interhost.py multichr_mafft \
${sep=' ' assemblies_fasta} \
~{sep=' ' assemblies_fasta} \
. \
${'--ep=' + mafft_ep} \
${'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
${'--maxiters=' + mafft_maxIters} \
--outFilePrefix ${out_prefix} \
~{'--ep=' + mafft_ep} \
~{'--gapOpeningPenalty=' + mafft_gapOpeningPenalty} \
~{'--maxiters=' + mafft_maxIters} \
--outFilePrefix ~{out_prefix} \
--preservecase \
--localpair \
--sampleNameListFile ${out_prefix}-sample_names.txt \
--sampleNameListFile ~{out_prefix}-sample_names.txt \
--loglevel DEBUG
}
>>>

output {
File sampleNamesFile = "${out_prefix}-sample_names.txt"
Array[File] alignments_by_chr = glob("${out_prefix}*.fasta")
File sampleNamesFile = "~{out_prefix}-sample_names.txt"
Array[File] alignments_by_chr = glob("~{out_prefix}*.fasta")
String viralngs_version = read_string("VERSION")
}

runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 30]) + " GB"
cpu: 8
disks: "local-disk " + disk_size + " HDD"
@@ -292,7 +292,7 @@ task beast {
# platform-agnostic number of GPUs we're actually using
Int gpu_count_used = select_first([accelerator_count, gpu_count, 1])

command {
command <<<
set -e
beast -beagle_info
nvidia-smi
@@ -314,7 +314,7 @@ task beast {
-beagle_scaling always \
~{'-beagle_order ' + beagle_order} \
~{beauti_xml}
}
>>>

output {
File beast_log = glob("*.log")[0]
@@ -326,7 +326,7 @@ task beast {
}

runtime {
docker: "${docker}"
docker: docker
memory: "7 GB"
cpu: 4
disks: "local-disk " + disk_size + " HDD"
@@ -356,15 +356,15 @@ task index_ref {

Int disk_size = 100

command {
command <<<
read_utils.py --version | tee VERSION
read_utils.py novoindex \
"${referenceGenome}" \
${"--NOVOALIGN_LICENSE_PATH=" + novocraft_license}
"~{referenceGenome}" \
~{"--NOVOALIGN_LICENSE_PATH=" + novocraft_license}

read_utils.py index_fasta_samtools "${referenceGenome}"
read_utils.py index_fasta_picard "${referenceGenome}"
}
read_utils.py index_fasta_samtools "~{referenceGenome}"
read_utils.py index_fasta_picard "~{referenceGenome}"
>>>

output {
File referenceNix = "*.nix"
@@ -373,7 +373,7 @@ task index_ref {
String viralngs_version = read_string("VERSION")
}
runtime {
docker: "${docker}"
docker: docker
cpu: 2
memory: select_first([machine_mem_gb, 4]) + " GB"
disks: "local-disk " + disk_size + " HDD"
@@ -395,15 +395,15 @@ task trimal_clean_msa {

Int disk_size = 100

command {
trimal -fasta -automated1 -in "${in_aligned_fasta}" -out "${input_basename}_trimal_cleaned.fasta"
}
command <<<
trimal -fasta -automated1 -in "~{in_aligned_fasta}" -out "~{input_basename}_trimal_cleaned.fasta"
>>>

output {
File trimal_cleaned_fasta = "${input_basename}_trimal_cleaned.fasta"
File trimal_cleaned_fasta = "~{input_basename}_trimal_cleaned.fasta"
}
runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 7]) + " GB"
cpu: 4
disks: "local-disk " + disk_size + " HDD"
@@ -429,7 +429,7 @@ task merge_vcfs_bcftools {
patterns: ["*.vcf.gz"] }
}

command {
command <<<

# copy files to CWD (required for tabix indexing)
INFILES=~{write_lines(in_vcfs_gz)}
@@ -454,15 +454,15 @@ task merge_vcfs_bcftools {

# tabix index the vcf to create .tbi file
tabix -p vcf ${output_prefix}.vcf.gz
}
>>>

output {
File merged_vcf_gz = "${output_prefix}.vcf.gz"
File merged_vcf_gz_tbi = "${output_prefix}.vcf.gz.tbi"
File merged_vcf_gz = "~{output_prefix}.vcf.gz"
File merged_vcf_gz_tbi = "~{output_prefix}.vcf.gz.tbi"
}

runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 3]) + " GB"
cpu: 2
dx_instance_type: "mem1_ssd1_v2_x2"
@@ -492,42 +492,112 @@ task merge_vcfs_gatk {
}
}

command {
command <<<

# tabix index input vcfs (must be gzipped)
parallel -I ,, \
"tabix -p vcf ,," \
::: "${sep=' ' in_vcfs_gz}"
::: "~{sep=' ' in_vcfs_gz}"

# index reference to create .fai and .dict indices
samtools faidx "${ref_fasta}"
picard CreateSequenceDictionary R="${ref_fasta}" O=$(basename $(basename "${ref_fasta}" .fasta) .fa).dict
samtools faidx "~{ref_fasta}"
picard CreateSequenceDictionary R="~{ref_fasta}" O=$(basename $(basename "~{ref_fasta}" .fasta) .fa).dict

# store input vcf file paths in file
for invcf in $(echo "${sep=' ' in_vcfs_gz}"); do
for invcf in $(echo "~{sep=' ' in_vcfs_gz}"); do
echo "$invcf" > input_vcfs.list
done

# merge
gatk3 -T CombineVariants -R "${ref_fasta}" -V input_vcfs.list -o "${output_prefix}.vcf" -genotypeMergeOptions UNIQUIFY
gatk3 -T CombineVariants -R "~{ref_fasta}" -V input_vcfs.list -o "~{output_prefix}.vcf" -genotypeMergeOptions UNIQUIFY

# bgzip output
bgzip "${output_prefix}.vcf"
bgzip "~{output_prefix}.vcf"

# tabix index the vcf to create .tbi file
tabix -p vcf "${output_prefix}.vcf.gz"
}
tabix -p vcf "~{output_prefix}.vcf.gz"
>>>

output {
File merged_vcf_gz = "${output_prefix}.vcf.gz"
File merged_vcf_gz_tbi = "${output_prefix}.vcf.gz.tbi"
File merged_vcf_gz = "~{output_prefix}.vcf.gz"
File merged_vcf_gz_tbi = "~{output_prefix}.vcf.gz.tbi"
}

runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 3]) + " GB"
cpu: 2
dx_instance_type: "mem1_ssd1_v2_x2"
maxRetries: 2
}
}

task reconstructr {
input {
File msa_fasta
File ref_fasta
File date_csv
File depth_csv
Array[File]+ lofreq_vcfs
Int n_iters

String out_basename = "reconstructR"

Int cpus = 8
Int machine_mem_gb = 15
Int disk_size = 375
String docker = "ghcr.io/broadinstitute/reconstructr:main"
}

command <<<
set -e -o pipefail

# stage input files
mkdir -p input_data input_data/vcf input_data/coverage
/opt/reconstructR/scripts/cp_and_decompress.sh "~{msa_fasta}" input_data/aligned.fasta
/opt/reconstructR/scripts/cp_and_decompress.sh "~{ref_fasta}" input_data/ref.fasta
/opt/reconstructR/scripts/cp_and_decompress.sh "~{date_csv}" input_data/date.csv
/opt/reconstructR/scripts/cp_and_decompress.sh "~{depth_csv}" input_data/depth.csv
/opt/reconstructR/scripts/mcp_and_decompress.sh "~{sep='" "' lofreq_vcfs}" input_data/vcf

# run reconstructR
R --no-save<<CODE
library(reconstructR)
results <- run_mcmc(~{n_iters})
p <- visualize(results)
write.table(tabulate(results), quote=FALSE, sep='\t', row.names=FALSE,
file="~{out_basename}-tabulate.tsv")
write.table(decipher(results), quote=FALSE, sep='\t', row.names=FALSE,
file="~{out_basename}-decipher.tsv")
save(results, file="~{out_basename}-states.Rdata")
CODE
# compress outputs
pigz -9 -p $(nproc) "~{out_basename}-tabulate.tsv" "~{out_basename}-decipher.tsv" "~{out_basename}-states.Rdata"
# profiling and stats
cat /proc/uptime | cut -f 1 -d ' ' > UPTIME_SEC
cat /proc/loadavg > CPU_LOAD
{ if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.peak ]; then cat /sys/fs/cgroup/memory/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } > MEM_BYTES
>>>
output {
File tabulated_tsv_gz = "~{out_basename}-tabulate.tsv.gz"
File deciphered_tsv_gz = "~{out_basename}-decipher.tsv.gz"
File mcmc_states_Rdata_gz = "~{out_basename}-states.Rdata.gz"
Int max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000)
Int runtime_sec = ceil(read_float("UPTIME_SEC"))
String cpu_load = read_string("CPU_LOAD")
}
runtime {
docker: docker
memory: machine_mem_gb + " GB"
cpu: cpus
disks: "local-disk " + disk_size + " HDD"
disk: disk_size + " GB" # TES
bootDiskSizeGb: 50
dx_instance_type: "mem1_ssd1_v2_x4"
maxRetries: 1
}
}
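
The new reconstruct_from_alignments.wdl workflow registered in .dockstore.yml above is not expanded in this view. As a rough, hypothetical sketch only — the import path, workflow name, and n_iters value below are assumptions, not the PR's actual workflow file — a caller could wire the new reconstructr task like this:

version 1.0

import "../tasks/tasks_interhost.wdl" as interhost  # relative import path assumed

workflow reconstruct_from_alignments_sketch {
    input {
        File         msa_fasta    # staged by the task as input_data/aligned.fasta
        File         ref_fasta    # staged as input_data/ref.fasta
        File         date_csv     # staged as input_data/date.csv
        File         depth_csv    # staged as input_data/depth.csv
        Array[File]+ lofreq_vcfs  # staged under input_data/vcf/
        Int          n_iters = 100000  # MCMC iteration count (default assumed for illustration)
    }
    call interhost.reconstructr {
        input:
            msa_fasta   = msa_fasta,
            ref_fasta   = ref_fasta,
            date_csv    = date_csv,
            depth_csv   = depth_csv,
            lofreq_vcfs = lofreq_vcfs,
            n_iters     = n_iters
    }
    output {
        File tabulated_tsv_gz  = reconstructr.tabulated_tsv_gz
        File deciphered_tsv_gz = reconstructr.deciphered_tsv_gz
        File mcmc_states_gz    = reconstructr.mcmc_states_Rdata_gz
    }
}

Note that the task stages all inputs under input_data/ with fixed filenames before invoking run_mcmc(), which presumably reads that directory layout from the working directory rather than taking file paths as arguments.
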
5 changes: 5 additions & 0 deletions pipes/WDL/tasks/tasks_ncbi_tools.wdl
@@ -5,6 +5,7 @@ task Fetch_SRA_to_BAM {
input {
String SRA_ID

String? sample_name
Int? machine_mem_gb
String docker = "quay.io/broadinstitute/ncbi-tools:2.10.7.10"
}
@@ -27,6 +28,10 @@
LIBRARY=$(jq -r .EXPERIMENT_PACKAGE_SET.EXPERIMENT_PACKAGE.EXPERIMENT.alias SRA.json)
RUNDATE=$(jq -r '.EXPERIMENT_PACKAGE_SET.EXPERIMENT_PACKAGE.RUN_SET.RUN.SRAFiles|if (.SRAFile|type) == "object" then .SRAFile.date else [.SRAFile[]|select(.supertype == "Original")][0].date end' SRA.json | cut -f 1 -d ' ')

if [[ -n "~{sample_name}" ]]; then
SAMPLE="~{sample_name}"
fi

if [ "$PLATFORM" = "OXFORD_NANOPORE" ]; then
# per the SAM/BAM specification
SAM_PLATFORM="ONT"
26 changes: 26 additions & 0 deletions pipes/WDL/tasks/tasks_read_utils.wdl
@@ -81,6 +81,32 @@ task group_bams_by_sample {
}
}
task get_bam_samplename {
input {
File bam
String docker = "quay.io/broadinstitute/viral-core:2.1.33"
}
Int disk_size = round(size(bam, "GB")) + 50
command <<<
set -e -o pipefail
samtools view -H "~{bam}" | \
perl -lane 'if (/^\@RG\t.*SM:(\S+)/) { print "$1" }' | \
sort | uniq > SAMPLE_NAME
>>>
runtime {
docker: docker
memory: "1 GB"
cpu: 1
disks: "local-disk " + disk_size + " HDD"
disk: disk_size + " GB" # TES
dx_instance_type: "mem1_ssd1_v2_x2"
maxRetries: 2
}
output {
String sample_name = read_string("SAMPLE_NAME")
}
}
task get_sample_meta {
input {
Array[File] samplesheets_extended
