Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add a level of record abstraction #380

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions databuilder/loader/file_system_neo4j_csv_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@

from databuilder.job.base_job import Job
from databuilder.loader.base_loader import Loader
from databuilder.models.neo4j_csv_serde import NODE_LABEL, \
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.utils.closer import Closer
from databuilder.serializers import neo4_serializer


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +89,7 @@ def _delete_dir() -> None:
# Directory should be deleted after publish is finished
Job.closer.register(_delete_dir)

def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
def load(self, csv_serializable: GraphSerializable) -> None:
"""
Writes Neo4jCsvSerializable into CSV files.
There are multiple CSV files that this method writes.
Expand All @@ -107,23 +106,25 @@ def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
:return:
"""

node_dict = csv_serializable.next_node()
while node_dict:
key = (node_dict[NODE_LABEL], len(node_dict))
node = csv_serializable.next_node()
while node:
node_dict = neo4_serializer.serialize_node(node)
key = (node.label, len(node_dict))
file_suffix = '{}_{}'.format(*key)
node_writer = self._get_writer(node_dict,
self._node_file_mapping,
key,
self._node_dir,
file_suffix)
node_writer.writerow(node_dict)
node_dict = csv_serializable.next_node()

relation_dict = csv_serializable.next_relation()
while relation_dict:
key2 = (relation_dict[RELATION_START_LABEL],
relation_dict[RELATION_END_LABEL],
relation_dict[RELATION_TYPE],
node = csv_serializable.next_node()

relation = csv_serializable.next_relation()
while relation:
relation_dict = neo4_serializer.serialize_relationship(relation)
key2 = (relation.start_label,
relation.end_label,
relation.type,
len(relation_dict))

file_suffix = '{}_{}_{}'.format(key2[0], key2[1], key2[2])
Expand All @@ -133,7 +134,7 @@ def load(self, csv_serializable: Neo4jCsvSerializable) -> None:
self._relation_dir,
file_suffix)
relation_writer.writerow(relation_dict)
relation_dict = csv_serializable.next_relation()
relation = csv_serializable.next_relation()

def _get_writer(self,
csv_record_dict: Dict[str, Any],
Expand Down
69 changes: 38 additions & 31 deletions databuilder/models/application.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List, Union
from typing import List, Union

from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.graph_serializable import GraphSerializable

from databuilder.models.table_metadata import TableMetadata
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class Application(Neo4jCsvSerializable):
class Application(GraphSerializable):
"""
Application-table matching model (Airflow task and table)
"""
Expand Down Expand Up @@ -48,14 +48,14 @@ def __init__(self,
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())

def create_next_node(self) -> Union[Dict[str, Any], None]:
def create_next_node(self) -> Union[GraphNode, None]:
# creates new node
try:
return next(self._node_iter)
except StopIteration:
return None

def create_next_relation(self) -> Union[Dict[str, Any], None]:
def create_next_relation(self) -> Union[GraphRelationship, None]:
try:
return next(self._relation_iter)
except StopIteration:
Expand All @@ -74,40 +74,47 @@ def get_application_model_key(self) -> str:
dag=self.dag,
task=self.task)

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""
results = []

results.append({
NODE_KEY: self.get_application_model_key(),
NODE_LABEL: Application.APPLICATION_LABEL,
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION:
'{app_type} with id {id}'.format(app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)),
Application.APPLICATION_ID: Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag,
task_id=self.task)
})
application_description = '{app_type} with id {id}'.format(
app_type=Application.APPLICATION_TYPE,
id=Application.APPLICATION_ID_FORMAT.format(dag_id=self.dag, task_id=self.task)
)
application_id = Application.APPLICATION_ID_FORMAT.format(
dag_id=self.dag,
task_id=self.task
)
application_node = GraphNode(
key=self.get_application_model_key(),
label=Application.APPLICATION_LABEL,
attributes={
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: Application.APPLICATION_TYPE,
Application.APPLICATION_DESCRIPTION: application_description,
Application.APPLICATION_ID: application_id
}
)
results.append(application_node)

return results

def create_relation(self) -> List[Dict[str, Any]]:
def create_relation(self) -> List[GraphRelationship]:
"""
Create a list of relations between application and table nodes
:return:
"""
results = [{
RELATION_START_KEY: self.get_table_model_key(),
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_KEY: self.get_application_model_key(),
RELATION_END_LABEL: Application.APPLICATION_LABEL,
RELATION_TYPE: Application.TABLE_APPLICATION_RELATION_TYPE,
RELATION_REVERSE_TYPE: Application.APPLICATION_TABLE_RELATION_TYPE
}]

graph_relationship = GraphRelationship(
start_key=self.get_table_model_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self.get_application_model_key(),
end_label=Application.APPLICATION_LABEL,
type=Application.TABLE_APPLICATION_RELATION_TYPE,
reverse_type=Application.APPLICATION_TABLE_RELATION_TYPE,
attributes={}
)
results = [graph_relationship]
return results
49 changes: 27 additions & 22 deletions databuilder/models/badge.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List, Optional
from typing import List, Optional
import re

