fix: CometShuffleExchangeExec logical link should be correct #324
"spark.sql.adaptive.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
The logical link fix changes the query plan. To still get CometSortMergeJoin in this test, we need to disable both broadcast join and AQE broadcast join.
3a9fdb9 to aa100c6
@@ -373,6 +405,7 @@ class CometExecSuite extends CometTestBase {
    withSQLConf(
      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
Similar reason here. The query plan is changed and AQE interferes with a broadcast join.
aa100c6 to 96b9826
override def equals(obj: Any): Boolean = {
  obj match {
    case other: CometShuffleExchangeExec =>
      this.outputPartitioning == other.outputPartitioning &&
      this.shuffleOrigin == other.shuffleOrigin && this.child == other.child &&
      this.shuffleType == other.shuffleType &&
      this.advisoryPartitionSize == other.advisoryPartitionSize
    case _ =>
      false
  }
}
Because CometShuffleExchangeExec adds an originalPlan parameter that is not covered by Spark's canonicalization, we need to exclude it when comparing two CometShuffleExchangeExec instances so that Spark's shuffle reuse rule still works.
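To illustrate the idea outside of Spark, here is a minimal sketch in plain Scala. ExchangeNode, its fields, and ReuseDemo.dedupe are hypothetical stand-ins, not Comet or Spark classes, and the "reuse" step is simplified to a plain deduplication. It only shows how leaving a bookkeeping field out of equals (and keeping hashCode consistent with it) lets two otherwise identical nodes be treated as the same.

```scala
// Hypothetical stand-in for a shuffle exchange node; not the Comet class.
final class ExchangeNode(
    val partitioning: String,
    val child: String,
    val originalPlan: String) { // bookkeeping only, excluded from equality

  override def equals(obj: Any): Boolean = obj match {
    case other: ExchangeNode =>
      // Compare only the fields that define what the exchange computes.
      partitioning == other.partitioning && child == other.child
    case _ => false
  }

  // Hash the same fields that equals compares, so equal nodes hash equally.
  override def hashCode(): Int = (partitioning, child).hashCode()
}

object ReuseDemo {
  // Simplified "reuse": keep one representative per group of equal nodes.
  def dedupe(nodes: Seq[ExchangeNode]): Seq[ExchangeNode] = nodes.distinct
}
```

With this definition, two nodes that differ only in originalPlan compare equal, so ReuseDemo.dedupe keeps a single one of them.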
override def stringArgs: Iterator[Any] =
  Iterator(outputPartitioning, shuffleOrigin, shuffleType, child) ++ Iterator(s"[plan_id=$id]")
This follows Spark's Exchange.stringArgs.
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -91,6 +91,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH
    conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
    conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true")
    conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
    conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
See #336
cc @andygrove Can you take a look if you have time? Thanks.
case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) =>
  inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case BroadcastQueryStageExec(
      _,
      ReusedExchangeExec(_, c: CometBroadcastExchangeExec),
      _) =>
  inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
These cases are necessary but were missed previously. This fix exposes the omission.
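As a rough sketch of why each wrapper shape needs its own case, here is a plain Scala model. Plan, BroadcastExchange, ReusedExchange, QueryStage, Other, and BroadcastInputs.unwrap are hypothetical names for illustration, not the Spark or Comet operators, and the two unwrapped cases are assumed rather than taken from this diff. A match that only handles the direct and stage-wrapped forms would silently skip a reused exchange.

```scala
// Hypothetical, simplified plan nodes; not the Spark/Comet operators.
sealed trait Plan
final case class BroadcastExchange(name: String) extends Plan
final case class ReusedExchange(child: Plan) extends Plan
final case class QueryStage(id: Int, plan: Plan) extends Plan
final case class Other(name: String) extends Plan

object BroadcastInputs {
  // Find the broadcast exchange behind a node, covering every wrapper shape.
  def unwrap(plan: Plan): Option[BroadcastExchange] = plan match {
    case b: BroadcastExchange => Some(b)
    case QueryStage(_, b: BroadcastExchange) => Some(b)
    // Reused exchanges, bare or inside a query stage, need explicit cases.
    case ReusedExchange(b: BroadcastExchange) => Some(b)
    case QueryStage(_, ReusedExchange(b: BroadcastExchange)) => Some(b)
    case _ => None
  }
}
```

Dropping the two ReusedExchange cases makes unwrap return None for reused broadcasts, which mirrors the kind of gap this fix brought to light.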
Merged. Thanks @sunchao
Which issue does this PR close?
Closes #323.
Rationale for this change
What changes are included in this PR?
How are these changes tested?