Skip to content

Commit

Permalink
workaround some cromwell delocalization limitations
Browse files Browse the repository at this point in the history
  • Loading branch information
dpark01 committed Mar 20, 2024
1 parent 8c2de26 commit f42a420
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
19 changes: 15 additions & 4 deletions pipes/WDL/tasks/tasks_assembly.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,23 @@ task select_references {
--loglevel=DEBUG

# create basename-only version of ref_clusters output file
# create tar-bundles of ref_clusters fastas, since Cromwell doesn't delocalize files in a Array[Array[File]] = read_tsv
python3 <<CODE
# Post-process the skani ref_clusters TSV:
#  1) emit a basenames-only copy of the cluster table (for String outputs), and
#  2) bundle each cluster's fasta files into one tar.gz under clusters/,
#     because Cromwell does not delocalize File members of an
#     Array[Array[File]] produced by read_tsv.
import os, os.path, shutil, tarfile
os.mkdir("clusters")
with open("~{contigs_basename}.ref_clusters.tsv", 'r') as inf:
    with open("~{contigs_basename}.ref_clusters.basenames.tsv", 'w') as outf:
        for line in inf:
            line = line.rstrip('\n')
            if not line:
                # skip blank lines: ''.split('\t') would yield [''] and
                # produce a bogus empty-named tarball
                continue
            fnames = line.split('\t')
            assert fnames
            # strip directory components and any trailing '.fasta' extension
            basefnames = list([f[:-6] if f.endswith('.fasta') else f for f in map(os.path.basename, fnames)])
            outf.write('\t'.join(basefnames) + '\n')
            # tarball named after the cluster's first member and its size,
            # e.g. clusters/<first_ref>.<n>.tar.gz
            with tarfile.open(os.path.join("clusters", basefnames[0] + "." + str(len(basefnames)) + ".tar.gz"), "w:gz") as tarball:
                for f in fnames:
                    # copy to cwd first so tar entries are flat basenames,
                    # not absolute localization paths
                    shutil.copy(f, ".")
                    tarball.add(os.path.basename(f))
                    os.unlink(os.path.basename(f))
CODE
# create top-hits output files
Expand All @@ -147,7 +156,7 @@ task select_references {
>>>
output {
Array[Array[File]] matched_reference_clusters_fastas = read_tsv("~{contigs_basename}.ref_clusters.tsv")
Array[File] matched_reference_clusters_fastas_tars = glob("clusters/*.tar.gz")
Array[Array[String]] matched_reference_clusters_basenames = read_tsv("~{contigs_basename}.ref_clusters.basenames.tsv")
Array[String] top_matches_per_cluster_basenames = read_lines("TOP_FASTAS_BASENAMES")
Array[File] top_matches_per_cluster_fastas = read_lines("TOP_FASTAS")
Expand Down Expand Up @@ -279,6 +288,7 @@ task scaffold {
cut -f 3 "~{sample_name}.refs_skani_dist.full.tsv" | tail +2 | head -1 > SKANI_ANI
cut -f 4 "~{sample_name}.refs_skani_dist.full.tsv" | tail +2 | head -1 > SKANI_REF_AF
cut -f 5 "~{sample_name}.refs_skani_dist.full.tsv" | tail +2 | head -1 > SKANI_CONTIGS_AF
basename "$CHOSEN_REF_FASTA" .fasta > CHOSEN_REF_BASENAME
assembly.py order_and_orient \
"~{contigs_fasta}" \
Expand Down Expand Up @@ -345,6 +355,7 @@ task scaffold {
Int reference_num_segments_required = read_int("reference_num_segments_required")
Int reference_length = read_int("reference_length")
Array[String] scaffolding_chosen_ref_names = read_lines("~{sample_name}.scaffolding_chosen_refs.txt")
String scaffolding_chosen_ref_basename = read_string("CHOSEN_REF_BASENAME")
File scaffolding_chosen_ref = "~{sample_name}.scaffolding_chosen_ref.fasta"
File scaffolding_stats = "~{sample_name}.refs_skani_dist.full.tsv"
File scaffolding_alt_contigs = "~{sample_name}.scaffolding_alt_contigs.fasta"
Expand Down
29 changes: 29 additions & 0 deletions pipes/WDL/tasks/tasks_utils.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,35 @@ task sed {
}
}
task tar_extract {
  meta {
    description: "Extract a tar file into a flat directory and return the extracted files. Note: outputs are collected with a non-recursive glob, so members of nested directories inside the archive are not returned individually."
  }
  input {
    File   tar_file
    Int    disk_size = 375
    String tar_opts  = "-z"
  }
  parameter_meta {
    tar_file: { description: "Archive to unpack." }
    disk_size: { description: "Local disk to provision, in GB." }
    tar_opts: { description: "Extra flags passed to tar (e.g. '-z' for gzip, '-j' for bzip2, '' for uncompressed)." }
  }
  command <<<
    set -e -o pipefail  # fail the task if extraction fails, instead of exiting 0 with partial output
    mkdir -p unpack
    cd unpack
    # tar_opts is intentionally unquoted so multiple flags can be passed
    tar -xv ~{tar_opts} -f "~{tar_file}"
  >>>
  runtime {
    docker: "quay.io/broadinstitute/viral-baseimage:0.2.0"
    memory: "2 GB"
    cpu: 1
    disks: "local-disk " + disk_size + " LOCAL"
    disk: disk_size + " GB" # TES
    dx_instance_type: "mem1_ssd1_v2_x2"
    maxRetries: 2
    preemptible: true
  }
  output {
    Array[File] files = glob("unpack/*")
  }
}
task fasta_to_ids {
meta {
description: "Return the headers only from a fasta file"
Expand Down
17 changes: 11 additions & 6 deletions pipes/WDL/workflows/scaffold_and_refine_multitaxa.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import "assemble_refbased.wdl" as assemble_refbased
workflow scaffold_and_refine_multitaxa {
meta {
description: "Scaffold de novo contigs against a set of possible references and subsequently polish with reads."
description: "Scaffold de novo contigs against a set of possible references and subsequently polish with reads. This workflow accepts a very large set of input reference genomes. It will subset the reference genomes to those with ANI hits to the provided contigs/MAGs and cluster the reference hits by any ANI similarity to each other. It will choose the top reference from each cluster and produce one assembly for each cluster. This is intended to allow for the presence of multiple diverse viral taxa (coinfections) while forcing a choice of the best assembly from groups of related reference genomes."
author: "Broad Viral Genomics"
email: "viral-ngs@broadinstitute.org"
allowNestedInputs: true
Expand Down Expand Up @@ -51,14 +51,19 @@ workflow scaffold_and_refine_multitaxa {

# assemble and produce stats for every reference cluster
Array[String] assembly_header = ["entity:assembly_id", "assembly_name", "sample_id", "sample_name", "taxid", "tax_name", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "scaffolding_num_segments_recovered", "reference_num_segments_required", "reference_length", "reference_accessions", "skani_num_ref_clusters", "skani_this_cluster_num_refs", "skani_dist_tsv", "scaffolding_ani", "scaffolding_pct_ref_cov", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned", "coverage_genbank", "assembly_method", "sample"]
scatter(ref_cluster in select_references.matched_reference_clusters_fastas) {
scatter(ref_cluster_tar in select_references.matched_reference_clusters_fastas_tars) {

call utils.tar_extract {
input:
tar_file = ref_cluster_tar
}

# assemble (scaffold-and-refine) genome against this reference cluster
call assembly.scaffold {
input:
reads_bam = reads_unmapped_bam,
contigs_fasta = contigs_fasta,
reference_genome_fasta = ref_cluster,
reference_genome_fasta = tar_extract.files,
min_length_fraction = 0,
min_unambig = 0,
allow_incomplete_output = true
Expand Down Expand Up @@ -120,8 +125,8 @@ workflow scaffold_and_refine_multitaxa {
"reference_length" : scaffold.reference_length,
"reference_accessions" : tax_lookup.map["accessions"],

"skani_num_ref_clusters" : length(select_references.matched_reference_clusters_basenames),
"skani_this_cluster_num_refs" : length(ref_cluster),
"skani_num_ref_clusters" : length(select_references.matched_reference_clusters_fastas_tars),
"skani_this_cluster_num_refs" : length(tar_extract.files),
"skani_dist_tsv" : scaffold.scaffolding_stats,
"scaffolding_ani" : scaffold.skani_ani,
"scaffolding_pct_ref_cov" : scaffold.skani_ref_aligned_frac,
Expand Down Expand Up @@ -164,7 +169,7 @@ workflow scaffold_and_refine_multitaxa {
String assembly_method = "viral-ngs/scaffold_and_refine_multitaxa"

#String assembly_top_taxon_id = select_references.top_matches_per_cluster_basenames[0]
Int skani_num_ref_clusters = length(select_references.matched_reference_clusters_basenames)
Int skani_num_ref_clusters = length(select_references.matched_reference_clusters_fastas_tars)
File skani_contigs_to_refs_dist_tsv = select_references.skani_dist_full_tsv

Array[String] assembly_all_taxids = taxid
Expand Down

0 comments on commit f42a420

Please sign in to comment.