Skip to content

Commit

Permalink
[SPARK-7215] made coalesce and repartition a part of the query plan
Browse files Browse the repository at this point in the history
Coalesce and repartition now show up as part of the query plan, rather than resulting in a new `DataFrame`.

cc rxin

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#5762 from brkyvz/df-repartition and squashes the following commits:

b1e76dd [Burak Yavuz] added documentation on repartitions
5807e35 [Burak Yavuz] renamed coalescepartitions
fa4509f [Burak Yavuz] rename coalesce
2c349b5 [Burak Yavuz] address comments
f2e6af1 [Burak Yavuz] add ticks
686c90b [Burak Yavuz] made coalesce and repartition a part of the query plan
  • Loading branch information
brkyvz authored and jeanlyn committed May 28, 2015
1 parent 5f643b8 commit 22d3abc
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* Return a new RDD that has exactly `numPartitions` partitions. Differs from
* [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
* of the output requires some specific ordering or distribution of the data.
*/
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
extends RedistributeData

case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
/**
* This method repartitions data using [[Expression]]s, and receives information about the
* number of partitions during execution. Used when a specific ordering or distribution is
* expected by the consumer of the query result. Use [[Repartition]] for RDD-like
* `coalesce` and `repartition`.
*/
case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
extends RedistributeData
9 changes: 2 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -961,9 +961,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def repartition(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
schema, needsConversion = false)
Repartition(numPartitions, shuffle = true, logicalPlan)
}

/**
Expand All @@ -974,10 +972,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def coalesce(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.coalesce(numPartitions),
schema,
needsConversion = false)
Repartition(numPartitions, shuffle = false, logicalPlan)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil

case logical.Repartition(numPartitions, shuffle, child) =>
execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
Expand Down Expand Up @@ -345,7 +346,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
case logical.RepartitionByExpression(expressions, child) =>
execution.Exchange(
HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
}
}

/**
* :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
*/
@DeveloperApi
case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def execute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}


/**
* :: DeveloperApi ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case (None, Some(perPartitionOrdering), None, None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
case (None, None, Some(partitionExprs), None) =>
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, Some(clusterExprs)) =>
Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, None) => withHaving
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
}
Expand Down

0 comments on commit 22d3abc

Please sign in to comment.