Validate schemas overlooking the nullability fields #71

Merged · 32 commits · Apr 13, 2023

Commits (the diff below shows changes from 13 of the 32 commits)
6cc9af1  fix: update configure and gitignore (ireneisdoomed, Apr 6, 2023)
c57919d  fix: move rsId and concordance check outside the filter function (ireneisdoomed, Apr 6, 2023)
829ef6b  fix: join `_variant_coordinates_in_ldindex` on `variantId` (ireneisdoomed, Apr 6, 2023)
512c842  style: rename ld indices location to directory and no extension (ireneisdoomed, Apr 6, 2023)
b1d56c4  fix: correct attribute names for ld indices (ireneisdoomed, Apr 11, 2023)
5a8e03a  fix: order ld index by idx and unpersist data (ireneisdoomed, Apr 11, 2023)
838d4f1  feat: redefine `validate_schema` to avoid nullability issues (ireneisdoomed, Apr 11, 2023)
0d5c6e9  build: update mypy to 1.2.0 (ireneisdoomed, Apr 11, 2023)
b997962  test: add `TestValidateSchema` suite (ireneisdoomed, Apr 11, 2023)
f1a3fc1  feat: redefine validate_schema to avoid nullability issues (ireneisdoomed, Apr 11, 2023)
1f8d9a1  feat: add type checking to validate_schema (ireneisdoomed, Apr 12, 2023)
f76e62f  test: added `test_validate_schema_different_datatype` (ireneisdoomed, Apr 12, 2023)
5414538  feat: added flatten_schema function and test (ireneisdoomed, Apr 12, 2023)
51ad0aa  feat: add support and tests for nested data (ireneisdoomed, Apr 12, 2023)
ad95698  feat: merge with remote branch (ireneisdoomed, Apr 12, 2023)
480539b  feat: add support and tests for nested data (ireneisdoomed, Apr 12, 2023)
27eeb03  Merge branch 'main' into il-schemas (d0choa, Apr 12, 2023)
079ee76  Merge branch 'il-schemas' of https://github.com/opentargets/genetics_… (d0choa, Apr 12, 2023)
4051d0d  Revert "fix: update configure and gitignore" (ireneisdoomed, Apr 12, 2023)
5dda33f  Revert "fix: move rsId and concordance check outside the filter funct…" (ireneisdoomed, Apr 12, 2023)
5f9c7c1  Revert "fix: join `_variant_coordinates_in_ldindex` on `variantId`" (ireneisdoomed, Apr 12, 2023)
eedf763  Revert "style: rename ld indices location to directory and no extension" (ireneisdoomed, Apr 12, 2023)
1f6e389  Revert "fix: correct attribute names for ld indices" (ireneisdoomed, Apr 12, 2023)
aba6bd8  Revert "fix: order ld index by idx and unpersist data" (ireneisdoomed, Apr 12, 2023)
34f1024  Revert "build: update mypy to 1.2.0" (ireneisdoomed, Apr 12, 2023)
57310d7  Merge branch 'il-schemas' of https://github.com/opentargets/genetics_… (ireneisdoomed, Apr 12, 2023)
dec2cf4  fix: `_annotate_sumstats_info` drop duplicated columns before join an… (ireneisdoomed, Apr 13, 2023)
dce0e76  fix: `_annotate_ancestries` drop default fields before join (ireneisdoomed, Apr 13, 2023)
fe93cf5  fix: `_annotate_discovery_sample_sizes` drop default fields before join (ireneisdoomed, Apr 13, 2023)
26d9aea  fix: handle duplicated chrom in v2g generation (ireneisdoomed, Apr 13, 2023)
d0b3489  feat: add check for duplicated field to `validate_schema` (ireneisdoomed, Apr 13, 2023)
b22feb3  refactor: drop redundants in tests (ireneisdoomed, Apr 13, 2023)
3 changes: 3 additions & 0 deletions .gitignore
@@ -7,3 +7,6 @@ outputs/
 coverage.xml
 docs/assets/schemas/
 .cache/
+mock_data/
+notebooks/wandb
+src/wandb
2 changes: 1 addition & 1 deletion config/datasets/gcp.yaml
@@ -34,4 +34,4 @@ ld_index: ${datasets.outputs}/ld_index
 catalog_study_index: ${datasets.outputs}/catalog_study_index
 catalog_study_locus: ${datasets.outputs}/catalog_study_locus
 #templates
-ld_index_template: ${datasets.outputs}/gnomad_r2.1.1.{POP}.common.ld.variant_indices.parquet
+ld_index_template: ${datasets.outputs}/ld_indices/gnomad_r2.1.1.{POP}.common.ld.variant_indices
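The template now resolves to a per-population path under an ld_indices/ directory rather than a single .parquet file. As a quick illustration, a `{POP}` placeholder like this can be resolved with plain string formatting; the output root and population codes below are illustrative assumptions, not values taken from this PR:

    # Hypothetical resolution of the {POP} template above; "gs://outputs" stands
    # in for ${datasets.outputs} and the population codes are assumed examples.
    ld_index_template = "gs://outputs/ld_indices/gnomad_r2.1.1.{POP}.common.ld.variant_indices"
    for population in ["nfe", "afr", "eas"]:
        print(ld_index_template.format(POP=population))
    # gs://outputs/ld_indices/gnomad_r2.1.1.nfe.common.ld.variant_indices
    # gs://outputs/ld_indices/gnomad_r2.1.1.afr.common.ld.variant_indices
    # gs://outputs/ld_indices/gnomad_r2.1.1.eas.common.ld.variant_indices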
67 changes: 35 additions & 32 deletions poetry.lock (generated file; diff not rendered by default)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -72,6 +72,7 @@ flake8-class-attributes-order = "^0.1.3"
 ipykernel = "^6.19.0"
 flake8-pytest-style = "^1.7.2"
 google-cloud-dataproc = "^5.4.1"
+mypy = "1.2.0"
 
 [build-system]
 requires = ["poetry-core>=1.0.0"]
36 changes: 35 additions & 1 deletion src/otg/common/schemas.py
@@ -4,7 +4,7 @@
 import importlib.resources as pkg_resources
 import json
 
-from pyspark.sql.types import StructType
+from pyspark.sql.types import ArrayType, StructType
 
 from otg.assets import schemas
 
@@ -22,3 +22,37 @@ def parse_spark_schema(schema_json: str) -> StructType:
         pkg_resources.read_text(schemas, schema_json, encoding="utf-8")
     )
     return StructType.fromJson(core_schema)
+
+
+def flatten_schema(schema: StructType, prefix: str = "") -> list:
+    """It takes a Spark schema and returns a list of all fields in the schema once flattened.
+
+    Args:
+        schema: The schema of the dataframe
+        prefix: The prefix to prepend to the field names.
+
+    Returns:
+        list: A list of all the columns in the dataframe.
+
+    Examples:
+        >>> from pyspark.sql.types import ArrayType, StringType, StructField, StructType
+        >>> schema = StructType(
+        ...     [
+        ...         StructField("studyLocusId", StringType(), False),
+        ...         StructField("credibleSet", ArrayType(StructType([StructField("tagVariantId", StringType(), False)])), False)
+        ...     ]
+        ... )
+        >>> df = spark.createDataFrame([("A", [{"tagVariantId": "varA"}]), ("B", [{"tagVariantId": "varB"}])], schema)
+        >>> flatten_schema(df.schema)
+        [('studyLocusId', StringType), ('credibleSet', ArrayType(StructType(List(StructField(tagVariantId,StringType,false))),true)), ('credibleSet.tagVariantId', StringType)]
+    """
+    fields = []
+    for field in schema.fields:
+        name = f"{prefix}.{field.name}" if prefix else field.name
+        dtype = field.dataType
+        fields.append((name, dtype))
+        if isinstance(dtype, StructType):
+            fields += flatten_schema(dtype, prefix=name)
+        elif isinstance(dtype, ArrayType) and isinstance(dtype.elementType, StructType):
+            fields += flatten_schema(dtype.elementType, prefix=name)
+    return fields
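Because `flatten_schema` returns plain `(name, dataType)` tuples, top-level nullability flags drop out of any comparison built on it, which is exactly the behaviour the PR title describes. A minimal sketch (field name borrowed from the doctest above):

    # Two schemas that differ only in nullability flatten to identical
    # (name, type) pairs, so schema comparison no longer trips on nullability.
    from pyspark.sql.types import StringType, StructField, StructType

    expected = StructType([StructField("studyLocusId", StringType(), False)])
    observed = StructType([StructField("studyLocusId", StringType(), True)])

    assert flatten_schema(expected) == flatten_schema(observed)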
4 changes: 3 additions & 1 deletion src/otg/config.py
@@ -192,8 +192,10 @@ class GWASCatalogStepConfig:
         catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table.
         catalog_associations_file (str): Raw GWAS catalog associations file.
         variant_annotation_path (str): Input variant annotation path.
-        ld_populations (list): List of populations to include.
         min_r2 (float): Minimum r2 to consider when considering variants within a window.
+        ld_index_template (str): Template path of the LD matrix index containing `{POP}` where the population is expected
+        ld_matrix_template (str): Template path of the LD matrix containing `{POP}` where the population is expected
+        ld_populations (list): List of populations to include.
         catalog_studies_out (str): Output GWAS catalog studies path.
         catalog_associations_out (str): Output GWAS catalog associations path.
         """
45 changes: 32 additions & 13 deletions src/otg/dataset/dataset.py
@@ -4,6 +4,8 @@
 from dataclasses import dataclass
 from typing import TYPE_CHECKING
 
+from otg.common.schemas import flatten_schema
+
 if TYPE_CHECKING:
     from pyspark.sql import DataFrame
     from pyspark.sql.types import StructType
@@ -64,19 +66,36 @@ def validate_schema(self: Dataset) -> None:
         Raises:
             ValueError: DataFrame schema is not valid
         """
-        expected_schema = self._schema  # type: ignore[attr-defined]
-        observed_schema = self._df.schema  # type: ignore[attr-defined]
-        # Observed fields not in schema
-        missing_struct_fields = [x for x in observed_schema if x not in expected_schema]
-        error_message = f"The {missing_struct_fields} StructFields are not included in DataFrame schema: {expected_schema}"
-        if missing_struct_fields:
-            raise ValueError(error_message)
+        expected_schema = self._schema
+        expected_fields = flatten_schema(expected_schema)
+        observed_schema = self._df.schema
+        observed_fields = flatten_schema(observed_schema)
+
+        # Unexpected fields in dataset
+        if unexpected_struct_fields := [
+            x for x in observed_fields if x not in expected_fields
+        ]:
+            raise ValueError(
+                f"The {unexpected_struct_fields} fields are not included in DataFrame schema: {expected_fields}"
+            )
 
         # Required fields not in dataset
-        required_fields = [x for x in expected_schema if not x.nullable]
-        missing_required_fields = [
-            x for x in required_fields if x not in observed_schema
-        ]
-        error_message = f"The {missing_required_fields} StructFields are required but missing from the DataFrame schema: {expected_schema}"
-        if missing_required_fields:
-            raise ValueError(error_message)
+        required_fields = [
+            (x.name, x.dataType) for x in expected_schema if not x.nullable
+        ]
+        if missing_required_fields := [
+            x for x in required_fields if x not in observed_fields
+        ]:
+            raise ValueError(
+                f"The {missing_required_fields} fields are required but missing: {required_fields}"
+            )
+
+        # Fields with different datatype
+        if fields_with_different_observed_datatype := [
+            field
+            for field in set(observed_fields)
+            if observed_fields.count(field) != expected_fields.count(field)
+        ]:
+            raise ValueError(
+                f"The following fields present differences in their datatypes: {fields_with_different_observed_datatype}"
+            )
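Note that a column with the right name but the wrong type is caught by the first check, since fields are compared as `(name, dataType)` tuples rather than full StructFields. A small sketch reusing `flatten_schema` directly (field name invented for illustration):

    # The retyped column surfaces as "unexpected", mirroring the first check in
    # validate_schema above; the count-based third check would also flag it.
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    from otg.common.schemas import flatten_schema

    expected_fields = flatten_schema(StructType([StructField("position", LongType(), False)]))
    observed_fields = flatten_schema(StructType([StructField("position", StringType(), False)]))

    unexpected = [x for x in observed_fields if x not in expected_fields]
    assert unexpected  # non-empty, so validate_schema would raise ValueError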
4 changes: 2 additions & 2 deletions src/otg/dataset/ld_index.py
@@ -27,7 +27,7 @@
 
 @dataclass
 class LDIndex(Dataset):
-    """Dataset to index access to LD information from GnomAD."""
+    """Dataset to access LD information from GnomAD."""
 
     _schema: StructType = parse_spark_schema("ld_index.json")
 
@@ -175,6 +175,7 @@ def create(
                 "alternateAllele"
             ),
         )
+        # Convert gnomad position to Ensembl position (1-based for indels)
         .withColumn(
             "position",
             convert_gnomad_position_to_ensembl(
@@ -195,7 +196,6 @@
         )
         .withColumn("start_idx", f.lit(None).cast(t.LongType()))
         .withColumn("stop_idx", f.lit(None).cast(t.LongType()))
-        # Convert gnomad position to Ensembl position (1-based for indels)
         .repartition(400, "chromosome")
         .sortWithinPartitions("position")
         .persist()
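The moved comment concerns the coordinate convention: GnomAD stores VCF-style positions where indel alleles carry a shared padding base, while Ensembl trims that base, shifting indel starts by one. The actual helper is defined elsewhere in the repo; purely as a hedged sketch of what such a conversion typically looks like:

    # Assumed shape of convert_gnomad_position_to_ensembl (not shown in this
    # diff): shift the position by +1 when either allele is longer than one
    # base (an indel), since Ensembl drops the VCF padding base; SNPs keep
    # the GnomAD position unchanged.
    import pyspark.sql.functions as f
    from pyspark.sql import Column

    def convert_gnomad_position_to_ensembl(
        position: Column, reference: Column, alternate: Column
    ) -> Column:
        return f.when(
            (f.length(reference) > 1) | (f.length(alternate) > 1), position + 1
        ).otherwise(position)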