Skip to content

Commit

Permalink
feat: generalize application model (+ add databricks application support) (#1398)
Browse files Browse the repository at this point in the history

Signed-off-by: jroof88 <jack.roof@samsara.com>
  • Loading branch information
jroof88 authored Aug 12, 2021
1 parent 3297b05 commit 1a75a2f
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 170 deletions.
129 changes: 72 additions & 57 deletions databuilder/databuilder/models/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@

class Application(GraphSerializable, TableSerializable, AtlasSerializable):
"""
Application-table matching model (Airflow task and table)
Application-table matching model
Application represents the applications that generate tables
"""

APPLICATION_LABEL = 'Application'
APPLICATION_KEY_FORMAT = 'application://{cluster}.airflow/{dag}/{task}'

APPLICATION_KEY_FORMAT = 'application://{application_type}/{database}/{table}'
APPLICATION_ID_FORMAT = '{application_type}.{database}.{table}'
APPLICATION_DESCRIPTION_FORMAT = '{application_type} application for {database}.{table}'

# Hardcode Airflow configuration values for backwards compatibility
AIRFLOW_APPLICATION_KEY_FORMAT = 'application://{cluster}.airflow/{dag}/{task}'
AIRFLOW_APPLICATION_ID_FORMAT = '{dag}/{task}'
AIRFLOW_APPLICATION_DESCRIPTION_FORMAT = 'Airflow with id {id}'

APPLICATION_URL_NAME = 'application_url'
APPLICATION_NAME = 'name'
APPLICATION_ID = 'id'
APPLICATION_ID_FORMAT = '{dag_id}/{task_id}'
APPLICATION_DESCRIPTION = 'description'
APPLICATION_TYPE = 'Airflow'

APPLICATION_TABLE_RELATION_TYPE = 'GENERATES'
TABLE_APPLICATION_RELATION_TYPE = 'DERIVED_FROM'

Expand All @@ -46,15 +54,54 @@ def __init__(self,
cluster: str = 'gold',
schema: str = '',
table_name: str = '',
exec_date: str = '',
application_type: str = 'Airflow',
) -> None:
self.task = task_id

# todo: need to modify this hack
self.application_url = application_url_template.format(dag_id=dag_id)
self.database, self.cluster, self.schema, self.table = db_name, cluster, schema, table_name

self.database = db_name
self.cluster = cluster
self.schema = schema
self.table = table_name
self.dag = dag_id
self.application_type = application_type
self.task = task_id

application_id_format = Application.APPLICATION_ID_FORMAT
application_key_format = Application.APPLICATION_KEY_FORMAT
application_description_format = Application.APPLICATION_DESCRIPTION_FORMAT

# The Application model was originally designed to only be compatible with Airflow
# If the type is Airflow we must use the hardcoded Airflow constants for backwards compatibility
if self.application_type.lower() == 'airflow':
application_id_format = Application.AIRFLOW_APPLICATION_ID_FORMAT
application_key_format = Application.AIRFLOW_APPLICATION_KEY_FORMAT
application_description_format = Application.AIRFLOW_APPLICATION_DESCRIPTION_FORMAT

self.application_id = application_id_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
application_type=self.application_type,
)
self.application_key = application_key_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
cluster=self.cluster,
application_type=self.application_type,
)

self.application_description = application_description_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
cluster=self.cluster,
id=self.application_id,
application_type=self.application_type,
)

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
Expand Down Expand Up @@ -88,33 +135,19 @@ def get_table_model_key(self) -> str:
tbl=self.table,
cluster=self.cluster)

def get_application_model_key(self) -> str:
# returns formatting string for application of type dag
return Application.APPLICATION_KEY_FORMAT.format(cluster=self.cluster,
dag=self.dag,
task=self.task)

def _create_node_iterator(self) -> Iterator[GraphNode]:
"""
Create an application node
:return:
"""
application_description = '{app_type} with id {id}'.format(
app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag, task_id=self.task)
)
application_id = Application.APPLICATION_ID_FORMAT.format(
dag_id=self.dag,
task_id=self.task
)
application_node = GraphNode(
key=self.get_application_model_key(),
key=self.application_key,
label=Application.APPLICATION_LABEL,
attributes={
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION: application_description,
Application.APPLICATION_ID: application_id
Application.APPLICATION_NAME: self.application_type,
Application.APPLICATION_DESCRIPTION: self.application_description,
Application.APPLICATION_ID: self.application_id
}
)
yield application_node
Expand All @@ -127,7 +160,7 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
graph_relationship = GraphRelationship(
start_key=self.get_table_model_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self.get_application_model_key(),
end_key=self.application_key,
end_label=Application.APPLICATION_LABEL,
type=Application.TABLE_APPLICATION_RELATION_TYPE,
reverse_type=Application.APPLICATION_TABLE_RELATION_TYPE,
Expand All @@ -136,26 +169,18 @@ def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
yield graph_relationship

def _create_record_iterator(self) -> Iterator[RDSModel]:
application_description = '{app_type} with id {id}'.format(
app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag, task_id=self.task)
)
application_id = Application.APPLICATION_ID_FORMAT.format(
dag_id=self.dag,
task_id=self.task
)
application_record = RDSApplication(
rk=self.get_application_model_key(),
rk=self.application_key,
application_url=self.application_url,
name=Application.APPLICATION_TYPE,
id=application_id,
description=application_description
name=self.application_type,
id=self.application_id,
description=self.application_description
)
yield application_record

application_table_record = RDSApplicationTable(
rk=self.get_table_model_key(),
application_rk=self.get_application_model_key(),
application_rk=self.application_key,
)
yield application_table_record

Expand All @@ -166,21 +191,11 @@ def create_next_atlas_entity(self) -> Union[AtlasEntity, None]:
return None

def _create_next_atlas_entity(self) -> Iterator[AtlasEntity]:
application_description = '{app_type} with id {id}'.format(
app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag, task_id=self.task)
)

application_id = Application.APPLICATION_ID_FORMAT.format(
dag_id=self.dag,
task_id=self.task
)

group_attrs_mapping = [
(AtlasCommonParams.qualified_name, self.get_application_model_key()),
('name', Application.APPLICATION_TYPE),
('id', application_id),
('description', application_description),
(AtlasCommonParams.qualified_name, self.application_key),
('name', self.application_type),
('id', self.application_id),
('description', self.application_description),
('application_url', self.application_url)
]

Expand All @@ -207,7 +222,7 @@ def _create_atlas_relation_iterator(self) -> Iterator[AtlasRelationship]:
entityType1=AtlasTableTypes.table,
entityQualifiedName1=self.get_table_model_key(),
entityType2=AtlasCommonTypes.application,
entityQualifiedName2=self.get_application_model_key(),
entityQualifiedName2=self.application_key,
attributes={}
)

Expand Down
1 change: 0 additions & 1 deletion databuilder/docs/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ Calculate statistics that you care about such as min/max/average etc.

#### Description
This is used to provide users a way to find out what job/application is responsible for writing to this dataset.
Currently the model assumes the application has to be in airflow, but in theory it could be generalized to other orchestration frameworks.

#### Extraction
TODO
Expand Down
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from setuptools import find_packages, setup

__version__ = '6.0.1'
__version__ = '6.0.2'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down
Loading

0 comments on commit 1a75a2f

Please sign in to comment.