@@ -41,7 +41,6 @@ class MySQLOnlineStore(OnlineStore):
41
41
_conn : Optional [Connection ] = None
42
42
43
43
def _get_conn (self , config : RepoConfig ) -> Connection :
44
-
45
44
online_store_config = config .online_store
46
45
assert isinstance (online_store_config , MySQLOnlineStoreConfig )
47
46
@@ -65,7 +64,6 @@ def online_write_batch(
65
64
],
66
65
progress : Optional [Callable [[int ], Any ]],
67
66
) -> None :
68
-
69
67
conn = self ._get_conn (config )
70
68
cur = conn .cursor ()
71
69
@@ -178,18 +176,26 @@ def update(
178
176
179
177
# We don't create any special state for the entities in this implementation.
180
178
for table in tables_to_keep :
179
+ table_name = _table_id (project , table )
180
+ index_name = f"{ table_name } _ek"
181
181
cur .execute (
182
- f"""CREATE TABLE IF NOT EXISTS { _table_id ( project , table ) } (entity_key VARCHAR(512),
182
+ f"""CREATE TABLE IF NOT EXISTS { table_name } (entity_key VARCHAR(512),
183
183
feature_name VARCHAR(256),
184
184
value BLOB,
185
185
event_ts timestamp NULL DEFAULT NULL,
186
186
created_ts timestamp NULL DEFAULT NULL,
187
187
PRIMARY KEY(entity_key, feature_name))"""
188
188
)
189
-
190
- cur .execute (
191
- f"ALTER TABLE { _table_id (project , table )} ADD INDEX { _table_id (project , table )} _ek (entity_key);"
189
+ index_exists = cur .execute (
190
+ f"""
191
+ SELECT 1 FROM information_schema.statistics
192
+ WHERE table_schema = DATABASE() AND table_name = '{ table_name } ' AND index_name = '{ index_name } '
193
+ """
192
194
)
195
+ if not index_exists :
196
+ cur .execute (
197
+ f"ALTER TABLE { table_name } ADD INDEX { index_name } (entity_key);"
198
+ )
193
199
194
200
for table in tables_to_delete :
195
201
_drop_table_and_index (cur , project , table )
0 commit comments