[CARMEL-3524] Create ReadWriteLock for each database in HiveExternalCatalog (#24)

* [CARMEL-3524] Create ReadWriteLock for each database in HiveExternalCatalog

* fix code style check

* fix comment
Luan, Xuedong authored and allenma committed Aug 21, 2020
1 parent c173b47 commit fabbde4
Showing 1 changed file with 60 additions and 40 deletions.
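
In outline, the change replaces the single synchronized block around Hive client calls with one ReentrantReadWriteLock per database, kept in a ConcurrentHashMap keyed by database name. `withClient` now takes a `write` flag and a database name; each call site passes the database it touches and whether it needs exclusive (write-lock) or shared (read-lock) access, so calls against different databases no longer serialize with one another. A minimal, self-contained sketch of the same locking pattern, using illustrative names (`PerDatabaseLocks`, `withLock`) that are not part of the patch:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

// Illustrative sketch only: one read/write lock per database name.
class PerDatabaseLocks {
  private val locks = new ConcurrentHashMap[String, ReentrantReadWriteLock]()

  // Run `body` while holding the read or write lock of the given database.
  def withLock[T](write: Boolean, db: String)(body: => T): T = {
    // computeIfAbsent lazily creates the lock the first time a database is seen.
    val rwLock = locks.computeIfAbsent(db, (_: String) => new ReentrantReadWriteLock())
    val lock = if (write) rwLock.writeLock() else rwLock.readLock()
    lock.lock()
    try {
      body
    } finally {
      lock.unlock()
    }
  }
}

The patch itself acquires the lock just inside its try block and releases it in the finally block, which is effectively equivalent; the sketch simply uses the more common lock-before-try ordering.
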
@@ -21,6 +21,8 @@ import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.util
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable
import scala.util.control.NonFatal
@@ -68,6 +70,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
HiveUtils.newClientForMetadata(conf, hadoopConf)
}

private val clientLocks = new ConcurrentHashMap[String, ReentrantReadWriteLock]()

// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(
classOf[HiveException].getCanonicalName,
@@ -94,8 +98,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* Run some code involving `client` in a [[synchronized]] block and wrap certain
* exceptions thrown in the process in [[AnalysisException]].
*/
private def withClient[T](body: => T): T = synchronized {
private def withClient[T](write: Boolean, db: String)(body: => T): T = {
val lock = clientLocks.computeIfAbsent(db, (_: String) => new ReentrantReadWriteLock())
try {
if (write) {
lock.writeLock().lock()
} else {
lock.readLock().lock()
}
body
} catch {
case NonFatal(exception) if isClientException(exception) =>
@@ -107,6 +117,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
} finally {
if (write) {
lock.writeLock().unlock()
} else {
lock.readLock().unlock()
}
}
}

@@ -186,14 +202,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

override def createDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClient {
ignoreIfExists: Boolean): Unit = withClient(true, dbDefinition.name) {
client.createDatabase(dbDefinition, ignoreIfExists)
}

override def dropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withClient {
cascade: Boolean): Unit = withClient(true, db) {
client.dropDatabase(db, ignoreIfNotExists, cascade)
}

@@ -203,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
*
* Note: As of now, this only supports altering database properties!
*/
override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
override def alterDatabase(dbDefinition: CatalogDatabase): Unit =
withClient(true, dbDefinition.name) {
val existingDb = getDatabase(dbDefinition.name)
if (existingDb.properties == dbDefinition.properties) {
logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
@@ -213,23 +230,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.alterDatabase(dbDefinition)
}

override def getDatabase(db: String): CatalogDatabase = withClient {
override def getDatabase(db: String): CatalogDatabase = withClient(false, db) {
client.getDatabase(db)
}

override def databaseExists(db: String): Boolean = withClient {
override def databaseExists(db: String): Boolean = withClient(false, db) {
client.databaseExists(db)
}

override def listDatabases(): Seq[String] = withClient {
override def listDatabases(): Seq[String] = withClient(false, "") {
client.listDatabases("*")
}

override def listDatabases(pattern: String): Seq[String] = withClient {
override def listDatabases(pattern: String): Seq[String] = withClient(false, "") {
client.listDatabases(pattern)
}

override def setCurrentDatabase(db: String): Unit = withClient {
override def setCurrentDatabase(db: String): Unit = withClient(false, "") {
client.setCurrentDatabase(db)
}

@@ -239,7 +256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

override def createTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = withClient {
ignoreIfExists: Boolean): Unit = withClient(true, tableDefinition.database) {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
val table = tableDefinition.identifier.table
@@ -511,15 +528,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withClient {
purge: Boolean): Unit = withClient(true, db) {
requireDbExists(db)
client.dropTable(db, table, ignoreIfNotExists, purge)
}

override def renameTable(
db: String,
oldName: String,
newName: String): Unit = withClient {
newName: String): Unit = withClient(true, db) {
val rawTable = getRawTable(db, oldName)

// Note that Hive serde tables don't use path option in storage properties to store the value
@@ -567,7 +584,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* Note: As of now, this doesn't support altering table schema, partition column names and bucket
* specification. We will ignore them even if users do specify different values for these fields.
*/
override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
override def alterTable(tableDefinition: CatalogTable): Unit =
withClient(true, tableDefinition.database) {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
@@ -666,7 +684,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableDataSchema(
db: String,
table: String,
newDataSchema: StructType): Unit = withClient {
newDataSchema: StructType): Unit = withClient(true, db) {
requireTableExists(db, table)
val oldTable = getTable(db, table)
verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
@@ -698,7 +716,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableStats(
db: String,
table: String,
stats: Option[CatalogStatistics]): Unit = withClient {
stats: Option[CatalogStatistics]): Unit = withClient(true, db) {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)

@@ -715,11 +733,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.alterTable(updatedTable)
}

override def getTable(db: String, table: String): CatalogTable = withClient {
override def getTable(db: String, table: String): CatalogTable = withClient(false, db) {
restoreTableMetadata(getRawTable(db, table))
}

override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = withClient {
override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] =
withClient(false, db) {
getRawTablesByNames(db, tables).map(restoreTableMetadata)
}

@@ -847,21 +866,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
properties = table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)))
}

override def tableExists(db: String, table: String): Boolean = withClient {
override def tableExists(db: String, table: String): Boolean = withClient(false, db) {
client.tableExists(db, table)
}

override def listTables(db: String): Seq[String] = withClient {
override def listTables(db: String): Seq[String] = withClient(false, db) {
requireDbExists(db)
client.listTables(db)
}

override def listTables(db: String, pattern: String): Seq[String] = withClient {
override def listTables(db: String, pattern: String): Seq[String] = withClient(false, db) {
requireDbExists(db)
client.listTables(db, pattern)
}

override def listViews(db: String, pattern: String): Seq[String] = withClient {
override def listViews(db: String, pattern: String): Seq[String] = withClient(false, db) {
requireDbExists(db)
client.listTablesByType(db, pattern, CatalogTableType.VIEW)
}
@@ -871,7 +890,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
loadPath: String,
isOverwrite: Boolean,
isSrcLocal: Boolean): Unit = withClient {
isSrcLocal: Boolean): Unit = withClient(false, db) {
requireTableExists(db, table)
client.loadTable(
loadPath,
@@ -887,7 +906,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partition: TablePartitionSpec,
isOverwrite: Boolean,
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = withClient {
isSrcLocal: Boolean): Unit = withClient(false, db) {
requireTableExists(db, table)

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -917,7 +936,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
loadPath: String,
partition: TablePartitionSpec,
replace: Boolean,
numDP: Int): Unit = withClient {
numDP: Int): Unit = withClient(false, db) {
requireTableExists(db, table)

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -982,7 +1001,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withClient {
ignoreIfExists: Boolean): Unit = withClient(true, db) {
requireTableExists(db, table)

val tableMeta = getTable(db, table)
@@ -1008,7 +1027,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = withClient {
retainData: Boolean): Unit = withClient(true, db) {
requireTableExists(db, table)
client.dropPartitions(
db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
@@ -1018,7 +1037,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
newSpecs: Seq[TablePartitionSpec]): Unit = withClient(true, db) {
client.renamePartitions(
db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))

@@ -1145,7 +1164,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterPartitions(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
newParts: Seq[CatalogTablePartition]): Unit = withClient(true, db) {
val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))

val rawTable = getRawTable(db, table)
@@ -1166,7 +1185,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def getPartition(
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
spec: TablePartitionSpec): CatalogTablePartition = withClient(false, db) {
val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
restorePartitionMetadata(part, getTable(db, table))
}
@@ -1204,7 +1223,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def getPartitionOption(
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient(false, db) {
client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
restorePartitionMetadata(part, getTable(db, table))
}
@@ -1216,7 +1235,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def listPartitionNames(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient {
partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient(false, db) {
val catalogTable = getTable(db, table)
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
val clientPartitionNames =
@@ -1237,7 +1256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def listPartitions(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] =
withClient(false, db) {
val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
@@ -1258,7 +1278,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
predicates: Seq[Expression],
defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient(false, db) {
val rawTable = getRawTable(db, table)
val catalogTable = restoreTableMetadata(rawTable)

@@ -1277,7 +1297,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
funcDefinition: CatalogFunction): Unit = withClient(true, db) {
requireDbExists(db)
// Hive's metastore is case insensitive. However, Hive's createFunction does
// not normalize the function name (unlike the getFunction part). So,
@@ -1288,13 +1308,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}

override def dropFunction(db: String, name: String): Unit = withClient {
override def dropFunction(db: String, name: String): Unit = withClient(true, db) {
requireFunctionExists(db, name)
client.dropFunction(db, name)
}

override def alterFunction(
db: String, funcDefinition: CatalogFunction): Unit = withClient {
db: String, funcDefinition: CatalogFunction): Unit = withClient(true, db) {
requireDbExists(db)
val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT)
requireFunctionExists(db, functionName)
@@ -1305,23 +1325,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def renameFunction(
db: String,
oldName: String,
newName: String): Unit = withClient {
newName: String): Unit = withClient(true, db) {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
client.renameFunction(db, oldName, newName)
}

override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
override def getFunction(db: String, funcName: String): CatalogFunction = withClient(false, db) {
requireFunctionExists(db, funcName)
client.getFunction(db, funcName)
}

override def functionExists(db: String, funcName: String): Boolean = withClient {
override def functionExists(db: String, funcName: String): Boolean = withClient(false, db) {
requireDbExists(db)
client.functionExists(db, funcName)
}

override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
override def listFunctions(db: String, pattern: String): Seq[String] = withClient(false, db) {
requireDbExists(db)
client.listFunctions(db, pattern)
}
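
Two details worth noting from the call sites above: operations that mutate metadata for a specific database (create/drop/alter/rename of databases, tables, partitions and functions) take that database's write lock, lookups and listings take its read lock, and the few operations not tied to a single database (listDatabases, setCurrentDatabase) use the empty string as their lock key and so share one lock. A rough, hypothetical demonstration of the resulting concurrency, reusing the `PerDatabaseLocks` sketch above (again, not part of the patch):

// Hypothetical demo: operations on different databases hold independent locks,
// so these two threads overlap instead of queueing behind one global lock.
object PerDatabaseLocksDemo {
  def main(args: Array[String]): Unit = {
    val locks = new PerDatabaseLocks
    val start = System.nanoTime()

    val reader = new Thread(() => locks.withLock(write = false, db = "sales") {
      Thread.sleep(100) // stands in for a metastore read such as getTable
    })
    val writer = new Thread(() => locks.withLock(write = true, db = "marketing") {
      Thread.sleep(100) // stands in for a metastore write such as createTable
    })

    reader.start(); writer.start()
    reader.join(); writer.join()

    // Expect roughly 100 ms rather than 200 ms, because the locks are independent.
    println(s"elapsed: ${(System.nanoTime() - start) / 1000000} ms")
  }
}

As in the patch, locks are never removed from the map once created, so the cost is one small lock object per database name the catalog has touched, which is normally negligible.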