diff --git a/README.md b/README.md
index 62ecceb..b9dc8fb 100644
--- a/README.md
+++ b/README.md
@@ -125,7 +125,7 @@ Before getting started with the migration process, ensure the following is in pl
 3. **git**: Install [git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) to clone the sample code repository.
 4. **Source Data**: Identify the existing Amazon S3 bucket and the Glue Data Catalog table that contains the tabular data to migrate.
    - If the source bucket is encrypted with [CMK-KMS](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk), remember to grant the "EMREc2role" created by the CloudFormation template, in the Stack Output, access to the CMK-KMS key prior to running the migration.
-   - This solution supports only Apache Iceberg source tables registered with AWS Glue Data Catalog.
+   - This solution supports both Standard (Hive) and Apache Iceberg source table formats registered with the AWS Glue Data Catalog.
 5. **Destination S3 Tables**: Determine the Amazon S3 table bucket and namespace/database where tabular data will migrate.
 6. **IAM Permissions**: Ensure you have the necessary IAM permissions to create and manage the required AWS resources, such as CloudFormation stacks, Amazon S3, AWS Glue, Amazon EMR, and AWS Step Functions.
    - If an [AWS Lake Formation](https://aws.amazon.com/lake-formation/) table is referenced, specific Lake Formation permissions (such as SELECT, DESCRIBE) on Glue databases and tables are granted.
diff --git a/scripts/pyspark/mys3tablespysparkscript.py b/scripts/pyspark/mys3tablespysparkscript.py
index 724af8a..71533fb 100644
--- a/scripts/pyspark/mys3tablespysparkscript.py
+++ b/scripts/pyspark/mys3tablespysparkscript.py
@@ -15,6 +15,7 @@
 parser.add_argument('--data_source_bucket', help="Source data S3 bucket name.")
 parser.add_argument('--data_source_db', help="Source data Glue Database name.")
 parser.add_argument('--data_source_tbl', help="Source data Glue Table name.")
+parser.add_argument('--data_source_type', help="Source data Glue Table Type.")
 parser.add_argument('--data_source_catalog', help="Source DB/TableCatalog.")
 parser.add_argument('--data_destination_s3tables_arn', help="Destination S3 Table ARN.")
 parser.add_argument('--data_destination_catalog', help="Destination S3 Tables Catalog.")
@@ -31,6 +32,7 @@
 data_source_bucket = args.data_source_bucket
 data_source_db = args.data_source_db
 data_source_tbl = args.data_source_tbl
+data_source_type = args.data_source_type
 data_source_catalog = args.data_source_catalog
 data_destination_catalog = args.data_destination_catalog
 data_destination_s3tables_arn = args.data_destination_s3tables_arn
@@ -74,11 +76,19 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)
     sql_query_insert = ''
     # Let's start the INSERT INTO action FOR the earlier CTAS
     print(f"Initiating INSERT INTO worklow from {src_catalog}.{src_db}.{src_tbl} into {dst_db}.{dst_tbl} please hold...")
-    sql_query_insert = f"""
-    INSERT INTO
-    `{catalog}`.`{dst_db}`.`{dst_tbl}`
-    SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
-    """
+    # Handle query with or without catalog name provided
+    if src_catalog:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{catalog}`.`{dst_db}`.`{dst_tbl}`
+        SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
+        """
+    else:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{catalog}`.`{dst_db}`.`{dst_tbl}`
+        SELECT * FROM `{src_db}`.`{src_tbl}`
+        """
 
     # Run the INSERT INTO SQL query
     spark_sql_query_insert = spark.sql(sql_query_insert)
@@ -90,9 +100,6 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)
 
 
-
-
-
 # Function for performing CTAS - CREATE TABLE AS SELECT into a new destination Database/Table - creates a new DB/Table
 def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_partitions):
     """
@@ -113,22 +120,43 @@ def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_part
     # Check the provided partition name and value for the destination Table
     if dst_partitions:
         if dst_partitions == "NotApplicable":
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{catalog}`.`{dst_db}`.`{dst_tbl}`
-            USING iceberg
-            AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{catalog}`.`{dst_db}`.`{dst_tbl}`
+                USING iceberg
+                AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{catalog}`.`{dst_db}`.`{dst_tbl}`
+                USING iceberg
+                AS SELECT * FROM `{src_db}`.`{src_tbl}`
+                LIMIT 0
+                """
         else:
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{catalog}`.`{dst_db}`.`{dst_tbl}`
-            USING iceberg
-            PARTITIONED BY {dst_partitions}
-            AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{catalog}`.`{dst_db}`.`{dst_tbl}`
+                USING iceberg
+                PARTITIONED BY {dst_partitions}
+                AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{catalog}`.`{dst_db}`.`{dst_tbl}`
+                USING iceberg
+                PARTITIONED BY {dst_partitions}
+                AS SELECT * FROM `{src_db}`.`{src_tbl}`
+                LIMIT 0
+                """
 
     # Run the CTAS SQL query
     spark_sql_query_d = spark.sql(sql_query_d)
@@ -176,17 +204,29 @@ def initiate_workflow():
     try:
         # First let's query the source table
         print(f"Let do a test query of the source table {data_source_db}.{data_source_tbl} to see if we can perform a successful query")
-        query_table_data(data_source_catalog, data_source_db, data_source_tbl)
+        if data_source_type == 'Standard':
+            query_table_data(None, data_source_db, data_source_tbl)
+        elif data_source_type == 'Iceberg':
+            query_table_data(data_source_catalog, data_source_db, data_source_tbl)
         print(f"Test query of the source table {data_source_db}.{data_source_tbl} is successful proceeding to main task")
 
         # Choose the CTAS option to create new Amazon S3 Table Bucket destination NameSpace and Table
         if data_migration_type == 'New-Migration':
             print(f"We are performing a new migration, so will use CTAS to create a new table and load data")
-            ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
-                        data_destination_s3tables_tbl, data_destination_s3tables_partitions
-                        )
-            # Now that we have successfully created the destination table, let's perform an INSERT INTO
-            insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
-                                 data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            if data_source_type == 'Iceberg':
+                print(f"Source Table type is Iceberg....")
+                ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            elif data_source_type == 'Standard':
+                ctas_action(None, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(None, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
 
         # Now we are done with CTAS and INSERT INTO, let's perform some verifications on the destination Table
         # Let's query the destination table
diff --git a/src/automated-migration-to-s3-tables-latest.yaml b/src/automated-migration-to-s3-tables-latest.yaml
index 130052a..c62b86b 100644
--- a/src/automated-migration-to-s3-tables-latest.yaml
+++ b/src/automated-migration-to-s3-tables-latest.yaml
@@ -32,7 +32,8 @@ Metadata:
         Parameters:
           - YourS3Bucket
           - YourExistingGlueDatabase
-          - YourExistingGlueTable
+          - YourExistingGlueTable
+          - YourExistingTableType
 
 
       - Label:
@@ -89,6 +90,8 @@ Metadata:
         default: "The source Glue Data Catalog table name"
       MigrationType:
        default: "Migration type"
+      YourExistingTableType:
+        default: "The source Glue Data Catalog table format, for example Standard (Hive) or Iceberg"
       RecipientEmail:
         default: "Email address to receive job notifications"
       ClusterSize:
@@ -162,6 +165,15 @@ Parameters:
     ConstraintDescription: Please provide your existing Glue table name
     AllowedPattern: '[\u0020-\uD7FF\uE000-\uFFFD\uD800\uDC00-\uDBFF\uDFFF\t]*'
 
+  YourExistingTableType:
+    AllowedValues:
+      - Standard
+      - Iceberg
+    Description: Please specify your source Glue table format, for example Standard or Iceberg
+    Type: String
+    Default: Standard
+    ConstraintDescription: Please provide your existing Glue table format
+
   ClusterSize:
     Description: Please choose the size of your EMR Cluster to meet the desired migration workload
 
@@ -211,7 +223,7 @@ Mappings:
       csvwithversionid: restore-and-copy/csv-manifest/with-version-id/
     Parameter:
      catalogname: s3tablescatalog
-      sparkcatalog: spark_catalog
+      sparkcatalog: mysparkcatalog
 
   EMR:
     Cluster:
@@ -293,10 +305,10 @@
      PrimaryInstanceCount: 1
      PrimaryInstanceType: r5.4xlarge
      PrimaryInstanceType2: i3.4xlarge
-      CoreInstanceCount: 4
+      CoreInstanceCount: 8
      CoreInstanceType: i3.4xlarge
      CoreInstanceType2: r5d.4xlarge
-      TaskInstanceCount: 16
+      TaskInstanceCount: 12
      TaskInstanceType: i3.4xlarge
      TaskInstanceType2: r5d.4xlarge
      executorMemory: 28G
@@ -308,7 +320,7 @@
      driverMemoryOverhead: 6G
      driverMaxResultsSize: 16G
      DiskSize: 256
-      PryNodeDiskCount: 2
+      PryNodeDiskCount: 3
      CoreNodeDiskCount: 4
      TaskNodeDiskCount: 3
 
@@ -319,7 +331,6 @@
 
 
-
 
 
 Resources:
   Topic:
@@ -350,7 +361,6 @@
     DependsOn:
      - CheckTableLFAccess
      - CheckDBLFAccess
-      - CheckDefaultDBLFAccess
    Type: 'Custom::LambdaTrigger'
    Properties:
      ServiceToken: !GetAtt CheckResourceExistsLambdaFunction.Arn
@@ -393,21 +403,6 @@
          - "DESCRIBE"
 
-  CheckDefaultDBLFAccess:
-    Type: AWS::LakeFormation::PrincipalPermissions
-    Properties:
-      Principal:
-        DataLakePrincipalIdentifier: !GetAtt CheckResourceExistsIAMRole.Arn
-      Resource:
-        Database:
-          CatalogId: !Sub ${AWS::AccountId}
-          Name: default
-      Permissions:
-        - "DESCRIBE"
-      PermissionsWithGrantOption:
-        - "DESCRIBE"
-
-
  CheckResourceExistsIAMRole:
    Type: 'AWS::IAM::Role'
@@ -734,6 +729,7 @@ Resources:
 parser.add_argument('--data_source_bucket', help="Source data S3 bucket name.")
 parser.add_argument('--data_source_db', help="Source data Glue Database name.")
 parser.add_argument('--data_source_tbl', help="Source data Glue Table name.")
+parser.add_argument('--data_source_type', help="Source data Glue Table Type.")
 parser.add_argument('--data_source_catalog', help="Source DB/TableCatalog.")
 parser.add_argument('--data_destination_s3tables_arn', help="Destination S3 Table ARN.")
 parser.add_argument('--data_destination_catalog', help="Destination S3 Tables Catalog.")
@@ -750,6 +746,7 @@ Resources:
 data_source_bucket = args.data_source_bucket
 data_source_db = args.data_source_db
 data_source_tbl = args.data_source_tbl
+data_source_type = args.data_source_type
 data_source_catalog = args.data_source_catalog
 data_destination_catalog = args.data_destination_catalog
 data_destination_s3tables_arn = args.data_destination_s3tables_arn
@@ -803,11 +800,19 @@ Resources:
     sql_query_insert = ''
     # Let's start the INSERT INTO action FOR the earlier CTAS
     print(f"Initiating INSERT INTO worklow from {{src_catalog}}.{{src_db}}.{{src_tbl}} into {{dst_db}}.{{dst_tbl}} please hold...")
-    sql_query_insert = f"""
-    INSERT INTO
-    `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-    SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-    """
+    # Handle query with or without catalog name provided
+    if src_catalog:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+        SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+        """
+    else:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+        SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+        """
 
     # Run the INSERT INTO SQL query
     spark_sql_query_insert = spark.sql(sql_query_insert)
@@ -819,9 +824,6 @@
 
 
-
-
-
 # Function for performing CTAS - CREATE TABLE AS SELECT into a new destination Database/Table - creates a new DB/Table
 def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_partitions):
     """
@@ -842,22 +844,43 @@ Resources:
     # Check the provided partition name and value for the destination Table
     if dst_partitions:
         if dst_partitions == "NotApplicable":
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-            USING iceberg
-            AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                AS SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
         else:
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-            USING iceberg
-            PARTITIONED BY {{dst_partitions}}
-            AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                PARTITIONED BY {{dst_partitions}}
+                AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                PARTITIONED BY {{dst_partitions}}
+                AS SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
 
     # Run the CTAS SQL query
     spark_sql_query_d = spark.sql(sql_query_d)
@@ -905,17 +928,29 @@ Resources:
     try:
         # First let's query the source table
         print(f"Let do a test query of the source table {{data_source_db}}.{{data_source_tbl}} to see if we can perform a successful query")
-        query_table_data(data_source_catalog, data_source_db, data_source_tbl)
+        if data_source_type == 'Standard':
+            query_table_data(None, data_source_db, data_source_tbl)
+        elif data_source_type == 'Iceberg':
+            query_table_data(data_source_catalog, data_source_db, data_source_tbl)
         print(f"Test query of the source table {{data_source_db}}.{{data_source_tbl}} is successful proceeding to main task")
 
         # Choose the CTAS option to create new Amazon S3 Table Bucket destination NameSpace and Table
         if data_migration_type == 'New-Migration':
             print(f"We are performing a new migration, so will use CTAS to create a new table and load data")
-            ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
-                        data_destination_s3tables_tbl, data_destination_s3tables_partitions
-                        )
-            # Now that we have successfully created the destination table, let's perform an INSERT INTO
-            insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
-                                 data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            if data_source_type == 'Iceberg':
+                print(f"Source Table type is Iceberg....")
+                ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            elif data_source_type == 'Standard':
+                ctas_action(None, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(None, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
 
         # Now we are done with CTAS and INSERT INTO, let's perform some verifications on the destination Table
         # Let's query the destination table
@@ -1319,6 +1354,8 @@ Resources:
                 "${DataSourceGlueDatabase}",
                 "--data_source_tbl",
                 "${DataSourceGlueTable}",
+                "--data_source_type",
+                "${DataSourceGlueTableType}",
                 "--data_source_catalog",
                 "${DataSourceCatalog}",
                 "--data_destination_catalog",
@@ -1432,6 +1469,7 @@ Resources:
         DataSourceS3Bucket: !Ref YourS3Bucket
         DataSourceGlueTable: !Ref YourExistingGlueTable
         DataSourceGlueDatabase: !Ref YourExistingGlueDatabase
+        DataSourceGlueTableType: !Ref YourExistingTableType
         DataSourceCatalog: !FindInMap [ PySpark, Parameter, sparkcatalog ]
         DataDestinationCatalog: !FindInMap [ PySpark, Parameter, catalogname ]
         DataDestinationS3TablesArn: !Ref S3TableBucket
@@ -1509,22 +1547,6 @@ Resources:
        - "DESCRIBE"
 
-  GrantDefaultDBLFAccess:
-    DependsOn:
-      - CheckResourceExists
-    Type: AWS::LakeFormation::PrincipalPermissions
-    Properties:
-      Principal:
-        DataLakePrincipalIdentifier: !GetAtt EMREc2Role.Arn
-      Resource:
-        Database:
-          CatalogId: !Sub ${AWS::AccountId}
-          Name: default
-      Permissions:
-        - "DESCRIBE"
-      PermissionsWithGrantOption:
-        - "DESCRIBE"
-
 ################################################################# End LF permissions #####################################################
 
diff --git a/src/function_codes/UploadScriptFunction.py b/src/function_codes/UploadScriptFunction.py
index be5d937..5ce50b2 100644
--- a/src/function_codes/UploadScriptFunction.py
+++ b/src/function_codes/UploadScriptFunction.py
@@ -59,6 +59,7 @@ def stream_to_s3(bucket, key, body):
 parser.add_argument('--data_source_bucket', help="Source data S3 bucket name.")
 parser.add_argument('--data_source_db', help="Source data Glue Database name.")
 parser.add_argument('--data_source_tbl', help="Source data Glue Table name.")
+parser.add_argument('--data_source_type', help="Source data Glue Table Type.")
 parser.add_argument('--data_source_catalog', help="Source DB/TableCatalog.")
 parser.add_argument('--data_destination_s3tables_arn', help="Destination S3 Table ARN.")
 parser.add_argument('--data_destination_catalog', help="Destination S3 Tables Catalog.")
@@ -75,6 +76,7 @@ def stream_to_s3(bucket, key, body):
 data_source_bucket = args.data_source_bucket
 data_source_db = args.data_source_db
 data_source_tbl = args.data_source_tbl
+data_source_type = args.data_source_type
 data_source_catalog = args.data_source_catalog
 data_destination_catalog = args.data_destination_catalog
 data_destination_s3tables_arn = args.data_destination_s3tables_arn
@@ -128,11 +130,19 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)
     sql_query_insert = ''
     # Let's start the INSERT INTO action FOR the earlier CTAS
     print(f"Initiating INSERT INTO worklow from {{src_catalog}}.{{src_db}}.{{src_tbl}} into {{dst_db}}.{{dst_tbl}} please hold...")
-    sql_query_insert = f"""
-    INSERT INTO
-    `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-    SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-    """
+    # Handle query with or without catalog name provided
+    if src_catalog:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+        SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+        """
+    else:
+        sql_query_insert = f"""
+        INSERT INTO
+        `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+        SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+        """
 
     # Run the INSERT INTO SQL query
     spark_sql_query_insert = spark.sql(sql_query_insert)
@@ -144,9 +154,6 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)
 
 
-
-
-
 # Function for performing CTAS - CREATE TABLE AS SELECT into a new destination Database/Table - creates a new DB/Table
 def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_partitions):
     """
@@ -167,22 +174,43 @@ def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_part
     # Check the provided partition name and value for the destination Table
     if dst_partitions:
         if dst_partitions == "NotApplicable":
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-            USING iceberg
-            AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                AS SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
         else:
-            sql_query_d = f"""
-            CREATE TABLE IF NOT EXISTS
-            `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
-            USING iceberg
-            PARTITIONED BY {{dst_partitions}}
-            AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
-            LIMIT 0
-            """
+            # Handle query with or without catalog name provided
+            if src_catalog:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                PARTITIONED BY {{dst_partitions}}
+                AS SELECT * FROM `{{src_catalog}}`.`{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
+            else:
+                sql_query_d = f"""
+                CREATE TABLE IF NOT EXISTS
+                `{{catalog}}`.`{{dst_db}}`.`{{dst_tbl}}`
+                USING iceberg
+                PARTITIONED BY {{dst_partitions}}
+                AS SELECT * FROM `{{src_db}}`.`{{src_tbl}}`
+                LIMIT 0
+                """
 
     # Run the CTAS SQL query
     spark_sql_query_d = spark.sql(sql_query_d)
@@ -230,17 +258,29 @@ def initiate_workflow():
     try:
         # First let's query the source table
         print(f"Let do a test query of the source table {{data_source_db}}.{{data_source_tbl}} to see if we can perform a successful query")
-        query_table_data(data_source_catalog, data_source_db, data_source_tbl)
+        if data_source_type == 'Standard':
+            query_table_data(None, data_source_db, data_source_tbl)
+        elif data_source_type == 'Iceberg':
+            query_table_data(data_source_catalog, data_source_db, data_source_tbl)
         print(f"Test query of the source table {{data_source_db}}.{{data_source_tbl}} is successful proceeding to main task")
 
         # Choose the CTAS option to create new Amazon S3 Table Bucket destination NameSpace and Table
         if data_migration_type == 'New-Migration':
             print(f"We are performing a new migration, so will use CTAS to create a new table and load data")
-            ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
-                        data_destination_s3tables_tbl, data_destination_s3tables_partitions
-                        )
-            # Now that we have successfully created the destination table, let's perform an INSERT INTO
-            insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
-                                 data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            if data_source_type == 'Iceberg':
+                print(f"Source Table type is Iceberg....")
+                ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
+            elif data_source_type == 'Standard':
+                ctas_action(None, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
+                            data_destination_s3tables_tbl, data_destination_s3tables_partitions
+                            )
+                # Now that we have successfully created the destination table, let's perform an INSERT INTO
+                insert_update_action(None, data_destination_catalog, data_source_db, data_source_tbl,
+                                     data_destination_s3tables_namespace, data_destination_s3tables_tbl)
 
         # Now we are done with CTAS and INSERT INTO, let's perform some verifications on the destination Table
         # Let's query the destination table
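
Reviewer note (not part of the patch): the `if src_catalog: ... else: ...` blocks above repeat the same string-building logic in six places across the three copies of the script. A minimal sketch of one way to collapse them, using a hypothetical helper name `qualified_table` that is not introduced by this change:

    # Hypothetical helper (illustration only, not part of this change): build a
    # backtick-quoted table identifier, dropping the catalog when it is None.
    def qualified_table(catalog, db, tbl):
        parts = [catalog, db, tbl] if catalog else [db, tbl]
        return ".".join(f"`{p}`" for p in parts)

    def build_insert_query(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl):
        # Standard (Hive) sources pass src_catalog=None and resolve through the
        # session catalog; Iceberg sources keep their explicit catalog prefix.
        return (
            f"INSERT INTO {qualified_table(catalog, dst_db, dst_tbl)} "
            f"SELECT * FROM {qualified_table(src_catalog, src_db, src_tbl)}"
        )

The CTAS branches could reuse the same helper, appending the PARTITIONED BY clause only when dst_partitions is not "NotApplicable", which would keep the Standard and Iceberg paths from drifting apart as the three copies of the script evolve.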
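A second reviewer note: initiate_workflow branches only on the exact strings 'Standard' and 'Iceberg', so any other value of --data_source_type silently skips both the test query and the CTAS/INSERT path. The CloudFormation parameter's AllowedValues already constrains the input, but the script could also fail fast. A sketch, assuming the argparse setup shown above:

    import argparse

    parser = argparse.ArgumentParser()
    # Restricting the value at parse time turns a typo into an immediate error
    # instead of a job that runs but performs no migration steps.
    parser.add_argument('--data_source_type',
                        choices=['Standard', 'Iceberg'],
                        required=True,
                        help="Source data Glue Table Type.")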