From 630049949d089249742b219635c069c52cf06db8 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 12 Jun 2024 12:04:22 -0500 Subject: [PATCH] feat(ingest): add snowflake-summary source (#10642) --- metadata-ingestion/scripts/docgen.py | 89 ++++++--- metadata-ingestion/setup.py | 1 + .../source/snowflake/snowflake_summary.py | 179 ++++++++++++++++++ .../source/snowflake/snowflake_utils.py | 2 +- .../source_config/usage/snowflake_usage.py | 2 +- 5 files changed, 247 insertions(+), 26 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_summary.py diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py index d240f8e16c7700..797a2f698c2f40 100644 --- a/metadata-ingestion/scripts/docgen.py +++ b/metadata-ingestion/scripts/docgen.py @@ -583,6 +583,12 @@ def generate( if source and source != plugin_name: continue + if plugin_name in { + "snowflake-summary", + }: + logger.info(f"Skipping {plugin_name} as it is on the deny list") + continue + metrics["plugins"]["discovered"] = metrics["plugins"]["discovered"] + 1 # type: ignore # We want to attempt to load all plugins before printing a summary. source_type = None @@ -885,11 +891,14 @@ def generate( os.makedirs(source_dir, exist_ok=True) doc_file = f"{source_dir}/lineage-feature-guide.md" with open(doc_file, "w+") as f: - f.write("import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n") + f.write( + "import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n" + ) f.write(f"# About DataHub Lineage\n\n") f.write("\n") - f.write(""" + f.write( + """ Data lineage is a **map that shows how data flows through your organization.** It details where your data originates, how it travels, and where it ultimately ends up. This can happen within a single system (like data moving between Snowflake tables) or across various platforms. @@ -979,24 +988,27 @@ def generate( ### Automatic Lineage Extraction Support -This is a summary of automatic lineage extraciton support in our data source. Please refer to the **Important Capabilities** table in the source documentation. Note that even if the source does not support automatic extraction, you can still add lineage manually using our API & SDKs.\n""") +This is a summary of automatic lineage extraciton support in our data source. Please refer to the **Important Capabilities** table in the source documentation. Note that even if the source does not support automatic extraction, you can still add lineage manually using our API & SDKs.\n""" + ) - f.write("\n| Source | Table-Level Lineage | Column-Level Lineage | Related Configs |\n") + f.write( + "\n| Source | Table-Level Lineage | Column-Level Lineage | Related Configs |\n" + ) f.write("| ---------- | ------ | ----- |----- |\n") for platform_id, platform_docs in sorted( - source_documentation.items(), - key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) - if "name" in x[1] - else (x[0].casefold(), x[0]), + source_documentation.items(), + key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) + if "name" in x[1] + else (x[0].casefold(), x[0]), ): for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], + platform_docs["plugins"].items(), + key=lambda x: str(x[1].get("doc_order")) + if x[1].get("doc_order") + else x[0], ): - platform_name = platform_docs['name'] + platform_name = platform_docs["name"] if len(platform_docs["plugins"].keys()) > 1: # We only need to show this if there are multiple modules. platform_name = f"{platform_name} `{plugin}`" @@ -1004,33 +1016,60 @@ def generate( # Initialize variables table_level_supported = "❌" column_level_supported = "❌" - config_names = '' + config_names = "" if "capabilities" in plugin_docs: plugin_capabilities = plugin_docs["capabilities"] for cap_setting in plugin_capabilities: capability_text = get_capability_text(cap_setting.capability) - capability_supported = get_capability_supported_badge(cap_setting.supported) + capability_supported = get_capability_supported_badge( + cap_setting.supported + ) - if capability_text == "Table-Level Lineage" and capability_supported == "✅": + if ( + capability_text == "Table-Level Lineage" + and capability_supported == "✅" + ): table_level_supported = "✅" - if capability_text == "Column-level Lineage" and capability_supported == "✅": + if ( + capability_text == "Column-level Lineage" + and capability_supported == "✅" + ): column_level_supported = "✅" if not (table_level_supported == "❌" and column_level_supported == "❌"): if "config_schema" in plugin_docs: - config_properties = json.loads(plugin_docs['config_schema']).get('properties', {}) - config_names = '
'.join( - [f'- {property_name}' for property_name in config_properties if 'lineage' in property_name]) - lineage_not_applicable_sources = ['azure-ad', 'csv', 'demo-data', 'dynamodb', 'iceberg', 'json-schema', 'ldap', 'openapi', 'pulsar', 'sqlalchemy' ] - if platform_id not in lineage_not_applicable_sources : + config_properties = json.loads( + plugin_docs["config_schema"] + ).get("properties", {}) + config_names = "
".join( + [ + f"- {property_name}" + for property_name in config_properties + if "lineage" in property_name + ] + ) + lineage_not_applicable_sources = [ + "azure-ad", + "csv", + "demo-data", + "dynamodb", + "iceberg", + "json-schema", + "ldap", + "openapi", + "pulsar", + "sqlalchemy", + ] + if platform_id not in lineage_not_applicable_sources: f.write( f"| [{platform_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n" ) - f.write(""" + f.write( + """ ### SQL Parser Lineage Extraction @@ -1054,10 +1093,12 @@ def generate( - [Data in Context: Lineage Explorer in DataHub](https://blog.datahubproject.io/data-in-context-lineage-explorer-in-datahub-a53a9a476dc4) - [Harnessing the Power of Data Lineage with DataHub](https://blog.datahubproject.io/harnessing-the-power-of-data-lineage-with-datahub-ad086358dec4) - [Data Lineage: What It Is And Why It Matters](https://blog.datahubproject.io/data-lineage-what-it-is-and-why-it-matters-1a8d9846f0bd) - """) + """ + ) print("Lineage Documentation Generation Complete") + if __name__ == "__main__": logger.setLevel("INFO") generate() diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index ade1e1a6ee5ba4..bb2e5d468143bb 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -653,6 +653,7 @@ "redshift = datahub.ingestion.source.redshift.redshift:RedshiftSource", "slack = datahub.ingestion.source.slack.slack:SlackSource", "snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source", + "snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource", "superset = datahub.ingestion.source.superset:SupersetSource", "tableau = datahub.ingestion.source.tableau:TableauSource", "openapi = datahub.ingestion.source.openapi:OpenApiSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_summary.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_summary.py new file mode 100644 index 00000000000000..ef08866ccd3ede --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_summary.py @@ -0,0 +1,179 @@ +import dataclasses +import logging +from collections import defaultdict +from typing import Dict, Iterable, List, Optional + +import pydantic +from snowflake.connector import SnowflakeConnection + +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import LowerCaseDatasetUrnConfigMixin +from datahub.configuration.time_window_config import BaseTimeWindowConfig +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import SupportStatus, config_class, support_status +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.snowflake.snowflake_schema import ( + SnowflakeDatabase, + SnowflakeDataDictionary, +) +from datahub.ingestion.source.snowflake.snowflake_utils import ( + SnowflakeCommonMixin, + SnowflakeConnectionMixin, + SnowflakeQueryMixin, +) +from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source +from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig +from datahub.ingestion.source_report.time_window import BaseTimeWindowReport +from datahub.utilities.lossy_collections import LossyList + + +class SnowflakeSummaryConfig( + BaseSnowflakeConfig, BaseTimeWindowConfig, LowerCaseDatasetUrnConfigMixin +): + + # Copied from SnowflakeConfig. + database_pattern: AllowDenyPattern = AllowDenyPattern( + deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] + ) + schema_pattern: AllowDenyPattern = pydantic.Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", + ) + table_pattern: AllowDenyPattern = pydantic.Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + ) + view_pattern: AllowDenyPattern = pydantic.Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + ) + match_fully_qualified_names: bool = pydantic.Field( + default=True, + description="Whether `schema_pattern` is matched against fully qualified schema name `.`.", + ) + + +@dataclasses.dataclass +class SnowflakeSummaryReport(SourceReport, BaseTimeWindowReport): + filtered: LossyList[str] = dataclasses.field(default_factory=LossyList) + + num_get_tables_for_schema_queries: int = 0 + num_get_views_for_schema_queries: int = 0 + + schema_counters: Dict[str, int] = dataclasses.field(default_factory=dict) + object_counters: Dict[str, Dict[str, int]] = dataclasses.field( + default_factory=lambda: defaultdict(lambda: defaultdict(int)) + ) + + num_snowflake_queries: Optional[int] = None + num_snowflake_mutations: Optional[int] = None + + def report_dropped(self, ent_name: str) -> None: + self.filtered.append(ent_name) + + def report_entity_scanned(self, name: str, ent_type: str = "table") -> None: + pass + + +@config_class(SnowflakeSummaryConfig) +@support_status(SupportStatus.INCUBATING) +class SnowflakeSummarySource( + SnowflakeQueryMixin, + SnowflakeConnectionMixin, + SnowflakeCommonMixin, + Source, +): + def __init__(self, ctx: PipelineContext, config: SnowflakeSummaryConfig): + super().__init__(ctx) + self.config: SnowflakeSummaryConfig = config + self.report: SnowflakeSummaryReport = SnowflakeSummaryReport() + + self.data_dictionary = SnowflakeDataDictionary() + self.connection: Optional[SnowflakeConnection] = None + self.logger = logging.getLogger(__name__) + + def create_connection(self) -> Optional[SnowflakeConnection]: + # TODO: Eventually we'll want to use the implementation from SnowflakeConnectionMixin, + # since it has better error reporting. + # return super().create_connection() + return self.config.get_connection() + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + self.connection = self.create_connection() + if self.connection is None: + return + + self.data_dictionary.set_connection(self.connection) + + # Databases. + databases: List[SnowflakeDatabase] = [] + for database in self.get_databases() or []: # type: ignore + # TODO: Support database_patterns. + if not self.config.database_pattern.allowed(database.name): + self.report.report_dropped(f"{database.name}.*") + else: + databases.append(database) + + # Schemas. + for database in databases: + self.fetch_schemas_for_database(database, database.name) # type: ignore + + self.report.schema_counters[database.name] = len(database.schemas) + + for schema in database.schemas: + # Tables/views. + tables = self.fetch_tables_for_schema( # type: ignore + schema, database.name, schema.name + ) + views = self.fetch_views_for_schema( # type: ignore + schema, database.name, schema.name + ) + + self.report.object_counters[database.name][schema.name] = len( + tables + ) + len(views) + + # Queries for usage. + start_time_millis = self.config.start_time.timestamp() * 1000 + end_time_millis = self.config.end_time.timestamp() * 1000 + for row in self.query( + f"""\ +SELECT COUNT(*) AS CNT +FROM snowflake.account_usage.query_history +WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3) + AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3) +""" + ): + self.report.num_snowflake_queries = row["CNT"] + + # Queries for lineage/operations. + for row in self.query( + f"""\ +SELECT COUNT(*) AS CNT +FROM + snowflake.account_usage.access_history access_history +WHERE query_start_time >= to_timestamp_ltz({start_time_millis}, 3) + AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) + AND access_history.objects_modified is not null + AND ARRAY_SIZE(access_history.objects_modified) > 0 +""" + ): + self.report.num_snowflake_mutations = row["CNT"] + + # This source doesn't produce any metadata itself. All important information goes into the report. + yield from [] + + # This is a bit of a hack, but lets us reuse the code from the main ingestion source. + # Mypy doesn't really know how to deal with it though, which is why we have all these + # type ignore comments. + get_databases = SnowflakeV2Source.get_databases + get_databases_from_ischema = SnowflakeV2Source.get_databases_from_ischema + fetch_schemas_for_database = SnowflakeV2Source.fetch_schemas_for_database + fetch_tables_for_schema = SnowflakeV2Source.fetch_tables_for_schema + fetch_views_for_schema = SnowflakeV2Source.fetch_views_for_schema + get_tables_for_schema = SnowflakeV2Source.get_tables_for_schema + get_views_for_schema = SnowflakeV2Source.get_views_for_schema + + def get_report(self) -> SnowflakeSummaryReport: + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 5708b9f168c51f..adcc4ba09d8c9e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -37,7 +37,7 @@ def get_connection(self) -> SnowflakeConnection: class SnowflakeQueryMixin: def query(self: SnowflakeQueryProtocol, query: str) -> Any: try: - self.logger.debug(f"Query : {query}") + self.logger.debug(f"Query : {query}", stacklevel=2) resp = self.get_connection().cursor(DictCursor).execute(query) return resp diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py index 747bde0a8b5632..a0e79f62240ee3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py @@ -11,7 +11,7 @@ class SnowflakeUsageConfig(BaseUsageConfig): email_domain: Optional[str] = pydantic.Field( default=None, - description="Email domain of your organisation so users can be displayed on UI appropriately.", + description="Email domain of your organization so users can be displayed on UI appropriately.", ) apply_view_usage_to_tables: bool = pydantic.Field( default=False,