From 5feeff0746c534d3ba614946a34cfa973662736f Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 10 May 2014 22:02:15 -0700 Subject: [PATCH] Improve column pruning. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3037d45cc6e35..406ffd6801e98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = - Batch("ConstantFolding", Once, + Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts) :: - Batch("Filter Pushdown", Once, + Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughInnerJoin, @@ -49,17 +49,19 @@ object Optimizer extends RuleExecutor[LogicalPlan] { */ object ColumnPruning extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Eliminate attributes that are not needed to calculate the specified aggregates. case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => - // Project away references that are not needed to calculate the required aggregates. a.copy(child = Project(a.references.toSeq, child)) + // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => // Collect the list of off references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) - /** Applies a projection when the child is producing unnecessary attributes */ + + /** Applies a projection only when the child is producing unnecessary attributes */ def prunedChild(c: LogicalPlan) = - if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) { + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { Project(allReferences.filter(c.outputSet.contains).toSeq, c) } else { c @@ -67,6 +69,7 @@ object ColumnPruning extends Rule[LogicalPlan] { Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + // Combine adjacent Projects. case Project(projectList1, Project(projectList2, child)) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)). @@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] { }).asInstanceOf[Seq[NamedExpression]] Project(substitutedProjection, child) + + // Eliminate no-op Projects + case Project(projectList, child) if(child.output == projectList) => child } }