From 0ef35457d957d8b40c78df1ea9f0cd676c6786a1 Mon Sep 17 00:00:00 2001 From: Szymon Szyszkowski <69353402+project-defiant@users.noreply.github.com> Date: Fri, 25 Oct 2024 14:43:16 +0100 Subject: [PATCH] feat(gwas catalog sumstats): finemapping (#51) * feat(gwas catalog sumstats): dag for locus breaker * feat: updated config * docs(gwas catalog): sumstat processing * perf(gwas catalog): lb clumping insufficient partition size * perf(locus_breaker): spark configuration update * docs(gwas_catalog): update * feat(finemapping): gwas catalog sumstats * feat(finemapping): manifest cache * docs(gwas catalog): susie dataset * feat: sequential write * feat: caching of finemapped loci * chore: change branch back to dev --------- Co-authored-by: Szymon Szyszkowski --- docs/datasources/gwas_catalog_data/README.md | 339 ++++++++++++++++-- .../gwas_catalog_sumstats_pics.svg | 113 ++++++ .../gwas_catalog_sumstats_susie_clumping.svg | 56 +++ ...was_catalog_sumstats_susie_finemapping.svg | 32 ++ .../gwas_catalog_top_hits.svg | 95 +++++ docs/gwas_catalog_harmonisation.svg | 80 ----- docs/gwas_catalog_preprocess.svg | 218 ----------- docs/gwas_curation_update.svg | 44 --- .../gwas_catalog_sumstats_susie_clumping.yaml | 45 +++ ...as_catalog_sumstats_susie_finemapping.yaml | 29 ++ .../dags/config/ukb_ppp_eur_finemapping.yaml | 2 +- .../gwas_catalog_sumstats_susie_clumping.py | 41 +++ ...gwas_catalog_sumstats_susie_finemapping.py | 48 +++ .../operators/batch/finemapping.py | 78 ++-- src/ot_orchestration/utils/path.py | 11 +- tests/test_io_manager.py | 17 +- 16 files changed, 834 insertions(+), 414 deletions(-) create mode 100644 docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_pics.svg create mode 100644 docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_clumping.svg create mode 100644 docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_finemapping.svg create mode 100644 docs/datasources/gwas_catalog_data/gwas_catalog_top_hits.svg delete mode 100644 docs/gwas_catalog_harmonisation.svg delete mode 100644 docs/gwas_catalog_preprocess.svg delete mode 100644 docs/gwas_curation_update.svg create mode 100644 src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_clumping.yaml create mode 100644 src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_finemapping.yaml create mode 100644 src/ot_orchestration/dags/gwas_catalog_sumstats_susie_clumping.py create mode 100644 src/ot_orchestration/dags/gwas_catalog_sumstats_susie_finemapping.py diff --git a/docs/datasources/gwas_catalog_data/README.md b/docs/datasources/gwas_catalog_data/README.md index a774708..4e3b2da 100644 --- a/docs/datasources/gwas_catalog_data/README.md +++ b/docs/datasources/gwas_catalog_data/README.md @@ -1,72 +1,333 @@ # GWAS Catalog data source -This document was updated on 2024-09-11 +This document was updated on 2024-10-18 -Data stored under `gs://gwas_catalog_data` bucket comes with following structure: +Data stored under 4 buckets: + +- `gs://gwas_catalog_inputs` +- `gs://gwas_catalog_sumstats_pics` +- `gs://gwas_catalog_sumstats_susie` +- `gs://gwas_catalog_top_hits` + +## GWAS Catalog inputs + +Bucket `gs://gwas_catalog_inputs` contains: ``` -gs://gwas_catalog_data/credible_set_datasets/ -gs://gwas_catalog_data/curated_inputs/ -gs://gwas_catalog_data/harmonised_summary_statistics/ -gs://gwas_catalog_data/manifests/ -gs://gwas_catalog_data/raw_summary_statistics/ -gs://gwas_catalog_data/study_index/ -gs://gwas_catalog_data/study_locus_datasets/ 
+gs://gwas_catalog_inputs/gwas_catalog_associations_ontology_annotated.tsv
+gs://gwas_catalog_inputs/gwas_catalog_download_ancestries.tsv
+gs://gwas_catalog_inputs/gwas_catalog_download_studies.tsv
+gs://gwas_catalog_inputs/harmonisation_manifest.csv
+gs://gwas_catalog_inputs/harmonisation_summary/
+gs://gwas_catalog_inputs/harmonised_summary_statistics/
+gs://gwas_catalog_inputs/raw_summary_statistics/
+gs://gwas_catalog_inputs/summary_statistics_qc/
+gs://gwas_catalog_inputs/statistics/
```
-## raw_summary_statistics
+### raw_summary_statistics

This directory contains summary statistics in the form of harmonised (by GWAS Catalog) gzipped tsv files that are synced directly from the [GWAS Catalog FTP server](https://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/) by a cron job.

-## curated_inputs
+### harmonised_summary_statistics
+
+This directory contains outputs from the Open Targets in-house ETL harmonisation process described in the [GWAS Catalog harmonisation dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_catalog_sumstat_harmonisation.py). The result is the [SummaryStatistics dataset](https://opentargets.github.io/gentropy/python_api/datasets/summary_statistics/) saved in parquet format per summary statistics input file.
+
+### summary_statistics_qc
+
+This directory contains outputs from the Open Targets in-house ETL harmonisation process described in the [GWAS Catalog harmonisation dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_catalog_sumstat_harmonisation.py). The result is the [summary statistics QC dataset](https://github.com/opentargets/gentropy/blob/dev/src/gentropy/sumstat_qc_step.py) saved in csv format per summary statistics input file.
+
+### harmonisation_manifest.csv
+
+The `harmonisation_manifest.csv` file is generated before the harmonisation is performed. It is the input to all Google Batch jobs that perform harmonisation and quality checks on the raw summary statistics. Each row of the manifest represents a single GWAS Catalog study.
+
+The file describes the following fields:
+
+- study - study identifier
+- rawSumstatPath - input path for the harmonisation
+- harmonisedSumstatPath - output path of the harmonised study
+- qcPath - output path of the qc results performed on the harmonised study
+- isHarmonised - boolean flag indicating whether the `harmonisedSumstatPath` has been found
+- qcPerformed - boolean flag indicating whether the `qcPath` has been found
+
+The manifest is regenerated each time the harmonisation dag is triggered and is updated with the current state of the staging bucket. A minimal sketch of how the manifest can be inspected is shown after the example below.
+
+As the QC step can fail without producing any meaningful metrics (most commonly because the harmonised summary statistics parquet is empty), a study that fails the QC is treated as failing.
+
+> [!NOTE]
+> Rescuing summary statistics that fail to produce a QC output currently has to be done manually. This requires a close look at the harmonisation logs and individual study summaries. See the `harmonisation_summary` section for more details.
+
+
+ Expand to see the example of manifest file + +``` +rawSumstatPath,study,harmonisedSumstatPath,isHarmonised,qcPath,qcPerformed +gs://gwas_catalog_inputs/raw_summary_statistics/GCST000001-GCST001000/GCST000028/harmonised/17463246-GCST000028-EFO_0001360.h.tsv.gz,GCST000028,gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST000028/,True,gs://gwas_catalog_inputs/summary_statistics_qc/GCST000028/,True +``` + +
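The manifest is a plain csv, so it can be inspected ad hoc. Below is a minimal sketch (not part of the pipeline) that lists studies still awaiting harmonisation or missing a QC output; it assumes `pandas` and `gcsfs` are installed so pandas can read the `gs://` path directly.

```python
import pandas as pd

MANIFEST = "gs://gwas_catalog_inputs/harmonisation_manifest.csv"

manifest = pd.read_csv(MANIFEST)

# Studies whose harmonised output has not been found yet.
to_harmonise = manifest.loc[~manifest["isHarmonised"], "study"]

# Studies that were harmonised but have no QC output (candidates for manual rescue).
missing_qc = manifest.loc[manifest["isHarmonised"] & ~manifest["qcPerformed"], "study"]

print(f"{len(to_harmonise)} studies awaiting harmonisation")
print(f"{len(missing_qc)} harmonised studies without a QC output")
```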
+
+### harmonisation_summary
+
+This directory contains meta information about the status of the harmonisation. For each summary statistics file two files are produced:
+
+- harmonisation_summary (harmonisation.csv)
+- harmonisation_log (harmonisation.log)
+
+The files are stored under a per-study directory in the form shown below:
+
+```
+gs://gwas_catalog_inputs/harmonisation_summary/GCST90077749/202410141529/harmonisation.csv
+gs://gwas_catalog_inputs/harmonisation_summary/GCST90077749/202410141529/harmonisation.log
+gs://gwas_catalog_inputs/harmonisation_summary/GCST90077749/latest/harmonisation.csv
+gs://gwas_catalog_inputs/harmonisation_summary/GCST90077749/latest/harmonisation.log
+```
+
+inside a dated (e.g. `202410141529`) and a `latest` subdirectory, so subsequent harmonisation runs do not overwrite existing information.
+
+#### harmonisation.csv
+
+This file collects the metadata gathered by the harmonisation script. A sketch that scans these summaries for failures follows the example below.
+
+The file reports the following metrics:
+
+- study - study identifier
+- harmonisationExitCode - the return code of the harmonisation spark job run during the harmonisation script execution. A non-zero value means that the harmonisation task failed for this study.
+- qcExitCode - the return code of the qc spark job run during the harmonisation script execution. A non-zero value means that the qc task failed for this study.
+- rawSumstatFile - input to the harmonisation
+- rawSumstatFileSize - size of the raw file before unzipping
+- rawUnzippedSumstatFileSize - size of the raw file after unzipping. If this metric is empty, the unzipped file may have been too big for the google batch task executor to handle, resulting in job failure.
+
+
+ Expand to see the example + +``` +study,harmonisationExitCode,qcExitCode,rawSumstatFile,rawSumstatFileSize,rawUnzippedSumstatFileSize +GCST90077749,0,1,gs://gwas_catalog_inputs/raw_summary_statistics/GCST90077001-GCST90078000/GCST90077749/harmonised/34662886-GCST90077749-EFO_1001919.h.tsv.gz,18M,62M +``` + +
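As referenced above, here is a minimal sketch for scanning the `latest` summaries and flagging studies whose harmonisation or QC Spark job returned a non-zero exit code. It assumes `gcsfs` and `pandas` are available and follows the directory layout shown earlier.

```python
import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem()

# One harmonisation.csv per study, always refreshed under latest/.
summaries = fs.glob("gwas_catalog_inputs/harmonisation_summary/*/latest/harmonisation.csv")

failed: list[str] = []
for path in summaries:
    row = pd.read_csv(f"gs://{path}").iloc[0]
    # A non-zero exit code means the harmonisation or qc spark job failed for this study.
    if row["harmonisationExitCode"] != 0 or row["qcExitCode"] != 0:
        failed.append(row["study"])

print(f"{len(failed)} studies with a failed harmonisation or qc job")
```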
+
+#### harmonisation.log
+
+This file contains the logs collected during the harmonisation script execution. If any issue is flagged in the `harmonisation.csv`, the best way to uncover what happened is to inspect this file directly and look for `Error` or `Exception`. The file contains the entire step configuration as well as all error logs. A sketch that extracts such lines follows the example below.
+
+
+ Expand to see the example + +``` +[2024.10.14 15:33] Copying raw summary statistics from gs://gwas_catalog_inputs/raw_summary_statistics/GCST90078001-GCST90079000/GCST90079000/harmonised/GCST90079000.h.tsv.gz to GCST90079000.h.tsv.gz +[2024.10.14 15:34] Raw file size 17M +[2024.10.14 15:34] Unzipping GCST90079000.h.tsv.gz to GCST90079000.h.tsv +[2024.10.14 15:34] Unzipped file size 74M +[2024.10.14 15:34] Running harmonisation on GCST90079000.h.tsv file +Setting default log level to "WARN". +To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). +24/10/14 15:34:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable +step: + session: + start_hail: false + write_mode: overwrite + spark_uri: local[*] + hail_home: /app/.venv/lib/python3.10/site-packages/hail + extended_spark_conf: + spark.jars: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar + spark.dynamicAllocation.enabled: 'False' + spark.driver.memory: 16g + spark.kryoserializer.buffer.max: 500m + spark.driver.maxResultSize: 5g + _target_: gentropy.common.session.Session + raw_sumstats_path: GCST90079000.h.tsv + out_sumstats_path: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90079000/ + _target_: gentropy.gwas_catalog_sumstat_preprocess.GWASCatalogSumstatsPreprocessStep +datasets: {} + +[2024-10-14 15:34:57,389][py4j.clientserver][INFO] - Closing down clientserver connection +[2024.10.14 15:34] Harmonisation exit code: 0 +[2024.10.14 15:34] Running qc on gs://gwas*catalog_inputs/harmonised_summary_statistics/GCST90079000/ file +Setting default log level to "WARN". +To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). +24/10/14 15:35:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable +step: +session: +start_hail: false +write_mode: overwrite +spark_uri: local[*] +hail_home: /app/.venv/lib/python3.10/site-packages/hail +extended_spark_conf: +spark.jars: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar +spark.dynamicAllocation.enabled: 'False' +spark.driver.memory: 16g +spark.kryoserializer.buffer.max: 500m +spark.driver.maxResultSize: 5g +\_target*: gentropy.common.session.Session +gwas*path: gs://gwas_catalog_inputs/harmonised_summary_statistics/GCST90079000/ +output_path: gs://gwas_catalog_inputs/summary_statistics_qc/GCST90079000/ +pval_threshold: 1.0e-08 +\_target*: gentropy.sumstat_qc_step.SummaryStatisticsQCStep +datasets: {} + +[2024-10-14 15:35:53,153][py4j.clientserver][INFO] - Closing down clientserver connection +[2024.10.14 15:35] QC exit code: 0 + +``` + +
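A minimal sketch of pulling the latest log for a single study and extracting the error lines, as mentioned above; it assumes `google-cloud-storage` is installed and the per-study `latest/harmonisation.log` layout shown earlier.

```python
from google.cloud import storage


def error_lines(study: str, bucket_name: str = "gwas_catalog_inputs") -> list[str]:
    """Return log lines mentioning Error or Exception for a given study."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(
        f"harmonisation_summary/{study}/latest/harmonisation.log"
    )
    return [
        line
        for line in blob.download_as_text().splitlines()
        if "Error" in line or "Exception" in line
    ]


print(error_lines("GCST90077749"))
```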
+
+### statistics
+
+This directory contains various analyses performed on the harmonisation results.
+
+## GWAS Catalog top hits
+
+Bucket `gs://gwas_catalog_top_hits` contains:
+
+```
+gs://gwas_catalog_top_hits/credible_sets/
+gs://gwas_catalog_top_hits/study_index/
+gs://gwas_catalog_top_hits/study_locus_ld_clumped/
+gs://gwas_catalog_top_hits/study_locus_window_based_clumped/
+```
+
+Data contained in the `top_hits` bucket corresponds to the [GWAS Catalog top associations](https://www.ebi.ac.uk/gwas/docs/methods/criteria#:~:text=statistics%20here.-,Top%20associations,-We%20also%20report).
+
+Data is produced by the **gwas_catalog_top_hits DAG**. The dag configuration and topology can be found in the `gwas_catalog_top_hits.yaml` file under the config directory.
+
+The DAG contains 3 steps:
+
+![gwas_catalog_top_hits](gwas_catalog_top_hits.svg)
+
+1. gwas_catalog_top_hits_ingestion
+2. ld_based_clumping
+3. pics
+
+### gwas_catalog_top_hits_ingestion
+
+This step runs on the dataproc cluster; it generates the [study_index dataset](https://opentargets.github.io/gentropy/python_api/datasets/study_index/) and performs [window_based_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) on the top associations. The datasets are saved under `gs://gwas_catalog_top_hits/study_index/` and `gs://gwas_catalog_top_hits/study_locus_window_based_clumped/`.
+
+### ld_based_clumping
+
+This step performs [ld_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) on the results from the previous step. The results are saved under `gs://gwas_catalog_top_hits/study_locus_ld_clumped/`.
+
+### pics
+
+This step performs [PICS finemapping](https://opentargets.github.io/gentropy/python_api/methods/pics/) on the ld-clumped study locus dataset to obtain the credible sets. This dataset is stored under `gs://gwas_catalog_top_hits/credible_sets/`.
+
+## GWAS Catalog pics summary statistics
+
+Bucket `gs://gwas_catalog_sumstats_pics` contains:
+
+```
+gs://gwas_catalog_sumstats_pics/credible_sets/
+gs://gwas_catalog_sumstats_pics/study_index/
+gs://gwas_catalog_sumstats_pics/study_locus_ld_clumped/
+gs://gwas_catalog_sumstats_pics/study_locus_window_based_clumped/
+```
+
+Data contained in the `pics summary statistics` bucket corresponds to the GWAS Catalog summary statistics harmonised to the Open Targets [SummaryStatistics dataset](https://opentargets.github.io/gentropy/python_api/datasets/summary_statistics/).
+
+Data is produced by the **gwas_catalog_sumstats_pics DAG**. The dag configuration and topology can be found in the `gwas_catalog_sumstats_pics.yaml` file under the config directory.
+
+The DAG contains 4 steps:
+
+![gwas_catalog_sumstats_pics](gwas_catalog_sumstats_pics.svg)
+
+1. gwas_catalog_study_index
+2. window_based_clumping
+3. ld_based_clumping
+4. pics
+
+### gwas_catalog_study_index
+
+This step runs on the dataproc cluster and generates the [study_index dataset](https://opentargets.github.io/gentropy/python_api/datasets/study_index/), which is saved under `gs://gwas_catalog_sumstats_pics/study_index/`.
+
+### window_based_clumping
+
+This step performs [window_based_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) on the harmonised summary statistics. The dataset is saved under `gs://gwas_catalog_sumstats_pics/study_locus_window_based_clumped/`.
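For illustration only, the clumped output can be inspected with Spark. The sketch below assumes a Spark session with the GCS connector configured and that the columns follow the gentropy StudyLocus schema; neither is guaranteed by this document.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Window-based clumping output written by the gwas_catalog_sumstats_pics DAG.
clumped = spark.read.parquet(
    "gs://gwas_catalog_sumstats_pics/study_locus_window_based_clumped/"
)
clumped.select("studyLocusId", "studyId", "variantId").show(5, truncate=False)
```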
+
+### ld_based_clumping
+
+This step performs [ld_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) on the results from the window based clumping step. The results are saved under `gs://gwas_catalog_sumstats_pics/study_locus_ld_clumped/`.
+
+### pics
+
+This step performs [PICS finemapping](https://opentargets.github.io/gentropy/python_api/methods/pics/) on the ld-clumped study locus dataset to obtain the credible sets. This dataset is stored under `gs://gwas_catalog_sumstats_pics/credible_sets/`.
+
+## GWAS Catalog susie summary statistics
+
+Bucket `gs://gwas_catalog_sumstats_susie` contains:
+
+```
+gs://gwas_catalog_sumstats_susie/credible_set_datasets/
+gs://gwas_catalog_sumstats_susie/finemapping_logs/
+gs://gwas_catalog_sumstats_susie/finemapping_manifests/
+gs://gwas_catalog_sumstats_susie/study_index/
+gs://gwas_catalog_sumstats_susie/study_locus_lb_clumped/
+```
+
+Data is produced by 2 dags:
+
+- **gwas_catalog_sumstats_susie_clumping**
+
+![gwas_catalog_sumstats_susie_clumping](gwas_catalog_sumstats_susie_clumping.svg)
-This directory contains GWAS Catalog metadata input files required to create curation table. The source of the metadata comes from [release directory in GWAS Catalog FTP serve](https://ftp.ebi.ac.uk/pub/databases/gwas/releases/latest/). This include
+### gwas_catalog_study_index
-- study ancestries
-- study metadata
-- ontology associations
+This step runs on the dataproc cluster and generates the [study_index dataset](https://opentargets.github.io/gentropy/python_api/datasets/study_index/), which is saved under `gs://gwas_catalog_sumstats_susie/study_index/`.
-All described in [downloads](https://www.ebi.ac.uk/gwas/docs/file-downloads)
+This step requires the following resources to reason about the inclusion or exclusion of the
+harmonised GWAS studies:
-List of harmonised files comes from [summary statistics directory in GWAS Catalog FTP server](https://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/harmonised_list.txt)
+- harmonisation qc results
+- manual curation of the gwas studies (not provided by default)
-All of the files used for the curation can be downloaded by [gentroutils](https://github.com/opentargets/gentroutils/blob/v0.1.5/src/gentroutils/commands/update_gwas_curation_metadata.py)
+The reasoning is captured in the study index metadata fields.
-## manifests
+### locus_breaker_clumping
-This directory contains inputs and outputs of the [GWAS Curation update dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_curation_update.py).
+This process performs locus clumping on the previously harmonised summary statistics and produces a [StudyLocus](https://opentargets.github.io/gentropy/python_api/datasets/study_locus/) dataset stored under `gs://gwas_catalog_sumstats_susie/study_locus_lb_clumped`.
-### manual curation process
+- **gwas_catalog_sumstats_susie_finemapping**
-The main output of the GWAS Curation update dag is the `GWAS_Catalog_study_curation.tsv` file that gets uploaded to the [curation repository](https://raw.githubusercontent.com/opentargets/curation/master/genetics/GWAS_Catalog_study_curation.tsv).
+![gwas_catalog_sumstats_susie_finemapping](gwas_catalog_sumstats_susie_finemapping.svg)
-When data release starts, the file mentioned above is downloaded to the `manifests/` directory and used as a cached input to the curation update dag which appends to it newly synced summary statistics from GWAS Catalog FTP with their metadata. After that the `updated GWAS_Catalog_study_curation.tsv` file is used to perform the manual curation of the new studies.
+This dag performs fine mapping with the SuSiE-inf algorithm on clumped study loci to obtain [credible sets](https://opentargets.github.io/gentropy/python_api/datasets/study_locus/). This is an expensive process and is run on Google Batch.
-The result of manual curation is used during the next release.
+Due to infrastructure constraints, the fine mapping process is divided into two steps:
-The curation process runs per each release, the inputs to the curation process are the outputs from previous curation.
+- [x] Generate manifests - `FinemappingBatchJobManifestOperator`
+- [x] Execute the finemapping batch job (one finemapping step per manifest) - `FinemappingBatchOperator`
-## harmonised_summary_statistics
+1. Tasks performed by `FinemappingBatchJobManifestOperator`
-This directory contains outputs from the Open Targets inhouse ETL harmonisation process described in [GWAS Catalog harmonisation dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_catalog_harmonisation.py). The result is the [SummaryStatistics dataset](https://opentargets.github.io/gentropy/python_api/datasets/summary_statistics/) saved in parquet format per summary statistics input file.
+- Collect all individual loci parquet files.
+- Partition the collected loci into batches with `max_records_per_chunk` as the batch size limit.
+- For each batch create a manifest file that is the input to the fine mapping gentropy step.
+- Save the batch manifests to Google Cloud Storage.
-## study_index
+2. Tasks performed by `FinemappingBatchOperator`
-This directory contains the [study_index dataset](https://opentargets.github.io/gentropy/python_api/datasets/study_index/) generated by the [GWAS Catalog preprocess dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_catalog_preprocess.py)
+- Execute one google batch job per manifest with `n <= max_records_per_chunk` tasks.
+- Each task executes the finemapping step on a single `StudyLocus` record.
-## study_locus_datasets
+> [!WARNING]
+> For the GWAS Catalog sumstats susie dataset the number of tasks cannot exceed 40k due to the
+> size of the environment payload that is sent to each batch task:
+> `400 Request payload size exceeds the limit: 10485760 bytes`
-This directory contains clumping results for GWAS Catalog datasets produced by the Open Targets ETL. The methods used are:
+3.
Collect logs -- [ld_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) -- [window_based_clumping](https://opentargets.github.io/gentropy/python_api/methods/clumping/) +The output of finemapping can be found under the: -The details of the process can be found in [GWAS Catalog preprocess dag](https://github.com/opentargets/orchestration/blob/dev/src/ot_orchestration/dags/gwas_catalog_preprocess.py) +- `gs://gwas_catalog_sumstats_susie/credible_set_datasets/` - fine mapped study loci +- `gs://gwas_catalog_sumstats_susie/finemapping_manifests/` - manifests used during the fine mapping job +- `gs://gwas_catalog_sumstats_susie/finemapping_logs/` - logs from the individual finemapping tasks -## credible_set_datasets +#### Parametrization of google batch finemapping job -This directory contains finemapping results for GWAS Catalog datasets produced by the Open Targets (either manually or by the ETL). This consists of: +The configuration of the google batch infrastructure and individual step parameters can be found in `gwas_catalog_sumstats_susie_finemapping.yaml` file. +To adjust the parameters for google batch infrastructure refer to the `google_batch` block in the node configuration. -- gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_curated_associations/ (PICS run only on top hits) -- gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_summary_statistics/ (PICS run on full summary statistics) -- gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_susie_summary_statistics/ (SuSiE-inf run on full summary statistics) +> [!WARNING] +> After running the google batch fine mapping job, ensure that the job tasks have succeeded, otherwise the job requires manual curation. diff --git a/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_pics.svg b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_pics.svg new file mode 100644 index 0000000..ebac88a --- /dev/null +++ b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_pics.svg @@ -0,0 +1,113 @@ + + + + + + +gwas_catalog_sumstats_pics + +gwas_catalog_sumstats_pics + +cluster_summary_statistics_processing + +summary_statistics_processing + + + +create_cluster + +create_cluster + + + +summary_statistics_processing.upstream_join_id + + + + +create_cluster->summary_statistics_processing.upstream_join_id + + + + + +delete_cluster + +delete_cluster + + + +summary_statistics_processing.gwas_catalog_study_index + +gwas_catalog_study_index + + + +summary_statistics_processing.upstream_join_id->summary_statistics_processing.gwas_catalog_study_index + + + + + +summary_statistics_processing.window_based_clumping + +window_based_clumping + + + +summary_statistics_processing.upstream_join_id->summary_statistics_processing.window_based_clumping + + + + + +summary_statistics_processing.downstream_join_id + + + + +summary_statistics_processing.downstream_join_id->delete_cluster + + + + + +summary_statistics_processing.gwas_catalog_study_index->summary_statistics_processing.downstream_join_id + + + + + +summary_statistics_processing.ld_based_clumping + +ld_based_clumping + + + +summary_statistics_processing.pics + +pics + + + +summary_statistics_processing.ld_based_clumping->summary_statistics_processing.pics + + + + + +summary_statistics_processing.pics->summary_statistics_processing.downstream_join_id + + + + + +summary_statistics_processing.window_based_clumping->summary_statistics_processing.ld_based_clumping + + + + + diff --git 
a/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_clumping.svg b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_clumping.svg new file mode 100644 index 0000000..e66f383 --- /dev/null +++ b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_clumping.svg @@ -0,0 +1,56 @@ + + + + + + +gwas_catalog_sumstats_susie_clumping + +gwas_catalog_sumstats_susie_clumping + + +create_cluster + +create_cluster + + + +gwas_catalog_study_index + +gwas_catalog_study_index + + + +create_cluster->gwas_catalog_study_index + + + + + +delete_cluster + +delete_cluster + + + +locus_breaker_clumping + +locus_breaker_clumping + + + +gwas_catalog_study_index->locus_breaker_clumping + + + + + +locus_breaker_clumping->delete_cluster + + + + + diff --git a/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_finemapping.svg b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_finemapping.svg new file mode 100644 index 0000000..a70446d --- /dev/null +++ b/docs/datasources/gwas_catalog_data/gwas_catalog_sumstats_susie_finemapping.svg @@ -0,0 +1,32 @@ + + + + + + +gwas_catalog_sumstats_susie_finemapping + +gwas_catalog_sumstats_susie_finemapping + + +finemapping_batch_job + +finemapping_batch_job + + + +generate_manifests + +generate_manifests + + + +generate_manifests->finemapping_batch_job + + + + + diff --git a/docs/datasources/gwas_catalog_data/gwas_catalog_top_hits.svg b/docs/datasources/gwas_catalog_data/gwas_catalog_top_hits.svg new file mode 100644 index 0000000..5d0afbf --- /dev/null +++ b/docs/datasources/gwas_catalog_data/gwas_catalog_top_hits.svg @@ -0,0 +1,95 @@ + + + + + + +gwas_catalog_top_hits + +gwas_catalog_top_hits + +cluster_top_hits_processing + +top_hits_processing + + + +create_cluster + +create_cluster + + + +top_hits_processing.upstream_join_id + + + + +create_cluster->top_hits_processing.upstream_join_id + + + + + +delete_cluster + +delete_cluster + + + +top_hits_processing.gwas_catalog_top_hit_ingestion + +gwas_catalog_top_hit_ingestion + + + +top_hits_processing.upstream_join_id->top_hits_processing.gwas_catalog_top_hit_ingestion + + + + + +top_hits_processing.downstream_join_id + + + + +top_hits_processing.downstream_join_id->delete_cluster + + + + + +top_hits_processing.ld_based_clumping + +ld_based_clumping + + + +top_hits_processing.gwas_catalog_top_hit_ingestion->top_hits_processing.ld_based_clumping + + + + + +top_hits_processing.pics + +pics + + + +top_hits_processing.ld_based_clumping->top_hits_processing.pics + + + + + +top_hits_processing.pics->top_hits_processing.downstream_join_id + + + + + diff --git a/docs/gwas_catalog_harmonisation.svg b/docs/gwas_catalog_harmonisation.svg deleted file mode 100644 index f93094e..0000000 --- a/docs/gwas_catalog_harmonisation.svg +++ /dev/null @@ -1,80 +0,0 @@ - - - - - - -gwas_catalog_harmonisation - -gwas_catalog_harmonisation - - -create_cluster - -create_cluster - - - -install_dependencies - -install_dependencies - - - -create_cluster->install_dependencies - - - - - -create_to_do_list - -create_to_do_list - - - -create_to_do_list->create_cluster - - - - - -submit_jobs - -submit_jobs - - - -install_dependencies->submit_jobs - - - - - -list_harmonised_parquet - -list_harmonised_parquet - - - -list_harmonised_parquet->create_to_do_list - - - - - -list_raw_harmonised - -list_raw_harmonised - - - -list_raw_harmonised->create_to_do_list - - - - - diff --git a/docs/gwas_catalog_preprocess.svg b/docs/gwas_catalog_preprocess.svg deleted file mode 100644 index e9b42e7..0000000 --- 
a/docs/gwas_catalog_preprocess.svg +++ /dev/null @@ -1,218 +0,0 @@ - - - - - - -gwas_catalog_preprocess - -gwas_catalog_preprocess - -cluster_curation_processing - -curation_processing - - -cluster_summary_statistics_processing - -summary_statistics_processing - - - -create_cluster - -create_cluster - - - -install_dependencies - -install_dependencies - - - -create_cluster->install_dependencies - - - - - -curation_processing.upstream_join_id - - - - -curation_processing.catalog_curation_inclusion_list - -catalog_curation_inclusion_list - - - -curation_processing.upstream_join_id->curation_processing.catalog_curation_inclusion_list - - - - - -curation_processing.downstream_join_id - - - - -summary_statistics_processing.upstream_join_id - - - - -curation_processing.downstream_join_id->summary_statistics_processing.upstream_join_id - - - - - -curation_processing.ingest_curated_gwas_catalog_data - -ingest_curated_gwas_catalog_data - - - -curation_processing.catalog_curation_inclusion_list->curation_processing.ingest_curated_gwas_catalog_data - - - - - -curation_processing.catalog_curation_ld_clumping - -catalog_curation_ld_clumping - - - -curation_processing.catalog_curation_pics - -catalog_curation_pics - - - -curation_processing.catalog_curation_ld_clumping->curation_processing.catalog_curation_pics - - - - - -curation_processing.catalog_curation_pics->curation_processing.downstream_join_id - - - - - -curation_processing.ingest_curated_gwas_catalog_data->curation_processing.catalog_curation_ld_clumping - - - - - -delete_cluster - -delete_cluster - - - -list_harmonised_parquet - -list_harmonised_parquet - - - -install_dependencies->list_harmonised_parquet - - - - - -uploader - -uploader - - - -list_harmonised_parquet->uploader - - - - - -summary_statistics_processing.catalog_sumstats_inclusion_list - -catalog_sumstats_inclusion_list - - - -summary_statistics_processing.upstream_join_id->summary_statistics_processing.catalog_sumstats_inclusion_list - - - - - -summary_statistics_processing.downstream_join_id - - - - -summary_statistics_processing.downstream_join_id->delete_cluster - - - - - -summary_statistics_processing.catalog_sumstats_window_clumping - -catalog_sumstats_window_clumping - - - -summary_statistics_processing.catalog_sumstats_inclusion_list->summary_statistics_processing.catalog_sumstats_window_clumping - - - - - -summary_statistics_processing.catalog_sumstats_ld_clumping - -catalog_sumstats_ld_clumping - - - -summary_statistics_processing.catalog_sumstats_pics - -catalog_sumstats_pics - - - -summary_statistics_processing.catalog_sumstats_ld_clumping->summary_statistics_processing.catalog_sumstats_pics - - - - - -summary_statistics_processing.catalog_sumstats_pics->summary_statistics_processing.downstream_join_id - - - - - -summary_statistics_processing.catalog_sumstats_window_clumping->summary_statistics_processing.catalog_sumstats_ld_clumping - - - - - -uploader->curation_processing.upstream_join_id - - - - - diff --git a/docs/gwas_curation_update.svg b/docs/gwas_curation_update.svg deleted file mode 100644 index b8af996..0000000 --- a/docs/gwas_curation_update.svg +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - -gwas_curation_update - -gwas_curation_update - - -create_cluster - -create_cluster - - - -install_dependencies - -install_dependencies - - - -create_cluster->install_dependencies - - - - - -gwas_catalog_curation_update - -gwas_catalog_curation_update - - - -install_dependencies->gwas_catalog_curation_update - - - - - diff --git 
a/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_clumping.yaml b/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_clumping.yaml new file mode 100644 index 0000000..39a4c1f --- /dev/null +++ b/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_clumping.yaml @@ -0,0 +1,45 @@ +dataproc: + python_main_module: gs://genetics_etl_python_playground/initialisation/gentropy/dev/cli.py + cluster_metadata: + PACKAGE: gs://genetics_etl_python_playground/initialisation/gentropy/dev/gentropy-0.0.0-py3-none-any.whl + cluster_init_script: gs://genetics_etl_python_playground/initialisation/gentropy/dev/install_dependencies_on_cluster.sh + cluster_name: otg-gwas-catalog-sumstat-susie-clumping + autoscaling_policy: otg-etl + +nodes: + - id: gwas_catalog_study_index + kind: Task + prerequisites: [] + params: + step: gwas_catalog_study_index + step.catalog_study_files: + - gs://gwas_catalog_inputs/gwas_catalog_download_studies.tsv + step.catalog_ancestry_files: + - gs://gwas_catalog_inputs/gwas_catalog_download_ancestries.tsv + step.study_index_path: gs://gwas_catalog_sumstats_susie/study_index + step.gwas_catalog_study_curation_file: gs://genetics-portal-dev-analysis/yt4/gwas_catalog_curation/20241004_output_curation.tsv + step.sumstats_qc_path: gs://gwas_catalog_inputs/summary_statistics_qc + step.session.write_mode: overwrite + - id: locus_breaker_clumping + kind: Task + prerequisites: + - gwas_catalog_study_index + params: + step: locus_breaker_clumping + step.summary_statistics_input_path: gs://gwas_catalog_inputs/harmonised_summary_statistics/*/*.parquet + step.clumped_study_locus_output_path: gs://gwas_catalog_sumstats_susie/study_locus_lb_clumped + step.lbc_baseline_pvalue: 1.0e-5 + step.lbc_distance_cutoff: 250_000 + step.lbc_pvalue_threshold: 1.0e-8 + step.lbc_flanking_distance: 100_000 + step.large_loci_size: 1_500_000 + step.wbc_clump_distance: 500_000 + step.wbc_pvalue_threshold: 1.0e-8 + step.collect_locus: True + step.remove_mhc: True + # More memory on driver to speed up the partition discovery + # Added spark.sql.sources.parallelPartitionDiscovery.parallelism to allow for discovering ~70k processed sumstats + # FIXME: This step is still not performing smoothly. Most likely due to the fact that the input dataset + # is not read with recursiveFileLookup=True in gentropy, rather glob patterns. 
+ +step.session.extended_spark_conf: "{spark.sql.sources.parallelPartitionDiscovery.parallelism: '100000', spark.driver.memory: '10g'}" + step.session.write_mode: overwrite diff --git a/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_finemapping.yaml b/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_finemapping.yaml new file mode 100644 index 0000000..1360628 --- /dev/null +++ b/src/ot_orchestration/dags/config/gwas_catalog_sumstats_susie_finemapping.yaml @@ -0,0 +1,29 @@ +nodes: + - id: generate_manifests + kind: Task + prerequisites: [] + params: + collected_loci_path: gs://gwas_catalog_sumstats_susie/study_locus_lb_clumped + manifest_prefix: gs://gwas_catalog_sumstats_susie/finemapping_manifests + output_path: gs://gwas_catalog_sumstats_susie/credible_set_datasets + log_path: gs://gwas_catalog_sumstats_susie/finemapping_logs + max_records_per_chunk: 40_000 + + - id: finemapping_batch_job + kind: Task + prerequisites: + - generate_manifests + params: + study_index_path: gs://gwas_catalog_sumstats_susie/study_index + google_batch: + entrypoint: /bin/sh + image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/gentropy:dev + resource_specs: + cpu_milli: 4000 + memory_mib: 25000 + boot_disk_mib: 20_000 + task_specs: + max_retry_count: 1 + max_run_duration: "7200s" + policy_specs: + machine_type: n2-highmem-4 diff --git a/src/ot_orchestration/dags/config/ukb_ppp_eur_finemapping.yaml b/src/ot_orchestration/dags/config/ukb_ppp_eur_finemapping.yaml index 42f99c9..9c14a15 100644 --- a/src/ot_orchestration/dags/config/ukb_ppp_eur_finemapping.yaml +++ b/src/ot_orchestration/dags/config/ukb_ppp_eur_finemapping.yaml @@ -26,4 +26,4 @@ nodes: max_retry_count: 5 max_run_duration: "7200s" policy_specs: - machine_type: n1-highmem-4 + machine_type: n2-highmem-4 diff --git a/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_clumping.py b/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_clumping.py new file mode 100644 index 0000000..31e587d --- /dev/null +++ b/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_clumping.py @@ -0,0 +1,41 @@ +"""Airflow DAG to perform locus breaker clumping.""" + +from pathlib import Path + +from airflow.models.dag import DAG + +from ot_orchestration.utils import chain_dependencies, read_yaml_config +from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs +from ot_orchestration.utils.dataproc import ( + generate_dataproc_task_chain, + submit_gentropy_step, +) + +config = read_yaml_config( + Path(__file__).parent / "config" / "gwas_catalog_sumstats_susie_clumping.yaml" +) + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics - Clump GWAS Catalog summary statistics with locus breaker", + default_args=shared_dag_args, + **shared_dag_kwargs, +): + tasks = {} + for step in config["nodes"]: + task = submit_gentropy_step( + cluster_name=config["dataproc"]["cluster_name"], + step_name=step["id"], + python_main_module=config["dataproc"]["python_main_module"], + params=step["params"], + ) + + tasks[step["id"]] = task + chain_dependencies(nodes=config["nodes"], tasks_or_task_groups=tasks) + + dag = generate_dataproc_task_chain( + cluster_name=config["dataproc"]["cluster_name"], + cluster_init_script=config["dataproc"]["cluster_init_script"], + cluster_metadata=config["dataproc"]["cluster_metadata"], + tasks=[t for t in tasks.values()], + ) diff --git a/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_finemapping.py 
b/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_finemapping.py new file mode 100644 index 0000000..432acac --- /dev/null +++ b/src/ot_orchestration/dags/gwas_catalog_sumstats_susie_finemapping.py @@ -0,0 +1,48 @@ +"""Airflow DAG that uses Google Cloud Batch to run the SuSie Finemapper step for gwas catalog sumstats.""" + +from pathlib import Path + +from airflow.models.dag import DAG + +from ot_orchestration.operators.batch.finemapping import ( + FinemappingBatchJobManifestOperator, + FinemappingBatchOperator, +) +from ot_orchestration.utils import ( + chain_dependencies, + find_node_in_config, + read_yaml_config, +) +from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs + +config = read_yaml_config( + Path(__file__).parent / "config" / "gwas_catalog_sumstats_susie_finemapping.yaml" +) + + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — Susie Finemap GWAS Catalog summary statistics", + default_args=shared_dag_args, + **shared_dag_kwargs, +): + tasks = {} + + task_config = find_node_in_config(config["nodes"], "generate_manifests") + if task_config: + generate_manifests = FinemappingBatchJobManifestOperator( + task_id=task_config["id"], + **task_config["params"], + ) + tasks[generate_manifests.task_id] = generate_manifests + + task_config = find_node_in_config(config["nodes"], "finemapping_batch_job") + if task_config: + finemapping_job = FinemappingBatchOperator.partial( + task_id=task_config["id"], + study_index_path=task_config["params"]["study_index_path"], + google_batch=task_config["google_batch"], + ).expand(manifest=generate_manifests.output) + tasks[finemapping_job.task_id] = finemapping_job + + chain_dependencies(nodes=config["nodes"], tasks_or_task_groups=tasks) diff --git a/src/ot_orchestration/operators/batch/finemapping.py b/src/ot_orchestration/operators/batch/finemapping.py index 4eee135..7f02987 100644 --- a/src/ot_orchestration/operators/batch/finemapping.py +++ b/src/ot_orchestration/operators/batch/finemapping.py @@ -1,7 +1,6 @@ """Finemapping operators.""" import time -from functools import cached_property from airflow.models.baseoperator import BaseOperator from airflow.providers.google.cloud.operators.cloud_batch import ( @@ -16,7 +15,7 @@ create_task_spec, ) from ot_orchestration.utils.common import GCP_PROJECT_GENETICS, GCP_REGION -from ot_orchestration.utils.path import GCSPath, IOManager, extract_partition_from_blob +from ot_orchestration.utils.path import GCSPath, extract_partition_from_blob class FinemappingBatchJobManifestOperator(BaseOperator): @@ -29,6 +28,7 @@ def __init__( output_path: str, log_path: str, max_records_per_chunk: int = 100_000, + chunk_limit: int = 2, **kwargs, ): self.log.info("Using collected loci from %s", collected_loci_path) @@ -36,22 +36,18 @@ def __init__( self.log.info("The output of the finemapping will be in %s", output_path) self.log.info("The logs of the finemapping will be in %s", log_path) self.collected_loci_path = GCSPath(collected_loci_path) - self.manifest_prefix = manifest_prefix - self.output_path = output_path - self.log_path = log_path + self.manifest_prefix = GCSPath(manifest_prefix) + self.output_path = GCSPath(output_path) + self.log_path = GCSPath(log_path) self.max_records_per_chunk = max_records_per_chunk + self.chunk_limit = chunk_limit super().__init__(**kwargs) def execute(self, context): """Execute the operator.""" return self.generate_manifests_for_finemapping() - @cached_property - def io_manager(self) -> IOManager: - """Property to get the 
IOManager to load and dump files.""" - return IOManager() - - def _extract_study_locus_ids_from_blobs(self) -> list[str]: + def _extract_study_locus_ids_from_blobs(self) -> set[str]: """Get list of loci from the input Google Storage path. NOTE: This step requires the dataset to be partitioned only by StudyLocusId!! @@ -63,26 +59,50 @@ def _extract_study_locus_ids_from_blobs(self) -> list[str]: client = self.collected_loci_path.client bucket = client.get_bucket(self.collected_loci_path.bucket) blobs = bucket.list_blobs(prefix=self.collected_loci_path.path) - all_study_locus_ids = [ + # Use set to avoid duplicates that comes from the + # multiple parquet files and directory. + all_study_locus_ids = { # ensure that we do not retain the schema of the - extract_partition_from_blob(blob.name) + extract_partition_from_blob(blob.name, with_prefix=False) for blob in blobs if "studyLocusId" in blob.name - ] + } self.log.info("Found %s studyLocusId(s)", len(all_study_locus_ids)) return all_study_locus_ids + def _extract_loci_from_logfiles(self) -> set[str]: + """Get list of loci from the output Google Storage path.""" + self.log.info( + "Extracting studyLocusId from partition names in %s.", self.log_path + ) + client = self.log_path.client + bucket = client.get_bucket(self.log_path.bucket) + blobs = bucket.list_blobs(prefix=self.log_path.path) + self.log.info("prefix: %s", self.log_path.path) + + # NOTE: these blobs are not partitioned, so we need to retain only the StudyLocusId. + # The blobs should be following this convention `credible_set_datasets/${studyLocusId}/_SUCCESS` + all_study_locus_ids = { + blob.name.removeprefix(self.log_path.path) + .removesuffix(".log") + .replace("/", "") + for blob in blobs + if blob.name.endswith(".log") + } + self.log.info( + "Found %s studyLocusId(s) that were finemapped.", len(all_study_locus_ids) + ) + return all_study_locus_ids + def _generate_manifest_rows(self, study_locus_ids: list[str]) -> list[str]: """This method generates a list containing all rows that will be used to generate the manifests.""" self.log.info("Concatenating studyLocusId(s) to create manifest rows.") manifest_rows: list[str] = [] for locus in study_locus_ids: - input_loci_path = f"{self.collected_loci_path}/{locus}" + input_loci_path = f"{self.collected_loci_path}/studyLocusId={locus}" # NOTE: make sure that outputs do not preserve the partitions inside output paths derived from the input loci paths. 
- output_loci_path = ( - f"{self.output_path}/{locus.removeprefix('studyLocusId=')}" - ) - log_path = f"{self.log_path}/{locus.removeprefix('studyLocusId=')}" + output_loci_path = f"{self.output_path}/{locus}" + log_path = f"{self.log_path}/{locus}" manifest_row = ",".join([input_loci_path, output_loci_path, log_path]) manifest_rows.append(manifest_row) return manifest_rows @@ -105,7 +125,8 @@ def _partition_rows_by_range(self, manifest_rows: list[str]) -> list[list[str]]: lines = ["study_locus_input,study_locus_output,log_output"] + chunk manifest_chunks.append(lines) self.log.info("Example output %s", lines[0:2]) - + if self.chunk_limit: + manifest_chunks = manifest_chunks[: self.chunk_limit] return manifest_chunks def _prepare_batch_task_env( @@ -114,19 +135,22 @@ def _prepare_batch_task_env( """Get the environment that will be used by batch tasks.""" transfer_objects = [] env_objects: list[tuple[int, str, int]] = [] + manifest_generation_date = time.strftime("%Y%m%d%H%M%S") for i, lines in enumerate(manifest_chunks): self.log.info("Amending %s lines for %s manifest", len(lines) - 1, i) text = "\n".join(lines) - manifest_path = f"{self.manifest_prefix}/chunk_{i}" + manifest_path = ( + f"{self.manifest_prefix}/{manifest_generation_date}/chunk_{i}" + ) self.log.info("Writing manifest to %s.", manifest_path) transfer_objects.append((manifest_path, text)) env_objects.append((i, manifest_path, len(lines) - 1)) self.log.info("Writing %s manifests", len(transfer_objects)) - self.io_manager.dump_many( - paths=[t[0] for t in transfer_objects], - objects=[t[1] for t in transfer_objects], - ) + for t in transfer_objects: + self.log.info("Writing manifest to %s.", t[0]) + self.log.info("Example output %s", t[1].split("\n")[0:2]) + GCSPath(t[0]).dump(t[1]) return env_objects def generate_manifests_for_finemapping(self) -> list[tuple[int, str, int]]: @@ -139,7 +163,9 @@ def generate_manifests_for_finemapping(self) -> list[tuple[int, str, int]]: list[(int, str, int)]: List of tuples, where the first value is index of the manifest, the second value is a path to manifest, and the third is the number of records in that manifest. """ all_study_locus_ids = self._extract_study_locus_ids_from_blobs() - manifest_rows = self._generate_manifest_rows(all_study_locus_ids) + finemapped_study_locus_ids = self._extract_loci_from_logfiles() + study_locus_ids = list(all_study_locus_ids - finemapped_study_locus_ids) + manifest_rows = self._generate_manifest_rows(study_locus_ids) manifest_chunks = self._partition_rows_by_range(manifest_rows) environments = self._prepare_batch_task_env(manifest_chunks) return environments diff --git a/src/ot_orchestration/utils/path.py b/src/ot_orchestration/utils/path.py index 0744cd8..040c06c 100644 --- a/src/ot_orchestration/utils/path.py +++ b/src/ot_orchestration/utils/path.py @@ -17,7 +17,7 @@ CHUNK_SIZE = 1024 * 256 MAX_N_THREADS = 32 URI_PATTERN = r"^^((?P.*)://)?(?P[(\w)-]+)/(?P([(\w)-/])+)" -PARTITION_REGEX = r"\w*=\w*" +PARTITION_REGEX = r"\w*=(\w*)" class PathSegments(TypedDict): @@ -461,11 +461,14 @@ class ThreadSafetyError(Exception): pass -def extract_partition_from_blob(blob: storage.Blob | str) -> str: +def extract_partition_from_blob( + blob: storage.Blob | str, with_prefix: bool = True +) -> str: """Extract partition prefix from a Google Cloud Storage Blob. Args: blob (storage.Blob): Google Cloud Storage Blob. + with_prefix (bool): Include prefix in the partition. Defaults to True. Returns: str: Partition prefix. 
@@ -479,4 +482,6 @@ def extract_partition_from_blob(blob: storage.Blob | str) -> str: _match = re.search(PARTITION_REGEX, name) if not _match: raise ValueError("No partition found in %s", name) - return _match.group(0) + if with_prefix: + return _match.group(0) + return _match.group(1) diff --git a/tests/test_io_manager.py b/tests/test_io_manager.py index d40606f..2e857b9 100644 --- a/tests/test_io_manager.py +++ b/tests/test_io_manager.py @@ -192,25 +192,36 @@ def test_bucket_property(self, gcs_path: str, bucket: str) -> None: @pytest.mark.parametrize( - ["input_blob", "partition"], + ["input_blob", "partition", "with_prefix"], [ pytest.param( "gs://bucket/prefix/partition=123aa/file.parquet", "partition=123aa", + True, id="single partition", ), pytest.param( "gs://bucket/prefix/partition=123aa/otherPartition=123bbb/file.parquet", "partition=123aa", + True, id="only first partition is checked", ), pytest.param( "gs://bucket/prefix/partition=123aa/otherPartition=123bbb/file.parquet", "partition=123aa", + True, id="only first partition is checked", ), + pytest.param( + "gs://bucket/prefix/partition=123aa/otherPartition=123bbb/file.parquet", + "123aa", + False, + id="Return without prefix", + ), ], ) -def test_extract_partition_from_blob(input_blob: str, partition: str) -> None: +def test_extract_partition_from_blob( + input_blob: str, partition: str, with_prefix: bool +) -> None: """Test extracting partition from a blob.""" - assert extract_partition_from_blob(input_blob) == partition + assert extract_partition_from_blob(input_blob, with_prefix) == partition