From fabbde4ab56e16f5fb5932e7a975d5682c76cccf Mon Sep 17 00:00:00 2001 From: "Luan, Xuedong" Date: Fri, 21 Aug 2020 14:09:33 +0800 Subject: [PATCH] =?UTF-8?q?[CARMEL-3524]=20Create=20ReadWriteLock=20for=20?= =?UTF-8?q?each=20database=20in=20HiveExternalC=E2=80=A6=20(#24)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [CARMEL-3524] Create ReadWriteLock for each database in HiveExternalCatalog * fix code style check * fix comment --- .../spark/sql/hive/HiveExternalCatalog.scala | 100 +++++++++++------- 1 file changed, 60 insertions(+), 40 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 2faf42028f3a2..3d12017fd058e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -21,6 +21,8 @@ import java.io.IOException import java.lang.reflect.InvocationTargetException import java.util import java.util.Locale +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.mutable import scala.util.control.NonFatal @@ -68,6 +70,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat HiveUtils.newClientForMetadata(conf, hadoopConf) } + private val clientLocks = new ConcurrentHashMap[String, ReentrantReadWriteLock]() + // Exceptions thrown by the hive client that we would like to wrap private val clientExceptions = Set( classOf[HiveException].getCanonicalName, @@ -94,8 +98,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * Run some code involving `client` in a [[synchronized]] block and wrap certain * exceptions thrown in the process in [[AnalysisException]]. */ - private def withClient[T](body: => T): T = synchronized { + private def withClient[T](write: Boolean, db: String)(body: => T): T = { + val lock = clientLocks.computeIfAbsent(db, (_: String) => new ReentrantReadWriteLock()) try { + if (write) { + lock.writeLock().lock() + } else { + lock.readLock().lock() + } body } catch { case NonFatal(exception) if isClientException(exception) => @@ -107,6 +117,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } throw new AnalysisException( e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) + } finally { + if (write) { + lock.writeLock().unlock() + } else { + lock.readLock().unlock() + } } } @@ -186,14 +202,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def createDatabase( dbDefinition: CatalogDatabase, - ignoreIfExists: Boolean): Unit = withClient { + ignoreIfExists: Boolean): Unit = withClient(true, dbDefinition.name) { client.createDatabase(dbDefinition, ignoreIfExists) } override def dropDatabase( db: String, ignoreIfNotExists: Boolean, - cascade: Boolean): Unit = withClient { + cascade: Boolean): Unit = withClient(true, db) { client.dropDatabase(db, ignoreIfNotExists, cascade) } @@ -203,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * * Note: As of now, this only supports altering database properties! 
*/ - override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient { + override def alterDatabase(dbDefinition: CatalogDatabase): Unit = + withClient(true, dbDefinition.name) { val existingDb = getDatabase(dbDefinition.name) if (existingDb.properties == dbDefinition.properties) { logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " + @@ -213,23 +230,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterDatabase(dbDefinition) } - override def getDatabase(db: String): CatalogDatabase = withClient { + override def getDatabase(db: String): CatalogDatabase = withClient(false, db) { client.getDatabase(db) } - override def databaseExists(db: String): Boolean = withClient { + override def databaseExists(db: String): Boolean = withClient(false, db) { client.databaseExists(db) } - override def listDatabases(): Seq[String] = withClient { + override def listDatabases(): Seq[String] = withClient(false, "") { client.listDatabases("*") } - override def listDatabases(pattern: String): Seq[String] = withClient { + override def listDatabases(pattern: String): Seq[String] = withClient(false, "") { client.listDatabases(pattern) } - override def setCurrentDatabase(db: String): Unit = withClient { + override def setCurrentDatabase(db: String): Unit = withClient(false, "") { client.setCurrentDatabase(db) } @@ -239,7 +256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def createTable( tableDefinition: CatalogTable, - ignoreIfExists: Boolean): Unit = withClient { + ignoreIfExists: Boolean): Unit = withClient(true, tableDefinition.database) { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get val table = tableDefinition.identifier.table @@ -511,7 +528,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, ignoreIfNotExists: Boolean, - purge: Boolean): Unit = withClient { + purge: Boolean): Unit = withClient(true, db) { requireDbExists(db) client.dropTable(db, table, ignoreIfNotExists, purge) } @@ -519,7 +536,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def renameTable( db: String, oldName: String, - newName: String): Unit = withClient { + newName: String): Unit = withClient(true, db) { val rawTable = getRawTable(db, oldName) // Note that Hive serde tables don't use path option in storage properties to store the value @@ -567,7 +584,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * Note: As of now, this doesn't support altering table schema, partition column names and bucket * specification. We will ignore them even if users do specify different values for these fields. 
*/ - override def alterTable(tableDefinition: CatalogTable): Unit = withClient { + override def alterTable(tableDefinition: CatalogTable): Unit = + withClient(true, tableDefinition.database) { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) @@ -666,7 +684,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterTableDataSchema( db: String, table: String, - newDataSchema: StructType): Unit = withClient { + newDataSchema: StructType): Unit = withClient(true, db) { requireTableExists(db, table) val oldTable = getTable(db, table) verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema) @@ -698,7 +716,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterTableStats( db: String, table: String, - stats: Option[CatalogStatistics]): Unit = withClient { + stats: Option[CatalogStatistics]): Unit = withClient(true, db) { requireTableExists(db, table) val rawTable = getRawTable(db, table) @@ -715,11 +733,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterTable(updatedTable) } - override def getTable(db: String, table: String): CatalogTable = withClient { + override def getTable(db: String, table: String): CatalogTable = withClient(false, db) { restoreTableMetadata(getRawTable(db, table)) } - override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = withClient { + override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = + withClient(false, db) { getRawTablesByNames(db, tables).map(restoreTableMetadata) } @@ -847,21 +866,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties = table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_))) } - override def tableExists(db: String, table: String): Boolean = withClient { + override def tableExists(db: String, table: String): Boolean = withClient(false, db) { client.tableExists(db, table) } - override def listTables(db: String): Seq[String] = withClient { + override def listTables(db: String): Seq[String] = withClient(false, db) { requireDbExists(db) client.listTables(db) } - override def listTables(db: String, pattern: String): Seq[String] = withClient { + override def listTables(db: String, pattern: String): Seq[String] = withClient(false, db) { requireDbExists(db) client.listTables(db, pattern) } - override def listViews(db: String, pattern: String): Seq[String] = withClient { + override def listViews(db: String, pattern: String): Seq[String] = withClient(false, db) { requireDbExists(db) client.listTablesByType(db, pattern, CatalogTableType.VIEW) } @@ -871,7 +890,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, loadPath: String, isOverwrite: Boolean, - isSrcLocal: Boolean): Unit = withClient { + isSrcLocal: Boolean): Unit = withClient(false, db) { requireTableExists(db, table) client.loadTable( loadPath, @@ -887,7 +906,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat partition: TablePartitionSpec, isOverwrite: Boolean, inheritTableSpecs: Boolean, - isSrcLocal: Boolean): Unit = withClient { + isSrcLocal: Boolean): Unit = withClient(false, db) { requireTableExists(db, table) val orderedPartitionSpec = new util.LinkedHashMap[String, String]() @@ -917,7 +936,7 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat loadPath: String, partition: TablePartitionSpec, replace: Boolean, - numDP: Int): Unit = withClient { + numDP: Int): Unit = withClient(false, db) { requireTableExists(db, table) val orderedPartitionSpec = new util.LinkedHashMap[String, String]() @@ -982,7 +1001,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit = withClient { + ignoreIfExists: Boolean): Unit = withClient(true, db) { requireTableExists(db, table) val tableMeta = getTable(db, table) @@ -1008,7 +1027,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit = withClient { + retainData: Boolean): Unit = withClient(true, db) { requireTableExists(db, table) client.dropPartitions( db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData) @@ -1018,7 +1037,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, specs: Seq[TablePartitionSpec], - newSpecs: Seq[TablePartitionSpec]): Unit = withClient { + newSpecs: Seq[TablePartitionSpec]): Unit = withClient(true, db) { client.renamePartitions( db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec)) @@ -1145,7 +1164,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterPartitions( db: String, table: String, - newParts: Seq[CatalogTablePartition]): Unit = withClient { + newParts: Seq[CatalogTablePartition]): Unit = withClient(true, db) { val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec))) val rawTable = getRawTable(db, table) @@ -1166,7 +1185,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def getPartition( db: String, table: String, - spec: TablePartitionSpec): CatalogTablePartition = withClient { + spec: TablePartitionSpec): CatalogTablePartition = withClient(false, db) { val part = client.getPartition(db, table, lowerCasePartitionSpec(spec)) restorePartitionMetadata(part, getTable(db, table)) } @@ -1204,7 +1223,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def getPartitionOption( db: String, table: String, - spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient { + spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient(false, db) { client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part => restorePartitionMetadata(part, getTable(db, table)) } @@ -1216,7 +1235,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def listPartitionNames( db: String, table: String, - partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient { + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient(false, db) { val catalogTable = getTable(db, table) val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName) val clientPartitionNames = @@ -1237,7 +1256,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def listPartitions( db: String, table: String, - partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient { + partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = + 
withClient(false, db) { val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table)) val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap)) @@ -1258,7 +1278,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, predicates: Seq[Expression], - defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient { + defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient(false, db) { val rawTable = getRawTable(db, table) val catalogTable = restoreTableMetadata(rawTable) @@ -1277,7 +1297,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def createFunction( db: String, - funcDefinition: CatalogFunction): Unit = withClient { + funcDefinition: CatalogFunction): Unit = withClient(true, db) { requireDbExists(db) // Hive's metastore is case insensitive. However, Hive's createFunction does // not normalize the function name (unlike the getFunction part). So, @@ -1288,13 +1308,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier)) } - override def dropFunction(db: String, name: String): Unit = withClient { + override def dropFunction(db: String, name: String): Unit = withClient(true, db) { requireFunctionExists(db, name) client.dropFunction(db, name) } override def alterFunction( - db: String, funcDefinition: CatalogFunction): Unit = withClient { + db: String, funcDefinition: CatalogFunction): Unit = withClient(true, db) { requireDbExists(db) val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT) requireFunctionExists(db, functionName) @@ -1305,23 +1325,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def renameFunction( db: String, oldName: String, - newName: String): Unit = withClient { + newName: String): Unit = withClient(true, db) { requireFunctionExists(db, oldName) requireFunctionNotExists(db, newName) client.renameFunction(db, oldName, newName) } - override def getFunction(db: String, funcName: String): CatalogFunction = withClient { + override def getFunction(db: String, funcName: String): CatalogFunction = withClient(false, db) { requireFunctionExists(db, funcName) client.getFunction(db, funcName) } - override def functionExists(db: String, funcName: String): Boolean = withClient { + override def functionExists(db: String, funcName: String): Boolean = withClient(false, db) { requireDbExists(db) client.functionExists(db, funcName) } - override def listFunctions(db: String, pattern: String): Seq[String] = withClient { + override def listFunctions(db: String, pattern: String): Seq[String] = withClient(false, db) { requireDbExists(db) client.listFunctions(db, pattern) }
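
The patch replaces the single synchronized block around withClient with one ReentrantReadWriteLock per database name, looked up lazily from a ConcurrentHashMap, so metadata reads on a database can run concurrently while writes to that database stay exclusive, and operations on different databases no longer serialize on one catalog-wide monitor. The standalone sketch below illustrates that same pattern outside of HiveExternalCatalog; DbLockRegistry, withDbLock, and the example database and table names are hypothetical and not part of the patch.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

// Minimal sketch of the per-database locking pattern used by the patched
// withClient: one ReentrantReadWriteLock per database name, created on first
// use via computeIfAbsent; readers of a database share access, a writer is
// exclusive, and other databases are unaffected.
object DbLockRegistry {
  private val locks = new ConcurrentHashMap[String, ReentrantReadWriteLock]()

  def withDbLock[T](write: Boolean, db: String)(body: => T): T = {
    val rwLock = locks.computeIfAbsent(db, (_: String) => new ReentrantReadWriteLock())
    val lock: Lock = if (write) rwLock.writeLock() else rwLock.readLock()
    // Acquire before entering the try block so that `finally` only ever
    // unlocks a lock that was actually acquired.
    lock.lock()
    try {
      body
    } finally {
      lock.unlock()
    }
  }
}

object DbLockRegistryExample {
  def main(args: Array[String]): Unit = {
    // Concurrent reads of the same database proceed in parallel; a write
    // (e.g. a CREATE TABLE) waits for readers of that database to finish.
    val tables = DbLockRegistry.withDbLock(write = false, db = "sales") {
      Seq("orders", "customers") // stand-in for client.listTables("sales")
    }
    DbLockRegistry.withDbLock(write = true, db = "sales") {
      println(s"altering one of: ${tables.mkString(", ")}")
    }
  }
}

In the patch itself, operations scoped to a database pass its name as the lock key, while catalog-wide calls such as listDatabases and setCurrentDatabase take the read lock of a shared empty-string key.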