add option for create filter set to use sample_info with is_loaded (#7434)

* add is_loaded check in query
* add job id to logging
* make sample names optional
* separate queries for vet_new
* fix table creation
ahaessly authored Aug 27, 2021
1 parent 5b3ca7b commit 0cc1f8c
Showing 4 changed files with 44 additions and 32 deletions.
31 changes: 18 additions & 13 deletions scripts/variantstore/wdl/GvsPrepareCallset.wdl
@@ -5,8 +5,7 @@ workflow GvsPrepareCallset {
         String data_project
         String default_dataset
         String destination_cohort_table_prefix
-        File sample_names_to_extract
-        Boolean localize_sample_names_with_service_account = false
+        File? sample_names_to_extract
 
         # inputs with defaults
         String query_project = data_project
@@ -26,18 +25,10 @@ workflow GvsPrepareCallset {
 
     String docker_final = select_first([docker, "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210806"])
 
-    if (localize_sample_names_with_service_account && defined(service_account_json_path)) {
-        call LocalizeFile {
-            input:
-                file = "~{sample_names_to_extract}",
-                service_account_json_path = select_first([service_account_json_path])
-        }
-    }
-
     call PrepareCallsetTask {
         input:
             destination_cohort_table_prefix = destination_cohort_table_prefix,
-            sample_names_to_extract = select_first([LocalizeFile.localized_file, sample_names_to_extract]),
+            sample_names_to_extract = sample_names_to_extract,
             query_project = query_project,
             query_labels = query_labels,
             fq_petvet_dataset = fq_petvet_dataset,
@@ -63,7 +54,7 @@ task PrepareCallsetTask {
 
     input {
         String destination_cohort_table_prefix
-        File sample_names_to_extract
+        File? sample_names_to_extract
         String query_project
         Array[String]? query_labels
 
@@ -80,21 +71,35 @@ task PrepareCallsetTask {
     Array[String] query_label_args = if defined(query_labels) then prefix("--query_labels ", select_first([query_labels])) else []
 
     String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false'
+    String use_sample_names_file = if (defined(sample_names_to_extract)) then 'true' else 'false'
+    String python_option = if (defined(sample_names_to_extract)) then '--sample_names_to_extract sample_names_file' else '--fq_cohort_sample_names ' + fq_sample_mapping_table
 
+    parameter_meta {
+        sample_names_to_extract: {
+            localization_optional: true
+        }
+    }
+
     command <<<
         set -e
 
+        echo ~{python_option}
+
         if [ ~{has_service_account_file} = 'true' ]; then
             gsutil cp ~{service_account_json_path} local.service_account.json
             SERVICE_ACCOUNT_STANZA="--sa_key_path local.service_account.json "
        fi
 
+        if [ ~{use_sample_names_file} = 'true' ]; then
+            gsutil cp ~{sample_names_to_extract} sample_names_file
+        fi
+
         python3 /app/create_cohort_extract_data_table.py \
             --fq_petvet_dataset ~{fq_petvet_dataset} \
             --fq_temp_table_dataset ~{fq_temp_table_dataset} \
             --fq_destination_dataset ~{fq_destination_dataset} \
             --destination_cohort_table_prefix ~{destination_cohort_table_prefix} \
-            --sample_names_to_extract ~{sample_names_to_extract} \
+            ~{python_option} \
             --query_project ~{query_project} \
             ~{sep=" " query_label_args} \
             --fq_sample_mapping_table ~{fq_sample_mapping_table} \
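
Note: the task now has two mutually exclusive input modes, selected by python_option above. A minimal Python sketch of that selection logic (the function name is illustrative; the two flags are the ones in the diff):

# Sketch only: mirrors the WDL task's flag selection in plain Python.
def build_sample_selection_args(sample_names_file, fq_sample_mapping_table):
    if sample_names_file is not None:
        # A names file was supplied: extract only the listed samples.
        return ["--sample_names_to_extract", sample_names_file]
    # No file: fall back to every sample in the mapping table.
    return ["--fq_cohort_sample_names", fq_sample_mapping_table]

# build_sample_selection_args(None, "proj.dataset.sample_info")
#   -> ["--fq_cohort_sample_names", "proj.dataset.sample_info"]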
create_cohort_extract_data_table.py
@@ -46,7 +46,7 @@ def dump_job_stats():
         bytes_billed = int(0 if job.total_bytes_billed is None else job.total_bytes_billed)
         total = total + bytes_billed
 
-        print(jobid[0], " <====> Cache Hit:", job.cache_hit, bytes_billed/(1024 * 1024), " MBs")
+        print(jobid[0], "jobid: (", jobid[1], ") <====> Cache Hit:", job.cache_hit, bytes_billed/(1024 * 1024), " MBs")
 
     print(" Total GBs billed ", total/(1024 * 1024 * 1024), " GBs")
 
@@ -63,18 +63,18 @@ def execute_with_retry(label, sql):
             job_config = bigquery.QueryJobConfig(labels=job_labels)
             query = client.query(sql, job_config=job_config)
 
-            print(f"STARTING - {label}")
+            print(f"STARTING - {label} (jobid: {query.job_id})")
             JOB_IDS.add((label, query.job_id))
             results = query.result()
-            print(f"COMPLETED ({time.time() - start} s, {3-len(retry_delay)} retries) - {label}")
+            print(f"COMPLETED ({time.time() - start} s, {3-len(retry_delay)} retries) - {label} (jobid: {query.job_id})")
             return results
         except Exception as err:
             # if there are no retries left... raise
             if (len(retry_delay) == 0):
                 raise err
             else:
                 t = retry_delay.pop(0)
-                print(f"Error {err} running query {label}, sleeping for {t}")
+                print(f"Error {err} running query {label} (jobid: {query.job_id}), sleeping for {t}")
                 time.sleep(t)
 
 def get_partition_range(i):
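
Note: every query call site funnels through execute_with_retry, which now threads the BigQuery job id into each log line. A condensed, illustrative version of that pattern (assumes the google-cloud-bigquery client; names are not from this repo):

import time
from google.cloud import bigquery

def run_with_retry(client, label, sql, delays=(30, 60, 90)):
    # Run sql, logging the job id on start and on every failure.
    remaining = list(delays)
    while True:
        query = client.query(sql)  # submits an asynchronous BigQuery job
        print(f"STARTING - {label} (jobid: {query.job_id})")
        try:
            return query.result()  # blocks until the job finishes
        except Exception as err:
            if not remaining:
                raise  # retries exhausted; surface the last error
            t = remaining.pop(0)
            print(f"Error {err} running query {label} (jobid: {query.job_id}), sleeping for {t}")
            time.sleep(t)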
@@ -121,7 +121,8 @@ def get_all_sample_ids(fq_destination_table_samples):
 
 def create_extract_samples_table(fq_destination_table_samples, fq_sample_name_table, fq_sample_mapping_table):
     sql = f"CREATE OR REPLACE TABLE `{fq_destination_table_samples}` AS (" \
-          f"SELECT m.sample_id, m.sample_name FROM `{fq_sample_name_table}` s JOIN `{fq_sample_mapping_table}` m ON (s.sample_name = m.sample_name) )"
+          f"SELECT m.sample_id, m.sample_name FROM `{fq_sample_name_table}` s JOIN `{fq_sample_mapping_table}` m ON (s.sample_name = m.sample_name) " \
+          f"WHERE m.is_loaded is TRUE)"
 
     results = execute_with_retry("create extract sample table", sql)
     return results
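
Note: with hypothetical table names substituted, the statement built by create_extract_samples_table renders roughly as below; the new predicate drops any sample whose data has not finished loading:

# Illustrative rendering only (hypothetical names, whitespace added for readability).
rendered_sql = """
CREATE OR REPLACE TABLE `my-project.my_dataset.cohort__SAMPLES` AS (
    SELECT m.sample_id, m.sample_name
    FROM `my-project.my_dataset.sample_names` s
    JOIN `my-project.my_dataset.sample_info` m ON (s.sample_name = m.sample_name)
    WHERE m.is_loaded is TRUE)
"""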
@@ -146,22 +147,28 @@ def get_subselect(fq_vet_table, samples, id):
     # KCIBUL -- grr, should be fixed width
     fq_vet_table = f"{fq_pet_vet_dataset}.{VET_TABLE_PREFIX}{i:03}"
     if len(partition_samples) > 0:
         subs = {}
+        create_or_insert = f"\nCREATE OR REPLACE TABLE `{fq_temp_table_dataset}.{VET_NEW_TABLE}` {TEMP_TABLE_TTL} AS \n WITH \n" if i == 1 \
+            else f"\nINSERT INTO `{fq_temp_table_dataset}.{VET_NEW_TABLE}` \n WITH \n"
+        fq_vet_table = f"{fq_pet_vet_dataset}.{VET_TABLE_PREFIX}{i:03}"
         j = 1
 
         for samples in split_lists(partition_samples, 1000):
             id = f"{i}_{j}"
             subs[id] = get_subselect(fq_vet_table, samples, id)
             j = j + 1
 
-        sql = f"CREATE OR REPLACE TABLE `{fq_temp_table_dataset}.{VET_NEW_TABLE}` {TEMP_TABLE_TTL} AS \n" + \
-              "WITH\n" + \
-              ("\n".join(subs.values())) + "\n" \
-              "q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
-              f" (SELECT * FROM q_all)"
+        sql = create_or_insert + ("\n".join(subs.values())) + "\n" + \
+              "q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
+              f" (SELECT * FROM q_all)"
 
-        print(sql)
-        print(f"VET Query is {utf8len(sql)/(1024*1024)} MB in length")
-        results = execute_with_retry("insert vet new table", sql)
-        return results
+        print(sql)
+        print(f"VET Query is {utf8len(sql)/(1024*1024)} MB in length")
+        if i == 1:
+            execute_with_retry("create and populate vet new table", sql)
+        else:
+            execute_with_retry("populate vet new table", sql)
+        return
 
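Note: vet_new now follows the same create-then-append pattern already used for pet_new (next hunk): the first partition issues CREATE OR REPLACE, later partitions issue INSERT INTO. A minimal sketch of that control flow (names are illustrative):

def build_statement(partition_index, fq_table, with_body):
    # First partition creates the temp table; the rest append to it.
    head = (f"CREATE OR REPLACE TABLE `{fq_table}` AS \n WITH \n"
            if partition_index == 1
            else f"INSERT INTO `{fq_table}` \n WITH \n")
    return head + with_body + "\n (SELECT * FROM q_all)"

# partition 1 -> "CREATE OR REPLACE TABLE ..."; partition 2+ -> "INSERT INTO ..."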
@@ -202,7 +209,7 @@ def get_pet_subselect(fq_pet_table, samples, id):
 
     if len(partition_samples) > 0:
         subs = {}
-        create_or_insert = f"\nCREATE OR REPLACE TABLE `{fq_temp_table_dataset}.{PET_NEW_TABLE}` {TEMP_TABLE_TTL} AS \n WITH \n" if i == 1 \
+        create_or_insert = f"\nCREATE OR REPLACE TABLE `{fq_temp_table_dataset}.{PET_NEW_TABLE}` {TEMP_TABLE_TTL} AS \n WITH \n" if i==1 \
             else f"\nINSERT INTO `{fq_temp_table_dataset}.{PET_NEW_TABLE}` \n WITH \n"
         fq_pet_table = f"{fq_pet_vet_dataset}.{PET_TABLE_PREFIX}{i:03}"
         j = 1
@@ -49,7 +49,7 @@ public Map<Long, String> getMap() {
     }
 
     protected void initializeMaps(TableReference sampleTable, String executionProjectId, boolean printDebugInformation, Optional<String> originTool) {
-        TableResult queryResults = querySampleTable(sampleTable.getFQTableName(), "", executionProjectId, printDebugInformation, originTool);
+        TableResult queryResults = querySampleTable(sampleTable.getFQTableName(), "is_loaded is TRUE", executionProjectId, printDebugInformation, originTool);
 
         // Add our samples to our map:
         for (final FieldValueList row : queryResults.iterateAll()) {
@@ -80,7 +80,7 @@ private TableResult querySampleTable(
         // Get the query string:
         final String sampleListQueryString =
                 "SELECT " + SchemaUtils.SAMPLE_ID_FIELD_NAME + ", " + SchemaUtils.SAMPLE_NAME_FIELD_NAME +
-                " FROM `" + fqSampleTableName + "`" + whereClause;
+                " FROM `" + fqSampleTableName + "` " + whereClause;
 
         Map<String, String> labelForQuery = new HashMap<String, String>();
         if (originTool.isPresent()) {
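
Note: these lines show the filter text and the added space before it, but not how the WHERE keyword is attached (that happens outside this hunk). A sketch of the intended composition, assuming an optional filter string (Python, illustrative):

def sample_list_query(fq_table, where_clause=""):
    # Append an optional predicate such as "is_loaded is TRUE" safely.
    base = f"SELECT sample_id, sample_name FROM `{fq_table}`"
    return f"{base} WHERE {where_clause}" if where_clause else base

# sample_list_query("proj.ds.sample_info", "is_loaded is TRUE")
#   -> "SELECT sample_id, sample_name FROM `proj.ds.sample_info` WHERE is_loaded is TRUE"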
@@ -342,7 +342,7 @@ private static TableResult submitQueryAndWaitForResults( final BigQuery bigQuery

         // Wait for the query to complete.
         try {
-            logger.info("Waiting for query to complete...");
+            logger.info("Waiting for query " + queryJob.getJobId() + " to complete...");
             queryJob = queryJob.waitFor();
         }
         catch (final InterruptedException ex) {
@@ -371,7 +371,7 @@ private static TableResult submitQueryAndWaitForResults( final BigQuery bigQuery
             result = queryJob.getQueryResults();
 
             long bytesProcessed = ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed();
-            logger.info(String.format("%.2f MB actually scanned", bytesProcessed / 1000000.0));
+            logger.info(String.format("%.2f MB actually scanned for job: %s", bytesProcessed / 1000000.0, queryJob.getJobId()));
 
         }
         catch (final InterruptedException ex) {