from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class Badge:
Expand All @@ -19,7 +19,7 @@ def __repr__(self) -> str:
self.category)


class BadgeMetadata(Neo4jCsvSerializable):
class BadgeMetadata(GraphSerializable):
"""
Badge model.
"""
Expand Down Expand Up @@ -62,14 +62,14 @@ def __repr__(self) -> str:
return 'BadgeMetadata({!r}, {!r})'.format(self.start_label,
self.start_key)

def create_next_node(self) -> Optional[Dict[str, Any]]:
def create_next_node(self) -> Optional[GraphNode]:
# return the string representation of the data
try:
return next(self._node_iter)
except StopIteration:
return None

def create_next_relation(self) -> Optional[Dict[str, Any]]:
def create_next_relation(self) -> Optional[GraphRelationship]:
try:
return next(self._relation_iter)
except StopIteration:
Expand All @@ -84,30 +84,35 @@ def get_badge_key(name: str) -> str:
def get_metadata_model_key(self) -> str:
return self.start_key

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""
results = []
for badge in self.badges:
if badge:
results.append({
NODE_KEY: self.get_badge_key(badge.name),
NODE_LABEL: self.BADGE_NODE_LABEL,
self.BADGE_CATEGORY: badge.category
})
node = GraphNode(
key=self.get_badge_key(badge.name),
label=self.BADGE_NODE_LABEL,
attributes={
self.BADGE_CATEGORY: badge.category
}
)
results.append(node)
return results

def create_relation(self) -> List[Dict[str, Any]]:
def create_relation(self) -> List[GraphRelationship]:
results = []
for badge in self.badges:
results.append({
RELATION_START_LABEL: self.start_label,
RELATION_END_LABEL: self.BADGE_NODE_LABEL,
RELATION_START_KEY: self.start_key,
RELATION_END_KEY: self.get_badge_key(badge.name),
RELATION_TYPE: self.BADGE_RELATION_TYPE,
RELATION_REVERSE_TYPE: self.INVERSE_BADGE_RELATION_TYPE,
})
relation = GraphRelationship(
start_label=self.start_label,
end_label=self.BADGE_NODE_LABEL,
start_key=self.start_key,
end_key=self.get_badge_key(badge.name),
type=self.BADGE_RELATION_TYPE,
reverse_type=self.INVERSE_BADGE_RELATION_TYPE,
attributes={}
)
results.append(relation)
return results
41 changes: 21 additions & 20 deletions databuilder/models/column_usage_model.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Union, Dict, Any, Iterable, List
from typing import Union, Iterable, List

from databuilder.models.neo4j_csv_serde import (
Neo4jCsvSerializable, RELATION_START_KEY, RELATION_END_KEY,
RELATION_START_LABEL, RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
)
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.usage.usage_constants import (
READ_RELATION_TYPE, READ_REVERSE_RELATION_TYPE, READ_RELATION_COUNT_PROPERTY
)
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.user import User
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship


class ColumnUsageModel(Neo4jCsvSerializable):
class ColumnUsageModel(GraphSerializable):

"""
A model represents user <--> column graph model
Expand Down Expand Up @@ -49,38 +48,40 @@ def __init__(self,
self._node_iter = iter(self.create_nodes())
self._relation_iter = iter(self.create_relation())

def create_next_node(self) -> Union[Dict[str, Any], None]:
def create_next_node(self) -> Union[GraphNode, None]:

try:
return next(self._node_iter)
except StopIteration:
return None

def create_nodes(self) -> List[Dict[str, Any]]:
def create_nodes(self) -> List[GraphNode]:
"""
Create a list of Neo4j node records
:return:
"""

return User(email=self.user_email).create_nodes()

def create_next_relation(self) -> Union[Dict[str, Any], None]:

def create_next_relation(self) -> Union[GraphRelationship, None]:
try:
return next(self._relation_iter)
except StopIteration:
return None

def create_relation(self) -> Iterable[Any]:
return [{
RELATION_START_LABEL: TableMetadata.TABLE_NODE_LABEL,
RELATION_END_LABEL: User.USER_NODE_LABEL,
RELATION_START_KEY: self._get_table_key(),
RELATION_END_KEY: self._get_user_key(self.user_email),
RELATION_TYPE: ColumnUsageModel.TABLE_USER_RELATION_TYPE,
RELATION_REVERSE_TYPE: ColumnUsageModel.USER_TABLE_RELATION_TYPE,
ColumnUsageModel.READ_RELATION_COUNT: self.read_count
}]
def create_relation(self) -> Iterable[GraphRelationship]:
relationship = GraphRelationship(
start_key=self._get_table_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self._get_user_key(self.user_email),
end_label=User.USER_NODE_LABEL,
type=ColumnUsageModel.TABLE_USER_RELATION_TYPE,
reverse_type=ColumnUsageModel.USER_TABLE_RELATION_TYPE,
attributes={
ColumnUsageModel.READ_RELATION_COUNT: self.read_count
}
)
return [relationship]

def _get_table_key(self) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
Expand Down
Loading