diff --git a/README.md b/README.md index 6a8cfde323..f38ee30ee4 100644 --- a/README.md +++ b/README.md @@ -649,6 +649,9 @@ Transforms string timestamp into int epoch #### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py) Remove fields from the Dict. +#### [GenericTransformer](./databuilder/transformer/generic_transformer.py) +Transforms dictionary based on callback function that user provides. + ## List of loader #### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader") diff --git a/databuilder/transformer/generic_transformer.py b/databuilder/transformer/generic_transformer.py new file mode 100644 index 0000000000..eda7c0ca85 --- /dev/null +++ b/databuilder/transformer/generic_transformer.py @@ -0,0 +1,35 @@ +import logging + +from pyhocon import ConfigTree # noqa: F401 +from typing import Any, Dict # noqa: F401 + +from databuilder.transformer.base_transformer import Transformer + +CALLBACK_FUNCTION = 'callback_function' +FIELD_NAME = 'field_name' + +LOGGER = logging.getLogger(__name__) + + +class GenericTransformer(Transformer): + """ + A generic transformer that accepts a callback function that transforms the record on specified field. + """ + + def init(self, conf): + # type: (ConfigTree) -> None + self._callback_function = conf.get(CALLBACK_FUNCTION) + self._field_name = conf.get_string(FIELD_NAME) + + def transform(self, record): + # type: (Dict[str, Any]) -> Dict[str, Any] + + for k, v in record.items(): + if k == self._field_name: + new_val = self._callback_function(v) + record[k] = new_val + return record + + def get_scope(self): + # type: () -> str + return 'transformer.generic' diff --git a/example/sample_data/sample_dashboard_base.csv b/example/sample_data/sample_dashboard_base.csv new file mode 100644 index 0000000000..ef62b59a88 --- /dev/null +++ b/example/sample_data/sample_dashboard_base.csv @@ -0,0 +1,5 @@ +product,cluster,dashboard_group,dashboard_group_id,dashboard_group_description,dashboard_group_url,dashboard_name,dashboard_id,description,created_timestamp,dashboard_url +mode,gold,test group1,test_group_id_1,test group description 1,http://mode.test_group_id_1.com,test dashboard,test_dashboard_id_1,test dashboard description,1592333799,http://mode.test_group_id_1.com/test_dashboard_id_1 +mode,gold,test group1,test_group_id_1,test group description 1_2,http://mode.test_group_id_1.com,test dashboard,test_dashboard_id_1_2,test dashboard description 1_2,1592332799,http://mode.test_group_id_1.com/test_dashboard_id_1_2 +mode,gold,test group2,test_group_id_2,test group description 2,http://mode.test_group_id_2.com,test dashboard,test_dashboard_id_2,test dashboard description,1592133799,http://mode.test_group_id_2.com/test_dashboard_id_2 +superset,gold,test group3,test_group_id_3,test group description 1,http://mode.test_group_id_3.com,test dashboard,test_dashboard_id_3,test dashboard description,1591333799,http://mode.test_group_id_3.com/test_dashboard_id_3 diff --git a/example/sample_data/sample_dashboard_last_execution.csv b/example/sample_data/sample_dashboard_last_execution.csv new file mode 100644 index 0000000000..eca7f13b52 --- /dev/null +++ b/example/sample_data/sample_dashboard_last_execution.csv @@ -0,0 +1,5 @@ +product,cluster,dashboard_group_id,dashboard_id,execution_id,execution_timestamp,execution_state +mode,gold,test_group_id_1,test_dashboard_id_1,_last_successful_execution,1592351193,success +mode,gold,test_group_id_2,test_dashboard_id_2,_last_successful_execution,1592351210,success +mode,gold,test_group_id_1,test_dashboard_id_1,_last_execution,1593351193,fail +mode,gold,test_group_id_2,test_dashboard_id_2,_last_execution,1594351210,success \ No newline at end of file diff --git a/example/sample_data/sample_dashboard_last_modified.csv b/example/sample_data/sample_dashboard_last_modified.csv new file mode 100644 index 0000000000..a5cf5d0cbb --- /dev/null +++ b/example/sample_data/sample_dashboard_last_modified.csv @@ -0,0 +1,3 @@ +product,cluster,dashboard_group_id,dashboard_id,last_modified_timestamp +mode,gold,test_group_id_1,test_dashboard_id_1,1592351454 +mode,gold,test_group_id_2,test_dashboard_id_2,1592311423 \ No newline at end of file diff --git a/example/sample_data/sample_dashboard_owner.csv b/example/sample_data/sample_dashboard_owner.csv new file mode 100644 index 0000000000..6479632848 --- /dev/null +++ b/example/sample_data/sample_dashboard_owner.csv @@ -0,0 +1,3 @@ +product,cluster,dashboard_group_id,dashboard_id,email +mode,gold,test_group_id_1,test_dashboard_id_1,roald.amundsen@example.org +mode,gold,test_group_id_2,test_dashboard_id_2,buzz@example.org \ No newline at end of file diff --git a/example/sample_data/sample_dashboard_query.csv b/example/sample_data/sample_dashboard_query.csv new file mode 100644 index 0000000000..82b7724457 --- /dev/null +++ b/example/sample_data/sample_dashboard_query.csv @@ -0,0 +1,3 @@ +product,cluster,dashboard_group_id,dashboard_id,query_name,query_id,url,query_text +mode,gold,test_group_id_1,test_dashboard_id_1,first query,query_1,http://mode.test_group_id_1.com/test_dashboard_id_1/query/query_1,SELECT * FROM foo.bar +mode,gold,test_group_id_2,test_dashboard_id_2,second query,query_2,http://mode.test_group_id_2.com/test_dashboard_id_2/query/query_2,SELECT * FROM bar.foo JOIN foo.bar USING (baz) \ No newline at end of file diff --git a/example/sample_data/sample_dashboard_table.csv b/example/sample_data/sample_dashboard_table.csv new file mode 100644 index 0000000000..21fe284d42 --- /dev/null +++ b/example/sample_data/sample_dashboard_table.csv @@ -0,0 +1,3 @@ +product,cluster,dashboard_group_id,dashboard_id,table_ids +mode,gold,test_group_id_1,test_dashboard_id_1,"hive://gold.test_schema/test_table1" +mode,gold,test_group_id_2,test_dashboard_id_2,"hive://gold.test_schema/test_view1,hive://gold.test_schema/test_table3" diff --git a/example/sample_data/sample_dashboard_usage.csv b/example/sample_data/sample_dashboard_usage.csv new file mode 100644 index 0000000000..6f4a1a980c --- /dev/null +++ b/example/sample_data/sample_dashboard_usage.csv @@ -0,0 +1,3 @@ +product,cluster,dashboard_group_id,dashboard_id,view_count,email +mode,gold,test_group_id_1,test_dashboard_id_1,100,roald.amundsen@example.org +mode,gold,test_group_id_2,test_dashboard_id_2,2000,chrisc@example.org \ No newline at end of file diff --git a/example/scripts/sample_data_loader.py b/example/scripts/sample_data_loader.py index 7c17266fc2..b55906ed6f 100644 --- a/example/scripts/sample_data_loader.py +++ b/example/scripts/sample_data_loader.py @@ -22,6 +22,7 @@ import sqlite3 import sys import uuid + from elasticsearch import Elasticsearch from pyhocon import ConfigFactory from sqlalchemy.ext.declarative import declarative_base @@ -32,10 +33,15 @@ from databuilder.job.job import DefaultJob from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader +from databuilder.publisher.elasticsearch_constants import DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, \ + USER_ELASTICSEARCH_INDEX_MAPPING from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher from databuilder.task.task import DefaultTask +from databuilder.transformer.base_transformer import ChainedTransformer from databuilder.transformer.base_transformer import NoopTransformer +from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS +from databuilder.transformer.generic_transformer import GenericTransformer, CALLBACK_FUNCTION, FIELD_NAME es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost') neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost') @@ -62,18 +68,20 @@ neo4j_user = 'neo4j' neo4j_password = 'test' +LOGGER = logging.getLogger(__name__) + def create_connection(db_file): try: conn = sqlite3.connect(db_file) return conn except Exception: - logging.exception('exception') + LOGGER.exception('exception') return None -def run_csv_job(file_loc, table_name, model): - tmp_folder = '/var/tmp/amundsen/{table_name}'.format(table_name=table_name) +def run_csv_job(file_loc, job_name, model): + tmp_folder = '/var/tmp/amundsen/{job_name}'.format(job_name=job_name) node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder) relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder) @@ -162,6 +170,52 @@ def create_last_updated_job(): publisher=Neo4jCsvPublisher()) +def _str_to_list(str_val): + return str_val.split(',') + + +def create_dashboard_tables_job(): + # loader saves data to these folders and publisher reads it from here + tmp_folder = '/var/tmp/amundsen/dashboard_table' + node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder) + relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder) + + csv_extractor = CsvExtractor() + csv_loader = FsNeo4jCSVLoader() + + generic_transformer = GenericTransformer() + dict_to_model_transformer = DictToModel() + transformer = ChainedTransformer(transformers=[generic_transformer, dict_to_model_transformer], + is_init_transformers=True) + + task = DefaultTask(extractor=csv_extractor, + loader=csv_loader, + transformer=transformer) + publisher = Neo4jCsvPublisher() + + job_config = ConfigFactory.from_dict({ + '{}.file_location'.format(csv_extractor.get_scope()): 'example/sample_data/sample_dashboard_table.csv', + '{}.{}.{}'.format(transformer.get_scope(), generic_transformer.get_scope(), FIELD_NAME): 'table_ids', + '{}.{}.{}'.format(transformer.get_scope(), generic_transformer.get_scope(), CALLBACK_FUNCTION): _str_to_list, + '{}.{}.{}'.format(transformer.get_scope(), dict_to_model_transformer.get_scope(), MODEL_CLASS): + 'databuilder.models.dashboard.dashboard_table.DashboardTable', + '{}.node_dir_path'.format(csv_loader.get_scope()): node_files_folder, + '{}.relationship_dir_path'.format(csv_loader.get_scope()): relationship_files_folder, + '{}.delete_created_directories'.format(csv_loader.get_scope()): True, + '{}.node_files_directory'.format(publisher.get_scope()): node_files_folder, + '{}.relation_files_directory'.format(publisher.get_scope()): relationship_files_folder, + '{}.neo4j_endpoint'.format(publisher.get_scope()): neo4j_endpoint, + '{}.neo4j_user'.format(publisher.get_scope()): neo4j_user, + '{}.neo4j_password'.format(publisher.get_scope()): neo4j_password, + '{}.neo4j_encrypted'.format(publisher.get_scope()): False, + '{}.job_publish_tag'.format(publisher.get_scope()): 'unique_tag', # should use unique tag here like {ds} + }) + + return DefaultJob(conf=job_config, + task=task, + publisher=publisher) + + def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index', elasticsearch_doc_type_key='table', model_name='databuilder.models.table_elasticsearch_document.TableESDocument', @@ -171,7 +225,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index :param elasticsearch_index_alias: alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in - `table_search_index` + `table_{uuid}` :param model_name: the Databuilder model class used in transporting between Extractor and Loader :param entity_type: Entity type handed to the `Neo4jSearchDataExtractor` class, used to determine Cypher query to extract data from Neo4j. Defaults to `table`. @@ -188,7 +242,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index # elastic search client instance elasticsearch_client = es # unique name of new index in Elasticsearch - elasticsearch_new_index_key = 'tables' + str(uuid.uuid4()) + elasticsearch_new_index_key = '{}_'.format(elasticsearch_doc_type_key) + str(uuid.uuid4()) job_config = ConfigFactory.from_dict({ 'extractor.search_data.entity_type': entity_type, @@ -246,6 +300,20 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index 'databuilder.models.table_last_updated.TableLastUpdated') run_csv_job('example/sample_data/sample_schema_description.csv', 'test_schema_description', 'databuilder.models.schema.schema.SchemaModel') + run_csv_job('example/sample_data/sample_dashboard_base.csv', 'test_dashboard_base', + 'databuilder.models.dashboard.dashboard_metadata.DashboardMetadata') + run_csv_job('example/sample_data/sample_dashboard_usage.csv', 'test_dashboard_usage', + 'databuilder.models.dashboard.dashboard_usage.DashboardUsage') + run_csv_job('example/sample_data/sample_dashboard_owner.csv', 'test_dashboard_owner', + 'databuilder.models.dashboard.dashboard_owner.DashboardOwner') + run_csv_job('example/sample_data/sample_dashboard_query.csv', 'test_dashboard_query', + 'databuilder.models.dashboard.dashboard_query.DashboardQuery') + run_csv_job('example/sample_data/sample_dashboard_last_execution.csv', 'test_dashboard_last_execution', + 'databuilder.models.dashboard.dashboard_execution.DashboardExecution') + run_csv_job('example/sample_data/sample_dashboard_last_modified.csv', 'test_dashboard_last_modified', + 'databuilder.models.dashboard.dashboard_last_modified.DashboardLastModifiedTimestamp') + + create_dashboard_tables_job().launch() create_last_updated_job().launch() @@ -256,66 +324,18 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index model_name='databuilder.models.table_elasticsearch_document.TableESDocument') job_es_table.launch() - user_elasticsearch_mapping = """ - { - "mappings":{ - "user":{ - "properties": { - "email": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, - "first_name": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, - "last_name": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, - "full_name": { - "type":"text", - "analyzer": "simple", - "fields": { - "raw": { - "type": "keyword" - } - } - }, - "total_read":{ - "type": "long" - }, - "total_own": { - "type": "long" - }, - "total_follow": { - "type": "long" - } - } - } - } - } - """ - job_es_user = create_es_publisher_sample_job( elasticsearch_index_alias='user_search_index', elasticsearch_doc_type_key='user', model_name='databuilder.models.user_elasticsearch_document.UserESDocument', entity_type='user', - elasticsearch_mapping=user_elasticsearch_mapping) + elasticsearch_mapping=USER_ELASTICSEARCH_INDEX_MAPPING) job_es_user.launch() + + job_es_dashboard = create_es_publisher_sample_job( + elasticsearch_index_alias='dashboard_search_index', + elasticsearch_doc_type_key='dashboard', + model_name='databuilder.models.dashboard_elasticsearch_document.DashboardESDocument', + entity_type='dashboard', + elasticsearch_mapping=DASHBOARD_ELASTICSEARCH_INDEX_MAPPING) + job_es_dashboard.launch()