[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled #27986
Conversation
Test build #120187 has finished for PR 27986 at commit
Test build #120191 has finished for PR 27986 at commit
Test build #120204 has finished for PR 27986 at commit
cc @cloud-fan @HyukjinKwon @dongjoon-hyun @maryannxue Do we need this change to make
From a cursory look, I think it makes sense.
adaptive execution estimates the number of partitions for a shuffle using
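The comment above is truncated where an inline code reference was elided; the mechanism it refers to is AQE's runtime coalescing of shuffle partitions based on observed map-output sizes. As a minimal, hypothetical sketch (not Spark's actual implementation — the object and function names are illustrative), the estimate amounts to merging adjacent small partitions up to an advisory target size:

```scala
// Hypothetical sketch of AQE partition coalescing: the shuffle is written with
// a large initial partition count, then adjacent small partitions are merged
// at read time until each coalesced partition approaches the target size.
object CoalesceSketch {
  // Given the byte size of each shuffle partition, estimate how many
  // partitions remain after coalescing up to `targetSize` bytes each.
  def estimateNumPartitions(partitionSizes: Seq[Long], targetSize: Long): Int = {
    var count = 0      // coalesced partitions emitted so far
    var current = 0L   // bytes accumulated in the partition being built
    for (size <- partitionSizes) {
      if (current > 0 && current + size > targetSize) {
        count += 1     // close the current coalesced partition
        current = 0L
      }
      current += size
    }
    if (current > 0) count += 1 // close the trailing partition
    count
  }
}
```

This is why the number of tasks can vary with data size when AQE coalescing applies: the final count depends on the runtime statistics, not on the initial partition number.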
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Test build #123610 has finished for PR 27986 at commit
retest this please
@@ -1021,4 +1021,20 @@ class AdaptiveQueryExecSuite
      }
    }
  }

  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
    Seq(true, false).foreach { adaptiveExecutionEnabled =>
can we make this shorter? `enableAQE`?
Fixed it.
  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
    Seq(true, false).foreach { adaptiveExecutionEnabled =>
      withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$adaptiveExecutionEnabled",
enableAQE.toString
Test build #123625 has finished for PR 27986 at commit
Test build #123636 has finished for PR 27986 at commit
thanks, merging to master/3.0!
[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled

### What changes were proposed in this pull request?

This PR makes `repartition`/`DISTRIBUTE BY` obey [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution is enabled.

### Why are the changes needed?

To make `DISTRIBUTE BY`/`GROUP BY` partition by the same partition number.

How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 1d1eacd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
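The net effect of the change on the initial shuffle partition count can be sketched as a small, hypothetical resolution function (the names and signature are illustrative, not Spark's actual API): when AQE is enabled, the shuffle starts from `initialPartitionNum` if it is set, falling back to the static `spark.sql.shuffle.partitions` default otherwise.

```scala
// Hypothetical sketch of the partition-count resolution this PR makes
// repartition/DISTRIBUTE BY follow; not Spark's actual code.
object PartitionNumSketch {
  def numShufflePartitions(
      adaptiveEnabled: Boolean,
      initialPartitionNum: Option[Int], // spark.sql.adaptive.coalescePartitions.initialPartitionNum
      defaultShufflePartitions: Int     // spark.sql.shuffle.partitions
  ): Int =
    if (adaptiveEnabled) initialPartitionNum.getOrElse(defaultShufflePartitions)
    else defaultShufflePartitions
}
```

With the settings from the reproduction above (AQE on, `initialPartitionNum=1000`, default 200), this yields 1000, matching the `hashpartitioning(id#5, 1000)` in the "After" plan; before the PR, `DISTRIBUTE BY` ignored the initial number and used 200.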
i rebuilt spark using this pull request, hoping to have
what i see is that the actual number of shuffle partitions is always equal to e.g. if i have adaptive execution enabled and i set
@koertkuipers The shuffle write step is always
@wangyum i have i have when i do a groupBy instead of repartition, then the number of tasks varies with data size (and is less than 2048) and the number of output files varies too (and is less than 2048). with repartition the tasks and output files are always fixed at 2048.
@koertkuipers, are you able to show the exact reproducible steps? Sounds like ideally we should file a separate JIRA.
Note: other optimizations can still happen, like SMJ to BHJ.
@cloud-fan so how can i repartition by a column while the number of partitions is set smartly (based on data size), instead of using some user-specified number of partitions or hardcoded value? repartitioning a dataframe by columns is fairly typical before writing to a partitioned file sink to avoid too many files per directory. see for example: and personally for repartition i would expect the optimal number of files to be written if AQE is enabled and i did not specify the number of partitions. that's why i was so confused by the current results. but that's just my opinion.
repartition by key is another story and should support partition coalesce. If that's not the case now, please create a bug report in the JIRA, thanks!
@cloud-fan ok i will create a jira for repartition by key with partition coalesce for AQE
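The distinction in the exchange above — AQE must preserve a user-specified partition count but is free to coalesce a repartition by key alone — can be sketched as follows. This is a hypothetical model of the intended behavior being discussed, not Spark's actual types:

```scala
// Hypothetical model: repartition with an explicit numPartitions pins the
// output count (AQE must respect a user-specified number), while repartition
// by key alone leaves the count to the optimizer, so AQE may coalesce it.
sealed trait Repartition
case class ByNumber(n: Int) extends Repartition       // e.g. df.repartition(2048, $"key")
case class ByKeyOnly(key: String) extends Repartition // e.g. df.repartition($"key")

def aqeMayCoalesce(r: Repartition): Boolean = r match {
  case ByNumber(_)  => false // user-specified count is preserved as-is
  case ByKeyOnly(_) => true  // optimizer-chosen count can shrink at runtime
}
```

Under this model, the fixed 2048 tasks reported above correspond to the `ByNumber`-like path, while the follow-up JIRA asks for the `ByKeyOnly` path to coalesce like `GROUP BY` does.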