Validating historical features against reference dataset with "great expectations" profiler #2243

Merged · 17 commits · Feb 2, 2022
77 changes: 77 additions & 0 deletions docs/reference/dqm.md
@@ -0,0 +1,77 @@
# Data Quality Monitoring

Data Quality Monitoring (DQM) is a Feast module that helps users validate their data against a user-curated set of rules.
Validation can be applied during:
* Historical retrieval (training dataset generation)
* [planned] Writing features into an online store
* [planned] Reading features from an online store

Its goal is to address several complex data problems, namely:
* Data consistency - new training datasets can be significantly different from previous datasets. This might require a change in model architecture.
* Issues/bugs in the upstream pipeline - bugs in upstream pipelines can cause invalid values to overwrite existing valid values in an online store.
* Training/serving skew - distribution shift could significantly decrease the performance of the model.

> To monitor data quality, we check that the characteristics of the tested dataset (aka the tested dataset's profile) are "equivalent" to the characteristics of the reference dataset.
> How exactly profile equivalency should be measured is up to the user.

### Overview

The validation process consists of the following steps:
1. The user prepares a reference dataset (currently only [saved datasets](../getting-started/concepts/dataset.md) from historical retrieval are supported).
2. The user defines a profiler function, which produces a profile from a given dataset (currently only profilers based on [Great Expectations](https://docs.greatexpectations.io) are supported).
3. The tested dataset is validated against the reference dataset, with the reference dataset and the profiler passed as parameters.

### Preparations
Feast with Great Expectations support can be installed via
```shell
pip install 'feast[ge]'
```

### Dataset profile
Currently, Feast supports only [Great Expectations'](https://greatexpectations.io/) [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite)
as a dataset profile. Hence, the user needs to define a function (profiler) that receives a dataset and returns an [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite).

Great Expectations supports automatic profiling as well as manually specifying expectations:
```python
from great_expectations.dataset import Dataset
from great_expectations.core.expectation_suite import ExpectationSuite

from feast.dqm.profilers.ge_profiler import ge_profiler

@ge_profiler
def automatic_profiler(dataset: Dataset) -> ExpectationSuite:
    from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=['conv_rate'],
        value_set_threshold='few'
    ).build_suite()
```
However, in our experience the capabilities of the automatic profiler are quite limited, so we recommend crafting your own expectations:
```python
@ge_profiler
def manual_profiler(dataset: Dataset) -> ExpectationSuite:
    dataset.expect_column_max_to_be_between("column", 1, 2)
    return dataset.get_expectation_suite()
```
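
The validation example in the next section retrieves a reference dataset saved under the name `my_reference_dataset`. As a rough sketch of how such a reference might be prepared (assuming Feast's saved-dataset API, i.e. `FeatureStore.create_saved_dataset` and `SavedDatasetFileStorage`, which are not part of this PR; `entity_df` and the feature reference are hypothetical):
```python
from feast import FeatureStore
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage

fs = FeatureStore(".")

# Retrieve historical features for a period considered "good" and persist them
# as the reference dataset used for later validation.
reference_job = fs.get_historical_features(
    entity_df=entity_df,  # hypothetical entity dataframe
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature reference
)

fs.create_saved_dataset(
    from_=reference_job,
    name="my_reference_dataset",
    storage=SavedDatasetFileStorage(path="my_reference_dataset.parquet"),
)
```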



### Validating Training Dataset
During retrieval of historical features, a `validation_reference` can be passed as a parameter to the `.to_df(validation_reference=...)` or `.to_arrow(validation_reference=...)` methods of `RetrievalJob`.
If the parameter is provided, Feast runs validation once the dataset is materialized. If validation succeeds, the materialized dataset is returned.
Otherwise, a `feast.dqm.errors.ValidationFailed` exception is raised, containing the details of all expectations that did not pass.

```python
from feast import FeatureStore

fs = FeatureStore(".")

job = fs.get_historical_features(...)
job.to_df(
    validation_reference=fs
    .get_saved_dataset("my_reference_dataset")
    .as_reference(profiler=manual_profiler)
)
```
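
If validation fails, the raised `ValidationFailed` exception carries the full report (see `sdk/python/feast/dqm/errors.py` and `GEValidationReport` below). A minimal sketch of handling it:
```python
from feast.dqm.errors import ValidationFailed

try:
    df = job.to_df(
        validation_reference=fs
        .get_saved_dataset("my_reference_dataset")
        .as_reference(profiler=manual_profiler)
    )
except ValidationFailed as exc:
    # exc.report is a ValidationReport; .errors lists the expectations that did not pass.
    for error in exc.report.errors:
        print(error.check_name, error.column_name, error.check_config)
```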
48 changes: 48 additions & 0 deletions protos/feast/core/ValidationProfile.proto
@@ -0,0 +1,48 @@
//
// Copyright 2021 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
    message UserDefinedProfiler {
        // The python-syntax function body (serialized by dill)
        bytes body = 1;
    }

    UserDefinedProfiler profiler = 1;
}

message GEValidationProfile {
    // JSON-serialized ExpectationSuite object
    bytes expectation_suite = 1;
}

message ValidationReference {
    SavedDataset dataset = 1;

    oneof profiler {
        GEValidationProfiler ge_profiler = 2;
    }
}
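
For illustration, a rough sketch of how these messages are populated by the Python SDK further down (the bindings are assumed to be generated under `feast.protos.feast.core.ValidationProfile_pb2`, as in the SDK imports; `my_profiler` is a hypothetical user function):
```python
import dill
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)


def my_profiler(dataset):
    # Hypothetical profiler body; in practice this returns an ExpectationSuite.
    dataset.expect_column_values_to_not_be_null("driver_id")
    return dataset.get_expectation_suite()


# GEProfiler.to_proto() (see ge_profiler.py below) serializes the function with dill.
proto = GEValidationProfilerProto(
    profiler=GEValidationProfilerProto.UserDefinedProfiler(
        body=dill.dumps(my_profiler, recurse=True)
    )
)

# from_proto() restores the callable from the serialized body.
restored_profiler = dill.loads(proto.profiler.body)
```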
13 changes: 13 additions & 0 deletions sdk/python/feast/dqm/errors.py
@@ -0,0 +1,13 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .profilers.profiler import ValidationReport


class ValidationFailed(Exception):
    def __init__(self, validation_report: "ValidationReport"):
        self.validation_report = validation_report

    @property
    def report(self) -> "ValidationReport":
        return self.validation_report
162 changes: 162 additions & 0 deletions sdk/python/feast/dqm/profilers/ge_profiler.py
@@ -0,0 +1,162 @@
import json
Review comment (Collaborator): Do we need the folder /dqm/? Or would sdk/python/feast/profilers/ge_profiler.py be enough?

Reply (Collaborator, author): It doesn't sound very clear what a feast.profilers module would be for, so I thought feast.dqm would be a useful namespace.


from typing import Any, Callable, Dict, List

import dill
import great_expectations as ge
import numpy as np
import pandas as pd
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset
from great_expectations.profile.base import ProfilerTypeMapping

from feast.dqm.profilers.profiler import (
    Profile,
    Profiler,
    ValidationError,
    ValidationReport,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfile as GEValidationProfileProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)


def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
    dataset_copy = dataset.copy(deep=True)

    for column in dataset.columns:
        if dataset.expect_column_values_to_be_in_type_list(
            column, type_list=sorted(list(ProfilerTypeMapping.DATETIME_TYPE_NAMES))
        ).success:
            # GE cannot parse Timestamp or other pandas datetime types
            dataset_copy[column] = dataset[column].dt.strftime("%Y-%m-%dT%H:%M:%S")

        if dataset[column].dtype == np.float32:
            # GE converts expectation arguments into native Python float.
            # This could cause an error on comparison => so better to convert to double prematurely
            dataset_copy[column] = dataset[column].astype(np.float64)

    return dataset_copy


class GEProfile(Profile):
    """
    GEProfile is an implementation of abstract Profile for integration with Great Expectations.
    It executes validation by applying expectations from an ExpectationSuite instance to a given dataset.
    """

    expectation_suite: ExpectationSuite

    def __init__(self, expectation_suite: ExpectationSuite):
        self.expectation_suite = expectation_suite

    def validate(self, df: pd.DataFrame) -> "GEValidationReport":
        """
        Validate the provided dataframe against the GE expectation suite.
        1. The pandas dataframe is converted into a PandasDataset (GE type)
        2. Some fixes are applied to the data to avoid crashes inside GE (see _prepare_dataset)
        3. Each expectation from the ExpectationSuite instance is tested against the resulting dataset

        Return GEValidationReport, which parses Great Expectations' schema into a list of generic ValidationErrors.
        """
        dataset = PandasDataset(df)

        dataset = _prepare_dataset(dataset)

        results = ge.validate(
            dataset, expectation_suite=self.expectation_suite, result_format="COMPLETE"
        )
        return GEValidationReport(results)

    def to_proto(self):
        return GEValidationProfileProto(
            expectation_suite=json.dumps(self.expectation_suite.to_json_dict()).encode()
        )

    @classmethod
    def from_proto(cls, proto: GEValidationProfileProto) -> "GEProfile":
        return GEProfile(
            expectation_suite=ExpectationSuite(**json.loads(proto.expectation_suite))
        )

    def __repr__(self):
        expectations = json.dumps(
            [e.to_json_dict() for e in self.expectation_suite.expectations], indent=2
        )
        return f"<GEProfile with expectations: {expectations}>"


class GEProfiler(Profiler):
    """
    GEProfiler is an implementation of abstract Profiler for integration with Great Expectations.
    It wraps around a user-defined profiler that should accept a dataset (in the form of a pandas dataframe)
    and return an ExpectationSuite.
    """

    def __init__(
        self, user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite]
    ):
        self.user_defined_profiler = user_defined_profiler

    def analyze_dataset(self, df: pd.DataFrame) -> Profile:
        """
        Generate GEProfile with an ExpectationSuite (set of expectations)
        from a given pandas dataframe by applying the user-defined profiler.

        Some fixes are also applied to the dataset (see _prepare_dataset function) to make it compatible with GE.

        Return GEProfile
        """
        dataset = PandasDataset(df)

        dataset = _prepare_dataset(dataset)

        return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

    def to_proto(self):
        return GEValidationProfilerProto(
            profiler=GEValidationProfilerProto.UserDefinedProfiler(
                body=dill.dumps(self.user_defined_profiler, recurse=True)
            )
        )

    @classmethod
    def from_proto(cls, proto: GEValidationProfilerProto) -> "GEProfiler":
        return GEProfiler(user_defined_profiler=dill.loads(proto.profiler.body))


class GEValidationReport(ValidationReport):
    def __init__(self, validation_result: Dict[Any, Any]):
        self._validation_result = validation_result

    @property
    def is_success(self) -> bool:
        return self._validation_result["success"]

    @property
    def errors(self) -> List["ValidationError"]:
        return [
            ValidationError(
                check_name=res.expectation_config.expectation_type,
                column_name=res.expectation_config.kwargs["column"],
                check_config=res.expectation_config.kwargs,
                missing_count=res["result"].get("missing_count"),
                missing_percent=res["result"].get("missing_percent"),
            )
            for res in self._validation_result["results"]
            if not res["success"]
        ]

    def __repr__(self):
        failed_expectations = [
            res.to_json_dict()
            for res in self._validation_result["results"]
            if not res["success"]
        ]
        return json.dumps(failed_expectations, indent=2)


def ge_profiler(func):
    return GEProfiler(user_defined_profiler=func)
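
For reference, a short usage sketch of the classes above (the dataframe values, column name, and expectation are illustrative assumptions, not part of this PR):
```python
import pandas as pd

from feast.dqm.profilers.ge_profiler import ge_profiler


@ge_profiler
def reference_profiler(dataset):
    # Hypothetical expectation built against the reference data.
    dataset.expect_column_values_to_be_between("conv_rate", 0.0, 1.0)
    return dataset.get_expectation_suite()


reference_df = pd.DataFrame({"conv_rate": [0.1, 0.5, 0.9]})
tested_df = pd.DataFrame({"conv_rate": [0.2, 1.5]})

profile = reference_profiler.analyze_dataset(reference_df)  # GEProfile
report = profile.validate(tested_df)                        # GEValidationReport
print(report.is_success, report.errors)
```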
88 changes: 88 additions & 0 deletions sdk/python/feast/dqm/profilers/profiler.py
@@ -0,0 +1,88 @@
import abc
from typing import Any, List, Optional

import pandas as pd


class Profile:
    @abc.abstractmethod
    def validate(self, dataset: pd.DataFrame) -> "ValidationReport":
        """
        Run set of rules / expectations from current profile against given dataset.

        Return ValidationReport
        """
        ...

    @abc.abstractmethod
    def to_proto(self):
        ...

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, proto) -> "Profile":
        ...


class Profiler:
    @abc.abstractmethod
    def analyze_dataset(self, dataset: pd.DataFrame) -> Profile:
        """
        Generate Profile object with dataset's characteristics (with rules / expectations)
        from given dataset (as pandas dataframe).
        """
        ...

    @abc.abstractmethod
    def to_proto(self):
        ...

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, proto) -> "Profiler":
        ...


class ValidationReport:
    @property
    @abc.abstractmethod
    def is_success(self) -> bool:
        """
        Return whether validation was successful
        """
        ...

    @property
    @abc.abstractmethod
    def errors(self) -> List["ValidationError"]:
        """
        Return list of ValidationErrors if validation failed (is_success = false)
        """
        ...


class ValidationError:
    check_name: str
    column_name: str

    check_config: Optional[Any]

    missing_count: Optional[int]
    missing_percent: Optional[float]

    def __init__(
        self,
        check_name: str,
        column_name: str,
        check_config: Optional[Any] = None,
        missing_count: Optional[int] = None,
        missing_percent: Optional[float] = None,
    ):
        self.check_name = check_name
        self.column_name = column_name
        self.check_config = check_config
        self.missing_count = missing_count
        self.missing_percent = missing_percent

    def __repr__(self):
        return f"<ValidationError {self.check_name}:{self.column_name}>"
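
As a hypothetical sketch of how another validation backend could plug into these abstractions (all names below are illustrative and not part of this PR): a custom Profiler builds a Profile, and the Profile's validate() returns a ValidationReport. A complete implementation would also need to_proto()/from_proto() for persistence, omitted here.
```python
import pandas as pd

from feast.dqm.profilers.profiler import (
    Profile,
    Profiler,
    ValidationError,
    ValidationReport,
)


class RangeReport(ValidationReport):
    def __init__(self, errors):
        self._errors = errors

    @property
    def is_success(self) -> bool:
        return not self._errors

    @property
    def errors(self):
        return self._errors


class RangeProfile(Profile):
    """Checks that numeric columns stay within the min/max seen in the reference data."""

    def __init__(self, bounds):
        self.bounds = bounds  # {column: (min, max)}

    def validate(self, dataset: pd.DataFrame) -> ValidationReport:
        errors = [
            ValidationError(check_name="range_check", column_name=col)
            for col, (lo, hi) in self.bounds.items()
            if dataset[col].min() < lo or dataset[col].max() > hi
        ]
        return RangeReport(errors)


class RangeProfiler(Profiler):
    def analyze_dataset(self, dataset: pd.DataFrame) -> Profile:
        # Record the observed range of every column in the reference dataset.
        bounds = {c: (dataset[c].min(), dataset[c].max()) for c in dataset.columns}
        return RangeProfile(bounds)
```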