Hudi loader should fail early if missing permissions on Glue catalog (#72)

It is possible to run the Hudi Lake Loader with the Hudi option
`"hoodie.datasource.hive_sync.enable": "true"`, which registers/syncs the
table to a Hive Metastore or Glue.
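
For illustration, here is a minimal sketch (not the loader's actual code) of how such hive_sync options reach Hudi through the standard Spark writer API; the table name and path are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeWithHiveSync(df: DataFrame): Unit =
  df.write
    .format("hudi")
    .option("hoodie.table.name", "events")                // hypothetical table name
    .option("hoodie.datasource.hive_sync.enable", "true") // the option discussed above
    .option("hoodie.datasource.hive_sync.mode", "hms")    // sync via a Hive Metastore (or Glue acting as one)
    .mode(SaveMode.Append)
    .save("s3://my-bucket/events/")                       // hypothetical table location
```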

However, with that setting enabled, Hudi delays syncing until the first
time events are committed. For our use case, it is more helpful if the
loader connects to Glue/Hive during startup, so we get an alert more
quickly if the loader is missing permissions.

This PR works by making the loader add an empty commit during startup.
The commit does not add any parquet file, but it triggers the loader to
sync the table to Glue/Hive.
istreeter authored Aug 2, 2024
1 parent c2c088a commit cd920c4
Showing 2 changed files with 7 additions and 3 deletions.
@@ -22,7 +22,7 @@ object SparkSchema {
*
* The returned schema includes atomic fields and non-atomic fields but not the load_tstamp column
*/
-  private[processing] def forBatch(entities: Vector[TypedTabledEntity], respectIgluNullability: Boolean): StructType = {
+  def forBatch(entities: Vector[TypedTabledEntity], respectIgluNullability: Boolean): StructType = {
val nonAtomicFields = entities.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
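
The widened visibility above is what allows HudiWriter, which lives in the `com.snowplowanalytics.snowplow.lakes.tables` package (outside `processing`), to call `forBatch`. A minimal sketch of the call, using only names that appear in this diff; with no typed entities the result is presumably just the base event columns:

```scala
import org.apache.spark.sql.types.StructType
import com.snowplowanalytics.snowplow.lakes.processing.SparkSchema

// No typed entities: the returned StructType carries only the base event
// columns, which is all the empty startup commit needs.
val emptySchema: StructType = SparkSchema.forBatch(Vector.empty, respectIgluNullability = true)
```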
@@ -12,13 +12,15 @@ package com.snowplowanalytics.snowplow.lakes.tables

import cats.implicits._
import cats.effect.Sync
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.lakes.Config
import com.snowplowanalytics.snowplow.lakes.processing.SparkSchema

+import scala.jdk.CollectionConverters._

class HudiWriter(config: Config.Hudi) extends Writer {

private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
@@ -58,7 +60,9 @@ class HudiWriter(config: Config.Hudi) extends Writer {
spark.sql(s"""
CALL archive_commits(table => '$internal_table_name')
""")
-}.void
+}.void *>
+  // We make an empty commit during startup, so the loader can fail early if we are missing any permissions
+  write[F](spark.createDataFrame(List.empty[Row].asJava, SparkSchema.forBatch(Vector.empty, true)))
}

private def maybeCreateDatabase[F[_]: Sync](spark: SparkSession): F[Unit] =
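
For context on what the new call constructs: `createDataFrame` with an empty Java list and an explicit schema yields a zero-row DataFrame that still carries the full set of columns. A minimal standalone sketch (the `emptyEvents` helper is hypothetical, for illustration only; in the diff the result is passed straight to HudiWriter's own `write` method):

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.jdk.CollectionConverters._

import com.snowplowanalytics.snowplow.lakes.processing.SparkSchema

// Zero rows but a full schema: writing this DataFrame produces a Hudi commit
// with no parquet data files, which is enough to trigger hive_sync, so any
// missing Glue/Hive permissions surface at startup rather than at the first event.
def emptyEvents(spark: SparkSession): DataFrame =
  spark.createDataFrame(List.empty[Row].asJava, SparkSchema.forBatch(Vector.empty, respectIgluNullability = true))
```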
