From 517b810dfa5076c3d0155d1e134dc93317ec3ec0 Mon Sep 17 00:00:00 2001
From: Gustavo Martin Morcuende
Date: Sat, 21 Nov 2020 08:39:16 -0800
Subject: [PATCH] [SPARK-33463][SQL] Keep Job Id during incremental collect in
 Spark Thrift Server

### What changes were proposed in this pull request?

When **spark.sql.thriftServer.incrementalCollect** is enabled, Job Ids get lost and tracing queries in the Spark Thrift Server becomes too complicated.

### Why are the changes needed?

Because it makes tracing Spark Thrift Server queries easier.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The existing tests are sufficient; no additional tests are needed.

Closes #30390 from gumartinm/master.

Authored-by: Gustavo Martin Morcuende
Signed-off-by: Dongjoon Hyun
---
 .../SparkExecuteStatementOperation.scala      | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 2e9975bcabc3f..f7a4be9591818 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -63,6 +63,10 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
+  private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+    new VariableSubstitution().substitute(statement)
+  }
+
   private var result: DataFrame = _
 
   // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
@@ -126,6 +130,17 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
+    try {
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
+      getNextRowSetInternal(order, maxRowsL)
+    } finally {
+      sqlContext.sparkContext.clearJobGroup()
+    }
+  }
+
+  private def getNextRowSetInternal(
+      order: FetchOrientation,
+      maxRowsL: Long): RowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
@@ -306,9 +321,6 @@ private[hive] class SparkExecuteStatementOperation(
       parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
     }
 
-    val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
-      new VariableSubstitution().substitute(statement)
-    }
     sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
     result = sqlContext.sql(statement)
     logDebug(result.queryExecution.toString())
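
For reviewer context, here is a minimal standalone sketch of the `setJobGroup`/`clearJobGroup` pattern this patch applies to `getNextRowSet`. The `SparkSession` setup, statement id, and query below are illustrative assumptions, not code from this patch:

```scala
import org.apache.spark.sql.SparkSession

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("job-group-sketch")
      .getOrCreate()

    val statementId = "stmt-42"                 // hypothetical statement id
    val statement = "SELECT id FROM range(10)"  // hypothetical query

    try {
      // Tag every job triggered while iterating with the statement's id, so
      // the Spark UI groups the incremental fetch jobs under one query.
      spark.sparkContext.setJobGroup(statementId, statement)
      // toLocalIterator() can trigger a job per fetched partition, similar to
      // how incremental collect pulls rows batch by batch.
      spark.sql(statement).toLocalIterator().forEachRemaining(row => println(row))
    } finally {
      // Restore the default job group, mirroring the patch's try/finally.
      spark.sparkContext.clearJobGroup()
    }

    spark.stop()
  }
}
```

The try/finally matters because `getNextRowSet` can be called many times per statement; without it, a failed fetch would leave the job group set on the handler thread and mislabel jobs from subsequent operations.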