diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 8f4f51a1ffe467..4a00f192568bca 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -5,7 +5,17 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ## Next ### Breaking Changes +- #5240 `lineage_client_project_id` in `bigquery` source is removed. Use `storage_project_id` instead. +### Potential Downtime + +### Deprecations + +### Other notable Changes + +## `v0.8.39` + +### Breaking Changes - Refactored the `health` field of the `Dataset` GraphQL Type to be of type **list of HealthStatus** (was type **HealthStatus**). See [this PR](https://github.com/datahub-project/datahub/pull/5222/files) for more details. ### Potential Downtime diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml index f5d57f36935ecc..377af98598d981 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml @@ -40,7 +40,7 @@ source: # - "schema.table.column" # deny: # - "*.*.*" - #lineage_client_project_id: project-id-1234567 + #storage_project_id: project-id-1234567 ## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation sink: diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 781585c2a72ca4..a486b3bb545aaf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -731,6 +731,7 @@ def generate_profiles( requests: List[GEProfilerRequest], max_workers: int, platform: Optional[str] = None, + profiler_args: Optional[Dict] = None, ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]: with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor( max_workers=max_workers @@ -758,6 +759,7 @@ def generate_profiles( query_combiner, request, platform=platform, + profiler_args=profiler_args, ) for request in requests ] @@ -817,11 +819,13 @@ def _generate_profile_from_request( query_combiner: SQLAlchemyQueryCombiner, request: GEProfilerRequest, platform: Optional[str] = None, + profiler_args: Optional[Dict] = None, ) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]: return request, self._generate_single_profile( query_combiner=query_combiner, pretty_name=request.pretty_name, platform=platform, + profiler_args=profiler_args, **request.batch_kwargs, ) @@ -844,8 +848,12 @@ def _generate_single_profile( partition: Optional[str] = None, custom_sql: Optional[str] = None, platform: Optional[str] = None, + profiler_args: Optional[Dict] = None, **kwargs: Any, ) -> Optional[DatasetProfileClass]: + logger.debug( + f"Received single profile request for {pretty_name} for {schema}, {table}, {custom_sql}" + ) bigquery_temp_table: Optional[str] = None ge_config = { @@ -858,16 +866,19 @@ def _generate_single_profile( # We have to create temporary tables if offset or limit or custom sql is set on Bigquery if custom_sql or self.config.limit or self.config.offset: + if profiler_args is not None: + temp_table_db = profiler_args.get("temp_table_db", schema) + if platform is not None and platform == "bigquery": + ge_config["schema"] = temp_table_db + if self.config.bigquery_temp_table_schema: - bigquery_temp_table = ( - f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" - ) + bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" else: assert table table_parts = table.split(".") if len(table_parts) == 2: bigquery_temp_table = ( - f"{schema}.{table_parts[0]}.ge-temp-{uuid.uuid4()}" + f"{temp_table_db}.{table_parts[0]}.ge-temp-{uuid.uuid4()}" ) # With this pr there is no option anymore to set the bigquery temp table: @@ -941,6 +952,7 @@ def _get_ge_dataset( # }, # ) + logger.debug(f"Got pretty_name={pretty_name}, kwargs={batch_kwargs}") expectation_suite_name = ge_context.datasource_name + "." + pretty_name ge_context.data_context.create_expectation_suite( @@ -955,4 +967,15 @@ def _get_ge_dataset( **batch_kwargs, }, ) + if platform is not None and platform == "bigquery": + # This is done as GE makes the name as DATASET.TABLE + # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups + logger.debug(f"Setting table name to be {pretty_name}") + batch._table = sa.text(pretty_name) + name_parts = pretty_name.split(".") + if len(name_parts) != 3: + logger.error( + f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts." + ) + return batch diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index 729199dc37549c..2dd2f0efd654ba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -111,7 +111,7 @@ class GEProfilingConfig(ConfigModel): partition_profiling_enabled: bool = Field(default=True, description="") bigquery_temp_table_schema: Optional[str] = Field( default=None, - description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a schema where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).", + description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a dataset where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).", ) partition_datetime: Optional[datetime.datetime] = Field( default=None, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 83396cea68184f..cbbf3e561398ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -17,6 +17,7 @@ from google.cloud.logging_v2.client import Client as GCPLoggingClient from ratelimiter import RateLimiter from sqlalchemy import create_engine, inspect +from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector from datahub.emitter import mce_builder @@ -36,6 +37,7 @@ support_status, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler from datahub.ingestion.source.sql.sql_common import ( SQLAlchemyConfig, SQLAlchemySource, @@ -332,8 +334,17 @@ def __init__(self, config, ctx): self.partition_info: Dict[str, str] = dict() atexit.register(cleanup, config) - def get_db_name(self, inspector: Inspector = None) -> str: - if self.config.project_id: + def get_db_name( + self, inspector: Inspector = None, for_sql_queries: bool = True + ) -> str: + """ + for_sql_queries - Used mainly for multi-project setups with different permissions + - should be set to True if this is to be used to run sql queries + - should be set to False if this is to inspect contents and not run sql queries + """ + if for_sql_queries and self.config.storage_project_id: + return self.config.storage_project_id + elif self.config.project_id: return self.config.project_id else: return self._get_project_id(inspector) @@ -342,13 +353,10 @@ def _compute_big_query_lineage(self) -> None: if not self.config.include_table_lineage: return - lineage_client_project_id = self._get_lineage_client_project_id() if self.config.use_exported_bigquery_audit_metadata: - self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata( - lineage_client_project_id - ) + self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata() else: - self._compute_bigquery_lineage_via_gcp_logging(lineage_client_project_id) + self._compute_bigquery_lineage_via_gcp_logging() if self.lineage_metadata is None: self.lineage_metadata = {} @@ -359,14 +367,11 @@ def _compute_big_query_lineage(self) -> None: ) logger.debug(f"lineage metadata is {self.lineage_metadata}") - def _compute_bigquery_lineage_via_gcp_logging( - self, lineage_client_project_id: Optional[str] - ) -> None: + def _compute_bigquery_lineage_via_gcp_logging(self) -> None: + project_id = self.get_db_name() logger.info("Populating lineage info via GCP audit logs") try: - _clients: List[GCPLoggingClient] = self._make_bigquery_client( - lineage_client_project_id - ) + _clients: List[GCPLoggingClient] = self._make_gcp_logging_client(project_id) template: str = BQ_FILTER_RULE_TEMPLATE if self.config.use_v2_audit_metadata: @@ -386,12 +391,11 @@ def _compute_bigquery_lineage_via_gcp_logging( f"Error was {e}", ) - def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata( - self, lineage_client_project_id: Optional[str] - ) -> None: + def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None: + project_id = self.get_db_name(for_sql_queries=True) logger.info("Populating lineage info via exported GCP audit logs") try: - _client: BigQueryClient = BigQueryClient(project=lineage_client_project_id) + _client: BigQueryClient = BigQueryClient(project=project_id) exported_bigquery_audit_metadata: Iterable[ BigQueryAuditMetadata ] = self._get_exported_bigquery_audit_metadata(_client) @@ -408,28 +412,18 @@ def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata( f"Error: {e}", ) - def _make_bigquery_client( - self, lineage_client_project_id: Optional[str] + def _make_gcp_logging_client( + self, project_id: Optional[str] ) -> List[GCPLoggingClient]: # See https://github.com/googleapis/google-cloud-python/issues/2674 for # why we disable gRPC here. client_options = self.config.extra_client_options.copy() client_options["_use_grpc"] = False - if lineage_client_project_id is not None: - return [ - GCPLoggingClient(**client_options, project=lineage_client_project_id) - ] + if project_id is not None: + return [GCPLoggingClient(**client_options, project=project_id)] else: return [GCPLoggingClient(**client_options)] - def _get_lineage_client_project_id(self) -> Optional[str]: - project_id: Optional[str] = ( - self.config.lineage_client_project_id - if self.config.lineage_client_project_id - else self.config.project_id - ) - return project_id - def _get_bigquery_log_entries( self, clients: List[GCPLoggingClient], @@ -665,8 +659,7 @@ def is_table_partitioned( if database: project_id = database else: - url = self.config.get_sql_alchemy_url() - engine = create_engine(url, **self.config.options) + engine = self._get_engine(for_run_sql=False) with engine.connect() as con: inspector = inspect(con) project_id = self.get_db_name(inspector) @@ -675,8 +668,8 @@ def is_table_partitioned( def get_latest_partition( self, schema: str, table: str ) -> Optional[BigQueryPartitionColumn]: - url = self.config.get_sql_alchemy_url() - engine = create_engine(url, **self.config.options) + logger.debug(f"get_latest_partition for {schema} and {table}") + engine = self._get_engine(for_run_sql=True) with engine.connect() as con: inspector = inspect(con) project_id = self.get_db_name(inspector) @@ -685,7 +678,9 @@ def get_latest_partition( ): return None sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format( - project_id=project_id, schema=schema, table=table + project_id=self.get_db_name(inspector, for_sql_queries=True), + schema=schema, + table=table, ) result = con.execute(sql) # Bigquery only supports one partition column @@ -709,8 +704,7 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool: table_name, shard = self.get_shard_from_table(table) if shard: logger.debug(f"{table_name} is sharded and shard id is: {shard}") - url = self.config.get_sql_alchemy_url() - engine = create_engine(url, **self.config.options) + engine = self._get_engine(for_run_sql=True) if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids: with engine.connect() as con: sql = BQ_GET_LATEST_SHARD.format( @@ -734,9 +728,13 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool: else: return True + def _get_engine(self, for_run_sql: bool) -> Engine: + url = self.config.get_sql_alchemy_url(for_run_sql=for_run_sql) + logger.debug(f"sql_alchemy_url={url}") + return create_engine(url, **self.config.options) + def add_information_for_schema(self, inspector: Inspector, schema: str) -> None: - url = self.config.get_sql_alchemy_url() - engine = create_engine(url, **self.config.options) + engine = self._get_engine(for_run_sql=True) project_id = self.get_db_name(inspector) with engine.connect() as con: inspector = inspect(con) @@ -821,6 +819,22 @@ def generate_partition_profiler_query( return shard, None return None, None + def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": + logger.debug("Getting profiler instance from bigquery") + engine = self._get_engine(for_run_sql=True) + with engine.connect() as conn: + inspector = inspect(conn) + + return DatahubGEProfiler( + conn=inspector.bind, + report=self.report, + config=self.config.profiling, + platform=self.platform, + ) + + def get_profile_args(self) -> Dict: + return {"temp_table_db": self.config.project_id} + def is_dataset_eligible_for_profiling( self, dataset_name: str, @@ -884,6 +898,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: isinstance(wu, SqlWorkUnit) and isinstance(wu.metadata, MetadataChangeEvent) and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot) + and self.config.include_table_lineage ): lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn) if lineage_mcp is not None: @@ -977,7 +992,7 @@ def prepare_profiler_args( custom_sql: Optional[str] = None, ) -> dict: return dict( - schema=self.config.project_id, + schema=self.get_db_name(for_sql_queries=True), table=f"{schema}.{table}", partition=partition, custom_sql=custom_sql, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 5cb88f0dadda97..902e89bc051c6f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -713,7 +713,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: profiler = None profile_requests: List["GEProfilerRequest"] = [] if sql_config.profiling.enabled: - profiler = self._get_profiler_instance(inspector) + profiler = self.get_profiler_instance(inspector) db_name = self.get_db_name(inspector) yield from self.gen_database_containers(db_name) @@ -1309,7 +1309,7 @@ def add_table_to_schema_container( self.report.report_workunit(wu) yield wu - def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": + def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler return DatahubGEProfiler( @@ -1319,6 +1319,10 @@ def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": platform=self.platform, ) + def get_profile_args(self) -> Dict: + """Passed down to GE profiler""" + return {} + # Override if needed def generate_partition_profiler_query( self, schema: str, table: str, partition_datetime: Optional[datetime.datetime] @@ -1426,6 +1430,9 @@ def loop_profiler_requests( continue self.report.report_entity_profiled(dataset_name) + logger.debug( + f"Preparing profiling request for {schema}, {table}, {partition}" + ) yield GEProfilerRequest( pretty_name=dataset_name, batch_kwargs=self.prepare_profiler_args( @@ -1443,7 +1450,10 @@ def loop_profiler( platform: Optional[str] = None, ) -> Iterable[MetadataWorkUnit]: for request, profile in profiler.generate_profiles( - profile_requests, self.config.profiling.max_workers, platform=platform + profile_requests, + self.config.profiling.max_workers, + platform=platform, + profiler_args=self.get_profile_args(), ): if profile is None: continue diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py index 6132bcfb28816f..67f6e608fa22ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py @@ -18,11 +18,11 @@ class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig) scheme: str = "bigquery" project_id: Optional[str] = pydantic.Field( default=None, - description="Project ID to ingest from. If not specified, will infer from environment.", + description="Project ID where you have rights to run queries and create tables. If `storage_project_id` is not specified then it is assumed this is the same project where data is stored. If not specified, will infer from environment.", ) - lineage_client_project_id: Optional[str] = pydantic.Field( + storage_project_id: Optional[str] = pydantic.Field( default=None, - description="If you want to use a different ProjectId for the lineage collection you can set it here.", + description="If your data is stored in a different project where you don't have rights to run jobs and create tables then specify this field. The same service account must have read rights in this GCP project and write rights in `project_id`.", ) log_page_size: pydantic.PositiveInt = pydantic.Field( default=1000, @@ -75,7 +75,9 @@ def __init__(self, **data: Any): ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path - def get_sql_alchemy_url(self): + def get_sql_alchemy_url(self, for_run_sql: bool = False) -> str: + if (not for_run_sql) and self.storage_project_id: + return f"{self.scheme}://{self.storage_project_id}" if self.project_id: return f"{self.scheme}://{self.project_id}" # When project_id is not set, we will attempt to detect the project ID @@ -94,6 +96,15 @@ def bigquery_doesnt_need_platform_instance(cls, v): def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured( cls, values: Dict[str, Any] ) -> Dict[str, Any]: + profiling = values.get("profiling") + if ( + values.get("storage_project_id") + and profiling is not None + and not profiling.bigquery_temp_table_schema + ): + raise ConfigurationError( + "If storage project is being used with profiling then bigquery_temp_table_schema needs to be set to a dataset in the compute project" + ) if ( values.get("use_exported_bigquery_audit_metadata") and not values.get("use_v2_audit_metadata")