Skip to content

Commit

Permalink
[SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Change `CombineLimits` name to `EliminateLimits` and add check if `Limit` child max row <= limit.

### Why are the changes needed?

In Add-hoc scene, we always add limit for the query if user have no special limit value, but not all limit is nesessary.

A general negative example is
```
select count(*) from t limit 100000;
```

It will be great if we can eliminate limit at Spark side.

Also, we make a benchmark for this case
```
runBenchmark("Sort and Limit") {
  val N = 100000
  val benchmark = new Benchmark("benchmark sort and limit", N)

  benchmark.addCase("TakeOrderedAndProject", 3) { _ =>
    spark.range(N).toDF("c").repartition(200).sort("c").take(200000)
  }

  benchmark.addCase("Sort And Limit", 3) { _ =>
    withSQLConf("spark.sql.execution.topKSortFallbackThreshold" -> "-1") {
      spark.range(N).toDF("c").repartition(200).sort("c").take(200000)
    }
  }

  benchmark.addCase("Sort", 3) { _ =>
    spark.range(N).toDF("c").repartition(200).sort("c").collect()
  }
  benchmark.run()
}
```

and the result is
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.15.6
Intel(R) Core(TM) i5-5257U CPU  2.70GHz
benchmark sort and limit:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject                              1833           2259         382          0.1       18327.1       1.0X
Sort And Limit                                     1417           1658         285          0.1       14167.5       1.3X
Sort                                               1324           1484         225          0.1       13238.3       1.4X
```

It shows that it makes sense to replace `TakeOrderedAndProjectExec` with `Sort + Project`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add test.

Closes apache#30368 from ulysses-you/SPARK-33442.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
ulysses-you authored and cloud-fan committed Nov 19, 2020
1 parent a03c540 commit 21b1350
Show file tree
Hide file tree
Showing 36 changed files with 1,113 additions and 1,113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
CombineLimits,
EliminateLimits,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -1451,11 +1451,20 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
* This rule optimizes Limit operators by:
* 1. Eliminate [[Limit]] operators if it's child max row <= limit.
* 2. Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
*/
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
object EliminateLimits extends Rule[LogicalPlan] {
private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Limit(l, child) if canEliminate(l, child) =>
child

case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
GlobalLimit(Least(Seq(ne, le)), grandChild)
case LocalLimit(le, LocalLimit(ne, grandChild)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class CombiningLimitsSuite extends PlanTest {
Batch("Column Pruning", FixedPoint(100),
ColumnPruning,
RemoveNoopOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Eliminate Limit", FixedPoint(10),
EliminateLimits) ::
Batch("Constant Folding", FixedPoint(10),
NullPropagation,
ConstantFolding,
Expand Down Expand Up @@ -90,4 +90,31 @@ class CombiningLimitsSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("SPARK-33442: Change Combine Limit to Eliminate limit using max row") {
// test child max row <= limit.
val query1 = testRelation.select().groupBy()(count(1)).limit(1).analyze
val optimized1 = Optimize.execute(query1)
val expected1 = testRelation.select().groupBy()(count(1)).analyze
comparePlans(optimized1, expected1)

// test child max row > limit.
val query2 = testRelation.select().groupBy()(count(1)).limit(0).analyze
val optimized2 = Optimize.execute(query2)
comparePlans(optimized2, query2)

// test child max row is none
val query3 = testRelation.select(Symbol("a")).limit(1).analyze
val optimized3 = Optimize.execute(query3)
comparePlans(optimized3, query3)

// test sort after limit
val query4 = testRelation.select().groupBy()(count(1))
.orderBy(count(1).asc).limit(1).analyze
val optimized4 = Optimize.execute(query4)
// the top project has been removed, so we need optimize expected too
val expected4 = Optimize.execute(
testRelation.select().groupBy()(count(1)).orderBy(count(1).asc).analyze)
comparePlans(optimized4, expected4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Limit pushdown", FixedPoint(100),
LimitPushDown,
CombineLimits,
EliminateLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}
Expand Down Expand Up @@ -74,7 +74,7 @@ class LimitPushdownSuite extends PlanTest {
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1))).analyze
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
== Physical Plan ==
TakeOrderedAndProject (44)
* Sort (44)
+- * HashAggregate (43)
+- Exchange (42)
+- * HashAggregate (41)
Expand Down Expand Up @@ -244,7 +244,7 @@ Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net
Aggregate Attributes [3]: [sum(UnscaledValue(cs_ext_ship_cost#6))#23, sum(UnscaledValue(cs_net_profit#7))#24, count(cs_order_number#5)#27]
Results [3]: [count(cs_order_number#5)#27 AS order count #30, MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#6))#23,17,2) AS total shipping cost #31, MakeDecimal(sum(UnscaledValue(cs_net_profit#7))#24,17,2) AS total net profit #32]

(44) TakeOrderedAndProject
(44) Sort [codegen id : 12]
Input [3]: [order count #30, total shipping cost #31, total net profit #32]
Arguments: 100, [order count #30 ASC NULLS FIRST], [order count #30, total shipping cost #31, total net profit #32]
Arguments: [order count #30 ASC NULLS FIRST], true, 0

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TakeOrderedAndProject [order count ,total shipping cost ,total net profit ]
WholeStageCodegen (12)
WholeStageCodegen (12)
Sort [order count ]
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
== Physical Plan ==
TakeOrderedAndProject (41)
* Sort (41)
+- * HashAggregate (40)
+- Exchange (39)
+- * HashAggregate (38)
Expand Down Expand Up @@ -229,7 +229,7 @@ Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net
Aggregate Attributes [3]: [sum(UnscaledValue(cs_ext_ship_cost#6))#22, sum(UnscaledValue(cs_net_profit#7))#23, count(cs_order_number#5)#27]
Results [3]: [count(cs_order_number#5)#27 AS order count #30, MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#6))#22,17,2) AS total shipping cost #31, MakeDecimal(sum(UnscaledValue(cs_net_profit#7))#23,17,2) AS total net profit #32]

(41) TakeOrderedAndProject
(41) Sort [codegen id : 8]
Input [3]: [order count #30, total shipping cost #31, total net profit #32]
Arguments: 100, [order count #30 ASC NULLS FIRST], [order count #30, total shipping cost #31, total net profit #32]
Arguments: [order count #30 ASC NULLS FIRST], true, 0

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TakeOrderedAndProject [order count ,total shipping cost ,total net profit ]
WholeStageCodegen (8)
WholeStageCodegen (8)
Sort [order count ]
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
Expand Down
Loading

0 comments on commit 21b1350

Please sign in to comment.