Skip to content

Commit 73bc853

Browse files
feat: Adding support for Native Python feature transformations for ODFVs (#4045)
1 parent ef62def commit 73bc853

10 files changed

+488
-38
lines changed

sdk/python/feast/embedded_go/online_features_service.py

+5
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,11 @@ def transformation_callback(
252252
# the typeguard requirement.
253253
full_feature_names = bool(full_feature_names)
254254

255+
if odfv.mode != "pandas":
256+
raise Exception(
257+
f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer."
258+
)
259+
255260
output = odfv.get_transformed_features_df(
256261
input_record.to_pandas(), full_feature_names=full_feature_names
257262
)

sdk/python/feast/feature_store.py

+41-14
Original file line numberDiff line numberDiff line change
@@ -1995,26 +1995,53 @@ def _augment_response_with_on_demand_transforms(
19951995
)
19961996

19971997
initial_response = OnlineResponse(online_features_response)
1998-
initial_response_df = initial_response.to_df()
1998+
initial_response_df: Optional[pd.DataFrame] = None
1999+
initial_response_dict: Optional[Dict[str, List[Any]]] = None
19992000

20002001
# Apply on demand transformations and augment the result rows
20012002
odfv_result_names = set()
20022003
for odfv_name, _feature_refs in odfv_feature_refs.items():
20032004
odfv = requested_odfv_map[odfv_name]
2004-
transformed_features_df = odfv.get_transformed_features_df(
2005-
initial_response_df,
2006-
full_feature_names,
2007-
)
2008-
selected_subset = [
2009-
f for f in transformed_features_df.columns if f in _feature_refs
2010-
]
2011-
2012-
proto_values = [
2013-
python_values_to_proto_values(
2014-
transformed_features_df[feature].values, ValueType.UNKNOWN
2005+
if odfv.mode == "python":
2006+
if initial_response_dict is None:
2007+
initial_response_dict = initial_response.to_dict()
2008+
transformed_features_dict: Dict[
2009+
str, List[Any]
2010+
] = odfv.get_transformed_features(
2011+
initial_response_dict,
2012+
full_feature_names,
20152013
)
2016-
for feature in selected_subset
2017-
]
2014+
elif odfv.mode in {"pandas", "substrait"}:
2015+
if initial_response_df is None:
2016+
initial_response_df = initial_response.to_df()
2017+
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
2018+
initial_response_df,
2019+
full_feature_names,
2020+
)
2021+
else:
2022+
raise Exception(
2023+
f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'."
2024+
)
2025+
2026+
transformed_features = (
2027+
transformed_features_dict
2028+
if odfv.mode == "python"
2029+
else transformed_features_df
2030+
)
2031+
transformed_columns = (
2032+
transformed_features.columns
2033+
if isinstance(transformed_features, pd.DataFrame)
2034+
else transformed_features
2035+
)
2036+
selected_subset = [f for f in transformed_columns if f in _feature_refs]
2037+
2038+
proto_values = []
2039+
for selected_feature in selected_subset:
2040+
if odfv.mode in ["python", "pandas"]:
2041+
feature_vector = transformed_features[selected_feature]
2042+
proto_values.append(
2043+
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
2044+
)
20182045

20192046
odfv_result_names |= set(selected_subset)
20202047

sdk/python/feast/infra/offline_stores/offline_store.py

+8
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ def to_df(
8181
if self.on_demand_feature_views:
8282
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
8383
for odfv in self.on_demand_feature_views:
84+
if odfv.mode not in {"pandas", "substrait"}:
85+
raise Exception(
86+
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
87+
)
8488
features_df = features_df.join(
8589
odfv.get_transformed_features_df(
8690
features_df,
@@ -124,6 +128,10 @@ def to_arrow(
124128
features_df = self._to_df_internal(timeout=timeout)
125129
if self.on_demand_feature_views:
126130
for odfv in self.on_demand_feature_views:
131+
if odfv.mode != "pandas":
132+
raise Exception(
133+
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
134+
)
127135
features_df = features_df.join(
128136
odfv.get_transformed_features_df(
129137
features_df,

0 commit comments

Comments
 (0)