
Commit 65d5664

address comments
1 parent ca6ccb2 commit 65d5664

File tree: 1 file changed, +81 -54 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala

@@ -19,72 +19,36 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader.{SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 
 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val newReader = relation.createFreshReader
-      var newRelation = relation.copy(optimizedReader = Some(newReader))
+      // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
+      // `postScanFilters` need to be evaluated after the scan.
+      // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
+      val (pushedFilters, postScanFilters) = pushFilters(newReader, filters)
+      val newOutput = pruneColumns(newReader, relation, project ++ postScanFilters)
+      logInfo(
+        s"""
+           |Pushing operators to ${relation.source.getClass}
+           |Pushed Filters: ${pushedFilters.mkString(", ")}
+           |Post-Scan Filters: ${postScanFilters.mkString(",")}
+           |Output: ${newOutput.mkString(", ")}
+         """.stripMargin)
 
-      val postScanFilters: Seq[Expression] = newReader match {
-        case r: SupportsPushDownCatalystFilters =>
-          val postScanFilters = r.pushCatalystFilters(filters.toArray)
-          newRelation.copy(pushedFilters = r.pushedCatalystFilters())
-          postScanFilters
-
-        case r: SupportsPushDownFilters =>
-          // A map from translated data source filters to original catalyst filter expressions.
-          val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
-          // Catalyst filter expression that can't be translated to data source filters.
-          val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
-
-          for (filterExpr <- filters) {
-            val translated = DataSourceStrategy.translateFilter(filterExpr)
-            if (translated.isDefined) {
-              translatedFilterToExpr(translated.get) = filterExpr
-            } else {
-              untranslatableExprs += filterExpr
-            }
-          }
-
-          // Data source filters that need to be evaluated again after scanning. which means
-          // the data source cannot guarantee the rows returned can pass these filters.
-          // As a result we must return it so Spark can plan an extra filter operator.
-          val postScanFilters =
-            r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
-          // The filters which are marked as pushed to this data source
-          val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
-          newRelation = newRelation.copy(pushedFilters = pushedFilters)
-          untranslatableExprs ++ postScanFilters
-
-        case _ => filters
-      }
-
-      newReader match {
-        case r: SupportsPushDownRequiredColumns =>
-          val requiredColumns = AttributeSet(
-            project.flatMap(_.references) ++ postScanFilters.flatMap(_.references))
-          val neededOutput = relation.output.filter(requiredColumns.contains)
-          if (neededOutput != relation.output) {
-            r.pruneColumns(neededOutput.toStructType)
-            val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
-            val newOutput = r.readSchema().toAttributes.map {
-              // We have to keep the attribute id during transformation.
-              a => a.withExprId(nameToAttr(a.name).exprId)
-            }
-            newRelation = newRelation.copy(output = newOutput)
-          }
-
-        case _ =>
-      }
+      val newRelation = relation.copy(
+        output = newOutput,
+        pushedFilters = pushedFilters,
+        optimizedReader = Some(newReader))
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(Filter(_, newRelation)).getOrElse(newRelation)
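
The overlap called out in the new comments ("`postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter") is the subtle part of this hunk: a source may accept a filter only as a best-effort pruning hint, such as Parquet row-group statistics, so Spark still has to re-evaluate it after the scan. The self-contained sketch below models that contract with hypothetical ToyFilter and ToyReader types rather than the real DataSourceV2 interfaces; the names and the "best-effort columns" rule are illustrative assumptions only.

// Hypothetical stand-ins for sources.Filter and SupportsPushDownFilters, used only to
// illustrate why the same filter can be both "pushed" and "post-scan".
case class ToyFilter(column: String, op: String, value: Any)

class ToyReader(bestEffortColumns: Set[String]) {
  private var accepted: Seq[ToyFilter] = Nil

  // Mirrors SupportsPushDownFilters.pushFilters: returns the filters that still need
  // to be evaluated after the scan.
  def pushFilters(filters: Seq[ToyFilter]): Seq[ToyFilter] = {
    accepted = filters                                // the source accepts every filter...
    filters.filter(f => bestEffortColumns(f.column))  // ...but only best-effort for some columns
  }

  // Mirrors SupportsPushDownFilters.pushedFilters: everything the source will use.
  def pushedFilters: Seq[ToyFilter] = accepted
}

object PushDownOverlapDemo extends App {
  val filters = Seq(ToyFilter("a", ">", 1), ToyFilter("b", "=", "x"))
  val reader = new ToyReader(bestEffortColumns = Set("a"))  // "a" is only row-group pruned

  val postScanFilters = reader.pushFilters(filters)
  val pushedFilters = reader.pushedFilters

  // ToyFilter(a,>,1) appears in both lists: pushed down for pruning, yet re-checked later.
  println(s"pushed:    $pushedFilters")
  println(s"post-scan: $postScanFilters")
}

This is why `apply` still wraps `newRelation` in a `Filter` whenever `postScanFilters` is non-empty, via `filterCondition.map(Filter(_, newRelation))`.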
@@ -96,4 +60,67 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
 
     case other => other.mapChildren(apply)
   }
+
+  /**
+   * Pushes down filters to the data source reader, returns pushed filter and post-scan filters.
+   */
+  private def pushFilters(
+      reader: DataSourceReader,
+      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    reader match {
+      case r: SupportsPushDownCatalystFilters =>
+        val postScanFilters = r.pushCatalystFilters(filters.toArray)
+        val pushedFilters = r.pushedCatalystFilters()
+        (pushedFilters, postScanFilters)
+
+      case r: SupportsPushDownFilters =>
+        // A map from translated data source filters to original catalyst filter expressions.
+        val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
+        // Catalyst filter expression that can't be translated to data source filters.
+        val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+        for (filterExpr <- filters) {
+          val translated = DataSourceStrategy.translateFilter(filterExpr)
+          if (translated.isDefined) {
+            translatedFilterToExpr(translated.get) = filterExpr
+          } else {
+            untranslatableExprs += filterExpr
+          }
+        }
+
+        // Data source filters that need to be evaluated again after scanning. which means
+        // the data source cannot guarantee the rows returned can pass these filters.
+        // As a result we must return it so Spark can plan an extra filter operator.
+        val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
+          .map(translatedFilterToExpr)
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+        (pushedFilters, untranslatableExprs ++ postScanFilters)
+
+      case _ => (Nil, filters)
+    }
+  }
+
+  private def pruneColumns(
+      reader: DataSourceReader,
+      relation: DataSourceV2Relation,
+      exprs: Seq[Expression]): Seq[AttributeReference] = {
+    reader match {
+      case r: SupportsPushDownRequiredColumns =>
+        val requiredColumns = AttributeSet(exprs.flatMap(_.references))
+        val neededOutput = relation.output.filter(requiredColumns.contains)
+        if (neededOutput != relation.output) {
+          r.pruneColumns(neededOutput.toStructType)
+          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+          r.readSchema().toAttributes.map {
+            // We have to keep the attribute id during transformation.
+            a => a.withExprId(nameToAttr(a.name).exprId)
+          }
+        } else {
+          relation.output
+        }
+
+      case _ => relation.output
+    }
+  }
 }
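
The least obvious line in `pruneColumns` is `a.withExprId(nameToAttr(a.name).exprId)`: the reader's pruned `readSchema()` produces fresh attributes, so they are re-keyed by name to preserve the original expression ids and keep the references in the Project and post-scan Filter above the relation valid. A minimal stand-alone sketch of that id-preservation step, using a hypothetical ToyAttribute case class in place of Catalyst's AttributeReference:

// Simplified model of the exprId preservation in pruneColumns: after pruning, the new
// output attributes are re-keyed by name so they keep the ids of the original output.
case class ToyAttribute(name: String, exprId: Long)

object PruneColumnsDemo extends App {
  // Original relation output, each attribute with a stable id.
  val relationOutput = Seq(ToyAttribute("a", 1L), ToyAttribute("b", 2L), ToyAttribute("c", 3L))

  // Suppose only `a` and `c` are required by the project list and post-scan filters, and
  // the pruned schema reported back by the reader comes with fresh, unrelated ids.
  val prunedFromReader = Seq(ToyAttribute("a", 100L), ToyAttribute("c", 101L))

  // Same trick as the rule: map name -> original attribute, then restore the old ids.
  val nameToAttr = relationOutput.map(a => a.name -> a).toMap
  val newOutput = prunedFromReader.map(a => a.copy(exprId = nameToAttr(a.name).exprId))

  println(newOutput)  // List(ToyAttribute(a,1), ToyAttribute(c,3))
}

Without the restored ids, upstream operators would reference attributes that no longer exist in the relation's new output, since Catalyst resolves attributes by expression id rather than by name.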
