diff --git a/.dockstore.yml b/.dockstore.yml
index b8e53db2c..a3fba3a43 100644
--- a/.dockstore.yml
+++ b/.dockstore.yml
@@ -115,6 +115,11 @@ workflows:
     primaryDescriptorPath: /pipes/WDL/workflows/demux_deplete.wdl
     testParameterFiles:
       - empty.json
+  - name: demux_deplete_and_table_insert
+    subclass: WDL
+    primaryDescriptorPath: /pipes/WDL/workflows/demux_deplete_and_table_insert.wdl
+    testParameterFiles:
+      - empty.json
   - name: demux_plus
     subclass: WDL
     primaryDescriptorPath: /pipes/WDL/workflows/demux_plus.wdl
@@ -259,6 +264,11 @@ workflows:
     primaryDescriptorPath: /pipes/WDL/workflows/nextclade_single.wdl
     testParameterFiles:
       - empty.json
+  - name: populate_library_and_sample_tables_from_flowcell
+    subclass: WDL
+    primaryDescriptorPath: /pipes/WDL/workflows/populate_library_and_sample_tables_from_flowcell.wdl
+    testParameterFiles:
+      - empty.json
   - name: reconstruct_from_alignments
     subclass: WDL
     primaryDescriptorPath: /pipes/WDL/workflows/reconstruct_from_alignments.wdl
diff --git a/pipes/WDL/tasks/tasks_demux.wdl b/pipes/WDL/tasks/tasks_demux.wdl
index 43fc7399e..c3e2d4ff5 100644
--- a/pipes/WDL/tasks/tasks_demux.wdl
+++ b/pipes/WDL/tasks/tasks_demux.wdl
@@ -383,7 +383,7 @@ task illumina_demux {
         --threads $num_fastqc_threads" \
         ::: $(cat $OUT_BASENAMES)
 
-        mv metrics.txt "~{out_base}-demux_metrics.txt"
+        mv metrics.txt  "~{out_base}-demux_metrics.txt"
         mv runinfo.json "~{out_base}-runinfo.json"
 
         cat /proc/uptime | cut -f 1 -d ' ' > UPTIME_SEC
diff --git a/pipes/WDL/tasks/tasks_reports.wdl b/pipes/WDL/tasks/tasks_reports.wdl
index d49084c11..4b44f5ed9 100644
--- a/pipes/WDL/tasks/tasks_reports.wdl
+++ b/pipes/WDL/tasks/tasks_reports.wdl
@@ -530,12 +530,10 @@ task MultiQC {
     input {
         Array[File]     input_files
 
-        Boolean         force = false
-        Boolean         full_names = false
        String?         title
        String?         comment
        String?         file_name
-        String          out_dir = "./multiqc-output"
+        String          out_dir = "./multiqc-output"
        String?         template
        String?         tag
        String?         ignore_analysis_files
@@ -543,15 +541,17 @@ task MultiQC {
        File?           sample_names
        Array[String]?  exclude_modules
        Array[String]?  module_to_use
-        Boolean         data_dir = false
-        Boolean         no_data_dir = false
        String?         output_data_format
-        Boolean         zip_data_dir = false
-        Boolean         export = false
-        Boolean         flat = false
-        Boolean         interactive = true
-        Boolean         lint = false
-        Boolean         pdf = false
+        Boolean         force = false
+        Boolean         full_names = false
+        Boolean         data_dir = false
+        Boolean         no_data_dir = false
+        Boolean         zip_data_dir = false
+        Boolean         export = false
+        Boolean         flat = false
+        Boolean         interactive = true
+        Boolean         lint = false
+        Boolean         pdf = false
        Boolean         megaQC_upload = false # Upload generated report to MegaQC if MegaQC options are found
        File?           config  # directory
        String?         config_yaml
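The MultiQC hunks above only regroup the task's inputs so the Boolean flags sit together ahead of megaQC_upload; no defaults change. In the task body (untouched by this hunk), each Boolean gates its corresponding MultiQC CLI flag through WDL's true/false interpolation. A minimal sketch of that pattern, assuming MultiQC's usual flag spellings:

    multiqc \
        --outdir "~{out_dir}" \
        ~{true='--force' false='' force} \
        ~{true='--fullnames' false='' full_names} \
        ~{true='--flat' false='' flat} \
        ~{true='--interactive' false='' interactive} \
        ~{true='--pdf' false='' pdf} \
        ~{sep=' ' input_files}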
diff --git a/pipes/WDL/tasks/tasks_terra.wdl b/pipes/WDL/tasks/tasks_terra.wdl
index ecb8ca40f..275b27d4f 100644
--- a/pipes/WDL/tasks/tasks_terra.wdl
+++ b/pipes/WDL/tasks/tasks_terra.wdl
@@ -38,7 +38,8 @@ task check_terra_env {
     description: "task for inspection of backend to determine whether the task is running on Terra and/or GCP"
   }
   command <<<
-    set -ex
+    # set -x  # echo commands upon execution [commented out to avoid leaking the gcloud auth token]
+    set -e  # exit on error
 
     # create gcloud-related output file
     touch gcloud_config_info.log
@@ -50,6 +51,12 @@ task check_terra_env {
     touch workspace_bucket_path.txt
     touch input_table_name.txt
     touch input_row_id.txt
+    touch method_version.txt
+    touch method_source.txt
+    touch method_path.txt
+
+    # disable the version update alert messages gcloud sometimes emits when executing any command
+    gcloud config set component_manager/disable_update_check true
 
     # write system environment variables to output file
     env | tee -a env_info.log
@@ -80,7 +87,7 @@ task check_terra_env {
       echo "false" > RUNNING_ON_GCP
     fi
 
-    if grep "true" RUNNING_ON_GCP && grep "true" RUNNING_ON_TERRA; then
+    if grep --quiet "true" RUNNING_ON_GCP && grep --quiet "true" RUNNING_ON_TERRA; then
      echo "Running on Terra+GCP"
 
      # === Determine Terra workspace ID and submission ID for the workspace responsible for this job
@@ -88,7 +95,7 @@ task check_terra_env {
      # Scrape various workflow / workspace info from the localization and delocalization scripts.
      # from: https://github.com/broadinstitute/gatk/blob/ah_var_store/scripts/variantstore/wdl/GvsUtils.wdl#L35-L40
      WORKSPACE_ID="$(sed -n -E 's!.*gs://fc-(secure-)?([^\/]+).*!\2!p' /cromwell_root/gcs_delocalization.sh | sort -u | tee workspace_id.txt)"
-      echo "WORKSPACE_ID: ${WORKSPACE_ID}"
+      echo "WORKSPACE_ID: ${WORKSPACE_ID}"
 
      # bucket path prefix
      #BUCKET_PREFIX="$(sed -n -E 's!.*(gs://(fc-(secure-)?[^\/]+)).*!\1!p' /cromwell_root/gcs_delocalization.sh | sort -u | tee bucket_prefix.txt)"
@@ -105,21 +112,23 @@ task check_terra_env {
      #GOOGLE_PROJECT_ID="$(sed -n -E 's!.*(terra-[0-9a-f]+).*# project to use if requester pays$!\1!p' /cromwell_root/gcs_localization.sh | sort -u)"
      # =======================================
 
+      # fetch the auth token once and reuse it for all API calls below
+      GCLOUD_OAUTH_BEARER_TOKEN="$(gcloud auth print-access-token)"
+
      # === request workspace name AND namespace from API, based on bucket path / ID ===
      curl -s -X 'GET' \
        "https://api.firecloud.org/api/workspaces/id/${WORKSPACE_ID}?fields=workspace.name%2Cworkspace.namespace%2Cworkspace.googleProject" \
        -H 'accept: application/json' \
-        -H "Authorization: Bearer $(gcloud auth print-access-token)" > workspace_info.json
+        -H "Authorization: Bearer $GCLOUD_OAUTH_BEARER_TOKEN" > workspace_info.json
 
-      WORKSPACE_NAME="$(jq -cr '.workspace.name | select (.!=null)' workspace_info.json)"
+      WORKSPACE_NAME="$(jq -cr '.workspace.name | select (.!=null)' workspace_info.json | tee workspace_name.txt)"
      WORKSPACE_NAME_URL_ENCODED="$(jq -rn --arg x "${WORKSPACE_NAME}" '$x|@uri')"
-      WORKSPACE_NAMESPACE="$(jq -cr '.workspace.namespace | select (.!=null)' workspace_info.json)"
-      WORKSPACE_BUCKET="gs://${WORKSPACE_ID}"
+      WORKSPACE_NAMESPACE="$(jq -cr '.workspace.namespace | select (.!=null)' workspace_info.json | tee workspace_namespace.txt)"
+      WORKSPACE_BUCKET="$(echo "gs://${WORKSPACE_ID}" | tee workspace_bucket_path.txt)"
 
-      echo "${WORKSPACE_NAME}" | tee workspace_name.txt
-      echo "${WORKSPACE_NAMESPACE}" | tee workspace_namespace.txt
-      echo "${WORKSPACE_BUCKET}" | tee workspace_bucket_path.txt
+      echo "WORKSPACE_NAME: ${WORKSPACE_NAME}"
echo "WORKSPACE_NAMESPACE: ${WORKSPACE_NAMESPACE}" + echo "WORKSPACE_BUCKET: ${WORKSPACE_BUCKET}" # --- less direct way of obtaining workspace info by matching Terra project ID -- # preserved here for potential utility in obtaining workspace info for other projects/workspaces @@ -127,7 +136,7 @@ task check_terra_env { #curl -s -X 'GET' \ #'https://api.firecloud.org/api/workspaces?fields=workspace.name%2Cworkspace.namespace%2Cworkspace.bucketName%2Cworkspace.googleProject' \ #-H 'accept: application/json' \ - #-H "Authorization: Bearer $(gcloud auth print-access-token)" > workspace_list.json + #-H "Authorization: Bearer $GCLOUD_OAUTH_BEARER_TOKEN" > workspace_list.json # extract workspace name #WORKSPACE_NAME=$(jq -cr '.[] | select( .workspace.googleProject == "'${GOOGLE_PROJECT_ID}'" ).workspace | .name' workspace_list.json) @@ -142,23 +151,49 @@ task check_terra_env { # ======================================= - # === obtain info on job submission inputs (table name, row ID)=== + # === obtain info on job submission inputs (table name, row ID) === touch submission_metadata.json - curl -s -X 'GET' \ + curl -s 'GET' \ "https://api.firecloud.org/api/workspaces/${WORKSPACE_NAMESPACE}/${WORKSPACE_NAME_URL_ENCODED}/submissions/${TOP_LEVEL_SUBMISSION_ID}" \ -H 'accept: application/json' \ - -H "Authorization: Bearer $(gcloud auth print-access-token)" > submission_metadata.json + -H "Authorization: Bearer $GCLOUD_OAUTH_BEARER_TOKEN" > submission_metadata.json - INPUT_TABLE_NAME="$(jq -cr 'if .submissionEntity == null then "" elif (.workflows | length)==1 then .submissionEntity.entityType else [.workflows[].workflowEntity.entityType] | join(",") end' submission_metadata.json)" - INPUT_ROW_ID="$(jq -cr 'if .submissionEntity == null then "" elif (.workflows | length)==1 then .submissionEntity.entityName else [.workflows[].workflowEntity.entityName] | join(",") end' submission_metadata.json)" + INPUT_TABLE_NAME="$(jq -cr 'if .submissionEntity == null then "" elif (.workflows | length)==1 then .submissionEntity.entityType else [.workflows[].workflowEntity.entityType] | join(",") end' submission_metadata.json | tee input_table_name.txt)" + INPUT_ROW_ID="$(jq -cr 'if .submissionEntity == null then "" elif (.workflows | length)==1 then .submissionEntity.entityName else [.workflows[].workflowEntity.entityName] | join(",") end' submission_metadata.json | tee input_row_id.txt)" - echo "$INPUT_TABLE_NAME" | tee input_table_name.txt - echo "$INPUT_ROW_ID" | tee input_row_id.txt + echo "INPUT_TABLE_NAME: $INPUT_TABLE_NAME" + echo "INPUT_ROW_ID: $INPUT_ROW_ID" + # ======================================= + + # === obtain info on workflow version (branch/tag) and source (dockstore, etc.) === + curl -s 'GET' \ + "https://rawls.dsde-prod.broadinstitute.org/api/workspaces/${WORKSPACE_NAMESPACE}/${WORKSPACE_NAME_URL_ENCODED}/submissions/${TOP_LEVEL_SUBMISSION_ID}/configuration" \ + -H 'accept: application/json' \ + -H "Authorization: Bearer $GCLOUD_OAUTH_BEARER_TOKEN" > workflow_version_info.json + + # .methodConfigVersion corresponds to snapshot of input/output config (or a method version stored in Broad methods repo?) 
+      #jq -cr .methodConfigVersion workflow_version_info.json
+      METHOD_VERSION="$(jq -cr '.methodRepoMethod.methodVersion | select (.!=null)' workflow_version_info.json | tee method_version.txt)"
+      METHOD_SOURCE="$(jq -cr '.methodRepoMethod.sourceRepo | select (.!=null)' workflow_version_info.json | tee method_source.txt)"
+      METHOD_PATH="$(jq -cr '.methodRepoMethod.methodPath | select (.!=null)' workflow_version_info.json | tee method_path.txt)"
+
+      echo "METHOD_VERSION: $METHOD_VERSION"
+      echo "METHOD_SOURCE: $METHOD_SOURCE"
+      echo "METHOD_PATH: $METHOD_PATH"
      # =======================================
    else
      echo "Not running on Terra+GCP"
    fi
-
+    ls -1 /sys
+    echo "--"
+    ls -1 /sys/fs
+    echo "--"
+    ls -1 /sys/fs/cgroup
+    echo "-- memory.peak:"
+    cat /sys/fs/cgroup/memory.peak || true
+    echo "--"
+    #ls -1 /sys/fs/cgroup/memory
+
+    # record peak task memory: cgroup v2 exposes memory.peak; cgroup v1 exposes memory/memory.max_usage_in_bytes
+    echo -n "MEM_BYTES: "; { if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } | tee MEM_BYTES
  >>>
  output {
    Boolean is_running_on_terra = read_boolean("RUNNING_ON_TERRA")
@@ -169,7 +204,11 @@ task check_terra_env {
    String workspace_id = read_string("workspace_id.txt")
    String workspace_name = read_string("workspace_name.txt")
    String workspace_namespace = read_string("workspace_namespace.txt")
-    String workspace_bucket_path = read_string("workspace_bucket_path.txt")
+    String workspace_bucket_path = read_string("workspace_bucket_path.txt")
+
+    String method_version = read_string("method_version.txt")
+    String method_source = read_string("method_source.txt")
+    String method_path = read_string("method_path.txt")
 
    String input_table_name = read_string("input_table_name.txt")
    String input_row_id = read_string("input_row_id.txt")
@@ -178,6 +217,8 @@ task check_terra_env {
 
    File env_info = "env_info.log"
    File gcloud_config_info = "gcloud_config_info.log"
+
+    Int max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000)
  }
  runtime {
    docker: docker
@@ -326,7 +367,13 @@ task create_or_update_sample_tables {
    String workspace_name
    String workspace_bucket
 
-    String docker = "quay.io/broadinstitute/viral-core:2.2.2" #skip-global-version-pin
+    Array[String]? raw_reads_unaligned_bams
+    Array[String]? cleaned_reads_unaligned_bams
+
+    File? meta_by_filename_json
+    File? meta_by_sample_json
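+    # the four optional inputs above are provided by demux_deplete when it invokes this task
+    # directly after demultiplexing; when absent, the task falls back to reading the same
+    # values from the live "flowcell" data table via the FireCloud API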
+
+    String docker = "quay.io/broadinstitute/viral-core:2.2.4" #skip-global-version-pin
  }
 
  meta {
@@ -335,13 +382,216 @@ task create_or_update_sample_tables {
  command <<<
+    python3<<CODE
+    import collections
+    import csv
+    import json
+    import os.path
+    from ast import literal_eval
+    from io import StringIO
+
+    import pandas as pd
+    from firecloud import api as fapi
+
+    workspace_project = '~{workspace_namespace}'
+    workspace_name    = '~{workspace_name}'
+    flowcell_data_id  = '~{flowcell_run_id}'
+    raw_reads_unaligned_bams_list_filepath     = '~{write_lines(select_first([raw_reads_unaligned_bams, []]))}'
+    cleaned_reads_unaligned_bams_list_filepath = '~{write_lines(select_first([cleaned_reads_unaligned_bams, []]))}'
+
+    # convert a Terra table API response -> python list of dicts
+    def get_entities_to_table(project, workspace, table_name):
+        table = json.loads(fapi.get_entities(project, workspace, table_name).text)
+        headers = collections.OrderedDict()
+        rows = []
+        headers[table_name + "_id"] = 0
+        for row in table:
+            outrow = row['attributes']
+            for x in outrow.keys():
+                headers[x] = 0
+                if type(outrow[x]) == dict and set(outrow[x].keys()) == set(('itemsType', 'items')):
+                    outrow[x] = outrow[x]['items']
+            outrow[table_name + "_id"] = row['name']
+            rows.append(outrow)
+        return (headers, rows)
+
+    # # populate sample table from run outputs
+    # In particular, populate the raw_bam and cleaned_bam columns
+
+    def get_bam_lists_for_flowcell_from_live_table(project, workspace, runID):
+        # API call to get flowcell_data table
+        response = fapi.get_entities_tsv(project, workspace, "flowcell", model="flexible")
+
+        # read API response into data frame
+        df = pd.read_csv(StringIO(response.text), sep="\t", index_col="entity:flowcell_id")
+
+        # extract the bam path lists stored in this run's row
+        cleaned_bams_list = literal_eval(df.cleaned_reads_unaligned_bams[runID])
+        raw_bams_list = literal_eval(df.raw_reads_unaligned_bams[runID])
+
+        return (cleaned_bams_list, raw_bams_list)
+
+    def create_library_to_bam_tsvs(cleaned_bams_list, raw_bams_list, runID):
+        # create library.tsv data frame (entity:library_id) for cleaned bams
+        cleaned_library_id_list = [bam.split("/")[-1].replace(".bam", "").replace(".cleaned", "") for bam in cleaned_bams_list]
+        df_library_table = pd.DataFrame({"entity:library_id": cleaned_library_id_list,
+                                         "cleaned_bam": cleaned_bams_list})
+        cleaned_library_fname = runID + "_cleaned_bams.tsv"
+        df_library_table.to_csv(cleaned_library_fname, sep="\t", index=False)
+
+        # create library.tsv data frame (entity:library_id) for raw bams
+        raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
+        df_library_table = pd.DataFrame({"entity:library_id": raw_library_id_list,
+                                         "raw_bam": raw_bams_list})
+        raw_library_fname = runID + "_raw_bams.tsv"
+        df_library_table.to_csv(raw_library_fname, sep="\t", index=False)
+
+        return (cleaned_library_fname, raw_library_fname)
+
+    # IF optional input bam file lists were passed in:
+    #   create tsvs for row insertion based on them
+    # ELSE:
+    #   create tsvs based on data from the live data table
+    if (os.path.exists(raw_reads_unaligned_bams_list_filepath) and os.path.getsize(raw_reads_unaligned_bams_list_filepath) > 0 and
+        os.path.exists(cleaned_reads_unaligned_bams_list_filepath) and os.path.getsize(cleaned_reads_unaligned_bams_list_filepath) > 0):
+        print("creating library->bam mapping tsv files from file input")
+        with open(raw_reads_unaligned_bams_list_filepath) as raw_reads_unaligned_bams_list_fp, open(cleaned_reads_unaligned_bams_list_filepath) as cleaned_reads_unaligned_bams_list_fp:
+            cleaned_bams_list, raw_bams_list = (cleaned_reads_unaligned_bams_list_fp.read().splitlines(),
+                                                raw_reads_unaligned_bams_list_fp.read().splitlines())
+    else:
+        print("creating library->bam mapping tsv files from live table data (no file lists passed in to task)")
+        cleaned_bams_list, raw_bams_list = get_bam_lists_for_flowcell_from_live_table(workspace_project, workspace_name, flowcell_data_id)
+
+    # call the create_tsv function and save files to data tables
+    tsv_list = create_library_to_bam_tsvs(cleaned_bams_list, raw_bams_list, flowcell_data_id)
+
+    print(f"wrote outputs to {tsv_list}")
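+
+    # upload each TSV to the workspace data tables; the "flexible" entity model lets the
+    # upload create attribute columns that are not already defined on the table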
{tsv_list}") + + for tsv in tsv_list: + # ToDo: check whether subject to race condition (and if so, implement via async/promises) + response = fapi.upload_entities_tsv(workspace_project, workspace_name, tsv, model="flexible") + if response.status_code != 200: + print('ERROR UPLOADING: See full error message:') + print(response.text) + else: + print("Upload complete. Check your workspace for new table!") + + # # update sample_set with new set memberships and flowcell metadata + + # columns to copy from flowcell_data to library table + copy_cols = ["sample_original", "spike_in"] + + # API call to get existing sample_set mappings + header, rows = get_entities_to_table(workspace_project, workspace_name, "sample") + df_sample = pd.DataFrame.from_records(rows, columns=header, index="sample_id") + + # API call to get all existing library ids + header, rows = get_entities_to_table(workspace_project, workspace_name, "library") + df_library = pd.DataFrame.from_records(rows, columns=header, index="library_id") + + # if meta_by_filename_json specified and size>0 bytes + # parse as json, pass to for loop below + # else + # get data from live table + if ( len('~{default="" meta_by_filename_json}')>0 and + os.path.getsize('~{meta_by_filename_json}') > 0 ): + + library_meta_dict = {} + with open('~{meta_by_filename_json}',"r") as meta_fp: + library_meta_dict = json.load(meta_fp) + else: + # API call to get flowcell_data table + header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell") + df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id") + library_meta_dict = df_flowcell.meta_by_filename[flowcell_data_id] + + # grab the meta_by_filename values to create new sample->library (sample_set->sample) mappings + sample_to_libraries = {} + for library_id, data in library_meta_dict.items(): + sample_id = data['sample'] + sample_to_libraries.setdefault(sample_id, []) + if library_id in df_library.index: + sample_to_libraries[sample_id].append(library_id) + else: + print (f"missing {library_id} from library table") + + # merge in new sample->library mappings with any pre-existing sample->library mappings + if len(df_sample)>0: + print(df_sample.index) + for sample_id in sample_to_libraries.keys(): + if sample_id in df_sample.index: + print (f"sample_set {sample_id} pre-exists in Terra table, merging with new members") + #sample_set_to_samples[set_id].extend(df_sample_set.samples[set_id]) + already_associated_libraries = [entity["entityName"] for entity in df_sample.libraries[sample_id]] + + print(f"already_associated_libraries {already_associated_libraries}") + print(f"sample_to_libraries[sample_id] {sample_to_libraries[sample_id]}") + continue + sample_to_libraries[sample_id].extend(already_associated_libraries) + # collapse duplicate sample IDs + sample_to_libraries[sample_id] = list(set(sample_to_libraries[sample_id])) + + sample_fname = 'sample_membership.tsv' + with open(sample_fname, 'wt') as outf: + outf.write('entity:sample_id\tlibraries\n') + for sample_id, libraries in sample_to_libraries.items(): + #for library_id in sorted(libraries): + outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n') + + # if meta_by_filename_json specified and size>0 bytes + # parse as json, pass to for loop below + # else + # get data from live table + if ( len('~{default="" meta_by_sample_json}')>0 and + os.path.getsize('~{meta_by_sample_json}') > 0 ): + + sample = {} + with 
+    # grab all the library IDs
+    header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
+    out_rows = []
+    out_header = ['library_id'] + copy_cols
+    print(f"out_header {out_header}")
+    for row in rows:
+        out_row = {'library_id': row['library_id']}
+
+        for sample_id, sample_library_metadata in meta_by_library_all.items():
+            if sample_library_metadata["library"] in row['library_id']:
+                for col in copy_cols:
+                    out_row[col] = sample_library_metadata.get(col, '')
+        out_rows.append(out_row)
+
+    library_meta_fname = "sample_metadata.tsv"
+    with open(library_meta_fname, 'wt') as outf:
+        # prefix the header row with "entity:" so Terra treats the first column as the row ID
+        outf.write("entity:")
+        writer = csv.DictWriter(outf, out_header, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
+        writer.writeheader()
+        writer.writerows(out_rows)
+
+    # write them to the Terra table!
+    for fname in (library_meta_fname, sample_fname):
+        response = fapi.upload_entities_tsv(workspace_project, workspace_name, fname, model="flexible")
+        if response.status_code != 200:
+            print(f'ERROR UPLOADING {fname}: See full error message:')
+            print(response.text)
+        else:
+            print("Upload complete. Check your workspace for new table!")
+    CODE
+
+    { if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } > MEM_BYTES
  >>>
  runtime {
    docker: docker
@@ -352,5 +602,6 @@ task create_or_update_sample_tables {
  output {
    File stdout_log = stdout()
    File stderr_log = stderr()
+    Int  max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000)
  }
}
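Both tasks above now expose an Int max_ram_gb output, computed as ceil(bytes / 10^9), i.e. whole decimal gigabytes rounded up. The one-line probe they share, expanded here for readability (a sketch assuming the container mounts either cgroup v2 or v1 accounting files):

    # cgroup v2 exposes the peak-usage high-water mark as memory.peak;
    # cgroup v1 exposes it as memory/memory.max_usage_in_bytes
    if [ -f /sys/fs/cgroup/memory.peak ]; then
        cat /sys/fs/cgroup/memory.peak
    elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then
        cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes
    else
        echo "0"   # neither file present: report 0 rather than failing under set -e
    fi > MEM_BYTES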
diff --git a/pipes/WDL/workflows/demux_deplete.wdl b/pipes/WDL/workflows/demux_deplete.wdl
index f44f44e6f..e00b6c15c 100644
--- a/pipes/WDL/workflows/demux_deplete.wdl
+++ b/pipes/WDL/workflows/demux_deplete.wdl
@@ -1,9 +1,10 @@
 version 1.0
 
 import "../tasks/tasks_demux.wdl" as demux
-import "../tasks/tasks_taxon_filter.wdl" as taxon_filter
-import "../tasks/tasks_reports.wdl" as reports
 import "../tasks/tasks_ncbi.wdl" as ncbi
+import "../tasks/tasks_reports.wdl" as reports
+import "../tasks/tasks_taxon_filter.wdl" as taxon_filter
+import "../tasks/tasks_terra.wdl" as terra
 
 workflow demux_deplete {
     meta {
@@ -19,6 +20,7 @@ workflow demux_deplete {
 
         String? read_structure
         Boolean sort_reads=true
+        Boolean insert_demux_outputs_into_terra_tables=false
 
         File? sample_rename_map
         File? biosample_map
@@ -70,6 +72,10 @@ workflow demux_deplete {
            description: "Output bam files will be sorted by read name.",
            category: "advanced"
        }
+        insert_demux_outputs_into_terra_tables: {
+            description: "Terra only: if set to 'true', demux output will be used to insert entries in the 'library' table (one row per library-lane) and the 'sample' table (each row referencing one or more libraries per sample ID)",
+            category: "advanced"
+        }
        bmtaggerDbs: {
            description: "Tool that can discriminate between human and bacterial reads and other reads by using short fragments. Databases must be provided to enable depletion. Sequences in fasta format will be indexed on the fly; pre-bmtagger-indexed databases may be provided as tarballs.",
            category: "advanced"
        }
@@ -167,6 +173,25 @@ workflow demux_deplete {
        }
    }
 
+    if (insert_demux_outputs_into_terra_tables) {
+        call terra.check_terra_env
+
+        if (check_terra_env.is_running_on_terra) {
+            call terra.create_or_update_sample_tables {
+                input:
+                    flowcell_run_id     = illumina_demux.run_info[0]['run_id'],
+                    workspace_name      = check_terra_env.workspace_name,
+                    workspace_namespace = check_terra_env.workspace_namespace,
+                    workspace_bucket    = check_terra_env.workspace_bucket_path,
+
+                    raw_reads_unaligned_bams     = flatten(illumina_demux.raw_reads_unaligned_bams),
+                    cleaned_reads_unaligned_bams = select_all(cleaned_bam_passing),
+                    meta_by_filename_json = meta_filename.merged_json,
+                    meta_by_sample_json   = meta_sample.merged_json
+            }
+        }
+    }
+
    #### SRA submission prep
    if(defined(biosample_map)) {
        call ncbi.sra_meta_prep {
diff --git a/pipes/WDL/workflows/dump_gcloud_env_info.wdl b/pipes/WDL/workflows/dump_gcloud_env_info.wdl
index 4d16e6de6..44c9bd675 100644
--- a/pipes/WDL/workflows/dump_gcloud_env_info.wdl
+++ b/pipes/WDL/workflows/dump_gcloud_env_info.wdl
@@ -23,6 +23,10 @@ workflow dump_gcloud_env_info {
        String workspace_name = check_terra_env.workspace_name
        String workspace_namespace = check_terra_env.workspace_namespace
        String workspace_bucket_path = check_terra_env.workspace_bucket_path
+
+        String method_version = check_terra_env.method_version
+        String method_source = check_terra_env.method_source
+        String method_path = check_terra_env.method_path
 
        String input_table_name = check_terra_env.input_table_name
        String input_row_id = check_terra_env.input_row_id
diff --git a/pipes/WDL/workflows/populate_library_and_sample_tables_from_flowcell.wdl b/pipes/WDL/workflows/populate_library_and_sample_tables_from_flowcell.wdl
new file mode 100644
index 000000000..5484f565e
--- /dev/null
+++ b/pipes/WDL/workflows/populate_library_and_sample_tables_from_flowcell.wdl
@@ -0,0 +1,30 @@
+version 1.0
+
+import "../tasks/tasks_terra.wdl" as terra
+
+workflow populate_library_and_sample_tables_from_flowcell {
+    meta {
+        description: "Terra only: Populate per-library-lane and per-sample tables from existing demultiplexed flowcell output"
+        author: "Broad Viral Genomics"
+        email: "viral-ngs@broadinstitute.org"
+        allowNestedInputs: true
+    }
+
+    input {
+        String flowcell_run_id
+    }
+
+    # obtain runtime workspace info necessary to read or change data in
+    # Terra tables of the workspace associated with a job
+    call terra.check_terra_env
+
+    if (check_terra_env.is_running_on_terra) {
+        call terra.create_or_update_sample_tables {
+            input:
+                flowcell_run_id     = flowcell_run_id,
+                workspace_name      = check_terra_env.workspace_name,
+                workspace_namespace = check_terra_env.workspace_namespace,
+                workspace_bucket    = check_terra_env.workspace_bucket_path
+        }
+    }
+}
\ No newline at end of file
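For reference, the sample_membership.tsv uploaded by create_or_update_sample_tables encodes the libraries column as Terra entity-reference JSON. A hypothetical row for a sample with two library-lanes (sample and library names invented for illustration):

    entity:sample_id	libraries
    SAMPLE01	[{"entityType": "library", "entityName": "SAMPLE01.l1.LANE1"}, {"entityType": "library", "entityName": "SAMPLE01.l1.LANE2"}]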