Skip to content

Commit

Permalink
Druid metadata extractor (amundsen-io#257)
Browse files Browse the repository at this point in the history
* Druid metadata extractor

* add doc

* fix lint

* add extra deps

* Update version
  • Loading branch information
feng-tao authored and Hans Adriaans committed Jun 30, 2022
1 parent 6a0e29f commit f2beb0b
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 5 deletions.
28 changes: 25 additions & 3 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,28 @@ If using the filters option here is the input format
]
```

#### [DruidMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/druid_metadata_extractor.py)
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a [Druid](https://druid.apache.org/) DB.

The `where_clause_suffix` could be defined, normally you would like to filter out the in `INFORMATION_SCHEMA`.

You could specify the following job config
```python
conn_string = "druid+https://{host}:{port}/druid/v2/sql/".format(
host=druid_broker_host,
port=443
)
job_config = ConfigFactory.from_dict({
'extractor.druid_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.druid_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): conn_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=DruidMetadataExtractor(),
loader=AnyLoader()))
job.launch()
```

#### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database.

Expand Down Expand Up @@ -211,7 +233,7 @@ The `where_clause_suffix` below should define which schemas you'd like to query

The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py)

This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor").
This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor").
```python
job_config = ConfigFactory.from_dict({
'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
Expand All @@ -225,7 +247,7 @@ job = DefaultJob(
job.launch()
```

#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor")
#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "MysqlMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database.

By default, the MYSQL database name is used as the cluster name. To override this, set `USE_CATALOG_AS_CLUSTER_NAME`
Expand All @@ -239,7 +261,7 @@ The SQL query driving the extraction is defined [here](https://github.com/lyft/a
job_config = ConfigFactory.from_dict({
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
'extractor.mysql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
Expand Down
115 changes: 115 additions & 0 deletions databuilder/databuilder/extractor/druid_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from collections import namedtuple
import textwrap

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from itertools import groupby


TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class DruidMetadataExtractor(Extractor):
"""
Extracts Druid table and column metadata from druid using dbapi extractor
"""
SQL_STATEMENT = textwrap.dedent("""
SELECT
TABLE_SCHEMA as schema,
TABLE_NAME as name,
COLUMN_NAME as col_name,
DATA_TYPE as col_type,
ORDINAL_POSITION as col_sort_order
FROM INFORMATION_SCHEMA.COLUMNS
{where_clause_suffix}
order by TABLE_SCHEMA, TABLE_NAME, CAST(ORDINAL_POSITION AS int)
""")

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster'

DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ',
CLUSTER_KEY: 'gold'})

def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(DruidMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(DruidMetadataExtractor.CLUSTER_KEY))

self.sql_stmt = DruidMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(DruidMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY,
default=''))

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]

def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self):
# type: () -> str
return 'extractor.druid_metadata'

def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []
# no table description and column description
for row in group:
last_row = row
columns.append(ColumnMetadata(name=row['col_name'],
description='',
col_type=row['col_type'],
sort_order=row['col_sort_order']))
yield TableMetadata(database='druid',
cluster=self._cluster,
schema=last_row['schema'],
name=last_row['name'],
description='',
columns=columns)

def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
"""
Provides iterator of result row from dbapi extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()

def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row['schema'], table_name=row['name'])

return None
9 changes: 7 additions & 2 deletions databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
'ibm-db-sa-py3'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2
druid = [
'pydruid'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 + druid

setup(
name='amundsen-databuilder',
Expand All @@ -60,7 +64,8 @@
'athena': athena,
'bigquery': bigquery,
'jsonpath': jsonpath,
'db2': db2
'db2': db2,
'druid': druid,
},
classifiers=[
'Programming Language :: Python :: 2.7',
Expand Down

0 comments on commit f2beb0b

Please sign in to comment.