# GWAS Catalog harmonisation of summary statistics #42
### Post-harmonisation benchmark of results

The harmonisation batch jobs were run with those two steps on 2024-10-15. The harmonisation job performed two tasks:

The harmonisation results can be found in:

Tasks:

#### Randomly chosen summary statistics check

```python
file = "gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90002037"
df = session.spark.read.parquet(file)
df.show()
```

The output of the summary statistics looks healthy.
#### Check log files

Dump the harmonisation summaries to see which logs need attention. This only needs to be done once; afterwards the data is saved to the `harmonisation_summary_metrics` path.

```python
import pyspark.sql.functions as f

harmonisation_summary_metrics = "gs://gwas_catalog_inputs/statistics/20241015/harmonisation_summary_metrics.tsv"
input_path = "gs://gwas_catalog_inputs/harmonisation_summary/*/latest/harmonisation.csv"

# Read the input data
session.spark.read.csv(input_path).repartition(1).write.parquet("/tmp/harmonisation_summary.parquet")
s = session.spark.read.parquet("/tmp/harmonisation_summary.parquet")

# Drop the repeated header rows
data = s.filter(f.col("_c0") != "study")

# Rename the positional CSV columns to meaningful names
columns = ["study", "harmonisationExitCode", "qcExitCode", "rawSumstatFile", "rawSumstatFileSize", "rawUnzippedSumstatFileSize"]
data = data.toDF(*columns)

# Save the full harmonisation summary to the flat file
data.toPandas().to_csv(harmonisation_summary_metrics, sep="\t", index=False)
print(f"Harmonisation summary metrics saved to {harmonisation_summary_metrics}")
```

```
Harmonisation summary metrics saved to gs://gwas_catalog_inputs/statistics/20241015/harmonisation_summary_metrics.tsv
```

#### Harmonisation step checks

See the number of studies that failed the harmonisation step with exit code != 0:
```python
# Count the number of studies that failed harmonisation overall
data.filter(f.col("harmonisationExitCode") != 0).cache().show()
```

This resulted in:

GCST90444202 failed harmonisation because its file was too big for the machines to handle. The job was reported to fail a single harmonisation in the last batch. This study was ingested from FTP on 2024-10-14; the zipped file is ~5 GB and unzips to ~15 GB.

#### QC step checks

See the number of studies that failed the QC step with exit code != 0:
```python
# Count the number of studies that failed QC overall
failed_qc_count = data.filter(f.col("qcExitCode") > 0).count()
failed_qc_studies = [d["study"] for d in data.filter(f.col("qcExitCode") != 0).cache().select("study").collect()]
print(f"Sumstats that fail the QC: {failed_qc_count}")
```
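As a cross-check, the QC failure count can also be recomputed locally from the written `harmonisation_summary_metrics.tsv` with the stdlib `csv` module. This is a sketch, not part of the pipeline; the two-study sample below is made up but follows the column layout defined above.

```python
import csv
import io

def count_failed_qc(tsv_text: str) -> int:
    """Count rows in the summary-metrics TSV whose qcExitCode is non-zero."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    return sum(1 for row in reader if int(row["qcExitCode"]) != 0)

# Made-up two-study sample in the summary-metrics layout:
sample = (
    "study\tharmonisationExitCode\tqcExitCode\trawSumstatFile\trawSumstatFileSize\trawUnzippedSumstatFileSize\n"
    "GCST90000001\t0\t0\ta.tsv.gz\t100\t300\n"
    "GCST90000002\t0\t1\tb.tsv.gz\t100\t300\n"
)
print(count_failed_qc(sample))  # -> 1
```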
#### Extracting errors from logs

Extract the logs for the studies that failed.

##### Read the logs for the studies with failed QC

These studies are contained in the `failed_qc_studies` list.

```python
import re

failed_paths = [f"gs://gwas_catalog_inputs/harmonisation_summary/{study}/latest/harmonisation.log" for study in failed_qc_studies]
df = session.spark.read.text(failed_paths, wholetext=True).cache()
pattern = re.compile(r"error|exception", re.IGNORECASE)

# Split the logs into lines and extract the studyId from the file path
error_logs = (
    df.withColumn("logs", f.split(f.col("value"), "\n"))
    .cache()
    .withColumn("study", f.regexp_replace(f.regexp_extract(f.col("value"), r"/GCST\d+/", 0), "/", ""))
)
# Keep only the log lines that carry a ValueError
error_logs_2 = error_logs.withColumn(
    "error", f.filter(f.col("logs"), lambda x: f.regexp_extract(x, r"ValueError:", 0) != "")
).cache()
# Extract the error message and count its occurrences
error_logs_3 = error_logs_2.withColumn(
    "error", f.regexp_extract(f.col("error").getItem(0), r"ValueError: Parquet file is empty", 0)
)
error_logs_3.groupBy("error").count().cache().show(truncate=False)
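For a single log file pulled down locally, the same ValueError extraction can be sketched with the stdlib `re` module, without Spark. The sample log excerpt is made up; the error message matches the one observed in the failed studies.

```python
import re

def extract_value_errors(log_text: str) -> list[str]:
    """Return every line of a harmonisation log that carries a ValueError."""
    return [line.strip() for line in log_text.split("\n") if re.search(r"ValueError:", line)]

# Made-up log excerpt containing the error seen in the failed studies:
sample_log = "INFO harmonisation started\nValueError: Parquet file is empty\nINFO exiting"
print(extract_value_errors(sample_log))  # -> ['ValueError: Parquet file is empty']
```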
#### Findings

All 8165 studies reported to fail the QC did so because the harmonisation produced an empty parquet file.
The task is to perform harmonisation on GWAS Catalog summary statistics synced from the EBI FTP.

The full size of the dataset (on 2024-10-14) was:

The harmonisation should be run as a Google Batch job and should encompass two gentropy steps:

- `GWASCatalogSumstatsPreprocessStep`
- `SummaryStatisticsQCStep`

The need to rerun the harmonisation on all synced summary statistics arose because of the addition of `sanity_filter` in opentargets/gentropy#455.
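For illustration only, a sanity filter of this kind typically drops summary-statistics rows with non-finite effect sizes or invalid p-values. The sketch below is a hypothetical stdlib version, not gentropy's actual `sanity_filter` implementation, and the column names (`beta`, `standardError`, `pValue`) are assumptions.

```python
# Hypothetical illustration of a summary-statistics sanity filter;
# NOT gentropy's actual sanity_filter implementation.
import math

def sanity_filter(rows: list[dict]) -> list[dict]:
    """Keep rows with finite beta/standardError, positive SE, and p-value in (0, 1]."""
    kept = []
    for row in rows:
        beta, se, p = row["beta"], row["standardError"], row["pValue"]
        if not (math.isfinite(beta) and math.isfinite(se)):
            continue
        if se <= 0 or not (0 < p <= 1):
            continue
        kept.append(row)
    return kept

rows = [
    {"beta": 0.1, "standardError": 0.02, "pValue": 1e-8},        # valid
    {"beta": float("nan"), "standardError": 0.02, "pValue": 0.5}, # NaN beta
    {"beta": 0.3, "standardError": 0.0, "pValue": 0.0},           # bad SE and p-value
]
print(len(sanity_filter(rows)))  # -> 1
```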