Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 25, 2024
1 parent 08079a5 commit 5b733b3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,32 @@ class CometSparkSessionExtensions
case CometScanWrapper(_, s) => s
}

// Set up logical links
newPlan = newPlan.transform { case op: CometExec =>
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
// scalastyle:off println
println(
s"op: $op, logicalLink: ${op.logicalLink}, " +
s"originalPlan: ${op.originalPlan}")
op
newPlan = newPlan.transform {
case op: CometExec =>
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
op
case op: CometShuffleExchangeExec =>
// Original Spark shuffle exchange operator might have empty logical link.
// But the `setLogicalLink` call above on downstream operator of
// `CometShuffleExchangeExec` will set its logical link to the downstream
// operators which cause AQE behavior to be incorrect. So we need to unset
// the logical link here.
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op

case op: CometBroadcastExchangeExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
}

// Convert native execution block by linking consecutive native operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ trait ShimCometShuffleExchangeExec {
val cometShuffle = CometShuffleExchangeExec(
s.outputPartitioning,
s.child,
s,
s.shuffleOrigin,
shuffleType,
advisoryPartitionSize)
// Set logical link to the new CometShuffleExchangeExec to make AQE work correctly
s.logicalLink.foreach(cometShuffle.setLogicalLink)
// s.logicalLink.foreach(cometShuffle.setLogicalLink)
// scalastyle:off println
println(
s"s: $s, s.logicalLink: ${s.logicalLink}, cometShuffle: $cometShuffle, " +
s"${cometShuffle.logicalLink}")
// println(
// s"s: $s, s.logicalLink: ${s.logicalLink}, cometShuffle: $cometShuffle, " +
// s"${cometShuffle.logicalLink}")
cometShuffle
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.apache.comet.shims.ShimCometShuffleExchangeExec
case class CometShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
originalPlan: ShuffleExchangeLike,
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
shuffleType: ShuffleType = CometNativeShuffle,
advisoryPartitionSize: Option[Long] = None)
Expand Down

0 comments on commit 5b733b3

Please sign in to comment.