Skip to content

Commit 6256369

Browse files
shuchutokoko
authored and committed
fix: Verify the existence of Registry tables in snowflake before calling CREATE sql command. Allow read-only user to call feast apply. (feast-dev#3851)
Signed-off-by: Shuchu Han <shuchu.han@gmail.com> Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
1 parent 1220ab6 commit 6256369

File tree

3 files changed

+171
-14
lines changed

3 files changed

+171
-14
lines changed

sdk/python/feast/infra/registry/snowflake.py

+62-9
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,19 @@ def __init__(
124124
f'"{self.registry_config.database}"."{self.registry_config.schema_}"'
125125
)
126126

127-
with GetSnowflakeConnection(self.registry_config) as conn:
128-
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
129-
with open(sql_function_file, "r") as file:
130-
sqlFile = file.read()
131-
132-
sqlCommands = sqlFile.split(";")
133-
for command in sqlCommands:
134-
query = command.replace("REGISTRY_PATH", f"{self.registry_path}")
135-
execute_snowflake_statement(conn, query)
127+
if not self._verify_registry_database():
128+
# Verify the existing registry database schema in Snowflake. If any table name or column type is wrong, run the table-creation SQL.
129+
with GetSnowflakeConnection(self.registry_config) as conn:
130+
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
131+
with open(sql_function_file, "r") as file:
132+
sqlFile = file.read()
133+
134+
sqlCommands = sqlFile.split(";")
135+
for command in sqlCommands:
136+
query = command.replace(
137+
"REGISTRY_PATH", f"{self.registry_path}"
138+
)
139+
execute_snowflake_statement(conn, query)
136140

137141
self.cached_registry_proto = self.proto()
138142
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
@@ -145,6 +149,55 @@ def __init__(
145149
)
146150
self.project = project
147151

152+
def _verify_registry_database(
    self,
) -> bool:
    """Check that the registry tables exist in Snowflake with the expected column types.

    For every table listed in ``snowflake_registry_table_names_and_column_types``
    the metadata returned by ``cursor.describe("SELECT * FROM <table>")`` is
    compared, column by column, against the expected ``type_code``.

    Example return from snowflake's cursor.describe("SELECT * FROM a_table") command:
    [ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
     ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
     ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False),
     ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)]

    Returns:
        True if every expected table could be described and each column it
        returned has the expected type code.
        False as soon as one column type differs, or if any exception is
        raised while connecting, describing a table, or looking up a column
        that is not part of the expected schema.
    """

    from feast.infra.utils.snowflake.registry.snowflake_registry_table import (
        snowflake_registry_table_names_and_column_types as expect_tables,
    )

    try:
        # The implementation in snowflake_utils.py caches the established
        # connection without re-connection logic, so it is deliberately not
        # closed here.
        with GetSnowflakeConnection(self.registry_config) as conn:
            for table_name, expected_columns in expect_tables.items():
                # NOTE(review): the table name is not qualified with
                # self.registry_path; presumably it is resolved through the
                # connection's default database/schema -- confirm.
                result_metadata_list = conn.cursor().describe(
                    f"SELECT * FROM {table_name}"
                )
                for col in result_metadata_list:
                    # An unexpected column raises KeyError, which the broad
                    # handler below turns into a failed verification.
                    if expected_columns[col.name]["type_code"] != col.type_code:
                        # The answer is already known; stop here instead of
                        # describing the remaining tables.
                        return False
    except Exception as e:
        # Any error means the schema cannot be trusted: report failure so
        # the caller runs the table-creation SQL.
        logger.debug(
            f"Failed to verify Registry tables and columns types with exception: {e}."
        )
        return False

    return True
200+
148201
def refresh(self, project: Optional[str] = None):
149202
if project:
150203
project_metadata = proto_registry_utils.get_project_metadata(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
The table names and column types are following the creation detail listed
5+
in "snowflake_table_creation.sql".
6+
7+
Snowflake Reference:
8+
1, ResultMetadata: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-resultmetadata-object
9+
2, Type Codes: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-type-codes
10+
----------------------------------------------
11+
type_code String Representation Data Type
12+
0 FIXED NUMBER/INT
13+
1 REAL REAL
14+
2 TEXT VARCHAR/STRING
15+
3 DATE DATE
16+
4 TIMESTAMP TIMESTAMP
17+
5 VARIANT VARIANT
18+
6 TIMESTAMP_LTZ TIMESTAMP_LTZ
19+
7 TIMESTAMP_TZ TIMESTAMP_TZ
20+
8 TIMESTAMP_NTZ TIMESTAMP_NTZ
21+
9 OBJECT OBJECT
22+
10 ARRAY ARRAY
23+
11 BINARY BINARY
24+
12 TIME TIME
25+
13 BOOLEAN BOOLEAN
26+
----------------------------------------------
27+
28+
(last update: 2023-11-30)
29+
30+
"""
31+
32+
# Snowflake connector type code for each column type used by the registry tables.
_REGISTRY_TYPE_CODES = {"VARCHAR": 2, "TIMESTAMP_LTZ": 6, "BINARY": 11}

# (table name, ((column name, column type), ...)) for every registry table.
_REGISTRY_TABLE_LAYOUT = (
    (
        "DATA_SOURCES",
        (
            ("DATA_SOURCE_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("DATA_SOURCE_PROTO", "BINARY"),
        ),
    ),
    (
        "ENTITIES",
        (
            ("ENTITY_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("ENTITY_PROTO", "BINARY"),
        ),
    ),
    (
        "FEAST_METADATA",
        (
            ("PROJECT_ID", "VARCHAR"),
            ("METADATA_KEY", "VARCHAR"),
            ("METADATA_VALUE", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
        ),
    ),
    (
        "FEATURE_SERVICES",
        (
            ("FEATURE_SERVICE_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("FEATURE_SERVICE_PROTO", "BINARY"),
        ),
    ),
    (
        "FEATURE_VIEWS",
        (
            ("FEATURE_VIEW_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("FEATURE_VIEW_PROTO", "BINARY"),
            ("MATERIALIZED_INTERVALS", "BINARY"),
            ("USER_METADATA", "BINARY"),
        ),
    ),
    (
        "MANAGED_INFRA",
        (
            ("INFRA_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("INFRA_PROTO", "BINARY"),
        ),
    ),
    (
        "ON_DEMAND_FEATURE_VIEWS",
        (
            ("ON_DEMAND_FEATURE_VIEW_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("ON_DEMAND_FEATURE_VIEW_PROTO", "BINARY"),
            ("USER_METADATA", "BINARY"),
        ),
    ),
    (
        "REQUEST_FEATURE_VIEWS",
        (
            ("REQUEST_FEATURE_VIEW_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("REQUEST_FEATURE_VIEW_PROTO", "BINARY"),
            ("USER_METADATA", "BINARY"),
        ),
    ),
    (
        "SAVED_DATASETS",
        (
            ("SAVED_DATASET_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("SAVED_DATASET_PROTO", "BINARY"),
        ),
    ),
    (
        "STREAM_FEATURE_VIEWS",
        (
            ("STREAM_FEATURE_VIEW_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("STREAM_FEATURE_VIEW_PROTO", "BINARY"),
            ("USER_METADATA", "BINARY"),
        ),
    ),
    (
        "VALIDATION_REFERENCES",
        (
            ("VALIDATION_REFERENCE_NAME", "VARCHAR"),
            ("PROJECT_ID", "VARCHAR"),
            ("LAST_UPDATED_TIMESTAMP", "TIMESTAMP_LTZ"),
            ("VALIDATION_REFERENCE_PROTO", "BINARY"),
        ),
    ),
)

# Expected registry schema: table name -> column name -> {"type_code", "type"}.
# Every column gets its own freshly built dict so entries never alias each other.
snowflake_registry_table_names_and_column_types = {
    table: {
        column: {"type_code": _REGISTRY_TYPE_CODES[kind], "type": kind}
        for column, kind in columns
    }
    for table, columns in _REGISTRY_TABLE_LAYOUT
}

sdk/python/feast/infra/utils/snowflake/snowflake_utils.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,19 @@ def __init__(self, config: str, autocommit=True):
4949

5050
def __enter__(self):
5151

52-
assert self.config.type in [
52+
assert self.config.type in {
5353
"snowflake.registry",
5454
"snowflake.offline",
5555
"snowflake.engine",
5656
"snowflake.online",
57-
]
57+
}
5858

5959
if self.config.type not in _cache:
6060
if self.config.type == "snowflake.registry":
6161
config_header = "connections.feast_registry"
6262
elif self.config.type == "snowflake.offline":
6363
config_header = "connections.feast_offline_store"
64-
if self.config.type == "snowflake.engine":
64+
elif self.config.type == "snowflake.engine":
6565
config_header = "connections.feast_batch_engine"
6666
elif self.config.type == "snowflake.online":
6767
config_header = "connections.feast_online_store"
@@ -113,11 +113,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
113113

114114
def assert_snowflake_feature_names(feature_view: FeatureView) -> None:
    """Reject feature views that use a feature name reserved for Snowflake queries.

    Args:
        feature_view: Feature view whose ``features`` are checked by ``name``.

    Raises:
        AssertionError: If any feature is named ``entity_key``,
            ``feature_name`` or ``feature_value``.
    """
    protected_names = {"entity_key", "feature_name", "feature_value"}
    for feature in feature_view.features:
        # Raised explicitly instead of via an `assert` statement so the check
        # still runs under `python -O`; the exception type and message are
        # unchanged for existing callers.
        if feature.name in protected_names:
            raise AssertionError(
                f"Feature Name: {feature.name} is a protected name to ensure query stability"
            )
    return None
122122

123123

0 commit comments

Comments
 (0)