[SPARK-33463][SQL] Keep Job Id during incremental collect in Spark Thrift Server

### What changes were proposed in this pull request?

When **spark.sql.thriftServer.incrementalCollect** is enabled, Job Ids get lost, and tracing queries in the Spark Thrift Server becomes unnecessarily complicated.
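
For context, here is a hedged sketch of enabling the feature. The app name, master, and use of a local session are illustrative assumptions; only the config key comes from this PR:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; names here are hypothetical, not from the patch.
// With incrementalCollect enabled, results are fetched partition by
// partition, so Spark jobs can be triggered long after the statement was
// first executed. Those are the jobs whose Job Ids previously became hard
// to trace.
val spark = SparkSession.builder()
  .appName("incremental-collect-demo")
  .master("local[*]")
  .config("spark.sql.thriftServer.incrementalCollect", "true")
  .getOrCreate()
```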

### Why are the changes needed?

Because it makes tracing Spark Thrift Server queries easier.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The existing tests are sufficient; no additional tests are needed.

Closes #30390 from gumartinm/master.

Authored-by: Gustavo Martin Morcuende <gu.martinm@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
gumartinm authored and dongjoon-hyun committed Nov 21, 2020
1 parent cf74901 commit 517b810
Showing 1 changed file with 15 additions and 3 deletions.
@@ -63,6 +63,10 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
+  private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+    new VariableSubstitution().substitute(statement)
+  }
+
   private var result: DataFrame = _
 
   // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
@@ -126,6 +130,17 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
+    try {
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
+      getNextRowSetInternal(order, maxRowsL)
+    } finally {
+      sqlContext.sparkContext.clearJobGroup()
+    }
+  }
+
+  private def getNextRowSetInternal(
+      order: FetchOrientation,
+      maxRowsL: Long): RowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
@@ -306,9 +321,6 @@ private[hive] class SparkExecuteStatementOperation(
       parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
     }
 
-    val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
-      new VariableSubstitution().substitute(statement)
-    }
     sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
     result = sqlContext.sql(statement)
     logDebug(result.queryExecution.toString())
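
In isolation, the change applies Spark's standard job-group bracketing around each fetch. Below is a minimal, self-contained sketch of that pattern; the object name, query, and session setup are illustrative, not the Thrift Server code itself:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of setJobGroup/clearJobGroup bracketing, mirroring the
// patch above; statementId and the description stand in for the Thrift
// Server's real statement id and substituted SQL text.
object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("job-group-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val statementId = java.util.UUID.randomUUID().toString

    try {
      // Every job launched inside this bracket carries statementId as its
      // job group, so incremental fetches stay traceable in the UI and logs.
      sc.setJobGroup(statementId, "SELECT id FROM range(10)")
      val it = spark.range(10).toLocalIterator() // lazy, like incremental collect
      while (it.hasNext) it.next()
    } finally {
      // Clear the group so later jobs on this thread are not mis-tagged.
      sc.clearJobGroup()
    }
    spark.stop()
  }
}
```

Bracketing the fetch, rather than only the initial execution, is the key design choice here: with incremental collect, jobs are launched lazily as rows are fetched, and those jobs are only traceable if the job group is set at fetch time.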
