Skip to content

Commit

Permalink
[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiv…
Browse files Browse the repository at this point in the history
…eExecutionEnabled

### What changes were proposed in this pull request?
This PR makes `repartition`/`DISTRIBUTE BY` obeys [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution enabled.

### Why are the changes needed?
To make `DISTRIBUTE BY`/`GROUP BY` partitioned by same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
wangyum authored and cloud-fan committed Jun 9, 2020
1 parent 717ec5e commit 1d1eacd
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2784,7 +2784,15 @@ class SQLConf extends Serializable with Logging {

def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)

def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

def numShufflePartitions: Int = {
if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
} else {
defaultNumShufflePartitions
}
}

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

Expand All @@ -2797,9 +2805,6 @@ class SQLConf extends Serializable with Logging {

def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)

def initialShufflePartitionNum: Int =
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
* the input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int =
if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
conf.initialShufflePartitionNum
} else {
conf.numShufflePartitions
}

private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
Expand All @@ -57,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(defaultNumPreShufflePartitions)
.getOrElse(conf.numShufflePartitions)
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
}

Expand Down Expand Up @@ -95,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
// expected number of shuffle partitions. However, if it's smaller than
// `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
// expected number of shuffle partitions.
math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
} else {
childrenNumPartitions.max
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,4 +1021,20 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
Seq(true, false).foreach { enableAQE =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
SQLConf.SHUFFLE_PARTITIONS.key -> "6",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
if (enableAQE) {
assert(partitionsNum === 7)
} else {
assert(partitionsNum === 6)
}
}
}
}
}

0 comments on commit 1d1eacd

Please sign in to comment.