[SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled #28900
Dataset.scala
@@ -2991,17 +2991,9 @@ class Dataset[T] private[sql](
     Repartition(numPartitions, shuffle = true, logicalPlan)
   }

-  /**
-   * Returns a new Dataset partitioned by the given partitioning expressions into
-   * `numPartitions`. The resulting Dataset is hash partitioned.
-   *
-   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
-   *
-   * @group typedrel
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+  private def repartitionByExpression(
+      numPartitions: Option[Int],
+      partitionExprs: Column*): Dataset[T] = {
     // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
     // However, we don't want to complicate the semantics of this API method.
     // Instead, let's give users a friendly error message, pointing them to the new method.
@@ -3015,6 +3007,20 @@ class Dataset[T] private[sql](
     }
   }

+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group typedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+    repartitionByExpression(Some(numPartitions), partitionExprs: _*)
+  }
+
   /**
    * Returns a new Dataset partitioned by the given partitioning expressions, using
    * `spark.sql.shuffle.partitions` as number of partitions.
@@ -3027,7 +3033,20 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(partitionExprs: Column*): Dataset[T] = {
-    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+    repartitionByExpression(None, partitionExprs: _*)
   }

+  private def repartitionByRange(
+      numPartitions: Option[Int],
+      partitionExprs: Column*): Dataset[T] = {

Review comment: ditto (the var-length parameter list note on `repartitionByExpression`; see the comment at the end of this page).

+    require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
+    val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match {
+      case expr: SortOrder => expr
+      case expr: Expression => SortOrder(expr, Ascending)
+    })
+    withTypedPlan {
+      RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
+    }
+  }
+
   /**
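A side note on the `sortOrder` mapping in the new helper: a bare column is wrapped as `SortOrder(expr, Ascending)`, so range-partitioning by a plain column and by an explicitly ascending one request the same partitioning. A minimal sketch illustrating this (not part of the PR; the object name is hypothetical and a local Spark 3.x session is assumed):

```scala
import org.apache.spark.sql.SparkSession

object RangeOrderDefaultSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("range-order-sketch")
    .getOrCreate()
  import spark.implicits._

  val df = (1 to 10).toDF("value")

  // A bare Column is wrapped as SortOrder(expr, Ascending) by the helper above,
  // so these two calls request the same (ascending) range partitioning.
  val implicitAsc = df.repartitionByRange($"value")
  val explicitAsc = df.repartitionByRange($"value".asc)
  assert(implicitAsc.rdd.getNumPartitions == explicitAsc.rdd.getNumPartitions)

  spark.stop()
}
```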
@@ -3049,14 +3068,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
-    require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
-    val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match {
-      case expr: SortOrder => expr
-      case expr: Expression => SortOrder(expr, Ascending)
-    })
-    withTypedPlan {
-      RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
-    }
+    repartitionByRange(Some(numPartitions), partitionExprs: _*)
   }

   /**
@@ -3078,7 +3090,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartitionByRange(partitionExprs: Column*): Dataset[T] = {
-    repartitionByRange(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+    repartitionByRange(None, partitionExprs: _*)
   }

   /**
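Taken together, the Dataset.scala changes above make the overloads behave differently under AQE: an explicit count pins the shuffle's partition number (`optNumPartitions = Some(n)`), while the count-less overloads pass `None` and leave the shuffle open for coalescing. A minimal usage sketch (not from the PR; the object name is hypothetical, and Spark 3.0+ with AQE is assumed):

```scala
import org.apache.spark.sql.SparkSession

object RepartitionAqeSketch extends App {
  val spark = SparkSession.builder()
    .master("local[4]")
    .appName("repartition-aqe-sketch")
    .config("spark.sql.adaptive.enabled", "true")                    // AQE on
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") // allow coalescing
    .getOrCreate()
  import spark.implicits._

  val df = spark.range(1000).toDF("id")

  // Explicit count: optNumPartitions = Some(8); the shuffle stays at exactly 8.
  println(df.repartition(8, $"id").rdd.getNumPartitions) // 8

  // No count: optNumPartitions = None; the shuffle starts from
  // spark.sql.shuffle.partitions and AQE may coalesce it to fewer partitions.
  println(df.repartition($"id").rdd.getNumPartitions)

  spark.stop()
}
```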
SparkStrategies.scala
@@ -685,8 +685,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
+        val canChangeNumParts = r.optNumPartitions.isEmpty
         exchange.ShuffleExchangeExec(
-          r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil
+          r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil

Review comment: now that we have a variable name, we can just write …

       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
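The one-line strategy change above encodes the invariant driving this PR: AQE may change the partition count of a repartition-by-expression shuffle only when the user did not request an explicit count. A self-contained sketch of that decision, using stand-in types rather than the real planner classes:

```scala
object CoalesceDecisionSketch extends App {
  // Stand-in for the logical node; only the field relevant here is modeled.
  final case class RepartitionByExpr(optNumPartitions: Option[Int])

  // Mirrors `canChangeNumParts = r.optNumPartitions.isEmpty` in the strategy above:
  // AQE may alter the shuffle's partition count only when no count was requested.
  def canChangeNumPartitions(r: RepartitionByExpr): Boolean =
    r.optNumPartitions.isEmpty

  assert(canChangeNumPartitions(RepartitionByExpr(None)))     // df.repartition($"id")
  assert(!canChangeNumPartitions(RepartitionByExpr(Some(7)))) // df.repartition(7, $"id")
  println("coalescibility checks passed")
}
```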
AdaptiveQueryExecSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions._
@@ -1026,15 +1026,48 @@ class AdaptiveQueryExecSuite
     Seq(true, false).foreach { enableAQE =>

Review thread: "we can merge this test case to your two newly added test cases." — "i.e. one test to test …" — Reply: "Yeah, merged them."

       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
         SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
         SQLConf.SHUFFLE_PARTITIONS.key -> "6",
         SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
-        val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
+        val df = spark.range(10).repartition($"id")

Review thread: "can we test …" — Reply: "ok."

+        val partitionsNum = df.rdd.collectPartitions().length
         if (enableAQE) {
-          assert(partitionsNum === 7)
+          assert(partitionsNum < 6)
+
+          val plan = df.queryExecution.executedPlan
+          assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+          val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect {
+            case s: ShuffleExchangeExec => s
+          }
+          assert(shuffle.size == 1)
+          assert(shuffle(0).outputPartitioning.numPartitions == 7)
         } else {
           assert(partitionsNum === 6)
         }
       }
     }
   }
+
+  test("SPARK-32056 coalesce partitions for repartition by expressions when AQE is enabled") {
+    Seq(true, false).foreach { enableAQE =>
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
+        SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        val partitionsNum1 = (1 to 10).toDF.repartition($"value")
+          .rdd.collectPartitions().length
+
+        val partitionsNum2 = (1 to 10).toDF.repartitionByRange($"value".asc)
+          .rdd.collectPartitions().length
+        if (enableAQE) {
+          assert(partitionsNum1 < 10)
+          assert(partitionsNum2 < 10)
+        } else {
+          assert(partitionsNum1 === 10)
+          assert(partitionsNum2 === 10)
+        }
+      }
+    }
+  }
 }

cloud-fan marked the remaining review conversations on this file as resolved.
Review comment (on `repartitionByExpression`): for an internal method, we don't need to use a var-length parameter list.
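As a sketch of what that suggestion means in practice (simplified stand-in types, not the committed Spark code): the public overloads keep the ergonomic `Column*` varargs form for callers, while the private helper can take a plain `Seq[Column]` and skip the varargs machinery:

```scala
object VarargsVsSeqSketch extends App {
  // Stand-in for org.apache.spark.sql.Column, just for this illustration.
  final case class Column(name: String)

  // Public surface: varargs for ergonomic call sites.
  def repartition(numPartitions: Int, partitionExprs: Column*): Unit =
    repartitionByExpression(Some(numPartitions), partitionExprs)

  // Internal helper: a plain Seq parameter needs no @scala.annotation.varargs
  // and no `: _*` splat when public overloads forward their collected args.
  private def repartitionByExpression(
      numPartitions: Option[Int],
      partitionExprs: Seq[Column]): Unit =
    println(s"numPartitions=$numPartitions, exprs=${partitionExprs.map(_.name).mkString(",")}")

  repartition(7, Column("id"), Column("ts"))
}
```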