Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 4, 2024
1 parent b35c9f5 commit 5efc953
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ case class CometBroadcastExchangeExec(originalPlan: SparkPlan, child: SparkPlan)
this
}
def getNumPartitions(): Int = {
// scalastyle:off println
println(s"numPartitions: $numPartitions")
numPartitions.getOrElse(child.executeColumnar().getNumPartitions)
}

Expand Down
21 changes: 16 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ abstract class CometNativeExec extends CometExec {
val (broadcastPlans, plans) = sparkPlans.partition { plan =>
plan.find {
case _: CometBroadcastExchangeExec => true
case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
case _ => false
}.isDefined
}
Expand All @@ -246,18 +247,28 @@ abstract class CometNativeExec extends CometExec {

if (broadcastPlans.nonEmpty) {
broadcastPlans.foreach { broadcastPlan =>
val rdd = broadcastPlan
.asInstanceOf[CometBroadcastExchangeExec]
.setNumPartitions(numPartitions(0))
.executeColumnar()
inputs += rdd
broadcastPlan match {
case c: CometBroadcastExchangeExec =>
c.setNumPartitions(numPartitions(0))
case BroadcastQueryStageExec(_, c: CometBroadcastExchangeExec, _) =>
c.setNumPartitions(numPartitions(0))
case _ =>
throw new CometRuntimeException(
s"Unexpected plan type for broadcast plan: $broadcastPlan")
}
}
broadcastPlans.map(_.executeColumnar()).foreach(inputs += _)
}

if (inputs.isEmpty) {
throw new CometRuntimeException(s"No input for CometNativeExec: $this")
}

if (inputs.size != sparkPlans.size) {
throw new CometRuntimeException(
s"Number of inputs doesn't match number of plans: $inputs vs. $sparkPlans")
}

ZippedPartitionsRDD(sparkContext, inputs.toSeq)(createCometExecIter(_))
}
}
Expand Down

0 comments on commit 5efc953

Please sign in to comment.