13
13
# limitations under the License.
14
14
import itertools
15
15
import os
16
+ import json
16
17
import sqlite3
18
+ import sqlite_vss
17
19
from datetime import datetime
18
20
from pathlib import Path
19
21
from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple
20
22
21
- import sqlite_vss
22
23
from pydantic import StrictStr
23
24
24
25
from feast import Entity
@@ -110,9 +111,9 @@ def online_write_batch(
110
111
created_ts = to_naive_utc (created_ts )
111
112
112
113
for feature_name , val in values .items ():
113
- vector_val = None
114
114
if config .online_store .vss_enabled :
115
- vector_val = self ._get_list_val_str (val )
115
+ print ('using vector search' )
116
+ vector_val = json .dumps (val )
116
117
conn .execute (
117
118
f"""
118
119
UPDATE { _table_id (project , table )}
@@ -121,7 +122,7 @@ def online_write_batch(
121
122
""" ,
122
123
(
123
124
# SET
124
- val . SerializeToString ( ),
125
+ str ( val ),
125
126
vector_val ,
126
127
timestamp ,
127
128
created_ts ,
@@ -138,13 +139,14 @@ def online_write_batch(
138
139
(
139
140
entity_key_bin ,
140
141
feature_name ,
141
- val . SerializeToString ( ),
142
+ str ( val ),
142
143
vector_val ,
143
144
timestamp ,
144
145
created_ts ,
145
146
),
146
147
)
147
148
else :
149
+ print ('not using vector search' )
148
150
conn .execute (
149
151
f"""
150
152
UPDATE { _table_id (project , table )}
@@ -243,8 +245,8 @@ def update(
243
245
project = config .project
244
246
245
247
for table in tables_to_keep :
246
- conn .execute (
247
- f"CREATE TABLE IF NOT EXISTS { _table_id (project , table )} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
248
+ self . conn .execute (
249
+ f"CREATE TABLE IF NOT EXISTS { _table_id (project , table )} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
248
250
)
249
251
conn .execute (
250
252
f"CREATE INDEX IF NOT EXISTS { _table_id (project , table )} _ek ON { _table_id (project , table )} (entity_key);"
@@ -282,20 +284,6 @@ def teardown(
282
284
except FileNotFoundError :
283
285
pass
284
286
285
- def _get_list_val_str (self , val : ValueProto ) -> str :
286
- if val .HasField ("string_list_val" ):
287
- return "," .join (val .string_list_val .val )
288
- elif val .HasField ("bytes_list_val" ):
289
- return "," .join (map (str , val .bytes_list_val .val ))
290
- elif val .HasField ("int64_list_val" ):
291
- return "," .join (map (str , val .int64_list_val .val ))
292
- elif val .HasField ("float_list_val" ):
293
- return "," .join (map (str , val .float_list_val .val ))
294
- elif val .HasField ("double_list_val" ):
295
- return "," .join (map (str , val .double_list_val .val ))
296
- else :
297
- raise ValueError ("Unsupported list value type" )
298
-
299
287
def retrieve_online_documents (
300
288
self ,
301
289
config : RepoConfig ,
@@ -434,7 +422,7 @@ def from_proto(sqlite_table_proto: SqliteTableProto) -> Any:
434
422
435
423
def update (self ):
436
424
self .conn .execute (
437
- f"CREATE TABLE IF NOT EXISTS { self .name } (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
425
+ f"CREATE TABLE IF NOT EXISTS { self .name } (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
438
426
)
439
427
self .conn .execute (
440
428
f"CREATE INDEX IF NOT EXISTS { self .name } _ek ON { self .name } (entity_key);"
0 commit comments