Skip to content

Commit a2ef375

Browse files
feat: Allow local feature server to use Go feature server if enabled (#2538)
* Allow local feature server to use Go feature server if enabled Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Initialize _go_server correctly Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Convert proto values to native values to support Go feature server Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent bd4cb74 commit a2ef375

File tree

3 files changed

+51
-27
lines changed

3 files changed

+51
-27
lines changed

sdk/python/feast/feature_server.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ def get_online_features(body=Depends(get_body)):
5252
raise HTTPException(status_code=500, detail="Uneven number of columns")
5353

5454
response_proto = store._get_online_features(
55-
features,
56-
request_proto.entities,
55+
features=features,
56+
entity_values=request_proto.entities,
5757
full_feature_names=full_feature_names,
5858
native_entity_values=False,
5959
).proto

sdk/python/feast/feature_store.py

+44-25
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@
8282
from feast.repo_contents import RepoContents
8383
from feast.request_feature_view import RequestFeatureView
8484
from feast.saved_dataset import SavedDataset, SavedDatasetStorage
85-
from feast.type_map import python_values_to_proto_values
85+
from feast.type_map import (
86+
feast_value_type_to_python_type,
87+
python_values_to_proto_values,
88+
)
8689
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
8790
from feast.value_type import ValueType
8891
from feast.version import get_version
@@ -135,6 +138,7 @@ def __init__(
135138
self._registry = Registry(registry_config, repo_path=self.repo_path)
136139
self._registry._initialize_registry()
137140
self._provider = get_provider(self.config, self.repo_path)
141+
self._go_server = None
138142

139143
@log_exceptions
140144
def version(self) -> str:
@@ -1284,7 +1288,29 @@ def get_online_features(
12841288
except KeyError as e:
12851289
raise ValueError("All entity_rows must have the same keys.") from e
12861290

1287-
# If Go feature server is enabled, send request to it instead of going through a regular Python logic
1291+
return self._get_online_features(
1292+
features=features,
1293+
entity_values=columnar,
1294+
full_feature_names=full_feature_names,
1295+
native_entity_values=True,
1296+
)
1297+
1298+
def _get_online_features(
1299+
self,
1300+
features: Union[List[str], FeatureService],
1301+
entity_values: Mapping[
1302+
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
1303+
],
1304+
full_feature_names: bool = False,
1305+
native_entity_values: bool = True,
1306+
):
1307+
# Extract Sequence from RepeatedValue Protobuf.
1308+
entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = {
1309+
k: list(v) if isinstance(v, Sequence) else list(v.val)
1310+
for k, v in entity_values.items()
1311+
}
1312+
1313+
# If Go feature server is enabled, send request to it instead of going through regular Python logic
12881314
if self.config.go_feature_server:
12891315
from feast.embedded_go.online_features_service import (
12901316
EmbeddedOnlineFeatureServer,
@@ -1296,32 +1322,31 @@ def get_online_features(
12961322
str(self.repo_path.absolute()), self.config, self
12971323
)
12981324

1325+
entity_native_values: Dict[str, List[Any]]
1326+
if not native_entity_values:
1327+
# Convert proto types to native types since Go feature server currently
1328+
# only handles native types.
1329+
# TODO(felixwang9817): Remove this logic once native types are supported.
1330+
entity_native_values = {
1331+
k: [
1332+
feast_value_type_to_python_type(proto_value)
1333+
for proto_value in v
1334+
]
1335+
for k, v in entity_value_lists.items()
1336+
}
1337+
else:
1338+
entity_native_values = entity_value_lists
1339+
12991340
return self._go_server.get_online_features(
13001341
features_refs=features if isinstance(features, list) else [],
13011342
feature_service=features
13021343
if isinstance(features, FeatureService)
13031344
else None,
1304-
entities=columnar,
1345+
entities=entity_native_values,
13051346
request_data={}, # TODO: add request data parameter to public API
13061347
full_feature_names=full_feature_names,
13071348
)
13081349

1309-
return self._get_online_features(
1310-
features=features,
1311-
entity_values=columnar,
1312-
full_feature_names=full_feature_names,
1313-
native_entity_values=True,
1314-
)
1315-
1316-
def _get_online_features(
1317-
self,
1318-
features: Union[List[str], FeatureService],
1319-
entity_values: Mapping[
1320-
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
1321-
],
1322-
full_feature_names: bool = False,
1323-
native_entity_values: bool = True,
1324-
):
13251350
_feature_refs = self._get_features(features, allow_cache=True)
13261351
(
13271352
requested_feature_views,
@@ -1344,12 +1369,6 @@ def _get_online_features(
13441369
join_keys_set,
13451370
) = self._get_entity_maps(requested_feature_views)
13461371

1347-
# Extract Sequence from RepeatedValue Protobuf.
1348-
entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = {
1349-
k: list(v) if isinstance(v, Sequence) else list(v.val)
1350-
for k, v in entity_values.items()
1351-
}
1352-
13531372
entity_proto_values: Dict[str, List[Value]]
13541373
if native_entity_values:
13551374
# Convert values to Protobuf once.

sdk/python/tests/integration/feature_repos/repo_configuration.py

+5
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@
118118
IntegrationTestRepoConfig(
119119
online_store=REDIS_CONFIG, go_feature_server=True,
120120
),
121+
IntegrationTestRepoConfig(
122+
online_store=REDIS_CONFIG,
123+
python_feature_server=True,
124+
go_feature_server=True,
125+
),
121126
]
122127
)
123128
full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME)

0 commit comments

Comments
 (0)