diff --git a/README.md b/README.md
index d6a1ebb53c..470e172fee 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,10 @@ them to fit your particular use case.
  BigQuery query and returns the output as an XML string.
 * [BigQuery Translation Validator](examples/bigquery-translation-validator-utility) - A python utility to compare 2 SQL Files and point basic differences like column names, table names, joins, function names, is-Null and query syntax.
+* [BigQuery Generic DDL Migration Utility](examples/bigquery-generic-ddl-migration-utility) -
+  A generic utility to migrate table schemas (DDL) from Oracle, Snowflake, MSSQL, Vertica, and Netezza databases to BigQuery.
+  The utility leverages the BigQuery Translation API and offers additional features
+  such as adding partitioning, clustering, metadata columns, and prefixes to table names.
 * [Bigtable Dataflow Cryptocurrencies Exchange RealTime Example](examples/cryptorealtime) - Apache Beam example that reads from the Crypto Exchanges WebSocket API as Google Cloud Dataflow pipeline and saves the feed in Google Cloud Bigtable.
diff --git a/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/bq_table_creator.py b/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/bq_table_creator.py
new file mode 100644
index 0000000000..cc914f9060
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/bq_table_creator.py
@@ -0,0 +1,415 @@
+# Copyright 2022 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
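+# The script is driven by a JSON config file stored in GCS and passed in via
+# --gcs_config_path. The sketch below is for orientation only: the key names
+# match the lookups in main() and get_cluster_partitioncolumns(), while every
+# value (bucket, dataset, field names) is a hypothetical placeholder.
+#
+# {
+#   "gcs_source_path": "gs://my-migration-bucket/generic_ddl",
+#   "dataset_location": "US",
+#   "target_dataset": "my_target_dataset",
+#   "audit_column_config_path": "metadata_columns.json",
+#   "target_table_prefix": "stg",
+#   "table_create_flag": "true",
+#   "table_config": [
+#     {
+#       "table_name": "MYSCHEMA.MY_TABLE",
+#       "partitioning_field": "load_date",
+#       "clustering_fields": "customer_id"
+#     }
+#   ]
+# }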
+"""Module to create bq table from the extracted ddl""" + + +import sys +import argparse +import json +import ast +import datetime +from google.cloud import bigquery +from google.api_core.exceptions import NotFound +from google.cloud import storage + + +def create_log_table(project_id, target_dataset, client): + """ + Function to create log table + """ + table_id = f"{project_id}.{target_dataset}.report_status_log_tbl" + schema = [ + bigquery.SchemaField("Timestamp", "STRING"), + bigquery.SchemaField("FilePath", "STRING"), + bigquery.SchemaField("Schemaname", "STRING"), + bigquery.SchemaField("TableName", "STRING"), + bigquery.SchemaField("Category", "STRING"), + bigquery.SchemaField("Message", "STRING"), + bigquery.SchemaField("Status", "STRING"), + bigquery.SchemaField("Action", "STRING"), + ] + table_address = bigquery.Table(table_id, schema=schema) + table_ref = client.create_table(table_address, exists_ok=True) + return table_ref + + +def log_table_data(table, client, records): + """ + function for logging the overall report + """ + # Create the table_id in the required format + try: + client.insert_rows(table, records) + print("Table logged Successfully in Log Table") + except Exception as ex: + print(f"Found the error when loading the data in table : {str(ex)}") + + +def log_err_table( + project_id, target_dataset, log_table_id, target_bucket_name, client, table_name +): + """ + Function for logging the batch translation report + """ + table_id = f"{project_id}.{target_dataset}.{table_name}" + job_config = bigquery.LoadJobConfig( + schema=[ + bigquery.SchemaField("Timestamp", "STRING"), + bigquery.SchemaField("FilePath", "STRING"), + bigquery.SchemaField("FileName", "STRING"), + bigquery.SchemaField("ScriptLine", "INTEGER"), + bigquery.SchemaField("ScriptColumn", "INTEGER"), + bigquery.SchemaField("TranspilerComponent", "STRING"), + bigquery.SchemaField("Environment", "STRING"), + bigquery.SchemaField("ObjectName", "STRING"), + bigquery.SchemaField("Severity", "STRING"), + bigquery.SchemaField("Category", "STRING"), + bigquery.SchemaField("Message", "STRING"), + bigquery.SchemaField("ScriptContext", "STRING"), + bigquery.SchemaField("Action", "STRING"), + ], + skip_leading_rows=1, + # The source format defaults to CSV, so the line below is optional. + source_format=bigquery.SourceFormat.CSV, + write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, + field_delimiter=",", + ignore_unknown_values=True, + allow_quoted_newlines=True, + ) + uri = f"gs://{target_bucket_name}/{log_table_id}" + try: + load_job = client.load_table_from_uri(uri, table_id, job_config=job_config) + # Make an API request. + load_job.result() # Waits for the job to complete. + destination_table = client.get_table(table_id) # Make an API request. 
+ print(f"Loaded {destination_table.num_rows} rows.") + except Exception as ex: + print(f"Found the error while creating the table : {str(ex)}") + + +def file_exist(gcs_client, target_bucket_name, log_table_id) -> bool: + """ + Function to check file exist in gcs bucket + """ + bucket = gcs_client.bucket(target_bucket_name) + return storage.Blob(bucket=bucket, name=log_table_id).exists(gcs_client) + + +def get_cluster_partitioncolumns( + gcs_client, + config_source_bucket_name, + config_source_prefix, + table_name, + audit_col_str, +): + """ + Function to create partion and cluster by column string + """ + config_string = readgcs_file( + gcs_client, config_source_bucket_name, config_source_prefix + ) + migration_config_dict = parse_json(config_string) + cluster_partition_query = "" + for input_val in migration_config_dict["table_config"]: + if input_val["table_name"] == table_name: + cluster_partition_query = f"{audit_col_str}) \ + PARTITION BY DATE({input_val['partitioning_field']}) \ + CLUSTER BY {input_val['clustering_fields']};" + return cluster_partition_query + else: + pass + + +def readgcs_file(gcs_client, bucket_name, prefix): + """ + Function to read gcs file + """ + bucket = gcs_client.get_bucket(bucket_name) + blob = bucket.get_blob(prefix) + contents = blob.download_as_string() + return contents.decode("utf-8") + + +# Parse gcs json file to dictionary +def parse_json(raw_string): + """ + Function to parse json string + """ + config_parsed_data = ast.literal_eval(raw_string) + config_intermediate_data = json.dumps(config_parsed_data) + parsed_json_data = json.loads(config_intermediate_data) + return parsed_json_data + + +# Write to Blob +def write_to_blob(gcs_client, bucket_name, file_name, content): + """ + Function to write to gcs bucket + """ + bucket = gcs_client.bucket(bucket_name) + blob = bucket.blob(file_name) + blob.upload_from_string(content) + + +# function to update the schema +def schema_update_tbl(gcs_client, target_bucket_name, audit_config_file): + """ + Create DDL for schema update + """ + try: + json_data_string = readgcs_file( + gcs_client, target_bucket_name, prefix=audit_config_file + ) + audit_columns = parse_json(json_data_string) + audit_schema_str = "," + for json_d in audit_columns: + audit_schema_str = ( + audit_schema_str + json_d["name"] + " " + json_d["type"] + "," + ) + return audit_schema_str + except Exception as ex: + print(f"Error while creating schema ddl : {str(ex)}") + sys.exit(1) + + +def create_ddl( + source_bucket_name, + target_bucket_name, + source_prefix, + target_prefix, + target_dataset, + project_id, + dataset_location, + log_table_id, + audit_config_file, + target_table_prefix, + config_source_bucket_name, + config_source_prefix, + table_create_flag, +): + """ + Check table exist or not and create the table if don't exist + with the DDL converted with migration workflow. + """ + client = bigquery.Client(project=project_id, location=dataset_location) + gcs_client = storage.Client(project_id) + + bucket = gcs_client.lookup_bucket(source_bucket_name) + table_ref = create_log_table(project_id, target_dataset, client) + + error_tbl_name = [] + + # Call the function for logging the report + if file_exist(gcs_client, target_bucket_name, log_table_id): + log_err_table( + project_id, + target_dataset, + log_table_id, + target_bucket_name, + client, + table_name="report_err_log_tbl", + ) + query = ( + "SELECT FileName,FilePath,Timestamp,Action,Category,Message,Severity,Category FROM " + + project_id + + "." 
+ + target_dataset + + ".report_err_log_tbl;" + ) + job = client.query(query) + for row in job.result(): + if row[6] == "ERROR" and row[7] == "SyntaxError": + error_tbl_name.append(row[0]) + failure_record = [ + ( + row[2], + row[1], + target_dataset, + row[0][:-4], + row[4], + row[5], + "Failed", + row[3], + ) + ] + log_table_data(table_ref, client, failure_record) + else: + pass + else: + print( + "Log table is not created since batch_translation_report file doesn't exists" + ) + + for input_tbl_name in bucket.list_blobs(prefix=source_prefix + "/"): + if input_tbl_name.name.endswith(".sql"): + if input_tbl_name.name.split("/")[1] not in error_tbl_name: + input_table = input_tbl_name.name.split("/")[1][:-4] + tbl_name = f"{input_table.split('-')[0]}.{input_table.split('-')[1]}" + if target_table_prefix != "": + target_table = f"{input_table.split('-')[1]}_{target_table_prefix}" + else: + target_table = f"{input_table.split('-')[1]}{target_table_prefix}" + target_id = project_id + "." + target_dataset + "." + target_table + target_path = ( + f"gs://'{target_bucket_name}/{target_dataset}/{input_tbl_name.name}" + ) + + try: + client.get_table(target_id) + print(f"Table {target_id} already exist so skipping") + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + target_path, + target_dataset, + input_table, + "BQ_Generic_Migration", + "Table Already Exist", + "Failed", + None, + ) + ] + log_table_data(table_ref, client, failure_record) + except NotFound: + query = ( + readgcs_file( + gcs_client, + target_bucket_name, + target_prefix + "/" + input_table + ".sql", + ) + .replace(input_table.split("-")[1], target_table) + .replace(")\n;", "") + .replace("CREATE TABLE ", "CREATE TABLE IF NOT EXISTS ") + ) + audit_column_str = schema_update_tbl( + gcs_client, target_bucket_name, audit_config_file + ) + cluster_partition_query = get_cluster_partitioncolumns( + gcs_client, + config_source_bucket_name, + config_source_prefix, + tbl_name, + audit_column_str, + ) + final_query = "".join([query, cluster_partition_query]) + if table_create_flag == "true": + job = client.query(final_query) + job.result() + + print( + f"Created new table \ + {job.destination.project}. 
\ + {job.destination.dataset_id}.\ + {job.destination.table_id}" + ) + + sucess_record = [ + ( + datetime.datetime.now().strftime("%s"), + target_path, + target_dataset, + input_table, + "BQ_Generic_Migration", + "Migration Successfull", + "Success", + job.error_result, + ) + ] + log_table_data(table_ref, client, sucess_record) + print("Updated the Schema Succesfully") + + write_to_blob( + gcs_client, + bucket_name=source_bucket_name, + file_name=f"{target_dataset}_processed/{target_table}.sql", + content=final_query, + ) + else: + print( + f"Table {target_dataset}.{input_tbl_name.name} contains error in \ + the script so skipping" + ) + elif input_tbl_name.name.split("/")[0] == source_prefix: + input_object = input_tbl_name.name.split("/")[0] + print(f"Source prefix object {input_object} so skipping") + else: + print("No sql file present in the object") + + +def main(gcs_config_path, project_id): + """ + Main function to execute the other function call + """ + + # Set the variable value + config_source_bucket_name = gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = gcs_config_path.split(config_source_bucket_name + "/")[1] + + # Read Config File values: + gcs_client = storage.Client(project_id) + + config_string = readgcs_file( + gcs_client, config_source_bucket_name, config_source_prefix + ) + + migration_config_dict = parse_json(config_string) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + dataset_location = migration_config_dict["dataset_location"] + target_dataset = migration_config_dict["target_dataset"] + audit_config_file = migration_config_dict["audit_column_config_path"] + target_table_prefix = migration_config_dict["target_table_prefix"] + table_create_flag = migration_config_dict["table_create_flag"] + + # Derived variable required in the code: + source_bucket_name = gcs_source_path.split("//")[1].split("/")[0] + gcs_target_path = f"gs://{source_bucket_name}/{target_dataset}" + target_bucket_name = gcs_target_path.split("//")[1].split("/")[0] + source_prefix = gcs_source_path.split("//")[1].split("/")[1] + target_prefix = gcs_target_path.split("//")[1].split("/")[1] + log_table_id = f"{target_dataset}/batch_translation_report.csv" + + print("\n-------------------- Running create ddl function --------------------\n") + # Call the function to create the ddl + create_ddl( + source_bucket_name, + target_bucket_name, + source_prefix, + target_prefix, + target_dataset, + project_id, + dataset_location, + log_table_id, + audit_config_file, + target_table_prefix, + config_source_bucket_name, + config_source_prefix, + table_create_flag, + ) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument("--gcs_config_path", help="GCS Config Path for defined variables") + + parser.add_argument("--project_id", help="Project_id required to run the code") + + args = parser.parse_args() + + main(args.gcs_config_path, args.project_id) diff --git a/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/generic_bq_table_creator.md b/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/generic_bq_table_creator.md new file mode 100644 index 0000000000..8a860c8ec7 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/BQ_Table_Creator/generic_bq_table_creator.md @@ -0,0 +1,24 @@ +The BQ Table Creator Script does the following functionalities + +1. 
Reads the output SQL files created by the generic BQ converter script (generic_bq_converter.py).
+2. Creates the BigQuery tables in the specified target dataset.
+3. The table structure includes the source columns, metadata columns, and partitioning and clustering info.
+4. The final DDL, which includes the source columns, metadata columns, and partitioning and clustering info, is placed in the GCS path.
+
+The following packages are needed to run the script:
+google-cloud-bigquery
+google-cloud-storage
+google-api-core
+
+
+Steps to run this script:
+
+1. Make sure the prerequisite of the script (generic_bq_converter.py) is met.
+
+2. After completing the above steps, the script can be run as
+
+   a) pip install -r requirements.txt
+   b) python3 bq_table_creator.py --gcs_config_path --project_id
+
+3. Once done, verify that the BigQuery tables are created in the specified dataset and the final DDL is placed in the specified GCS path.
+
diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/archive_ddl.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/archive_ddl.py
new file mode 100644
index 0000000000..68f9e41fe7
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/archive_ddl.py
@@ -0,0 +1,126 @@
+# Copyright 2022 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
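+# Example invocation (all values below are hypothetical placeholders):
+#
+#   python3 archive_ddl.py \
+#       --gcs_config_path gs://my-config-bucket/generic-ddl-extraction-config.json \
+#       --project_id my-gcp-project
+#
+# The referenced config file must provide at least "gcs_source_path",
+# "archive_bucket_name" and "target_dataset", the keys read in main() below.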
+"""Module the Archive the DDL""" + + +import argparse +import json +import ast +from google.cloud import storage + + +def readgcs_file(gcs_client, bucket_name, prefix): + """ + Function to read gcs file + """ + bucket = gcs_client.get_bucket(bucket_name) + blob = bucket.get_blob(prefix) + contents = blob.download_as_string() + return contents.decode("utf-8") + + +# Parse gcs json file to dictionary +def parse_json(raw_string): + """ + Function to parse json string + """ + config_parsed_data = ast.literal_eval(raw_string) + config_intermediate_data = json.dumps(config_parsed_data) + parsed_json_data = json.loads(config_intermediate_data) + return parsed_json_data + + +def mv_blob(source_bucket_name, archive_bucket_name, source_prefix, project_id): + """ + Function to move the file from one bucket to another bucket + """ + gcs_client = storage.Client(project_id) + + source_bucket = gcs_client.get_bucket(source_bucket_name) + + archive_bucket = gcs_client.get_bucket(archive_bucket_name) + + for input_tbl_name in source_bucket.list_blobs(prefix=source_prefix + "/"): + source_blob = source_bucket.blob(input_tbl_name.name) + new_blob = source_bucket.copy_blob(source_blob, archive_bucket) + print(f"File copied from {source_blob} to {new_blob}") + source_blob.delete() + print( + f"Files inside object {source_prefix} deleted successfully from bucket {source_bucket_name}" + ) + + +def main(gcs_config_path, project_id): + """ + Main function to execute the other function call + """ + + # Set the variable value + config_source_bucket_name = gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = gcs_config_path.split(config_source_bucket_name + "/")[1] + + # Read Config File values: + gcs_client = storage.Client(project_id) + + config_string = readgcs_file( + gcs_client, config_source_bucket_name, config_source_prefix + ) + + migration_config_dict = parse_json(config_string) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + archive_bucket_name = migration_config_dict["archive_bucket_name"] + target_dataset = migration_config_dict["target_dataset"] + + # Derived variable required in the code: + source_bucket_name = gcs_source_path.split("//")[1].split("/")[0] + gcs_target_path = f"gs://{source_bucket_name}/{target_dataset}" + source_prefix = gcs_source_path.split("//")[1].split("/")[1] + target_prefix = gcs_target_path.split("//")[1].split("/")[1] + + # move the Generic DDL to archive storage: + print("\n-------------------- Archive Generic DDL --------------------\n") + mv_blob(source_bucket_name, archive_bucket_name, source_prefix, project_id) + + print( + "\n-------------------- Archive Converted Bigquery DDL --------------------\n" + ) + # move the Bigquery Converted DDL to archive storage: + mv_blob(source_bucket_name, archive_bucket_name, target_prefix, project_id) + + print("\n-------------------- Archive Final Bigquery DDL--------------------\n") + # move the Bigquery Converted DDL to archive storage: + mv_blob( + source_bucket_name, + archive_bucket_name, + target_prefix + "_processed", + project_id, + ) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument("--gcs_config_path", help="GCS Config Path for defined variables") + + parser.add_argument("--project_id", help="Project_id required to run the code") + + args = parser.parse_args() + + main(args.gcs_config_path, args.project_id) + diff --git 
a/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/generic_archive_ddl.md b/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/generic_archive_ddl.md
new file mode 100644
index 0000000000..5fc6e8ddaa
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Archiver/generic_archive_ddl.md
@@ -0,0 +1,17 @@
+The Archive DDL Script archives the DDL files created by the other scripts (generic_ddl_extraction.py, generic_bq_converter.py and bq_table_creator.py)
+and places the files in the specified archive bucket.
+
+The following packages are needed to run the script:
+google-cloud-storage
+
+Steps to run this script:
+
+1. Make sure the prerequisite of the script (generic_bq_converter.py) is met.
+
+2. After completing the above steps, the script can be run as
+
+   a) pip install -r requirements.txt
+   b) python3 archive_ddl.py --gcs_config_path --project_id
+
+3. Once done, verify that the DDL files are archived in the specified GCS path.
+
diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.md b/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.md
new file mode 100644
index 0000000000..a689b891ba
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.md
@@ -0,0 +1,31 @@
+The Generic BQ Converter Script does the following:
+
+1. Reads the generic DDL files from the specified GCS path (the output path of the generic_ddl_extraction script).
+2. Calls the BigQuery Migration API, converts the DDL to BigQuery DDL, and places it in the specified GCS path.
+
+
+The following packages are needed to run the script:
+google-cloud-bigquery-migration
+google-cloud-bigquery
+google-cloud-storage
+google-api-core
+
+
+Steps to run this script:
+
+1. Create the generic-ddl-extraction-config.json file and place it in the GCS bucket.
+
+2. Add the additional metadata columns to the metadata_columns.json file and place it in the GCS bucket.
+
+3. Create the object_name_mapping.json file and place it in the GCS bucket (a sample structure is sketched in a comment at the top of generic_bq_converter.py below).
+
+4. After completing the above steps, the script can be run as
+
+   a) pip install -r requirements.txt
+   b) python3 generic_bq_converter.py --gcs_config_path --project_id --db_type
+
+5. Once done, verify that the converted DDL is placed in the specified GCS path.
+
+
+
+
diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.py
new file mode 100644
index 0000000000..4a5c4c7450
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Converter/generic_bq_converter.py
@@ -0,0 +1,208 @@
+# Copyright 2022 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
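+# This workflow reads an object name mapping file from GCS and passes it to the
+# Translation API as name_mapping_list. A minimal sketch of that file is shown
+# here for orientation only; it assumes the ObjectNameMappingList shape
+# (name_map -> source/target) with hypothetical database/schema/table names, so
+# verify the exact field names against the bigquery_migration_v2 reference.
+#
+# {
+#   "name_map": [
+#     {
+#       "source": {
+#         "type": "RELATION",
+#         "database": "DEFAULT_DB",
+#         "schema": "MYSCHEMA",
+#         "relation": "MY_TABLE"
+#       },
+#       "target": {
+#         "database": "my-gcp-project",
+#         "schema": "my_target_dataset",
+#         "relation": "MY_TABLE"
+#       }
+#     }
+#   ]
+# }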
+"""Module to create ddl in bq table""" + +import argparse +import json +import ast +import time as t +from google.cloud import bigquery_migration_v2 +from google.cloud import storage + + +def file_exist(gcs_client, target_bucket_name, log_table_id) -> bool: + """ + Function to check file exist in gcs bucket + """ + bucket = gcs_client.bucket(target_bucket_name) + return storage.Blob(bucket=bucket, name=log_table_id).exists(gcs_client) + + +def readgcs_file(gcs_client, bucket_name, prefix): + """ + Function to read gcs file + """ + bucket = gcs_client.get_bucket(bucket_name) + blob = bucket.get_blob(prefix) + contents = blob.download_as_string() + return contents.decode("utf-8") + + +# Parse gcs json file to dictionary +def parse_json(raw_string): + """ + Function to parse json string + """ + config_parsed_data = ast.literal_eval(raw_string) + config_intermediate_data = json.dumps(config_parsed_data) + parsed_json_data = json.loads(config_intermediate_data) + return parsed_json_data + + +# Create migration workflow: +def create_migration_workflow( + gcs_client, + gcs_source_path, + gcs_target_path, + project_id, + db_type, + target_bucket_name, + log_table_id, + object_name_config_file, + default_db, +): + """ + Check if batch report file already exist then delete before running the migration: + """ + print("Starting Migration workflow") + + parent = f"projects/{project_id}/locations/us" + + # Construct a BigQuery Migration client object. + service_client = bigquery_migration_v2.MigrationServiceClient() + source_dialect = bigquery_migration_v2.Dialect() + + # Set the source dialect to Generic SQL. + if db_type == 'oracle': + source_dialect.oracle_dialect = bigquery_migration_v2.OracleDialect() + elif db_type == 'snowflake': + source_dialect.snowflake_dialect = bigquery_migration_v2.SnowflakeDialect() + elif db_type == 'netezza': + source_dialect.netezza_dialect = bigquery_migration_v2.NetezzaDialect() + elif db_type == 'vertica': + source_dialect.vertica_dialect = bigquery_migration_v2.VerticaDialect() + elif db_type == "mssql": + source_dialect.sqlserver_dialect = bigquery_migration_v2.SQLServerDialect() + else: + raise Exception("Invalid DB Type") + + print(f"Reached here :{db_type}") + # Set the target dialect to BigQuery dialect. + target_dialect = bigquery_migration_v2.Dialect() + target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect() + + # Source env + source_env = bigquery_migration_v2.SourceEnv(default_database=default_db) + json_data_string = readgcs_file( + gcs_client, target_bucket_name, prefix=object_name_config_file + ) + object_name_mapping_list = parse_json(json_data_string) + print(object_name_mapping_list) + + # Prepare the config proto. + translation_config = bigquery_migration_v2.TranslationConfigDetails( + gcs_source_path=gcs_source_path, + gcs_target_path=gcs_target_path, + source_dialect=source_dialect, + target_dialect=target_dialect, + source_env=source_env, + name_mapping_list=object_name_mapping_list, + ) + + print(translation_config) + # Prepare the task. + migration_task = bigquery_migration_v2.MigrationTask( + type_="Translation_Generic2BQ", translation_config_details=translation_config + ) + + # Prepare the workflow. + workflow = bigquery_migration_v2.MigrationWorkflow( + display_name="Demo_Generic2BQ_Migration" + ) + workflow.tasks["translation-task"] = migration_task + + # Prepare the API request to create a migration workflow. 
+ request = bigquery_migration_v2.CreateMigrationWorkflowRequest( + parent=parent, + migration_workflow=workflow, + ) + response = service_client.create_migration_workflow(request=request) + + # code to check the migration completed succesfully or not: + starttime = t.time() + while True: + if file_exist(gcs_client, target_bucket_name, log_table_id): + break + else: + t.sleep(60.0 - ((t.time() - starttime) % 60.0)) + print("Created workflow:") + print(response.display_name) + print("Current state:") + print(response.State(response.state)) + print("Migration completed succesfully") + + +def main(gcs_config_path, project_id, db_type): + """ + Main function to execute the other function call + """ + + # Set the variable value + config_source_bucket_name = gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = gcs_config_path.split(config_source_bucket_name + "/")[1] + + # Read Config File values: + gcs_client = storage.Client(project_id) + + config_string = readgcs_file( + gcs_client, config_source_bucket_name, config_source_prefix + ) + + migration_config_dict = parse_json(config_string) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + target_dataset = migration_config_dict["target_dataset"] + object_name_config_file = migration_config_dict["object_name_mapping_path"] + default_db = migration_config_dict["default_database"] + + # Derived variable required in the code: + source_bucket_name = gcs_source_path.split("//")[1].split("/")[0] + gcs_target_path = f"gs://{source_bucket_name}/{target_dataset}" + target_bucket_name = gcs_target_path.split("//")[1].split("/")[0] + log_table_id = f"{target_dataset}/batch_translation_report.csv" + + print( + "-------------------- Running Migration workflow function --------------------\n" + ) + # Call the migration workflow function + create_migration_workflow( + gcs_client, + gcs_source_path, + gcs_target_path, + project_id, + db_type, + target_bucket_name, + log_table_id, + object_name_config_file, + default_db, + ) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument("--gcs_config_path", help="GCS Config Path for defined variables") + + parser.add_argument("--project_id", help="Project_id required to run the code") + + parser.add_argument("--db_type", help="GCS Config Path for defined variables") + + args = parser.parse_args() + + main(args.gcs_config_path, args.project_id, args.db_type) + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extraction.md b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extraction.md new file mode 100644 index 0000000000..e7f2a07c41 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extraction.md @@ -0,0 +1,71 @@ +Below packages are need to run the script: +google-cloud-secret-manager +google-cloud-bigquery +google-cloud-storage +google-api-core +sudo apt install unixodbc + + +Steps to run this script: + +1. Create the generic-ddl-extraction-config.json file and place it in the gcs bucket. + +2. Create the object_name_mapping.json file and place it in the gcs bucket. + +3. Add the needed additional metadata columns to the metadata_columns.json file and place it in the gcs bucket + +4. 
After completing the above steps, the script can be run as + + a) pip install -r requirements.txt + b) python3 generic_ddl_extraction.py + +5. Once done, verify that the extracted ddl is placed in the specified gcs path. + +# DDL Extractor Utility + +A utility to extract metadata of the tables in the Database(Oracle, Snowflake, MSSQL, Vertica, Neteeza). + +## Step to setup MSSql driver for running the code: +Install pip install -r requirement.txt +To install the SQL Server 2017 ODBC driver on Debian 11, you can use the following steps: + +1. Download the ODBC driver package for Debian 10 (Buster) from the Microsoft repository: + `wget https://packages.microsoft.com/debian/10/prod/pool/main/m/msodbcsql17/msodbcsql17_17.8.1.1-1_amd64.deb` + +2. Install the downloaded package: + `sudo dpkg -i msodbcsql17_17.8.1.1-1_amd64.deb` + + If any dependencies are missing, the dpkg command will notify you. You can use apt-get to install the required dependencies before re-running the dpkg command. + + `sudo apt-get install -f` + + `sudo dpkg -i msodbcsql17_17.8.1.1-1_amd64.deb` + +3. Verify the installation: + Once the installation is complete, you can proceed with installing pyodbc using the steps mentioned earlier. Verify the installation by importing pyodbc in a Python shell or script: + + import pyodbc + print(pyodbc.version) + + If the import and version printout are successful, it means pyodbc is installed correctly, and the SQL Server 2017 ODBC driver is ready to use. + +4. In order to locate the ODBC driver package : + In case the ODBC driver library is installed in a different location, you can perform a system-wide search using the find command. This may take some time as it searches the entire filesystem. + `sudo find / -name "libmsodbcsql*"` + + The command will search for any file starting with libmsodbcsql in the filesystem. Note down the path of the library file if it is found. + +By following these steps, you should be able to locate the ODBC driver library file (libmsodbcsql.so) on your system. Once you find the library file, you can use its path to set the driver variable in your code for connecting to the SQL Server database using pyodbc. + +## Instructions to Run + +Below packages are need to run the script:pandas, cx_Oracle + +1. Install the dependencies listed in requirements.txt using pip3. + `pip3 install -r requirements.txt ` +2. Add your credentials in the secret manager for the respective database +3. Select the type of database. Currently supported types include (mysql, vertica, netezza). +4. Run the utility + python3 generic_ddl_extractions.py --dbtype --secret_name --gcs_config_path --project_id +5. Check the result in given bucket name Folder. + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extractions.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extractions.py new file mode 100644 index 0000000000..de2d9e9a4c --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/generic_ddl_extractions.py @@ -0,0 +1,98 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to drive the generic metadata utility""" +from google.cloud import secretmanager +import json +import argparse +from netezza_metadata_extraction import NetezzaMetastoreModule +from vertica_metadata_extraction import VerticaMetastoreModule +from mssql_metadata_extraction import MssqlMetastoreModule +from oracle_metadata_extraction import OracleMetastoreModule +from snowflake_metadata_extraction import SnowflakeMetastoreModule +from utils.setup_logger import logger + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument( + "--dbtype", help="database type (vertica, netezza, mysql)", required=True + ) + + parser.add_argument( + "--secret_name", help="Secret name", required=True + ) + + parser.add_argument("--gcs_config_path", help="output bucket name", required=True) + + parser.add_argument( + "--project_id", help="Project_Id", required=True + ) + + args = parser.parse_args() + + try: + secret_resource_id = f"projects/{args.project_id}/secrets/{args.secret_name}/versions/latest" + client_secret = secretmanager.SecretManagerServiceClient() + response = client_secret.access_secret_version(name=secret_resource_id) + secret_val = response.payload.data.decode('UTF-8') + credentials = json.loads(secret_val) + username = credentials['user'] + password = credentials['password'] + if args.dbtype == "vertica": + host = credentials['host'] + port = credentials['port'] + dbname = credentials['database'] + vertica_module = VerticaMetastoreModule( + username, password, host, port, dbname, args.gcs_config_path, args.project_id + ) + vertica_module.vertica_metastore_discovery() + elif args.dbtype == "netezza": + host = credentials['host'] + port = credentials['port'] + dbname = credentials['database'] + netezza_module = NetezzaMetastoreModule( + username, password, host, port, dbname, args.gcs_config_path, args.project_id + ) + netezza_module.netezza_metastore_discovery() + elif args.dbtype == "mssql": + host = credentials['host'] + port = credentials['port'] + dbname = credentials['database'] + mssql_module = MssqlMetastoreModule( + username, password, host, port, dbname, args.gcs_config_path, args.project_id + ) + mssql_module.mssql_metastore_discovery() + elif args.dbtype == "oracle": + host = credentials['host'] + port = credentials['port'] + dbname = credentials['database'] + instant_client_path = credentials['instant_client_path'] + oracle_module = OracleMetastoreModule( + username, password, host, port, dbname, args.gcs_config_path, args.project_id, instant_client_path + ) + oracle_module.oracle_metastore_discovery() + elif args.dbtype == "snowflake": + account = credentials['account'] + warehouse = credentials['warehouse'] + dbname = credentials['database'] + schema=credentials['schema'] + snowflake_module = SnowflakeMetastoreModule( + username, password, account, warehouse, dbname, schema, args.gcs_config_path, args.project_id + ) + snowflake_module.snowflake_metastore_discovery() + except Exception as error: + logger.error(f"Error when retreiving secret 
credentials/Incorrect option : {error}") + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/mssql_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/mssql_metadata_extraction.py new file mode 100644 index 0000000000..483c7be2f6 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/mssql_metadata_extraction.py @@ -0,0 +1,192 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to extract mssql metastore data""" +import sys +import pyodbc +import datetime +from google.cloud import storage, bigquery +from utils.setup_logger import logger +from utils import utilities as UtilFunction + + +class MssqlMetastoreModule: + """ + This Class has functions related to the Generic Metadata Utility Module. + It has functions which fetch different data metrics from database and + write it to GCS bucket. + + Args: + inputs (dict): Contains user input attributes + """ + + def __init__( + self, + username: str, + password: str, + host: str, + port: str, + dbname: str, + gcs_config_path: str, + project_id: str + ) -> None: + """Initialize the attributes""" + self.username = username + self.password = password + self.host = host + self.port = port + self.dbname = dbname + self.gcs_config_path = gcs_config_path + self.project_id = project_id + + def connect_mssql_conn(self, table_ref, bq_client): + """ + Initializes a connection pool for a Mssql. + Uses the pyodbc Python package. 
+ """ + try: + # Construct the connection string + connection_string = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={self.host};\ + DATABASE={self.dbname};UID={self.username};PWD={self.password}" + logger.info("Connecting to the Mssql Database...") + conn = pyodbc.connect(connection_string) + return conn + except Exception as ex: + print(f"Connection to oracle failed: {str(ex)}") + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + None, + None, + "DatabaseError", + str(ex), + "Failed", + "Check the connection credentials", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + raise Exception(str(ex)) from ex + + def extract_metastore(self, con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref): + """Function to execute the core logic for metastore extraction""" + try: + cursor = con.cursor() + con.outputtypehandler = UtilFunction.output_type_handler + + cursor = con.cursor() + # Execute the metadata query + query = """ + SELECT + t.TABLE_SCHEMA, + t.TABLE_NAME + FROM + INFORMATION_SCHEMA.TABLES AS t + WHERE + t.TABLE_TYPE = 'BASE TABLE' OR t.TABLE_TYPE = 'VIEW' + ORDER BY + t.TABLE_SCHEMA, + t.TABLE_NAME, + """ + cursor.execute(query) + + # Fetch all the rows from the result set + rows = cursor.fetchall() + logger.info("---Extraction Completed\n") + # Create a ddl string from the metadata rows + content = "" + for row in rows: + try: + query = f""" + exec sp_GetDDL '[{row[0]}].[{row[1]}]' + """ + cursor.execute(query) + ddl_rows= cursor.fetchall() + content += str(ddl_rows) + "\n" + + UtilFunction.write_to_blob( + gcs_client, + bucket_name=source_bucket_name, + file_name=f"{source_dataset}/{row['table_name'].split('.')[0].strip()}-\ + {row['table_name'].split('.')[1].strip()}.sql", + content=content, + ) + except Exception as ex: + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "Excecution Error", + str(ex), + "Failed", + "Check the query", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + else: + print( + f"DDL Generated Successfully for table \ + {row['table_name'].split('.')[1].strip()}" + ) + file_path = f"gs://{source_bucket_name}/{source_dataset}/\ + {row['table_name'].split('.')[0].strip()}-{row['table_name'].split('.')[1].strip()}.sql" + success_record = [ + ( + datetime.datetime.now().strftime("%s"), + file_path, + row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "DDL Extraction", + "DDL Generated Successfully", + "Success", + None, + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, success_record) + con.commit() + print( + "Connection close in case of failure of any table check the log table in Big Query" + ) + except Exception as error: + logger.error("Error in the Extract Metastore function call %s", str(error)) + + def mssql_metastore_discovery(self): + """Creates a connection and query to the Mssql database.""" + try: + config_source_bucket_name = self.gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = self.gcs_config_path.split(config_source_bucket_name + "/")[1] + gcs_client = storage.Client(self.project_id) + + migration_config_dict = UtilFunction.read_config(gcs_client, config_source_bucket_name, config_source_prefix) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + dataset_location = 
migration_config_dict["dataset_location"] + target_dataset = migration_config_dict["target_dataset"] + source_bucket_name = migration_config_dict["source_bucket_name"] + source_dataset = gcs_source_path.split("//")[1].split("/")[1] + table_config = migration_config_dict["table_config"] + + bq_client = bigquery.Client(project=self.project_id, location=dataset_location) + + table_ref = UtilFunction.create_log_table(self.project_id, target_dataset, bq_client) + + con = self.connect_mssql_conn(table_ref, bq_client) + self.extract_metastore(con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref) + con.close() + except Exception as error: + logger.error("Error in the main function call %s", str(error)) + sys.exit(1) + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/netezza_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/netezza_metadata_extraction.py new file mode 100644 index 0000000000..292dc95a25 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/netezza_metadata_extraction.py @@ -0,0 +1,128 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to extract Netezza metastore data""" +import sys +import datetime +import jaydebeapi +from google.cloud import storage +from utils.setup_logger import logger +from utils import utilities as UtilFunction + + +class NetezzaMetastoreModule: + """ + This Class has functions related to the Generic Metadata Utility Module. + It has functions which fetch different data metrics from database and + write it to GCS bucket. + + Args: + inputs (dict): Contains user input attributes + """ + + def __init__( + self, + username: str, + password: str, + host: str, + port: str, + dbname: str, + bucket: str, + project_id: str + ) -> None: + """Initialize the attributes""" + self.username = username + self.password = password + self.host = host + self.port = port + self.dbname = dbname + self.bucket = bucket + self.project_id = project_id + + + def connect_netezza_conn(self): + """ + Initializes a connection pool for a Mssql. + Uses the jaydebeapi Python package. + """ + try: + logger.info("Connecting to the Netezza Database...") + # Create a connection to Netezza + conn = jaydebeapi.connect( + "org.netezza.Driver", + f"jdbc:netezza://{self.host}:{self.port}/{self.dbname}", + {"user": self.username, "password": self.password}, + ) + except Exception as error: + logger.error("Connection Has Failed... 
%s", str(error)) + sys.exit(1) + return conn + + + def extract_metastore(self, conn: str): + """Function to execute the core logic for metastore extraction""" + try: + cursor = conn.cursor() + # Execute the metadata query + query = """ + SELECT + COLUMN_NAME, + DATA_TYPE, + COLUMN_LENGTH, + COLUMN_PRECISION, + COLUMN_SCALE, + COLUMN_NULLABLE, + COLUMN_IDENTITY, + COLUMN_DEFAULT, + COLUMN_COMMENT, + FROM + SYSCAT.COLUMNS WHERE TABLE_NAME = \'mytable\' + """ + cursor.execute(query) + + # Fetch all the rows from the result set + rows = cursor.fetchall() + logger.info("---Extraction Completed\n") + # Create a CSV string from the metadata rows + csv_content = "" + for row in rows: + csv_content += ",".join(str(field) for field in row) + "\n" + + # Upload the CSV content to a GCS bucket + storage_client = storage.Client() + current_timestamp = datetime.datetime.now() + UtilFunction.write_to_blob_from_file( + storage_client, self.bucket, f"metadata{current_timestamp}.csv", csv_content + ) + print("Metadata written to GCS bucket successfully!") + + # Close the cursor and connection + cursor.close() + conn.close() + except jaydebeapi.Error as error: + logger.error("Error connecting to Netezza Server: %s", str(error)) + except Exception as error: + logger.error("Error when running the query %s", str(error)) + raise + + + def netezza_metastore_discovery(self): + """Creates a connection and query to the Mssql database.""" + try: + conn = self.connect_netezza_conn() + self.extract_metastore(conn) + except Exception as error: + logger.error("Error in the main function call %s", str(error)) + sys.exit(1) + finally: + conn.close() diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/oracle_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/oracle_metadata_extraction.py new file mode 100644 index 0000000000..a1f3c1fa48 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/oracle_metadata_extraction.py @@ -0,0 +1,224 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to extract Oracle metastore data""" +import sys +import cx_Oracle +import oracledb +import datetime +from google.cloud import storage, bigquery +from utils.setup_logger import logger +from utils import utilities as UtilFunction + + +class OracleMetastoreModule: + """ + This Class has functions related to the Generic Metadata Utility Module. + It has functions which fetch different data metrics from database and + write it to GCS bucket. 
+ + Args: + inputs (dict): Contains user input attributes + """ + + def __init__( + self, + username: str, + password: str, + host: str, + port: str, + dbname: str, + gcs_config_path: str, + project_id: str, + instant_client_path: str + ) -> None: + """Initialize the attributes""" + self.username = username + self.password = password + self.host = host + self.port = port + self.dbname = dbname + self.gcs_config_path = gcs_config_path + self.project_id = project_id + self.instant_client_path = instant_client_path + + def connect_oracle_conn(self, table_ref, bq_client): + """ + Initializes a connection pool for a Mssql. + Uses the pyodbc Python package. + """ + try: + credentials_str = ( + self.username + + "/" + + self.password + + "@" + + self.host + + ":" + + self.port + + "/" + + self.dbname + ) + # This is needed only in the case of oracle thick client + oracledb.init_oracle_client(lib_dir=self.instant_client_path) + con = oracledb.connect(credentials_str) + return con + except oracledb.DatabaseError as ex: + print(f"Connection to oracle failed: {str(ex)}") + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + None, + None, + "DatabaseError", + str(ex), + "Failed", + "Check the connection credentials", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + raise Exception(str(ex)) from ex + + def output_type_handler(self, cursor, name, default_type, size, precision, scale): + if default_type == cx_Oracle.DB_TYPE_CLOB: + return cursor.var(cx_Oracle.DB_TYPE_LONG, arraysize=cursor.arraysize) + if default_type == cx_Oracle.DB_TYPE_BLOB: + return cursor.var(cx_Oracle.DB_TYPE_LONG_RAW, arraysize=cursor.arraysize) + + + def extract_metastore(self, con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref): + """Function to execute the core logic for metastore extraction""" + try: + cursor = con.cursor() + con.outputtypehandler = self.output_type_handler + for row in table_config: + try: + query = f""" + WITH cte_sql AS + ( + select table_name table_name, 0 seq, 'CREATE TABLE ' || rtrim(owner)||'.'||rtrim(table_name) || '(' AS sql_out + from all_tab_columns where owner = upper('{row["table_name"].split(".")[0].strip()}') AND table_name in (upper('{row["table_name"].split(".")[1].strip()}') + ) + union + select table_name table_name, + column_id seq, + decode(column_id,1,' ',' ,')|| + rtrim(column_name)||' '|| + rtrim(data_type) ||' '|| + rtrim(decode(data_type,'DATE',null,'LONG',null, + 'NUMBER',decode(to_char(data_precision),null,null,'('), + '(')) || + rtrim(decode(data_type, + 'DATE',null, + 'CHAR',data_length, + 'VARCHAR2',data_length, + 'NUMBER',decode(to_char(data_precision),null,null, + to_char(data_precision) || ',' || to_char(data_scale)), + 'LONG',null, + '')) || + rtrim(decode(data_type,'DATE',null,'LONG',null, + 'NUMBER',decode(to_char(data_precision),null,null,')'), + ')')) ||' '|| + rtrim(decode(nullable,'N','NOT NULL',null)) AS sql_out + from all_tab_columns where owner = upper('{row["table_name"].split(".")[0].strip()}') AND table_name in ( upper('{row["table_name"].split(".")[1].strip()}')) + union + select table_name table_name, + 999999 seq, + ')' AS sql_out + from all_tab_columns + where owner = upper('{row["table_name"].split(".")[0].strip()}') + AND table_name in (upper('{row["table_name"].split(".")[1].strip()}')) + ) + select + xmlagg (xmlelement (e, sql_out || '') ORDER BY seq).extract ('//text()').getclobval() sql_output + from + cte_sql + group by + table_name""" + cursor.execute(query) + 
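+                    # The CTE above assembles a single CREATE TABLE statement from
+                    # ALL_TAB_COLUMNS: seq 0 emits the "CREATE TABLE owner.table ("
+                    # header, seq = column_id emits one column definition per row,
+                    # and seq 999999 emits the closing parenthesis; XMLAGG then
+                    # concatenates the pieces in seq order into one CLOB.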
print("Query Executed Successfully") + output = cursor.fetchall() + output_str = str(output[0][0]) + UtilFunction.write_to_blob( + gcs_client, + source_bucket_name, + f"{source_dataset}/{row['table_name'].split('.')[0].strip()}-\ + {row['table_name'].split('.')[1].strip()}.sql", + output_str, + ) + except Exception as ex: + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "Excecution Error", + str(ex), + "Failed", + "Check the query", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + else: + print( + f"DDL Generated Successfully for table \ + {row['table_name'].split('.')[1].strip()}" + ) + file_path = f"gs://{source_bucket_name}/{source_dataset}/\ + {row['table_name'].split('.')[0].strip()}-{row['table_name'].split('.')[1].strip()}.sql" + success_record = [ + ( + datetime.datetime.now().strftime("%s"), + file_path, + row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "DDL Extraction", + "DDL Generated Successfully", + "Success", + None, + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, success_record) + con.commit() + print( + "Connection close in case of failure of any table check the log table in Big Query" + ) + except Exception as error: + logger.error("Error in the Extract Metastore function call %s", str(error)) + + def oracle_metastore_discovery(self): + """Creates a connection and query to the Oracle database.""" + try: + config_source_bucket_name = self.gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = self.gcs_config_path.split(config_source_bucket_name + "/")[1] + gcs_client = storage.Client(self.project_id) + + migration_config_dict = UtilFunction.read_config(gcs_client, config_source_bucket_name, config_source_prefix) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + dataset_location = migration_config_dict["dataset_location"] + target_dataset = migration_config_dict["target_dataset"] + source_bucket_name = migration_config_dict["source_bucket_name"] + source_dataset = gcs_source_path.split("//")[1].split("/")[1] + table_config = migration_config_dict["table_config"] + bq_client = bigquery.Client(project=self.project_id, location=dataset_location) + table_ref = UtilFunction.create_log_table(self.project_id, target_dataset, bq_client) + con = self.connect_oracle_conn(table_ref, bq_client) + self.extract_metastore(con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref) + con.close() + except Exception as error: + logger.error("Error in the main function call %s", str(error)) + sys.exit(1) diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/snowflake_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/snowflake_metadata_extraction.py new file mode 100644 index 0000000000..ed7727348a --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/snowflake_metadata_extraction.py @@ -0,0 +1,171 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to extract Snowflake metastore data""" +import sys +import datetime +import snowflake.connector +from snowflake.connector.errors import DatabaseError +from google.cloud import storage, bigquery +from utils.setup_logger import logger +from utils import utilities as UtilFunction + + +class SnowflakeMetastoreModule: + """ + This Class has functions related to the Generic Metadata Utility Module. + It has functions which fetch different data metrics from database and + write it to GCS bucket. + + Args: + inputs (dict): Contains user input attributes + """ + + def __init__( + self, + username: str, + password: str, + account: str, + warehouse: str, + dbname: str, + schema: str, + gcs_config_path: str, + project_id: str, + ) -> None: + """Initialize the attributes""" + self.username = username + self.password = password + self.account = account + self.warehouse = warehouse + self.dbname = dbname + self.schema = schema + self.gcs_config_path = gcs_config_path + self.project_id = project_id + + def connect_snowflake_conn(self, table_ref, bq_client): + """ + Initializes a connection pool for a Mssql. + Uses the pyodbc Python package. + """ + try: + ctx = snowflake.connector.connect( + user=self.username, + password=self.password, + account=self.account, + warehouse=self.warehouse, + database=self.dbname, + schema=self.schema + ) + return ctx + except DatabaseError as ex: + print(f"Connection to snowflake failed: {str(ex)}") + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + None, + None, + "DatabaseError", + str(ex), + "Failed", + "Check the connection credentials", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + raise Exception(str(ex)) from ex + + + def extract_metastore(self, con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref): + """Function to execute the core logic for metastore extraction""" + try: + cursor = con.cursor() + for row in table_config: + try: + query = """ SELECT GET_DDL('TABLE',upper('{1}'),true) """.format(row['table_name'].split('.')[0].strip(),row['table_name'].split('.')[1].strip()) + cursor.execute(query) + output = cursor.fetchall() + output_str = str(output[0][0]) + UtilFunction.write_to_blob( + gcs_client, + bucket_name=source_bucket_name, + file_name=f"{source_dataset}/{row['table_name'].split('.')[0].strip()}-\ + {row['table_name'].split('.')[1].strip()}.sql", + content=output_str, + ) + except Exception as ex: + failure_record = [ + ( + datetime.datetime.now().strftime("%s"), + None, + row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "Excecution Error", + str(ex), + "Failed", + "Check the query", + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, failure_record) + else: + print( + f"DDL Generated Successfully for table \ + {row['table_name'].split('.')[1].strip()}" + ) + file_path = f"gs://{source_bucket_name}/{source_dataset}/\ + {row['table_name'].split('.')[0].strip()}-{row['table_name'].split('.')[1].strip()}.sql" + success_record = [ + ( + datetime.datetime.now().strftime("%s"), + file_path, + 
row["table_name"].split(".")[0].strip(), + row["table_name"].split(".")[1].strip(), + "DDL Extraction", + "DDL Generated Successfully", + "Success", + None, + ) + ] + UtilFunction.log_table_data(table_ref, bq_client, success_record) + cursor.close() + con.close() + except Exception as error: + logger.error("Error in the Extract Metastore function call %s", str(error)) + + def snowflake_metastore_discovery(self): + """Creates a connection and query to the Snowflake database.""" + try: + config_source_bucket_name = self.gcs_config_path.split("//")[1].split("/")[0] + config_source_prefix = self.gcs_config_path.split(config_source_bucket_name + "/")[1] + gcs_client = storage.Client(self.project_id) + + migration_config_dict = UtilFunction.read_config(gcs_client, config_source_bucket_name, config_source_prefix) + + # Read variable values define in config file + gcs_source_path = migration_config_dict["gcs_source_path"] + dataset_location = migration_config_dict["dataset_location"] + target_dataset = migration_config_dict["target_dataset"] + source_bucket_name = migration_config_dict["source_bucket_name"] + source_dataset = gcs_source_path.split("//")[1].split("/")[1] + table_config = migration_config_dict["table_config"] + + bq_client = bigquery.Client(project=self.project_id, location=dataset_location) + + table_ref = UtilFunction.create_log_table(self.project_id, target_dataset, bq_client) + + con = self.connect_snowflake_conn(table_ref, bq_client) + self.extract_metastore(con, gcs_client, bq_client, table_config, source_bucket_name, source_dataset, table_ref) + con.close() + except Exception as error: + logger.error("Error in the main function call %s", str(error)) + sys.exit(1) + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/__init__.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/setup_logger.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/setup_logger.py new file mode 100644 index 0000000000..da56e3b428 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/setup_logger.py @@ -0,0 +1,28 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Logging module to log the error and other information""" +import logging + +logging.basicConfig(level=logging.INFO) + +logger = logging.getLogger("Generic DDL Migration Utility") + +ch = logging.StreamHandler() +ch.setLevel(logging.INFO) + +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + +ch.setFormatter(formatter) + +logger.addHandler(ch) diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/utilities.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/utilities.py new file mode 100644 index 0000000000..50370995e9 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/utils/utilities.py @@ -0,0 +1,87 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utility for the function used in generic ddl extractor utility modules""" +from utils.setup_logger import logger +from google.cloud import bigquery +import ast +import json + +def write_to_blob_from_file(gcs_client, bucket_name, destination_file_name, source_file_name): + """Function to write data in gcs bucket""" + bucket = gcs_client.bucket(bucket_name) + if bucket.exists(): + blob = bucket.blob(destination_file_name) + blob.upload_from_filename(source_file_name, "text/csv") + else: + logger.info("Bucket doesn't exist") + + +def create_log_table(project_id, target_dataset, client): + """Function to create log table for storing the job fopailure""" + table_id = project_id + "." 
+ target_dataset + ".report_status_log_tbl" + schema = [ + bigquery.SchemaField("Timestamp", "STRING"), + bigquery.SchemaField("FilePath", "STRING"), + bigquery.SchemaField("Schemaname", "STRING"), + bigquery.SchemaField("TableName", "STRING"), + bigquery.SchemaField("Category", "STRING"), + bigquery.SchemaField("Message", "STRING"), + bigquery.SchemaField("Status", "STRING"), + bigquery.SchemaField("Action", "STRING"), + ] + table_address = bigquery.Table(table_id, schema=schema) + table_ref = client.create_table(table_address, exists_ok=True) + return table_ref + + +def log_table_data(table, client, records): + """Function to log the table data""" + try: + client.insert_rows(table, records) + print(f"Table : {table} logged Successfully in Log Table") + except ValueError as ex: + print(f"Found the error when loading the data in table : {ex}") + + +def write_to_blob(gcs_client, bucket_name, file_name, content): + """Function to write the data to gcs bucket""" + bucket = gcs_client.bucket(bucket_name) + blob = bucket.blob(file_name) + blob.upload_from_string(content) + + +def readgcs_file(gcs_client, bucket_name, prefix): + """Function to read the gcs file""" + bucket = gcs_client.get_bucket(bucket_name) + blob = bucket.get_blob(prefix) + contents = blob.download_as_string() + return contents.decode("utf-8") + + +def parse_json(raw_string): + """Function to parse the json""" + config_parsed_data = ast.literal_eval(raw_string) + config_intermediate_data = json.dumps(config_parsed_data) + parsed_json_data = json.loads(config_intermediate_data) + return parsed_json_data + + +def read_config(gcs_client, config_source_bucket_name, config_source_prefix): + # Read Config File values: + config_string = readgcs_file( + gcs_client, config_source_bucket_name, config_source_prefix + ) + migration_config_dict = parse_json(config_string) + return migration_config_dict + diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py new file mode 100644 index 0000000000..02d5d56719 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py @@ -0,0 +1,132 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module to extract vertica metastore data""" +import sys +import datetime +import vertica_python +from google.cloud import storage +from utils.setup_logger import logger +from utils import utilities as UtilFunction + + +class VerticaMetastoreModule: + """ + This Class has functions related to the Generic Metadata Utility Module. + It has functions which fetch different data metrics from database and + write it to GCS bucket. 
diff --git a/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py
new file mode 100644
index 0000000000..02d5d56719
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/DDL_Extractor/vertica_metadata_extraction.py
@@ -0,0 +1,132 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module to extract Vertica metastore data"""
+import sys
+import datetime
+import vertica_python
+from google.cloud import storage
+from utils.setup_logger import logger
+from utils import utilities as UtilFunction
+
+
+class VerticaMetastoreModule:
+    """
+    This class has functions related to the Generic Metadata Utility Module.
+    It fetches metadata from the source database and writes it to a GCS bucket.
+
+    Args:
+        inputs (dict): Contains user input attributes
+    """
+
+    def __init__(
+        self,
+        username: str,
+        password: str,
+        host: str,
+        port: str,
+        dbname: str,
+        bucket: str,
+        project_id: str,
+    ) -> None:
+        """Initialize the attributes"""
+        self.username = username
+        self.password = password
+        self.host = host
+        self.port = port
+        self.dbname = dbname
+        self.bucket = bucket
+        self.project_id = project_id
+
+    def connect_vertica_conn(self):
+        """
+        Initializes a connection to a Vertica database.
+        Uses the vertica_python package.
+        """
+        try:
+            logger.info("Connecting to the Vertica database...")
+            # Connect to Vertica using the keyword arguments supported by vertica_python
+            conn = vertica_python.connect(
+                host=self.host,
+                port=int(self.port),
+                user=self.username,
+                password=self.password,
+                database=self.dbname,
+            )
+            return conn
+        except Exception as error:
+            logger.error("Connection Has Failed... %s", str(error))
+            sys.exit(1)
+
+    def extract_metastore_from_on_prem(self, conn):
+        """Function to execute the core logic for metastore extraction"""
+        try:
+            cursor = conn.cursor()
+            # Execute the metadata query
+            query = """
+                SELECT
+                    column_name,
+                    data_type,
+                    character_maximum_length,
+                    numeric_precision,
+                    numeric_scale,
+                    is_nullable,
+                    is_unique,
+                    is_primary_key,
+                    is_foreign_key,
+                    foreign_key_table_name,
+                    foreign_key_column_name
+                FROM
+                    v_catalog.columns
+                WHERE
+                    table_name = 'my_table'
+                ORDER BY
+                    column_name;
+            """
+            cursor.execute(query)
+
+            # Fetch all the rows from the result set
+            rows = cursor.fetchall()
+            logger.info("---Extraction Completed\n")
+            # Create a CSV string from the metadata rows
+            csv_content = ""
+            for row in rows:
+                csv_content += ",".join(str(field) for field in row) + "\n"
+
+            # Upload the CSV content to a GCS bucket
+            storage_client = storage.Client()
+            current_timestamp = datetime.datetime.now()
+            UtilFunction.write_to_blob(
+                storage_client, self.bucket, f"metadata{current_timestamp}.csv", csv_content
+            )
+            print("Metadata written to GCS bucket successfully!")
+
+            # Close the cursor and connection
+            cursor.close()
+            conn.close()
+        except vertica_python.Error as error:
+            logger.error("Error connecting to Vertica Server: %s", str(error))
+        except Exception as error:
+            logger.error("Error when running the query %s", str(error))
+            raise
+
+    def vertica_metastore_discovery(self):
+        """Creates a connection and query to the Vertica database."""
+        try:
+            conn = self.connect_vertica_conn()
+            self.extract_metastore_from_on_prem(conn)
+            conn.close()
+        except Exception as error:
+            logger.error("Error in the main function call %s", str(error))
+            sys.exit(1)
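Likewise, a hypothetical invocation of the Vertica module above; host, credentials and bucket are placeholders (the bucket name mirrors the sample config), and in practice they would come from Secret Manager or CLI arguments:

```python
# Hypothetical invocation of VerticaMetastoreModule; all values are placeholders.
# Assumes the script is run from the DDL_Extractor directory.
from vertica_metadata_extraction import VerticaMetastoreModule

module = VerticaMetastoreModule(
    username="dbadmin",                     # placeholder
    password="********",                    # placeholder
    host="vertica.example.internal",        # placeholder
    port="5433",                            # default Vertica port
    dbname="POC",                           # sample database name from the config
    bucket="generic-migration",             # sample bucket from the config
    project_id="my-gcp-project",            # placeholder
)
module.vertica_metastore_discovery()
```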
diff --git a/examples/bigquery-generic-ddl-migration-utility/README.md b/examples/bigquery-generic-ddl-migration-utility/README.md
new file mode 100644
index 0000000000..c4791920fc
--- /dev/null
+++ b/examples/bigquery-generic-ddl-migration-utility/README.md
@@ -0,0 +1,30 @@
+The Generic DDL Migration Utility performs the following functions:
+
+1. The script connects to a generic database (Oracle, Snowflake, MSSQL, Vertica, Netezza).
+2. The script uses the metadata table (all_tab_columns) to retrieve the table schema information.
+3. The script produces the "create table" statement using the schema information and stores the extracted DDL in the specified GCS path.
+4. The script calls the BigQuery Migration API, converts the DDL to BigQuery DDL and places it in the specified GCS path.
+5. The script creates the BigQuery tables in the specified target dataset.
+6. The table structure includes the source columns, metadata columns, and partitioning and clustering info.
+7. The script (archive_ddl.py) archives the DDL files created by generic_ddl_extraction.py and generic_bq_converter.py.
+8. The status of each table conversion is logged in the audit table in the target dataset.
+
+
+The order of execution of the scripts, along with the commands to run them, is as follows:
+
+1. DDL_Extractor -> generic_ddl_extraction.py
+   command: python3 generic_ddl_extraction.py --dbtype <dbtype> --secret_name <secret_name> --gcs_config_path <gcs_config_path> --project_id <project_id>
+2. DDL_Converter -> generic_bq_converter.py
+   command: python3 generic_bq_converter.py --gcs_config_path <gcs_config_path> --project_id <project_id> --db_type <db_type>
+3. BQ_Table_Creator -> bq_table_creator.py
+   command: python3 bq_table_creator.py --gcs_config_path <gcs_config_path> --project_id <project_id>
+4. DDL_Archiver -> archive_ddl.py
+   command: python3 archive_ddl.py --gcs_config_path <gcs_config_path> --project_id <project_id>
+
+## Business Requirements
+
+To create a common repository for metadata extraction for each of the source databases, so that there is a standard way to retrieve this information and to avoid duplicated effort across engagements.
+
+## Asset Feature
+
+The utility connects to each of the legacy databases, extracts the table metadata by reading from the internal system tables, formats the information, and produces the final table metadata to be migrated to GCP.
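The four commands listed in the README could be chained as sketched below. This is an illustrative driver only: the flag values are placeholders, the relative script paths assume the directory names shown in the README, and the flag spellings follow the README text rather than a verified CLI definition:

```python
# Hypothetical end-to-end driver mirroring the execution order in the README.
# All argument values are placeholders; the real scripts may expect to be run
# from their own directories and may validate flags differently.
import subprocess

PROJECT = "my-gcp-project"  # placeholder
CONFIG = "gs://generic-migration/config/generic-ddl-extraction-config.json"  # sample path

steps = [
    ["python3", "DDL_Extractor/generic_ddl_extraction.py", "--dbtype", "snowflake",
     "--secret_name", "generic-ddl-secret", "--gcs_config_path", CONFIG, "--project_id", PROJECT],
    ["python3", "DDL_Converter/generic_bq_converter.py", "--gcs_config_path", CONFIG,
     "--project_id", PROJECT, "--db_type", "snowflake"],
    ["python3", "BQ_Table_Creator/bq_table_creator.py", "--gcs_config_path", CONFIG,
     "--project_id", PROJECT],
    ["python3", "DDL_Archiver/archive_ddl.py", "--gcs_config_path", CONFIG,
     "--project_id", PROJECT],
]

for cmd in steps:
    subprocess.run(cmd, check=True)  # stop on the first failing step
```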
"source": { + "type_": "SCHEMA", + "database": "POC", + "schema": "ACCOUNT" + }, + "target": { + "database": "POC", + "schema": "ACCOUNT_TGT" + } + } + ] + } \ No newline at end of file diff --git a/examples/bigquery-generic-ddl-migration-utility/requirements.txt b/examples/bigquery-generic-ddl-migration-utility/requirements.txt new file mode 100644 index 0000000000..d94ef85b70 --- /dev/null +++ b/examples/bigquery-generic-ddl-migration-utility/requirements.txt @@ -0,0 +1,11 @@ +google-cloud-storage +google-cloud-bigquery +google-cloud-secret-manager +google-cloud-bigquery-migration +google-api-core +vertica_python +jaydebeapi +oracledb +pyodbc +cx_Oracle +snowflake-connector-python diff --git a/examples/bigquery-oracle-ddl-migration-utility/DDL_Extractor/oracle_ddl_extraction.py b/examples/bigquery-oracle-ddl-migration-utility/DDL_Extractor/oracle_ddl_extraction.py index 665c37c2b8..843e7800cb 100644 --- a/examples/bigquery-oracle-ddl-migration-utility/DDL_Extractor/oracle_ddl_extraction.py +++ b/examples/bigquery-oracle-ddl-migration-utility/DDL_Extractor/oracle_ddl_extraction.py @@ -157,7 +157,7 @@ def main(gcs_config_path, project_id, instant_client_path): WITH cte_sql AS ( select table_name table_name, 0 seq, 'CREATE TABLE ' || rtrim(owner)||'.'||rtrim(table_name) || '(' AS sql_out - from all_tab_columns where owner = upper('{0}') AND table_name in (upper('{1}') + from all_tab_columns where owner = upper('{row["table_name"].split(".")[0].strip()}') AND table_name in (upper('{row["table_name"].split(".")[1].strip()}') ) union select table_name table_name,