Skip to content

Commit 52ba1ff

Browse files
author
Bhargav Dodla
committed
fix: Fix for SQL registry initialization fails #4543
Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com>
1 parent 0fb76e9 commit 52ba1ff

File tree

3 files changed

+116
-17
lines changed

3 files changed

+116
-17
lines changed

sdk/python/feast/infra/registry/caching_registry.py

+23-15
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from feast.permissions.permission import Permission
2020
from feast.project import Project
2121
from feast.project_metadata import ProjectMetadata
22+
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
2223
from feast.saved_dataset import SavedDataset, ValidationReference
2324
from feast.stream_feature_view import StreamFeatureView
2425
from feast.utils import _utc_now
@@ -28,13 +29,14 @@
2829

2930
class CachingRegistry(BaseRegistry):
3031
def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str):
31-
self.cached_registry_proto = self.proto()
32-
self.cached_registry_proto_created = _utc_now()
32+
self.cache_mode = cache_mode
33+
self.cached_registry_proto = RegistryProto()
3334
self._refresh_lock = Lock()
3435
self.cached_registry_proto_ttl = timedelta(
3536
seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0
3637
)
37-
self.cache_mode = cache_mode
38+
self.cached_registry_proto = self.proto()
39+
self.cached_registry_proto_created = _utc_now()
3840
if cache_mode == "thread":
3941
self._start_thread_async_refresh(cache_ttl_seconds)
4042
atexit.register(self._exit_handler)
@@ -429,20 +431,26 @@ def refresh(self, project: Optional[str] = None):
429431
def _refresh_cached_registry_if_necessary(self):
430432
if self.cache_mode == "sync":
431433
with self._refresh_lock:
432-
expired = (
433-
self.cached_registry_proto is None
434-
or self.cached_registry_proto_created is None
435-
) or (
436-
self.cached_registry_proto_ttl.total_seconds()
437-
> 0 # 0 ttl means infinity
438-
and (
439-
_utc_now()
440-
> (
441-
self.cached_registry_proto_created
442-
+ self.cached_registry_proto_ttl
434+
if self.cached_registry_proto == RegistryProto():
435+
# Avoids the need to refresh the registry when cache is not populated yet
436+
# Specially during the __init__ phase
437+
# proto() will populate the cache with project metadata if no objects are registered
438+
expired = False
439+
else:
440+
expired = (
441+
self.cached_registry_proto is None
442+
or self.cached_registry_proto_created is None
443+
) or (
444+
self.cached_registry_proto_ttl.total_seconds()
445+
> 0 # 0 ttl means infinity
446+
and (
447+
_utc_now()
448+
> (
449+
self.cached_registry_proto_created
450+
+ self.cached_registry_proto_ttl
451+
)
443452
)
444453
)
445-
)
446454
if expired:
447455
logger.info("Registry cache expired, so refreshing")
448456
self.refresh()

sdk/python/feast/infra/registry/sql.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ def __init__(
227227
registry_config, SqlRegistryConfig
228228
), "SqlRegistry needs a valid registry_config"
229229

230+
self.registry_config = registry_config
231+
230232
self.write_engine: Engine = create_engine(
231233
registry_config.path, **registry_config.sqlalchemy_config_kwargs
232234
)
@@ -257,7 +259,7 @@ def __init__(
257259
def _sync_feast_metadata_to_projects_table(self):
258260
feast_metadata_projects: set = []
259261
projects_set: set = []
260-
with self.write_engine.begin() as conn:
262+
with self.read_engine.begin() as conn:
261263
stmt = select(feast_metadata).where(
262264
feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value
263265
)
@@ -266,7 +268,7 @@ def _sync_feast_metadata_to_projects_table(self):
266268
feast_metadata_projects.append(row._mapping["project_id"])
267269

268270
if len(feast_metadata_projects) > 0:
269-
with self.write_engine.begin() as conn:
271+
with self.read_engine.begin() as conn:
270272
stmt = select(projects)
271273
rows = conn.execute(stmt).all()
272274
for row in rows:

sdk/python/tests/integration/registration/test_universal_registry.py

+89
Original file line numberDiff line numberDiff line change
@@ -1767,3 +1767,92 @@ def test_apply_entity_success_with_purge_feast_metadata(test_registry):
17671767
assert len(entities) == 0
17681768

17691769
test_registry.teardown()
1770+
1771+
1772+
combined_sql_fixtures = [
1773+
pytest.param(
1774+
lazy_fixture("pg_registry"), marks=pytest.mark.xdist_group(name="pg_registry")
1775+
),
1776+
pytest.param(
1777+
lazy_fixture("mysql_registry"),
1778+
marks=pytest.mark.xdist_group(name="mysql_registry"),
1779+
),
1780+
lazy_fixture("sqlite_registry"),
1781+
pytest.param(
1782+
lazy_fixture("pg_registry_async"),
1783+
marks=pytest.mark.xdist_group(name="pg_registry"),
1784+
),
1785+
pytest.param(
1786+
lazy_fixture("mysql_registry_async"),
1787+
marks=pytest.mark.xdist_group(name="mysql_registry"),
1788+
),
1789+
pytest.param(
1790+
lazy_fixture("pg_registry_purge_feast_metadata"),
1791+
marks=pytest.mark.xdist_group(name="pg_registry"),
1792+
),
1793+
pytest.param(
1794+
lazy_fixture("mysql_registry_purge_feast_metadata"),
1795+
marks=pytest.mark.xdist_group(name="mysql_registry"),
1796+
),
1797+
]
1798+
1799+
1800+
@pytest.mark.integration
1801+
@pytest.mark.parametrize(
1802+
"test_registry",
1803+
combined_sql_fixtures,
1804+
)
1805+
def test_apply_entity_to_sql_registry_and_reinitialize_sql_registry(test_registry):
1806+
entity = Entity(
1807+
name="driver_car_id",
1808+
description="Car driver id",
1809+
tags={"team": "matchmaking"},
1810+
)
1811+
1812+
project = "project"
1813+
1814+
# Register Entity
1815+
test_registry.apply_entity(entity, project)
1816+
assert_project(project, test_registry)
1817+
1818+
entities = test_registry.list_entities(project, tags=entity.tags)
1819+
assert_project(project, test_registry)
1820+
1821+
entity = entities[0]
1822+
assert (
1823+
len(entities) == 1
1824+
and entity.name == "driver_car_id"
1825+
and entity.description == "Car driver id"
1826+
and "team" in entity.tags
1827+
and entity.tags["team"] == "matchmaking"
1828+
)
1829+
1830+
entity = test_registry.get_entity("driver_car_id", project)
1831+
assert (
1832+
entity.name == "driver_car_id"
1833+
and entity.description == "Car driver id"
1834+
and "team" in entity.tags
1835+
and entity.tags["team"] == "matchmaking"
1836+
)
1837+
1838+
# After the first apply, the created_timestamp should be the same as the last_update_timestamp.
1839+
assert entity.created_timestamp == entity.last_updated_timestamp
1840+
updated_test_registry = SqlRegistry(test_registry.registry_config, "project", None)
1841+
1842+
# Update entity
1843+
updated_entity = Entity(
1844+
name="driver_car_id",
1845+
description="Car driver Id",
1846+
tags={"team": "matchmaking"},
1847+
)
1848+
updated_test_registry.apply_entity(updated_entity, project)
1849+
1850+
updated_entity = updated_test_registry.get_entity("driver_car_id", project)
1851+
updated_test_registry.delete_entity("driver_car_id", project)
1852+
assert_project(project, updated_test_registry)
1853+
entities = updated_test_registry.list_entities(project)
1854+
assert_project(project, updated_test_registry)
1855+
assert len(entities) == 0
1856+
1857+
updated_test_registry.teardown()
1858+
test_registry.teardown()

0 commit comments

Comments
 (0)