Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#438 Optimize hive table existence query #439

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2839,6 +2839,14 @@ pramen {
# a column name is one of SQL reserved words.
escape.column.names = true

# If
# - true, uses this query for checking Hive table existence:
# DESCRIBE my_db.my_table
# (this is faster since it never touches data, but may depend on Hive dialect)
# - false (default), uses this query for checking Hive table existence:
# SELECT 1 FROM my_db.my_table WHERE 0 = 1
optimize.exist.query = true

# Optional, use only if you want to use JDBC rather than Spark metastore to query Hive
hive.jdbc {
driver = "com.cloudera.hive.jdbc41.HS2Driver"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ case class JdbcConfig(
connectionTimeoutSeconds: Option[Int] = None,
sanitizeDateTime: Boolean = true,
incorrectDecimalsAsString: Boolean = false,
optimizedExistQuery: Boolean = false,
extraOptions: Map[String, String] = Map.empty[String, String]
)

Expand All @@ -48,6 +49,7 @@ object JdbcConfig {
val JDBC_CONNECTION_TIMEOUT = "jdbc.connection.timeout"
val JDBC_SANITIZE_DATETIME = "jdbc.sanitize.datetime"
val JDBC_INCORRECT_PRECISION_AS_STRING = "jdbc.incorrect.precision.as.string"
val JDBC_OPTIMIZED_EXIST_QUERY = "jdbc.optimize.exist.query"
val JDBC_EXTRA_OPTIONS_PREFIX = "jdbc.option"

def load(conf: Config, parent: String = ""): JdbcConfig = {
Expand Down Expand Up @@ -78,6 +80,7 @@ object JdbcConfig {
connectionTimeoutSeconds = ConfigUtils.getOptionInt(conf, JDBC_CONNECTION_TIMEOUT),
sanitizeDateTime = ConfigUtils.getOptionBoolean(conf, JDBC_SANITIZE_DATETIME).getOrElse(true),
incorrectDecimalsAsString = ConfigUtils.getOptionBoolean(conf, JDBC_INCORRECT_PRECISION_AS_STRING).getOrElse(false),
optimizedExistQuery = ConfigUtils.getOptionBoolean(conf, JDBC_OPTIMIZED_EXIST_QUERY).getOrElse(false),
extraOptions = ConfigUtils.getExtraOptions(conf, JDBC_EXTRA_OPTIONS_PREFIX)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.TimeUtils
import za.co.absa.pramen.core.utils.hive.HiveHelper

import java.sql.Date
Expand Down Expand Up @@ -495,13 +496,20 @@ abstract class TaskRunnerBase(conf: Config,

val emoji = if (result.runStatus.isFailure) s"$FAILURE" else s"$WARNING"

val elapsedTimeStr = result.runInfo match {
case Some(runInfo) =>
val elapsedTimeMs = runInfo.finished.toEpochMilli - runInfo.started.toEpochMilli
s" Elapsed time: ${TimeUtils.prettyPrintElapsedTime(elapsedTimeMs)} seconds."
case None => ""
}

result.runStatus match {
case _: RunStatus.Succeeded =>
log.info(s"$SUCCESS $taskStr '${result.jobName}'$infoDateMsg has SUCCEEDED.")
log.info(s"$SUCCESS $taskStr '${result.jobName}'$infoDateMsg has SUCCEEDED.$elapsedTimeStr")
case RunStatus.ValidationFailed(ex) =>
log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED VALIDATION", ex)
log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED VALIDATION.$elapsedTimeStr", ex)
case RunStatus.Failed(ex) =>
log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED", ex)
log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED.$elapsedTimeStr", ex)
case RunStatus.MissingDependencies(_, tables) =>
log.error(s"$emoji $taskStr '${result.jobName}'$infoDateMsg has MISSING TABLES: ${tables.mkString(", ")}")
case RunStatus.FailedDependencies(_, deps) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig

import java.sql.{Connection, ResultSet, SQLException, SQLSyntaxErrorException}
import scala.util.Try
import scala.util.{Failure, Try}
import scala.util.control.NonFatal

class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector) extends QueryExecutor {
Expand All @@ -34,11 +34,22 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector) extends QueryExecutor
override def doesTableExist(dbName: Option[String], tableName: String): Boolean = {
val fullTableName = HiveHelper.getFullTable(dbName, tableName)

val query = s"SELECT 1 FROM $fullTableName WHERE 0 = 1"
val query = if (jdbcUrlSelector.jdbcConfig.optimizedExistQuery) {
s"DESCRIBE $fullTableName"
} else {
s"SELECT 1 FROM $fullTableName WHERE 0 = 1"
}

Try {
execute(query)
}.isSuccess
} match {
case Failure(ex) =>
log.info(s"The query resulted in an error, assuming the table $fullTableName does not exist" + ex.getMessage)
false
case _ =>
log.info(s"Table $fullTableName exists.")
true
}
}

@throws[SQLSyntaxErrorException]
Expand All @@ -58,7 +69,7 @@ class QueryExecutorJdbc(jdbcUrlSelector: JdbcUrlSelector) extends QueryExecutor

override def close(): Unit = if (connection != null) connection.close()

private[core] def executeActionOnConnection(action: Connection => Unit): Unit = {
private[core] def executeActionOnConnection(action: Connection => Boolean): Boolean = {
val currentConnection = getConnection(forceReconnect = false)
try {
action(currentConnection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class QueryExecutorSpark(implicit spark: SparkSession) extends QueryExecutor {
override def doesTableExist(dbName: Option[String], tableName: String): Boolean = {
val (database, table) = splitTableDatabase(dbName, tableName)

database match {
val exists = database match {
case Some(db) =>
if (spark.catalog.databaseExists(db)) {
spark.catalog.tableExists(db, table)
Expand All @@ -35,6 +35,18 @@ class QueryExecutorSpark(implicit spark: SparkSession) extends QueryExecutor {
case None =>
spark.catalog.tableExists(tableName)
}

val dbStr = database match {
case Some(db) => s"$db."
case None => ""
}

if (exists)
log.info(s"Table $dbStr$table exists.")
else
log.info(s"Table $dbStr$table does not exist.")

exists
}

@throws[AnalysisException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,26 @@ class QueryExecutorJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Rel
qe.close()
}

"return false if the table is not found in an optimized query" in {
val qe = new QueryExecutorJdbc(JdbcUrlSelector(jdbcConfig.copy(optimizedExistQuery = true)))

val exist = qe.doesTableExist(Option(database), "does_not_exist")

assert(!exist)

qe.close()
}

"return false if the table is not found in an optimized query without a database" in {
val qe = new QueryExecutorJdbc(JdbcUrlSelector(jdbcConfig.copy(optimizedExistQuery = true)))

val exist = qe.doesTableExist(None, "does_not_exist")

assert(!exist)

qe.close()
}

"handle retries" in {
val baseSelector = JdbcUrlSelector(jdbcConfig)
val (conn, _) = baseSelector.getWorkingConnection(1)
Expand All @@ -123,6 +143,7 @@ class QueryExecutorJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Rel
}
actionExecuted = true
assert(conn != null)
true
}

qe.close()
Expand Down Expand Up @@ -154,6 +175,7 @@ class QueryExecutorJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Rel
}
actionExecuted = true
assert(conn != null)
true
}
}

Expand Down Expand Up @@ -185,6 +207,7 @@ class QueryExecutorJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Rel
execution += 1
actionExecuted = true
assert(conn != null)
true
}
}

Expand Down
Loading