Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled #27986

Closed
wants to merge 5 commits into from

Conversation

wangyum
Copy link
Member

@wangyum wangyum commented Mar 23, 2020

What changes were proposed in this pull request?

This PR makes repartition/DISTRIBUTE BY obeys initialPartitionNum when adaptive execution enabled.

Why are the changes needed?

To make DISTRIBUTE BY/GROUP BY partitioned by same partition number.
How to reproduce:

spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")

Before this PR:

scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]

After this PR:

scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

@SparkQA
Copy link

SparkQA commented Mar 23, 2020

Test build #120187 has finished for PR 27986 at commit af4248b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 23, 2020

Test build #120191 has finished for PR 27986 at commit 9a6fe6c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 23, 2020

Test build #120204 has finished for PR 27986 at commit abf3ef1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Mar 31, 2020

cc @cloud-fan @HyukjinKwon @dongjoon-hyun @maryannxue Do we need this change to make DISTRIBUTE BY/GROUP BY partitioned by same partition number?

@HyukjinKwon
Copy link
Member

From a cursory look, I think it makes sense.

@koertkuipers
Copy link
Contributor

adaptive execution estimates the number of partitions for a shuffle using spark.sql.adaptive.shuffle.targetPostShuffleInputSize as its target size per shuffled partition. i was surprised to find out however that it does not do this for a DataFrame.repartition(...). i dont understand why since under the hood its also just a shuffle no different than a DataFrame.groupBy.
will this pull request fix this issue? from looking at code i dont understand if it does so, it doesnt look like it to me.

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@SparkQA
Copy link

SparkQA commented Jun 8, 2020

Test build #123610 has finished for PR 27986 at commit 1e6ed30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Jun 8, 2020

retest this please

@@ -1021,4 +1021,20 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
Seq(true, false).foreach { adaptiveExecutionEnabled =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this shorter? enableAQE?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it.

test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
Seq(true, false).foreach { adaptiveExecutionEnabled =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$adaptiveExecutionEnabled",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enableAQE.toString

@SparkQA
Copy link

SparkQA commented Jun 8, 2020

Test build #123625 has finished for PR 27986 at commit 1e6ed30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 8, 2020

Test build #123636 has finished for PR 27986 at commit c752441.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in 1d1eacd Jun 9, 2020
cloud-fan pushed a commit that referenced this pull request Jun 9, 2020
…eExecutionEnabled

### What changes were proposed in this pull request?
This PR makes `repartition`/`DISTRIBUTE BY` obeys [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution enabled.

### Why are the changes needed?
To make `DISTRIBUTE BY`/`GROUP BY` partitioned by same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 1d1eacd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@wangyum wangyum deleted the SPARK-31220 branch June 10, 2020 00:31
@koertkuipers
Copy link
Contributor

koertkuipers commented Jun 21, 2020

i rebuild spark using this pullreq hoping to have def repartition(partitionExprs: Column*) now use adaptive execution to scale the number of reducers. but i see no such behavior.
my simple test job only does 3 things:

  1. read from parquet datasource
  2. repartition by one column
  3. write to parquet datasink

what i see is that the actual number of shuffle partitions is always equal to spark.sql.shuffle.partitions or to spark.sql.adaptive.coalescePartitions.initialPartitionNum if it has been set. but it never adapts to the data size as i would expect with adapative execution (which it does with a groupBy).

e.g. if i have adaptive execution enabled and i set spark.sql.adaptive.coalescePartitions.initialPartitionNum=2048 then all my .repartition(...) operations run with 2048 reducers even for tiny data size.. which is not a good thing.

@wangyum
Copy link
Member Author

wangyum commented Jun 22, 2020

@koertkuipers The shuffle write step is always spark.sql.adaptive.coalescePartitions.initialPartitionNum.
Shuffle read step will coalesce if spark.sql.adaptive.coalescePartitions.enabled=true and the data size is small.

@koertkuipers
Copy link
Contributor

@wangyum i have spark.sql.adaptive.coalescePartitions.enabled=true and the data size is small.
how can i see that the step does coalesce? in number of tasks (i always see 2048)? in number of output files (i always see 2048)?

i have spark.sql.shuffle.partitions=2048 and spark.sql.adaptive.coalescePartitions.initialPartitionNum=2048.

when i do a groupBy instead of repartition than the number of tasks varies with data size (and is less than 2048) and the number of output files varies too (and is less than 2048). with repartition the tasks and output files are always fixed at 2048.

@HyukjinKwon
Copy link
Member

@koertkuipers, are you able to show the exact reproducible steps? Sounds like ideally we should file a separate JIRA.

@cloud-fan
Copy link
Contributor

cloud-fan commented Jun 22, 2020

note: repartition is a physical operation that users specify how many partitions to have after the shuffle. That's why Spark can't coalesce the partition, as it breaks the user's expectation.

Other optimization can still happen, like SMJ to BHJ.

@koertkuipers
Copy link
Contributor

koertkuipers commented Jun 22, 2020

@cloud-fan so how can i repartition by a column while the number of partitions is set smartly (based on data size) instead of using some user specified number of partitions or hardcoded value?

repartitioning a dataframe by columns is fairly typical before writing to a partitioned file sink to avoid too many files per directory. see for example:
https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L472
in these situations its beneficial to write out the optimal number of files, not a fixed/hardcoded number...

and personally for repartition i would expect the optimal number of files to be written if AQE is enabled and i did not specify the number of partitions. thats why i was so confused by the current results. but thats just my opinion.

@cloud-fan
Copy link
Contributor

repartition by key is another story and should support partition coalesce. If it's not the case now, please create a bug report in the JIRA, thanks!

@koertkuipers
Copy link
Contributor

@cloud-fan ok i will create jira for repartition by key with partition coalesce for AQE

@koertkuipers
Copy link
Contributor

@cloud-fan see https://issues.apache.org/jira/browse/SPARK-32056

MGHawes pushed a commit to palantir/spark that referenced this pull request May 16, 2021
…eExecutionEnabled

This PR makes `repartition`/`DISTRIBUTE BY` obeys [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution enabled.

To make `DISTRIBUTE BY`/`GROUP BY` partitioned by same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

No.

Unit test.

Closes apache#27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants