
Commit

filter out equal to self, generate all possible predicates
Why did the plans of TPCDSV1_4_PlanStabilitySuite change?
For q5, the condition required by EliminateOuterJoin is only fulfilled after InferFiltersFromConstraints has run, so the additional InferFiltersFromConstraints pass allows more predicates to be pushed down.
For q8, the added predicates of filter apache#19 are inferred by InferFiltersFromConstraints from an EqualNullSafe constraint. As for the added predicates of scan apache#13 and filter apache#15: after the predicate isnotnull(substr(ca_zip#9, 1, 5)) is pushed down to filter apache#15 by the existing rules, the later InferFiltersFromConstraints pass pushes isnotnull(ca_zip#9) to filter apache#15 and scan apache#13.
For q14a, the added predicates of filter apache#18 are inferred by InferFiltersFromConstraints from an EqualNullSafe constraint.
For q14b, it's the same as q14a.
For q85, PushExtraPredicateThroughJoin pushes predicates down from join apache#21, and then the additional InferFiltersFromConstraints pass infers the predicates from join apache#27.
For q93, it's the same as q5.

Why did the plans of TPCDSV2_7_PlanStabilityWithStatsSuite, TPCDSV1_4_PlanStabilityWithStatsSuite, and TPCDSV2_7_PlanStabilitySuite change?
For TPCDSV2_7_PlanStabilityWithStatsSuite and TPCDSV2_7_PlanStabilitySuite, the reasons are the same as for TPCDSV1_4_PlanStabilitySuite. For TPCDSV1_4_PlanStabilityWithStatsSuite, most plans change for the same reasons as in TPCDSV1_4_PlanStabilitySuite, except q85, whose plan changes further because the rule CostBasedJoinReorder is triggered.
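
To make the motivation for the fixed-point behavior concrete, here is a minimal, self-contained Scala sketch. It is not the Spark implementation and all names are illustrative; it only models why a single substitution pass over [a = b, b = c, c = d] yields a = c and b = d but misses a = d, which is what the new loop in inferAdditionalConstraints (and the fixedPoint "Infer Filters" batch) recovers by iterating until nothing new is produced.

// A standalone sketch of transitive equality inference. Predicates are modeled as plain
// string pairs; this is NOT the Spark code, only an illustration of why iterating to a
// fixed point is needed.
object TransitiveEqualitySketch {
  type Eq = (String, String)

  // One inference pass: for each known equality (l, r), substitute l for r and r for l
  // in every other known equality, keeping only new, non-trivial results.
  def inferOnce(known: Set[Eq]): Set[Eq] = {
    val inferred = for {
      (l, r) <- known
      (x, y) <- known if (x, y) != (l, r)
      rewritten <- Seq(
        (if (x == l) r else x, if (y == l) r else y),
        (if (x == r) l else x, if (y == r) l else y))
      if rewritten._1 != rewritten._2 // drop useless self-equalities such as d = d
    } yield rewritten
    inferred -- known
  }

  // Iterate until a pass produces nothing new, mirroring the do/while loop in the patch.
  def inferAll(initial: Set[Eq]): Set[Eq] = {
    var all = initial
    var added = inferOnce(all)
    while (added.nonEmpty) {
      all ++= added
      added = inferOnce(all)
    }
    all -- initial
  }

  def main(args: Array[String]): Unit = {
    val constraints = Set(("a", "b"), ("b", "c"), ("c", "d"))
    // A single pass yields only (a, c) and (b, d); the fixed point also contains (a, d).
    println(inferOnce(constraints))
    println(inferAll(constraints))
  }
}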
copperybean committed Sep 14, 2023
1 parent d648816 commit d5dda74
Showing 41 changed files with 1,078 additions and 928 deletions.
@@ -134,7 +134,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
    val operatorOptimizationBatch: Seq[Batch] = {
      Batch("Operator Optimization before Inferring Filters", fixedPoint,
        operatorOptimizationRuleSet: _*) ::
-     Batch("Infer Filters", Once,
+     Batch("Infer Filters", fixedPoint,
        InferFiltersFromGenerate,
        InferFiltersFromConstraints) ::
      Batch("Operator Optimization after Inferring Filters", fixedPoint,
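For context, the change above means the "Infer Filters" batch is no longer applied a single time but is re-applied until the plan stops changing (or an iteration limit is hit). Below is a hypothetical, simplified Scala model of the difference between the two strategies; it mirrors the idea of Catalyst's rule executor but is not its actual implementation, and RuleExecutorSketch and runBatch are illustrative names only.

// Illustration only, not Spark's RuleExecutor: Once applies the rule list a single time,
// while FixedPoint re-applies it until a full pass no longer changes the plan or
// maxIterations is reached.
object RuleExecutorSketch {
  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  def runBatch[Plan](plan: Plan, rules: Seq[Plan => Plan], strategy: Strategy): Plan = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < strategy.maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current // stop once a whole pass is a no-op
      current = next
      iteration += 1
    }
    current
  }
}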
@@ -62,33 +62,60 @@ trait ConstraintHelper
     */
    def inferAdditionalConstraints(constraints: ExpressionSet): ExpressionSet = {
      var inferredConstraints = ExpressionSet()
+     var lastInferredCount = 0
+     var predicates = ExpressionSet()
+     var equalNullSafeCandidates = constraints.filter(_.deterministic)
      // IsNotNull should be constructed by `constructIsNotNullConstraints`.
-     val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
-     predicates.foreach {
-       case eq @ EqualTo(l: Attribute, r: Attribute) =>
-         // Also remove EqualNullSafe with the same l and r to avoid breaking the Once
-         // strategy's idempotence. l = r and l <=> r can infer l <=> l and r <=> r, which are useless.
-         val candidateConstraints = predicates - eq - EqualNullSafe(l, r)
-         inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
-         inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
-       case eq @ EqualTo(l @ Cast(_: Attribute, _, _, _), r: Attribute) =>
-         inferredConstraints ++= replaceConstraints(predicates - eq - EqualNullSafe(l, r), r, l)
-       case eq @ EqualTo(l: Attribute, r @ Cast(_: Attribute, _, _, _)) =>
-         inferredConstraints ++= replaceConstraints(predicates - eq - EqualNullSafe(l, r), l, r)
-       case _ => // No inference
-     }
+     var equalToCandidates = equalNullSafeCandidates.filterNot(_.isInstanceOf[IsNotNull])

-     constraints.foreach {
-       case eq @ EqualNullSafe(l: Attribute, r: Attribute) =>
-         val candidateConstraints = constraints - eq
-         inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
-         inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
-       case eq @ EqualNullSafe(l @ Cast(_: Attribute, _, _, _), r: Attribute) =>
-         inferredConstraints ++= replaceConstraints(constraints - eq, r, l)
-       case eq @ EqualNullSafe(l: Attribute, r @ Cast(_: Attribute, _, _, _)) =>
-         inferredConstraints ++= replaceConstraints(constraints - eq, l, r)
-       case _ => // No inference
-     }
+     val refineEqualToCandidates = (curExp: Expression, l: Expression, r: Expression) => {
+       // Also remove EqualNullSafe with the same l and r,
+       // because l = r and l <=> r can infer l <=> l and r <=> r, which are useless.
+       equalToCandidates - curExp - EqualNullSafe(l, r) - EqualNullSafe(r, l)
+     }
+     val refineEqualNullSafeCandidates = (curExp: Expression, l: Expression, r: Expression) => {
+       // Also remove EqualNullSafe and EqualTo with the same l and r,
+       // because l <=> r and r <=> l can infer l <=> l and r <=> r, which are useless,
+       // and l <=> r and l = r can infer l = l and r = r, which are useless as well.
+       equalNullSafeCandidates - curExp - EqualNullSafe(l, r) - EqualNullSafe(r, l) -
+         EqualTo(l, r) - EqualTo(r, l)
+     }
+
+     // For constraints such as [a = b, b = c, c = d], the predicate a = d cannot be inferred
+     // in a single iteration, so a while loop is needed to infer all possible predicates.
+     do {
+       lastInferredCount = inferredConstraints.size
+
+       predicates = constraints.union(inferredConstraints)
+       predicates.foreach {
+         case eq @ EqualTo(l: Attribute, r: Attribute) =>
+           val candidateConstraints = refineEqualToCandidates(eq, l, r)
+           inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
+           inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
+         case eq @ EqualTo(l @ Cast(_: Attribute, _, _, _), r: Attribute) =>
+           inferredConstraints ++= replaceConstraints(refineEqualToCandidates(eq, l, r), r, l)
+         case eq @ EqualTo(l: Attribute, r @ Cast(_: Attribute, _, _, _)) =>
+           inferredConstraints ++= replaceConstraints(refineEqualToCandidates(eq, l, r), l, r)
+         case _ => // No inference
+       }
+
+       predicates.foreach {
+         case eq @ EqualNullSafe(l: Attribute, r: Attribute) =>
+           val candidateConstraints = refineEqualNullSafeCandidates(eq, l, r)
+           inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
+           inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
+         case eq @ EqualNullSafe(l @ Cast(_: Attribute, _, _, _), r: Attribute) =>
+           inferredConstraints ++= replaceConstraints(refineEqualNullSafeCandidates(eq, l, r), r, l)
+         case eq @ EqualNullSafe(l: Attribute, r @ Cast(_: Attribute, _, _, _)) =>
+           inferredConstraints ++= replaceConstraints(refineEqualNullSafeCandidates(eq, l, r), l, r)
+         case _ => // No inference
+       }
+
+       equalToCandidates = inferredConstraints.filterNot(_.isInstanceOf[IsNotNull])
+       equalNullSafeCandidates = inferredConstraints
+     } while (inferredConstraints.size > lastInferredCount)

      inferredConstraints -- constraints
    }

@@ -394,7 +394,7 @@ class SetOperationSuite extends PlanTest

}

test("SPARK-44812: push filters through join generated from intersect") {
test("SPARK-44812: infer filters from join generated from intersect") {
val a = LocalRelation.fromExternalRows(Seq($"a".int), Seq(Row(1), Row(2), Row(3)))
val b = LocalRelation.fromExternalRows(Seq($"b".int), Seq(Row(2), Row(3), Row(4)))

@@ -0,0 +1,91 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TABLE i1_tbl (
t1a integer,
t1b integer,
t1c integer,
t1d integer,
t1e integer,
t1f integer
) USING parquet
-- !query analysis
CreateDataSourceTableCommand `spark_catalog`.`default`.`i1_tbl`, false


-- !query
CREATE TABLE i2_tbl (
t2a integer,
t2b integer,
t2c integer,
t2d integer,
t2e integer,
t2f integer
) USING parquet
-- !query analysis
CreateDataSourceTableCommand `spark_catalog`.`default`.`i2_tbl`, false


-- !query
CREATE TABLE i3_tbl (
t3a integer,
t3b integer,
t3c integer,
t3d integer,
t3e integer,
t3f integer
) USING parquet
-- !query analysis
CreateDataSourceTableCommand `spark_catalog`.`default`.`i3_tbl`, false


-- !query
select i3_tbl.* from (
select * from i1_tbl left semi join (select * from i2_tbl where t2a > 1000) tmp2 on i1_tbl.t1a = tmp2.t2a
) tmp1 join i3_tbl on tmp1.t1a = i3_tbl.t3a
-- !query analysis
Project [t3a#x, t3b#x, t3c#x, t3d#x, t3e#x, t3f#x]
+- Join Inner, (t1a#x = t3a#x)
:- SubqueryAlias tmp1
: +- Project [t1a#x, t1b#x, t1c#x, t1d#x, t1e#x, t1f#x]
: +- Join LeftSemi, (t1a#x = t2a#x)
: :- SubqueryAlias spark_catalog.default.i1_tbl
: : +- Relation spark_catalog.default.i1_tbl[t1a#x,t1b#x,t1c#x,t1d#x,t1e#x,t1f#x] parquet
: +- SubqueryAlias tmp2
: +- Project [t2a#x, t2b#x, t2c#x, t2d#x, t2e#x, t2f#x]
: +- Filter (t2a#x > 1000)
: +- SubqueryAlias spark_catalog.default.i2_tbl
: +- Relation spark_catalog.default.i2_tbl[t2a#x,t2b#x,t2c#x,t2d#x,t2e#x,t2f#x] parquet
+- SubqueryAlias spark_catalog.default.i3_tbl
+- Relation spark_catalog.default.i3_tbl[t3a#x,t3b#x,t3c#x,t3d#x,t3e#x,t3f#x] parquet


-- !query
select * from i1_tbl join i2_tbl on t1a = t2a and t2a = t1b and t1b = t2b and t2b = t1c and t1c = t2c and t2c = t1d and t1d = t2d and t2d = t1e and t1e = t2e and t2e = t1f
-- !query analysis
Project [t1a#x, t1b#x, t1c#x, t1d#x, t1e#x, t1f#x, t2a#x, t2b#x, t2c#x, t2d#x, t2e#x, t2f#x]
+- Join Inner, (((((t1a#x = t2a#x) AND (t2a#x = t1b#x)) AND (t1b#x = t2b#x)) AND ((t2b#x = t1c#x) AND (t1c#x = t2c#x))) AND ((((t2c#x = t1d#x) AND (t1d#x = t2d#x)) AND (t2d#x = t1e#x)) AND ((t1e#x = t2e#x) AND (t2e#x = t1f#x))))
:- SubqueryAlias spark_catalog.default.i1_tbl
: +- Relation spark_catalog.default.i1_tbl[t1a#x,t1b#x,t1c#x,t1d#x,t1e#x,t1f#x] parquet
+- SubqueryAlias spark_catalog.default.i2_tbl
+- Relation spark_catalog.default.i2_tbl[t2a#x,t2b#x,t2c#x,t2d#x,t2e#x,t2f#x] parquet


-- !query
DROP TABLE i1_tbl
-- !query analysis
DropTable false, false
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.i1_tbl


-- !query
DROP TABLE i2_tbl
-- !query analysis
DropTable false, false
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.i2_tbl


-- !query
DROP TABLE i3_tbl
-- !query analysis
DropTable false, false
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.i3_tbl
38 changes: 38 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/infer-filters.sql
@@ -0,0 +1,38 @@
CREATE TABLE i1_tbl (
t1a integer,
t1b integer,
t1c integer,
t1d integer,
t1e integer,
t1f integer
) USING parquet;

CREATE TABLE i2_tbl (
t2a integer,
t2b integer,
t2c integer,
t2d integer,
t2e integer,
t2f integer
) USING parquet;

CREATE TABLE i3_tbl (
t3a integer,
t3b integer,
t3c integer,
t3d integer,
t3e integer,
t3f integer
) USING parquet;


select i3_tbl.* from (
select * from i1_tbl left semi join (select * from i2_tbl where t2a > 1000) tmp2 on i1_tbl.t1a = tmp2.t2a
) tmp1 join i3_tbl on tmp1.t1a = i3_tbl.t3a;


select * from i1_tbl join i2_tbl on t1a = t2a and t2a = t1b and t1b = t2b and t2b = t1c and t1c = t2c and t2c = t1d and t1d = t2d and t2d = t1e and t1e = t2e and t2e = t1f;

DROP TABLE i1_tbl;
DROP TABLE i2_tbl;
DROP TABLE i3_tbl;
@@ -0,0 +1,86 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TABLE i1_tbl (
t1a integer,
t1b integer,
t1c integer,
t1d integer,
t1e integer,
t1f integer
) USING parquet
-- !query schema
struct<>
-- !query output



-- !query
CREATE TABLE i2_tbl (
t2a integer,
t2b integer,
t2c integer,
t2d integer,
t2e integer,
t2f integer
) USING parquet
-- !query schema
struct<>
-- !query output



-- !query
CREATE TABLE i3_tbl (
t3a integer,
t3b integer,
t3c integer,
t3d integer,
t3e integer,
t3f integer
) USING parquet
-- !query schema
struct<>
-- !query output



-- !query
select i3_tbl.* from (
select * from i1_tbl left semi join (select * from i2_tbl where t2a > 1000) tmp2 on i1_tbl.t1a = tmp2.t2a
) tmp1 join i3_tbl on tmp1.t1a = i3_tbl.t3a
-- !query schema
struct<t3a:int,t3b:int,t3c:int,t3d:int,t3e:int,t3f:int>
-- !query output



-- !query
select * from i1_tbl join i2_tbl on t1a = t2a and t2a = t1b and t1b = t2b and t2b = t1c and t1c = t2c and t2c = t1d and t1d = t2d and t2d = t1e and t1e = t2e and t2e = t1f
-- !query schema
struct<t1a:int,t1b:int,t1c:int,t1d:int,t1e:int,t1f:int,t2a:int,t2b:int,t2c:int,t2d:int,t2e:int,t2f:int>
-- !query output



-- !query
DROP TABLE i1_tbl
-- !query schema
struct<>
-- !query output



-- !query
DROP TABLE i2_tbl
-- !query schema
struct<>
-- !query output



-- !query
DROP TABLE i3_tbl
-- !query schema
struct<>
-- !query output

@@ -223,15 +223,15 @@ Input [3]: [cs_item_sk#18, cs_sold_date_sk#19, d_date_sk#20]
Output [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
- PushedFilters: [IsNotNull(i_item_sk)]
+ PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_brand_id), IsNotNull(i_class_id), IsNotNull(i_category_id)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(25) ColumnarToRow [codegen id : 5]
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]

(26) Filter [codegen id : 5]
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]
- Condition : isnotnull(i_item_sk#21)
+ Condition : (((isnotnull(i_item_sk#21) AND isnotnull(i_brand_id#22)) AND isnotnull(i_class_id#23)) AND isnotnull(i_category_id#24))

(27) BroadcastExchange
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]
@@ -145,7 +145,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_class_id,i_category_id,sum(sales),su
InputAdapter
BroadcastExchange #12
WholeStageCodegen (5)
- Filter [i_item_sk]
+ Filter [i_item_sk,i_brand_id,i_class_id,i_category_id]
ColumnarToRow
InputAdapter
Scan parquet spark_catalog.default.item [i_item_sk,i_brand_id,i_class_id,i_category_id]
@@ -183,15 +183,15 @@ Condition : isnotnull(cs_item_sk#17)
Output [4]: [i_item_sk#19, i_brand_id#20, i_class_id#21, i_category_id#22]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
- PushedFilters: [IsNotNull(i_item_sk)]
+ PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_brand_id), IsNotNull(i_class_id), IsNotNull(i_category_id)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(17) ColumnarToRow [codegen id : 1]
Input [4]: [i_item_sk#19, i_brand_id#20, i_class_id#21, i_category_id#22]

(18) Filter [codegen id : 1]
Input [4]: [i_item_sk#19, i_brand_id#20, i_class_id#21, i_category_id#22]
- Condition : isnotnull(i_item_sk#19)
+ Condition : (((isnotnull(i_item_sk#19) AND isnotnull(i_brand_id#20)) AND isnotnull(i_class_id#21)) AND isnotnull(i_category_id#22))

(19) BroadcastExchange
Input [4]: [i_item_sk#19, i_brand_id#20, i_class_id#21, i_category_id#22]
@@ -126,7 +126,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_class_id,i_category_id,sum(sales),su
InputAdapter
BroadcastExchange #10
WholeStageCodegen (1)
- Filter [i_item_sk]
+ Filter [i_item_sk,i_brand_id,i_class_id,i_category_id]
ColumnarToRow
InputAdapter
Scan parquet spark_catalog.default.item [i_item_sk,i_brand_id,i_class_id,i_category_id]
@@ -202,15 +202,15 @@ Input [3]: [cs_item_sk#18, cs_sold_date_sk#19, d_date_sk#20]
Output [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
- PushedFilters: [IsNotNull(i_item_sk)]
+ PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_brand_id), IsNotNull(i_class_id), IsNotNull(i_category_id)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(25) ColumnarToRow [codegen id : 5]
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]

(26) Filter [codegen id : 5]
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]
- Condition : isnotnull(i_item_sk#21)
+ Condition : (((isnotnull(i_item_sk#21) AND isnotnull(i_brand_id#22)) AND isnotnull(i_class_id#23)) AND isnotnull(i_category_id#24))

(27) BroadcastExchange
Input [4]: [i_item_sk#21, i_brand_id#22, i_class_id#23, i_category_id#24]