more refactor/clean
dpark01 committed Aug 20, 2024
1 parent 26e67f3 commit 7b51e63
Showing 1 changed file with 41 additions and 62 deletions.
103 changes: 41 additions & 62 deletions pipes/WDL/tasks/tasks_terra.wdl
@@ -411,80 +411,60 @@ task create_or_update_sample_tables {
print(workspace_project + "\n" + workspace_name + "\n" + "bucket: " + workspace_bucket)
# get_entities -> python list of dicts
def get_entities_to_table(project, workspace, table_name):
table = json.loads(fapi.get_entities(project, workspace, table_name).text)
headers = collections.OrderedDict()
rows = []
headers[table_name + "_id"] = 0
for row in table:
outrow = row['attributes']
for x in outrow.keys():
headers[x] = 0
if type(outrow[x]) == dict and set(outrow[x].keys()) == set(('itemsType', 'items')):
outrow[x] = outrow[x]['items']
outrow[table_name + "_id"] = row['name']
rows.append(outrow)
return (headers, rows)
# # populate sample table from run outputs
# In particular, populate the raw_bam and cleaned_bam columns
# create tsvs for row insertion based on input bam lists
# create tsv to populate library table with raw_bam and cleaned_bam columns
print(f"creating library->bam mapping tsv files from file input")
raw_bams_list = '~{sep="*" raw_reads_unaligned_bams}'.split('*')
raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
df_library_table = pd.DataFrame({"entity:library_id" : raw_library_id_list, "raw_bam" : raw_bams_list})
raw_library_fname = flowcell_data_id + "-raw_bams.tsv"
df_library_table.to_csv(raw_library_fname, sep="\t", index=False)
cleaned_bams_list = '~{sep="*" cleaned_reads_unaligned_bams}'.split('*')
cleaned_library_id_list = [bam.split("/")[-1].replace(".bam", "").replace(".cleaned", "") for bam in cleaned_bams_list]
df_library_table = pd.DataFrame({"entity:library_id" : cleaned_library_id_list, "cleaned_bam" : cleaned_bams_list})
cleaned_library_fname = flowcell_data_id + "-cleaned_bams.tsv"
df_library_table.to_csv(cleaned_library_fname, sep="\t", index=False)
# call the create_tsv function and save files to data tables
tsv_list = [cleaned_library_fname, raw_library_fname]
print (f"wrote outputs to {tsv_list}")
for tsv in tsv_list:
# ToDo: check whether subject to race condition (and if so, implement via async/promises)
response = fapi.upload_entities_tsv(workspace_project, workspace_name, tsv, model="flexible")
if response.status_code != 200:
print('ERROR UPLOADING: See full error message:')
print(response.text)
else:
print("Upload complete. Check your workspace for new table!")
raw_bams_list = '~{sep="*" raw_reads_unaligned_bams}'.split('*')
raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
df_library_table_raw_bams = pd.DataFrame({"entity:library_id" : raw_library_id_list, "raw_bam" : raw_bams_list})
# # update sample_set with new set memberships and flowcell metadata
cleaned_bams_list = '~{sep="*" cleaned_reads_unaligned_bams}'.split('*')
cleaned_library_id_list = [bam.split("/")[-1].replace(".bam", "").replace(".cleaned", "") for bam in cleaned_bams_list]
df_library_table_clean_bams = pd.DataFrame({"entity:library_id" : cleaned_library_id_list, "cleaned_bam" : cleaned_bams_list})
# API call to get existing sample->library mappings
header, rows = get_entities_to_table(workspace_project, workspace_name, "sample")
df_sample = pd.DataFrame.from_records(rows, columns=header, index="sample_id")
df_library_bams = pd.merge(df_library_table_raw_bams, df_library_table_clean_bams, on="entity:library_id", how="outer")
library_bams_tsv = flowcell_data_id + "-all_bams.tsv"
df_library_bams.to_csv(library_bams_tsv, sep="\t", index=False)
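# Illustration of the outer merge above (hypothetical paths): a library that
# appears in both bam lists gets both columns populated,
#   raw:     entity:library_id=lib01  raw_bam=gs://bucket/lib01.bam
#   cleaned: entity:library_id=lib01  cleaned_bam=gs://bucket/lib01.cleaned.bam
#   merged:  lib01  gs://bucket/lib01.bam  gs://bucket/lib01.cleaned.bam
# while a library present in only one list gets NaN in the other column.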
# API call to get all existing library ids
header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
df_library = pd.DataFrame.from_records(rows, columns=header, index="library_id")
# # update sample_set with new set memberships and flowcell metadata
# load library metadata from json
with open('~{meta_by_filename_json}',"r") as meta_fp:
library_meta_dict = json.load(meta_fp)
print("json describes {} libraries".format(len(library_meta_dict)))
# grab the meta_by_filename values to create new sample->library (sample_set->sample) mappings
# restrict to libraries/samples that we actually have bam files for
sample_to_libraries = {}
libraries_already_in_table = set()
libraries_in_bams = set()
for library_id, data in library_meta_dict.items():
sample_id = data['sample']
sample_to_libraries.setdefault(sample_id, [])
if library_id in df_library.index:
if library_id in df_library_bams.index:
sample_to_libraries[sample_id].append(library_id)
libraries_already_in_table.add(library_id)
libraries_in_bams.add(library_id)
else:
print (f"missing {library_id} from library table")
print("json describes {} unique samples".format(len(sample_to_libraries)))
print("json describes {} libraries already present in Terra table and {} new to Terra table".format(len(libraries_already_in_table), len(library_meta_dict) - len(libraries_already_in_table)))
print (f"missing {library_id} from bam list")
print("json describes {} libraries from {} unique samples".format(len(library_meta_dict), len(sample_to_libraries)))
print("json describes {} libraries we have bam files for and {} libraries we will ignore".format(len(libraries_in_bams), len(library_meta_dict) - len(libraries_in_bams)))
# API call to get existing sample->library mappings <-- THIS IS THE VOLATILE PART
# get_entities -> python list of dicts
def get_entities_to_table(project, workspace, table_name):
table = json.loads(fapi.get_entities(project, workspace, table_name).text)
headers = collections.OrderedDict()
rows = []
headers[table_name + "_id"] = 0
for row in table:
outrow = row['attributes']
for x in outrow.keys():
headers[x] = 0
if type(outrow[x]) == dict and set(outrow[x].keys()) == set(('itemsType', 'items')):
outrow[x] = outrow[x]['items']
outrow[table_name + "_id"] = row['name']
rows.append(outrow)
return (headers, rows)
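# For illustration (hypothetical values): fapi.get_entities returns a JSON
# list of entities such as, for a "library" table,
#   [{"name": "lib01", "attributes": {"raw_bam": "gs://bucket/lib01.bam",
#     "spike_ins": {"itemsType": "AttributeValue", "items": ["ERCC-41"]}}}]
# which get_entities_to_table flattens to
#   headers = {"library_id": 0, "raw_bam": 0, "spike_ins": 0}
#   rows = [{"library_id": "lib01", "raw_bam": "gs://bucket/lib01.bam",
#            "spike_ins": ["ERCC-41"]}]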
header, rows = get_entities_to_table(workspace_project, workspace_name, "sample")
df_sample = pd.DataFrame.from_records(rows, columns=header, index="sample_id")
# merge in new sample->library mappings with any pre-existing sample->library mappings
if len(df_sample)>0:
@@ -508,7 +488,7 @@ task create_or_update_sample_tables {
for sample_id, libraries in sample_to_libraries.items():
outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
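# Each membership line written above is a sample id, a tab, then a JSON list
# of Terra entity references to its member libraries, e.g. (hypothetical ids):
#   S1    [{"entityType": "library", "entityName": "S1.l1"}]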
# prepare library metadata
# create tsv to populate library table with metadata from json
library_meta_fname = "library_metadata.tsv"
with open(library_meta_fname, 'wt') as outf:
outf.write("entity:")
@@ -525,8 +505,8 @@ task create_or_update_sample_tables {
out_rows.append(out_row)
writer.writerows(out_rows)
# write them to the Terra table!
for fname in (library_meta_fname,sample_fname):
# write everything to the Terra table! -- TO DO: move this to separate task
for fname in (library_bams_tsv, library_meta_fname, sample_fname):
response = fapi.upload_entities_tsv(workspace_project, workspace_name, fname, model="flexible")
if response.status_code != 200:
print(f'ERROR UPLOADING {fname}: See full error message:')
@@ -546,8 +526,7 @@ task create_or_update_sample_tables {
output {
File library_metadata_tsv = "library_metadata.tsv"
File sample_membership_tsv = "sample_membership.tsv"
File library_cleaned_bams_tsv = "~{flowcell_run_id}-cleaned_bams.tsv"
File library_raw_bams_tsv = "~{flowcell_run_id}-raw_bams.tsv"
File library_bams_tsv = "~{flowcell_run_id}-all_bams.tsv"
File stdout_log = stdout()
File stderr_log = stderr()
Int max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000)