Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CometShuffleExchangeExec logical link should be correct #324

Merged
merged 8 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,37 @@ class CometSparkSessionExtensions
}

// Set up logical links
newPlan = newPlan.transform { case op: CometExec =>
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
op
newPlan = newPlan.transform {
case op: CometExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
case op: CometShuffleExchangeExec =>
// Original Spark shuffle exchange operator might have empty logical link.
// But the `setLogicalLink` call above on downstream operator of
// `CometShuffleExchangeExec` will set its logical link to the downstream
// operators which cause AQE behavior to be incorrect. So we need to unset
// the logical link here.
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op

case op: CometBroadcastExchangeExec =>
if (op.originalPlan.logicalLink.isEmpty) {
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
} else {
op.originalPlan.logicalLink.foreach(op.setLogicalLink)
}
op
}

// Convert native execution block by linking consecutive native operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
import org.apache.spark.util.random.XORShiftRandom

import com.google.common.base.Objects

import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.serializeDataType
Expand All @@ -61,6 +63,7 @@ import org.apache.comet.shims.ShimCometShuffleExchangeExec
case class CometShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
originalPlan: ShuffleExchangeLike,
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
shuffleType: ShuffleType = CometNativeShuffle,
advisoryPartitionSize: Option[Long] = None)
Expand Down Expand Up @@ -192,6 +195,21 @@ case class CometShuffleExchangeExec(

override protected def withNewChildInternal(newChild: SparkPlan): CometShuffleExchangeExec =
copy(child = newChild)

override def equals(obj: Any): Boolean = {
obj match {
case other: CometShuffleExchangeExec =>
this.outputPartitioning == other.outputPartitioning &&
this.shuffleOrigin == other.shuffleOrigin && this.child == other.child &&
this.shuffleType == other.shuffleType &&
this.advisoryPartitionSize == other.advisoryPartitionSize
case _ =>
false
}
}
Comment on lines +199 to +209
Copy link
Member Author

@viirya viirya Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because CometShuffleExchangeExec adds originalPlan parameter which is not covered by canonicalization in Spark, we need to exclude it when compare two CometShuffleExchangeExec to make sure Spark reuse shuffle rule work.


override def hashCode(): Int =
Objects.hashCode(outputPartitioning, shuffleOrigin, shuffleType, advisoryPartitionSize, child)
}

