Skip to content

Commit

Permalink
Improve column pruning.
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed May 11, 2014
1 parent 70bcdef commit 5feeff0
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("ConstantFolding", Once,
Batch("ConstantFolding", FixedPoint(100),
NullPropagation,
ConstantFolding,
BooleanSimplification,
SimplifyFilters,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin,
Expand All @@ -49,24 +49,27 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Eliminate attributes that are not needed to calculate the specified aggregates.
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
// Project away references that are not needed to calculate the required aggregates.
a.copy(child = Project(a.references.toSeq, child))

// Eliminate unneeded attributes from either side of a Join.
case Project(projectList, Join(left, right, joinType, condition)) =>
// Collect the list of off references required either above or to evaluate the condition.
val allReferences: Set[Attribute] =
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
/** Applies a projection when the child is producing unnecessary attributes */

/** Applies a projection only when the child is producing unnecessary attributes */
def prunedChild(c: LogicalPlan) =
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}

Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

// Combine adjacent Projects.
case Project(projectList1, Project(projectList2, child)) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
Expand All @@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
}).asInstanceOf[Seq[NamedExpression]]

Project(substitutedProjection, child)

// Eliminate no-op Projects
case Project(projectList, child) if(child.output == projectList) => child
}
}

Expand Down

0 comments on commit 5feeff0

Please sign in to comment.