diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 2e9975bcabc3f..f7a4be9591818 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -63,6 +63,10 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
+  private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+    new VariableSubstitution().substitute(statement)
+  }
+
   private var result: DataFrame = _
 
   // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
@@ -126,6 +130,17 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
+    try {
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
+      getNextRowSetInternal(order, maxRowsL)
+    } finally {
+      sqlContext.sparkContext.clearJobGroup()
+    }
+  }
+
+  private def getNextRowSetInternal(
+      order: FetchOrientation,
+      maxRowsL: Long): RowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
@@ -306,9 +321,6 @@ private[hive] class SparkExecuteStatementOperation(
       parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
     }
 
-    val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
-      new VariableSubstitution().substitute(statement)
-    }
     sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
     result = sqlContext.sql(statement)
     logDebug(result.queryExecution.toString())
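
The core of this change is the try/finally wrapper: every Spark job triggered while fetching result rows is tagged with the statement's job group, and the group is always cleared afterwards, even on failure. Below is a minimal, self-contained sketch of that pattern, not the Spark source itself; `FakeSparkContext` and `fetchRows` are hypothetical stand-ins used only to illustrate the control flow.

```scala
// Sketch of the job-group wrap pattern introduced by the diff. Assumptions:
// FakeSparkContext mimics SparkContext's setJobGroup/clearJobGroup, and
// fetchRows stands in for getNextRowSet; neither exists in Spark under
// these names.
object JobGroupWrapExample {

  // Stand-in for SparkContext's job-group bookkeeping.
  final class FakeSparkContext {
    def setJobGroup(groupId: String, description: String): Unit =
      println(s"setJobGroup($groupId, $description)")
    def clearJobGroup(): Unit =
      println("clearJobGroup()")
  }

  def fetchRows(
      sc: FakeSparkContext,
      statementId: String,
      statement: String): Seq[String] = {
    try {
      // Any job launched inside this block is attributed to statementId,
      // so it can be cancelled as a unit and is grouped in the UI.
      sc.setJobGroup(statementId, statement)
      Seq("row1", "row2") // placeholder for the real incremental fetch
    } finally {
      // Always clear, so later unrelated jobs are not mis-attributed
      // to this statement if the fetch throws.
      sc.clearJobGroup()
    }
  }

  def main(args: Array[String]): Unit =
    fetchRows(new FakeSparkContext, "stmt-42", "SELECT * FROM t").foreach(println)
}
```

Hoisting `substitutorStatement` to a class-level val (rather than computing it inside `runInternal`) lets `getNextRowSet` reuse the already-substituted statement text as the job-group description without repeating the variable substitution on every fetch.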