From 635019de58d9c8df0054b1bcac4a29d089357926 Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Thu, 30 Nov 2023 23:03:58 -0500 Subject: [PATCH] fix: Verify the existence of Registry tables in snowflake before calling CREATE sql command. Allow read-only user to call feast apply. Signed-off-by: Shuchu Han --- sdk/python/feast/infra/registry/snowflake.py | 71 ++++++++++-- .../registry/snowflake_registry_table.py | 104 ++++++++++++++++++ .../infra/utils/snowflake/snowflake_utils.py | 10 +- 3 files changed, 171 insertions(+), 14 deletions(-) create mode 100644 sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 56c7bc1f65..40ec27e7d9 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -124,15 +124,19 @@ def __init__( f'"{self.registry_config.database}"."{self.registry_config.schema_}"' ) - with GetSnowflakeConnection(self.registry_config) as conn: - sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" - with open(sql_function_file, "r") as file: - sqlFile = file.read() - - sqlCommands = sqlFile.split(";") - for command in sqlCommands: - query = command.replace("REGISTRY_PATH", f"{self.registry_path}") - execute_snowflake_statement(conn, query) + if not self._verify_registry_database(): + # Verify the existing resitry database schema from snowflake. If any table names and column types is wrong, run table recreation SQL. + with GetSnowflakeConnection(self.registry_config) as conn: + sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" + with open(sql_function_file, "r") as file: + sqlFile = file.read() + + sqlCommands = sqlFile.split(";") + for command in sqlCommands: + query = command.replace( + "REGISTRY_PATH", f"{self.registry_path}" + ) + execute_snowflake_statement(conn, query) self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) @@ -145,6 +149,55 @@ def __init__( ) self.project = project + def _verify_registry_database( + self, + ) -> bool: + """Verify the records in registry database. To check: + 1, the 11 tables are existed. + 2, the column types are correct. + + Example return from snowflake's cursor.describe("SELECT * FROM a_table") command: + [ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), + ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), + ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False), + ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)] + + Returns: + True if the necessary 11 tables are existed in Snowflake and schema of each table is correct. + False if failure happens. + """ + + from feast.infra.utils.snowflake.registry.snowflake_registry_table import ( + snowflake_registry_table_names_and_column_types as expect_tables, + ) + + res = True + + try: + with GetSnowflakeConnection(self.registry_config) as conn: + for table_name in expect_tables: + result_metadata_list = conn.cursor().describe( + f"SELECT * FROM {table_name}" + ) + for col in result_metadata_list: + if ( + expect_tables[table_name][col.name]["type_code"] + != col.type_code + ): + res = False + break + except Exception as e: + res = False # Set to False for all errors. + logger.debug( + f"Failed to verify Registry tables and columns types with exception: {e}." + ) + finally: + # The implementation in snowflake_utils.py will cache the established connection without re-connection logic. + # conn.close() + pass + + return res + def refresh(self, project: Optional[str] = None): if project: project_metadata = proto_registry_utils.get_project_metadata( diff --git a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py new file mode 100644 index 0000000000..d24fbc27ec --- /dev/null +++ b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_registry_table.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +""" +The table names and column types are following the creation detail listed +in "snowflake_table_creation.sql". + +Snowflake Reference: +1, ResultMetadata: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-resultmetadata-object +2, Type Codes: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-type-codes +---------------------------------------------- +type_code String Representation Data Type +0 FIXED NUMBER/INT +1 REAL REAL +2 TEXT VARCHAR/STRING +3 DATE DATE +4 TIMESTAMP TIMESTAMP +5 VARIANT VARIANT +6 TIMESTAMP_LTZ TIMESTAMP_LTZ +7 TIMESTAMP_TZ TIMESTAMP_TZ +8 TIMESTAMP_NTZ TIMESTAMP_TZ +9 OBJECT OBJECT +10 ARRAY ARRAY +11 BINARY BINARY +12 TIME TIME +13 BOOLEAN BOOLEAN +---------------------------------------------- + +(last update: 2023-11-30) + +""" + +snowflake_registry_table_names_and_column_types = { + "DATA_SOURCES": { + "DATA_SOURCE_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "DATA_SOURCE_PROTO": {"type_code": 11, "type": "BINARY"}, + }, + "ENTITIES": { + "ENTITY_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "ENTITY_PROTO": {"type_code": 11, "type": "BINARY"}, + }, + "FEAST_METADATA": { + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "METADATA_KEY": {"type_code": 2, "type": "VARCHAR"}, + "METADATA_VALUE": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + }, + "FEATURE_SERVICES": { + "FEATURE_SERVICE_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "FEATURE_SERVICE_PROTO": {"type_code": 11, "type": "BINARY"}, + }, + "FEATURE_VIEWS": { + "FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, + "MATERIALIZED_INTERVALS": {"type_code": 11, "type": "BINARY"}, + "USER_METADATA": {"type_code": 11, "type": "BINARY"}, + }, + "MANAGED_INFRA": { + "INFRA_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "INFRA_PROTO": {"type_code": 11, "type": "BINARY"}, + }, + "ON_DEMAND_FEATURE_VIEWS": { + "ON_DEMAND_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "ON_DEMAND_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, + "USER_METADATA": {"type_code": 11, "type": "BINARY"}, + }, + "REQUEST_FEATURE_VIEWS": { + "REQUEST_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "REQUEST_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, + "USER_METADATA": {"type_code": 11, "type": "BINARY"}, + }, + "SAVED_DATASETS": { + "SAVED_DATASET_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "SAVED_DATASET_PROTO": {"type_code": 11, "type": "BINARY"}, + }, + "STREAM_FEATURE_VIEWS": { + "STREAM_FEATURE_VIEW_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "STREAM_FEATURE_VIEW_PROTO": {"type_code": 11, "type": "BINARY"}, + "USER_METADATA": {"type_code": 11, "type": "BINARY"}, + }, + "VALIDATION_REFERENCES": { + "VALIDATION_REFERENCE_NAME": {"type_code": 2, "type": "VARCHAR"}, + "PROJECT_ID": {"type_code": 2, "type": "VARCHAR"}, + "LAST_UPDATED_TIMESTAMP": {"type_code": 6, "type": "TIMESTAMP_LTZ"}, + "VALIDATION_REFERENCE_PROTO": {"type_code": 11, "type": "BINARY"}, + }, +} diff --git a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py index a4cda89a6f..3a56619bdb 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py @@ -49,19 +49,19 @@ def __init__(self, config: str, autocommit=True): def __enter__(self): - assert self.config.type in [ + assert self.config.type in { "snowflake.registry", "snowflake.offline", "snowflake.engine", "snowflake.online", - ] + } if self.config.type not in _cache: if self.config.type == "snowflake.registry": config_header = "connections.feast_registry" elif self.config.type == "snowflake.offline": config_header = "connections.feast_offline_store" - if self.config.type == "snowflake.engine": + elif self.config.type == "snowflake.engine": config_header = "connections.feast_batch_engine" elif self.config.type == "snowflake.online": config_header = "connections.feast_online_store" @@ -113,11 +113,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): def assert_snowflake_feature_names(feature_view: FeatureView) -> None: for feature in feature_view.features: - assert feature.name not in [ + assert feature.name not in { "entity_key", "feature_name", "feature_value", - ], f"Feature Name: {feature.name} is a protected name to ensure query stability" + }, f"Feature Name: {feature.name} is a protected name to ensure query stability" return None