
GWAS Catalog harmonisation of summary statistics #42

Closed
project-defiant opened this issue Oct 15, 2024 · 1 comment

@project-defiant (Collaborator)

The task is to perform harmonisation on the GWAS Catalog summary statistics synced from the EBI FTP.

The full size of the dataset (on 2024-10-14) was:

```bash
❯ gsutil ls 'gs://gwas_catalog_inputs/raw_summary_statistics/**/*h.tsv.gz' | wc -l
   77464
```

The harmonisation should be run as a Google Batch job and should encompass two gentropy steps:

  • gwas_catalog_sumstat_preprocess (harmonisation)
  • summary_statistics_qc (QC)

The need to rerun the harmonisation on all synced summary statistics arose from the addition of the sanity_filter in opentargets/gentropy#455.
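For context, below is a minimal sketch of the kind of row-level sanity filtering that PR introduced. The exact conditions live in gentropy; the column names follow the harmonised schema shown later in this issue, and the conditions here are illustrative assumptions, not gentropy's implementation.

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as f


def sanity_filter(df: DataFrame) -> DataFrame:
    """Illustrative sanity filter dropping rows with impossible statistics.

    This only approximates the idea behind gentropy's sanity_filter;
    see opentargets/gentropy#455 for the authoritative implementation.
    """
    return df.filter(
        f.col("beta").isNotNull()
        & (f.col("beta") != 0)  # a zero beta carries no usable signal
        & f.col("standardError").isNotNull()
        & (f.col("standardError") > 0)  # standard errors must be positive
        & (f.col("pValueMantissa") > 0)  # mantissa of a valid p-value is positive
    )
```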

@project-defiant (Collaborator, Author)

project-defiant commented Oct 22, 2024

Post-harmonisation benchmark of results

The harmonisation batch jobs were run with these two steps on 2024-10-15.
The code used for the harmonisation can be found in the referenced branch.

The harmonisation job performed two tasks:

  • harmonisation
  • qc

The harmonisation results can be found in gs://gwas_catalog_inputs/harmonised_summary_statistics/

Tasks:

  1. Check new harmonisation results
  2. Check log files

Randomly chosen summary statistics check

```python
# `session` is a gentropy Session wrapping a SparkSession
file = "gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90002037"
df = session.spark.read.parquet(file)
df.show()
```

The output of the summary statistics looks healthy:

```
+------------+---------------+----------+---------+--------------+--------------+---------+-------------+-------------------------------+----------+
|     studyId|      variantId|chromosome| position|pValueMantissa|pValueExponent|     beta|standardError|effectAlleleFrequencyFromSource|sampleSize|
+------------+---------------+----------+---------+--------------+--------------+---------+-------------+-------------------------------+----------+
|GCST90002037|10_89736530_T_C|        10| 89736530|         9.481|            -1| 0.001876|      0.02879|                         0.5731|      2871|
|GCST90002037|1_169207781_G_T|         1|169207781|         2.333|            -1|   0.1353|       0.1135|                         0.0162|      2871|
|GCST90002037|1_208095420_T_C|         1|208095420|         9.993|            -1|-2.586E-5|      0.02952|                         0.3299|      2871|
|GCST90002037|10_51065761_T_A|        10| 51065761|         7.005|            -1|   0.2299|       0.5976|                         5.0E-4|      2871|
|GCST90002037| 1_36913580_A_G|         1| 36913580|         1.197|            -1|   0.2236|       0.1436|                         0.0094|      2871|
|GCST90002037|10_83365038_G_A|        10| 83365038|         2.149|            -1|   0.2971|       0.2395|                         0.0042|      2871|
|GCST90002037|10_76804749_T_C|        10| 76804749|         8.095|            -2|   0.2431|       0.1393|                         0.0111|      2871|
|GCST90002037|1_216153499_A_C|         1|216153499|         9.545|            -1|  0.06595|        1.156|                         2.0E-4|      2871|
|GCST90002037|1_205279309_C_T|         1|205279309|         6.069|            -1| -0.04591|      0.08922|                         0.0251|      2871|
|GCST90002037| 1_73709095_G_A|         1| 73709095|         7.965|            -1|-0.008486|       0.0329|                         0.2436|      2871|
|GCST90002037|1_164457787_T_C|         1|164457787|         5.448|            -1|  -0.1131|       0.1867|                         0.0059|      2871|
|GCST90002037| 1_71061890_G_A|         1| 71061890|          5.96|            -1|  -0.3383|        0.638|                         5.0E-4|      2871|
|GCST90002037|1_247566792_T_C|         1|247566792|         9.943|            -1| 0.003534|       0.4908|                         9.0E-4|      2871|
|GCST90002037| 1_88192464_T_C|         1| 88192464|         3.022|            -1| -0.03897|      0.03777|                         0.8274|      2871|
|GCST90002037|10_32956046_C_T|        10| 32956046|         2.032|            -1|  -0.7677|       0.6031|                         5.0E-4|      2871|
|GCST90002037|1_176384863_G_A|         1|176384863|         6.852|            -1|  0.03723|      0.09183|                         0.0254|      2871|
|GCST90002037|1_173934162_G_A|         1|173934162|         2.053|            -1| -0.04528|      0.03574|                         0.1858|      2871|
|GCST90002037|10_61584982_A_G|        10| 61584982|         7.188|            -1| -0.01035|      0.02875|                         0.5946|      2871|
|GCST90002037|10_15012099_C_A|        10| 15012099|         9.167|            -1| 0.003868|      0.03695|                         0.1789|      2871|
|GCST90002037| 1_36789281_G_A|         1| 36789281|         9.757|            -1|  0.01099|       0.3603|                         0.0014|      2871|
+------------+---------------+----------+---------+--------------+--------------+---------+-------------+-------------------------------+----------+
only showing top 20 rows
```
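Beyond eyeballing the first rows, a quick aggregate over the same df can confirm the columns are populated and within plausible ranges (an illustrative check, not part of the pipeline):

```python
from pyspark.sql import functions as f

df.select(
    f.count("*").alias("nRows"),
    f.min("pValueExponent").alias("minExponent"),
    f.max("pValueExponent").alias("maxExponent"),
    f.mean("beta").alias("meanBeta"),
    f.countDistinct("studyId").alias("nStudies"),
).show()
```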

Check log files

Dump the harmonisation summaries to see which logs need attention. This only has to be done once; afterwards the data is saved to the harmonisation_summary_metrics file.

```python
from pyspark.sql import functions as f

harmonisation_summary_metrics = "gs://gwas_catalog_inputs/statistics/20241015/harmonisation_summary_metrics.tsv"
input_path = "gs://gwas_catalog_inputs/harmonisation_summary/*/latest/harmonisation.csv"

# Read the per-study summaries and consolidate them into a single parquet file
session.spark.read.csv(input_path).repartition(1).write.parquet("/tmp/harmonisation_summary.parquet")
s = session.spark.read.parquet("/tmp/harmonisation_summary.parquet")
# Drop the header rows that each CSV contributed as data
data = s.filter(f.col("_c0") != "study")
# Rename the positional _c0.._c5 columns to meaningful names
columns = ["study", "harmonisationExitCode", "qcExitCode", "rawSumstatFile", "rawSumstatFileSize", "rawUnzippedSumstatFileSize"]
data = data.toDF(*columns)
# Save the full harmonisation summary to the flat file
data.toPandas().to_csv(harmonisation_summary_metrics, sep="\t", index=False)
print(f"Harmonisation summary metrics saved to the {harmonisation_summary_metrics}")
```

Harmonisation summary metrics saved to the gs://gwas_catalog_inputs/statistics/20241015/harmonisation_summary_metrics.tsv
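With the flat summary in place, a single groupBy gives the overall exit-code distribution before drilling into individual failures (a hypothetical follow-up on the same data frame):

```python
from pyspark.sql import functions as f

# Cross-tabulate harmonisation and QC exit codes across all studies
data.groupBy("harmonisationExitCode", "qcExitCode").count().orderBy(f.desc("count")).show()
```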

Harmonisation step checks

See the number of studies that failed the harmonisation step (exit code != 0):

```python
# Show the studies that failed harmonisation overall
data.filter(f.col("harmonisationExitCode") != 0).cache().show()
```

This resulted in:

```
+------------+---------------------+----------+--------------------+------------------+--------------------------+
|       study|harmonisationExitCode|qcExitCode|      rawSumstatFile|rawSumstatFileSize|rawUnzippedSumstatFileSize|
+------------+---------------------+----------+--------------------+------------------+--------------------------+
|GCST90444202|                    1|         1|gs://gwas_catalog...|              5.0G|                      null|
+------------+---------------------+----------+--------------------+------------------+--------------------------+
```

GCST90444202 failed harmonisation because the file was too large for the batch machines to handle.

This was the only harmonisation failure reported in the last batch. The study was ingested from the FTP on 2024-10-14; the zipped file is ~5 GB, and after unzipping it is ~15 GB.
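One way to catch such cases up front would be to size-check the raw blob before scheduling the task. A sketch assuming the google-cloud-storage client; the function name and the ~5 GB cutoff are hypothetical, chosen to match the machines used in this batch:

```python
from google.cloud import storage

# Hypothetical cutoff matching the disks of this batch's machines
MAX_COMPRESSED_BYTES = 5 * 1024**3


def is_within_size_limit(bucket_name: str, blob_name: str) -> bool:
    """Return False for raw sumstat blobs too large for the batch machines."""
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)
    return blob is not None and blob.size <= MAX_COMPRESSED_BYTES
```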

QC Step checks

See the number of studies that failed the QC step (exit code != 0):

```python
# Count the number of studies that failed QC overall
failed_qc_count = data.filter(f.col("qcExitCode") > 0).count()
failed_qc_studies = [d["study"] for d in data.filter(f.col("qcExitCode") != 0).cache().select("study").collect()]
print(f"Sumstats that fail the QC: {failed_qc_count}")
```

Sumstats that fail the QC: 8166
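As a quick plausibility check, the failed studies can be inspected alongside their raw file sizes to see whether size correlates with the failures (illustrative only):

```python
from pyspark.sql import functions as f

# Peek at a few failed-QC studies together with their raw file sizes
data.filter(f.col("qcExitCode") != 0).select("study", "rawSumstatFileSize").show(5)
```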

Extracting errors from logs

Extract the logs for the studies collected in failed_qc_studies.

```bash
# see what an individual failed log looks like
gsutil cat gs://gwas_catalog_inputs/harmonisation_summary/GCST90013662/latest/harmonisation.log
```

```bash
[2024.10.14 13:16] Copying raw summary statistics from gs://gwas_catalog_inputs/raw_summary_statistics/GCST90013001-GCST90014000/GCST90013662/harmonised/34315874-GCST90013662-MONDO_0003090.h.tsv.gz to 34315874-GCST90013662-MONDO_0003090.h.tsv.gz
[2024.10.14 13:16] Raw file size 321M
[2024.10.14 13:16] Unzipping 34315874-GCST90013662-MONDO_0003090.h.tsv.gz to 34315874-GCST90013662-MONDO_0003090.h.tsv
[2024.10.14 13:16] Unzipped file size 1.4G
[2024.10.14 13:16] Running harmonisation on 34315874-GCST90013662-MONDO_0003090.h.tsv file
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/14 13:16:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
step:                                                                           
  session:
    start_hail: false
    write_mode: overwrite
    spark_uri: local[*]
    hail_home: /app/.venv/lib/python3.10/site-packages/hail
    extended_spark_conf:
      spark.jars: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
      spark.dynamicAllocation.enabled: 'False'
      spark.driver.memory: 29g
      spark.kryoserializer.buffer.max: 500m
      spark.driver.maxResultSize: 5g
    _target_: gentropy.common.session.Session
  raw_sumstats_path: 34315874-GCST90013662-MONDO_0003090.h.tsv
  out_sumstats_path: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/
  _target_: gentropy.gwas_catalog_sumstat_preprocess.GWASCatalogSumstatsPreprocessStep
datasets: {}

[2024-10-14 13:18:10,978][py4j.clientserver][INFO] - Closing down clientserver connection
[2024.10.14 13:18] Harmonisation exit code: 0
[2024.10.14 13:18] Running qc on gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/ file
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/14 13:18:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
step:                                                                           
  session:
    start_hail: false
    write_mode: overwrite
    spark_uri: local[*]
    hail_home: /app/.venv/lib/python3.10/site-packages/hail
    extended_spark_conf:
      spark.jars: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
      spark.dynamicAllocation.enabled: 'False'
      spark.driver.memory: 29g
      spark.kryoserializer.buffer.max: 500m
      spark.driver.maxResultSize: 5g
    _target_: gentropy.common.session.Session
  gwas_path: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/
  output_path: gs://gwas_catalog_inputs/summary_statistics_qc/GCST90013662/
  pval_threshold: 1.0e-08
  _target_: gentropy.sumstat_qc_step.SummaryStatisticsQCStep
datasets: {}

Error executing job with overrides: ['step=summary_statistics_qc', 'step.gwas_path=gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/', 'step.output_path=gs://gwas_catalog_inputs/summary_statistics_qc/GCST90013662/', 'step.pval_threshold=1e-08', 'step.session.write_mode=overwrite', '+step.session.extended_spark_conf={spark.jars:https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar}', '+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:false}', '+step.session.extended_spark_conf={spark.driver.memory:29g}', '+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:500m}', '+step.session.extended_spark_conf={spark.driver.maxResultSize:5g}']
Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/instantiate/_instantiate2.py", line 92, in _call_target
    return _target_(*args, **kwargs)
  File "/app/src/gentropy/sumstat_qc_step.py", line 29, in __init__
    gwas = SummaryStatistics.from_parquet(session, path=gwas_path)
  File "/app/src/gentropy/dataset/dataset.py", line 125, in from_parquet
    raise ValueError(f"Parquet file is empty: {path}")
ValueError: Parquet file is empty: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/app/.venv/lib/python3.10/site-packages/hydra/main.py", line 94, in decorated_main
    _run_hydra(
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/utils.py", line 394, in _run_hydra
    _run_app(
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/utils.py", line 457, in _run_app
    run_and_report(
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/utils.py", line 223, in run_and_report
    raise ex
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/utils.py", line 220, in run_and_report
    return func()
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/utils.py", line 458, in <lambda>
    lambda: hydra.run(
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/hydra.py", line 132, in run
    _ = ret.return_value
  File "/app/.venv/lib/python3.10/site-packages/hydra/core/utils.py", line 260, in return_value
    raise self._return_value
  File "/app/.venv/lib/python3.10/site-packages/hydra/core/utils.py", line 186, in run_job
    ret.return_value = task_function(task_cfg)
  File "/app/src/gentropy/cli.py", line 22, in main
    instantiate(cfg.step)
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/instantiate/_instantiate2.py", line 226, in instantiate
    return instantiate_node(
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/instantiate/_instantiate2.py", line 347, in instantiate_node
    return _call_target(_target_, partial, args, kwargs, full_key)
  File "/app/.venv/lib/python3.10/site-packages/hydra/_internal/instantiate/_instantiate2.py", line 97, in _call_target
    raise InstantiationException(msg) from e
hydra.errors.InstantiationException: Error in call to target 'gentropy.sumstat_qc_step.SummaryStatisticsQCStep':
ValueError('Parquet file is empty: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90013662/')
full_key: step
[2024-10-14 13:18:29,811][py4j.clientserver][INFO] - Closing down clientserver connection
[2024.10.14 13:18] QC exit code: 1
```
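The ValueError above comes from gentropy's Dataset.from_parquet guard. A minimal sketch of that kind of fail-fast check, assuming PySpark ≥ 3.3 for DataFrame.isEmpty (the real guard lives in /app/src/gentropy/dataset/dataset.py):

```python
from pyspark.sql import DataFrame, SparkSession


def read_parquet_non_empty(spark: SparkSession, path: str) -> DataFrame:
    """Read a parquet dataset and fail fast on empty outputs (sketch of the guard in the traceback)."""
    df = spark.read.parquet(path)
    if df.isEmpty():  # requires PySpark >= 3.3
        raise ValueError(f"Parquet file is empty: {path}")
    return df
```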

Read the logs for the studies with failed QC

These logs are found under the per-study /latest/harmonisation.log paths.

```python
from pyspark.sql import functions as f

failed_paths = [f"gs://gwas_catalog_inputs/harmonisation_summary/{study}/latest/harmonisation.log" for study in failed_qc_studies]
df = session.spark.read.text(failed_paths, wholetext=True).cache()
# Split each whole-file log into lines and extract the study accession from the embedded path
error_logs = (
    df.withColumn("logs", f.split(f.col("value"), "\n"))
    .cache()
    .withColumn("study", f.regexp_replace(f.regexp_extract(f.col("value"), r"/GCST\d+/", 0), "/", ""))
)
# Keep only the log lines containing a ValueError
error_logs_2 = error_logs.withColumn("error", f.filter(f.col("logs"), lambda x: f.regexp_extract(x, r"ValueError:", 0) != "")).cache()
# Normalise the error message for grouping (null when no ValueError was found)
error_logs_3 = error_logs_2.withColumn("error", f.regexp_extract(f.col("error").getItem(0), r"ValueError: Parquet file is empty", 0))
error_logs_3.groupBy("error").count().cache().show(truncate=False)
```
```
+---------------------------------+-----+
|error                            |count|
+---------------------------------+-----+
|ValueError: Parquet file is empty|8165 |
|null                             |1    |
+---------------------------------+-----+
```
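The single null row can be pulled out the same way, to confirm it belongs to the study that never produced a ValueError in its log (expected to be GCST90444202, the harmonisation failure above):

```python
from pyspark.sql import functions as f

# Isolate the one log without a "ValueError: Parquet file is empty" line
error_logs_3.filter(f.col("error").isNull()).select("study").show()
```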

Findings

Of the 8166 studies that failed QC, 8165 did so because harmonisation produced an empty parquet file.
The remaining study (GCST90444202) failed both harmonisation and QC due to insufficient disk space: the disk was too small to hold the summary statistics after unzipping.
