diff --git a/databuilder/README.md b/databuilder/README.md index b470622feb..85bc8b656b 100644 --- a/databuilder/README.md +++ b/databuilder/README.md @@ -178,6 +178,28 @@ If using the filters option here is the input format ] ``` +#### [DruidMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/druid_metadata_extractor.py) +An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a [Druid](https://druid.apache.org/) DB. + +The `where_clause_suffix` could be defined, normally you would like to filter out tables in `INFORMATION_SCHEMA`. + +You could specify the following job config +```python +conn_string = "druid+https://{host}:{port}/druid/v2/sql/".format( + host=druid_broker_host, + port=443 +) +job_config = ConfigFactory.from_dict({ + 'extractor.druid_metadata.{}'.format(DruidMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.druid_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): conn_string}) +job = DefaultJob( + conf=job_config, + task=DefaultTask( + extractor=DruidMetadataExtractor(), + loader=AnyLoader())) +job.launch() +``` + +#### [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor") An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database. 
@@ -211,7 +233,7 @@ The `where_clause_suffix` below should define which schemas you'd like to query The SQL query driving the extraction is defined [here](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mssql_metadata_extractor.py) -This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor"). +This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor"). ```python job_config = ConfigFactory.from_dict({ 'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, @@ -225,7 +247,7 @@ job = DefaultJob( job.launch() ``` -#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "PostgresMetadataExtractor") +#### [MysqlMetadataExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/mysql_metadata_extractor.py "MysqlMetadataExtractor") An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a MYSQL database. By default, the MYSQL database name is used as the cluster name. 
To override this, set `USE_CATALOG_AS_CLUSTER_NAME` @@ -239,7 +261,7 @@ The SQL query driving the extraction is defined [here](https://github.com/lyft/a job_config = ConfigFactory.from_dict({ 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, - 'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.mysql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob(conf=job_config, task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()), publisher=Neo4jCsvPublisher()) diff --git a/databuilder/databuilder/extractor/druid_metadata_extractor.py b/databuilder/databuilder/extractor/druid_metadata_extractor.py new file mode 100644 index 0000000000..bf5d53d561 --- /dev/null +++ b/databuilder/databuilder/extractor/druid_metadata_extractor.py @@ -0,0 +1,115 @@ +import logging +from collections import namedtuple +import textwrap + +from pyhocon import ConfigFactory, ConfigTree # noqa: F401 +from typing import Iterator, Union, Dict, Any # noqa: F401 + +from databuilder import Scoped +from databuilder.extractor.base_extractor import Extractor +from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor +from databuilder.models.table_metadata import TableMetadata, ColumnMetadata +from itertools import groupby + + +TableKey = namedtuple('TableKey', ['schema', 'table_name']) + +LOGGER = logging.getLogger(__name__) + + +class DruidMetadataExtractor(Extractor): + """ + Extracts Druid table and column metadata from druid using dbapi extractor + """ + SQL_STATEMENT = textwrap.dedent(""" + SELECT + TABLE_SCHEMA as schema, + TABLE_NAME as name, + COLUMN_NAME as col_name, + DATA_TYPE as col_type, + ORDINAL_POSITION as col_sort_order + FROM INFORMATION_SCHEMA.COLUMNS + 
{where_clause_suffix} + order by TABLE_SCHEMA, TABLE_NAME, CAST(ORDINAL_POSITION AS int) + """) + + # CONFIG KEYS + WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix' + CLUSTER_KEY = 'cluster' + + DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ', + CLUSTER_KEY: 'gold'}) + + def init(self, conf): + # type: (ConfigTree) -> None + conf = conf.with_fallback(DruidMetadataExtractor.DEFAULT_CONFIG) + self._cluster = '{}'.format(conf.get_string(DruidMetadataExtractor.CLUSTER_KEY)) + + self.sql_stmt = DruidMetadataExtractor.SQL_STATEMENT.format( + where_clause_suffix=conf.get_string(DruidMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY, + default='')) + + self._alchemy_extractor = SQLAlchemyExtractor() + sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\ + .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt})) + + self._alchemy_extractor.init(sql_alch_conf) + self._extract_iter = None # type: Union[None, Iterator] + + def extract(self): + # type: () -> Union[TableMetadata, None] + if not self._extract_iter: + self._extract_iter = self._get_extract_iter() + try: + return next(self._extract_iter) + except StopIteration: + return None + + def get_scope(self): + # type: () -> str + return 'extractor.druid_metadata' + + def _get_extract_iter(self): + # type: () -> Iterator[TableMetadata] + """ + Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata + :return: + """ + for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key): + columns = [] + # no table description and column description + for row in group: + last_row = row + columns.append(ColumnMetadata(name=row['col_name'], + description='', + col_type=row['col_type'], + sort_order=row['col_sort_order'])) + yield TableMetadata(database='druid', + cluster=self._cluster, + schema=last_row['schema'], + name=last_row['name'], + description='', + columns=columns) + + def 
_get_raw_extract_iter(self): + # type: () -> Iterator[Dict[str, Any]] + """ + Provides iterator of result row from dbapi extractor + :return: + """ + row = self._alchemy_extractor.extract() + while row: + yield row + row = self._alchemy_extractor.extract() + + def _get_table_key(self, row): + # type: (Dict[str, Any]) -> Union[TableKey, None] + """ + Table key consists of schema and table name + :param row: + :return: + """ + if row: + return TableKey(schema=row['schema'], table_name=row['name']) + + return None diff --git a/databuilder/setup.py b/databuilder/setup.py index 004052e112..1535a0c686 100644 --- a/databuilder/setup.py +++ b/databuilder/setup.py @@ -37,7 +37,11 @@ 'ibm-db-sa-py3' ] -all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 +druid = [ + 'pydruid' +] + +all_deps = requirements + kafka + cassandra + glue + snowflake + athena + bigquery + jsonpath + db2 + druid setup( name='amundsen-databuilder', @@ -60,7 +64,8 @@ 'athena': athena, 'bigquery': bigquery, 'jsonpath': jsonpath, - 'db2': db2 + 'db2': db2, + 'druid': druid, }, classifiers=[ 'Programming Language :: Python :: 2.7',