Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Atlas databuilder compatibility #1225

Merged
merged 7 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions common/amundsen_common/utils/atlas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import abc
import re
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Set


class AtlasStatus:
Expand All @@ -26,10 +26,27 @@ class AtlasCommonTypes:
bookmark = 'Bookmark'
user = 'User'
reader = 'Reader'
cluster = 'Cluster'
application = 'Application'
data_set = 'DataSet'

# These are just `virtual` types which do not actually exist in Atlas.
# We use these constant values to select which Atlas Python Client methods should be used when
# publishing such data.
# Tags are published using the Glossary API and badges using the Classification API; other entities
# are published using the regular Entity API.
tag = 'Tag'
badge = 'Badge'


class AtlasTableTypes:
    """Atlas entity type names used for table-scoped metadata.

    Values must match the type names registered in Atlas; they are used when
    building qualified names and when publishing entities/relationships.
    """
    table = 'Table'
    column = 'Column'
    database = 'Database'
    schema = 'Schema'
    source = 'Source'
    # NOTE: watermarks are modelled as Atlas 'TablePartition' entities.
    watermark = 'TablePartition'
    # Lineage between tables is expressed through a 'LineageProcess' entity.
    process = 'LineageProcess'


class AtlasDashboardTypes:
Expand Down Expand Up @@ -165,6 +182,34 @@ def amundsen_key(self) -> str:
"""
pass

@property
def native_atlas_entity_types(self) -> Set[str]:
    """Entity types populated by Atlas-provided (push) tooling.

    Atlas can be populated in two ways:
      1. push - Atlas-provided bridge/hook tools (e.g. the Atlas Hive Hook)
      2. pull - the Amundsen-provided databuilder framework

    The two approaches render qualified names differently, so a table entity
    must be handled according to which tool loaded it. They are told apart by
    entity type: the Atlas naming convention uses a '_table' suffix (e.g.
    'hive_table'), which Amundsen-loaded entities do not carry.

    If the entity type (the database, in Amundsen lingo) is one of the values
    returned here, the entity is treated as Atlas-provided and the Atlas
    qualified-name convention is followed; otherwise it is treated as loaded
    by Amundsen databuilder, using generic entity types and the Amundsen key
    convention.
    """
    return {'hive_table'}

@property
def entity_type(self) -> str:
    """Resolve the Atlas entity type for this key.

    For keys already in Atlas qualified-name form the stored database is
    returned as-is (empty string when absent). Otherwise the database parsed
    from the Amundsen key is returned only when it is an Atlas-native type;
    any other value maps to the generic 'Table' type.
    """
    if self.is_qualified_name:
        return self._database or ''

    database = self.get_details()['database']
    return database if database in self.native_atlas_entity_types else 'Table'


class AtlasTableKey(AtlasKey):
@property
Expand All @@ -177,7 +222,7 @@ def amundsen_key_regex(self) -> Any:

@property
def qualified_name(self) -> str:
if not self.is_qualified_name:
if not self.is_qualified_name and self.get_details()['database'] in self.native_atlas_entity_types:
spec = self._get_details_from_key()

schema = spec['schema']
Expand Down
4 changes: 2 additions & 2 deletions common/tests/unit/utils/test_atlas_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ def test_table_key_qualified_name_validation(self) -> None:

def test_table_key_qualified_name_from_amundsen_key(self) -> None:
params = [
('hive://cluster_name.db_name/table_name', 'db_name.table_name@cluster_name'),
('hive_table://cluster_name.dot.db_name/table_name', 'db_name.table_name@cluster_name.dot')
('hive_table://cluster_name.db_name/table_name', 'db_name.table_name@cluster_name'),
('hive://cluster_name.db_name/table_name', 'hive://cluster_name.db_name/table_name')
]

for key, qn in params:
Expand Down
85 changes: 73 additions & 12 deletions databuilder/databuilder/extractor/atlas_search_data_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,40 @@ def get_chart_names(queries: Optional[List]) -> List[str]:

return AtlasSearchDataExtractorHelpers.get_display_text(charts)

@staticmethod
def get_table_database(table_key: str) -> str:
    """Derive the database (entity type) portion of a table key.

    :param table_key: Amundsen table key or Atlas qualified name
    :return: the database parsed from the key, falling back to 'hive_table'
        when the key does not carry one
    """
    details = AtlasTableKey(table_key).get_details()

    return details.get('database', 'hive_table')

@staticmethod
def get_source_description(parameters: Optional[dict]) -> str:
    """Extract the 'sourceDescription' entry from Atlas entity parameters.

    :param parameters: the Atlas 'attributes.parameters' dict, possibly None
    :return: the source description, or '' when absent
    """
    return (parameters or {}).get('sourceDescription', '')

@staticmethod
def get_usage(readers: Optional[List]) -> Tuple[int, int]:
    """Aggregate usage statistics from Atlas 'readers' relationships.

    Only readers whose own status, related entity status and relationship
    status are all ACTIVE contribute to usage.

    :param readers: list of Atlas Reader relationship dicts (or None)
    :return: (total_usage, unique_usage) - total read count across active
        readers, and the number of distinct active readers with reads
    """
    score = 0
    unique = 0

    for reader in readers or []:
        reader_status = reader.get('status')
        entity = reader.get('relationshipAttributes', dict()).get('entity', dict())
        entity_status = entity.get('entityStatus', '')
        relationship_status = entity.get('relationshipStatus', '')

        if reader_status == entity_status == relationship_status == 'ACTIVE':
            count = reader.get('attributes', dict()).get('count', 0)
            score += count

            # Bug fix: previously this checked the *cumulative* score, so once
            # any reader had contributed reads, every subsequent active reader
            # (even one with a zero count) inflated the unique-reader tally and
            # the result depended on reader order. A reader is unique only when
            # it has reads of its own.
            if count > 0:
                unique += 1

    return score, unique


class AtlasSearchDataExtractor(Extractor):
ATLAS_URL_CONFIG_KEY = 'atlas_url'
Expand Down Expand Up @@ -116,16 +150,21 @@ class AtlasSearchDataExtractor(Extractor):
# es_document field, atlas field path, modification function, default_value
FIELDS_MAPPING_SPEC: type_fields_mapping_spec = {
'Table': [
('database', 'typeName', None, None),
('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None),
('schema', 'relationshipAttributes.db.displayText', None, None),
('database', 'attributes.qualifiedName',
lambda x: AtlasSearchDataExtractorHelpers.get_table_database(x), None),
('cluster', 'attributes.qualifiedName',
lambda x: AtlasTableKey(x).get_details()['cluster'], None),
('schema', 'attributes.qualifiedName',
lambda x: AtlasTableKey(x).get_details()['schema'], None),
('name', 'attributes.name', None, None),
('key', ['attributes.qualifiedName', 'typeName'],
lambda x, y: AtlasSearchDataExtractorHelpers.get_entity_uri(x, y), None),
('description', 'attributes.description', None, None),
('last_updated_timestamp', 'updateTime', lambda x: int(x) / 1000, 0),
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1),
('total_usage', 'relationshipAttributes.readers',
lambda x: AtlasSearchDataExtractorHelpers.get_usage(x)[0], 0),
('unique_usage', 'relationshipAttributes.readers',
lambda x: AtlasSearchDataExtractorHelpers.get_usage(x)[1], 0),
('column_names', 'relationshipAttributes.columns',
lambda x: AtlasSearchDataExtractorHelpers.get_entity_names(x), []),
('column_descriptions', 'relationshipAttributes.columns',
Expand All @@ -134,15 +173,19 @@ class AtlasSearchDataExtractor(Extractor):
lambda x: AtlasSearchDataExtractorHelpers.get_display_text(x), []),
('badges', 'classifications',
lambda x: AtlasSearchDataExtractorHelpers.get_badges_from_classifications(x), []),
('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None),
('schema_description', 'attributes.parameters.sourceDescription', None, None),
('display_name', 'attributes.qualifiedName',
lambda x: '.'.join([AtlasTableKey(x).get_details()['schema'], AtlasTableKey(x).get_details()['table']]),
None),
('schema_description', 'attributes.parameters',
lambda x: AtlasSearchDataExtractorHelpers.get_source_description(x), ''),
('programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())], {})
],
'Dashboard': [
('group_name', 'relationshipAttributes.group.attributes.name', None, None),
('name', 'attributes.name', None, None),
('description', 'attributes.description', None, None),
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('total_usage', 'relationshipAttributes.readers',
lambda x: AtlasSearchDataExtractorHelpers.get_usage(x)[0], 0),
('product', 'attributes.product', None, None),
('cluster', 'attributes.cluster', None, None),
('group_description', 'relationshipAttributes.group.attributes.description', None, None),
Expand All @@ -159,17 +202,35 @@ class AtlasSearchDataExtractor(Extractor):
lambda x: AtlasSearchDataExtractorHelpers.get_display_text(x), []),
('badges', 'classifications',
lambda x: AtlasSearchDataExtractorHelpers.get_badges_from_classifications(x), [])
],
'User': [
('email', 'attributes.qualifiedName', None, ''),
('first_name', 'attributes.first_name', None, ''),
('last_name', 'attributes.last_name', None, ''),
('full_name', 'attributes.full_name', None, ''),
('github_username', 'attributes.github_username', None, ''),
('team_name', 'attributes.team_name', None, ''),
('employee_type', 'attributes.employee_type', None, ''),
('manager_email', 'attributes.manager_email', None, ''),
('slack_id', 'attributes.slack_id', None, ''),
('role_name', 'attributes.role_name', None, ''),
('is_active', 'attributes.is_active', None, ''),
('total_read', 'attributes.total_read', None, ''),
('total_own', 'attributes.total_own', None, ''),
('total_follow', 'attributes.total_follow', None, '')
]
}

ENTITY_MODEL_BY_TYPE = {
'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument',
'Dashboard': 'databuilder.models.dashboard_elasticsearch_document.DashboardESDocument'
'Dashboard': 'databuilder.models.dashboard_elasticsearch_document.DashboardESDocument',
'User': 'databuilder.models.user_elasticsearch_document.UserESDocument'
}

REQUIRED_RELATIONSHIPS_BY_TYPE = {
'Table': ['columns'],
'Dashboard': ['group', 'charts', 'executions', 'queries']
'Table': ['columns', 'readers'],
'Dashboard': ['group', 'charts', 'executions', 'queries'],
'User': []
}

def init(self, conf: ConfigTree) -> None:
Expand Down Expand Up @@ -212,7 +273,7 @@ def search_chunk_size(self) -> int:

@property
def relationships(self) -> Optional[List[str]]:
    """Relationship attribute names that must be fetched for the configured entity type."""
    required = AtlasSearchDataExtractor.REQUIRED_RELATIONSHIPS_BY_TYPE
    return required.get(self.entity_type)  # type: ignore

def extract(self) -> Any:
if not self._extract_iter:
Expand Down
66 changes: 65 additions & 1 deletion databuilder/databuilder/models/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@

from typing import Iterator, Union

from amundsen_common.utils.atlas import (
AtlasCommonParams, AtlasCommonTypes, AtlasTableTypes,
)
from amundsen_rds.models import RDSModel
from amundsen_rds.models.application import Application as RDSApplication, ApplicationTable as RDSApplicationTable

from databuilder.models.atlas_entity import AtlasEntity
from databuilder.models.atlas_relationship import AtlasRelationship
from databuilder.models.atlas_serializable import AtlasSerializable
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.table_serializable import TableSerializable
from databuilder.serializers.atlas_serializer import get_entity_attrs
from databuilder.utils.atlas import AtlasRelationshipTypes, AtlasSerializedEntityOperation


class Application(GraphSerializable, TableSerializable):
class Application(GraphSerializable, TableSerializable, AtlasSerializable):
"""
Application-table matching model (Airflow task and table)
"""
Expand Down Expand Up @@ -51,6 +59,8 @@ def __init__(self,
self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
self._record_iter = self._create_record_iterator()
self._atlas_entity_iterator = self._create_next_atlas_entity()
self._atlas_relation_iterator = self._create_atlas_relation_iterator()

def create_next_node(self) -> Union[GraphNode, None]:
# creates new node
Expand Down Expand Up @@ -148,3 +158,57 @@ def _create_record_iterator(self) -> Iterator[RDSModel]:
application_rk=self.get_application_model_key(),
)
yield application_table_record

def create_next_atlas_entity(self) -> Union[AtlasEntity, None]:
    """Return the next Atlas entity to publish, or None once exhausted."""
    return next(self._atlas_entity_iterator, None)

def _create_next_atlas_entity(self) -> Iterator[AtlasEntity]:
    """Yield the Atlas Application entity describing this dag/task pair."""
    application_id = Application.APPLICATION_ID_FORMAT.format(
        dag_id=self.dag,
        task_id=self.task,
    )
    application_description = f'{Application.APPLICATION_TYPE} with id {application_id}'

    # (attribute name, attribute value) pairs serialized onto the entity.
    attrs_mapping = [
        (AtlasCommonParams.qualified_name, self.get_application_model_key()),
        ('name', Application.APPLICATION_TYPE),
        ('id', application_id),
        ('description', application_description),
        ('application_url', self.application_url),
    ]

    yield AtlasEntity(
        typeName=AtlasCommonTypes.application,
        operation=AtlasSerializedEntityOperation.CREATE,
        relationships=None,
        attributes=get_entity_attrs(attrs_mapping),
    )

def create_next_atlas_relation(self) -> Union[AtlasRelationship, None]:
    """Return the next Atlas relationship to publish, or None once exhausted."""
    return next(self._atlas_relation_iterator, None)

def _create_atlas_relation_iterator(self) -> Iterator[AtlasRelationship]:
    """Yield the relationship linking the governed table to this application."""
    yield AtlasRelationship(
        relationshipType=AtlasRelationshipTypes.table_application,
        entityType1=AtlasTableTypes.table,
        entityQualifiedName1=self.get_table_model_key(),
        entityType2=AtlasCommonTypes.application,
        entityQualifiedName2=self.get_application_model_key(),
        attributes={},
    )
Loading