
feat(ukb_ppp_eur) dags refactoring #39

Merged
merged 10 commits into from
Oct 14, 2024

Changes from 9 commits
4 changes: 2 additions & 2 deletions Makefile
@@ -39,5 +39,5 @@ build-airflow-image: generate-requirements ## build local airflow image for the
-f Dockerfile \
--no-cache

upload-gwas-catalog-bucket-readme: ## Upload gwas_catalog_bucket readme to the bucket.
@gsutil cp docs/datasources/gwas_catalog_data/README.md gs://gwas_catalog_data/README.md
upload-ukb-ppp-bucket-readme: ## Upload ukb_ppp_eur_data readme to the bucket
@gsutil rsync docs/datasources/ukb_ppp_eur_data gs://ukb_ppp_eur_data/docs
12 changes: 12 additions & 0 deletions docs/README.md
@@ -1 +1,13 @@
# Orchestration documentation

This directory describes how the orchestration works in its current state.

### How to generate DAG SVG files

1. Locate your global `airflow.cfg` file and update the `[core]` `dags_folder` entry in `airflow.cfg` to point to the `src` directory of the orchestration repository, or set the `AIRFLOW__CORE__DAGS_FOLDER` environment variable.

2. Run

```bash
poetry run airflow dags show --save docs/${DAG_NAME}.svg ${DAG_NAME}
```
112 changes: 112 additions & 0 deletions docs/datasources/ukb_ppp_eur_data/README.md
@@ -0,0 +1,112 @@
# UK Biobank Pharma Proteomics Project (UKB-PPP)

This document was last updated on 2024-10-11.

The source data comes from https://registry.opendata.aws/ukbppp/.

Data stored in the `gs://ukb_ppp_eur_data` bucket has the following structure:

```
gs://ukb_ppp_eur_data/finemapping_manifests/
gs://ukb_ppp_eur_data/harmonised_summary_statistics/
gs://ukb_ppp_eur_data/study_index/
gs://ukb_ppp_eur_data/study_locus_lb_clumped/
gs://ukb_ppp_eur_data/test/
```

## Processing description

## Pre-steps

A full description of the process can be found in https://github.com/opentargets/issues/issues/3234.

### 1. Mirror

- **Input.** Original data is hosted on Synapse.
- **Transformation.**
- As we decided in the past, we keep a copy of the original data in the Vault.
- Protocol is available here: https://github.com/opentargets/gentropy-vault/blob/main/datasets/ukb-ppp.md.
- The protocol must be run manually.
- **Output.** The output of this step is kept forever in the Vault.

### 2. Preprocess

- **Input.** The mirrored data from the previous step.
- **Transformation.**
- The data which we mirrored during the previous step has to undergo several specific transformations which aren't achievable in Spark (especially the first one; a sketch of it follows this section):
- Extract gzipped per-chromosome files from inside the individual TAR files, decompress, and partition by chromosome
- Recreate the study ID. This is required because multiple rows in the study index can reuse the same summary stats file
- Drop certain rows which don't have a corresponding summary stats file
- This transformation is done using Google Batch. The code can be found in this new repository: https://github.com/opentargets/gentropy-input-support. The UKB PPP-specific part is this one: https://github.com/opentargets/gentropy-input-support/blob/dc5f8f7aee13a066933f3fd5b18a9b3a5ca71069/data_sources.py#L43-L103.
- The command to run is `./submit.py ukb_ppp_eur` inside the `gentropy-input-support` repository.
- This step must also be triggered manually; how to do this is described in the repository.
- **Output.** Precursors of the study index and summary stats datasets are output. Because we decided not to keep the data twice, the output of this step is kept only temporarily and is deleted after 60 days according to the _gs://gentropy-tmp_ bucket lifecycle rules.
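For orientation, here is a minimal sketch of the first transformation in plain Python. The real implementation is the Google Batch code in `gentropy-input-support` linked above; the archive layout and the `chr*` naming token used below are assumptions for illustration only.

```python
import gzip
import tarfile
from pathlib import Path


def extract_and_partition(tar_path: str, out_dir: str) -> None:
    """Extract gzipped per-chromosome files from one TAR and partition by chromosome.

    NOTE: the `chrN` token in member names is an assumed layout, not the
    verified UKB-PPP archive structure.
    """
    with tarfile.open(tar_path) as tar:
        for member in tar.getmembers():
            if not member.isfile() or not member.name.endswith(".gz"):
                continue
            stem = Path(member.name).stem  # drops the trailing ".gz"
            # Pick the partition key from the (assumed) chromosome token.
            chrom = next((p for p in stem.split("_") if p.startswith("chr")), "chr_unknown")
            target_dir = Path(out_dir) / chrom
            target_dir.mkdir(parents=True, exist_ok=True)
            # Decompress the member into its per-chromosome partition.
            with gzip.open(tar.extractfile(member)) as src:
                (target_dir / stem).write_bytes(src.read())
```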

## Orchestration steps

### ukb_ppp_eur_harmonisation dag

The **harmonisation dag** contains two steps:

- raw sumstat preprocessing (ukb_ppp_eur_sumstat_preprocess)
- locus breaker clumping (locus_breaker_clumping)

The configuration of the dataproc infrastructure and individual step parameters can be found in the `ukb_ppp_eur_harmonisation.yaml` file.
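The DAGs read these YAML files with the repository's own helpers (`read_yaml_config` and `find_node_in_config`, used in the finemapping DAG later in this PR). A minimal sketch of looking up one node's parameters, assuming the repository root as working directory:

```python
from pathlib import Path

from ot_orchestration.utils import find_node_in_config, read_yaml_config

# Load the harmonisation config and inspect one node's step parameters.
config = read_yaml_config(
    Path("src/ot_orchestration/dags/config/ukb_ppp_eur_harmonisation.yaml")
)
node = find_node_in_config(config["nodes"], "ukb_ppp_eur_sumstat_preprocess")
print(node["params"]["step.study_index_output_path"])
# -> gs://ukb_ppp_eur_data/study_index
```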

#### ukb_ppp_eur_sumstat_preprocess

![harmonisation dag](ukb_ppp_eur_harmonisation.svg)

This process **harmonises the raw pre-processed data** into [SummaryStatistics](https://opentargets.github.io/gentropy/python_api/datasets/summary_statistics/) and creates the [StudyIndex](https://opentargets.github.io/gentropy/python_api/datasets/study_index/).
The process runs on a **dataproc** cluster.

The outputs are stored in:

- `gs://ukb_ppp_eur_data/study_index` - study index
- `gs://ukb_ppp_eur_data/harmonised_summary_statistics` - summary statistics

#### locus_breaker_clumping

This process performs locus clumping on the previously harmonised summary statistics and results in a [StudyLocus](https://opentargets.github.io/gentropy/python_api/datasets/study_locus/) dataset stored under `gs://ukb_ppp_eur_data/study_locus_lb_clumped`.
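For downstream use, a minimal sketch of loading this dataset, assuming gentropy's documented `Session` and `StudyLocus.from_parquet` API and read access to the bucket:

```python
from gentropy.common.session import Session
from gentropy.dataset.study_locus import StudyLocus

# Load the clumped loci produced by this step.
session = Session()
study_locus = StudyLocus.from_parquet(
    session, "gs://ukb_ppp_eur_data/study_locus_lb_clumped"
)
print(study_locus.df.count())  # number of clumped loci
```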

#### Parametrization of dataproc preprocessing jobs

To parametrize the dataproc cluster, update the `dataproc` block in the `ukb_ppp_eur_harmonisation.yaml` file.

### ukb_ppp_eur_finemapping dag

![finemapping dag](ukb_ppp_eur_finemapping.svg)

This dag performs fine mapping with the SuSiE-inf algorithm on clumped study loci to obtain [credible sets](https://opentargets.github.io/gentropy/python_api/datasets/study_locus/). This is an expensive process and is run on Google Batch.

Due to infrastructure constraints, the fine mapping process is divided into two steps:

- Generate manifests - `FinemappingBatchJobManifestOperator`
- Execute the fine mapping batch job (one fine mapping step per manifest) - `FinemappingBatchOperator`

![finemapping](finemapping.svg)

1. Tasks performed by `FinemappingBatchJobManifestOperator`

- Collect all individual loci parquet files.
- Partition the collected loci into batches of at most `max_records_per_chunk` records, as sketched after these lists.
- For each batch, create a manifest file that is passed as input to the fine mapping gentropy step.
- Save the batch manifests to Google Cloud Storage.

2. Tasks performed by `FinemappingBatchOperator`

- Execute one Google Batch job per manifest with `n <= max_records_per_chunk` tasks.
- Each task executes the fine mapping step on a single `StudyLocus` record.
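For illustration, the batching contract can be sketched in a few lines of plain Python; the `chunk_loci` helper below is hypothetical, not the operator's actual internals:

```python
from itertools import islice


def chunk_loci(loci: list[str], max_records_per_chunk: int) -> list[list[str]]:
    """Partition collected locus entries into manifest-sized batches."""
    iterator = iter(loci)
    return [
        list(batch)
        for batch in iter(lambda: list(islice(iterator, max_records_per_chunk)), [])
    ]


# 250 loci with a 100-record limit -> 3 batches (100, 100, 50),
# i.e. one manifest and one Google Batch job per batch.
batches = chunk_loci([f"locus_{i}" for i in range(250)], max_records_per_chunk=100)
print([len(b) for b in batches])
```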

The output of fine mapping can be found under:

- `gs://ukb_ppp_eur_data/credible_set_datasets/` - fine mapped study loci
- `gs://ukb_ppp_eur_data/finemapping_manifests/` - manifests used during the fine mapping job

#### Parametrization of google batch finemapping job

The configuration of the Google Batch infrastructure and individual step parameters can be found in the `ukb_ppp_eur_finemapping.yaml` file.
To adjust the parameters for the Google Batch infrastructure, refer to the `google_batch` block in the node configuration.

> [!WARNING]
> After running the Google Batch fine mapping job, ensure that all job tasks have succeeded; otherwise the job requires manual curation.
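One way to perform that check from Python, as a hedged sketch using the public `google-cloud-batch` client (the project and job IDs below are placeholders to fill in):

```python
from google.cloud import batch_v1

client = batch_v1.BatchServiceClient()
# Placeholder resource name: substitute the actual project, region and job ID.
parent = "projects/PROJECT_ID/locations/europe-west1/jobs/JOB_ID/taskGroups/group0"
failed = [
    task.name
    for task in client.list_tasks(parent=parent)
    if task.status.state != batch_v1.TaskStatus.State.SUCCEEDED
]
print(f"{len(failed)} task(s) not in SUCCEEDED state")
```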
13 changes: 13 additions & 0 deletions docs/datasources/ukb_ppp_eur_data/finemapping.svg
32 changes: 32 additions & 0 deletions docs/datasources/ukb_ppp_eur_data/ukb_ppp_eur_finemapping.svg
56 changes: 56 additions & 0 deletions docs/datasources/ukb_ppp_eur_data/ukb_ppp_eur_harmonisation.svg
18 changes: 17 additions & 1 deletion poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -36,6 +36,7 @@ pre-commit = "^3.7.1"
coverage = "^7.5.4"
psycopg2-binary = "^2.9.9"
interrogate = "^1.7.0"
graphviz = "^0.20.3"


[tool.poetry.group.test.dependencies]
28 changes: 28 additions & 0 deletions src/ot_orchestration/dags/config/ukb_ppp_eur_finemapping.yaml
@@ -0,0 +1,28 @@
nodes:
- id: generate_manifests
kind: Task
prerequisites: []
params:
collected_loci_path: gs://ukb_ppp_eur_data/study_locus_lb_clumped
manifest_prefix: gs://ukb_ppp_eur_data/finemapping_manifests
output_path: gs://ukb_ppp_eur_data/credible_set_datasets/susie
max_records_per_chunk: 100_000

- id: finemapping_batch_job
kind: Task
prerequisites:
- generate_manifests
params:
study_index_path: gs://ukb_ppp_eur_data/study_index
google_batch:
entrypoint: /bin/sh
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/gentropy:dev
resource_specs:
cpu_milli: 4000
memory_mib: 25000
boot_disk_mib: 20_000
task_specs:
max_retry_count: 5
max_run_duration: "7200s"
policy_specs:
machine_type: n1-highmem-4
34 changes: 29 additions & 5 deletions src/ot_orchestration/dags/config/ukb_ppp_eur_harmonisation.yaml
@@ -1,18 +1,42 @@
dataproc:
python_main_module: gs://genetics_etl_python_playground/initialisation/gentropy/szsz-update-package-for-dataproc-run/cli.py
python_main_module: gs://genetics_etl_python_playground/initialisation/gentropy/dev/cli.py
cluster_metadata:
PACKAGE: gs://genetics_etl_python_playground/initialisation/gentropy/szsz-update-package-for-dataproc-run/gentropy-0.0.0-py3-none-any.whl
cluster_init_script: gs://genetics_etl_python_playground/initialisation/0.0.0/install_dependencies_on_cluster.sh
PACKAGE: gs://genetics_etl_python_playground/initialisation/gentropy/dev/gentropy-0.0.0-py3-none-any.whl
cluster_init_script: gs://genetics_etl_python_playground/initialisation/gentropy/dev/install_dependencies_on_cluster.sh
cluster_name: otg-ukb-ppp-eur
autoscaling_policy: otg-etl

nodes:
- id: ukb_ppp_eur_sumstat_preprocess
kind: Task
prerequisites: []
params:
# NOTE: Check documentation to see how to generate raw input files from source
step: ukb_ppp_eur_sumstat_preprocess
step.raw_study_index_path_from_tsv: gs://gentropy-tmp/batch/output/ukb_ppp_eur/study_index.tsv
step.raw_summary_stats_path: gs://gentropy-tmp/batch/output/ukb_ppp_eur/summary_stats.parquet
step.variant_annotation_path: gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/variant_annotation
# all other parameters
step.tmp_variant_annotation_path: gs://gentropy-tmp/variant_annotation
step.variant_annotation_path: gs://gnomad_data_2/gnomad_variant_index
step.study_index_output_path: gs://ukb_ppp_eur_data/study_index
step.summary_stats_output_path: gs://ukb_ppp_eur_data/summary_stats
step.summary_stats_output_path: gs://ukb_ppp_eur_data/harmonised_summary_statistics
step.session.write_mode: overwrite

- id: locus_breaker_clumping
kind: Task
prerequisites:
- ukb_ppp_eur_sumstat_preprocess
params:
step: locus_breaker_clumping
step.summary_statistics_input_path: gs://ukb_ppp_eur_data/harmonised_summary_statistics
step.clumped_study_locus_output_path: gs://ukb_ppp_eur_data/study_locus_lb_clumped
step.lbc_baseline_pvalue: 1.0e-5
step.lbc_distance_cutoff: 250_000
step.lbc_pvalue_threshold: 1.7e-11
step.lbc_flanking_distance: 100_000
step.large_loci_size: 1_500_000
step.wbc_clump_distance: 500_000
step.wbc_pvalue_threshold: 1.7e-11
step.collect_locus: True
step.remove_mhc: True
step.session.write_mode: overwrite
2 changes: 1 addition & 1 deletion src/ot_orchestration/dags/genetics_etl.py
@@ -15,7 +15,7 @@
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.task_group import TaskGroup

from ot_orchestration.operators.vep import (
from ot_orchestration.operators.batch.vep import (
ConvertVariantsToVcfOperator,
VepAnnotateOperator,
)
47 changes: 47 additions & 0 deletions src/ot_orchestration/dags/ukb_ppp_eur_finemapping.py
@@ -0,0 +1,47 @@
"""Airflow DAG that uses Google Cloud Batch to run the SuSie Finemapper step for UKB PPP."""

from pathlib import Path

from airflow.models.dag import DAG

from ot_orchestration.operators.batch.finemapping import (
FinemappingBatchJobManifestOperator,
FinemappingBatchOperator,
)
from ot_orchestration.utils import (
chain_dependencies,
find_node_in_config,
read_yaml_config,
)
from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs

config = read_yaml_config(
Path(__file__).parent / "config" / "ukb_ppp_eur_finemapping.yaml"
)


with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — Susie Finemap UKB PPP (EUR)",
default_args=shared_dag_args,
**shared_dag_kwargs,
):
tasks = {}

task_config = find_node_in_config(config["nodes"], "generate_manifests")
generate_manifests = FinemappingBatchJobManifestOperator(
task_id=task_config["id"],
**task_config["params"],
)

task_config = find_node_in_config(config["nodes"], "finemapping_batch_job")
finemapping_job = FinemappingBatchOperator.partial(
task_id=task_config["id"],
study_index_path=task_config["params"]["study_index_path"],
google_batch=task_config["google_batch"],
).expand(manifest=generate_manifests.output)

tasks[generate_manifests.task_id] = generate_manifests
tasks[finemapping_job.task_id] = finemapping_job

chain_dependencies(nodes=config["nodes"], tasks_or_task_groups=tasks)