Updates to CFN and README for additional support.
tjmaws committed Dec 26, 2024
1 parent 2fcb230 commit b9fdfc2
Showing 4 changed files with 231 additions and 129 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -125,7 +125,7 @@ Before getting started with the migration process, ensure the following is in pl
3. **git**: Install [git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) to clone the sample code repository.
4. **Source Data**: Identify the existing Amazon S3 bucket and the Glue Data Catalog table that contains the tabular data to migrate.
- If the source bucket is encrypted with [CMK-KMS](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk), grant the "EMREc2role" created by the CloudFormation template (listed in the Stack Output) access to the CMK-KMS key before running the migration (see the sketch after this list).
- This solution supports only Apache Iceberg source tables registered with AWS Glue Data Catalog.
- This solution supports source tables in both the standard AWS Glue Data Catalog and Apache Iceberg table formats.
5. **Destination S3 Tables**: Determine the Amazon S3 table bucket and namespace/database where tabular data will migrate.
6. **IAM Permissions**: Ensure you have the necessary IAM permissions to create and manage the required AWS resources, such as CloudFormation stacks, Amazon S3, AWS Glue, Amazon EMR, and AWS Step Functions.
- If an [AWS Lake Formation](https://aws.amazon.com/lake-formation/) table is referenced, ensure that the specific Lake Formation permissions (such as SELECT and DESCRIBE) on the Glue databases and tables are granted.
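For the CMK-KMS prerequisite above, one way to grant the stack's "EMREc2role" access is to append a statement to the key policy. This is only a minimal sketch: the key ARN, account ID, and role ARN are placeholders, and the exact set of KMS actions your workload needs may differ.

```python
# Hedged sketch: append a key-policy statement allowing the stack's EMREc2role to use the CMK.
# All ARNs below are placeholders, not values from this repository.
import json
import boto3

kms = boto3.client("kms")
key_id = "arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID"  # placeholder CMK ARN

policy = json.loads(kms.get_key_policy(KeyId=key_id, PolicyName="default")["Policy"])
policy["Statement"].append({
    "Sid": "AllowEMREc2RoleUseOfTheCMK",
    "Effect": "Allow",
    "Principal": {"AWS": "arn:aws:iam::111122223333:role/EMREc2role"},  # role name from the Stack Output
    "Action": ["kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*"],  # assumed minimum for reads/writes
    "Resource": "*",
})
kms.put_key_policy(KeyId=key_id, PolicyName="default", Policy=json.dumps(policy))
```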
100 changes: 70 additions & 30 deletions scripts/pyspark/mys3tablespysparkscript.py
@@ -15,6 +15,7 @@
parser.add_argument('--data_source_bucket', help="Source data S3 bucket name.")
parser.add_argument('--data_source_db', help="Source data Glue Database name.")
parser.add_argument('--data_source_tbl', help="Source data Glue Table name.")
parser.add_argument('--data_source_type', help="Source data Glue Table Type.")
parser.add_argument('--data_source_catalog', help="Source DB/TableCatalog.")
parser.add_argument('--data_destination_s3tables_arn', help="Destination S3 Table ARN.")
parser.add_argument('--data_destination_catalog', help="Destination S3 Tables Catalog.")
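The new --data_source_type argument has to reach the script at submit time. This diff does not show how the job is launched (the repository drives it with Amazon EMR and AWS Step Functions), so the snippet below is only a hypothetical illustration of passing the flag through an EMR step with boto3; the cluster ID, script location, and argument values are placeholders, and only a subset of the script's arguments is shown.

```python
# Hypothetical example of passing --data_source_type to the PySpark script as an EMR step.
# Cluster ID, S3 paths, and argument values are placeholders, not values from this repository.
import boto3

emr = boto3.client("emr")
emr.add_job_flow_steps(
    JobFlowId="j-EXAMPLECLUSTERID",  # e.g. the EMR cluster created by the CloudFormation stack
    Steps=[{
        "Name": "s3tables-migration",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "s3://my-artifacts-bucket/scripts/pyspark/mys3tablespysparkscript.py",
                "--data_source_db", "source_db",
                "--data_source_tbl", "source_tbl",
                "--data_source_type", "Standard",  # 'Standard' or 'Iceberg', matching initiate_workflow()
                "--data_destination_s3tables_arn", "arn:aws:s3tables:us-east-1:111122223333:bucket/my-table-bucket",
            ],
        },
    }],
)
```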
@@ -31,6 +32,7 @@
data_source_bucket = args.data_source_bucket
data_source_db = args.data_source_db
data_source_tbl = args.data_source_tbl
data_source_type = args.data_source_type
data_source_catalog = args.data_source_catalog
data_destination_catalog = args.data_destination_catalog
data_destination_s3tables_arn = args.data_destination_s3tables_arn
@@ -74,11 +76,19 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)
sql_query_insert = ''
# Let's start the INSERT INTO action FOR the earlier CTAS
print(f"Initiating INSERT INTO worklow from {src_catalog}.{src_db}.{src_tbl} into {dst_db}.{dst_tbl} please hold...")
sql_query_insert = f"""
INSERT INTO
`{catalog}`.`{dst_db}`.`{dst_tbl}`
SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
"""
# Handle query with or without catalog name provided
if src_catalog:
sql_query_insert = f"""
INSERT INTO
`{catalog}`.`{dst_db}`.`{dst_tbl}`
SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
"""
else:
sql_query_insert = f"""
INSERT INTO
`{catalog}`.`{dst_db}`.`{dst_tbl}`
SELECT * FROM `{src_db}`.`{src_tbl}`
"""

# Run the INSERT INTO SQL query
spark_sql_query_insert = spark.sql(sql_query_insert)
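For reference, a standalone sketch of the two statements the branch above renders; the catalog, database, and table names are placeholders rather than values used by this project.

```python
# Illustration only: the INSERT INTO text produced with and without a source catalog qualifier.
src_catalog, catalog = "glue_src_catalog", "s3tables_dest_catalog"  # placeholder names
src_db, src_tbl, dst_db, dst_tbl = "srcdb", "srctbl", "dstnamespace", "dsttbl"

with_catalog = (
    f"INSERT INTO `{catalog}`.`{dst_db}`.`{dst_tbl}` "
    f"SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`"
)
without_catalog = (
    f"INSERT INTO `{catalog}`.`{dst_db}`.`{dst_tbl}` "
    f"SELECT * FROM `{src_db}`.`{src_tbl}`"
)
print(with_catalog)     # Iceberg source: fully catalog-qualified source reference
print(without_catalog)  # Standard source: resolved through the session's default catalog
```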
@@ -90,9 +100,6 @@ def insert_update_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl)






# Function for performing CTAS - CREATE TABLE AS SELECT into a new destination Database/Table - creates a new DB/Table
def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_partitions):
"""
@@ -113,22 +120,43 @@ def ctas_action(src_catalog, catalog, src_db, src_tbl, dst_db, dst_tbl, dst_part
# Check the provided partition name and value for the destination Table
if dst_partitions:
if dst_partitions == "NotApplicable":
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
LIMIT 0
"""
# Handle query with or without catalog name provided
if src_catalog:
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
LIMIT 0
"""
else:
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
AS SELECT * FROM `{src_db}`.`{src_tbl}`
LIMIT 0
"""
else:
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
PARTITIONED BY {dst_partitions}
AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
LIMIT 0
"""
# Handle query with or without catalog name provided
if src_catalog:
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
PARTITIONED BY {dst_partitions}
AS SELECT * FROM `{src_catalog}`.`{src_db}`.`{src_tbl}`
LIMIT 0
"""
else:
sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
PARTITIONED BY {dst_partitions}
AS SELECT * FROM `{src_db}`.`{src_tbl}`
LIMIT 0
"""

# Run the CTAS SQL query
spark_sql_query_d = spark.sql(sql_query_d)
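The partitioned branch interpolates dst_partitions directly after PARTITIONED BY, which suggests the value is supplied as a parenthesized column list such as "(year, month)"; that format is an assumption, not something this diff confirms. A minimal sketch with placeholder names:

```python
# Illustration only: assumed rendering of the partitioned CTAS for a source without a catalog qualifier.
catalog, dst_db, dst_tbl = "s3tables_dest_catalog", "dstnamespace", "dsttbl"  # placeholders
src_db, src_tbl = "srcdb", "srctbl"
dst_partitions = "(year, month)"  # assumed format: parenthesized partition column list

sql_query_d = f"""
CREATE TABLE IF NOT EXISTS
`{catalog}`.`{dst_db}`.`{dst_tbl}`
USING iceberg
PARTITIONED BY {dst_partitions}
AS SELECT * FROM `{src_db}`.`{src_tbl}`
LIMIT 0
"""
print(sql_query_d)
```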
@@ -176,17 +204,29 @@ def initiate_workflow():
try:
# First let's query the source table
print(f"Let do a test query of the source table {data_source_db}.{data_source_tbl} to see if we can perform a successful query")
query_table_data(data_source_catalog, data_source_db, data_source_tbl)
if data_source_type == 'Standard':
query_table_data(None, data_source_db, data_source_tbl)
elif data_source_type == 'Iceberg':
query_table_data(data_source_catalog, data_source_db, data_source_tbl)
print(f"Test query of the source table {data_source_db}.{data_source_tbl} is successful proceeding to main task")
# Choose the CTAS option to create new Amazon S3 Table Bucket destination NameSpace and Table
if data_migration_type == 'New-Migration':
print(f"We are performing a new migration, so will use CTAS to create a new table and load data")
ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
data_destination_s3tables_tbl, data_destination_s3tables_partitions
)
# Now that we have successfully created the destination table, let's perform an INSERT INTO
insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
data_destination_s3tables_namespace, data_destination_s3tables_tbl)
if data_source_type == 'Iceberg':
print(f"Source Table type is Hive....")
ctas_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
data_destination_s3tables_tbl, data_destination_s3tables_partitions
)
# Now that we have successfully created the destination table, let's perform an INSERT INTO
insert_update_action(data_source_catalog, data_destination_catalog, data_source_db, data_source_tbl,
data_destination_s3tables_namespace, data_destination_s3tables_tbl)
elif data_source_type == 'Standard':
ctas_action(None, data_destination_catalog, data_source_db, data_source_tbl, data_destination_s3tables_namespace,
data_destination_s3tables_tbl, data_destination_s3tables_partitions
)
# Now that we have successfully created the destination table, let's perform an INSERT INTO
insert_update_action(None, data_destination_catalog, data_source_db, data_source_tbl,
data_destination_s3tables_namespace, data_destination_s3tables_tbl)
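The branching above boils down to one decision: a 'Standard' source drops the catalog qualifier (the helpers receive None), while an 'Iceberg' source keeps data_source_catalog. A hypothetical helper, not part of the repository, that captures that dispatch:

```python
# Hypothetical helper summarizing the Standard/Iceberg dispatch above.
def resolve_source_catalog(source_type, source_catalog):
    """Return the catalog qualifier the CTAS/INSERT helpers should use, or None."""
    if source_type == "Standard":
        return None  # query the source table through the session's default catalog
    if source_type == "Iceberg":
        return source_catalog  # keep the fully qualified catalog.db.table reference
    raise ValueError(f"Unsupported data_source_type: {source_type}")

# e.g. ctas_action(resolve_source_catalog(data_source_type, data_source_catalog), ...)
```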

# Now we are done with CTAS and INSERT INTO, let's perform some verifications on the destination Table
# Let's query the destination table
