Commit
Merge pull request #7055 from ministryofjustice/DV_GlueJobV4c_Improvements

DV_GluJobV4c_Improvements
madhu-k-sr2 authored Jul 11, 2024
2 parents 638ce7e + 56eeb6b commit c6d14db
Showing 2 changed files with 72 additions and 26 deletions.
@@ -184,7 +184,7 @@ resource "aws_glue_job" "dms_dv_glue_job_v4a" {
"--conf" = <<EOF
spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.sql.parquet.aggregatePushdown=true
--conf spark.sql.shuffle.partitions=2001
--conf spark.sql.shuffle.partitions=2001
--conf spark.sql.files.maxPartitionBytes=128m
EOF
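
The heredoc above packs several Spark settings into the single "--conf" default argument that the Glue job passes through to Spark. As a sanity check (not part of this diff; it assumes the job script's SparkSession is available as spark and its logger as LOGGER), the effective values could be echoed at job start-up:

# Hypothetical start-up check, not in the committed script:
for conf_key in ("spark.sql.shuffle.partitions",
                 "spark.sql.files.maxPartitionBytes",
                 "spark.sql.parquet.aggregatePushdown"):
    # spark.conf.get accepts a default for keys that are not set
    LOGGER.info(f"{conf_key} = {spark.conf.get(conf_key, 'NOT SET')}")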

@@ -18,8 +18,6 @@
# ===============================================================================

sc = SparkContext()
# sc._jsc.hadoopConfiguration().set("spark.executor.memory", "9g")
# sc._jsc.hadoopConfiguration().set("spark.executor.cores", "3")
sc._jsc.hadoopConfiguration().set("spark.memory.offHeap.enabled", "true")
sc._jsc.hadoopConfiguration().set("spark.memory.offHeap.size", "3g")
sc._jsc.hadoopConfiguration().set("spark.dynamicAllocation.enabled", "true")
@@ -269,6 +267,38 @@ def get_rds_df_between_pkey_ids(in_rds_db_name, in_table_name,
.load())


def get_df_read_rds_db_tbl_int_pkey(in_rds_db_name, in_table_name,
                                    jdbc_partition_column,
                                    jdbc_partition_col_lowerbound,
                                    jdbc_partition_col_upperbound,
                                    jdbc_read_partitions_num
                                    ) -> DataFrame:
    given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"]

    numPartitions = jdbc_read_partitions_num
    # Note: numPartitions is normally set equal to the number of executors defined.
    # It is the maximum number of partitions that can be used for parallelism in table
    # reading and writing, and it also determines the maximum number of concurrent JDBC connections.

    query_str = f"""
    SELECT *
    FROM {given_rds_sqlserver_db_schema}.[{in_table_name}]
    WHERE {jdbc_partition_column} BETWEEN {jdbc_partition_col_lowerbound} AND {jdbc_partition_col_upperbound}
    """.strip()

    return (spark.read.format("jdbc")
            .option("url", get_rds_db_jdbc_url(in_rds_db_name))
            .option("driver", RDS_DB_INSTANCE_DRIVER)
            .option("user", RDS_DB_INSTANCE_USER)
            .option("password", RDS_DB_INSTANCE_PWD)
            .option("dbtable", f"""({query_str}) as t""")
            .option("partitionColumn", jdbc_partition_column)
            .option("lowerBound", jdbc_partition_col_lowerbound)
            .option("upperBound", jdbc_partition_col_upperbound)
            .option("numPartitions", numPartitions)
            .load())
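
A rough usage sketch of the new reader (illustrative only; the database, table, column name, and bounds below are made-up placeholders):

# Hypothetical call, values are placeholders:
df_batch = get_df_read_rds_db_tbl_int_pkey("my_rds_db", "my_table",
                                            "row_id",    # integer primary-key column
                                            1,           # batch lower bound
                                            500000,      # batch upper bound
                                            8)           # read partitions ~= executor count
# Spark splits the BETWEEN range into 8 roughly equal row_id sub-ranges and
# opens one JDBC connection per partition while reading.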


def get_rds_tbl_col_attributes(in_rds_db_name, in_tbl_name) -> DataFrame:
given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"]

@@ -591,6 +621,7 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
# -------------------------------------------------------

# df_rds_columns_list = rds_db_table_empty_df.columns

df_rds_dtype_dict = get_dtypes_dict(rds_db_table_empty_df)
int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items()
if dtype in INT_DATATYPES_LIST]
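
For context, a small sketch of how the integer-typed columns end up as partition-column candidates, assuming get_dtypes_dict simply maps column names to their Spark SQL type strings and INT_DATATYPES_LIST holds the integer type names (both are assumptions; the real helpers are defined elsewhere in this script):

# Illustrative only:
INT_DATATYPES_LIST = ["tinyint", "smallint", "int", "bigint"]   # assumed contents

def get_dtypes_dict(df):
    # df.dtypes is a list of (column_name, type_string) pairs
    return dict(df.dtypes)

# e.g. {"row_id": "bigint", "name": "string"} would yield ["row_id"] above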
@@ -649,31 +680,39 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
while (jdbc_partition_col_upperbound+rds_rows_per_batch) <= pkey_max_value:
loop_count += 1

jdbc_partition_col_lowerbound = jdbc_partition_col_upperbound+1
jdbc_partition_col_lowerbound = 0 if jdbc_partition_col_upperbound == 0 \
else jdbc_partition_col_upperbound+1
LOGGER.info(f"""{loop_count}-jdbc_partition_col_lowerbound = {jdbc_partition_col_lowerbound}""")

jdbc_partition_col_upperbound = jdbc_partition_col_lowerbound + rds_rows_per_batch
LOGGER.info(f"""{loop_count}-jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""")

df_rds_temp = (get_rds_df_between_pkey_ids(rds_db_name, rds_tbl_name,
jdbc_partition_column,
jdbc_partition_col_lowerbound,
jdbc_partition_col_upperbound))
# df_rds_temp = (get_rds_df_between_pkey_ids(rds_db_name, rds_tbl_name,
# jdbc_partition_column,
# jdbc_partition_col_lowerbound,
# jdbc_partition_col_upperbound))

df_rds_temp = get_df_read_rds_db_tbl_int_pkey(rds_db_name, rds_tbl_name,
jdbc_partition_column,
jdbc_partition_col_lowerbound,
jdbc_partition_col_upperbound,
given_df_repartition_num)

LOGGER.info(f"""{loop_count}-df_rds_temp-{db_sch_tbl}: READ PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""")

df_rds_temp_t3, trim_str_msg, trim_ts_ms_msg = apply_rds_transforms(df_rds_temp, rds_db_name, rds_tbl_name)
df_rds_temp_t4, trim_str_msg, trim_ts_ms_msg = apply_rds_transforms(df_rds_temp, rds_db_name, rds_tbl_name)
additional_msg = trim_str_msg+trim_ts_ms_msg if trim_str_msg+trim_ts_ms_msg != '' else additional_msg

if loop_count%20 == 0:
given_df_repartition_num = given_df_repartition_num - 10
# if loop_count%20 == 0:
# given_df_repartition_num = given_df_repartition_num - 10

msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
df_rds_temp_t4 = df_rds_temp_t3.repartition(given_df_repartition_num,
jdbc_partition_column)
# msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
# LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
# df_rds_temp_t4 = df_rds_temp_t3.repartition(given_df_repartition_num,
# jdbc_partition_column)

msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")
# msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
# LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")

df_rds_temp_t4_count = df_rds_temp_t4.count()
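
To see what the reworked bound calculation does, here is a short walk-through with made-up numbers (the real values come from the table's primary-key range and the batch size):

# Hypothetical: pkey_max_value = 25, rds_rows_per_batch = 10
upper = 0
while (upper + 10) <= 25:
    lower = 0 if upper == 0 else upper + 1
    upper = lower + 10
    print(lower, upper)   # prints 0 10, then 11 21
# The first window is now 0..10 rather than 1..11, so a primary key of 0 is no longer skipped;
# the remaining 22..25 tail is picked up by the follow-up block in the next hunk.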

@@ -739,22 +778,29 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
jdbc_partition_col_upperbound = pkey_max_value
LOGGER.info(f"""{loop_count}-jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""")

df_rds_temp = (get_rds_df_between_pkey_ids(rds_db_name, rds_tbl_name,
# df_rds_temp = (get_rds_df_between_pkey_ids(rds_db_name, rds_tbl_name,
# jdbc_partition_column,
# jdbc_partition_col_lowerbound,
# jdbc_partition_col_upperbound))

df_rds_temp = get_df_read_rds_db_tbl_int_pkey(rds_db_name, rds_tbl_name,
jdbc_partition_column,
jdbc_partition_col_lowerbound,
jdbc_partition_col_upperbound))
jdbc_partition_col_upperbound,
given_df_repartition_num)

LOGGER.info(f"""{loop_count}-df_rds_temp-{db_sch_tbl}: READ PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""")

df_rds_temp_t3, trim_str_msg, trim_ts_ms_msg = apply_rds_transforms(df_rds_temp, rds_db_name, rds_tbl_name)
df_rds_temp_t4, trim_str_msg, trim_ts_ms_msg = apply_rds_transforms(df_rds_temp, rds_db_name, rds_tbl_name)
additional_msg = trim_str_msg+trim_ts_ms_msg if trim_str_msg+trim_ts_ms_msg != '' else additional_msg

msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
df_rds_temp_t4 = df_rds_temp_t3.repartition(int(given_df_repartition_num/2),
jdbc_partition_column)
# msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
# LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
# df_rds_temp_t4 = df_rds_temp_t3.repartition(int(given_df_repartition_num/2),
# jdbc_partition_column)

msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")
# msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
# LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")

df_rds_temp_t4_count = df_rds_temp_t4.count()

