Skip to content

Commit

Permalink
fix(ingest): dbt - add support for trino datatypes (datahub-project#5379)
Browse files Browse the repository at this point in the history
  • Loading branch information
aezomz authored and maggiehays committed Aug 1, 2022
1 parent 2df9e36 commit e615ec3
Show file tree
Hide file tree
Showing 15 changed files with 488 additions and 242 deletions.
25 changes: 21 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
POSTGRES_TYPES_MAP,
SNOWFLAKE_TYPES_MAP,
SPARK_SQL_TYPES_MAP,
TRINO_SQL_TYPES_MAP,
resolve_postgres_modified_type,
resolve_trino_modified_type,
)
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
Expand Down Expand Up @@ -319,6 +321,7 @@ class DBTNode:
description: str
raw_sql: Optional[str]

dbt_adapter: str
dbt_name: str
dbt_file_path: str

Expand Down Expand Up @@ -375,6 +378,7 @@ def extract_dbt_entities(
all_manifest_entities: Dict[str, Dict[str, Any]],
all_catalog_entities: Dict[str, Dict[str, Any]],
sources_results: List[Dict[str, Any]],
manifest_adapter: str,
load_schemas: bool,
use_identifiers: bool,
tag_prefix: str,
Expand Down Expand Up @@ -450,6 +454,7 @@ def extract_dbt_entities(
meta_props = manifest_node.get("config", {}).get("meta", {})
dbtNode = DBTNode(
dbt_name=key,
dbt_adapter=manifest_adapter,
database=manifest_node["database"],
schema=manifest_node["schema"],
name=name,
Expand Down Expand Up @@ -633,20 +638,24 @@ def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage:
**SNOWFLAKE_TYPES_MAP,
**BIGQUERY_TYPES_MAP,
**SPARK_SQL_TYPES_MAP,
**TRINO_SQL_TYPES_MAP,
}


def get_column_type(
report: DBTSourceReport, dataset_name: str, column_type: str
report: DBTSourceReport, dataset_name: str, column_type: str, dbt_adapter: str
) -> SchemaFieldDataType:
"""
Maps known DBT types to datahub types
"""
TypeClass: Any = _field_type_mapping.get(column_type)

if TypeClass is None:
# attempt Postgres modified type
TypeClass = resolve_postgres_modified_type(column_type)
# resolve modified type
if dbt_adapter == "trino":
TypeClass = resolve_trino_modified_type(column_type)
elif dbt_adapter == "postgres":
TypeClass = resolve_postgres_modified_type(column_type)

# if still not found, report the warning
if TypeClass is None:
Expand Down Expand Up @@ -688,7 +697,9 @@ def get_schema_metadata(
field = SchemaField(
fieldPath=column.name,
nativeDataType=column.data_type,
type=get_column_type(report, node.dbt_name, column.data_type),
type=get_column_type(
report, node.dbt_name, column.data_type, node.dbt_adapter
),
description=description,
nullable=False, # TODO: actually autodetect this
recursive=False,
Expand Down Expand Up @@ -1070,6 +1081,7 @@ def loadManifestAndCatalog(
Optional[str],
Optional[str],
Optional[str],
Optional[str],
Dict[str, Dict[str, Any]],
]:
dbt_manifest_json = self.load_file_as_json(manifest_path)
Expand All @@ -1086,6 +1098,7 @@ def loadManifestAndCatalog(
"dbt_schema_version"
)
manifest_version = dbt_manifest_json.get("metadata", {}).get("dbt_version")
manifest_adapter = dbt_manifest_json.get("metadata", {}).get("adapter_type")

catalog_schema = dbt_catalog_json.get("metadata", {}).get("dbt_schema_version")
catalog_version = dbt_catalog_json.get("metadata", {}).get("dbt_version")
Expand All @@ -1104,6 +1117,7 @@ def loadManifestAndCatalog(
all_manifest_entities,
all_catalog_entities,
sources_results,
manifest_adapter,
load_schemas,
use_identifiers,
tag_prefix,
Expand All @@ -1116,6 +1130,7 @@ def loadManifestAndCatalog(
nodes,
manifest_schema,
manifest_version,
manifest_adapter,
catalog_schema,
catalog_version,
all_manifest_entities,
Expand Down Expand Up @@ -1321,6 +1336,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
nodes,
manifest_schema,
manifest_version,
manifest_adapter,
catalog_schema,
catalog_version,
manifest_nodes_raw,
Expand All @@ -1339,6 +1355,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
additional_custom_props = {
"manifest_schema": manifest_schema,
"manifest_version": manifest_version,
"manifest_adapter": manifest_adapter,
"catalog_schema": catalog_schema,
"catalog_version": catalog_version,
}
Expand Down
30 changes: 30 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,15 @@ def resolve_postgres_modified_type(type_string: str) -> Any:
return None


def resolve_trino_modified_type(type_string: str) -> Any:
    """Resolve a Trino column type, possibly parameterised, to its mapped type class.

    Parameterised types such as ``timestamp(3)``, ``varchar(10)`` or
    ``decimal(10,2)`` are reduced to the alphabetic base name in front of the
    opening parenthesis before being looked up in ``TRINO_SQL_TYPES_MAP``.
    A type string without parentheses is looked up as-is.

    Args:
        type_string: The native type name as reported for the column.

    Returns:
        The class registered in ``TRINO_SQL_TYPES_MAP`` for the base type, or
        ``None`` when the base type is not in the map.
    """
    # Only the leading alphabetic name matters; whatever follows the opening
    # parenthesis (precision, scale, length, nested types) is ignored.
    parameterised = re.match(r"([a-zA-Z]+)\(", type_string)
    base_type = parameterised.group(1) if parameterised else type_string

    # Use .get() rather than indexing: an unmapped type must yield None instead
    # of raising KeyError, so the caller can handle the "unknown type" case.
    return TRINO_SQL_TYPES_MAP.get(base_type)


# see https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
"NUMBER": NumberType,
Expand Down Expand Up @@ -308,3 +317,24 @@ def resolve_postgres_modified_type(type_string: str) -> Any:
"struct": RecordType,
"map": RecordType,
}

# https://trino.io/docs/current/language/types.html
# https://github.com/trinodb/trino-python-client/blob/master/trino/sqlalchemy/datatype.py#L75
# Maps lowercase Trino base type names to the type class used for the column.
# Keys carry no parameters: a parameterised type such as timestamp(3) has to be
# reduced to its base name (see resolve_trino_modified_type) before lookup.
TRINO_SQL_TYPES_MAP: Dict[str, Any] = {
    # boolean
    "boolean": BooleanType,
    # integer, floating-point and fixed-precision numbers
    "tinyint": NumberType,
    "smallint": NumberType,
    "int": NumberType,
    "integer": NumberType,
    "bigint": NumberType,
    "real": NumberType,
    "double": NumberType,
    "decimal": NumberType,
    # character and binary data
    "varchar": StringType,
    "char": StringType,
    "varbinary": BytesType,
    # structured data
    "json": RecordType,
    # date and time
    "date": DateType,
    "time": TimeType,
    "timestamp": TimeType,
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -208,6 +209,7 @@
"dbt_file_path": "models/transform/customer_details.sql",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -341,6 +343,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -546,6 +549,7 @@
"catalog_type": "VIEW",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -826,6 +830,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -1094,6 +1099,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -1262,6 +1268,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -1453,6 +1460,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -1637,6 +1645,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -1945,6 +1954,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -2177,6 +2187,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -2421,6 +2432,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -2649,6 +2661,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -2877,6 +2890,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down Expand Up @@ -3105,6 +3119,7 @@
"catalog_type": "BASE TABLE",
"manifest_schema": "https://schemas.getdbt.com/dbt/manifest/v4.json",
"manifest_version": "1.0.3",
"manifest_adapter": "postgres",
"catalog_schema": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"catalog_version": "1.0.3"
},
Expand Down
Loading

0 comments on commit e615ec3

Please sign in to comment.