diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58bff5229..15832e6cf4ec4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -136,8 +136,14 @@ class SparkHadoopUtil extends Logging {
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
-    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+  def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
+    if (!new File(keytabFilename).exists()) {
+      throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
+    } else {
+      logInfo("Attempting to login to Kerberos" +
+        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0ea14361b2f77..809aebbc50086 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -581,18 +581,13 @@ object SparkSubmit extends CommandLineUtils {
     if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
-        if (!new File(args.keytab).exists()) {
-          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
-        } else {
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
-
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
+        // Add keytab and principal configurations in sysProps to make them available
+        // for later use; e.g. in spark sql, the isolated class loader used to talk
+        // to HiveMetastore will use these settings. They will be set as Java system
+        // properties and then loaded by SparkConf
+        sysProps.put("spark.yarn.keytab", args.keytab)
+        sysProps.put("spark.yarn.principal", args.principal)
       }
     }
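With the two hunks above, the keytab existence check lives in a single place (SparkHadoopUtil) and every caller inherits it. A minimal sketch of the resulting behavior; the principal and keytab path are made up, and the snippet assumes it runs inside the Spark tree where `SparkHadoopUtil` is accessible:

```scala
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil

object KeytabCheckDemo extends App {
  // Hypothetical call site: with this patch the missing-keytab failure
  // surfaces from SparkHadoopUtil.loginUserFromKeytab rather than from
  // SparkSubmit's own inline check.
  try {
    SparkHadoopUtil.get.loginUserFromKeytab("user@EXAMPLE.COM", "/no/such/path.keytab")
  } catch {
    case e: SparkException =>
      // Prints: Keytab file: /no/such/path.keytab does not exist
      println(e.getMessage)
  }
}
```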
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
new file mode 100644
index 0000000000000..5f9ea4d26790b
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.cli.CliSessionState
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.hive.HiveUtils
+
+class HiveCliSessionStateSuite extends SparkFunSuite {
+
+  def withSessionClear(f: () => Unit): Unit = {
+    try f() finally SessionState.detachSession()
+  }
+
+  test("CliSessionState will be reused") {
+    withSessionClear { () =>
+      val hiveConf = new HiveConf(classOf[SessionState])
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hiveConf.set(key, value)
+      }
+      val sessionState: SessionState = new CliSessionState(hiveConf)
+      SessionState.start(sessionState)
+      val s1 = SessionState.get
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
+      assert(s1 === s2)
+      assert(s2.isInstanceOf[CliSessionState])
+    }
+  }
+
+  test("SessionState will not be reused") {
+    withSessionClear { () =>
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hadoopConf.set(key, value)
+      }
+      val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
+      val s1 = hiveClient.getState
+      val s2 = hiveClient.newSession().getState
+      assert(s1 !== s2)
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a29d7a7565ee1..8efad394d9490 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
@@ -229,6 +230,22 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
+  /**
+   * Check the current thread's SessionState type.
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets a non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -312,7 +329,7 @@ private[spark] object HiveUtils extends Logging {
       hadoopConf = hadoopConf,
       execJars = jars.toSeq,
       config = configurations,
-      isolationOn = true,
+      isolationOn = !isCliSessionState(),
       barrierPrefixes = hiveMetastoreBarrierPrefixes,
       sharedPrefixes = hiveMetastoreSharedPrefixes)
   } else if (hiveMetastoreJars == "maven") {
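The check added to HiveUtils walks the superclass chain and compares class names instead of using isInstanceOf, presumably so this module never references hive-cli's CliSessionState type directly. A self-contained sketch of the same technique, using stand-in classes rather than Hive's (all names here are invented for illustration):

```scala
object NameWalkDemo extends App {
  // Stand-ins for Hive's SessionState / CliSessionState hierarchy; the real
  // check matches "org.apache.hadoop.hive.cli.CliSessionState" by name.
  class FakeSessionState
  class FakeCliSessionState extends FakeSessionState

  // Same shape as isCliSessionState(): climb getSuperclass until the name
  // matches or we fall off the top of the hierarchy.
  def extendsClassNamed(obj: AnyRef, name: String): Boolean = {
    var temp: Class[_] = if (obj != null) obj.getClass else null
    var found = false
    while (temp != null && !found) {
      found = temp.getName == name  // compare by name, not by isInstanceOf
      temp = temp.getSuperclass
    }
    found
  }

  assert(extendsClassNamed(new FakeCliSessionState, classOf[FakeSessionState].getName))
  assert(!extendsClassNamed(new FakeSessionState, classOf[FakeCliSessionState].getName))
  assert(!extendsClassNamed(null, classOf[FakeSessionState].getName))  // null-safe
}
```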
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9fff452..8cff0ca0963bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -38,6 +38,12 @@ private[hive] trait HiveClient {
   /** Returns the configuration for the given key in the current session. */
   def getConf(key: String, defaultValue: String): String
 
+  /**
+   * Return the associated Hive SessionState of this [[HiveClientImpl]].
+   * @return the state as [[Any]] rather than SessionState, to avoid a linkage error
+   */
+  def getState: Any
+
   /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
    * result in one string.
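Typing getState as Any appears to be deliberate: when the metastore client sits behind the isolated class loader, its SessionState class is not the same Class instance as the caller's, so a SessionState-typed signature could trip a linkage error. A sketch of how a caller might narrow the result; this assumes it compiles inside the Spark tree (HiveClient is private[hive]), and the object name is invented:

```scala
package org.apache.spark.sql.hive.client

import org.apache.hadoop.hive.ql.session.SessionState

object GetStateDemo {
  // `client` would come from e.g. HiveUtils.newClientForMetadata(...).
  def describeState(client: HiveClient): String = client.getState match {
    // Matches only when the state class was loaded by our class loader
    // (builtin jars, isolation off), as the new suite above asserts.
    case s: SessionState => s"shared: ${s.getClass.getName}"
    // Behind an isolated loader this branch is taken even for a session
    // state, because it is a different Class instance; inspect by name.
    case other => s"isolated: ${other.getClass.getName}"
  }
}
```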
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index be024adac8eb0..27f60cf3d3911 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -105,100 +105,33 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    def isCliSessionState(state: SessionState): Boolean = {
-      var temp: Class[_] = if (state != null) state.getClass else null
-      var found = false
-      while (temp != null && !found) {
-        found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-        temp = temp.getSuperclass
+    if (clientLoader.isolationOn) {
+      // Switch to the initClassLoader.
+      Thread.currentThread().setContextClassLoader(initClassLoader)
+      // Set up kerberos credentials for UserGroupInformation.loginUser within
+      // the current class loader
+      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+        val principal = sparkConf.get("spark.yarn.principal")
+        val keytab = sparkConf.get("spark.yarn.keytab")
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
      }
-      found
-    }
-
-    val ret = try {
-      // originState will be created if not exists, will never be null
-      val originalState = SessionState.get()
-      if (isCliSessionState(originalState)) {
-        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
-        // which contains information like configurations from command line. Later
-        // we call `SparkSQLEnv.init()` there, which would run into this part again.
-        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
-        originalState
-      } else {
-        val hiveConf = new HiveConf(classOf[SessionState])
-        // 1: we set all confs in the hadoopConf to this hiveConf.
-        // This hadoopConf contains user settings in Hadoop's core-site.xml file
-        // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-        // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-        // to load user settings. Otherwise, HiveConf's initialize method will override
-        // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-        // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-        // has hive-site.xml. So, HiveConf will use that to override its default values.
-        hadoopConf.iterator().asScala.foreach { entry =>
-          val key = entry.getKey
-          val value = entry.getValue
-          if (key.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
-          } else {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
-          }
-          hiveConf.set(key, value)
-        }
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        hiveConf.setClassLoader(initClassLoader)
-        // 2: we set all spark confs to this hiveConf.
-        sparkConf.getAll.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
-          } else {
-            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        // 3: we set all entries in config to this hiveConf.
-        extraConfig.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying extra config to HiveConf: $k=xxx")
-          } else {
-            logDebug(s"Applying extra config to HiveConf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        val state = new SessionState(hiveConf)
-        if (clientLoader.cachedHive != null) {
-          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-        }
-        SessionState.start(state)
-        state.out = new PrintStream(outputBuffer, true, "UTF-8")
-        state.err = new PrintStream(outputBuffer, true, "UTF-8")
-        state
+      try {
+        newState()
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
      }
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
+    } else {
+      // Isolation off means a CliSessionState instance was detected in the current thread.
+      // 1: Inside the Spark project, we have already started a CliSessionState in
+      // `SparkSQLCLIDriver`, which contains configurations from the command line. Later we
+      // call `SparkSQLEnv.init()` there, which creates a Hive client again, so we should keep
+      // those configurations and reuse the existing instance of `CliSessionState`. In this
+      // case, SessionState.get will always return a CliSessionState.
+      // 2: In another case, a user application may start a CliSessionState outside the Spark
+      // project with built-in Hive jars, which also turns isolation off. If
+      // SessionState.detachSession is called to remove the current state after that, the
+      // Hive client created later will initialize its own state via newState().
+      Option(SessionState.get).getOrElse(newState())
     }
-    ret
   }
 
   // Log the default warehouse location.
@@ -206,6 +139,44 @@ private[hive] class HiveClientImpl(
     s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
+  private def newState(): SessionState = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader here).
+    // We call hiveConf.setClassLoader(initClassLoader) here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all settings from hadoopConf into this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load the hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: Take all Spark conf entries into this hiveConf.
+    // 3: Take all entries in extraConfig into this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      logDebug(
+        s"""
+           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+         """.stripMargin)
+      hiveConf.set(k, v)
+    }
+    val state = new SessionState(hiveConf)
+    if (clientLoader.cachedHive != null) {
+      Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+    }
+    SessionState.start(state)
+    state.out = new PrintStream(outputBuffer, true, "UTF-8")
+    state.err = new PrintStream(outputBuffer, true, "UTF-8")
+    state
+  }
+
   /** Returns the configuration for the current session. */
   def conf: HiveConf = state.getConf
 
@@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
     }
   }
 
+  /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]]. */
+  override def getState: SessionState = withHiveState(state)
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */
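One behavioral detail of newState() worth noting: the three config sources are applied to the HiveConf in order, and each hiveConf.set overwrites the previous value, so on a duplicated key extraConfig wins over the Spark conf, which wins over hadoopConf. A tiny sketch of that precedence; the key and values are made up:

```scala
object ConfMergeDemo extends App {
  // Mirrors the merge order in newState(): hadoopConf entries first, then
  // Spark conf, then extraConfig; the last write to a key wins.
  val fromHadoopConf = Map("hive.some.key" -> "hadoop")
  val fromSparkConf  = Map("hive.some.key" -> "spark")
  val fromExtra      = Map("hive.some.key" -> "extra")

  val effective = fromHadoopConf ++ fromSparkConf ++ fromExtra
  assert(effective("hive.some.key") == "extra")  // extraConfig takes precedence
}
```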