From 82d7aa045b6fd2b2aabc5a7162d7620031eda234 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 6 Nov 2016 08:59:56 -0800 Subject: [PATCH] Add an API to log the state of HBase connection cache (#56) * Add a API to log the state of HBase connection cache * Add more 'assert' in the unit test --- .../hbase/HBaseConnectionCache.scala | 38 ++++-- .../spark/sql/HBaseConnectionCacheSuite.scala | 112 +++++++++++------- 2 files changed, 99 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseConnectionCache.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseConnectionCache.scala index 101503be..3061a9fe 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseConnectionCache.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseConnectionCache.scala @@ -32,6 +32,8 @@ private[spark] object HBaseConnectionCache extends Logging { // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey. val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() + val cacheStat = HBaseConnectionCacheStat(0, 0, 0) + // in milliseconds private final val DEFAULT_TIME_OUT: Long = SparkHBaseConf.connectionCloseDelay private var timeout = DEFAULT_TIME_OUT @@ -57,6 +59,13 @@ private[spark] object HBaseConnectionCache extends Logging { housekeepingThread.setDaemon(true) housekeepingThread.start() + def getStat: HBaseConnectionCacheStat = { + connectionMap.synchronized { + cacheStat.numActiveConnections = connectionMap.size + cacheStat.copy() + } + } + def close(): Unit = { try { connectionMap.synchronized { @@ -99,7 +108,9 @@ private[spark] object HBaseConnectionCache extends Logging { connectionMap.synchronized { if(closed) return null - val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn)) + cacheStat.numTotalRequests += 1 + val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1 + new SmartConnection(conn)}) sc.refCount += 1 sc } @@ -135,13 +146,13 @@ private[hbase] case class SmartConnection ( } /** - * Denotes a unique key to an HBase Connection instance. - * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. - * - * In essence, this class captures the properties in Configuration - * that may be used in the process of establishing a connection. - * - */ + * Denotes a unique key to an HBase Connection instance. + * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. + * + * In essence, this class captures the properties in Configuration + * that may be used in the process of establishing a connection. + * + */ class HBaseConnectionKey(c: Configuration) extends Logging { val conf: Configuration = c val CONNECTION_PROPERTIES: Array[String] = Array[String]( @@ -239,4 +250,13 @@ class HBaseConnectionKey(c: Configuration) extends Logging { } } - +/** + * To log the state of [[HBaseConnectionCache]] + * + * @param numTotalRequests number of total connection requests to the cache + * @param numActualConnectionsCreated number of actual HBase connections the cache ever created + * @param numActiveConnections number of current alive HBase connections the cache is holding + */ +case class HBaseConnectionCacheStat(var numTotalRequests: Long, + var numActualConnectionsCreated: Long, + var numActiveConnections: Long) diff --git a/core/src/test/scala/org/apache/spark/sql/HBaseConnectionCacheSuite.scala b/core/src/test/scala/org/apache/spark/sql/HBaseConnectionCacheSuite.scala index df120360..10e4d42f 100644 --- a/core/src/test/scala/org/apache/spark/sql/HBaseConnectionCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/HBaseConnectionCacheSuite.scala @@ -24,17 +24,17 @@ import scala.util.Random import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator, - Connection, BufferedMutatorParams, Admin} +Connection, BufferedMutatorParams, Admin} import org.apache.spark.Logging import org.apache.spark.sql.execution.datasources.hbase._ -case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) { +case class HBaseConnectionKeyMocker(confId: Int) extends HBaseConnectionKey(null) { override def hashCode: Int = { confId } override def equals(obj: Any): Boolean = { - if(!obj.isInstanceOf[HBaseConnectionKeyMocker]) + if (!obj.isInstanceOf[HBaseConnectionKeyMocker]) false else confId == obj.asInstanceOf[HBaseConnectionKeyMocker].confId @@ -44,12 +44,18 @@ case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (nu class ConnectionMocker extends Connection { var isClosed: Boolean = false - def getRegionLocator (tableName: TableName): RegionLocator = null + def getRegionLocator(tableName: TableName): RegionLocator = null + def getConfiguration: Configuration = null - def getTable (tableName: TableName): Table = null + + def getTable(tableName: TableName): Table = null + def getTable(tableName: TableName, pool: ExecutorService): Table = null - def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null - def getBufferedMutator (tableName: TableName): BufferedMutator = null + + def getBufferedMutator(params: BufferedMutatorParams): BufferedMutator = null + + def getBufferedMutator(tableName: TableName): BufferedMutator = null + def getAdmin: Admin = null def close(): Unit = { @@ -59,6 +65,7 @@ class ConnectionMocker extends Connection { } def isAborted: Boolean = true + def abort(why: String, e: Throwable) = {} } @@ -77,7 +84,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { testWithPressureWithClose() } + def cleanEnv() { + HBaseConnectionCache.connectionMap.clear() + HBaseConnectionCache.cacheStat.numActiveConnections = 0 + HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0 + HBaseConnectionCache.cacheStat.numTotalRequests = 0 + } + def testBasic() { + cleanEnv() HBaseConnectionCache.setTimeout(1 * 1000) val connKeyMocker1 = new HBaseConnectionKeyMocker(1) @@ -86,41 +101,48 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { val c1 = HBaseConnectionCache .getConnection(connKeyMocker1, new ConnectionMocker) + + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 1) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) + val c1a = HBaseConnectionCache .getConnection(connKeyMocker1a, new ConnectionMocker) - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 1) - } + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 2) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) val c2 = HBaseConnectionCache .getConnection(connKeyMocker2, new ConnectionMocker) - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 2) - } + assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numTotalRequests === 3) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) c1.close() - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 2) - } + assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) c1a.close() - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 2) - } + assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) Thread.sleep(3 * 1000) // Leave housekeeping thread enough time - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 1) - assert(HBaseConnectionCache.connectionMap.iterator.next()._1 - .asInstanceOf[HBaseConnectionKeyMocker].confId === 2) - } + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.connectionMap.iterator.next()._1 + .asInstanceOf[HBaseConnectionKeyMocker].confId === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) c2.close() } def testWithPressureWithoutClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -143,25 +165,30 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { } Thread.sleep(1000) - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 10) - var totalRc : Int = 0 - HBaseConnectionCache.connectionMap.foreach { - x => totalRc += x._2.refCount - } - assert(totalRc === 100 * 1000) - HBaseConnectionCache.connectionMap.foreach { - x => { - x._2.refCount = 0 - x._2.timestamp = System.currentTimeMillis() - 1000 - } + assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) + var totalRc: Int = 0 + HBaseConnectionCache.connectionMap.foreach { + x => totalRc += x._2.refCount + } + assert(totalRc === 100 * 1000) + HBaseConnectionCache.connectionMap.foreach { + x => { + x._2.refCount = 0 + x._2.timestamp = System.currentTimeMillis() - 1000 } } Thread.sleep(1000) assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } def testWithPressureWithClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -185,13 +212,14 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { case e: InterruptedException => println(e.getMessage) } - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 10) - } + assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) Thread.sleep(6 * 1000) - HBaseConnectionCache.connectionMap.synchronized { - assert(HBaseConnectionCache.connectionMap.size === 0) - } + assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } }