Skip to content

Commit

Permalink
feat: logic and airflow pipeline for validation (#730)
Browse files Browse the repository at this point in the history
* refactor: generalised validation logic

* refactor: generalised validation logic

* fix(airflow): fine-tuning DAG for data validation

* fix(validation): study locus uniqueness fixed

* feat: add invalid/valid dataset generation in validation steps/dag (#734)

* fix: does not belong in this PR

* refactor: to be moved to orchestration repo

* docs: respective docs pages for the steps

* docs: amend docstrings

* revert: maintain dag for now but it should be removed eventually

* feat: validate study_locus dataset to produce valid or invalid df

* feat: adjust DAG to parametrise dataset validation

* fix: duplicated row

* feat: increase abstraction of Dataset validation of rows

* docs: increase clarity of what the function does

* fix: error message

* revert: unintended change

* test: testing dataset filtering by quality flag

---------

Co-authored-by: DSuveges <daniel.suveges@protonmail.com>

* chore: pre-commit auto fixes [...]

---------

Co-authored-by: David Ochoa <ochoa@ebi.ac.uk>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Szymon Szyszkowski <69353402+project-defiant@users.noreply.github.com>
  • Loading branch information
4 people authored Sep 3, 2024
1 parent 93a6e60 commit bb8558c
Show file tree
Hide file tree
Showing 10 changed files with 530 additions and 23 deletions.
5 changes: 5 additions & 0 deletions docs/python_api/steps/study_locus_validation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: Study-Locus Validation
---

::: gentropy.study_locus_validation.StudyLocusValidationStep
5 changes: 5 additions & 0 deletions docs/python_api/steps/study_validation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: Study Validation
---

::: gentropy.study_validation.StudyValidationStep
96 changes: 96 additions & 0 deletions src/airflow/dags/data_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""DAG to validate study locus and study index datasets."""

from __future__ import annotations

from pathlib import Path

import common_airflow as common

from airflow.models.dag import DAG

# Name of the cluster every step of this DAG is submitted to.
CLUSTER_NAME = "otg-validation"

# ---------------------------------------------------------------------------
# Input datasets
# ---------------------------------------------------------------------------
# Study indices to validate, one path per data source.
STUDY_INDICES = [
    "gs://gwas_catalog_data/study_index",
    "gs://eqtl_catalogue_data/study_index",
    "gs://finngen_data/r10/study_index",
]
# Credible set (study locus) datasets to validate.
STUDY_LOCI = [
    "gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_curated_associations",
    "gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_summary_statistics",
    "gs://eqtl_catalogue_data/credible_set_datasets/susie",
    "gs://finngen_data/r10/credible_set_datasets/finngen_susie_processed",
]
# Reference datasets passed to the study validation step.
TARGET_INDEX = "gs://genetics_etl_python_playground/releases/24.06/gene_index"
DISEASE_INDEX = "gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases"

# ---------------------------------------------------------------------------
# Output datasets
# ---------------------------------------------------------------------------
# Studies: the invalid location is the valid location with an "_invalid" suffix.
VALIDATED_STUDY = "gs://ot-team/dsuveges/otg-data/validated_study_index"
INVALID_STUDY = VALIDATED_STUDY + "_invalid"
# Quality control flags passed to the study validation step as invalid reasons.
INVALID_STUDY_QC = [
    "UNRESOLVED_TARGET",
    "UNRESOLVED_DISEASE",
    "UNKNOWN_STUDY_TYPE",
    "DUPLICATED_STUDY",
    "NO_GENE_PROVIDED",
]

# Credible sets: same naming scheme as for the studies.
VALIDATED_STUDY_LOCI = "gs://ot-team/dsuveges/otg-data/validated_credible_set"
INVALID_STUDY_LOCI = VALIDATED_STUDY_LOCI + "_invalid"
# Quality control flags passed to the credible set validation step as invalid reasons.
INVALID_STUDY_LOCUS_QC = [
    "DUPLICATED_STUDYLOCUS_ID",
    "AMBIGUOUS_STUDY",
    "FAILED_STUDY",
    "MISSING_STUDY",
    "NO_GENOMIC_LOCATION_FLAG",
    "COMPOSITE_FLAG",
    "INCONSISTENCY_FLAG",
    "PALINDROMIC_ALLELE_FLAG",
]

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — Study locus and study index validation",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
) as dag:
    # Study index validation: reads all study indices plus the target and
    # disease indices, and writes the valid and invalid studies separately.
    validate_studies = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="study_validation",
        task_id="study_validation",
        other_args=[
            f"step.study_index_path={STUDY_INDICES}",
            f"step.target_index_path={TARGET_INDEX}",
            f"step.disease_index_path={DISEASE_INDEX}",
            f"step.valid_study_index_path={VALIDATED_STUDY}",
            # Invalid studies need their own location: INVALID_STUDY_LOCI is
            # the output of the credible set validation step below, so the two
            # steps must not write to the same path.
            f"step.invalid_study_index_path={INVALID_STUDY}",
            f"step.invalid_qc_reasons={INVALID_STUDY_QC}",
        ],
    )

    # Credible set validation: consumes the validated study index produced by
    # the previous step, hence it has to run after it.
    validate_study_loci = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="credible_set_validation",
        task_id="credible_set_validation",
        other_args=[
            f"step.study_index_path={VALIDATED_STUDY}",
            f"step.study_locus_path={STUDY_LOCI}",
            f"step.valid_study_locus_path={VALIDATED_STUDY_LOCI}",
            f"step.invalid_study_locus_path={INVALID_STUDY_LOCI}",
            f"step.invalid_qc_reasons={INVALID_STUDY_LOCUS_QC}",
        ],
    )

    # Task ordering: cluster creation, dependency installation, then the two
    # validation steps in sequence.
    (
        common.create_cluster(
            CLUSTER_NAME,
            master_machine_type="n1-highmem-32",
        )
        >> common.install_dependencies(CLUSTER_NAME)
        >> validate_studies
        >> validate_study_loci
        # NOTE(review): cluster deletion is disabled, so the cluster outlives
        # the DAG run — confirm this is intended before re-enabling.
        # >> common.delete_cluster(CLUSTER_NAME)
    )
42 changes: 42 additions & 0 deletions src/gentropy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,38 @@ class CredibleSetQCConfig(StepConfig):
_target_: str = "gentropy.credible_set_qc.CredibleSetQCStep"


@dataclass
class StudyValidationStepConfig(StepConfig):
    """Configuration of the study index validation step.

    Study indices are read from multiple locations, so `study_index_path` is
    expected to be a list of paths rather than a single path.
    """

    # Inputs
    study_index_path: list[str] = MISSING
    target_index_path: str = MISSING
    disease_index_path: str = MISSING
    # Outputs
    valid_study_index_path: str = MISSING
    invalid_study_index_path: str = MISSING
    # Quality control flags used to split the studies into valid and invalid
    invalid_qc_reasons: list[str] = MISSING
    _target_: str = "gentropy.study_validation.StudyValidationStep"


@dataclass
class StudyLocusValidationStepConfig(StepConfig):
    """Configuration of the study locus validation step.

    The study locus datasets are read from multiple locations, so
    `study_locus_path` is expected to be a list of paths. The study index is a
    single path.
    """

    # Inputs
    study_index_path: str = MISSING
    study_locus_path: list[str] = MISSING
    # Outputs
    valid_study_locus_path: str = MISSING
    invalid_study_locus_path: str = MISSING
    # Quality control flags used to split the study loci into valid and invalid
    invalid_qc_reasons: list[str] = MISSING
    # Defaults to the significance threshold of the window based clumping step
    gwas_significance: float = WindowBasedClumpingStepConfig.gwas_significance
    _target_: str = "gentropy.study_locus_validation.StudyLocusValidationStep"


@dataclass
class Config:
"""Application configuration."""
Expand Down Expand Up @@ -544,3 +576,13 @@ def register_config() -> None:
cs.store(
group="step", name="locus_breaker_clumping", node=LocusBreakerClumpingConfig
)
cs.store(
group="step",
name="credible_set_validation",
node=StudyLocusValidationStepConfig,
)
cs.store(
group="step",
name="study_validation",
node=StudyValidationStepConfig,
)
79 changes: 79 additions & 0 deletions src/gentropy/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from typing import TYPE_CHECKING, Any

import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from typing_extensions import Self

from gentropy.common.schemas import flatten_schema
Expand Down Expand Up @@ -74,6 +76,24 @@ def get_schema(cls: type[Self]) -> StructType:
"""
pass

@classmethod
def get_QC_column_name(cls: type[Self]) -> str | None:
    """Return the name of the quality control column of the dataset.

    The base implementation returns None; child classes that have a QC column
    are expected to override it.

    Returns:
        str | None: Name of the QC column, None in the base implementation.
    """
    return None

@classmethod
def get_QC_categories(cls: type[Self]) -> list[str]:
    """Return the quality control categories of the dataset.

    The base implementation returns an empty list; child classes that have QC
    categories are expected to override it.

    Returns:
        list[str]: QC categories, empty in the base implementation.
    """
    return []

@classmethod
def from_parquet(
cls: type[Self],
Expand Down Expand Up @@ -170,6 +190,46 @@ def validate_schema(self: Dataset) -> None:
f"The following fields present differences in their datatypes: {fields_with_different_observed_datatype}."
)

def valid_rows(self: Self, invalid_flags: list[str], invalid: bool = False) -> Self:
    """Filter the `Dataset` according to a list of quality control flags.

    Only `Dataset` classes with a QC column can be filtered. A row counts as
    invalid when its QC column shares at least one value with `invalid_flags`;
    a null QC value is treated as an empty list of flags.

    Args:
        invalid_flags (list[str]): List of quality control flags to be excluded.
        invalid (bool): If True returns the invalid rows, instead of the valids. Defaults to False.

    Returns:
        Self: filtered dataset.

    Raises:
        ValueError: If the Dataset does not contain a QC column, or if any of the flags is not a QC category of the Dataset.
    """
    # The QC column is checked first: a Dataset without one has no QC
    # categories either, so checking the flags first would report every flag
    # as invalid instead of pointing at the actual problem.
    qc_column_name = self.get_QC_column_name()
    if not qc_column_name:
        raise ValueError(
            f"{type(self).__name__} objects do not contain a QC column to filter by."
        )

    # The categories are the same for every flag, so they are fetched once:
    qc_categories = self.get_QC_categories()
    for flag in invalid_flags:
        if flag not in qc_categories:
            raise ValueError(
                f"{flag} is not a valid QC flag for {type(self).__name__} ({qc_categories})."
            )

    # The QC column is nullable: null values are replaced by an empty array
    # so they can be compared against the list of flags.
    qc = f.when(f.col(qc_column_name).isNull(), f.array()).otherwise(
        f.col(qc_column_name)
    )
    has_invalid_flag = f.arrays_overlap(
        f.array([f.lit(flag) for flag in invalid_flags]), qc
    )

    # Returning the filtered dataset:
    return self.filter(has_invalid_flag if invalid else ~has_invalid_flag)

def drop_infinity_values(self: Self, *cols: str) -> Self:
"""Drop infinity values from Double typed column.
Expand Down Expand Up @@ -260,3 +320,22 @@ def update_quality_flag(
flag_condition,
f.array_union(qc, f.array(f.lit(flag_text.value))),
).otherwise(qc)

@staticmethod
def flag_duplicates(test_column: Column) -> Column:
    """Flag the values that occur more than once in a column.

    Args:
        test_column (Column): Column to check for duplicates

    Returns:
        Column: Boolean column, true where the count of `test_column` among the rows sharing the same value is above one.
    """
    # Window spanning every row that shares the value of the tested column:
    same_value_rows = Window.partitionBy(test_column).rowsBetween(
        Window.unboundedPreceding, Window.unboundedFollowing
    )
    occurrences = f.count(test_column).over(same_value_rows)
    return occurrences > 1
46 changes: 25 additions & 21 deletions src/gentropy/dataset/study_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import TYPE_CHECKING

from pyspark.sql import functions as f
from pyspark.sql.window import Window

from gentropy.assets import data
from gentropy.common.schemas import parse_spark_schema
Expand Down Expand Up @@ -109,6 +108,24 @@ def get_schema(cls: type[StudyIndex]) -> StructType:
"""
return parse_spark_schema("study_index.json")

@classmethod
def get_QC_column_name(cls: type[StudyIndex]) -> str:
    """Return the name of the column holding the quality control flags.

    Returns:
        str: Name of the quality control column of the study index.
    """
    qc_column_name = "qualityControls"
    return qc_column_name

@classmethod
def get_QC_categories(cls: type[StudyIndex]) -> list[str]:
    """Return the quality control categories of the study index.

    Returns:
        list[str]: Value of every member of `StudyQualityCheck`.
    """
    categories = [check.value for check in StudyQualityCheck]
    return categories

@classmethod
def aggregate_and_map_ancestries(
cls: type[StudyIndex], discovery_samples: Column
Expand Down Expand Up @@ -197,7 +214,7 @@ def is_quality_flagged(self: StudyIndex) -> Column:
if "qualityControls" not in self.df.columns:
return f.lit(False)
else:
return f.size(self.df.qualityControls) != 0
return f.size(self.df["qualityControls"]) != 0

def has_summarystats(self: StudyIndex) -> Column:
"""Return a boolean column indicating if a study has harmonized summary statistics.
Expand All @@ -213,30 +230,17 @@ def validate_unique_study_id(self: StudyIndex) -> StudyIndex:
Returns:
StudyIndex: with flagged duplicated studies.
"""
validated_df = (
self.df.withColumn(
"isDuplicated",
f.when(
f.count("studyType").over(
Window.partitionBy("studyId").rowsBetween(
Window.unboundedPreceding, Window.unboundedFollowing
)
)
> 1,
True,
).otherwise(False),
)
.withColumn(
return StudyIndex(
_df=self.df.withColumn(
"qualityControls",
StudyIndex.update_quality_flag(
self.update_quality_flag(
f.col("qualityControls"),
f.col("isDuplicated"),
self.flag_duplicates(f.col("studyId")),
StudyQualityCheck.DUPLICATED_STUDY,
),
)
.drop("isDuplicated")
),
_schema=StudyIndex.get_schema(),
)
return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema())

def _normalise_disease(
self: StudyIndex,
Expand Down
Loading

0 comments on commit bb8558c

Please sign in to comment.