[SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState #18648

Closed · wants to merge 9 commits
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.IOException
import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -136,8 +136,14 @@ class SparkHadoopUtil extends Logging {

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
} else {
logInfo("Attempting to login to Kerberos" +
s" using principal: ${principalName} and keytab: ${keytabFilename}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
}
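
The effect of the new guard, sketched from a hypothetical caller's side (the principal and keytab path below are illustrative, not from the PR):

    import org.apache.spark.SparkException
    import org.apache.spark.deploy.SparkHadoopUtil

    try {
      SparkHadoopUtil.get.loginUserFromKeytab(
        principalName = "user@EXAMPLE.COM",       // illustrative principal
        keytabFilename = "/no/such/file.keytab")  // illustrative, missing path
    } catch {
      case e: SparkException =>
        // Fails fast here rather than deep inside UserGroupInformation:
        println(e.getMessage)  // "Keytab file: /no/such/file.keytab does not exist"
    }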

/**
19 changes: 7 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -581,18 +581,13 @@ object SparkSubmit extends CommandLineUtils {
if (clusterManager == YARN || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
}
}
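
For context, a minimal sketch (relying on documented SparkConf behavior, not code from this PR) of why putting the values into sysProps makes them visible later: a default-constructed SparkConf loads every spark.* Java system property.

    import org.apache.spark.SparkConf

    sys.props("spark.yarn.keytab") = "/etc/security/user.keytab"  // illustrative values
    sys.props("spark.yarn.principal") = "user@EXAMPLE.COM"
    val conf = new SparkConf()                  // loadDefaults = true picks up spark.* props
    assert(conf.contains("spark.yarn.keytab")) // later visible to e.g. HiveClientImpl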

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala (new file)
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveUtils

class HiveCliSessionStateSuite extends SparkFunSuite {

def withSessionClear(f: () => Unit): Unit = {
try f() finally SessionState.detachSession()
}

test("CliSessionState will be reused") {
withSessionClear { () =>
val hiveConf = new HiveConf(classOf[SessionState])
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
case (key, value) => hiveConf.set(key, value)
}
val sessionState: SessionState = new CliSessionState(hiveConf)
SessionState.start(sessionState)
val s1 = SessionState.get
val sparkConf = new SparkConf()
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
assert(s1 === s2)
assert(s2.isInstanceOf[CliSessionState])
}
}

test("SessionState will not be reused") {
withSessionClear { () =>
val sparkConf = new SparkConf()
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
case (key, value) => hadoopConf.set(key, value)
}
val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
val s1 = hiveClient.getState
val s2 = hiveClient.newSession().getState
assert(s1 !== s2)
}
}
}
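
For orientation, the CLI path these tests model looks roughly like this (a simplified sketch, not the actual SparkSQLCLIDriver code):

    import org.apache.hadoop.hive.cli.CliSessionState
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    // spark-sql first starts a CliSessionState carrying command-line settings...
    SessionState.start(new CliSessionState(new HiveConf(classOf[SessionState])))
    // ...then SparkSQLEnv.init() builds a Hive metastore client. With builtin jars,
    // isolation is now off, so that client attaches to this same CliSessionState.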
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo

@@ -229,6 +230,22 @@ private[spark] object HiveUtils extends Logging {
}.toMap
}

/**
* Check whether the current thread's SessionState is a CliSessionState.
* @return true if SessionState.get returns an instance of CliSessionState,
* false if it returns a non-CliSessionState instance or null
*/
def isCliSessionState(): Boolean = {
val state = SessionState.get
var temp: Class[_] = if (state != null) state.getClass else null
var found = false
while (temp != null && !found) {
found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
temp = temp.getSuperclass
}
found
}
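
The superclass walk above compares class names instead of using isInstanceOf, so CliSessionState (which lives in hive-cli) never has to be loadable from this class's own loader. The same technique in generic form (a standalone sketch; inheritsFrom is a name invented here):

    // Returns true if obj's class, or any superclass of it, has the given name.
    // Matching by name sidesteps classloader identity and classpath issues.
    def inheritsFrom(obj: AnyRef, className: String): Boolean = {
      var cls: Class[_] = if (obj != null) obj.getClass else null
      var found = false
      while (cls != null && !found) {
        found = cls.getName == className
        cls = cls.getSuperclass
      }
      found
    }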

/**
* Create a [[HiveClient]] used for execution.
*
@@ -312,7 +329,7 @@
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
isolationOn = true,
isolationOn = !isCliSessionState(),
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -38,6 +38,12 @@ private[hive] trait HiveClient {
/** Returns the configuration for the given key in the current session. */
def getConf(key: String, defaultValue: String): String

/**
* Return the associated Hive SessionState of this [[HiveClientImpl]].
* @return the state typed as [[Any]] rather than SessionState, to avoid linkage errors
*/
def getState: Any
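
A sketch of the caller's side (assumed usage, not code from the PR; the downcast is the caller's responsibility, which is why the trait exposes [[Any]]):

    import org.apache.hadoop.hive.ql.session.SessionState

    def currentHiveState(client: HiveClient): SessionState =
      // Safe only when the caller's SessionState class comes from the same
      // classloader as the client's; otherwise this cast itself would fail.
      client.getState.asInstanceOf[SessionState]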

/**
* Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
* result in one string.
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.AnalysisException
@@ -105,107 +105,78 @@ private[hive] class HiveClientImpl(
// Create an internal session state for this HiveClientImpl.
val state: SessionState = {
val original = Thread.currentThread().getContextClassLoader
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)

// Set up kerberos credentials for UserGroupInformation.loginUser within
// current class loader
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principalName = sparkConf.get("spark.yarn.principal")
val keytabFileName = sparkConf.get("spark.yarn.keytab")
if (!new File(keytabFileName).exists()) {
throw new SparkException(s"Keytab file: ${keytabFileName}" +
" specified in spark.yarn.keytab does not exist")
} else {
logInfo("Attempting to login to Kerberos" +
s" using principal: ${principalName} and keytab: ${keytabFileName}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
}
}

def isCliSessionState(state: SessionState): Boolean = {
var temp: Class[_] = if (state != null) state.getClass else null
var found = false
while (temp != null && !found) {
found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
temp = temp.getSuperclass
if (clientLoader.isolationOn) {
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)
Contributor:

Is the behavior change safe here? Previously, we switched the context ClassLoader in both cases, while in this PR we only do so when isolationOn is true.

Member Author:

When isolation is off, we would just be switching the classloader to itself.

Contributor:

If SessionState.get() is null, we still call newState() and initialize from initClassLoader; should we also switch the classloader in that case?

Member Author:

If SessionState.get were null here, isolation would always have been turned on. That only happens if we call SessionState.detachSession, doesn't it?

Member Author (@yaooqinn, Aug 9, 2017):

A user app may create a CliSessionState with built-in Hive jars, which turns isolation off; it may then detach that state and create a Hive client again. This time isolation is still off and SessionState.get() returns null, so newState() is called without changing the classloader. I think this is OK, because we never create an isolated classloader from beginning to end.
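
The pattern being discussed, extracted into a generic helper for clarity (a sketch; withContextClassLoader is not a name used in this PR):

    // Runs body with the given context classloader, always restoring the original.
    def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
      val original = Thread.currentThread().getContextClassLoader
      Thread.currentThread().setContextClassLoader(loader)
      try body finally Thread.currentThread().setContextClassLoader(original)
    }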

// Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principal = sparkConf.get("spark.yarn.principal")
val keytab = sparkConf.get("spark.yarn.keytab")
SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
}
found
}

val ret = try {
// originalState will be created if it does not exist and will never be null
val originalState = SessionState.get()
if (isCliSessionState(originalState)) {
// In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
// which contains information like configurations from command line. Later
// we call `SparkSQLEnv.init()` there, which would run into this part again.
// so we should keep `conf` and reuse the existing instance of `CliSessionState`.
originalState
} else {
val hiveConf = new HiveConf(classOf[SessionState])
// 1: we set all confs in the hadoopConf to this hiveConf.
// This hadoopConf contains user settings in Hadoop's core-site.xml file
// and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
// SharedState and put settings in this hadoopConf instead of relying on HiveConf
// to load user settings. Otherwise, HiveConf's initialize method will override
// settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
// is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
// has hive-site.xml. So, HiveConf will use that to override its default values.
hadoopConf.iterator().asScala.foreach { entry =>
val key = entry.getKey
val value = entry.getValue
if (key.toLowerCase(Locale.ROOT).contains("password")) {
logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
} else {
logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
}
hiveConf.set(key, value)
}
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
// the initial value will be the current thread's context class loader
// (i.e. initClassLoader at here).
// We call initialConf.setClassLoader(initClassLoader) at here to make
// this action explicit.
hiveConf.setClassLoader(initClassLoader)
// 2: we set all spark confs to this hiveConf.
sparkConf.getAll.foreach { case (k, v) =>
if (k.toLowerCase(Locale.ROOT).contains("password")) {
logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
} else {
logDebug(s"Applying Spark config to Hive Conf: $k=$v")
}
hiveConf.set(k, v)
}
// 3: we set all entries in config to this hiveConf.
extraConfig.foreach { case (k, v) =>
if (k.toLowerCase(Locale.ROOT).contains("password")) {
logDebug(s"Applying extra config to HiveConf: $k=xxx")
} else {
logDebug(s"Applying extra config to HiveConf: $k=$v")
}
hiveConf.set(k, v)
}
val state = new SessionState(hiveConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state
try {
newState()
} finally {
Thread.currentThread().setContextClassLoader(original)
}
} finally {
Thread.currentThread().setContextClassLoader(original)
} else {
// Isolation being off means we have detected a CliSessionState instance in the current
// thread. Two cases:
// 1: Inside the Spark project, we have already started a CliSessionState in
// `SparkSQLCLIDriver`, which contains configurations from the command line. Later, we call
// `SparkSQLEnv.init()` there, which would create a Hive client again, so we should keep
// those configurations and reuse the existing instance of `CliSessionState`. In this case,
// SessionState.get will always return a CliSessionState.
// 2: Alternatively, a user app may start a CliSessionState outside the Spark project with
// built-in Hive jars, which will turn off isolation. If SessionState.detachSession is
// called to remove the current state after that, a Hive client created later will
// initialize its own state via newState().
Option(SessionState.get).getOrElse(newState())
}
ret
}
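
The second case in the comment above, as a minimal sequence (hypothetical user code):

    import org.apache.hadoop.hive.cli.CliSessionState
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    // 1. Starting a CliSessionState with builtin jars turns isolation off.
    SessionState.start(new CliSessionState(new HiveConf(classOf[SessionState])))
    // 2. The app detaches it, so SessionState.get now returns null.
    SessionState.detachSession()
    // 3. The next HiveClientImpl on this thread falls through to newState().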

// Log the default warehouse location.
logInfo(
s"Warehouse location for Hive client " +
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")

private def newState(): SessionState = {
val hiveConf = new HiveConf(classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a classLoader field whose
// initial value is the current thread's context class loader
// (i.e. initClassLoader here).
// We call hiveConf.setClassLoader(initClassLoader) here to make
// this action explicit.
hiveConf.setClassLoader(initClassLoader)

// 1: Copy everything from the hadoopConf into this hiveConf.
// This hadoopConf contains user settings in Hadoop's core-site.xml file
// and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
// SharedState and put settings in this hadoopConf instead of relying on HiveConf
// to load user settings. Otherwise, HiveConf's initialize method will override
// settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
// is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
// has hive-site.xml. So, HiveConf will use that to override its default values.
// 2: We set all Spark confs into this hiveConf.
// 3: We set all entries in extraConfig into this hiveConf.
(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
logDebug(
s"""
|Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
|$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
""".stripMargin)
hiveConf.set(k, v)
}
val state = new SessionState(hiveConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state
}
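
Note the precedence the concatenation order gives to duplicate keys: extraConfig overrides sparkConf, which overrides hadoopConf, because later hiveConf.set calls win. In miniature (illustrative values only):

    val merged = Map("k" -> "from-hadoopConf") ++
      Map("k" -> "from-sparkConf") ++
      Map("k" -> "from-extraConfig")
    assert(merged("k") == "from-extraConfig")  // right-most source wins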

/** Returns the configuration for the current session. */
def conf: HiveConf = state.getConf

Expand Down Expand Up @@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
}
}

/** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
override def getState: SessionState = withHiveState(state)

/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/