Skip to content

Commit

Permalink
set correct execution Id for broadcast query stage (apache#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonwang authored and Justin Uang committed Jan 14, 2019
1 parent fff789a commit f3b7dee
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,21 @@ abstract class QueryStage extends UnaryExecNode {
* blocking on one child stage.
*/
def executeChildStages(): Unit = {
val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)

// Handle broadcast stages
val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
case bqs: BroadcastQueryStageInput => bqs.childStage
}
val broadcastFutures = broadcastQueryStages.map { queryStage =>
Future { queryStage.prepareBroadcast() }(QueryStage.executionContext)
Future {
SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
queryStage.prepareBroadcast()
}
}(QueryStage.executionContext)
}

// Submit shuffle stages
val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect {
case sqs: ShuffleQueryStageInput => sqs.childStage
}
Expand Down

0 comments on commit f3b7dee

Please sign in to comment.