feat(assertion): update python example, assertion entity doc (datahub…
mayurinehate authored and maggiehays committed Aug 1, 2022
1 parent aa60546 commit beb3d78
Showing 9 changed files with 221 additions and 279 deletions.
@@ -42,8 +42,9 @@ public static Assertion map(final EntityResponse entityResponse) {
if (envelopedPlatformInstance != null) {
    result.setPlatform(mapPlatform(new DataPlatformInstance(envelopedPlatformInstance.getValue().data())));
} else {
-    // Containers must have DPI to be rendered.
-    return null;
+    final DataPlatform unknownPlatform = new DataPlatform();
+    unknownPlatform.setUrn(Constants.UNKNOWN_DATA_PLATFORM);
+    result.setPlatform(unknownPlatform);
}

return result;
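With this change, assertions whose platform-instance aspect is missing are mapped with an unknown platform instead of being dropped from the response. A rough Python analog of the fallback pattern (the dict shapes and the unknown-platform URN are illustrative assumptions, not the mapper's actual API):

def map_platform_with_fallback(enveloped_platform_instance, result):
    # Mirrors the Java change above: attach a placeholder platform instead of
    # returning None when the dataPlatformInstance aspect is absent.
    if enveloped_platform_instance is not None:
        result["platform"] = enveloped_platform_instance["value"]
    else:
        # Assumed to correspond to Constants.UNKNOWN_DATA_PLATFORM.
        result["platform"] = {"urn": "urn:li:dataPlatform:unknown"}
    return result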
10 changes: 5 additions & 5 deletions metadata-ingestion/build.gradle
@@ -66,15 +66,15 @@ task lint(type: Exec, dependsOn: installDev) {
    */
    commandLine 'bash', '-x', '-c',
        "find ${venv_name}/lib -path *airflow/_vendor/connexion/spec.py -exec sed -i.bak -e '169,169s/ # type: List\\[str\\]//g' {} \\; && " +
-        "source ${venv_name}/bin/activate && black --check --diff src/ tests/ && isort --check --diff src/ tests/ && flake8 --count --statistics src/ tests/ && mypy src/ tests/"
+        "source ${venv_name}/bin/activate && black --check --diff src/ tests/ examples/ && isort --check --diff src/ tests/ examples/ && flake8 --count --statistics src/ tests/ examples/ && mypy src/ tests/ examples/"
}
task lintFix(type: Exec, dependsOn: installDev) {
    commandLine 'bash', '-x', '-c',
        "source ${venv_name}/bin/activate && " +
-        "black src/ tests/ && " +
-        "isort src/ tests/ && " +
-        "flake8 src/ tests/ && " +
-        "mypy src/ tests/"
+        "black src/ tests/ examples/ && " +
+        "isort src/ tests/ examples/ && " +
+        "flake8 src/ tests/ examples/ && " +
+        "mypy src/ tests/ examples/"
}

task testQuick(type: Exec, dependsOn: installDev) {
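With examples/ added to the lint targets, the example scripts are now held to the same black/isort/flake8/mypy standards as src/ and tests/. Contributors can run the checks locally with ./gradlew :metadata-ingestion:lint, or ./gradlew :metadata-ingestion:lintFix to auto-format, assuming the standard Gradle wrapper at the repository root.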
74 changes: 60 additions & 14 deletions metadata-ingestion/examples/library/data_quality_mcpw_rest.py
@@ -1,3 +1,4 @@
+import json
import time

import datahub.emitter.mce_builder as builder
@@ -11,12 +12,17 @@
    AssertionRunStatus,
    AssertionStdAggregation,
    AssertionStdOperator,
+    AssertionStdParameter,
+    AssertionStdParameters,
+    AssertionStdParameterType,
    AssertionType,
    DatasetAssertionInfo,
    DatasetAssertionScope,
)
+from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
-from datahub.metadata.schema_classes import AssertionRunEventClass, PartitionSpecClass
+from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec


def datasetUrn(tbl: str) -> str:
@@ -31,7 +37,7 @@ def assertionUrn(info: AssertionInfo) -> str:
    return "urn:li:assertion:432475190cc846f2894b5b3aa4d55af2"


-def emitAssertionResult(assertionResult: AssertionResult) -> None:
+def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:

    dataset_assertionRunEvent_mcp = MetadataChangeProposalWrapper(
        entityType="assertion",
@@ -45,17 +51,43 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
    emitter.emit_mcp(dataset_assertionRunEvent_mcp)


+# Create an emitter to the GMS REST API.
+emitter = DatahubRestEmitter("http://localhost:8080")
+
+datasetProperties = DatasetProperties(
+    name="bazTable",
+)
+# Construct a MetadataChangeProposalWrapper object for dataset
+dataset_mcp = MetadataChangeProposalWrapper(
+    entityType="dataset",
+    changeType=ChangeType.UPSERT,
+    entityUrn=datasetUrn("bazTable"),
+    aspectName="datasetProperties",
+    aspect=datasetProperties,
+)
+
+# Emit Dataset entity properties aspect! (Skip if dataset is already present)
+emitter.emit_mcp(dataset_mcp)

# Construct an assertion object.
assertion_maxVal = AssertionInfo(
    type=AssertionType.DATASET,
    datasetAssertion=DatasetAssertionInfo(
        scope=DatasetAssertionScope.DATASET_COLUMN,
-        operator=AssertionStdOperator.LESS_THAN,
-        nativeType="column_value_is_less_than",
-        aggregation=AssertionStdAggregation.IDENTITY,
+        operator=AssertionStdOperator.BETWEEN,
+        nativeType="expect_column_max_to_be_between",
+        aggregation=AssertionStdAggregation.MAX,
        fields=[fldUrn("bazTable", "col1")],
        dataset=datasetUrn("bazTable"),
-        nativeParameters={"max_value": "99"},
+        nativeParameters={"max_value": "99", "min_value": "89"},
+        parameters=AssertionStdParameters(
+            minValue=AssertionStdParameter(
+                type=AssertionStdParameterType.NUMBER, value="89"
+            ),
+            maxValue=AssertionStdParameter(
+                type=AssertionStdParameterType.NUMBER, value="99"
+            ),
+        ),
    ),
    customProperties={"suite_name": "demo_suite"},
)
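The new assertion encodes a Great Expectations-style expect_column_max_to_be_between check: the MAX aggregation over bazTable.col1 combined with the BETWEEN operator asserts 89 <= max(col1) <= 99, carrying the bounds both as nativeParameters strings and as typed AssertionStdParameters. A minimal sketch of the semantics this encodes, against hypothetical in-memory batch values:

# Illustrative only: what the BETWEEN/MAX assertion evaluates for one batch.
batch_col1 = [90, 93, 97]  # hypothetical values of bazTable.col1
min_value, max_value = 89, 99  # the assertion's typed parameters
success = min_value <= max(batch_col1) <= max_value  # True here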
@@ -69,18 +101,32 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
    aspect=assertion_maxVal,
)

-# Create an emitter to the GMS REST API.
-emitter = DatahubRestEmitter("http://localhost:8080")
-
-# Emit Assertion entity info object!
+# Emit Assertion entity info aspect!
emitter.emit_mcp(assertion_maxVal_mcp)

+# Construct an assertion platform object.
+assertion_dataPlatformInstance = DataPlatformInstance(
+    platform=builder.make_data_platform_urn("great-expectations")
+)
+
+# Construct a MetadataChangeProposalWrapper object for assertion platform
+assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper(
+    entityType="assertion",
+    changeType=ChangeType.UPSERT,
+    entityUrn=assertionUrn(assertion_maxVal),
+    aspectName="dataPlatformInstance",
+    aspect=assertion_dataPlatformInstance,
+)
+# Emit Assertion entity platform aspect!
+emitter.emit(assertion_dataPlatformInstance_mcp)
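Emitting this dataPlatformInstance aspect is what keeps the assertion renderable in the UI: the GraphQL mapper change at the top of this commit handles the complementary case, substituting an unknown platform for assertions that were ingested without one.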


# Construct batch assertion result object for partition 1 batch
assertionResult_maxVal_batch_partition1 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
-    partitionSpec=PartitionSpecClass(partition=str([{"country": "IN"}])),
+    partitionSpec=PartitionSpec(partition=json.dumps([{"country": "IN"}])),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=AssertionResult(
@@ -95,11 +141,11 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
)

# Construct batch assertion result object for partition 2 batch
-assertionResult_maxVal_batch_partition2 = AssertionRunEventClass(
+assertionResult_maxVal_batch_partition2 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
-    partitionSpec=PartitionSpecClass(partition=str([{"country": "US"}])),
+    partitionSpec=PartitionSpec(partition=json.dumps([{"country": "US"}])),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=AssertionResult(
@@ -114,7 +160,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
)

# Construct batch assertion result object for full table batch.
-assertionResult_maxVal_batch_fulltable = AssertionRunEventClass(
+assertionResult_maxVal_batch_fulltable = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
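The collapsed remainder of the script fills in each event's AssertionResult and emits it. A minimal sketch of that last step, reusing the emitAssertionResult helper defined above (whether the original emits in a loop or one call per event is not visible here):

# Illustrative completion: push each run event through the helper above.
for run_event in [
    assertionResult_maxVal_batch_partition1,
    assertionResult_maxVal_batch_partition2,
    assertionResult_maxVal_batch_fulltable,
]:
    emitAssertionResult(run_event)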
@@ -1,23 +1,21 @@
import datahub.emitter.mce_builder as builder
import json

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage
)
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass


def datasetUrn(tbl):
    return builder.make_dataset_urn("postgres", tbl)


def fldUrn(tbl, fld):
-    return builder.make_schema_field_urn(datasetUrn(tbl), fld);
+    return builder.make_schema_field_urn(datasetUrn(tbl), fld)


# Lineage of fields output by a job
# bar.c1 <-- unknownFunc(bar2.c1, bar4.c1)
@@ -30,72 +28,87 @@ def fldUrn(tbl, fld):
# Note that the semantic of the "transformOperation" value is contextual.
# In above example, it is regarded as some kind of UDF; but it could also be an expression etc.

-fineGrainedLineages=[
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
-        upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD,
-        downstreams=[fldUrn("bar", "c1")]),
-
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
-        upstreams=[fldUrn("bar3","c2")],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD,
-        downstreams=[fldUrn("bar", "c2")],
-        confidenceScore = 0.8, transformOperation="myfunc"),
-
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
-        upstreams=[fldUrn("bar2","c2"), fldUrn("bar2","c3"), fldUrn("bar3","c1")],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
-        downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
-        confidenceScore = 0.7),
-
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.DATASET,
-        upstreams=[datasetUrn("bar3")],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD,
-        downstreams=[fldUrn("bar", "c5")]),
-
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.DATASET,
-        upstreams=[datasetUrn("bar4")],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
-        downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")]),
-
-    FineGrainedLineage(
-        upstreamType=FineGrainedLineageUpstreamType.NONE,
-        upstreams=[],
-        downstreamType=FineGrainedLineageDownstreamType.FIELD,
-        downstreams=[fldUrn("bar2", "c9")])
-]
+fineGrainedLineages = [
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
+        upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD,
+        downstreams=[fldUrn("bar", "c1")],
+    ),
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
+        upstreams=[fldUrn("bar3", "c2")],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD,
+        downstreams=[fldUrn("bar", "c2")],
+        confidenceScore=0.8,
+        transformOperation="myfunc",
+    ),
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
+        upstreams=[fldUrn("bar2", "c2"), fldUrn("bar2", "c3"), fldUrn("bar3", "c1")],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
+        downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
+        confidenceScore=0.7,
+    ),
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.DATASET,
+        upstreams=[datasetUrn("bar3")],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD,
+        downstreams=[fldUrn("bar", "c5")],
+    ),
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.DATASET,
+        upstreams=[datasetUrn("bar4")],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
+        downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")],
+    ),
+    FineGrainedLineage(
+        upstreamType=FineGrainedLineageUpstreamType.NONE,
+        upstreams=[],
+        downstreamType=FineGrainedLineageDownstreamType.FIELD,
+        downstreams=[fldUrn("bar2", "c9")],
+    ),
+]

# The lineage of output col bar.c9 is unknown. So there is no lineage for it above.
# Note that bar2 is an input as well as an output dataset, but some fields are inputs while other fields are outputs.

dataJobInputOutput = DataJobInputOutputClass(
-    inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
-    outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
+    inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
+    outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
    inputDatajobs=None,
-    inputDatasetFields=[fldUrn("bar2","c1"), fldUrn("bar2","c2"), fldUrn("bar2","c3"),
-                        fldUrn("bar3","c1"), fldUrn("bar3","c2"), fldUrn("bar4","c1")],
-    outputDatasetFields=[fldUrn("bar", "c1"), fldUrn("bar", "c2"),fldUrn("bar", "c3"),
-                         fldUrn("bar", "c4"), fldUrn("bar", "c5"),fldUrn("bar", "c6"),
-                         fldUrn("bar", "c7"), fldUrn("bar", "c9"),
-                         fldUrn("bar2", "c9")],
-    fineGrainedLineages=fineGrainedLineages
-)
+    inputDatasetFields=[
+        fldUrn("bar2", "c1"),
+        fldUrn("bar2", "c2"),
+        fldUrn("bar2", "c3"),
+        fldUrn("bar3", "c1"),
+        fldUrn("bar3", "c2"),
+        fldUrn("bar4", "c1"),
+    ],
+    outputDatasetFields=[
+        fldUrn("bar", "c1"),
+        fldUrn("bar", "c2"),
+        fldUrn("bar", "c3"),
+        fldUrn("bar", "c4"),
+        fldUrn("bar", "c5"),
+        fldUrn("bar", "c6"),
+        fldUrn("bar", "c7"),
+        fldUrn("bar", "c9"),
+        fldUrn("bar2", "c9"),
+    ],
+    fineGrainedLineages=fineGrainedLineages,
+)

dataJobLineageMcp = MetadataChangeProposalWrapper(
    entityType="dataJob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"),
    aspectName="dataJobInputOutput",
-    aspect=dataJobInputOutput
+    aspect=dataJobInputOutput,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
-emitter.emit_mcp(dataJobLineageMcp)
\ No newline at end of file
+emitter.emit_mcp(dataJobLineageMcp)
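For reference, the example's two URN helpers expand to standard DataHub URNs; a small sketch of what they produce (the PROD environment suffix is make_dataset_urn's default):

import datahub.emitter.mce_builder as builder

dataset = builder.make_dataset_urn("postgres", "bar2")
field = builder.make_schema_field_urn(dataset, "c1")
print(dataset)  # urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD)
print(field)  # urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,bar2,PROD),c1)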