Skip to content

Commit

Permalink
[SQL] Improve column pruning.
Browse files Browse the repository at this point in the history
Fixed a bug that was preventing us from ever pruning beneath Joins.

## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
 Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
  Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
   Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
    HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
     Exchange (HashPartitioning [ss_item_sk#36:30], 150)
      HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
       Exchange (HashPartitioning [d_date_sk#6:0], 150)
        Filter (d_moy#14:8 = 12)
         HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
       Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
        HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#57:0], 150)
      Filter (i_manufact_id#70:13 = 436)
       HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
 Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
  Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
   Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
    HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
     Exchange (HashPartitioning [ss_item_sk#196:2], 150)
      Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
       HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
        Exchange (HashPartitioning [d_date_sk#166:0], 150)
         Project [d_date_sk#166:0,d_year#172:1]
          Filter (d_moy#174:2 = 12)
           HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
        Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
         HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#217:1], 150)
      Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
       Filter (i_manufact_id#230:3 = 436)
        HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```

Author: Michael Armbrust <michael@databricks.com>

Closes #729 from marmbrus/fixPruning and squashes the following commits:

5feeff0 [Michael Armbrust] Improve column pruning.
  • Loading branch information
marmbrus authored and pwendell committed May 14, 2014
1 parent 7bb9a52 commit 6ce0884
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("ConstantFolding", Once,
Batch("ConstantFolding", FixedPoint(100),
NullPropagation,
ConstantFolding,
BooleanSimplification,
SimplifyFilters,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin,
Expand All @@ -49,24 +49,27 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Eliminate attributes that are not needed to calculate the specified aggregates.
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
// Project away references that are not needed to calculate the required aggregates.
a.copy(child = Project(a.references.toSeq, child))

// Eliminate unneeded attributes from either side of a Join.
case Project(projectList, Join(left, right, joinType, condition)) =>
// Collect the list of off references required either above or to evaluate the condition.
val allReferences: Set[Attribute] =
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
/** Applies a projection when the child is producing unnecessary attributes */

/** Applies a projection only when the child is producing unnecessary attributes */
def prunedChild(c: LogicalPlan) =
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}

Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

// Combine adjacent Projects.
case Project(projectList1, Project(projectList2, child)) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
Expand All @@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
}).asInstanceOf[Seq[NamedExpression]]

Project(substitutedProjection, child)

// Eliminate no-op Projects
case Project(projectList, child) if(child.output == projectList) => child
}
}

Expand Down

0 comments on commit 6ce0884

Please sign in to comment.