Skip to content

Commit

Permalink
#349 Add support for explicit add partitions in EnceladusSink.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Apr 26, 2024
1 parent 1076b36 commit 8a34bc5
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ object HiveConfig {
val defaultTemplates = defaults.templates.getOrElse(format.name, HiveQueryTemplates(
DEFAULT_CREATE_TABLE_TEMPLATE,
DEFAULT_REPAIR_TABLE_TEMPLATE,
DEFAULT_ADD_PARTITION_TEMPLATE,
DEFAULT_DROP_TABLE_TEMPLATE
))

Expand All @@ -104,13 +105,16 @@ object HiveConfig {
val repairTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$REPAIR_TABLE_TEMPLATE_KEY")
.getOrElse(defaultTemplates.repairTableTemplate)

val addPartitionTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$ADD_PARTITION_TEMPLATE_KEY")
.getOrElse(defaultTemplates.addPartitionTemplate)

val dropTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$DROP_TABLE_TEMPLATE_KEY")
.getOrElse(defaultTemplates.dropTableTemplate)

HiveConfig(
hiveApi = hiveApi,
database = database,
templates = HiveQueryTemplates(createTableTemplate, repairTableTemplate, dropTableTemplate),
templates = HiveQueryTemplates(createTableTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate),
jdbcConfig = jdbcConfig,
ignoreFailures
)
Expand All @@ -127,6 +131,7 @@ object HiveConfig {
val templates = defaults.templates.getOrElse(format.name, HiveQueryTemplates(
DEFAULT_CREATE_TABLE_TEMPLATE,
DEFAULT_REPAIR_TABLE_TEMPLATE,
DEFAULT_ADD_PARTITION_TEMPLATE,
DEFAULT_DROP_TABLE_TEMPLATE
))

Expand All @@ -136,7 +141,7 @@ object HiveConfig {
/** Returns a minimal, inert Hive configuration: SQL API, no database, no JDBC,
  * default query templates, and failures not ignored. Useful as a placeholder
  * when no Hive interaction is configured.
  */
def getNullConfig: HiveConfig = HiveConfig(
  hiveApi = HiveApi.Sql,
  database = None,
  templates = HiveQueryTemplates(
    DEFAULT_CREATE_TABLE_TEMPLATE,
    DEFAULT_REPAIR_TABLE_TEMPLATE,
    DEFAULT_ADD_PARTITION_TEMPLATE,
    DEFAULT_DROP_TABLE_TEMPLATE
  ),
  jdbcConfig = None,
  ignoreFailures = false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ abstract class HiveHelper {
tableName: String,
format: HiveFormat): Unit

/** Registers a single partition of a Hive table at an explicit filesystem location.
  *
  * Implementations visible in this changeset throw IllegalArgumentException when
  * `partitionBy` and `partitionValues` differ in length.
  *
  * @param databaseName    optional Hive database; when None the session's current database is used — TODO confirm
  * @param tableName       name of the table to add the partition to
  * @param partitionBy     partition column names, in partition-spec order
  * @param partitionValues partition column values, positionally matching `partitionBy`
  * @param location        path of the directory backing the new partition
  */
def addPartition(databaseName: Option[String],
                 tableName: String,
                 partitionBy: Seq[String],
                 partitionValues: Seq[String],
                 location: String): Unit

def doesTableExist(databaseName: Option[String],
tableName: String): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ class HiveHelperSparkCatalog(spark: SparkSession) extends HiveHelper {
}
}

/** Adds a partition with an explicit location to a Hive table using Spark SQL.
  *
  * Issues `ALTER TABLE ... ADD IF NOT EXISTS PARTITION (...) LOCATION '...'`,
  * so adding an already-registered partition is a no-op.
  *
  * @param databaseName    optional Hive database of the table
  * @param tableName       name of the table to add the partition to
  * @param partitionBy     partition column names, in partition-spec order
  * @param partitionValues partition column values, positionally matching `partitionBy`
  * @param location        path of the directory backing the new partition
  * @throws IllegalArgumentException when column and value counts differ
  */
override def addPartition(databaseName: Option[String],
                          tableName: String,
                          partitionBy: Seq[String],
                          partitionValues: Seq[String],
                          location: String): Unit = {
  // Fail fast on a mismatched partition spec (require throws IllegalArgumentException,
  // the same exception type as the previous hand-rolled check).
  require(partitionBy.length == partitionValues.length,
    s"Partition columns and values must have the same length. Columns: $partitionBy, values: $partitionValues")

  val fullTableName = HiveHelper.getFullTable(databaseName, tableName)

  // NOTE(review): values and location are interpolated verbatim into the SQL text —
  // a value containing a single quote would break the statement; confirm inputs are
  // trusted/sanitized upstream.
  val partitionClause = partitionBy.zip(partitionValues)
    .map { case (column, value) => s"$column='$value'" }
    .mkString(", ")

  val sql = s"ALTER TABLE $fullTableName ADD IF NOT EXISTS PARTITION ($partitionClause) LOCATION '$location'"
  log.info(s"Executing: $sql")
  // collect() forces execution of the DDL command.
  spark.sql(sql).collect()
}

private def dropCatalogTable(fullTableName: String): Unit = {
spark.sql(s"DROP TABLE $fullTableName").collect()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
}
}

/** Adds a partition with an explicit location to a Hive table by rendering the
  * configured ADD PARTITION template and executing it via the query executor.
  *
  * @param databaseName    optional Hive database of the table
  * @param tableName       name of the table to add the partition to
  * @param partitionBy     partition column names, in partition-spec order
  * @param partitionValues partition column values, positionally matching `partitionBy`
  * @param location        path of the directory backing the new partition
  * @throws IllegalArgumentException when column and value counts differ
  */
override def addPartition(databaseName: Option[String],
                          tableName: String,
                          partitionBy: Seq[String],
                          partitionValues: Seq[String],
                          location: String): Unit = {
  // Fail fast on a mismatched partition spec (require throws IllegalArgumentException,
  // the same exception type as the previous hand-rolled check).
  require(partitionBy.length == partitionValues.length,
    s"Partition columns and values must have the same length. Columns: $partitionBy, values: $partitionValues")

  val fullTableName = HiveHelper.getFullTable(databaseName, tableName)

  // NOTE(review): values are interpolated verbatim into the partition clause — a value
  // containing a single quote would break the rendered SQL; confirm inputs are trusted.
  val partitionClause = partitionBy.zip(partitionValues)
    .map { case (column, value) => s"$column='$value'" }
    .mkString(", ")

  val sql = applyPartitionTemplate(hiveConfig.addPartitionTemplate, fullTableName, location, partitionClause)
  queryExecutor.execute(sql)
}


override def doesTableExist(databaseName: Option[String], tableName: String): Boolean = queryExecutor.doesTableExist(databaseName, tableName)

override def dropTable(databaseName: Option[String],
Expand Down Expand Up @@ -133,4 +148,14 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
.replace("@schema", schemaDDL)
.replace("@partitionedBy", partitionDDL)
}

/** Renders a partition-related SQL template by substituting the supported
  * placeholders: @fullTableName, @partitionPath and @partitionClause.
  *
  * @param template        SQL template text containing the placeholders
  * @param fullTableName   replacement for @fullTableName
  * @param partitionPath   replacement for @partitionPath (empty by default)
  * @param partitionClause replacement for @partitionClause (empty by default)
  * @return the template with all placeholders substituted
  */
private def applyPartitionTemplate(template: String,
                                   fullTableName: String,
                                   partitionPath: String = "",
                                   partitionClause: String = ""
                                  ): String = {
  // Substitutions are applied in the same order as the original chained replace calls.
  val substitutions = Seq(
    "@fullTableName" -> fullTableName,
    "@partitionPath" -> partitionPath,
    "@partitionClause" -> partitionClause
  )
  substitutions.foldLeft(template) { case (sql, (placeholder, replacement)) =>
    sql.replace(placeholder, replacement)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.pramen.core.utils.ConfigUtils
/** SQL templates used for Hive table management.
  *
  * Placeholders such as @fullTableName, @partitionClause and @partitionPath are
  * substituted before execution (see the default templates in the companion object).
  *
  * @param createTableTemplate  template for creating a table
  * @param repairTableTemplate  template for repairing (re-discovering) table partitions
  * @param addPartitionTemplate template for adding a single partition with an explicit location
  * @param dropTableTemplate    template for dropping a table
  */
case class HiveQueryTemplates(
createTableTemplate: String,
repairTableTemplate: String,
addPartitionTemplate: String,
dropTableTemplate: String
)

Expand All @@ -30,6 +31,7 @@ object HiveQueryTemplates {

val CREATE_TABLE_TEMPLATE_KEY = "create.table.template"
val REPAIR_TABLE_TEMPLATE_KEY = "repair.table.template"
val ADD_PARTITION_TEMPLATE_KEY = "add.partition.template"
val DROP_TABLE_TEMPLATE_KEY = "drop.table.template"

val DEFAULT_CREATE_TABLE_TEMPLATE: String =
Expand All @@ -43,6 +45,9 @@ object HiveQueryTemplates {

val DEFAULT_REPAIR_TABLE_TEMPLATE: String = "MSCK REPAIR TABLE @fullTableName"

// Default template for registering one partition at an explicit location.
// Single-line string: the previous triple-quote + .stripMargin was a no-op (no '|'
// margin characters), and the trailing ';' was inconsistent with the repair/drop
// defaults and with the statement built in HiveHelperSparkCatalog (some JDBC
// executors reject statements that end with a semicolon).
val DEFAULT_ADD_PARTITION_TEMPLATE: String =
  "ALTER TABLE @fullTableName ADD IF NOT EXISTS PARTITION (@partitionClause) LOCATION '@partitionPath'"

val DEFAULT_DROP_TABLE_TEMPLATE: String = "DROP TABLE IF EXISTS @fullTableName"

def fromConfig(conf: Config): HiveQueryTemplates = {
Expand All @@ -52,12 +57,16 @@ object HiveQueryTemplates {
val repairTableTemplate = ConfigUtils.getOptionString(conf, REPAIR_TABLE_TEMPLATE_KEY)
.getOrElse(DEFAULT_REPAIR_TABLE_TEMPLATE)

val addPartitionTemplate = ConfigUtils.getOptionString(conf, ADD_PARTITION_TEMPLATE_KEY)
.getOrElse(DEFAULT_ADD_PARTITION_TEMPLATE)

val dropTableTemplate = ConfigUtils.getOptionString(conf, DROP_TABLE_TEMPLATE_KEY)
.getOrElse(DEFAULT_DROP_TABLE_TEMPLATE)

HiveQueryTemplates(
createTableTemplate = createTableTemplate,
repairTableTemplate = repairTableTemplate,
addPartitionTemplate = addPartitionTemplate,
dropTableTemplate = dropTableTemplate
)
}
Expand All @@ -66,6 +75,7 @@ object HiveQueryTemplates {
HiveQueryTemplates(
createTableTemplate = DEFAULT_CREATE_TABLE_TEMPLATE,
repairTableTemplate = DEFAULT_REPAIR_TABLE_TEMPLATE,
addPartitionTemplate = DEFAULT_ADD_PARTITION_TEMPLATE,
dropTableTemplate = DEFAULT_DROP_TABLE_TEMPLATE
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class HiveConfigSuite extends AnyWordSpec {
val defaultConfig = HiveDefaultConfig(
HiveApi.SparkCatalog,
Some("mydb1"),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "drop1")),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "add_partition1", "drop1")),
None,
ignoreFailures = true)

Expand All @@ -41,6 +41,7 @@ class HiveConfigSuite extends AnyWordSpec {
assert(hiveConfig.ignoreFailures)
assert(hiveConfig.templates.createTableTemplate.contains("create1"))
assert(hiveConfig.templates.repairTableTemplate.contains("repair1"))
assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition1"))
assert(hiveConfig.templates.dropTableTemplate.contains("drop1"))
}

Expand All @@ -61,14 +62,15 @@ class HiveConfigSuite extends AnyWordSpec {
|conf {
| create.table.template = "create2"
| repair.table.template = "repair2"
| add.partition.template = "add_partition2"
| drop.table.template = "drop2"
|}
|""".stripMargin)

val defaultConfig = HiveDefaultConfig(
HiveApi.Sql,
Some("mydb1"),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "drop1")),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "add_partition1", "drop1")),
None,
ignoreFailures = false)

Expand All @@ -81,6 +83,7 @@ class HiveConfigSuite extends AnyWordSpec {
assert(hiveConfig.ignoreFailures)
assert(hiveConfig.templates.createTableTemplate.contains("create2"))
assert(hiveConfig.templates.repairTableTemplate.contains("repair2"))
assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition2"))
assert(hiveConfig.templates.dropTableTemplate.contains("drop2"))
}
}
Expand All @@ -90,7 +93,7 @@ class HiveConfigSuite extends AnyWordSpec {
val defaultConfig = HiveDefaultConfig(
HiveApi.Sql,
Some("mydb"),
Map("parquet" -> HiveQueryTemplates("create", "repair", "drop")),
Map("parquet" -> HiveQueryTemplates("create", "repair", "add_partition1", "drop")),
None,
ignoreFailures = true)

Expand All @@ -102,6 +105,7 @@ class HiveConfigSuite extends AnyWordSpec {
assert(hiveConfig.ignoreFailures)
assert(hiveConfig.templates.createTableTemplate.contains("create"))
assert(hiveConfig.templates.repairTableTemplate.contains("repair"))
assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition1"))
assert(hiveConfig.templates.dropTableTemplate.contains("drop"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class MetaTableSuite extends AnyWordSpec {

val defaultHiveConfig = HiveDefaultConfig(HiveApi.Sql,
Some("mydb"),
Map("parquet" -> HiveQueryTemplates("create", "repair", "drop")),
Map("parquet" -> HiveQueryTemplates("create", "repair", "add_partition", "drop")),
Some(JdbcConfig("driver", Some("url"),
user = Some("user"),
password = Some("pass")
Expand All @@ -202,6 +202,7 @@ class MetaTableSuite extends AnyWordSpec {
assert(metaTable.hiveConfig.jdbcConfig.exists(_.driver == "driver"))
assert(metaTable.hiveConfig.templates.createTableTemplate == "create")
assert(metaTable.hiveConfig.templates.repairTableTemplate == "repair")
assert(metaTable.hiveConfig.templates.addPartitionTemplate == "add_partition")
assert(metaTable.hiveConfig.templates.dropTableTemplate == "drop")
assert(metaTable.hiveConfig.ignoreFailures)
assert(metaTable.format.name == "parquet")
Expand Down Expand Up @@ -239,6 +240,7 @@ class MetaTableSuite extends AnyWordSpec {
| conf {
| create.table.template = "create2"
| repair.table.template = "repair2"
| add.partition.template = "add_partition2"
| drop.table.template = "drop2"
| }
|}
Expand All @@ -247,7 +249,7 @@ class MetaTableSuite extends AnyWordSpec {
val defaultHiveConfig = HiveDefaultConfig(
HiveApi.Sql,
Some("mydb1"),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "drop1")),
Map("parquet" -> HiveQueryTemplates("create1", "repair1", "add_partition1", "drop1")),
Some(JdbcConfig("driver1", Some("url1"),
user = Some("user1"),
password = Some("pass1")
Expand All @@ -263,6 +265,7 @@ class MetaTableSuite extends AnyWordSpec {
assert(metaTable.hiveConfig.jdbcConfig.exists(_.driver == "driver2"))
assert(metaTable.hiveConfig.templates.createTableTemplate == "create2")
assert(metaTable.hiveConfig.templates.repairTableTemplate == "repair2")
assert(metaTable.hiveConfig.templates.addPartitionTemplate == "add_partition2")
assert(metaTable.hiveConfig.templates.dropTableTemplate == "drop2")
assert(metaTable.hiveConfig.ignoreFailures)
assert(metaTable.format.name == "parquet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ case class EnceladusConfig(
recordsPerPartition: Option[Int],
saveEmpty: Boolean,
generateInfoFile: Boolean,
useAddPartition: Boolean,
enceladusMainClass: String,
enceladusCmdLineTemplate: String,
hiveDatabase: Option[String],
Expand All @@ -48,6 +49,7 @@ object EnceladusConfig {
val RECORDS_PER_PARTITION = "records.per.partition"
val SAVE_EMPTY_KEY = "save.empty"
val GENERATE_INFO_FILE_KEY = "info.file.generate"
val USE_ADD_PARTITION_KEY = "use.add.partition"
val TIMEZONE_ID_KEY = "timezone"

val ENCELADUS_RUN_MAIN_CLASS_KEY = "enceladus.run.main.class"
Expand Down Expand Up @@ -84,6 +86,7 @@ object EnceladusConfig {
ConfigUtils.getOptionInt(conf, RECORDS_PER_PARTITION),
ConfigUtils.getOptionBoolean(conf, SAVE_EMPTY_KEY).getOrElse(true),
ConfigUtils.getOptionBoolean(conf, GENERATE_INFO_FILE_KEY).getOrElse(true),
ConfigUtils.getOptionBoolean(conf, USE_ADD_PARTITION_KEY).getOrElse(false),
ConfigUtils.getOptionString(conf, ENCELADUS_RUN_MAIN_CLASS_KEY).getOrElse(DEFAULT_ENCELADUS_RUN_MAIN_CLASS),
ConfigUtils.getOptionString(conf, ENCELADUS_COMMAND_LINE_TEMPLATE_KEY).getOrElse(DEFAULT_ENCELADUS_COMMAND_LINE_TEMPLATE),
ConfigUtils.getOptionString(conf, HIVE_DATABASE_KEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class EnceladusSink(sinkConfig: Config,
)
if (options.contains(HIVE_TABLE_KEY)) {
Try {
updateTable(options(HIVE_TABLE_KEY), publishBase)
updateTable(options(HIVE_TABLE_KEY), publishBase, infoDate, infoVersion)
} match {
case Success(_) =>
log.info(s"Hive table '${options(HIVE_TABLE_KEY)}' was repaired successfully.")
Expand Down Expand Up @@ -487,12 +487,18 @@ class EnceladusSink(sinkConfig: Config,
}
}

private[extras] def updateTable(hiveTable: String, publishBase: String)(implicit spark: SparkSession): Unit = {
private[extras] def updateTable(hiveTable: String, publishBase: String, infoDate: LocalDate, infoVersion: Int)(implicit spark: SparkSession): Unit = {
if (hiveHelper.doesTableExist(enceladusConfig.hiveDatabase, hiveTable)) {
log.info(s"Table ${getHiveTableFullName(hiveTable)} exists. Repairing partitions...")
hiveHelper.repairHiveTable(enceladusConfig.hiveDatabase, hiveTable, HiveFormat.Parquet)
if (enceladusConfig.useAddPartition) {
val location = s"$publishBase/enceladus_info_date=$infoDate/enceladus_info_version=$infoVersion"
log.info(s"Table '${getHiveTableFullName(hiveTable)}' exists. Adding new partition: '$location'...")
hiveHelper.addPartition(enceladusConfig.hiveDatabase, hiveTable, Seq("enceladus_info_date", "enceladus_info_version"), Seq(infoDate.toString, infoVersion.toString), location)
} else {
log.info(s"Table '${getHiveTableFullName(hiveTable)}' exists. Repairing partitions...")
hiveHelper.repairHiveTable(enceladusConfig.hiveDatabase, hiveTable, HiveFormat.Parquet)
}
} else {
log.info(s"Table ${getHiveTableFullName(hiveTable)} does not exist. Creating...")
log.info(s"Table '${getHiveTableFullName(hiveTable)}' does not exist. Creating...")
val df = spark.read.option("mergeSchema", "true").parquet(publishBase)

val schema = df.schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class EnceladusConfigSuite extends AnyWordSpec with SparkTestBase {
assert(enceladusConfig.formatOptions("my.option2") == "2")
assert(!enceladusConfig.saveEmpty)
assert(!enceladusConfig.generateInfoFile)
assert(!enceladusConfig.useAddPartition)
assert(enceladusConfig.enceladusMainClass == EnceladusConfig.DEFAULT_ENCELADUS_RUN_MAIN_CLASS)
assert(enceladusConfig.enceladusCmdLineTemplate == EnceladusConfig.DEFAULT_ENCELADUS_COMMAND_LINE_TEMPLATE)
assert(enceladusConfig.publishPartitionPattern == "enceladus_info_date={year}-{month}-{day}/enceladus_info_version={version}")
Expand All @@ -67,6 +68,7 @@ class EnceladusConfigSuite extends AnyWordSpec with SparkTestBase {
|format = "json"
|mode = "append"
|save.empty = false
|use.add.partition = true
|
|enceladus.run.main.class = "A"
|enceladus.command.line.template = "B"
Expand Down Expand Up @@ -97,6 +99,7 @@ class EnceladusConfigSuite extends AnyWordSpec with SparkTestBase {
assert(enceladusConfig.formatOptions("my.option2") == "2")
assert(!enceladusConfig.saveEmpty)
assert(!enceladusConfig.generateInfoFile)
assert(enceladusConfig.useAddPartition)
assert(enceladusConfig.enceladusMainClass == "A")
assert(enceladusConfig.enceladusCmdLineTemplate == "B")
assert(enceladusConfig.publishPartitionPattern == "aaa")
Expand Down

0 comments on commit 8a34bc5

Please sign in to comment.