Skip to content

Commit 31b3a4c

Browse files
tokokotqtensor
authored andcommitted
feat: Add Substrait-based ODFV transformation (feast-dev#3969)
1 parent 7a411ce commit 31b3a4c

11 files changed

+504
-164
lines changed

protos/feast/core/OnDemandFeatureView.proto

+5
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ message OnDemandFeatureViewSpec {
5050

5151
oneof transformation {
5252
UserDefinedFunction user_defined_function = 5;
53+
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9;
5354
}
5455

5556
// Description of the on demand feature view.
@@ -89,3 +90,7 @@ message UserDefinedFunction {
8990
// The string representation of the udf
9091
string body_text = 3;
9192
}
93+
94+
message OnDemandSubstraitTransformation {
95+
bytes substrait_plan = 1;
96+
}

sdk/python/feast/on_demand_feature_view.py

+52-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import copy
22
import functools
3+
import inspect
34
import warnings
45
from datetime import datetime
56
from types import FunctionType
@@ -17,6 +18,7 @@
1718
from feast.feature_view_projection import FeatureViewProjection
1819
from feast.field import Field, from_value_type
1920
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
21+
from feast.on_demand_substrait_transformation import OnDemandSubstraitTransformation
2022
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
2123
OnDemandFeatureView as OnDemandFeatureViewProto,
2224
)
@@ -210,6 +212,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
210212
user_defined_function=self.transformation.to_proto()
211213
if type(self.transformation) == OnDemandPandasTransformation
212214
else None,
215+
on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore
216+
if type(self.transformation) == OnDemandSubstraitTransformation
217+
else None,
213218
description=self.description,
214219
tags=self.tags,
215220
owner=self.owner,
@@ -255,6 +260,13 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
255260
transformation = OnDemandPandasTransformation.from_proto(
256261
on_demand_feature_view_proto.spec.user_defined_function
257262
)
263+
elif (
264+
on_demand_feature_view_proto.spec.WhichOneof("transformation")
265+
== "on_demand_substrait_transformation"
266+
):
267+
transformation = OnDemandSubstraitTransformation.from_proto(
268+
on_demand_feature_view_proto.spec.on_demand_substrait_transformation
269+
)
258270
else:
259271
raise Exception("At least one transformation type needs to be provided")
260272

@@ -460,10 +472,47 @@ def mainify(obj) -> None:
460472
obj.__module__ = "__main__"
461473

462474
def decorator(user_function):
463-
udf_string = dill.source.getsource(user_function)
464-
mainify(user_function)
475+
return_annotation = inspect.signature(user_function).return_annotation
476+
if (
477+
return_annotation
478+
and return_annotation.__module__ == "ibis.expr.types.relations"
479+
and return_annotation.__name__ == "Table"
480+
):
481+
import ibis
482+
import ibis.expr.datatypes as dt
483+
from ibis_substrait.compiler.core import SubstraitCompiler
484+
485+
compiler = SubstraitCompiler()
486+
487+
input_fields: Field = []
488+
489+
for s in sources:
490+
if type(s) == FeatureView:
491+
fields = s.projection.features
492+
else:
493+
fields = s.features
494+
495+
input_fields.extend(
496+
[
497+
(
498+
f.name,
499+
dt.dtype(
500+
feast_value_type_to_pandas_type(f.dtype.to_value_type())
501+
),
502+
)
503+
for f in fields
504+
]
505+
)
506+
507+
expr = user_function(ibis.table(input_fields, "t"))
465508

466-
transformation = OnDemandPandasTransformation(user_function, udf_string)
509+
transformation = OnDemandSubstraitTransformation(
510+
substrait_plan=compiler.compile(expr).SerializeToString()
511+
)
512+
else:
513+
udf_string = dill.source.getsource(user_function)
514+
mainify(user_function)
515+
transformation = OnDemandPandasTransformation(user_function, udf_string)
467516

468517
on_demand_feature_view_obj = OnDemandFeatureView(
469518
name=user_function.__name__,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import pandas as pd
2+
import pyarrow
3+
import pyarrow.substrait as substrait # type: ignore # noqa
4+
5+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
6+
OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto,
7+
)
8+
9+
10+
class OnDemandSubstraitTransformation:
11+
def __init__(self, substrait_plan: bytes):
12+
"""
13+
Creates an OnDemandSubstraitTransformation object.
14+
15+
Args:
16+
substrait_plan: The user-provided substrait plan.
17+
"""
18+
self.substrait_plan = substrait_plan
19+
20+
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
21+
def table_provider(names, schema: pyarrow.Schema):
22+
return pyarrow.Table.from_pandas(df[schema.names])
23+
24+
table: pyarrow.Table = pyarrow.substrait.run_query(
25+
self.substrait_plan, table_provider=table_provider
26+
).read_all()
27+
return table.to_pandas()
28+
29+
def __eq__(self, other):
30+
if not isinstance(other, OnDemandSubstraitTransformation):
31+
raise TypeError(
32+
"Comparisons should only involve OnDemandSubstraitTransformation class objects."
33+
)
34+
35+
if not super().__eq__(other):
36+
return False
37+
38+
return self.substrait_plan == other.substrait_plan
39+
40+
def to_proto(self) -> OnDemandSubstraitTransformationProto:
41+
return OnDemandSubstraitTransformationProto(substrait_plan=self.substrait_plan)
42+
43+
@classmethod
44+
def from_proto(
45+
cls,
46+
on_demand_substrait_transformation_proto: OnDemandSubstraitTransformationProto,
47+
):
48+
return OnDemandSubstraitTransformation(
49+
substrait_plan=on_demand_substrait_transformation_proto.substrait_plan
50+
)

0 commit comments

Comments
 (0)