diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index d8eda98da422b9..11558aa74e9817 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -277,10 +277,12 @@ class SnowflakePrivilege: capabilities: List[SourceCapability] = [c.capability for c in SnowflakeV2Source.get_capabilities() if c.capability not in (SourceCapability.PLATFORM_INSTANCE, SourceCapability.DOMAINS, SourceCapability.DELETION_DETECTION)] # type: ignore cur = conn.query("select current_role()") - current_role = [row[0] for row in cur][0] + current_role = [row["CURRENT_ROLE()"] for row in cur][0] cur = conn.query("select current_secondary_roles()") - secondary_roles_str = json.loads([row[0] for row in cur][0])["roles"] + secondary_roles_str = json.loads( + [row["CURRENT_SECONDARY_ROLES()"] for row in cur][0] + )["roles"] secondary_roles = ( [] if secondary_roles_str == "" else secondary_roles_str.split(",") ) @@ -299,7 +301,9 @@ class SnowflakePrivilege: cur = conn.query(f'show grants to role "{role}"') for row in cur: privilege = SnowflakePrivilege( - privilege=row[1], object_type=row[2], object_name=row[3] + privilege=row["privilege"], + object_type=row["granted_on"], + object_name=row["name"], ) privileges.append(privilege) @@ -362,7 +366,7 @@ class SnowflakePrivilege: roles.append(privilege.object_name) cur = conn.query("select current_warehouse()") - current_warehouse = [row[0] for row in cur][0] + current_warehouse = [row["CURRENT_WAREHOUSE()"] for row in cur][0] default_failure_messages = { SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them", diff --git a/metadata-ingestion/tests/unit/test_snowflake_source.py b/metadata-ingestion/tests/unit/test_snowflake_source.py index 3353e74449c957..72b59a3a4e4938 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_source.py @@ -274,21 +274,31 @@ def test_test_connection_basic_success(mock_connect): test_connection_helpers.assert_basic_connectivity_success(report) -def setup_mock_connect(mock_connect, query_results=None): - def default_query_results(query): +class MissingQueryMock(Exception): + pass + + +def setup_mock_connect(mock_connect, extra_query_results=None): + def query_results(query): + if extra_query_results is not None: + try: + return extra_query_results(query) + except MissingQueryMock: + pass + if query == "select current_role()": - return [("TEST_ROLE",)] + return [{"CURRENT_ROLE()": "TEST_ROLE"}] elif query == "select current_secondary_roles()": - return [('{"roles":"","value":""}',)] + return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}] elif query == "select current_warehouse()": - return [("TEST_WAREHOUSE")] - raise ValueError(f"Unexpected query: {query}") + return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}] + elif query == 'show grants to role "PUBLIC"': + return [] + raise MissingQueryMock(f"Unexpected query: {query}") connection_mock = MagicMock() cursor_mock = MagicMock() - cursor_mock.execute.side_effect = ( - query_results if query_results is not None else default_query_results - ) + cursor_mock.execute.side_effect = query_results connection_mock.cursor.return_value = cursor_mock mock_connect.return_value = connection_mock @@ -296,21 +306,11 @@ def default_query_results(query): @patch("snowflake.connector.connect") def test_test_connection_no_warehouse(mock_connect): def query_results(query): - if query == "select current_role()": - return [("TEST_ROLE",)] - elif query == "select current_secondary_roles()": - return [('{"roles":"","value":""}',)] - elif query == "select current_warehouse()": - return [(None,)] + if query == "select current_warehouse()": + return [{"CURRENT_WAREHOUSE()": None}] elif query == 'show grants to role "TEST_ROLE"': - return [ - ("", "USAGE", "DATABASE", "DB1"), - ("", "USAGE", "SCHEMA", "DB1.SCHEMA1"), - ("", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"), - ] - elif query == 'show grants to role "PUBLIC"': - return [] - raise ValueError(f"Unexpected query: {query}") + return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}] + raise MissingQueryMock(f"Unexpected query: {query}") setup_mock_connect(mock_connect, query_results) report = test_connection_helpers.run_test_connection( @@ -330,17 +330,9 @@ def query_results(query): @patch("snowflake.connector.connect") def test_test_connection_capability_schema_failure(mock_connect): def query_results(query): - if query == "select current_role()": - return [("TEST_ROLE",)] - elif query == "select current_secondary_roles()": - return [('{"roles":"","value":""}',)] - elif query == "select current_warehouse()": - return [("TEST_WAREHOUSE",)] - elif query == 'show grants to role "TEST_ROLE"': - return [("", "USAGE", "DATABASE", "DB1")] - elif query == 'show grants to role "PUBLIC"': - return [] - raise ValueError(f"Unexpected query: {query}") + if query == 'show grants to role "TEST_ROLE"': + return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}] + raise MissingQueryMock(f"Unexpected query: {query}") setup_mock_connect(mock_connect, query_results) @@ -361,21 +353,17 @@ def query_results(query): @patch("snowflake.connector.connect") def test_test_connection_capability_schema_success(mock_connect): def query_results(query): - if query == "select current_role()": - return [("TEST_ROLE",)] - elif query == "select current_secondary_roles()": - return [('{"roles":"","value":""}',)] - elif query == "select current_warehouse()": - return [("TEST_WAREHOUSE")] - elif query == 'show grants to role "TEST_ROLE"': + if query == 'show grants to role "TEST_ROLE"': return [ - ["", "USAGE", "DATABASE", "DB1"], - ["", "USAGE", "SCHEMA", "DB1.SCHEMA1"], - ["", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"], + {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}, + {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"}, + { + "privilege": "REFERENCES", + "granted_on": "TABLE", + "name": "DB1.SCHEMA1.TABLE1", + }, ] - elif query == 'show grants to role "PUBLIC"': - return [] - raise ValueError(f"Unexpected query: {query}") + raise MissingQueryMock(f"Unexpected query: {query}") setup_mock_connect(mock_connect, query_results) @@ -397,30 +385,38 @@ def query_results(query): @patch("snowflake.connector.connect") def test_test_connection_capability_all_success(mock_connect): def query_results(query): - if query == "select current_role()": - return [("TEST_ROLE",)] - elif query == "select current_secondary_roles()": - return [('{"roles":"","value":""}',)] - elif query == "select current_warehouse()": - return [("TEST_WAREHOUSE")] - elif query == 'show grants to role "TEST_ROLE"': + if query == 'show grants to role "TEST_ROLE"': return [ - ("", "USAGE", "DATABASE", "DB1"), - ("", "USAGE", "SCHEMA", "DB1.SCHEMA1"), - ("", "SELECT", "TABLE", "DB1.SCHEMA1.TABLE1"), - ("", "USAGE", "ROLE", "TEST_USAGE_ROLE"), + {"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}, + {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"}, + { + "privilege": "SELECT", + "granted_on": "TABLE", + "name": "DB1.SCHEMA1.TABLE1", + }, + {"privilege": "USAGE", "granted_on": "ROLE", "name": "TEST_USAGE_ROLE"}, ] - elif query == 'show grants to role "PUBLIC"': - return [] elif query == 'show grants to role "TEST_USAGE_ROLE"': return [ - ["", "USAGE", "DATABASE", "SNOWFLAKE"], - ["", "USAGE", "SCHEMA", "ACCOUNT_USAGE"], - ["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY"], - ["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY"], - ["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES"], + {"privilege": "USAGE", "granted_on": "DATABASE", "name": "SNOWFLAKE"}, + {"privilege": "USAGE", "granted_on": "SCHEMA", "name": "ACCOUNT_USAGE"}, + { + "privilege": "USAGE", + "granted_on": "VIEW", + "name": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY", + }, + { + "privilege": "USAGE", + "granted_on": "VIEW", + "name": "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY", + }, + { + "privilege": "USAGE", + "granted_on": "VIEW", + "name": "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES", + }, ] - raise ValueError(f"Unexpected query: {query}") + raise MissingQueryMock(f"Unexpected query: {query}") setup_mock_connect(mock_connect, query_results)