object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ trait ShimCometShuffleExchangeExec {
CometShuffleExchangeExec(
s.outputPartitioning,
s.child,
s,
s.shuffleOrigin,
shuffleType,
advisoryPartitionSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_wareho

(4) CometExchange
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]
Arguments: hashpartitioning(cs_order_number#5, 5), Exchange hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, [plan_id=1], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]

(5) CometSort
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Expand All @@ -84,7 +84,7 @@ Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_warehouse_sk#9, cs_order

(9) CometExchange
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
Arguments: hashpartitioning(cs_order_number#10, 5), Exchange hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, [plan_id=3], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=4]

(10) CometSort
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Expand Down Expand Up @@ -115,7 +115,7 @@ Arguments: [cr_order_number#12], [cr_order_number#12]

(16) CometExchange
Input [1]: [cr_order_number#12]
Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]
Arguments: hashpartitioning(cr_order_number#12, 5), Exchange hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, [plan_id=5], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=6]

(17) CometSort
Input [1]: [cr_order_number#12]
Expand Down Expand Up @@ -150,7 +150,7 @@ Input [1]: [d_date_sk#14]

(24) BroadcastExchange
Input [1]: [d_date_sk#14]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7]

(25) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_ship_date_sk#1]
Expand Down Expand Up @@ -182,7 +182,7 @@ Input [1]: [ca_address_sk#16]

(31) BroadcastExchange
Input [1]: [ca_address_sk#16]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=5]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8]

(32) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_ship_addr_sk#2]
Expand Down Expand Up @@ -214,7 +214,7 @@ Input [1]: [cc_call_center_sk#18]

(38) BroadcastExchange
Input [1]: [cc_call_center_sk#18]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9]

(39) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [cs_call_center_sk#3]
Expand Down Expand Up @@ -249,7 +249,7 @@ Results [3]: [sum#22, sum#23, count#25]

(44) Exchange
Input [3]: [sum#22, sum#23, count#25]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10]

(45) HashAggregate [codegen id : 9]
Input [3]: [sum#22, sum#23, count#25]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ Condition : isnotnull(c_customer_sk#33)

(45) CometExchange
Input [3]: [c_customer_sk#33, c_first_name#34, c_last_name#35]
Arguments: hashpartitioning(c_customer_sk#33, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=7]
Arguments: hashpartitioning(c_customer_sk#33, 5), Exchange hashpartitioning(c_customer_sk#33, 5), ENSURE_REQUIREMENTS, [plan_id=7], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=8]

(46) CometSort
Input [3]: [c_customer_sk#33, c_first_name#34, c_last_name#35]
Expand Down Expand Up @@ -341,7 +341,7 @@ Join condition: None

(54) BroadcastExchange
Input [3]: [c_customer_sk#33, c_first_name#34, c_last_name#35]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=8]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=9]

(55) BroadcastHashJoin [codegen id : 16]
Left keys [1]: [cs_bill_customer_sk#1]
Expand Down Expand Up @@ -375,7 +375,7 @@ Results [4]: [c_last_name#35, c_first_name#34, sum#39, isEmpty#40]

(61) Exchange
Input [4]: [c_last_name#35, c_first_name#34, sum#39, isEmpty#40]
Arguments: hashpartitioning(c_last_name#35, c_first_name#34, 5), ENSURE_REQUIREMENTS, [plan_id=9]
Arguments: hashpartitioning(c_last_name#35, c_first_name#34, 5), ENSURE_REQUIREMENTS, [plan_id=10]

(62) HashAggregate [codegen id : 17]
Input [4]: [c_last_name#35, c_first_name#34, sum#39, isEmpty#40]
Expand Down Expand Up @@ -414,7 +414,7 @@ Input [5]: [ws_item_sk#43, ws_bill_customer_sk#44, ws_quantity#45, ws_list_price

(69) Exchange
Input [4]: [ws_bill_customer_sk#44, ws_quantity#45, ws_list_price#46, ws_sold_date_sk#47]
Arguments: hashpartitioning(ws_bill_customer_sk#44, 5), ENSURE_REQUIREMENTS, [plan_id=10]
Arguments: hashpartitioning(ws_bill_customer_sk#44, 5), ENSURE_REQUIREMENTS, [plan_id=11]

(70) Sort [codegen id : 23]
Input [4]: [ws_bill_customer_sk#44, ws_quantity#45, ws_list_price#46, ws_sold_date_sk#47]
Expand Down Expand Up @@ -483,7 +483,7 @@ Results [4]: [c_last_name#51, c_first_name#50, sum#55, isEmpty#56]

(84) Exchange
Input [4]: [c_last_name#51, c_first_name#50, sum#55, isEmpty#56]
Arguments: hashpartitioning(c_last_name#51, c_first_name#50, 5), ENSURE_REQUIREMENTS, [plan_id=11]
Arguments: hashpartitioning(c_last_name#51, c_first_name#50, 5), ENSURE_REQUIREMENTS, [plan_id=12]

(85) HashAggregate [codegen id : 34]
Input [4]: [c_last_name#51, c_first_name#50, sum#55, isEmpty#56]
Expand Down Expand Up @@ -528,7 +528,7 @@ Input [1]: [d_date_sk#36]

(92) BroadcastExchange
Input [1]: [d_date_sk#36]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13]

Subquery:2 Hosting operator id = 4 Hosting Expression = ss_sold_date_sk#8 IN dynamicpruning#9
BroadcastExchange (97)
Expand Down Expand Up @@ -558,7 +558,7 @@ Input [2]: [d_date_sk#10, d_date#11]

(97) BroadcastExchange
Input [2]: [d_date_sk#10, d_date#11]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=14]

Subquery:3 Hosting operator id = 39 Hosting Expression = Subquery scalar-subquery#31, [id=#32]
* HashAggregate (112)
Expand Down Expand Up @@ -628,7 +628,7 @@ Results [3]: [c_customer_sk#67, sum#71, isEmpty#72]

(108) Exchange
Input [3]: [c_customer_sk#67, sum#71, isEmpty#72]
Arguments: hashpartitioning(c_customer_sk#67, 5), ENSURE_REQUIREMENTS, [plan_id=14]
Arguments: hashpartitioning(c_customer_sk#67, 5), ENSURE_REQUIREMENTS, [plan_id=15]

(109) HashAggregate [codegen id : 4]
Input [3]: [c_customer_sk#67, sum#71, isEmpty#72]
Expand All @@ -646,7 +646,7 @@ Results [1]: [max#76]

(111) Exchange
Input [1]: [max#76]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=15]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=16]

(112) HashAggregate [codegen id : 5]
Input [1]: [max#76]
Expand Down Expand Up @@ -683,7 +683,7 @@ Input [1]: [d_date_sk#68]

(117) BroadcastExchange
Input [1]: [d_date_sk#68]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=16]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=17]

Subquery:5 Hosting operator id = 50 Hosting Expression = ReusedSubquery Subquery scalar-subquery#31, [id=#32]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Arguments: [ss_item_sk#1, ss_customer_sk#2, ss_store_sk#3, ss_ticket_number#4, s

(4) CometExchange
Input [5]: [ss_item_sk#1, ss_customer_sk#2, ss_store_sk#3, ss_ticket_number#4, ss_net_paid#5]
Arguments: hashpartitioning(ss_ticket_number#4, ss_item_sk#1, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]
Arguments: hashpartitioning(ss_ticket_number#4, ss_item_sk#1, 5), Exchange hashpartitioning(ss_ticket_number#4, ss_item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]

(5) CometSort
Input [5]: [ss_item_sk#1, ss_customer_sk#2, ss_store_sk#3, ss_ticket_number#4, ss_net_paid#5]
Expand All @@ -86,7 +86,7 @@ Arguments: [sr_item_sk#7, sr_ticket_number#8], [sr_item_sk#7, sr_ticket_number#8

(9) CometExchange
Input [2]: [sr_item_sk#7, sr_ticket_number#8]
Arguments: hashpartitioning(sr_ticket_number#8, sr_item_sk#7, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
Arguments: hashpartitioning(sr_ticket_number#8, sr_item_sk#7, 5), Exchange hashpartitioning(sr_ticket_number#8, sr_item_sk#7, 5), ENSURE_REQUIREMENTS, [plan_id=3], ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=4]

(10) CometSort
Input [2]: [sr_item_sk#7, sr_ticket_number#8]
Expand Down Expand Up @@ -124,7 +124,7 @@ Input [4]: [s_store_sk#10, s_store_name#11, s_state#13, s_zip#14]

(18) BroadcastExchange
Input [4]: [s_store_sk#10, s_store_name#11, s_state#13, s_zip#14]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=3]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=5]

(19) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_store_sk#3]
Expand Down Expand Up @@ -152,7 +152,7 @@ Input [6]: [i_item_sk#15, i_current_price#16, i_size#17, i_color#18, i_units#19,

(24) BroadcastExchange
Input [6]: [i_item_sk#15, i_current_price#16, i_size#17, i_color#18, i_units#19, i_manager_id#20]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=6]

(25) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_item_sk#1]
Expand Down Expand Up @@ -180,7 +180,7 @@ Input [4]: [c_customer_sk#21, c_first_name#22, c_last_name#23, c_birth_country#2

(30) BroadcastExchange
Input [4]: [c_customer_sk#21, c_first_name#22, c_last_name#23, c_birth_country#24]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=5]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=7]

(31) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_customer_sk#2]
Expand Down Expand Up @@ -208,7 +208,7 @@ Input [3]: [ca_state#25, ca_zip#26, ca_country#27]

(36) BroadcastExchange
Input [3]: [ca_state#25, ca_zip#26, ca_country#27]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [plan_id=6]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [plan_id=8]

(37) BroadcastHashJoin [codegen id : 5]
Left keys [2]: [c_birth_country#24, s_zip#14]
Expand All @@ -229,7 +229,7 @@ Results [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_

(40) Exchange
Input [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, sum#29]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, 5), ENSURE_REQUIREMENTS, [plan_id=7]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, 5), ENSURE_REQUIREMENTS, [plan_id=9]

(41) HashAggregate [codegen id : 6]
Input [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, sum#29]
Expand All @@ -247,7 +247,7 @@ Results [5]: [c_last_name#23, c_first_name#22, s_store_name#11, sum#34, isEmpty#

(43) Exchange
Input [5]: [c_last_name#23, c_first_name#22, s_store_name#11, sum#34, isEmpty#35]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, 5), ENSURE_REQUIREMENTS, [plan_id=8]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, 5), ENSURE_REQUIREMENTS, [plan_id=10]

(44) HashAggregate [codegen id : 7]
Input [5]: [c_last_name#23, c_first_name#22, s_store_name#11, sum#34, isEmpty#35]
Expand Down Expand Up @@ -348,7 +348,7 @@ Input [6]: [i_item_sk#15, i_current_price#16, i_size#17, i_color#18, i_units#19,

(59) BroadcastExchange
Input [6]: [i_item_sk#15, i_current_price#16, i_size#17, i_color#18, i_units#19, i_manager_id#20]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=9]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=11]

(60) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_item_sk#1]
Expand Down Expand Up @@ -395,7 +395,7 @@ Results [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_

(69) Exchange
Input [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, sum#41]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, 5), ENSURE_REQUIREMENTS, [plan_id=10]
Arguments: hashpartitioning(c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, 5), ENSURE_REQUIREMENTS, [plan_id=12]

(70) HashAggregate [codegen id : 6]
Input [11]: [c_last_name#23, c_first_name#22, s_store_name#11, ca_state#25, s_state#13, i_color#18, i_current_price#16, i_manager_id#20, i_units#19, i_size#17, sum#41]
Expand All @@ -413,7 +413,7 @@ Results [2]: [sum#44, count#45]

(72) Exchange
Input [2]: [sum#44, count#45]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=13]

(73) HashAggregate [codegen id : 7]
Input [2]: [sum#44, count#45]
Expand Down
Loading
Loading