Merge branch 'master' into add-entity-deprecation
jjoyce0510 committed Feb 7, 2022
2 parents 78a1e43 + 52272d6 commit 84c3241
Showing 23 changed files with 516 additions and 284 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -106,7 +106,7 @@ def get_long_description():
"bigquery": sql_common | bigquery_common | {"pybigquery >= 0.6.0"},
"bigquery-usage": bigquery_common | {"cachetools"},
"datahub-business-glossary": set(),
"data-lake": {"pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"dbt": {"requests"},
"druid": sql_common | {"pydruid>=0.6.2"},
"elasticsearch": {"elasticsearch"},
6 changes: 6 additions & 0 deletions metadata-ingestion/source_docs/data_lake.md
@@ -2,6 +2,12 @@

For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).

:::caution

This source is in **Beta** and under active development. It is not yet considered ready for production use.

:::

## Setup

To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Because the files are read using PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed.
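For those who prefer to drive ingestion from Python rather than a recipe file, a run can also be kicked off through the Pipeline API. The sketch below is minimal and hedged: the `env`/`base_path` source options are assumptions about this beta source's config and may not match the released schema.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Minimal sketch of a data-lake ingestion run. Source config keys below are
# illustrative assumptions; check the source docs for the authoritative schema.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "data-lake",
            "config": {
                "env": "PROD",
                "base_path": "s3://my-bucket/landing/",  # hypothetical path
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```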
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -183,14 +183,15 @@ def show(run_id: str) -> None:

@ingest.command()
@click.option("--run-id", required=True, type=str)
@click.option("-f", "--force", required=False, is_flag=True)
@click.option("--dry-run", "-n", required=False, is_flag=True, default=False)
@telemetry.with_telemetry
def rollback(run_id: str, dry_run: bool) -> None:
def rollback(run_id: str, force: bool, dry_run: bool) -> None:
"""Rollback a provided ingestion run to datahub"""

cli_utils.test_connectivity_complain_exit("ingest")

if not dry_run:
if not force and not dry_run:
click.confirm(
"This will permanently delete data from DataHub. Do you want to continue?",
abort=True,
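The new `-f`/`--force` flag skips the confirmation prompt, which matters for unattended rollbacks. A small sketch of driving the command with click's test runner; the run id is a placeholder, and a reachable DataHub instance is still assumed for the connectivity check.

```python
from click.testing import CliRunner

from datahub.cli.ingest_cli import rollback

# Invoke the command as the CLI would: --force skips the confirmation prompt,
# --dry-run reports what would be rolled back without deleting anything.
runner = CliRunner()
result = runner.invoke(rollback, ["--run-id", "my-run-id", "--force", "--dry-run"])
print(result.output)
```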
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -75,6 +75,11 @@ def make_dataset_urn_with_platform_instance(
return make_dataset_urn(platform=platform, name=name, env=env)


def make_schema_field_urn(parent_urn: str, field_path: str) -> str:
assert parent_urn.startswith("urn:li:"), "Schema field's parent must be an urn"
return f"urn:li:schemaField:({parent_urn},{field_path})"


def dataset_urn_to_key(dataset_urn: str) -> Optional[DatasetKeyClass]:
pattern = r"urn:li:dataset:\(urn:li:dataPlatform:(.*),(.*),(.*)\)"
results = re.search(pattern, dataset_urn)
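The new `make_schema_field_urn` helper simply nests the parent entity urn inside a `schemaField` urn, so its output can be read directly off the f-string. A short sketch using a dataset urn as the parent:

```python
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn

dataset_urn = make_dataset_urn(platform="hive", name="db.table", env="PROD")
field_urn = make_schema_field_urn(dataset_urn, "user_id")
print(field_urn)
# urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD),user_id)
```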
16 changes: 0 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py
@@ -102,19 +102,3 @@ def get_glue_client(self) -> "GlueClient":

def get_sagemaker_client(self) -> "SageMakerClient":
return self.get_session().client("sagemaker")


def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:

if not s3_uri.startswith("s3://"):
raise ValueError("S3 URIs should begin with 's3://'")
# remove S3 prefix (s3://)
s3_name = s3_uri[5:]

if s3_name.endswith("/"):
s3_name = s3_name[:-1]

if suffix is not None:
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -15,7 +15,8 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig, make_s3_urn
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
17 changes: 17 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
@@ -0,0 +1,17 @@
from typing import Optional


def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:

if not s3_uri.startswith("s3://"):
raise ValueError("S3 URIs should begin with 's3://'")
# remove S3 prefix (s3://)
s3_name = s3_uri[5:]

if s3_name.endswith("/"):
s3_name = s3_name[:-1]

if suffix is not None:
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

from datahub.emitter import mce_builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import make_s3_urn
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.aws.sagemaker_processors.common import (
SagemakerSourceReport,
)
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import make_s3_urn
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.data_lake.config import DataLakeSourceConfig
from datahub.ingestion.source.data_lake.profiling import _SingleTableProfiler
from datahub.ingestion.source.data_lake.report import DataLakeSourceReport
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import make_s3_urn
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.sql.sql_common import (
RecordTypeClass,
SQLAlchemyConfig,
41 changes: 38 additions & 3 deletions metadata-ingestion/src/datahub/telemetry/telemetry.py
@@ -3,6 +3,7 @@
import logging
import os
import platform
import sys
import uuid
from functools import wraps
from pathlib import Path
@@ -168,11 +169,45 @@ def ping(
T = TypeVar("T")


def get_full_class_name(obj):
module = obj.__class__.__module__
if module is None or module == str.__class__.__module__:
return obj.__class__.__name__
return module + "." + obj.__class__.__name__


def with_telemetry(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
res = func(*args, **kwargs)
telemetry_instance.ping(func.__module__, func.__name__)
return res

category = func.__module__
action = func.__name__

telemetry_instance.ping(category, action, "started")
try:
res = func(*args, **kwargs)
telemetry_instance.ping(category, action, "completed")
return res
# Catch general exceptions
except Exception as e:
telemetry_instance.ping(category, action, f"error:{get_full_class_name(e)}")
raise e
# System exits (used in ingestion and Docker commands) are not caught by the exception handler,
# so we need to catch them here.
except SystemExit as e:
# Forward successful exits
if e.code == 0:
telemetry_instance.ping(category, action, "completed")
sys.exit(0)
# Report failed exits
else:
telemetry_instance.ping(
category, action, f"error:{get_full_class_name(e)}"
)
sys.exit(e.code)
# Catch SIGINTs
except KeyboardInterrupt:
telemetry_instance.ping(category, action, "cancelled")
sys.exit(0)

return wrapper
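The reworked decorator now reports a full lifecycle per invocation: `started` on entry, then either `completed`, `error:<exception class>`, or `cancelled` on Ctrl-C, with `SystemExit` handled explicitly so commands that call `sys.exit` are still counted. A minimal sketch of a command opting in; the command itself is hypothetical and the import path assumes the module layout shown above.

```python
import click

from datahub.telemetry import telemetry


@click.command()
@telemetry.with_telemetry
def my_command() -> None:
    """Hypothetical command: pings 'started' on entry and 'completed' on success."""
    click.echo("doing work")


if __name__ == "__main__":
    my_command()
```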
Original file line number Diff line number Diff line change
@@ -2,50 +2,34 @@ namespace com.linkedin.assertion

import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.Urn

/**
* Information about an assertion
*/
@Aspect = {
"name": "assertionInfo"
}
record AssertionInfo includes CustomProperties, ExternalReference {
/**
* One or more dataset schema fields that are targeted by this assertion
* Type of assertion. Assertion types can evolve to span Datasets, Flows (Pipelines), Models, Features etc.
*/
@Relationship = {
"/*": {
"name": "Asserts",
"entityTypes": [ "schemaField" ]
}
type: enum AssertionType {
// When present, then DatasetAssertionInfo elements are filled out
DATASET
}
datasetFields: optional array[Urn]

/**
* One or more datasets that are targeted by this assertion
* Dataset Assertion information when type is DATASET
*/
@Relationship = {
"/*": {
"name": "Asserts",
"entityTypes": [ "dataset" ]
}
}
datasets: optional array[Urn]

/**
* Type of assertion
*/
assertionType: AssertionType
datasetAssertion: optional DatasetAssertionInfo

/*
* Logic for assertion such as implementation of custom nativeOperator
* Logic for the assertion as expressed in the native assertion language. Code fragments, query strings, etc.
*/
assertionLogic: optional string

/**
* Parameters required for the assertion. e.g. min_value, max_value, value, columns
*/
assertionParameters: map[string, string] = { }

parameters: optional map[string, string]
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,53 @@
namespace com.linkedin.assertion

import com.linkedin.timeseries.TimeseriesAspectBase
import com.linkedin.common.ExternalReference
import com.linkedin.common.Urn

/**
* The results of evaluating the assertion on the batch
*/
@Aspect = {
"name": "assertionResult",
"type": "timeseries",
}

record AssertionResult includes TimeseriesAspectBase {

/*
* Urn of assertion which is evaluated
*/
@TimeseriesField = {}
assertionUrn: Urn
record AssertionResult {

/*
* Urn of entity being asserted
*/
//example - dataset urn, if dataset is being asserted
@TimeseriesField = {}
asserteeUrn: Urn

/**
* Specification of the batch whose data quality is evaluated
* The final result, e.g. either SUCCESS or FAILURE.
*/
batchSpec: optional BatchSpec
type: enum AssertionResultType {
/**
* The Assertion Succeeded
*/
SUCCESS
/**
* The Assertion Failed
*/
FAILURE
}

/**
* Results of assertion
*/
@TimeseriesField = {}
batchAssertionResult: BatchAssertionResult
* Number of rows for evaluated batch
*/
rowCount: optional long

/**
* Native Run identifier of platform evaluating the assertions
*/
//Multiple assertions could occur in same evaluator run
nativeEvaluatorRunId: optional string
* Number of rows with missing value for evaluated batch
*/
missingCount: optional long

/**
* Number of rows with unexpected value for evaluated batch
*/
unexpectedCount: optional long

/**
* Runtime parameters of evaluation
* Observed aggregate value for evaluated batch
*/
runtimeContext: map[string, string] = { }
actualAggValue: optional float

/**
* Other results of evaluation
*/
nativeResults: optional map[string, string]

/**
* URL where full results are available
*/
externalUrl: optional string

/**
* Runtime context for the evaluation
*/
runtimeContext: optional map[string, string]
}
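The slimmed-down result record drops the per-batch urns and timeseries plumbing in favour of a `type` (SUCCESS/FAILURE) plus optional row-level counts and an external link. As above, a rough, hypothetical sketch assuming the generated Python bindings mirror the PDL; all values are illustrative.

```python
# Hypothetical sketch: class names assume generated bindings mirror the PDL.
from datahub.metadata.schema_classes import (
    AssertionResultClass,
    AssertionResultTypeClass,
)

result = AssertionResultClass(
    type=AssertionResultTypeClass.SUCCESS,
    rowCount=10000,      # rows in the evaluated batch (illustrative)
    missingCount=0,      # rows with missing values
    unexpectedCount=0,   # rows with unexpected values
    externalUrl="https://example.com/assertions/run/123",  # placeholder
)
```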