Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preset glossary term #12361

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
93 changes: 91 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_term_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand All @@ -35,6 +38,7 @@
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
get_platform_from_sqlalchemy_uri,
Expand All @@ -48,6 +52,12 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata._schema_classes import (
AuditStampClass,
GlossaryTermAssociationClass,
GlossaryTermInfoClass,
GlossaryTermsClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Expand Down Expand Up @@ -80,7 +90,6 @@

# Page size constant; presumably the number of items requested per page of a
# paginated API listing — TODO confirm against the call sites.
PAGE_SIZE = 25


chart_type_from_viz_type = {
"line": ChartTypeClass.LINE,
"big_number": ChartTypeClass.LINE,
Expand All @@ -97,7 +106,6 @@
"box_plot": ChartTypeClass.BAR,
}


# Platform names treated as having no database component; presumably this
# affects how dataset names/URNs are assembled — verify against usages.
platform_without_databases = ["druid"]


Expand Down Expand Up @@ -231,6 +239,7 @@
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
self.sink_config = ctx.pipeline_config.sink.config
self.session = self.login()

def login(self) -> requests.Session:
Expand Down Expand Up @@ -583,6 +592,71 @@
env=self.config.env,
)

def check_if_term_exists(self, term_urn):
    """Return whether a glossary term already carries a ``glossaryTermInfo`` aspect.

    A graph client is built from the ``server`` and ``token`` entries of the
    sink configuration stored on ``self.sink_config`` and asked for the
    ``glossaryTermInfo`` aspect of ``term_urn``.

    Args:
        term_urn: URN of the glossary term to look up.

    Returns:
        True when the lookup yields a non-empty ``glossaryTermInfo`` aspect,
        False otherwise.
    """
    graph = DataHubGraph(
        DatahubClientConfig(
            server=self.sink_config.get("server", ""),
            token=self.sink_config.get("token", ""),
        )
    )
    try:
        # Only the term-info aspect is needed to decide whether the term exists.
        result = graph.get_entity_semityped(
            entity_urn=term_urn,
            aspects=["glossaryTermInfo"],
        )
    finally:
        # This method runs once per metric and builds a fresh client each time,
        # so release the client's HTTP session instead of leaking it.
        graph.close()

    return bool(result.get("glossaryTermInfo"))

Check warning on line 610 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L608-L610

Added lines #L608 - L610 were not covered by tests

def parse_glossary_terms_from_metrics(
    self, metrics, last_modified
) -> GlossaryTermsClass:
    """Build the glossary-term associations for a dataset's metrics.

    For every metric a term URN is derived from its ``metric_name``. Terms
    that ``check_if_term_exists`` reports as present are only associated;
    missing terms are first created by emitting a ``GlossaryTermInfoClass``
    aspect through a REST emitter configured from ``self.sink_config``.

    Args:
        metrics: Iterable of metric dicts; the keys read are ``expression``,
            ``extra``, ``metric_name`` and ``description``.
        last_modified: Audit stamp attached to the returned aspect.

    Returns:
        A ``GlossaryTermsClass`` holding one association per processed metric.
    """
    glossary_term_urns = []
    # Created lazily (only if a term has to be written) and shared by every
    # metric, rather than opening a new emitter per term.
    rest_emitter = None
    try:
        for metric in metrics:
            expression = metric.get("expression", "")
            certification_details = metric.get("extra", "")
            metric_name = metric.get("metric_name", "")
            description = metric.get("description", "")

            if not metric_name:
                # Without a name there is no identifier to build a term URN from.
                logger.warning("Skipping metric without a metric_name")
                continue

            term_urn = make_term_urn(metric_name)

            if self.check_if_term_exists(term_urn):
                logger.info(f"Term {term_urn} already exists")
                glossary_term_urns.append(
                    GlossaryTermAssociationClass(urn=term_urn)
                )
                continue

            term_properties_aspect = GlossaryTermInfoClass(
                name=metric_name,
                definition=f"Description: {description} \nSql Expression: {expression} \nCertification details: {certification_details}",
                termSource="",
            )

            event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
                entityUrn=term_urn,
                aspect=term_properties_aspect,
            )

            if rest_emitter is None:
                rest_emitter = DatahubRestEmitter(
                    gms_server=self.sink_config.get("server", ""),
                    token=self.sink_config.get("token", ""),
                )
            rest_emitter.emit(event)
            logger.info(f"Created Glossary term {term_urn}")
            glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))
    finally:
        # Release the emitter's HTTP session even if an emit or lookup failed.
        if rest_emitter is not None:
            rest_emitter.close()

    return GlossaryTermsClass(terms=glossary_term_urns, auditStamp=last_modified)

Check warning on line 648 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L648

Added line #L648 was not covered by tests

def _is_certified_metric(self, response_result: dict) -> bool:
# We only want to ingest certified metrics for physical preset dataset
metrics = response_result.get("metrics", {})
extra = response_result.get("extra", {})
kind = response_result.get("kind")
if metrics and extra and "certification" in extra and kind == "physical":
return True

Check warning on line 656 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L656

Added line #L656 was not covered by tests
else:
return False

def construct_dataset_from_dataset_data(
self, dataset_data: dict
) -> DatasetSnapshot:
Expand All @@ -591,6 +665,12 @@
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
now = datetime.now().strftime("%I:%M%p on %B %d, %Y")
modified_ts = int(
dp.parse(dataset_data.get("changed_on") or now).timestamp() * 1000
)
modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}"
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}"

Expand All @@ -602,6 +682,7 @@
else None,
externalUrl=dataset_url,
)

aspects_items: List[Any] = []
aspects_items.extend(
[
Expand All @@ -610,6 +691,14 @@
]
)

response_result = dataset_response.get("result", {})

if self._is_certified_metric(response_result):
glossary_terms = self.parse_glossary_terms_from_metrics(

Check warning on line 697 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L697

Added line #L697 was not covered by tests
response_result.get("metrics", {}), last_modified
)
aspects_items.append(glossary_terms)

Check warning on line 700 in metadata-ingestion/src/datahub/ingestion/source/superset.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/superset.py#L700

Added line #L700 was not covered by tests

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,
Expand Down
Loading