diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddd4b3755d629..82b0ccc01695e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,12 +40,43 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
       SimplifyCasts,
       SimplifyCaseConversionExpressions) ::
     Batch("Filter Pushdown", FixedPoint(100),
+      UnionPushdown,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughJoin,
       ColumnPruning) :: Nil
 }
 
+object UnionPushdown extends Rule[LogicalPlan] {
+  def fixProject(project: Project, child: LogicalPlan): Project = {
+    val pl = project.projectList.map(p => child.output.find { c =>
+      c.name == p.name && c.qualifiers == p.qualifiers
+    }.getOrElse(p))
+    Project(pl, child)
+  }
+
+  def fixFilter(filter: Filter, child: LogicalPlan): Filter = {
+    val cond = filter.condition.transform {
+      case a: AttributeReference =>
+        child.output.find { c =>
+          c.name == a.name && c.qualifiers == a.qualifiers
+        }.getOrElse(a)
+    }
+    Filter(cond, child)
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Push down filter into union
+    case f @ Filter(_, Union(left, right)) =>
+      Union(fixFilter(f, left), fixFilter(f, right))
+
+    // Push down projection into union
+    case p @ Project(_, Union(left, right)) =>
+      Union(fixProject(p, left), fixProject(p, right))
+  }
+}
+
+
 /**
  * Attempts to eliminate the reading of unneeded columns from the query plan using the following
  * transformations:
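
To illustrate the shape of the rewrite this rule performs, here is a minimal, self-contained sketch. The toy plan algebra below (`Scan`/`Filter`/`Project`/`Union` case classes and `ToyUnionPushdown`) is a hypothetical stand-in for Catalyst's `LogicalPlan` operators, for illustration only; the attribute re-binding that `fixProject`/`fixFilter` perform in the patch is elided here.

```scala
// Toy logical-plan algebra (hypothetical; not Catalyst's API).
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Union(left: Plan, right: Plan) extends Plan

object ToyUnionPushdown {
  // Rewrite Filter/Project-over-Union into Union-over-Filter/Project,
  // recursing into children so nested unions are handled as well.
  def apply(plan: Plan): Plan = plan match {
    case Filter(cond, Union(l, r)) =>
      Union(apply(Filter(cond, l)), apply(Filter(cond, r)))
    case Project(cols, Union(l, r)) =>
      Union(apply(Project(cols, l)), apply(Project(cols, r)))
    case Filter(cond, child)  => Filter(cond, apply(child))
    case Project(cols, child) => Project(cols, apply(child))
    case Union(l, r)          => Union(apply(l), apply(r))
    case s: Scan              => s
  }
}

object Demo extends App {
  val before = Filter("age > 21", Union(Scan("t1"), Scan("t2")))
  println(ToyUnionPushdown(before))
  // Union(Filter(age > 21,Scan(t1)),Filter(age > 21,Scan(t2)))
}
```

The part the toy version skips is why `fixFilter` and `fixProject` exist at all: a `Union`'s output attributes are not identical to those of both children, so a condition or project list pushed into a branch must be re-bound to that child's own output. The patch approximates this by looking up each attribute in `child.output` by name and qualifiers, falling back to the original attribute when no match is found.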