Skip to content

Commit

Permalink
feat(ingest): stricter deserialization for MCE JSONs (#2976)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jul 28, 2021
1 parent a1d1dd4 commit 7ab6355
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 96 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/examples/mce_files/bootstrap_mce.json
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@
},
{
"com.linkedin.pegasus2avro.common.Status": {
"boolean": false
"removed": false
}
},
{
Expand Down
113 changes: 56 additions & 57 deletions metadata-ingestion/examples/mce_files/mce_list.json
Original file line number Diff line number Diff line change
@@ -1,67 +1,66 @@
[
{
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
,
{
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:jane",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Jane Doe"
},
"email": "jane@example.com",
"title": {
"string": "CEO"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Jane Doe"
},
"countryCode": null
}
}
]
}
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:jane",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Jane Doe"
},
"email": "jane@example.com",
"title": {
"string": "CEO"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Jane Doe"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
}
]
58 changes: 29 additions & 29 deletions metadata-ingestion/examples/mce_files/single_mce.json
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
},
"proposedDelta": null
]
}
},
"proposedDelta": null
}
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_long_description():
"entrypoints",
"docker",
"expandvars>=0.6.5",
"avro-gen3==0.5.3",
"avro-gen3==0.6.0",
"avro-python3>=1.8.2",
"python-dateutil",
"stackprinter",
Expand Down
15 changes: 7 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ def iterate_mce_file(path: str) -> Iterator[MetadataChangeEvent]:
# Iterates over the objects produced by _iterate_file(path) and yields each one
# as a MetadataChangeEvent (when the object contains a "proposedSnapshot" key)
# or as a UsageAggregationClass (otherwise).
#
# NOTE(review): this is a rendered diff hunk whose +/- markers and indentation
# were lost, so pre-commit and post-commit lines are interleaved. The comments
# below mark which lines were removed and which were added; the split matches
# the hunk header (-26,13 +26,15).
def iterate_generic_file(
path: str,
) -> Iterator[Union[MetadataChangeEvent, UsageAggregationClass]]:
# Removed by this commit: loop without an index.
for obj in _iterate_file(path):
# Added by this commit: enumerate() supplies the index reported in the
# ValueError message below.
for i, obj in enumerate(_iterate_file(path)):
# Added: a single variable holds whichever class the object is parsed into.
item: Union[MetadataChangeEvent, UsageAggregationClass]
if "proposedSnapshot" in obj:
# Removed: the parsed event was yielded immediately, with no validate() call here.
mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
yield mce
# Added: parse only; validation and yield happen after the if/else.
item = MetadataChangeEvent.from_obj(obj)
else:
# Removed: the parsed usage bucket was yielded immediately, with no validate() call here.
bucket: UsageAggregationClass = UsageAggregationClass.from_obj(obj)
yield bucket
# Added: parse only; validation and yield happen after the if/else.
item = UsageAggregationClass.from_obj(obj)
# Added: reject any object that fails validate() before it is yielded. The
# same check is removed from get_workunits in the next hunk of this diff.
if not item.validate():
raise ValueError(f"failed to parse: {obj} (index {i})")
yield item


class FileSourceConfig(ConfigModel):
Expand All @@ -51,9 +53,6 @@ def create(cls, config_dict, ctx):

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
for i, obj in enumerate(iterate_generic_file(self.config.filename)):
if not obj.validate():
raise ValueError(f"failed to parse: {obj} (index {i})")

wu: Union[MetadataWorkUnit, UsageStatsWorkUnit]
if isinstance(obj, UsageAggregationClass):
wu = UsageStatsWorkUnit(f"file://{self.config.filename}:{i}", obj)
Expand Down
19 changes: 19 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from click.testing import CliRunner

import datahub.metadata.schema_classes as models
from datahub.cli.json_file import check_mce_file
from datahub.entrypoints import datahub
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import iterate_mce_file
Expand Down Expand Up @@ -106,6 +107,24 @@ def test_check_mce_schema(pytestconfig: PytestConfig, json_filename: str) -> Non
assert result.exit_code == 0


@pytest.mark.parametrize(
    "json_filename",
    [
        # Extra field.
        "tests/unit/serde/test_serde_extra_field.json",
        # Missing fields.
        "tests/unit/serde/test_serde_missing_field.json",
    ],
)
def test_check_mce_schema_failure(
    pytestconfig: PytestConfig, json_filename: str
) -> None:
    """Check that ``check_mce_file`` rejects a malformed MCE JSON file.

    Each parametrized path is resolved against ``pytestconfig.rootpath`` and
    passed to ``check_mce_file`` as a string; the call is expected to raise
    either ``ValueError`` or ``AssertionError``.
    """
    expected_errors = (ValueError, AssertionError)
    bad_file = pytestconfig.rootpath / json_filename

    with pytest.raises(expected_errors):
        check_mce_file(str(bad_file))


def test_field_discriminator() -> None:
cost_object = models.CostClass(
costType=models.CostTypeClass.ORG_COST_TYPE,
Expand Down
33 changes: 33 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde_extra_field.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"this_is_an_invalid_field": "hope it throws an error!",
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
30 changes: 30 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde_missing_field.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"displayName": {
"string": "notice that the 'active' and 'email' fields are missing, but should be required"
},
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}

0 comments on commit 7ab6355

Please sign in to comment.