Skip to content

Commit 311efc5

Browse files
authored
feat: Adding get_online_features_async to feature store sdk (#4172)
* feat: Adding get_online_features_async to feature store sdk Signed-off-by: Breno Costa <brenocosta0901@gmail.com> * add more unit tests Signed-off-by: Breno Costa <brenocosta0901@gmail.com> * fix redis key generation Signed-off-by: Breno Costa <brenocosta0901@gmail.com> * fix unit tests Signed-off-by: Breno Costa <brenocosta0901@gmail.com> --------- Signed-off-by: Breno Costa <brenocosta0901@gmail.com>
1 parent 369ca98 commit 311efc5

File tree

9 files changed

+522
-56
lines changed

9 files changed

+522
-56
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<img src="docs/assets/feast_logo.png" width="550">
66
</a>
77
</p>
8-
<br />
8+
<br />
99

1010
[![unit-tests](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml)
1111
[![integration-tests-and-build](https://github.com/feast-dev/feast/actions/workflows/master_only.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/master_only.yml)

sdk/python/feast/feature_store.py

+206-27
Original file line numberDiff line numberDiff line change
@@ -1550,6 +1550,54 @@ def get_online_features(
15501550
native_entity_values=True,
15511551
)
15521552

1553+
@log_exceptions_and_usage
1554+
async def get_online_features_async(
1555+
self,
1556+
features: Union[List[str], FeatureService],
1557+
entity_rows: List[Dict[str, Any]],
1558+
full_feature_names: bool = False,
1559+
) -> OnlineResponse:
1560+
"""
1561+
[Alpha] Retrieves the latest online feature data asynchronously.
1562+
1563+
Note: This method will download the full feature registry the first time it is run. If you are using a
1564+
remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
1565+
duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
1566+
passed), then a new registry will be downloaded synchronously by this method. This download may
1567+
introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
1568+
refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
1569+
infinity (cache forever).
1570+
1571+
Args:
1572+
features: The list of features that should be retrieved from the online store. These features can be
1573+
specified either as a list of string feature references or as a feature service. String feature
1574+
references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
1575+
entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.
1576+
full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
1577+
changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
1578+
changes to "customer_fv__daily_transactions").
1579+
1580+
Returns:
1581+
OnlineResponse containing the feature data in records.
1582+
1583+
Raises:
1584+
Exception: No entity with the specified name exists.
1585+
"""
1586+
columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
1587+
for entity_row in entity_rows:
1588+
for key, value in entity_row.items():
1589+
try:
1590+
columnar[key].append(value)
1591+
except KeyError as e:
1592+
raise ValueError("All entity_rows must have the same keys.") from e
1593+
1594+
return await self._get_online_features_async(
1595+
features=features,
1596+
entity_values=columnar,
1597+
full_feature_names=full_feature_names,
1598+
native_entity_values=True,
1599+
)
1600+
15531601
def _get_online_request_context(
15541602
self, features: Union[List[str], FeatureService], full_feature_names: bool
15551603
):
@@ -1609,7 +1657,7 @@ def _get_online_request_context(
16091657
entityless_case,
16101658
)
16111659

1612-
def _get_online_features(
1660+
def _prepare_entities_to_read_from_online_store(
16131661
self,
16141662
features: Union[List[str], FeatureService],
16151663
entity_values: Mapping[
@@ -1619,7 +1667,7 @@ def _get_online_features(
16191667
native_entity_values: bool = True,
16201668
):
16211669
(
1622-
_feature_refs,
1670+
feature_refs,
16231671
requested_on_demand_feature_views,
16241672
entity_name_to_join_key_map,
16251673
entity_type_map,
@@ -1694,6 +1742,40 @@ def _get_online_features(
16941742
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
16951743
)
16961744

1745+
return (
1746+
join_key_values,
1747+
grouped_refs,
1748+
entity_name_to_join_key_map,
1749+
requested_on_demand_feature_views,
1750+
feature_refs,
1751+
requested_result_row_names,
1752+
online_features_response,
1753+
)
1754+
1755+
def _get_online_features(
1756+
self,
1757+
features: Union[List[str], FeatureService],
1758+
entity_values: Mapping[
1759+
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
1760+
],
1761+
full_feature_names: bool = False,
1762+
native_entity_values: bool = True,
1763+
):
1764+
(
1765+
join_key_values,
1766+
grouped_refs,
1767+
entity_name_to_join_key_map,
1768+
requested_on_demand_feature_views,
1769+
feature_refs,
1770+
requested_result_row_names,
1771+
online_features_response,
1772+
) = self._prepare_entities_to_read_from_online_store(
1773+
features=features,
1774+
entity_values=entity_values,
1775+
full_feature_names=full_feature_names,
1776+
native_entity_values=native_entity_values,
1777+
)
1778+
16971779
provider = self._get_provider()
16981780
for table, requested_features in grouped_refs:
16991781
# Get the correct set of entity values with the correct join keys.
@@ -1724,7 +1806,71 @@ def _get_online_features(
17241806
if requested_on_demand_feature_views:
17251807
self._augment_response_with_on_demand_transforms(
17261808
online_features_response,
1727-
_feature_refs,
1809+
feature_refs,
1810+
requested_on_demand_feature_views,
1811+
full_feature_names,
1812+
)
1813+
1814+
self._drop_unneeded_columns(
1815+
online_features_response, requested_result_row_names
1816+
)
1817+
return OnlineResponse(online_features_response)
1818+
1819+
async def _get_online_features_async(
1820+
self,
1821+
features: Union[List[str], FeatureService],
1822+
entity_values: Mapping[
1823+
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
1824+
],
1825+
full_feature_names: bool = False,
1826+
native_entity_values: bool = True,
1827+
):
1828+
(
1829+
join_key_values,
1830+
grouped_refs,
1831+
entity_name_to_join_key_map,
1832+
requested_on_demand_feature_views,
1833+
feature_refs,
1834+
requested_result_row_names,
1835+
online_features_response,
1836+
) = self._prepare_entities_to_read_from_online_store(
1837+
features=features,
1838+
entity_values=entity_values,
1839+
full_feature_names=full_feature_names,
1840+
native_entity_values=native_entity_values,
1841+
)
1842+
1843+
provider = self._get_provider()
1844+
for table, requested_features in grouped_refs:
1845+
# Get the correct set of entity values with the correct join keys.
1846+
table_entity_values, idxs = self._get_unique_entities(
1847+
table,
1848+
join_key_values,
1849+
entity_name_to_join_key_map,
1850+
)
1851+
1852+
# Fetch feature data for the minimum set of Entities.
1853+
feature_data = await self._read_from_online_store_async(
1854+
table_entity_values,
1855+
provider,
1856+
requested_features,
1857+
table,
1858+
)
1859+
1860+
# Populate the result_rows with the Features from the OnlineStore inplace.
1861+
self._populate_response_from_feature_data(
1862+
feature_data,
1863+
idxs,
1864+
online_features_response,
1865+
full_feature_names,
1866+
requested_features,
1867+
table,
1868+
)
1869+
1870+
if requested_on_demand_feature_views:
1871+
self._augment_response_with_on_demand_transforms(
1872+
online_features_response,
1873+
feature_refs,
17281874
requested_on_demand_feature_views,
17291875
full_feature_names,
17301876
)
@@ -1965,38 +2111,24 @@ def _get_unique_entities(
19652111
)
19662112
return unique_entities, indexes
19672113

1968-
def _read_from_online_store(
2114+
def _get_entity_key_protos(
19692115
self,
19702116
entity_rows: Iterable[Mapping[str, Value]],
1971-
provider: Provider,
1972-
requested_features: List[str],
1973-
table: FeatureView,
1974-
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
1975-
"""Read and process data from the OnlineStore for a given FeatureView.
1976-
1977-
This method guarantees that the order of the data in each element of the
1978-
List returned is the same as the order of `requested_features`.
1979-
1980-
This method assumes that `provider.online_read` returns data for each
1981-
combination of Entities in `entity_rows` in the same order as they
1982-
are provided.
1983-
"""
2117+
) -> List[EntityKeyProto]:
19842118
# Instantiate one EntityKeyProto per Entity.
19852119
entity_key_protos = [
19862120
EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
19872121
for row in entity_rows
19882122
]
2123+
return entity_key_protos
19892124

1990-
# Fetch data for Entities.
1991-
read_rows = provider.online_read(
1992-
config=self.config,
1993-
table=table,
1994-
entity_keys=entity_key_protos,
1995-
requested_features=requested_features,
1996-
)
1997-
1998-
# Each row is a set of features for a given entity key. We only need to convert
1999-
# the data to Protobuf once.
2125+
def _convert_rows_to_protobuf(
2126+
self,
2127+
requested_features: List[str],
2128+
read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]],
2129+
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
2130+
# Each row is a set of features for a given entity key.
2131+
# We only need to convert the data to Protobuf once.
20002132
null_value = Value()
20012133
read_row_protos = []
20022134
for read_row in read_rows:
@@ -2023,6 +2155,53 @@ def _read_from_online_store(
20232155
read_row_protos.append((event_timestamps, statuses, values))
20242156
return read_row_protos
20252157

2158+
def _read_from_online_store(
2159+
self,
2160+
entity_rows: Iterable[Mapping[str, Value]],
2161+
provider: Provider,
2162+
requested_features: List[str],
2163+
table: FeatureView,
2164+
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
2165+
"""Read and process data from the OnlineStore for a given FeatureView.
2166+
2167+
This method guarantees that the order of the data in each element of the
2168+
List returned is the same as the order of `requested_features`.
2169+
2170+
This method assumes that `provider.online_read` returns data for each
2171+
combination of Entities in `entity_rows` in the same order as they
2172+
are provided.
2173+
"""
2174+
entity_key_protos = self._get_entity_key_protos(entity_rows)
2175+
2176+
# Fetch data for Entities.
2177+
read_rows = provider.online_read(
2178+
config=self.config,
2179+
table=table,
2180+
entity_keys=entity_key_protos,
2181+
requested_features=requested_features,
2182+
)
2183+
2184+
return self._convert_rows_to_protobuf(requested_features, read_rows)
2185+
2186+
async def _read_from_online_store_async(
2187+
self,
2188+
entity_rows: Iterable[Mapping[str, Value]],
2189+
provider: Provider,
2190+
requested_features: List[str],
2191+
table: FeatureView,
2192+
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
2193+
entity_key_protos = self._get_entity_key_protos(entity_rows)
2194+
2195+
# Fetch data for Entities.
2196+
read_rows = await provider.online_read_async(
2197+
config=self.config,
2198+
table=table,
2199+
entity_keys=entity_key_protos,
2200+
requested_features=requested_features,
2201+
)
2202+
2203+
return self._convert_rows_to_protobuf(requested_features, read_rows)
2204+
20262205
def _retrieve_from_online_store(
20272206
self,
20282207
provider: Provider,

sdk/python/feast/infra/online_stores/online_store.py

+25
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,31 @@ def online_read(
8080
"""
8181
pass
8282

83+
async def online_read_async(
84+
self,
85+
config: RepoConfig,
86+
table: FeatureView,
87+
entity_keys: List[EntityKeyProto],
88+
requested_features: Optional[List[str]] = None,
89+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
90+
"""
91+
Reads features values for the given entity keys asynchronously.
92+
93+
Args:
94+
config: The config for the current feature store.
95+
table: The feature view whose feature values should be read.
96+
entity_keys: The list of entity keys for which feature values should be read.
97+
requested_features: The list of features that should be read.
98+
99+
Returns:
100+
A list of the same length as entity_keys. Each item in the list is a tuple where the first
101+
item is the event timestamp for the row, and the second item is a dict mapping feature names
102+
to values, which are returned in proto format.
103+
"""
104+
raise NotImplementedError(
105+
f"Online store {self.__class__.__name__} does not support online read async"
106+
)
107+
83108
@abstractmethod
84109
def update(
85110
self,

0 commit comments

Comments
 (0)