Skip to content

Commit

Permalink
add check for determinism in shouldPushFilter method
Browse files Browse the repository at this point in the history
  • Loading branch information
stefankandic committed Jan 30, 2024
1 parent 5cdc733 commit 61aecfc
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,19 @@ object DataSourceUtils extends PredicateHelper {
* @param expression The filter expression to be evaluated.
* @return `true` if the expression is deterministic and no attribute reference in it is a
*         string column with a non-default collation; `false` otherwise.
*/
def shouldPushFilter(expression: Expression): Boolean = expression match {
case attr: AttributeReference =>
attr.dataType match {
// don't push down filters for string columns with non-default collation
// as it could lead to incorrect results
case st: StringType => st.isDefaultCollation
case _ => true
}
def shouldPushFilter(expression: Expression): Boolean = {
// Walks the expression tree and reports whether every attribute reference in it
// is safe to push down with respect to string collation.
def shouldPushFilterRecursive(expression: Expression): Boolean = {
  expression match {
    case column: AttributeReference =>
      column.dataType match {
        // Pushing a filter on a string column with a non-default collation
        // could lead to incorrect results, so only the default collation qualifies.
        case stringType: StringType => stringType.isDefaultCollation
        // Attributes of any other data type never block pushdown.
        case _ => true
      }

    // Any other node qualifies only when all of its children qualify.
    case other => other.children.forall(child => shouldPushFilterRecursive(child))
  }
}

case _ => expression.children.forall(shouldPushFilter)
expression.deterministic && shouldPushFilterRecursive(expression)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,10 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)

val deterministicFiltersToPush = filters
.filter(_.deterministic)
val filtersToPush = filters
.filter(f => DataSourceUtils.shouldPushFilter(f))
val normalizedFilters = DataSourceStrategy.normalizeExprs(
deterministicFiltersToPush, l.output)
filtersToPush, l.output)

val partitionColumns =
l.resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
_))
if filters.nonEmpty && fsRelation.partitionSchema.nonEmpty =>
val normalizedFilters = DataSourceStrategy.normalizeExprs(
filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)
&& DataSourceUtils.shouldPushFilter(f)),
filters.filter(f => DataSourceUtils.shouldPushFilter(f) &&
!SubqueryExpression.hasSubquery(f)),
logicalRelation.output)
val (partitionKeyFilters, _) = DataSourceUtils
.getPartitionFiltersAndDataFilters(partitionSchema, normalizedFilters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ abstract class FileScanBuilder(

override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
val (filtersToPush, filtersToIgnore) = filters
.partition(f => f.deterministic && DataSourceUtils.shouldPushFilter(f))
.partition(f => DataSourceUtils.shouldPushFilter(f))
val (partitionFilters, dataFilters) =
DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filtersToPush)
this.partitionFilters = partitionFilters
Expand Down

0 comments on commit 61aecfc

Please sign in to comment.