Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve the code related to on-demand-featureview. #4203

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import warnings
from datetime import datetime
from types import FunctionType
from typing import Any, Dict, List, Optional, Type, Union
from typing import Any, Optional, Union

import dill
import pandas as pd
Expand Down Expand Up @@ -62,24 +62,24 @@ class OnDemandFeatureView(BaseFeatureView):
"""

name: str
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
features: list[Field]
source_feature_view_projections: dict[str, FeatureViewProjection]
source_request_sources: dict[str, RequestSource]
feature_transformation: Union[
PandasTransformation, PythonTransformation, SubstraitTransformation
]
mode: str
description: str
tags: Dict[str, str]
tags: dict[str, str]
owner: str

@log_exceptions # noqa: C901
def __init__( # noqa: C901
self,
*,
name: str,
schema: List[Field],
sources: List[
schema: list[Field],
sources: list[
Union[
FeatureView,
RequestSource,
Expand All @@ -93,7 +93,7 @@ def __init__( # noqa: C901
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
tags: Optional[dict[str, str]] = None,
owner: str = "",
):
"""
Expand Down Expand Up @@ -124,32 +124,31 @@ def __init__( # noqa: C901
owner=owner,
)

if mode not in {"python", "pandas", "substrait"}:
raise Exception(
f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
self.mode = mode.lower()

if self.mode not in {"python", "pandas", "substrait"}:
raise ValueError(
f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
)
else:
self.mode = mode

if not feature_transformation:
if udf:
warnings.warn(
"udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.",
DeprecationWarning,
)
# Note inspecting the return signature won't work with isinstance so this is the best alternative
if mode == "pandas":
if self.mode == "pandas":
feature_transformation = PandasTransformation(udf, udf_string)
elif mode == "python":
elif self.mode == "python":
feature_transformation = PythonTransformation(udf, udf_string)
else:
pass
else:
raise Exception(
raise ValueError(
"OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
)

self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
self.source_request_sources: dict[str, RequestSource] = {}
for odfv_source in sources:
if isinstance(odfv_source, RequestSource):
self.source_request_sources[odfv_source.name] = odfv_source
Expand All @@ -163,7 +162,7 @@ def __init__( # noqa: C901
self.feature_transformation = feature_transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
def proto_class(self) -> type[OnDemandFeatureViewProto]:
return OnDemandFeatureViewProto

def __copy__(self):
Expand Down Expand Up @@ -336,7 +335,7 @@ def from_proto(
user_defined_function_proto=backwards_compatible_udf,
)
else:
raise Exception("At least one transformation type needs to be provided")
raise ValueError("At least one transformation type needs to be provided")

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand Down Expand Up @@ -372,18 +371,18 @@ def from_proto(

return on_demand_feature_view_obj

def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
def get_request_data_schema(self) -> dict[str, ValueType]:
schema: dict[str, ValueType] = {}
for request_source in self.source_request_sources.values():
if isinstance(request_source.schema, List):
if isinstance(request_source.schema, list):
new_schema = {}
for field in request_source.schema:
new_schema[field.name] = field.dtype.to_value_type()
schema.update(new_schema)
elif isinstance(request_source.schema, Dict):
elif isinstance(request_source.schema, dict):
schema.update(request_source.schema)
else:
raise Exception(
raise TypeError(
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
)
return schema
Expand All @@ -401,7 +400,10 @@ def transform_ibis(
if not isinstance(ibis_table, Table):
raise TypeError("transform_ibis only accepts ibis.expr.types.Table")

assert type(self.feature_transformation) == SubstraitTransformation
if not isinstance(self.feature_transformation, SubstraitTransformation):
raise TypeError(
"The feature_transformation is not SubstraitTransformation type while calling transform_ibis()."
)

columns_to_cleanup = []
for source_fv_projection in self.source_feature_view_projections.values():
Expand All @@ -423,7 +425,7 @@ def transform_ibis(

transformed_table = transformed_table.drop(*columns_to_cleanup)

rename_columns: Dict[str, str] = {}
rename_columns: dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
Expand Down Expand Up @@ -454,11 +456,9 @@ def transform_arrow(
pa_table = pa_table.append_column(
feature.name, pa_table[full_feature_ref]
)
# pa_table[feature.name] = pa_table[full_feature_ref]
columns_to_cleanup.append(feature.name)
elif feature.name in pa_table.column_names:
# Make sure the full feature name is always present
# pa_table[full_feature_ref] = pa_table[feature.name]
pa_table = pa_table.append_column(
full_feature_ref, pa_table[feature.name]
)
Expand All @@ -469,7 +469,7 @@ def transform_arrow(
)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
rename_columns: dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
Expand All @@ -494,12 +494,12 @@ def transform_arrow(

def transform_dict(
self,
feature_dict: Dict[str, Any], # type: ignore
) -> Dict[str, Any]:
feature_dict: dict[str, Any], # type: ignore
) -> dict[str, Any]:
# we need a mapping from full feature name to short and back to do a renaming
# The simplest thing to do is to make the full reference, copy the columns with the short reference
# and rerun
columns_to_cleanup: List[str] = []
columns_to_cleanup: list[str] = []
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
Expand All @@ -512,7 +512,7 @@ def transform_dict(
feature_dict[full_feature_ref] = feature_dict[feature.name]
columns_to_cleanup.append(str(full_feature_ref))

output_dict: Dict[str, Any] = self.feature_transformation.transform(
output_dict: dict[str, Any] = self.feature_transformation.transform(
feature_dict
)
for feature_name in columns_to_cleanup:
Expand Down Expand Up @@ -542,8 +542,8 @@ def infer_features(self) -> None:
f"Could not infer Features for the feature view '{self.name}'.",
)

def _construct_random_input(self) -> Dict[str, List[Any]]:
rand_dict_value: Dict[ValueType, List[Any]] = {
def _construct_random_input(self) -> dict[str, list[Any]]:
rand_dict_value: dict[ValueType, list[Any]] = {
ValueType.BYTES: [str.encode("hello world")],
ValueType.STRING: ["hello world"],
ValueType.INT32: [1],
Expand Down Expand Up @@ -582,11 +582,11 @@ def _construct_random_input(self) -> Dict[str, List[Any]]:
@staticmethod
def get_requested_odfvs(
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
) -> list["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
)
requested_on_demand_feature_views: List[OnDemandFeatureView] = []
requested_on_demand_feature_views: list[OnDemandFeatureView] = []
for odfv in all_on_demand_feature_views:
for feature in odfv.features:
if f"{odfv.name}:{feature.name}" in feature_refs:
Expand All @@ -597,8 +597,8 @@ def get_requested_odfvs(

def on_demand_feature_view(
*,
schema: List[Field],
sources: List[
schema: list[Field],
sources: list[
Union[
FeatureView,
RequestSource,
Expand All @@ -607,7 +607,7 @@ def on_demand_feature_view(
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
tags: Optional[dict[str, str]] = None,
owner: str = "",
):
"""
Expand Down Expand Up @@ -643,9 +643,9 @@ def decorator(user_function):
)
transformation = PandasTransformation(user_function, udf_string)
elif mode == "python":
if return_annotation not in (inspect._empty, Dict[str, Any]):
if return_annotation not in (inspect._empty, dict[str, Any]):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
f"return signature for {user_function} is {return_annotation} but should be dict[str, Any]"
)
transformation = PythonTransformation(user_function, udf_string)
elif mode == "substrait":
Expand Down
31 changes: 6 additions & 25 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pandas as pd
Expand Down Expand Up @@ -28,35 +28,16 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
self.udf_string = udf_string

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field]
self, pa_table: pyarrow.Table, features: list[Field]
) -> pyarrow.Table:
if not isinstance(pa_table, pyarrow.Table):
raise TypeError(
f"pa_table should be type pyarrow.Table but got {type(pa_table).__name__}"
)
output_df = self.udf.__call__(pa_table.to_pandas())
output_df = pyarrow.Table.from_pandas(output_df)
if not isinstance(output_df, pyarrow.Table):
raise TypeError(
f"output_df should be type pyarrow.Table but got {type(output_df).__name__}"
)
return output_df
output_df_pandas = self.udf.__call__(pa_table.to_pandas())
return pyarrow.Table.from_pandas(output_df_pandas)

def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
if not isinstance(input_df, pd.DataFrame):
raise TypeError(
f"input_df should be type pd.DataFrame but got {type(input_df).__name__}"
)
output_df = self.udf.__call__(input_df)
if not isinstance(output_df, pd.DataFrame):
raise TypeError(
f"output_df should be type pd.DataFrame but got {type(output_df).__name__}"
)
return output_df
return self.udf.__call__(input_df)

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)

output_df: pd.DataFrame = self.transform(df)

return [
Expand Down
20 changes: 6 additions & 14 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pyarrow
Expand All @@ -26,27 +26,19 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
self.udf_string = udf_string

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field]
self, pa_table: pyarrow.Table, features: list[Field]
) -> pyarrow.Table:
raise Exception(
'OnDemandFeatureView mode "python" not supported for offline processing.'
'OnDemandFeatureView with mode "python" does not support offline processing.'
)

def transform(self, input_dict: Dict) -> Dict:
if not isinstance(input_dict, Dict):
raise TypeError(
f"input_dict should be type Dict[str, Any] but got {type(input_dict).__name__}"
)
def transform(self, input_dict: dict) -> dict:
# Ensuring that the inputs are included as well
output_dict = self.udf.__call__(input_dict)
if not isinstance(output_dict, Dict):
raise TypeError(
f"output_dict should be type Dict[str, Any] but got {type(output_dict).__name__}"
)
return {**input_dict, **output_dict}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this missing the return signature?


def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
output_dict: Dict[str, List[Any]] = self.transform(random_input)
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
output_dict: dict[str, list[Any]] = self.transform(random_input)

return [
Field(
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import FunctionType
from typing import Any, Dict, List
from typing import Any

import dill
import pandas as pd
Expand Down Expand Up @@ -42,7 +42,7 @@ def transform_ibis(self, table):
return self.ibis_function(table)

def transform_arrow(
self, pa_table: pyarrow.Table, features: List[Field] = []
self, pa_table: pyarrow.Table, features: list[Field] = []
) -> pyarrow.Table:
def table_provider(names, schema: pyarrow.Schema):
return pa_table.select(schema.names)
Expand All @@ -56,7 +56,7 @@ def table_provider(names, schema: pyarrow.Schema):

return table

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/unit/infra/test_inference_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame:
],
mode="python",
)
def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, Any]:
output_dict: Dict[str, Any] = {
def python_native_test_view(input_dict: dict[str, Any]) -> dict[str, Any]:
output_dict: dict[str, Any] = {
"output": input_dict["some_date"],
"object_output": str(input_dict["some_date"]),
}
Expand Down
Loading
Loading