Skip to content

Commit

Permalink
Add an API to log the state of HBase connection cache (#56)
Browse files Browse the repository at this point in the history
* Add an API to log the state of HBase connection cache

* Add more 'assert' in the unit test
  • Loading branch information
weiqingy committed Nov 6, 2016
1 parent 0a0cc25 commit 82d7aa0
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ private[spark] object HBaseConnectionCache extends Logging {
// A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()

// Usage counters for this cache; getStat hands out a copy of this object.
val cacheStat = HBaseConnectionCacheStat(0, 0, 0)

// in milliseconds
private final val DEFAULT_TIME_OUT: Long = SparkHBaseConf.connectionCloseDelay
private var timeout = DEFAULT_TIME_OUT
Expand All @@ -57,6 +59,13 @@ private[spark] object HBaseConnectionCache extends Logging {
housekeepingThread.setDaemon(true)
housekeepingThread.start()

/**
 * Returns a snapshot of the cache statistics.
 *
 * The active-connection counter is refreshed from the current size of
 * `connectionMap` before the copy is taken; both steps run while holding the
 * `connectionMap` monitor.
 *
 * @return a copy of `cacheStat`
 */
def getStat: HBaseConnectionCacheStat = connectionMap.synchronized {
  val activeNow = connectionMap.size
  cacheStat.numActiveConnections = activeNow
  cacheStat.copy()
}

def close(): Unit = {
try {
connectionMap.synchronized {
Expand Down Expand Up @@ -99,7 +108,9 @@ private[spark] object HBaseConnectionCache extends Logging {
connectionMap.synchronized {
if(closed)
return null
val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn))
cacheStat.numTotalRequests += 1
val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
new SmartConnection(conn)})
sc.refCount += 1
sc
}
Expand Down Expand Up @@ -135,13 +146,13 @@ private[hbase] case class SmartConnection (
}

/**
* Denotes a unique key to an HBase Connection instance.
* Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
*
* In essence, this class captures the properties in Configuration
* that may be used in the process of establishing a connection.
*
*/
* Denotes a unique key to an HBase Connection instance.
* Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
*
* In essence, this class captures the properties in Configuration
* that may be used in the process of establishing a connection.
*
*/
class HBaseConnectionKey(c: Configuration) extends Logging {
val conf: Configuration = c
val CONNECTION_PROPERTIES: Array[String] = Array[String](
Expand Down Expand Up @@ -239,4 +250,13 @@ class HBaseConnectionKey(c: Configuration) extends Logging {
}
}


/**
 * Mutable counters describing the usage of [[HBaseConnectionCache]].
 *
 * @param numTotalRequests            how many connection requests the cache has received
 * @param numActualConnectionsCreated how many HBase connections the cache has created so far
 * @param numActiveConnections        how many HBase connections the cache currently holds
 */
case class HBaseConnectionCacheStat(
    var numTotalRequests: Long,
    var numActualConnectionsCreated: Long,
    var numActiveConnections: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import scala.util.Random
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
Connection, BufferedMutatorParams, Admin}
Connection, BufferedMutatorParams, Admin}
import org.apache.spark.Logging
import org.apache.spark.sql.execution.datasources.hbase._

case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) {
case class HBaseConnectionKeyMocker(confId: Int) extends HBaseConnectionKey(null) {
override def hashCode: Int = {
confId
}

override def equals(obj: Any): Boolean = {
if(!obj.isInstanceOf[HBaseConnectionKeyMocker])
if (!obj.isInstanceOf[HBaseConnectionKeyMocker])
false
else
confId == obj.asInstanceOf[HBaseConnectionKeyMocker].confId
Expand All @@ -44,12 +44,18 @@ case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (nu
class ConnectionMocker extends Connection {
var isClosed: Boolean = false

def getRegionLocator (tableName: TableName): RegionLocator = null
def getRegionLocator(tableName: TableName): RegionLocator = null

def getConfiguration: Configuration = null
def getTable (tableName: TableName): Table = null

def getTable(tableName: TableName): Table = null

def getTable(tableName: TableName, pool: ExecutorService): Table = null
def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
def getBufferedMutator (tableName: TableName): BufferedMutator = null

def getBufferedMutator(params: BufferedMutatorParams): BufferedMutator = null

def getBufferedMutator(tableName: TableName): BufferedMutator = null

def getAdmin: Admin = null

def close(): Unit = {
Expand All @@ -59,6 +65,7 @@ class ConnectionMocker extends Connection {
}

def isAborted: Boolean = true

def abort(why: String, e: Throwable) = {}
}

Expand All @@ -77,7 +84,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
testWithPressureWithClose()
}

/**
 * Resets the shared [[HBaseConnectionCache]] state so a test starts from an empty
 * connection map and zeroed statistics.
 *
 * The reset runs while holding the `connectionMap` monitor, the same lock the cache
 * takes in `getStat`, so it cannot interleave with cache code that synchronizes on
 * the map.
 */
def cleanEnv(): Unit = {
  HBaseConnectionCache.connectionMap.synchronized {
    HBaseConnectionCache.connectionMap.clear()
    HBaseConnectionCache.cacheStat.numActiveConnections = 0
    HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0
    HBaseConnectionCache.cacheStat.numTotalRequests = 0
  }
}

def testBasic() {
cleanEnv()
HBaseConnectionCache.setTimeout(1 * 1000)

val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
Expand All @@ -86,41 +101,48 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {

val c1 = HBaseConnectionCache
.getConnection(connKeyMocker1, new ConnectionMocker)

assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.getStat.numTotalRequests === 1)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)

val c1a = HBaseConnectionCache
.getConnection(connKeyMocker1a, new ConnectionMocker)

HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 1)
}
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.getStat.numTotalRequests === 2)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)

val c2 = HBaseConnectionCache
.getConnection(connKeyMocker2, new ConnectionMocker)

HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2)
}
assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numTotalRequests === 3)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)

c1.close()
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2)
}
assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)

c1a.close()
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2)
}
assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)

Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.connectionMap.iterator.next()._1
.asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
}
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.connectionMap.iterator.next()._1
.asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)

c2.close()
}

def testWithPressureWithoutClose() {
cleanEnv()

class TestThread extends Runnable {
override def run() {
for (i <- 0 to 999) {
Expand All @@ -143,25 +165,30 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
}

Thread.sleep(1000)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 10)
var totalRc : Int = 0
HBaseConnectionCache.connectionMap.foreach {
x => totalRc += x._2.refCount
}
assert(totalRc === 100 * 1000)
HBaseConnectionCache.connectionMap.foreach {
x => {
x._2.refCount = 0
x._2.timestamp = System.currentTimeMillis() - 1000
}
assert(HBaseConnectionCache.connectionMap.size === 10)
assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
var totalRc: Int = 0
HBaseConnectionCache.connectionMap.foreach {
x => totalRc += x._2.refCount
}
assert(totalRc === 100 * 1000)
HBaseConnectionCache.connectionMap.foreach {
x => {
x._2.refCount = 0
x._2.timestamp = System.currentTimeMillis() - 1000
}
}
Thread.sleep(1000)
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
}

def testWithPressureWithClose() {
cleanEnv()

class TestThread extends Runnable {
override def run() {
for (i <- 0 to 999) {
Expand All @@ -185,13 +212,14 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
case e: InterruptedException => println(e.getMessage)
}

HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 10)
}
assert(HBaseConnectionCache.connectionMap.size === 10)
assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 10)

Thread.sleep(6 * 1000)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0)
}
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
}
}

0 comments on commit 82d7aa0

Please sign in to comment.