Skip to content

Commit

Permalink
Added an option to kill the thrift connection when Engine is consider…
Browse files Browse the repository at this point in the history
…ed dead
  • Loading branch information
risyomei committed Mar 8, 2023
1 parent 5318585 commit e40fcfc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,14 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(120).toMillis)

val ENGINE_ALIVE_CLOSE_CONNETION: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.alive.closeConnection")
.doc("Whether to kill the thrift connection to engine when engine is marked as no-alive. " +
"If set to true, the connection will be killed")
.version("1.7.0")
.booleanConf
.createWithDefault(false)

val ENGINE_OPEN_MAX_ATTEMPTS: ConfigEntry[Int] =
buildConf("kyuubi.session.engine.open.max.attempts")
.doc("The number of times an open engine will retry when encountering a special error.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.concurrent.duration.Duration
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
import org.apache.thrift.transport.TSocket
import org.apache.thrift.transport.{TSocket, TTransportException}

import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
Expand All @@ -43,7 +43,8 @@ class KyuubiSyncThriftClient private (
protocol: TProtocol,
engineAliveProbeProtocol: Option[TProtocol],
engineAliveProbeInterval: Long,
engineAliveTimeout: Long)
engineAliveTimeout: Long,
engineAliveCloseConnection: Boolean = false)
extends TCLIService.Client(protocol) with Logging {

@volatile private var _remoteSessionHandle: TSessionHandle = _
Expand Down Expand Up @@ -105,6 +106,15 @@ class KyuubiSyncThriftClient private (
}
} else {
shutdownAsyncRequestExecutor()
if (engineAliveCloseConnection) {
try {
warn(s"Force closing transport to interrupt the protocol thread")
protocol.getTransport().close()
} catch {
case e: TTransportException =>
warn(s"Error closing transport: ${e.getMessage}")
}
}
}
}
}
Expand Down Expand Up @@ -453,6 +463,7 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
val aliveCloseConnection = conf.get(KyuubiConf.ENGINE_ALIVE_CLOSE_CONNETION)

val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)

Expand All @@ -466,6 +477,7 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
tProtocol,
aliveProbeProtocol,
aliveProbeInterval,
aliveTimeout)
aliveTimeout,
aliveCloseConnection)
}
}

0 comments on commit e40fcfc

Please sign in to comment.