9
9
from feast import Entity , FeatureView , RepoConfig
10
10
from feast .infra .key_encoding_utils import serialize_entity_key
11
11
from feast .infra .online_stores .online_store import OnlineStore
12
- from feast .protos .feast .types .EntityKey_pb2 import EntityKey
13
- from feast .protos .feast .types .Value_pb2 import Value
12
+ from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
13
+ from feast .protos .feast .types .Value_pb2 import Value as ValueProto
14
14
from feast .repo_config import FeastConfigBaseModel
15
15
16
16
@@ -94,9 +94,9 @@ def online_read(
94
94
self ,
95
95
config : RepoConfig ,
96
96
table : FeatureView ,
97
- entity_keys : List [EntityKey ],
97
+ entity_keys : List [EntityKeyProto ],
98
98
requested_features : Optional [List [str ]] = None ,
99
- ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , Value ]]]]:
99
+ ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
100
100
if self ._index is None :
101
101
return [(None , None )] * len (entity_keys )
102
102
@@ -111,7 +111,7 @@ def online_read(
111
111
else :
112
112
feature_vector = self ._index .reconstruct (int (idx ))
113
113
feature_dict = {
114
- name : Value (double_val = value )
114
+ name : ValueProto (double_val = value )
115
115
for name , value in zip (
116
116
self ._in_memory_store .feature_names , feature_vector
117
117
)
@@ -123,7 +123,9 @@ def online_write_batch(
123
123
self ,
124
124
config : RepoConfig ,
125
125
table : FeatureView ,
126
- data : List [Tuple [EntityKey , Dict [str , Value ], datetime , Optional [datetime ]]],
126
+ data : List [
127
+ Tuple [EntityKeyProto , Dict [str , ValueProto ], datetime , Optional [datetime ]]
128
+ ],
127
129
progress : Optional [Callable [[int ], Any ]],
128
130
) -> None :
129
131
if self ._index is None :
@@ -181,9 +183,10 @@ def retrieve_online_documents(
181
183
) -> List [
182
184
Tuple [
183
185
Optional [datetime ],
184
- Optional [Value ],
185
- Optional [Value ],
186
- Optional [Value ],
186
+ Optional [EntityKeyProto ],
187
+ Optional [ValueProto ],
188
+ Optional [ValueProto ],
189
+ Optional [ValueProto ],
187
190
]
188
191
]:
189
192
if self ._index is None :
@@ -196,9 +199,10 @@ def retrieve_online_documents(
196
199
results : List [
197
200
Tuple [
198
201
Optional [datetime ],
199
- Optional [Value ],
200
- Optional [Value ],
201
- Optional [Value ],
202
+ Optional [EntityKeyProto ],
203
+ Optional [ValueProto ],
204
+ Optional [ValueProto ],
205
+ Optional [ValueProto ],
202
206
]
203
207
] = []
204
208
for i , idx in enumerate (indices [0 ]):
@@ -209,14 +213,15 @@ def retrieve_online_documents(
209
213
210
214
timestamp = Timestamp ()
211
215
timestamp .GetCurrentTime ()
212
-
213
- feature_value = Value (string_val = "," .join (map (str , feature_vector )))
214
- vector_value = Value (string_val = "," .join (map (str , feature_vector )))
215
- distance_value = Value (float_val = distances [0 ][i ])
216
+ entity_value = EntityKeyProto ()
217
+ feature_value = ValueProto (string_val = "," .join (map (str , feature_vector )))
218
+ vector_value = ValueProto (string_val = "," .join (map (str , feature_vector )))
219
+ distance_value = ValueProto (float_val = distances [0 ][i ])
216
220
217
221
results .append (
218
222
(
219
223
timestamp .ToDatetime (),
224
+ entity_value ,
220
225
feature_value ,
221
226
vector_value ,
222
227
distance_value ,
@@ -229,8 +234,8 @@ async def online_read_async(
229
234
self ,
230
235
config : RepoConfig ,
231
236
table : FeatureView ,
232
- entity_keys : List [EntityKey ],
237
+ entity_keys : List [EntityKeyProto ],
233
238
requested_features : Optional [List [str ]] = None ,
234
- ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , Value ]]]]:
239
+ ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
235
240
# Implement async read if needed
236
241
raise NotImplementedError ("Async read is not implemented for FaissOnlineStore" )
0 commit comments