Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): stricter deserialization for MCE JSONs #2976

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/examples/mce_files/bootstrap_mce.json
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@
},
{
"com.linkedin.pegasus2avro.common.Status": {
"boolean": false
"removed": false
}
},
{
Expand Down
113 changes: 56 additions & 57 deletions metadata-ingestion/examples/mce_files/mce_list.json
Original file line number Diff line number Diff line change
@@ -1,67 +1,66 @@
[
{
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
,
{
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:jane",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Jane Doe"
},
"email": "jane@example.com",
"title": {
"string": "CEO"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Jane Doe"
},
"countryCode": null
}
}
]
}
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:jane",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Jane Doe"
},
"email": "jane@example.com",
"title": {
"string": "CEO"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Jane Doe"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
}
]
58 changes: 29 additions & 29 deletions metadata-ingestion/examples/mce_files/single_mce.json
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
},
"proposedDelta": null
]
}
},
"proposedDelta": null
}
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_long_description():
"entrypoints",
"docker",
"expandvars>=0.6.5",
"avro-gen3==0.5.3",
"avro-gen3==0.6.0",
"avro-python3>=1.8.2",
"python-dateutil",
"stackprinter",
Expand Down
15 changes: 7 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ def iterate_mce_file(path: str) -> Iterator[MetadataChangeEvent]:
def iterate_generic_file(
path: str,
) -> Iterator[Union[MetadataChangeEvent, UsageAggregationClass]]:
for obj in _iterate_file(path):
for i, obj in enumerate(_iterate_file(path)):
item: Union[MetadataChangeEvent, UsageAggregationClass]
if "proposedSnapshot" in obj:
mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
yield mce
item = MetadataChangeEvent.from_obj(obj)
else:
bucket: UsageAggregationClass = UsageAggregationClass.from_obj(obj)
yield bucket
item = UsageAggregationClass.from_obj(obj)
if not item.validate():
raise ValueError(f"failed to parse: {obj} (index {i})")
yield item


class FileSourceConfig(ConfigModel):
Expand All @@ -51,9 +53,6 @@ def create(cls, config_dict, ctx):

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
for i, obj in enumerate(iterate_generic_file(self.config.filename)):
if not obj.validate():
raise ValueError(f"failed to parse: {obj} (index {i})")

wu: Union[MetadataWorkUnit, UsageStatsWorkUnit]
if isinstance(obj, UsageAggregationClass):
wu = UsageStatsWorkUnit(f"file://{self.config.filename}:{i}", obj)
Expand Down
19 changes: 19 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from click.testing import CliRunner

import datahub.metadata.schema_classes as models
from datahub.cli.json_file import check_mce_file
from datahub.entrypoints import datahub
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import iterate_mce_file
Expand Down Expand Up @@ -106,6 +107,24 @@ def test_check_mce_schema(pytestconfig: PytestConfig, json_filename: str) -> Non
assert result.exit_code == 0


@pytest.mark.parametrize(
"json_filename",
[
# Extra field.
"tests/unit/serde/test_serde_extra_field.json",
# Missing fields.
"tests/unit/serde/test_serde_missing_field.json",
],
)
def test_check_mce_schema_failure(
pytestconfig: PytestConfig, json_filename: str
) -> None:
json_file_path = pytestconfig.rootpath / json_filename

with pytest.raises((ValueError, AssertionError)):
check_mce_file(str(json_file_path))


def test_field_discriminator() -> None:
cost_object = models.CostClass(
costType=models.CostTypeClass.ORG_COST_TYPE,
Expand Down
33 changes: 33 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde_extra_field.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"this_is_an_invalid_field": "hope it throws an error!",
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
30 changes: 30 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde_missing_field.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"displayName": {
"string": "notice that the 'active' and 'email' fields are missing, but should be required"
},
"title": {
"string": "Engineer"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}