From ef617784b150ba65dc96e57c29907584952a145c Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Sat, 5 Feb 2022 00:27:44 +0000 Subject: [PATCH 1/7] Mark data lake metadata source as Beta (#4061) --- metadata-ingestion/source_docs/data_lake.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/metadata-ingestion/source_docs/data_lake.md b/metadata-ingestion/source_docs/data_lake.md index 7d8f9a34edcdfd..241e0e291c2657 100644 --- a/metadata-ingestion/source_docs/data_lake.md +++ b/metadata-ingestion/source_docs/data_lake.md @@ -2,6 +2,12 @@ For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). +:::caution + +This source is in **Beta** and under active development. Not yet considered ready for production. + +::: + ## Setup To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Because the files are read using PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed. From 6fe062f37cc75373ac725d767fbf1f5ca1e8ab32 Mon Sep 17 00:00:00 2001 From: Kevin Hu <6051736+kevinhu@users.noreply.github.com> Date: Sat, 5 Feb 2022 00:11:04 -0500 Subject: [PATCH 2/7] feat(ingest): log CLI invocations and completions (#4062) --- .../src/datahub/telemetry/telemetry.py | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 87306766478c45..03fca7663a3cf5 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -3,6 +3,7 @@ import logging import os import platform +import sys import uuid from functools import wraps from pathlib import Path @@ -168,11 +169,45 @@ def ping( T = TypeVar("T") +def get_full_class_name(obj): + module = obj.__class__.__module__ + if module is None or module == str.__class__.__module__: + return obj.__class__.__name__ + return module + "." + obj.__class__.__name__ + + def with_telemetry(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: - res = func(*args, **kwargs) - telemetry_instance.ping(func.__module__, func.__name__) - return res + + category = func.__module__ + action = func.__name__ + + telemetry_instance.ping(category, action, "started") + try: + res = func(*args, **kwargs) + telemetry_instance.ping(category, action, "completed") + return res + # Catch general exceptions + except Exception as e: + telemetry_instance.ping(category, action, f"error:{get_full_class_name(e)}") + raise e + # System exits (used in ingestion and Docker commands) are not caught by the exception handler, + # so we need to catch them here. + except SystemExit as e: + # Forward successful exits + if e.code == 0: + telemetry_instance.ping(category, action, "completed") + sys.exit(0) + # Report failed exits + else: + telemetry_instance.ping( + category, action, f"error:{get_full_class_name(e)}" + ) + sys.exit(e.code) + # Catch SIGINTs + except KeyboardInterrupt: + telemetry_instance.ping(category, action, "cancelled") + sys.exit(0) return wrapper From 1a2f75b1e6da7c7c8751094aa6213229a1903592 Mon Sep 17 00:00:00 2001 From: Kevin Hu <6051736+kevinhu@users.noreply.github.com> Date: Sat, 5 Feb 2022 00:12:09 -0500 Subject: [PATCH 3/7] fix(ingest): data-lake - add aws dependencies (#4060) --- metadata-ingestion/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 161cf2c6057462..dcd637809700df 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -106,7 +106,7 @@ def get_long_description(): "bigquery": sql_common | bigquery_common | {"pybigquery >= 0.6.0"}, "bigquery-usage": bigquery_common | {"cachetools"}, "datahub-business-glossary": set(), - "data-lake": {"pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"}, + "data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"}, "dbt": {"requests"}, "druid": sql_common | {"pydruid>=0.6.2"}, "elasticsearch": {"elasticsearch"}, From 61db4ed1526542d74b1019dd3f9c8215309ae081 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Fri, 4 Feb 2022 23:24:20 -0800 Subject: [PATCH 4/7] refactor(ingest) - remove snowflake_common dependency on aws_common (#4054) Co-authored-by: Shirshanka Das --- .../datahub/ingestion/source/aws/aws_common.py | 16 ---------------- .../src/datahub/ingestion/source/aws/glue.py | 3 ++- .../src/datahub/ingestion/source/aws/s3_util.py | 17 +++++++++++++++++ .../source/aws/sagemaker_processors/jobs.py | 2 +- .../ingestion/source/data_lake/__init__.py | 2 +- .../datahub/ingestion/source/sql/snowflake.py | 2 +- 6 files changed, 22 insertions(+), 20 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py index 29f6e512205b95..d12c215bd3ab66 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py @@ -102,19 +102,3 @@ def get_glue_client(self) -> "GlueClient": def get_sagemaker_client(self) -> "SageMakerClient": return self.get_session().client("sagemaker") - - -def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str: - - if not s3_uri.startswith("s3://"): - raise ValueError("S3 URIs should begin with 's3://'") - # remove S3 prefix (s3://) - s3_name = s3_uri[5:] - - if s3_name.endswith("/"): - s3_name = s3_name[:-1] - - if suffix is not None: - return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})" - - return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})" diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 9c2d98a0e4f06a..d91a5f28934080 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -15,7 +15,8 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.aws_common import AwsSourceConfig, make_s3_urn +from datahub.ingestion.source.aws.aws_common import AwsSourceConfig +from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.metadata.com.linkedin.pegasus2avro.common import Status from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py new file mode 100644 index 00000000000000..57df31807435bc --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py @@ -0,0 +1,17 @@ +from typing import Optional + + +def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str: + + if not s3_uri.startswith("s3://"): + raise ValueError("S3 URIs should begin with 's3://'") + # remove S3 prefix (s3://) + s3_name = s3_uri[5:] + + if s3_name.endswith("/"): + s3_name = s3_name[:-1] + + if suffix is not None: + return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})" + + return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})" diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py index a8f6e346c1b65f..8ec02403e497d2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py @@ -19,7 +19,7 @@ from datahub.emitter import mce_builder from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.aws_common import make_s3_urn +from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.aws.sagemaker_processors.common import ( SagemakerSourceReport, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake/__init__.py index 3e76931c3c9a27..9870f8299b6502 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake/__init__.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake/__init__.py @@ -35,7 +35,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.aws_common import make_s3_urn +from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.data_lake.config import DataLakeSourceConfig from datahub.ingestion.source.data_lake.profiling import _SingleTableProfiler from datahub.ingestion.source.data_lake.report import DataLakeSourceReport diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index 1a02bd3435f7f9..f4a697796d84ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -24,7 +24,7 @@ from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.aws_common import make_s3_urn +from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.sql.sql_common import ( RecordTypeClass, SQLAlchemyConfig, From 3c52f6c62bc8842310903ceb36c0edad887cf790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ebu=20=28=E3=81=88=E3=81=B6=29?= Date: Sat, 5 Feb 2022 16:24:55 +0900 Subject: [PATCH 5/7] feat(ui): Add svg datahub loading logo (#4065) --- .../datahub-logo-color-loading_pendulum.svg | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 datahub-web-react/src/images/datahub-logo-color-loading_pendulum.svg diff --git a/datahub-web-react/src/images/datahub-logo-color-loading_pendulum.svg b/datahub-web-react/src/images/datahub-logo-color-loading_pendulum.svg new file mode 100644 index 00000000000000..06fe3b28ac70d8 --- /dev/null +++ b/datahub-web-react/src/images/datahub-logo-color-loading_pendulum.svg @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From d19241740ec648f76a46b4763f8570b3951aaed2 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Sun, 6 Feb 2022 14:30:40 -0800 Subject: [PATCH 6/7] refactor(model): refactor new Assertion models (#4064) --- .../src/datahub/emitter/mce_builder.py | 5 + .../com/linkedin/assertion/AssertionInfo.pdl | 34 +-- .../linkedin/assertion/AssertionResult.pdl | 78 +++--- .../linkedin/assertion/AssertionRunEvent.pdl | 63 +++++ .../com/linkedin/assertion/AssertionType.pdl | 32 --- .../assertion/BatchAssertionResult.pdl | 40 ---- .../assertion/DatasetAssertionInfo.pdl | 66 +++++ .../metadata/aspect/AssertionAspect.pdl | 14 -- .../metadata/snapshot/AssertionSnapshot.pdl | 24 -- .../src/main/resources/entity-registry.yml | 2 +- smoke-test/test_data_quality.py | 226 +++++++++++++++++- .../bootstrap_data_quality.json | 72 ------ 12 files changed, 398 insertions(+), 258 deletions(-) create mode 100644 metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl delete mode 100644 metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionType.pdl delete mode 100644 metadata-models/src/main/pegasus/com/linkedin/assertion/BatchAssertionResult.pdl create mode 100644 metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl delete mode 100644 metadata-models/src/main/pegasus/com/linkedin/metadata/aspect/AssertionAspect.pdl delete mode 100644 metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/AssertionSnapshot.pdl delete mode 100644 smoke-test/test_resources/bootstrap_data_quality.json diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 3fe2b60ae12eb6..a3f73b804708fa 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -75,6 +75,11 @@ def make_dataset_urn_with_platform_instance( return make_dataset_urn(platform=platform, name=name, env=env) +def make_schema_field_urn(parent_urn: str, field_path: str) -> str: + assert parent_urn.startswith("urn:li:"), "Schema field's parent must be an urn" + return f"urn:li:schemaField:({parent_urn},{field_path})" + + def dataset_urn_to_key(dataset_urn: str) -> Optional[DatasetKeyClass]: pattern = r"urn:li:dataset:\(urn:li:dataPlatform:(.*),(.*),(.*)\)" results = re.search(pattern, dataset_urn) diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl index b097ce274bf524..62ece6b5f581de 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl @@ -2,7 +2,6 @@ namespace com.linkedin.assertion import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference -import com.linkedin.common.Urn /** * Information about an assertion @@ -10,42 +9,27 @@ import com.linkedin.common.Urn @Aspect = { "name": "assertionInfo" } -record AssertionInfo includes CustomProperties, ExternalReference { +record AssertionInfo includes CustomProperties, ExternalReference { /** - * One or more dataset schema fields that are targeted by this assertion + * Type of assertion. Assertion types can evolve to span Datasets, Flows (Pipelines), Models, Features etc. */ - @Relationship = { - "/*": { - "name": "Asserts", - "entityTypes": [ "schemaField" ] - } + type: enum AssertionType { + // When present, then DatasetAssertionInfo elements are filled out + DATASET } - datasetFields: optional array[Urn] /** - * One or more datasets that are targeted by this assertion + * Dataset Assertion information when type is DATASET */ - @Relationship = { - "/*": { - "name": "Asserts", - "entityTypes": [ "dataset" ] - } - } - datasets: optional array[Urn] - - /** - * Type of assertion - */ - assertionType: AssertionType + datasetAssertion: optional DatasetAssertionInfo /* - * Logic for assertion such as implementation of custom nativeOperator + * Logic for the assertion as expressed in the native assertion language. Code fragments, query strings, etc. */ assertionLogic: optional string /** * Parameters required for the assertion. e.g. min_value, max_value, value, columns */ - assertionParameters: map[string, string] = { } - + parameters: optional map[string, string] } \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl index 7163e1a7941aad..8b7226cc8d8818 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl @@ -1,51 +1,53 @@ namespace com.linkedin.assertion -import com.linkedin.timeseries.TimeseriesAspectBase -import com.linkedin.common.ExternalReference -import com.linkedin.common.Urn - -/** - * The results of evaluating the assertion on the batch - */ -@Aspect = { - "name": "assertionResult", - "type": "timeseries", -} - -record AssertionResult includes TimeseriesAspectBase { - - /* - * Urn of assertion which is evaluated - */ - @TimeseriesField = {} - assertionUrn: Urn +record AssertionResult { - /* - * Urn of entity being asserted - */ - //example - dataset urn, if dataset is being asserted - @TimeseriesField = {} - asserteeUrn: Urn - /** - * Specification of the batch whose data quality is evaluated + * The final result, e.g. either SUCCESS or FAILURE. */ - batchSpec: optional BatchSpec + type: enum AssertionResultType { + /** + * The Assertion Succeeded + */ + SUCCESS + /** + * The Assertion Failed + */ + FAILURE + } /** - * Results of assertion - */ - @TimeseriesField = {} - batchAssertionResult: BatchAssertionResult + * Number of rows for evaluated batch + */ + rowCount: optional long /** - * Native Run identifier of platform evaluating the assertions - */ - //Multiple assertions could occur in same evaluator run - nativeEvaluatorRunId: optional string + * Number of rows with missing value for evaluated batch + */ + missingCount: optional long + + /** + * Number of rows with unexpected value for evaluated batch + */ + unexpectedCount: optional long /** - * Runtime parameters of evaluation + * Observed aggregate value for evaluated batch */ - runtimeContext: map[string, string] = { } + actualAggValue: optional float + + /** + * Other results of evaluation + */ + nativeResults: optional map[string, string] + + /** + * URL where full results are available + */ + externalUrl: optional string + + /** + * Runtime context for the evaluation + */ + runtimeContext: optional map[string, string] } \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl new file mode 100644 index 00000000000000..40e51002fadf16 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl @@ -0,0 +1,63 @@ +namespace com.linkedin.assertion + +import com.linkedin.timeseries.TimeseriesAspectBase +import com.linkedin.common.ExternalReference +import com.linkedin.common.Urn + +/** + * An event representing the current status of evaluating an assertion on a batch. + * AssertionRunEvent should be used for reporting the status of a run as an assertion evaluation progresses. + */ +@Aspect = { + "name": "assertionRunEvent", + "type": "timeseries", +} +record AssertionRunEvent includes TimeseriesAspectBase { + + /** + * Native (platform-specific) identifier for this run + */ + //Multiple assertions could occur in same evaluator run + runId: string + + /* + * Urn of assertion which is evaluated + */ + @TimeseriesField = {} + assertionUrn: Urn + + /* + * Urn of entity on which the assertion is applicable + */ + //example - dataset urn, if dataset is being asserted + @TimeseriesField = {} + asserteeUrn: Urn + + /** + * Specification of the batch which this run is evaluating + */ + batchSpec: optional BatchSpec + + /** + * The status of the assertion run as per this timeseries event. + */ + // Currently just supports COMPLETE, but should evolve to support other statuses like STARTED, RUNNING, etc. + @TimeseriesField = {} + status: enum AssertionRunStatus { + /** + * The Assertion Run has completed + */ + COMPLETE + } + + /** + * Results of assertion, present if the status is COMPLETE + */ + @TimeseriesField = {} + result: optional AssertionResult + + /** + * Runtime parameters of evaluation + */ + runtimeContext: optional map[string, string] +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionType.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionType.pdl deleted file mode 100644 index b1eb2badc91ef4..00000000000000 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionType.pdl +++ /dev/null @@ -1,32 +0,0 @@ -namespace com.linkedin.assertion - -/** -* Type of Assertion -*/ -record AssertionType { - /** - * Scope of Assertion - */ - scope: enum AssertionScope { - DATASET_COLUMN - DATASET_ROWS - DATASET_SCHEMA - CROSS_DATASET - } - - /** - * Assertion details for scope DATASET_COLUMN - */ - datasetColumnAssertion: optional DatasetColumnAssertion - - /** - * Assertion details for scope DATASET_ROWS - */ - datasetRowsAssertion: optional DatasetRowsAssertion - - /** - * Assertion details for scope DATASET_SCHEMA - */ - datasetSchemaAssertion: optional DatasetSchemaAssertion - -} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/BatchAssertionResult.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/BatchAssertionResult.pdl deleted file mode 100644 index 6dfa11b1f1b287..00000000000000 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/BatchAssertionResult.pdl +++ /dev/null @@ -1,40 +0,0 @@ -namespace com.linkedin.assertion - -record BatchAssertionResult { - - /** - * Indicator of whether the constraint is fully satisfied for the batch - */ - success: boolean - - /** - * Number of rows for evaluated batch - */ - rowCount: optional long - - /** - * Number of rows with missing value for evaluated batch - */ - missingCount: optional long - - /** - * Number of rows with unexpected value for evaluated batch - */ - unexpectedCount: optional long - - /** - * Observed aggregate value for evaluated batch - */ - actualAggValue: optional float - - /** - * Other results of evaluation - */ - nativeResults: map[string, string] = { } - - /** - * URL where the reference exist - */ - //TODO - Change type to optional Url, not working - externalUrl: optional string -} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl new file mode 100644 index 00000000000000..0a7f2083004de3 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl @@ -0,0 +1,66 @@ +namespace com.linkedin.assertion + +import com.linkedin.common.Urn + +/** +* Assertion attributes that are applicable to Dataset Assertions +**/ +record DatasetAssertionInfo { + /** + * Scope of the Assertion. What part of the dataset does this assertion apply to? + * Declared optional to make it convenient to inline into other aspects like AssertionInfo without requiring + * additional nesting. Semantically required, if this is a Dataset Assertion. + **/ + scope: optional enum DatasetAssertionScope { + /** + * This assertion applies to dataset columns + */ + DATASET_COLUMN + /** + * This assertion applies to entire rows of the dataset + */ + DATASET_ROWS + /** + * This assertion applies to the schema of the dataset + */ + DATASET_SCHEMA + // Future evolution can include things like CROSS_DATASET assertions + } + + /** + * Assertion details when scope is DATASET_COLUMN + */ + columnAssertion: optional DatasetColumnAssertion + + /** + * Assertion details when scope is DATASET_ROWS + */ + rowsAssertion: optional DatasetRowsAssertion + + /** + * Assertion details when scope is DATASET_SCHEMA + */ + schemaAssertion: optional DatasetSchemaAssertion + + /** + * One or more dataset schema fields that are targeted by this assertion + */ + @Relationship = { + "/*": { + "name": "Asserts", + "entityTypes": [ "schemaField" ] + } + } + fields: optional array[Urn] + + /** + * One or more datasets that are targeted by this assertion + */ + @Relationship = { + "/*": { + "name": "Asserts", + "entityTypes": [ "dataset" ] + } + } + datasets: optional array[Urn] +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/aspect/AssertionAspect.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/aspect/AssertionAspect.pdl deleted file mode 100644 index 2e6fb81577044e..00000000000000 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/aspect/AssertionAspect.pdl +++ /dev/null @@ -1,14 +0,0 @@ -namespace com.linkedin.metadata.aspect - -import com.linkedin.metadata.key.AssertionKey -import com.linkedin.common.DataPlatformInstance -import com.linkedin.assertion.AssertionInfo - -/** - * A union of all supported metadata aspects for a Assertion - */ -typeref AssertionAspect = union[ - AssertionKey, - DataPlatformInstance, - AssertionInfo -] \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/AssertionSnapshot.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/AssertionSnapshot.pdl deleted file mode 100644 index a4be3ee5176b99..00000000000000 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/AssertionSnapshot.pdl +++ /dev/null @@ -1,24 +0,0 @@ -namespace com.linkedin.metadata.snapshot - -import com.linkedin.common.Urn -import com.linkedin.metadata.aspect.AssertionAspect - -/** - * A metadata snapshot for a specific Assertion entity. - */ -@Entity = { - "name": "assertion", - "keyAspect": "assertionKey" -} -record AssertionSnapshot { - - /** - * URN for the entity the metadata snapshot is associated with. - */ - urn: Urn - - /** - * The list of metadata aspects associated with the assertion. - */ - aspects: array[AssertionAspect] -} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index fa2428591789e3..70c89f0dec8cec 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -12,7 +12,7 @@ entities: - schemaMetadata - status - container - - assertionResult + - assertionRunEvent - name: dataHubPolicy doc: DataHub Policies represent access policies granted to users or groups on metadata operations like edit, view etc. keyAspect: dataHubPolicyKey diff --git a/smoke-test/test_data_quality.py b/smoke-test/test_data_quality.py index 099c513c62576a..d753748eef70c2 100644 --- a/smoke-test/test_data_quality.py +++ b/smoke-test/test_data_quality.py @@ -5,8 +5,13 @@ import requests from datahub.cli.docker import check_local_docker_containers from tests.utils import ingest_file_via_rest - -bootstrap_sample_data = "test_resources/bootstrap_data_quality.json" +from datahub.metadata.schema_classes import AssertionResultTypeClass, DatasetAssertionInfoClass, PartitionTypeClass, AssertionInfoClass, AssertionTypeClass, DatasetAssertionScopeClass, DatasetColumnAssertionClass, AssertionStdOperatorClass, DatasetColumnStdAggFuncClass, AssertionRunEventClass, PartitionSpecClass, AssertionResultClass, AssertionRunStatusClass +from datahub.emitter.mce_builder import make_schema_field_urn, make_dataset_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.sink.file import FileSink, FileSinkConfig +from datahub.ingestion.api.sink import NoopWriteCallback, WriteCallback +from datahub.ingestion.api.common import RecordEnvelope +from datahub.ingestion.api.common import PipelineContext GMS_ENDPOINT = "http://localhost:8080" restli_default_headers = { @@ -14,8 +19,202 @@ } +def create_test_data(test_file): + assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b" + dataset_urn=make_dataset_urn(platform="postgres", name="fooTable") + assertion_info = AssertionInfoClass( + type=AssertionTypeClass.DATASET, + customProperties={ + "suite_name": "demo_suite" + }, + datasetAssertion=DatasetAssertionInfoClass( + fields=[make_schema_field_urn(dataset_urn,"col1")], + datasets=[dataset_urn], + scope=DatasetAssertionScopeClass.DATASET_COLUMN, + columnAssertion=DatasetColumnAssertionClass( + stdOperator=AssertionStdOperatorClass.LESS_THAN, + nativeOperator="column_value_is_less_than", + stdAggFunc=DatasetColumnStdAggFuncClass.IDENTITY, + ), + ), + parameters={ + "max_value": "99" + }, + ) + # The assertion definition + mcp1 = MetadataChangeProposalWrapper( + entityType="assertion", + changeType="UPSERT", + entityUrn=assertion_urn, + aspectName="assertionInfo", + aspect=assertion_info + ) + timestamps = [1643794280350, 1643794280352, 1643794280354, 1643880726872, 1643880726874, 1643880726875] + msg_ids = [] + # The assertion run event attached to the dataset + mcp2 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[0], + partitionSpec=PartitionSpecClass( + partition="[{'country': 'IN'}]", + type=PartitionTypeClass.PARTITION, + ), + messageId=str(timestamps[0]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS, + actualAggValue=90, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + + mcp3 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[1], + partitionSpec=PartitionSpecClass( + partition="[{'country': 'US'}]", + type=PartitionTypeClass.PARTITION, + ), + messageId=str(timestamps[1]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.FAILURE, + actualAggValue=101, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + # Result of evaluating this assertion on the whole dataset + mcp4 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[2], + partitionSpec=PartitionSpecClass( + partition="FULL_TABLE_SNAPSHOT", + type=PartitionTypeClass.FULL_TABLE, + ), + messageId=str(timestamps[2]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS, + actualAggValue=93, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + + mcp5 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[3], + partitionSpec=PartitionSpecClass( + partition="[{'country': 'IN'}]", + type=PartitionTypeClass.PARTITION, + ), + messageId=str(timestamps[3]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS, + actualAggValue=90, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + mcp6 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[4], + partitionSpec=PartitionSpecClass( + partition="[{'country': 'US'}]", + type=PartitionTypeClass.PARTITION, + ), + messageId=str(timestamps[4]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.FAILURE, + actualAggValue=101, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + + # Result of evaluating this assertion on the whole dataset + mcp7 = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=dataset_urn, + changeType="UPSERT", + aspectName="assertionRunEvent", + aspect=AssertionRunEventClass( + timestampMillis=timestamps[5], + partitionSpec=PartitionSpecClass( + partition="FULL_TABLE_SNAPSHOT", + type=PartitionTypeClass.FULL_TABLE, + ), + messageId=str(timestamps[5]), + assertionUrn=assertion_urn, + asserteeUrn=dataset_urn, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS, + actualAggValue=93, + externalUrl="http://example.com/uuid1", + ), + runId="uuid1", + status=AssertionRunStatusClass.COMPLETE + ) + ) + + fileSink: FileSink = FileSink.create(FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file")) + for mcp in [mcp1, mcp2, mcp3, mcp4, mcp5, mcp6, mcp7]: + fileSink.write_record_async(RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback()) + fileSink.close() + + + + +@pytest.fixture(scope="session") +def generate_test_data(tmp_path_factory): + """Generates metadata events data and stores into a test file""" + dir_name = tmp_path_factory.mktemp("test_dq_events") + file_name = dir_name / "test_dq_events.json" + create_test_data(test_file=str(file_name)) + yield str(file_name) + + @pytest.fixture(scope="session") -def wait_for_healthchecks(): +def wait_for_healthchecks(generate_test_data): # Simply assert that everything is healthy, but don't wait. assert not check_local_docker_containers() yield @@ -28,8 +227,8 @@ def test_healthchecks(wait_for_healthchecks): @pytest.mark.dependency(depends=["test_healthchecks"]) -def test_run_ingestion(wait_for_healthchecks): - ingest_file_via_rest(bootstrap_sample_data) +def test_run_ingestion(wait_for_healthchecks, generate_test_data): + ingest_file_via_rest(generate_test_data) @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) @@ -41,11 +240,11 @@ def test_gms_get_latest_assertions_results_by_partition(): # Query # Given the dataset - # show me latest assertion results grouped-by date, partition, assertionId + # show me latest assertion run events grouped-by date, partition, assertionId query = json.dumps( { "entityName": "dataset", - "aspectName": "assertionResult", + "aspectName": "assertionRunEvent", "filter": { "or": [ { @@ -60,7 +259,7 @@ def test_gms_get_latest_assertions_results_by_partition(): ] }, "metrics": [ - {"fieldPath": "batchAssertionResult", "aggregationType": "LATEST"} + {"fieldPath": "result", "aggregationType": "LATEST"} ], "buckets": [ {"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"}, @@ -88,7 +287,7 @@ def test_gms_get_latest_assertions_results_by_partition(): assert sorted(data["value"]["table"]["columnNames"]) == [ "asserteeUrn", "assertionUrn", - "latest_batchAssertionResult", + "latest_result", "partitionSpec.partition", "timestampMillis", ] @@ -117,9 +316,10 @@ def test_gms_get_assertions_on_dataset(): @pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"]) def test_gms_get_assertions_on_dataset_field(): """lists all assertion urns including those which may not have executed""" - urn = "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD), col1)" + dataset_urn=make_dataset_urn("postgres","fooTable") + field_urn = make_schema_field_urn(dataset_urn, "col1") response = requests.get( - f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts" + f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts" ) response.raise_for_status() @@ -141,4 +341,6 @@ def test_gms_get_assertion_info(): assert data["aspect"] assert data["aspect"]["com.linkedin.assertion.AssertionInfo"] - assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["assertionType"] + assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET" + assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"]["scope"] + diff --git a/smoke-test/test_resources/bootstrap_data_quality.json b/smoke-test/test_resources/bootstrap_data_quality.json deleted file mode 100644 index 0bd7f52e5fc645..00000000000000 --- a/smoke-test/test_resources/bootstrap_data_quality.json +++ /dev/null @@ -1,72 +0,0 @@ -[ - { - "entityType": "assertion", - "entityUrn": "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b", - "changeType": "UPSERT", - "aspectName": "assertionInfo", - "aspect": { - "value": "{\"customProperties\": {\"suite_name\": \"demo_suite\"}, \"datasetFields\": [\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD), col1)\"], \"datasets\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\"], \"assertionType\": {\"scope\": \"DATASET_COLUMN\", \"datasetColumnAssertion\": {\"stdOperator\": \"LESS_THAN\", \"nativeOperator\": \"column_value_is_less_than\", \"stdAggFunc\": \"IDENTITY\"}}, \"assertionParameters\": {\"max_value\": \"99\"}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643794280350, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'IN'}]\"}, \"messageId\": \"1643794280350\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 90, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643794280352, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'US'}]\"}, \"messageId\": \"1643794280352\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": false, \"actualAggValue\": 101, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643794280354, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"messageId\": \"1643794280354\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 93, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643880726872, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'IN'}]\"}, \"messageId\": \"1643880726872\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 90, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643880726874, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'US'}]\"}, \"messageId\": \"1643880726874\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": false, \"actualAggValue\": 101, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)", - "changeType": "UPSERT", - "aspectName": "assertionResult", - "aspect": { - "value": "{\"timestampMillis\": 1643880726875, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"messageId\": \"1643880726875\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 93, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}", - "contentType": "application/json" - } - } -] \ No newline at end of file From 52272d65610fe06d7c1a0165a3745f087318f102 Mon Sep 17 00:00:00 2001 From: Danilo Peixoto Date: Sun, 6 Feb 2022 19:49:10 -0300 Subject: [PATCH 7/7] feat(cli): add --force option to ingest rollback subcommand (#4032) --- metadata-ingestion/src/datahub/cli/ingest_cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 6acf0d55f8a7d3..ac940823da8b85 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -183,14 +183,15 @@ def show(run_id: str) -> None: @ingest.command() @click.option("--run-id", required=True, type=str) +@click.option("-f", "--force", required=False, is_flag=True) @click.option("--dry-run", "-n", required=False, is_flag=True, default=False) @telemetry.with_telemetry -def rollback(run_id: str, dry_run: bool) -> None: +def rollback(run_id: str, force: bool, dry_run: bool) -> None: """Rollback a provided ingestion run to datahub""" cli_utils.test_connectivity_complain_exit("ingest") - if not dry_run: + if not force and not dry_run: click.confirm( "This will permanently delete data from DataHub. Do you want to continue?", abort=True,