From 0873d76e14e609ecd3619469f4c678bc96d28d60 Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 20 Jul 2022 22:47:00 +0530 Subject: [PATCH 1/7] feat(transformers): Add domain transformer for dataset --- .../src/datahub/configuration/common.py | 20 ++- .../ingestion/transformer/base_transformer.py | 2 + .../ingestion/transformer/dataset_domain.py | 140 ++++++++++++++++++ .../transformer/dataset_transformer.py | 18 ++- .../transformer/transform_registry.py | 10 ++ 5 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index c342a7cd33424b..5ba87b80b61393 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -1,8 +1,9 @@ import re from abc import ABC, abstractmethod +from enum import Enum from typing import IO, Any, Dict, List, Optional, Pattern, cast -from pydantic import BaseModel +from pydantic import BaseModel, validator from pydantic.fields import Field @@ -11,6 +12,23 @@ class Config: extra = "forbid" +class Semantics(Enum): + """Describes semantics for aspect changes""" + + OVERWRITE = "OVERWRITE" # Apply changes blindly + PATCH = "PATCH" # Only apply differences from what exists already on the server + + +class SemanticTransformerConfigModel(ConfigModel): + semantics: Semantics = Semantics.OVERWRITE + + @validator("semantics", pre=True) + def ensure_semantics_is_upper_case(cls, v: str) -> str: + if isinstance(v, str): + return v.upper() + return v + + class DynamicTypedConfig(ConfigModel): type: str = Field( description="The type of the dynamic object", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 82cfecbddfed39..a157d0d955ba97 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,6 +17,7 @@ DatasetPropertiesClass, DatasetSnapshotClass, DatasetUpstreamLineageClass, + DomainsClass, EditableDatasetPropertiesClass, EditableSchemaMetadataClass, GlobalTagsClass, @@ -41,6 +42,7 @@ class SnapshotAspectRegistry: def __init__(self): self.aspect_name_type_mapping = { "ownership": OwnershipClass, + "domains": DomainsClass, "globalTags": GlobalTagsClass, "datasetProperties": DatasetPropertiesClass, "editableDatasetProperties": EditableDatasetPropertiesClass, diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py new file mode 100644 index 00000000000000..8fcd72cfeca49a --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -0,0 +1,140 @@ +from typing import Callable, List, Optional, Union + +from datahub.configuration.common import ( + ConfigurationError, + KeyValuePattern, + Semantics, + SemanticTransformerConfigModel, +) +from datahub.configuration.import_resolver import pydantic_resolve_key +from datahub.emitter.mce_builder import Aspect +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer +from datahub.metadata.schema_classes import DomainsClass + + +class AddDatasetDomainConfig(SemanticTransformerConfigModel): + get_domains_to_add: Union[ + Callable[[str], DomainsClass], + Callable[[str], DomainsClass], + ] + + _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") + + +class SimpleDatasetDomainConfig(SemanticTransformerConfigModel): + domain_urns: List[str] + + +class PatternDatasetDomainConfig(SemanticTransformerConfigModel): + domain_pattern: KeyValuePattern = KeyValuePattern.all() + + +class AddDatasetDomain(DatasetDomainTransformer): + """Transformer that adds domains to datasets according to a callback function.""" + + ctx: PipelineContext + config: AddDatasetDomainConfig + + def __init__(self, config: AddDatasetDomainConfig, ctx: PipelineContext): + super().__init__() + self.ctx = ctx + self.config = config + if self.config.semantics == Semantics.PATCH and self.ctx.graph is None: + raise ConfigurationError( + "With PATCH semantics, AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" + ) + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDomain": + config = AddDatasetDomainConfig.parse_obj(config_dict) + return cls(config, ctx) + + @staticmethod + def get_domains_to_set( + graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass] + ) -> Optional[DomainsClass]: + if not mce_domain or not mce_domain.domains: + # nothing to add, no need to consult server + return None + assert mce_domain + server_domain = graph.get_domain(entity_urn=urn) + if server_domain: + # compute patch + # we only include domain who are not present in the server domain list + domains_to_add: List[str] = [] + for domain in mce_domain.domains: + if domain not in server_domain.domains: + domains_to_add.append(domain) + + if domains_to_add: + mce_domain.domains = server_domain.domains + domains_to_add + return mce_domain + else: + return None + else: + return mce_domain + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + + domain_aspect = DomainsClass(domains=[]) + # Check if we have received existing aspect + if aspect is not None: + domain_aspect.domains.extend(aspect.domains) # type: ignore[attr-defined] + + domain_to_add = self.config.get_domains_to_add(entity_urn) + + domain_aspect.domains.extend(domain_to_add.domains) + + if self.config.semantics == Semantics.PATCH: + assert self.ctx.graph + domain_aspect = AddDatasetDomain.get_domains_to_set( + self.ctx.graph, entity_urn, domain_aspect + ) # type: ignore[assignment] + # ignore mypy errors as Aspect is not a concrete class + return domain_aspect # type: ignore[return-value] + + +class SimpleAddDatasetDomain(AddDatasetDomain): + """Transformer that adds a specified set of domains to each dataset.""" + + def __init__(self, config: SimpleDatasetDomainConfig, ctx: PipelineContext): + domain = DomainsClass(domains=config.domain_urns) + + generic_config = AddDatasetDomainConfig( + get_domains_to_add=lambda _: domain, + semantics=config.semantics, + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "SimpleAddDatasetDomain": + config = SimpleDatasetDomainConfig.parse_obj(config_dict) + return cls(config, ctx) + + +class PatternAddDatasetDomain(AddDatasetDomain): + """Transformer that adds a specified set of domains to each dataset.""" + + def __init__(self, config: PatternDatasetDomainConfig, ctx: PipelineContext): + domain_pattern = config.domain_pattern + + generic_config = AddDatasetDomainConfig( + get_domains_to_add=lambda urn: DomainsClass( + domains=[domain for domain in domain_pattern.value(urn)] + ), + semantics=config.semantics, + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternAddDatasetDomain": + config = PatternDatasetDomainConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index e7c2806b8cee0a..c2e6ddf141c5bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -1,5 +1,5 @@ import logging -from abc import abstractmethod +from abc import ABCMeta, abstractmethod from typing import List, Optional from deprecated import deprecated @@ -59,11 +59,27 @@ def transform_aspect( # not marked as @abstractmethod to avoid impacting transf ) +# TODO: rename DatasetTransformerV2 to DatasetTransformer after upgrading all existing dataset transformer +class DatasetTransformerV2(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta): + """Transformer that does transforms sequentially on each dataset.""" + + def __init__(self): + super().__init__() + + def entity_types(self) -> List[str]: + return ["dataset"] + + class DatasetOwnershipTransformer(DatasetTransformer, SingleAspectTransformer): def aspect_name(self) -> str: return "ownership" +class DatasetDomainTransformer(DatasetTransformerV2, SingleAspectTransformer): + def aspect_name(self) -> str: + return "domains" + + class DatasetStatusTransformer(DatasetTransformer, SingleAspectTransformer): def aspect_name(self) -> str: return "status" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py index ed806f8e2ff860..12662f55e03997 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py @@ -1,5 +1,6 @@ from datahub.ingestion.api.registry import PluginRegistry from datahub.ingestion.api.transform import Transformer +from datahub.ingestion.transformer import dataset_domain from datahub.ingestion.transformer.add_dataset_browse_path import ( AddDatasetBrowsePathTransformer, ) @@ -45,6 +46,15 @@ transform_registry.register("simple_add_dataset_ownership", SimpleAddDatasetOwnership) transform_registry.register("pattern_add_dataset_ownership", PatternAddDatasetOwnership) +transform_registry.register("add_dataset_domain", dataset_domain.AddDatasetDomain) +transform_registry.register( + "simple_add_dataset_domain", dataset_domain.SimpleAddDatasetDomain +) +transform_registry.register( + "pattern_add_dataset_domain", dataset_domain.PatternAddDatasetDomain +) + + transform_registry.register("add_dataset_tags", AddDatasetTags) transform_registry.register("simple_add_dataset_tags", SimpleAddDatasetTags) transform_registry.register("pattern_add_dataset_tags", PatternAddDatasetTags) From 9f7ff27322fef5b972bce4c2ac94885ddba8d9dd Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Thu, 21 Jul 2022 12:21:42 +0530 Subject: [PATCH 2/7] verify semantics code flow --- .../src/datahub/configuration/common.py | 2 +- .../ingestion/transformer/dataset_domain.py | 21 ++++++++----------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 050617515d999f..e774f9d8ffeb5b 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -19,7 +19,7 @@ class Semantics(Enum): PATCH = "PATCH" # Only apply differences from what exists already on the server -class SemanticTransformerConfigModel(ConfigModel): +class SemanticsTransformerConfigModel(ConfigModel): semantics: Semantics = Semantics.OVERWRITE @validator("semantics", pre=True) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 8fcd72cfeca49a..cc8df07d8818c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -4,7 +4,7 @@ ConfigurationError, KeyValuePattern, Semantics, - SemanticTransformerConfigModel, + SemanticsTransformerConfigModel, ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect @@ -14,7 +14,7 @@ from datahub.metadata.schema_classes import DomainsClass -class AddDatasetDomainConfig(SemanticTransformerConfigModel): +class AddDatasetDomainConfig(SemanticsTransformerConfigModel): get_domains_to_add: Union[ Callable[[str], DomainsClass], Callable[[str], DomainsClass], @@ -23,11 +23,11 @@ class AddDatasetDomainConfig(SemanticTransformerConfigModel): _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") -class SimpleDatasetDomainConfig(SemanticTransformerConfigModel): +class SimpleDatasetDomainConfig(SemanticsTransformerConfigModel): domain_urns: List[str] -class PatternDatasetDomainConfig(SemanticTransformerConfigModel): +class PatternDatasetDomainConfig(SemanticsTransformerConfigModel): domain_pattern: KeyValuePattern = KeyValuePattern.all() @@ -58,7 +58,7 @@ def get_domains_to_set( if not mce_domain or not mce_domain.domains: # nothing to add, no need to consult server return None - assert mce_domain + server_domain = graph.get_domain(entity_urn=urn) if server_domain: # compute patch @@ -68,13 +68,10 @@ def get_domains_to_set( if domain not in server_domain.domains: domains_to_add.append(domain) - if domains_to_add: - mce_domain.domains = server_domain.domains + domains_to_add - return mce_domain - else: - return None - else: - return mce_domain + mce_domain.domains.extend(server_domain.domains) + mce_domain.domains.extend(domains_to_add) + + return mce_domain def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] From fc9bbf85dfd77372dfda43e28db3290935d56aad Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 27 Jul 2022 09:32:15 +0530 Subject: [PATCH 3/7] Add domain registry to validate input domains --- .../src/datahub/ingestion/graph/client.py | 9 ++++- .../ingestion/transformer/dataset_domain.py | 34 ++++++++++++++----- .../utilities/registries/domain_registry.py | 6 +--- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index f79e792b54f65d..1d27648c5f392d 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -19,7 +19,7 @@ GlobalTagsClass, GlossaryTermsClass, OwnershipClass, - TelemetryClientIdClass, + TelemetryClientIdClass, DomainPropertiesClass, ) from datahub.utilities.urns.urn import Urn @@ -183,6 +183,13 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]: aspect_type=OwnershipClass, ) + def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]: + return self.get_aspect_v2( + entity_urn=entity_urn, + aspect="domainProperties", + aspect_type=DomainPropertiesClass, + ) + def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]: return self.get_aspect_v2( entity_urn=entity_urn, diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index cc8df07d8818c4..49ebfa1af087db 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -12,6 +12,7 @@ from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer from datahub.metadata.schema_classes import DomainsClass +from datahub.utilities.registries.domain_registry import DomainRegistry class AddDatasetDomainConfig(SemanticsTransformerConfigModel): @@ -41,16 +42,20 @@ def __init__(self, config: AddDatasetDomainConfig, ctx: PipelineContext): super().__init__() self.ctx = ctx self.config = config - if self.config.semantics == Semantics.PATCH and self.ctx.graph is None: - raise ConfigurationError( - "With PATCH semantics, AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" - ) @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDomain": config = AddDatasetDomainConfig.parse_obj(config_dict) return cls(config, ctx) + @staticmethod + def get_domain_class(graph: DataHubGraph, domains: List[str]) -> DomainsClass: + domain_registry: DomainRegistry = DomainRegistry( + cached_domains=[k for k in domains], graph=graph + ) + domain_class = DomainsClass(domains=[domain_registry.get_domain_urn(domain) for domain in domains]) + return domain_class + @staticmethod def get_domains_to_set( graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass] @@ -99,10 +104,14 @@ class SimpleAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" def __init__(self, config: SimpleDatasetDomainConfig, ctx: PipelineContext): - domain = DomainsClass(domains=config.domain_urns) + if ctx.graph is None: + raise ConfigurationError( + "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" + ) + domains = AddDatasetDomain.get_domain_class(ctx.graph, config.domain_urns) generic_config = AddDatasetDomainConfig( - get_domains_to_add=lambda _: domain, + get_domains_to_add=lambda _: domains, semantics=config.semantics, ) super().__init__(generic_config, ctx) @@ -119,12 +128,19 @@ class PatternAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" def __init__(self, config: PatternDatasetDomainConfig, ctx: PipelineContext): + if ctx.graph is None: + raise ConfigurationError( + "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" + ) + domain_pattern = config.domain_pattern + def resolve_domain(domain_urn: str) -> DomainsClass: + domains = domain_pattern.value(domain_urn) + return self.get_domain_class(ctx.graph, domains) + generic_config = AddDatasetDomainConfig( - get_domains_to_add=lambda urn: DomainsClass( - domains=[domain for domain in domain_pattern.value(urn)] - ), + get_domains_to_add=resolve_domain, semantics=config.semantics, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py index 4e719c939b6f2b..061cfaf2e3b7df 100644 --- a/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py +++ b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py @@ -30,11 +30,7 @@ def __init__( assert graph # first try to check if this domain exists by urn maybe_domain_urn = f"urn:li:domain:{domain_identifier}" - from datahub.metadata.schema_classes import DomainPropertiesClass - - maybe_domain_properties = graph.get_aspect_v2( - maybe_domain_urn, DomainPropertiesClass, "domainProperties" - ) + maybe_domain_properties = graph.get_domain_properties(maybe_domain_urn) if maybe_domain_properties: self.domain_registry[domain_identifier] = maybe_domain_urn else: From 6844fecf3dca3a579f160b3123e22a101b772f31 Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 27 Jul 2022 11:20:03 +0530 Subject: [PATCH 4/7] lintFix --- metadata-ingestion/src/datahub/ingestion/graph/client.py | 3 ++- .../src/datahub/ingestion/transformer/dataset_domain.py | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 1d27648c5f392d..be4116ae00267c 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -15,11 +15,12 @@ from datahub.emitter.serialization_helper import post_json_transform from datahub.metadata.schema_classes import ( DatasetUsageStatisticsClass, + DomainPropertiesClass, DomainsClass, GlobalTagsClass, GlossaryTermsClass, OwnershipClass, - TelemetryClientIdClass, DomainPropertiesClass, + TelemetryClientIdClass, ) from datahub.utilities.urns.urn import Urn diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 49ebfa1af087db..7f2bce2e337a52 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -49,11 +49,15 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDomain": return cls(config, ctx) @staticmethod - def get_domain_class(graph: DataHubGraph, domains: List[str]) -> DomainsClass: + def get_domain_class( + graph: Optional[DataHubGraph], domains: List[str] + ) -> DomainsClass: domain_registry: DomainRegistry = DomainRegistry( cached_domains=[k for k in domains], graph=graph ) - domain_class = DomainsClass(domains=[domain_registry.get_domain_urn(domain) for domain in domains]) + domain_class = DomainsClass( + domains=[domain_registry.get_domain_urn(domain) for domain in domains] + ) return domain_class @staticmethod From e365b42b020d4a510f29dce53742558a8e123319 Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 3 Aug 2022 18:39:56 +0530 Subject: [PATCH 5/7] rename Semantics to TransformerSemantics --- .../src/datahub/configuration/common.py | 6 ++-- .../ingestion/transformer/dataset_domain.py | 30 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index e774f9d8ffeb5b..0fbaf1e74e3dd2 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -12,15 +12,15 @@ class Config: extra = Extra.forbid -class Semantics(Enum): +class TransformerSemantics(Enum): """Describes semantics for aspect changes""" OVERWRITE = "OVERWRITE" # Apply changes blindly PATCH = "PATCH" # Only apply differences from what exists already on the server -class SemanticsTransformerConfigModel(ConfigModel): - semantics: Semantics = Semantics.OVERWRITE +class TransformerSemanticsConfigModel(ConfigModel): + semantics: TransformerSemantics = TransformerSemantics.OVERWRITE @validator("semantics", pre=True) def ensure_semantics_is_upper_case(cls, v: str) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 7f2bce2e337a52..1a4ca13bb0da46 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -3,8 +3,8 @@ from datahub.configuration.common import ( ConfigurationError, KeyValuePattern, - Semantics, - SemanticsTransformerConfigModel, + TransformerSemantics, + TransformerSemanticsConfigModel, ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect @@ -15,7 +15,7 @@ from datahub.utilities.registries.domain_registry import DomainRegistry -class AddDatasetDomainConfig(SemanticsTransformerConfigModel): +class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): get_domains_to_add: Union[ Callable[[str], DomainsClass], Callable[[str], DomainsClass], @@ -24,11 +24,11 @@ class AddDatasetDomainConfig(SemanticsTransformerConfigModel): _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") -class SimpleDatasetDomainConfig(SemanticsTransformerConfigModel): +class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domain_urns: List[str] -class PatternDatasetDomainConfig(SemanticsTransformerConfigModel): +class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domain_pattern: KeyValuePattern = KeyValuePattern.all() @@ -36,16 +36,16 @@ class AddDatasetDomain(DatasetDomainTransformer): """Transformer that adds domains to datasets according to a callback function.""" ctx: PipelineContext - config: AddDatasetDomainConfig + config: AddDatasetDomainSemanticsConfig - def __init__(self, config: AddDatasetDomainConfig, ctx: PipelineContext): + def __init__(self, config: AddDatasetDomainSemanticsConfig, ctx: PipelineContext): super().__init__() self.ctx = ctx self.config = config @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDomain": - config = AddDatasetDomainConfig.parse_obj(config_dict) + config = AddDatasetDomainSemanticsConfig.parse_obj(config_dict) return cls(config, ctx) @staticmethod @@ -95,7 +95,7 @@ def transform_aspect( domain_aspect.domains.extend(domain_to_add.domains) - if self.config.semantics == Semantics.PATCH: + if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph domain_aspect = AddDatasetDomain.get_domains_to_set( self.ctx.graph, entity_urn, domain_aspect @@ -107,14 +107,14 @@ def transform_aspect( class SimpleAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" - def __init__(self, config: SimpleDatasetDomainConfig, ctx: PipelineContext): + def __init__(self, config: SimpleDatasetDomainSemanticsConfig, ctx: PipelineContext): if ctx.graph is None: raise ConfigurationError( "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" ) domains = AddDatasetDomain.get_domain_class(ctx.graph, config.domain_urns) - generic_config = AddDatasetDomainConfig( + generic_config = AddDatasetDomainSemanticsConfig( get_domains_to_add=lambda _: domains, semantics=config.semantics, ) @@ -124,14 +124,14 @@ def __init__(self, config: SimpleDatasetDomainConfig, ctx: PipelineContext): def create( cls, config_dict: dict, ctx: PipelineContext ) -> "SimpleAddDatasetDomain": - config = SimpleDatasetDomainConfig.parse_obj(config_dict) + config = SimpleDatasetDomainSemanticsConfig.parse_obj(config_dict) return cls(config, ctx) class PatternAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" - def __init__(self, config: PatternDatasetDomainConfig, ctx: PipelineContext): + def __init__(self, config: PatternDatasetDomainSemanticsConfig, ctx: PipelineContext): if ctx.graph is None: raise ConfigurationError( "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" @@ -143,7 +143,7 @@ def resolve_domain(domain_urn: str) -> DomainsClass: domains = domain_pattern.value(domain_urn) return self.get_domain_class(ctx.graph, domains) - generic_config = AddDatasetDomainConfig( + generic_config = AddDatasetDomainSemanticsConfig( get_domains_to_add=resolve_domain, semantics=config.semantics, ) @@ -153,5 +153,5 @@ def resolve_domain(domain_urn: str) -> DomainsClass: def create( cls, config_dict: dict, ctx: PipelineContext ) -> "PatternAddDatasetDomain": - config = PatternDatasetDomainConfig.parse_obj(config_dict) + config = PatternDatasetDomainSemanticsConfig.parse_obj(config_dict) return cls(config, ctx) From 66fd5c66c1973d85ba8f65474eb2d759b15e171d Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 3 Aug 2022 18:44:26 +0530 Subject: [PATCH 6/7] lintFix --- .../src/datahub/ingestion/transformer/dataset_domain.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 1a4ca13bb0da46..1bed18ad43efd3 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -107,7 +107,9 @@ def transform_aspect( class SimpleAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" - def __init__(self, config: SimpleDatasetDomainSemanticsConfig, ctx: PipelineContext): + def __init__( + self, config: SimpleDatasetDomainSemanticsConfig, ctx: PipelineContext + ): if ctx.graph is None: raise ConfigurationError( "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" @@ -131,7 +133,9 @@ def create( class PatternAddDatasetDomain(AddDatasetDomain): """Transformer that adds a specified set of domains to each dataset.""" - def __init__(self, config: PatternDatasetDomainSemanticsConfig, ctx: PipelineContext): + def __init__( + self, config: PatternDatasetDomainSemanticsConfig, ctx: PipelineContext + ): if ctx.graph is None: raise ConfigurationError( "AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe" From c6c5694e44dfba5f45aa1defc869a84f9c33780f Mon Sep 17 00:00:00 2001 From: MohdSiddique Bagwan Date: Wed, 3 Aug 2022 20:43:43 +0530 Subject: [PATCH 7/7] fix mypy lint --- .../ingestion/transformer/dataset_domain.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 1bed18ad43efd3..5ed7d7d7616b3d 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -1,4 +1,4 @@ -from typing import Callable, List, Optional, Union +from typing import Callable, List, Optional, Union, cast from datahub.configuration.common import ( ConfigurationError, @@ -86,10 +86,10 @@ def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: - domain_aspect = DomainsClass(domains=[]) + domain_aspect: DomainsClass = DomainsClass(domains=[]) # Check if we have received existing aspect if aspect is not None: - domain_aspect.domains.extend(aspect.domains) # type: ignore[attr-defined] + domain_aspect.domains.extend(cast(DomainsClass, aspect).domains) domain_to_add = self.config.get_domains_to_add(entity_urn) @@ -97,11 +97,19 @@ def transform_aspect( if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph - domain_aspect = AddDatasetDomain.get_domains_to_set( + patch_domain_aspect: Optional[ + DomainsClass + ] = AddDatasetDomain.get_domains_to_set( self.ctx.graph, entity_urn, domain_aspect - ) # type: ignore[assignment] - # ignore mypy errors as Aspect is not a concrete class - return domain_aspect # type: ignore[return-value] + ) + # This will pass the mypy lint + domain_aspect = ( + patch_domain_aspect + if patch_domain_aspect is not None + else domain_aspect + ) + + return cast(Optional[Aspect], domain_aspect) class SimpleAddDatasetDomain(AddDatasetDomain):