diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index 3ee89f33b1a38..cfd1c47ccce9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -70,11 +70,10 @@ abstract class FileScanBuilder( } override def pushFilters(filters: Seq[Expression]): Seq[Expression] = { - val (deterministicFilters, nonDeterminsticFilters) = filters - .filter(f => DataSourceUtils.shouldPushFilter(f)) - .partition(_.deterministic) + val (filtersToPush, filtersToIgnore) = filters + .partition(f => f.deterministic && DataSourceUtils.shouldPushFilter(f)) val (partitionFilters, dataFilters) = - DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, deterministicFilters) + DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filtersToPush) this.partitionFilters = partitionFilters this.dataFilters = dataFilters val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] @@ -85,7 +84,7 @@ abstract class FileScanBuilder( } } pushedDataFilters = pushDataFilters(translatedFilters.toArray) - dataFilters ++ nonDeterminsticFilters + dataFilters ++ filtersToIgnore } override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index 94f0bb109b1ae..edd36bd0b9df1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -308,31 +308,4 @@ class CollationSuite extends QueryTest checkAnswer(sql(s"SELECT COUNT(DISTINCT c1) FROM $tableName"), Seq(Row(1))) } } - - test("disable filter pushdown") { - val tableName = "parquet_dummy_t2" - val collation = "'sr_ci_ai'" - withTable(tableName) { - spark.sql( - s""" - | CREATE TABLE $tableName(c1 STRING COLLATE $collation) USING PARQUET - |""".stripMargin) - spark.sql(s"INSERT INTO $tableName VALUES ('aaa')") - spark.sql(s"INSERT INTO $tableName VALUES ('AAA')") - - val filters = Seq( - (">=", Seq(Row("aaa"), Row("AAA"))), - ("<=", Seq(Row("aaa"), Row("AAA"))), - (">", Seq()), - ("<", Seq()), - ("!=", Seq()) - ) - - filters.foreach { filter => - val df = sql(s"SELECT * FROM $tableName WHERE c1 ${filter._1} collate('aaa', $collation)") - assert(df.queryExecution.toString().contains("PushedFilters: []")) - checkAnswer(df, filter._2) - } - } - } } \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index da2705f7c72b1..c85b0d9adae35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -2208,6 +2208,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } } + + test("disable filter pushdown for collated strings") { + withTempPath { path => + val collation = "'SR_CI_AI'" + val df = sql( + s""" SELECT collate(c, $collation) as c + |FROM VALUES ('aaa'), ('AAA'), ('bbb') + |as data(c) + |""".stripMargin) + + df.write.parquet(path.getAbsolutePath) + + val filters = Seq( + ("==", Seq(Row("aaa"), Row("AAA"))), + ("!=", Seq(Row("bbb"))), + ("<", Seq()), + ("<=", Seq(Row("aaa"), Row("AAA"))), + (">", Seq(Row("bbb"))), + (">=", Seq(Row("aaa"), Row("AAA"), Row("bbb"))), + ) + + filters.foreach { filter => + val readback = spark.read.parquet(path.getAbsolutePath) + .where(s"c ${filter._1} collate('aaa', $collation)") + val explain = readback.queryExecution.explainString(ExplainMode.fromString("extended")) + assert(explain.contains("PushedFilters: []")) + checkAnswer(readback, filter._2) + } + } + } } @ExtendedSQLTest