Skip to content

Commit

Permalink
fix: CometShuffleExchangeExec logical link should be correct
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 25, 2024
1 parent ef94c55 commit 96b9826
Show file tree
Hide file tree
Showing 38 changed files with 3,065 additions and 1,498 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,37 @@ class CometSparkSessionExtensions
}

// Set up logical links
newPlan = newPlan.transform { case op: CometExec =>
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
op
newPlan = newPlan.transform {
case op: CometExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
case op: CometShuffleExchangeExec =>
// Original Spark shuffle exchange operator might have empty logical link.
// But the `setLogicalLink` call above on downstream operator of
// `CometShuffleExchangeExec` will set its logical link to the downstream
// operators which cause AQE behavior to be incorrect. So we need to unset
// the logical link here.
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op

case op: CometBroadcastExchangeExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
}

// Convert native execution block by linking consecutive native operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.apache.comet.shims.ShimCometShuffleExchangeExec
case class CometShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
originalPlan: ShuffleExchangeLike,
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
shuffleType: ShuffleType = CometNativeShuffle,
advisoryPartitionSize: Option[Long] = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ trait ShimCometShuffleExchangeExec {
CometShuffleExchangeExec(
s.outputPartitioning,
s.child,
s,
s.shuffleOrigin,
shuffleType,
advisoryPartitionSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_wareho

(4) CometExchange
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]
Arguments: hashpartitioning(cs_order_number#5, 5), Exchange hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, [plan_id=1], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]

(5) CometSort
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Expand All @@ -84,7 +84,7 @@ Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_warehouse_sk#9, cs_order

(9) CometExchange
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
Arguments: hashpartitioning(cs_order_number#10, 5), Exchange hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, [plan_id=3], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=4]

(10) CometSort
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Expand Down Expand Up @@ -115,7 +115,7 @@ Arguments: [cr_order_number#12], [cr_order_number#12]

(16) CometExchange
Input [1]: [cr_order_number#12]
Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]
Arguments: hashpartitioning(cr_order_number#12, 5), Exchange hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, [plan_id=5], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=6]

(17) CometSort
Input [1]: [cr_order_number#12]
Expand Down Expand Up @@ -150,7 +150,7 @@ Input [1]: [d_date_sk#14]

(24) BroadcastExchange
Input [1]: [d_date_sk#14]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7]

(25) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_ship_date_sk#1]
Expand Down Expand Up @@ -182,7 +182,7 @@ Input [1]: [ca_address_sk#16]

(31) BroadcastExchange
Input [1]: [ca_address_sk#16]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=5]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8]

(32) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_ship_addr_sk#2]
Expand Down Expand Up @@ -214,7 +214,7 @@ Input [1]: [cc_call_center_sk#18]

(38) BroadcastExchange
Input [1]: [cc_call_center_sk#18]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9]

(39) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_call_center_sk#3]
Expand Down Expand Up @@ -249,7 +249,7 @@ Results [3]: [sum#22, sum#23, count#25]

(44) Exchange
Input [3]: [sum#22, sum#23, count#25]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10]

(45) HashAggregate [codegen id : 9]
Input [3]: [sum#22, sum#23, count#25]
Expand Down
Loading

0 comments on commit 96b9826

Please sign in to comment.