-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled #28900
Conversation
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { | ||
private def repartitionByExpression( | ||
numPartitions: Option[Int], | ||
partitionExprs: Column*): Dataset[T] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for internal method, we don't need to use var-length parameter list.
|
||
private def repartitionByRange( | ||
numPartitions: Option[Int], | ||
partitionExprs: Column*): Dataset[T] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
exchange.ShuffleExchangeExec( | ||
r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil | ||
r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now we have a variable name, we can just write r.partitioning, planLater(r.child), canChangeNumParts
SQLConf.SHUFFLE_PARTITIONS.key -> "6", | ||
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { | ||
val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length | ||
val df = spark.range(10).repartition($"id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we test repartition(numPartitions)
in this test case and make sure the partition number doesn't change? Your new test case already test repartition by key/range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
Test build #124378 has finished for PR 28900 at commit
|
Test build #124391 has finished for PR 28900 at commit
|
Test build #124387 has finished for PR 28900 at commit
|
retest this please |
} | ||
|
||
val partitionsNum2 = df2.rdd.collectPartitions().length | ||
assert(partitionsNum2 == 10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: assert(df2.rdd.collectPartitions().length == 10)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
Test build #124401 has finished for PR 28900 at commit
|
Test build #124424 has finished for PR 28900 at commit
|
Can we add the feature in |
Do you mean like Seems currently the |
val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length | ||
val df1 = spark.range(10).repartition($"id") | ||
val df2 = spark.range(10).repartition(10, $"id") | ||
val df3 = spark.range(10).repartition(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
repartitionByRange
also takes numPartitions
. Can we test it as well and check it doesn't coalesce?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added it.
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
I mean the repartition, such as this sql |
@@ -1026,13 +1026,79 @@ class AdaptiveQueryExecSuite | |||
Seq(true, false).foreach { enableAQE => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can merge this test case to your two newly added test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i.e. one test to test repartition
, and it verifies both the initial partition number and the coalesced partition number. The other test tests the same thing but for repartitionByRange
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, merged them.
Sounds reasonable to me. @cloud-fan WDYT? |
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Show resolved
Hide resolved
Yea, |
Test build #124461 has finished for PR 28900 at commit
|
Test build #124467 has finished for PR 28900 at commit
|
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
LGTM. We can support |
Test build #124491 has finished for PR 28900 at commit
|
@viirya Can we support |
thanks, merging to master! |
@viirya please send a new PR to fix the SQL side, thanks! |
@cloud-fan Thanks, will do it. |
…nt and sql when AQE is enabled ### What changes were proposed in this pull request? As the followup of #28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. ### Why are the changes needed? When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. ### Does this PR introduce _any_ user-facing change? Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. ### How was this patch tested? Unit tests. Closes #28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… when AQE is enabled This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. Added unit test. Closes apache#28900 from viirya/SPARK-32056. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nt and sql when AQE is enabled As the followup of apache#28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. Unit tests. Closes apache#28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… when AQE is enabled This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. Added unit test. Closes apache#28900 from viirya/SPARK-32056. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nt and sql when AQE is enabled As the followup of apache#28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. Unit tests. Closes apache#28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… when AQE is enabled This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. Added unit test. Closes apache#28900 from viirya/SPARK-32056. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nt and sql when AQE is enabled As the followup of apache#28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. Unit tests. Closes apache#28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… when AQE is enabled This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. Added unit test. Closes apache#28900 from viirya/SPARK-32056. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nt and sql when AQE is enabled As the followup of apache#28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. Unit tests. Closes apache#28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled.
Why are the changes needed?
When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling.
Does this PR introduce any user-facing change?
Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions.
How was this patch tested?
Added unit test.