Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CometShuffleExchangeExec logical link should be correct #324

Merged
merged 8 commits into from
Apr 30, 2024

Conversation

viirya
Copy link
Member

@viirya viirya commented Apr 25, 2024

Which issue does this PR close?

Closes #323.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

Comment on lines +336 to +337
"spark.sql.adaptive.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query plan is changed due to the logical link fix. In order to have CometSortMergeJoin, we need to disable broadcast join and AQE broadcast join.

@@ -373,6 +405,7 @@ class CometExecSuite extends CometTestBase {
withSQLConf(
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
Copy link
Member Author

@viirya viirya Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar reason here. The query plan is changed and AQE interferes with a broadcast join.

Comment on lines +199 to +209
override def equals(obj: Any): Boolean = {
obj match {
case other: CometShuffleExchangeExec =>
this.outputPartitioning == other.outputPartitioning &&
this.shuffleOrigin == other.shuffleOrigin && this.child == other.child &&
this.shuffleType == other.shuffleType &&
this.advisoryPartitionSize == other.advisoryPartitionSize
case _ =>
false
}
}
Copy link
Member Author

@viirya viirya Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because CometShuffleExchangeExec adds originalPlan parameter which is not covered by canonicalization in Spark, we need to exclude it when compare two CometShuffleExchangeExec to make sure Spark reuse shuffle rule work.

Comment on lines +214 to +215
override def stringArgs: Iterator[Any] =
Iterator(outputPartitioning, shuffleOrigin, shuffleType, child) ++ Iterator(s"[plan_id=$id]")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow Spark Exchange.stringArgs.

@@ -91,6 +91,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #336

@viirya
Copy link
Member Author

viirya commented Apr 27, 2024

cc @sunchao @andygrove

1 similar comment
@viirya
Copy link
Member Author

viirya commented Apr 29, 2024

cc @sunchao @andygrove

@viirya
Copy link
Member Author

viirya commented Apr 29, 2024

cc @andygrove Can you take a look if you have time? Thanks.

Comment on lines +267 to +273
case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) =>
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, c: CometBroadcastExchangeExec),
_) =>
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are necessary but missed to add previously. This fix exposes that.

@viirya viirya merged commit b326637 into apache:main Apr 30, 2024
28 checks passed
@viirya
Copy link
Member Author

viirya commented Apr 30, 2024

Merged. Thanks @sunchao

@viirya viirya deleted the fix_shuffle_logical_link branch April 30, 2024 06:10
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
)

* fix: CometShuffleExchangeExec logical link should be correct

* Implement equals and hashCode for CometShuffleExchangeExec

* Update plan stability

* Restore plan stability

* Dedup test

* Remove unused import

* Fix test

* Use columnar shuffle
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CometShuffleExchangeExec logical link is different to Spark ShuffleExchangeExec
3 participants