Skip to content

Commit

Permalink
feat: Updating protos to separate transformation (#4018)
Browse files Browse the repository at this point in the history
* feat: updating protos to separate transformation

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* fixed stuff...i think

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated tests and registry diff function

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated base registry

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated react component

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* formatted

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated stream feature view proto

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* making the proto changes backwards compatable

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* trying to make this backwards compatible

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* caught a bug and fixed the linter

* actually linted

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated ui component

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* accidentally commented out fixtures

* Updated

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* incrementing protos

* updated tests

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* fixed linting issue and made backwards compatible

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* added more tests

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

---------

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>
  • Loading branch information
franciscojavierarceo authored Mar 24, 2024
1 parent e815562 commit c58ef74
Show file tree
Hide file tree
Showing 14 changed files with 482 additions and 232 deletions.
14 changes: 11 additions & 3 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import "feast/core/FeatureView.proto";
import "feast/core/FeatureViewProjection.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Transformation.proto";

message OnDemandFeatureView {
// User-specified specifications of this feature view.
Expand All @@ -49,9 +50,11 @@ message OnDemandFeatureViewSpec {
map<string, OnDemandSource> sources = 4;

oneof transformation {
UserDefinedFunction user_defined_function = 5;
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9;
UserDefinedFunction user_defined_function = 5 [deprecated = true];
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9 [deprecated = true];
}
// Oneof with {user_defined_function, on_demand_substrait_transformation}
FeatureTransformationV2 feature_transformation = 10;

// Description of the on demand feature view.
string description = 6;
Expand All @@ -61,6 +64,7 @@ message OnDemandFeatureViewSpec {

// Owner of the on demand feature view.
string owner = 8;
string mode = 11;
}

message OnDemandFeatureViewMeta {
Expand All @@ -81,6 +85,8 @@ message OnDemandSource {

// Serialized representation of python function.
message UserDefinedFunction {
option deprecated = true;

// The function name
string name = 1;

Expand All @@ -92,5 +98,7 @@ message UserDefinedFunction {
}

message OnDemandSubstraitTransformation {
option deprecated = true;

bytes substrait_plan = 1;
}
}
7 changes: 6 additions & 1 deletion protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";
import "feast/core/Transformation.proto";

message StreamFeatureView {
// User-specified specifications of this feature view.
Expand Down Expand Up @@ -77,7 +78,8 @@ message StreamFeatureViewSpec {
bool online = 12;

// Serialized function that is encoded in the streamfeatureview
UserDefinedFunction user_defined_function = 13;
UserDefinedFunction user_defined_function = 13 [deprecated = true];


// Mode of execution
string mode = 14;
Expand All @@ -87,5 +89,8 @@ message StreamFeatureViewSpec {

// Timestamp field for aggregation
string timestamp_field = 16;

// Oneof with {user_defined_function, on_demand_substrait_transformation}
FeatureTransformationV2 feature_transformation = 17;
}

33 changes: 33 additions & 0 deletions protos/feast/core/Transformation.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureTransformationProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";

// Serialized representation of python function.
message UserDefinedFunctionV2 {
// The function name
string name = 1;

// The python-syntax function body (serialized by dill)
bytes body = 2;

// The string representation of the udf
string body_text = 3;
}

// A feature transformation executed as a user-defined function
message FeatureTransformationV2 {
// Note this Transformation starts at 5 for backwards compatibility
oneof transformation {
UserDefinedFunctionV2 user_defined_function = 1;
OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 2;
}
}

message OnDemandSubstraitTransformationV2 {
bytes substrait_plan = 1;
}
21 changes: 18 additions & 3 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast

Expand Down Expand Up @@ -144,11 +145,25 @@ def diff_registry_objects(
if _field.name in FIELDS_TO_IGNORE:
continue
elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
if _field.name == "user_defined_function":
# TODO: Delete "transformation" after we've safely deprecated it from the proto
if _field.name in ["transformation", "feature_transformation"]:
warnings.warn(
"transformation will be deprecated in the future please use feature_transformation instead.",
DeprecationWarning,
)
current_spec = cast(OnDemandFeatureViewSpec, current_spec)
new_spec = cast(OnDemandFeatureViewSpec, new_spec)
current_udf = current_spec.user_defined_function
new_udf = new_spec.user_defined_function
# Check if the old proto is populated and use that if it is
deprecated_udf = current_spec.user_defined_function
feature_transformation_udf = (
current_spec.feature_transformation.user_defined_function
)
current_udf = (
deprecated_udf
if deprecated_udf.body_text != ""
else feature_transformation_udf
)
new_udf = new_spec.feature_transformation.user_defined_function
for _udf_field in current_udf.DESCRIPTOR.fields:
if _udf_field.name == "body":
continue
Expand Down
14 changes: 11 additions & 3 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -662,10 +663,16 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
key=lambda on_demand_feature_view: on_demand_feature_view.name,
):
odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())

odfv_dict["spec"]["userDefinedFunction"][
# We are logging a warning because the registry object may be read from a proto that is not updated
# i.e., we have to submit dual writes but in order to ensure the read behavior succeeds we have to load
# both objects to compare any changes in the registry
warnings.warn(
"We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.",
DeprecationWarning,
)
odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.transformation.udf_string
] = on_demand_feature_view.feature_transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand All @@ -684,6 +691,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
"body"
] = stream_feature_view.udf_string
registry_dict["streamFeatureViews"].append(sfv_dict)

for saved_dataset in sorted(
self.list_saved_datasets(project=project), key=lambda item: item.name
):
Expand Down
54 changes: 44 additions & 10 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
OnDemandFeatureViewSpec,
OnDemandSource,
)
from feast.protos.feast.core.Transformation_pb2 import (
FeatureTransformationV2 as FeatureTransformationProto,
)
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand Down Expand Up @@ -63,6 +69,7 @@ class OnDemandFeatureView(BaseFeatureView):
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
transformation: Union[OnDemandPandasTransformation]
feature_transformation: Union[OnDemandPandasTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -83,6 +90,7 @@ def __init__( # noqa: C901
udf: Optional[FunctionType] = None,
udf_string: str = "",
transformation: Optional[Union[OnDemandPandasTransformation]] = None,
feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -101,6 +109,7 @@ def __init__( # noqa: C901
dataframes as inputs.
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
feature_transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
Expand Down Expand Up @@ -139,6 +148,7 @@ def __init__( # noqa: C901
] = odfv_source.projection

self.transformation = transformation
self.feature_transformation = self.transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -151,6 +161,7 @@ def __copy__(self):
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
transformation=self.transformation,
feature_transformation=self.transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -172,6 +183,7 @@ def __eq__(self, other):
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.transformation != other.transformation
or self.feature_transformation != other.feature_transformation
):
return False

Expand Down Expand Up @@ -205,16 +217,19 @@ def to_proto(self) -> OnDemandFeatureViewProto:
request_data_source=request_sources.to_proto()
)

spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
feature_transformation = FeatureTransformationProto(
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore
on_demand_substrait_transformation=self.transformation.to_proto()
if type(self.transformation) == OnDemandSubstraitTransformation
else None,
else None, # type: ignore
)
spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
feature_transformation=feature_transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -254,18 +269,37 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
)

if (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "user_defined_function"
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
!= ""
):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
on_demand_feature_view_proto.spec.feature_transformation.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "on_demand_substrait_transformation"
):
transformation = OnDemandSubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.on_demand_substrait_transformation
on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation
)
elif (
hasattr(on_demand_feature_view_proto.spec, "user_defined_function")
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
== ""
):
backwards_compatible_udf = UserDefinedFunctionProto(
name=on_demand_feature_view_proto.spec.user_defined_function.name,
body=on_demand_feature_view_proto.spec.user_defined_function.body,
body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text,
)
transformation = OnDemandPandasTransformation.from_proto(
user_defined_function_proto=backwards_compatible_udf,
)
else:
raise Exception("At least one transformation type needs to be provided")
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import dill
import pandas as pd

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)


Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/on_demand_substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import pyarrow
import pyarrow.substrait as substrait # type: ignore # noqa

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto,
from feast.protos.feast.core.Transformation_pb2 import (
OnDemandSubstraitTransformationV2 as OnDemandSubstraitTransformationProto,
)


Expand Down
Loading

0 comments on commit c58ef74

Please sign in to comment.