Skip to content

Commit

Permalink
Merge pull request #552 from broadinstitute/dp-demux
Browse files Browse the repository at this point in the history
demux_deplete small updates
  • Loading branch information
dpark01 committed Aug 22, 2024
2 parents 5623df0 + 22aa08c commit 7bbfee4
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 10 deletions.
8 changes: 4 additions & 4 deletions pipes/WDL/tasks/tasks_ncbi.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ task gisaid_meta_prep {
out_headers = ('submitter', 'fn', 'covv_virus_name', 'covv_type', 'covv_passage', 'covv_collection_date', 'covv_location', 'covv_add_location', 'covv_host', 'covv_add_host_info', 'covv_sampling_strategy', 'covv_gender', 'covv_patient_age', 'covv_patient_status', 'covv_specimen', 'covv_outbreak', 'covv_last_vaccinated', 'covv_treatment', 'covv_seq_technology', 'covv_assembly_method', 'covv_coverage', 'covv_orig_lab', 'covv_orig_lab_addr', 'covv_provider_sample_id', 'covv_subm_lab', 'covv_subm_lab_addr', 'covv_subm_sample_id', 'covv_authors', 'covv_comment', 'comment_type')
with open('~{out_name}', 'wt') as outf:
with open('~{out_name}', 'w', newline='') as outf:
writer = csv.DictWriter(outf, out_headers, dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
Expand Down Expand Up @@ -615,13 +615,13 @@ task biosample_to_table {
if v and (k not in biosample_headers) and k not in ('message', 'accession'):
biosample_headers.append(k)
print("biosample headers ({}): {}".format(len(biosample_headers), biosample_headers))
print("biosample rows ({})".format(len(biosample_attributes)))
print("biosample output rows ({})".format(len(biosample_attributes)))
samples_seen_without_biosample = set(sample_names_seen) - set(row['sample_name'] for row in biosample_attributes)
print("samples seen in bams without biosample entries ({}): {}".format(len(samples_seen_without_biosample), sorted(samples_seen_without_biosample)))
# write reformatted table
with open('~{base}.entities.tsv', 'wt') as outf:
writer = csv.DictWriter(outf, delimiter='\t', fieldnames=["~{sanitized_id_col}"]+biosample_headers, quoting=csv.QUOTE_MINIMAL)
with open('~{base}.entities.tsv', 'w', newline='') as outf:
writer = csv.DictWriter(outf, delimiter='\t', fieldnames=["~{sanitized_id_col}"]+biosample_headers, dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for row in biosample_attributes:
outrow = {h: row[h] for h in biosample_headers}
Expand Down
2 changes: 1 addition & 1 deletion pipes/WDL/tasks/tasks_nextstrain.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ task derived_cols {
out_headers.extend(adder.extra_headers())
with open_or_gzopen(out_tsv, 'wt') as outf:
writer = csv.DictWriter(outf, out_headers, delimiter='\t')
writer = csv.DictWriter(outf, out_headers, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for row in reader:
for adder in adders:
Expand Down
24 changes: 20 additions & 4 deletions pipes/WDL/tasks/tasks_terra.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ task download_entities_tsv {
rows.append(outrow)
# dump to tsv
with open(out_fname, 'wt') as outf:
with open(out_fname, 'w', newline='') as outf:
writer = csv.DictWriter(outf, headers.keys(), delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
writer.writerows(rows)
Expand Down Expand Up @@ -385,6 +385,8 @@ task create_or_update_sample_tables {
Array[String] cleaned_reads_unaligned_bams
File meta_by_filename_json
File? read_counts_raw_json
File? read_counts_cleaned_json
String sample_table_name = "sample"
String library_table_name = "library"
Expand All @@ -397,6 +399,7 @@ task create_or_update_sample_tables {
}
command <<<
set -e
python3<<CODE
flowcell_data_id = '~{flowcell_run_id}'
workspace_project = '~{workspace_namespace}'
Expand All @@ -413,6 +416,16 @@ task create_or_update_sample_tables {
print(workspace_project + "\n" + workspace_name)
# process read counts if available
read_counts_raw = {}
read_counts_cleaned = {}
if '~{default="" read_counts_raw_json}':
with open('~{default="" read_counts_raw_json}','rt') as inf:
read_counts_raw = {pair['left']: pair['right'] for pair in json.load(inf)}
if '~{default="" read_counts_cleaned_json}':
with open('~{default="" read_counts_cleaned_json}','rt') as inf:
read_counts_cleaned = {pair['left']: pair['right'] for pair in json.load(inf)}
# create tsv to populate library table with raw_bam and cleaned_bam columns
raw_bams_list = '~{sep="*" raw_reads_unaligned_bams}'.split('*')
raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
Expand All @@ -435,9 +448,9 @@ task create_or_update_sample_tables {
# create tsv to populate library table with metadata from demux json / samplesheet
# to do: maybe just merge this into df_library_bams instead and make a single tsv output
library_meta_fname = "library_metadata.tsv"
with open(library_meta_fname, 'wt') as outf:
with open(library_meta_fname, 'w', newline='') as outf:
copy_cols = ["sample_original", "spike_in", "control", "batch_lib", "library", "lane", "library_id_per_sample", "library_strategy", "library_source", "library_selection", "design_description"]
out_header = [lib_col_name] + copy_cols
out_header = [lib_col_name, 'flowcell', 'read_count_raw', 'read_count_cleaned'] + copy_cols
print(f"library_metadata.tsv output header: {out_header}")
writer = csv.DictWriter(outf, out_header, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
Expand All @@ -447,6 +460,9 @@ task create_or_update_sample_tables {
if library['run'] in library_bam_names:
out_row = {col: library.get(col, '') for col in copy_cols}
out_row[lib_col_name] = library['run']
out_row['flowcell'] = flowcell_data_id
out_row['read_count_raw'] = read_counts_raw.get(library['run'], '')
out_row['read_count_cleaned'] = read_counts_cleaned.get(library['run'], '')
out_rows.append(out_row)
writer.writerows(out_rows)
Expand Down Expand Up @@ -498,7 +514,7 @@ task create_or_update_sample_tables {
print (f"\tsample {sample_id} pre-exists in Terra table, merging old members {already_associated_libraries} with new members {libraries}")
merged_sample_ids.add(sample_id)
outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
outf.write(f'{sample_id}\t{json.dumps([{"entityType":"~{library_table_name}","entityName":library_name} for library_name in libraries])}\n')
print(f"wrote {len(sample_to_libraries)} samples to {sample_fname} where {len(merged_sample_ids)} samples were already in the Terra table")
# write everything to the Terra table! -- TO DO: move this to separate task
Expand Down
12 changes: 11 additions & 1 deletion pipes/WDL/workflows/demux_deplete.wdl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version 1.0

#DX_SKIP_WORKFLOW
import "../tasks/tasks_demux.wdl" as demux
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_reports.wdl" as reports
Expand Down Expand Up @@ -176,6 +178,8 @@ workflow demux_deplete {
if (read_count_post_depletion < min_reads_per_bam) {
File empty_bam = raw_reads
}
Pair[String,Int] count_raw = (basename(raw_reads, '.bam'), spikein.reads_total)
Pair[String,Int] count_cleaned = (basename(raw_reads, '.bam'), read_count_post_depletion)
}

if(defined(biosample_map)) {
Expand Down Expand Up @@ -214,7 +218,9 @@ workflow demux_deplete {

raw_reads_unaligned_bams = flatten(illumina_demux.raw_reads_unaligned_bams),
cleaned_reads_unaligned_bams = select_all(cleaned_bam_passing),
meta_by_filename_json = meta_filename.merged_json
meta_by_filename_json = meta_filename.merged_json,
read_counts_raw_json = write_json(count_raw),
read_counts_cleaned_json = write_json(count_cleaned)
}
if(defined(biosample_map)) {
Expand Down Expand Up @@ -279,6 +285,10 @@ workflow demux_deplete {
File run_info_json = illumina_demux.run_info_json[0]
String run_id = illumina_demux.run_info[0]['run_id']
File? terra_library_table = create_or_update_sample_tables.library_metadata_tsv
File? terra_sample_library_map = create_or_update_sample_tables.sample_membership_tsv
File? terra_sample_metadata = biosample_to_table.sample_meta_tsv
String demux_viral_core_version = illumina_demux.viralngs_version[0]
}
}
2 changes: 2 additions & 0 deletions pipes/WDL/workflows/sarscov2_illumina_full.wdl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version 1.0

#DX_SKIP_WORKFLOW
import "../tasks/tasks_read_utils.wdl" as read_utils
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_nextstrain.wdl" as nextstrain
Expand Down

0 comments on commit 7bbfee4

Please sign in to comment.