Skip to content

Commit

Permalink
Move filter pushdown test into ParquetFilterSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
stefankandic committed Jan 30, 2024
1 parent 5a359b8 commit 5cdc733
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ abstract class FileScanBuilder(
}

// Splits the incoming catalyst filters into those that can be pushed to the
// file source and those Spark must keep evaluating, remembers the partition
// and data filters on this builder, and returns the post-scan residual filters.
// NOTE(review): this span is a unified-diff view — it interleaves removed lines
// (deterministicFilters / nonDeterminsticFilters) with their replacements
// (filtersToPush / filtersToIgnore), and a collapsed section is hidden at the
// "Expand All" marker below; only one variant exists in the real file.
override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
// (removed) old version: first drop unsupported filters, then split on determinism.
val (deterministicFilters, nonDeterminsticFilters) = filters
.filter(f => DataSourceUtils.shouldPushFilter(f))
.partition(_.deterministic)
// (added) new version: a filter is pushable only if it is BOTH deterministic
// and accepted by DataSourceUtils.shouldPushFilter; everything else — including
// deterministic-but-unsupported filters — is now returned to Spark.
val (filtersToPush, filtersToIgnore) = filters
.partition(f => f.deterministic && DataSourceUtils.shouldPushFilter(f))
// Separate partition-pruning filters from filters on data columns.
val (partitionFilters, dataFilters) =
DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, deterministicFilters)
DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filtersToPush)
this.partitionFilters = partitionFilters
this.dataFilters = dataFilters
// Translate the data filters into the v1 sources.Filter representation.
val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
Expand All @@ -85,7 +84,7 @@ abstract class FileScanBuilder(
}
}
// Let the concrete builder decide which translated filters it accepts.
pushedDataFilters = pushDataFilters(translatedFilters.toArray)
// Residual filters that Spark must still evaluate after the scan.
dataFilters ++ nonDeterminsticFilters
dataFilters ++ filtersToIgnore
}

// Exposes the data filters accepted by pushDataFilters, converted to the
// DSv2 Predicate representation.
override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)
Expand Down
27 changes: 0 additions & 27 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,31 +308,4 @@ class CollationSuite extends QueryTest
checkAnswer(sql(s"SELECT COUNT(DISTINCT c1) FROM $tableName"), Seq(Row(1)))
}
}

test("disable filter pushdown") {
  val tableName = "parquet_dummy_t2"
  val collation = "'sr_ci_ai'"
  withTable(tableName) {
    // Create a table whose only column uses a case/accent-insensitive
    // collation, then seed it with two rows that differ only in case
    // (equal under this collation, per the expectations below).
    sql(
      s"""
         | CREATE TABLE $tableName(c1 STRING COLLATE $collation) USING PARQUET
         |""".stripMargin)
    sql(s"INSERT INTO $tableName VALUES ('aaa')")
    sql(s"INSERT INTO $tableName VALUES ('AAA')")

    // Comparison operator -> rows expected from the collated comparison.
    val expectations = Seq(
      (">=", Seq(Row("aaa"), Row("AAA"))),
      ("<=", Seq(Row("aaa"), Row("AAA"))),
      (">", Seq()),
      ("<", Seq()),
      ("!=", Seq())
    )

    for ((op, expectedRows) <- expectations) {
      val df = sql(s"SELECT * FROM $tableName WHERE c1 $op collate('aaa', $collation)")
      // Collation-aware comparisons must not be pushed to the data source;
      // the plan has to show an empty pushed-filter list.
      assert(df.queryExecution.toString().contains("PushedFilters: []"))
      checkAnswer(df, expectedRows)
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
}

test("disable filter pushdown for collated strings") {
  withTempPath { path =>
    val collation = "'SR_CI_AI'"
    val target = path.getAbsolutePath

    // Write a small parquet file whose single column carries a
    // case/accent-insensitive collation.
    sql(
      s""" SELECT collate(c, $collation) as c
         |FROM VALUES ('aaa'), ('AAA'), ('bbb')
         |as data(c)
         |""".stripMargin)
      .write.parquet(target)

    // Comparison operator -> rows expected under the collated comparison.
    val expectations = Seq(
      ("==", Seq(Row("aaa"), Row("AAA"))),
      ("!=", Seq(Row("bbb"))),
      ("<", Seq()),
      ("<=", Seq(Row("aaa"), Row("AAA"))),
      (">", Seq(Row("bbb"))),
      (">=", Seq(Row("aaa"), Row("AAA"), Row("bbb")))
    )

    for ((op, expectedRows) <- expectations) {
      val readback = spark.read.parquet(target)
        .where(s"c $op collate('aaa', $collation)")
      val plan = readback.queryExecution.explainString(ExplainMode.fromString("extended"))
      // Collation-aware comparisons must not reach the parquet reader:
      // the extended plan has to report an empty pushed-filter list.
      assert(plan.contains("PushedFilters: []"))
      checkAnswer(readback, expectedRows)
    }
  }
}
}

@ExtendedSQLTest
Expand Down

0 comments on commit 5cdc733

Please sign in to comment.