diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 4155b00820f4..72f885a93962 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -159,6 +159,27 @@ impl FilterExec { }) } + fn extend_constants( + input: &Arc, + predicate: &Arc, + ) -> Vec> { + let mut res_constants = Vec::new(); + let input_eqs = input.equivalence_properties(); + + let conjunctions = split_conjunction(predicate); + for conjunction in conjunctions { + if let Some(binary) = conjunction.as_any().downcast_ref::() { + if binary.op() == &Operator::Eq { + if input_eqs.is_expr_constant(binary.left()) { + res_constants.push(binary.right().clone()) + } else if input_eqs.is_expr_constant(binary.right()) { + res_constants.push(binary.left().clone()) + } + } + } + } + res_constants + } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, @@ -181,8 +202,12 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| Arc::new(column) as _); + // this is for statistics eq_properties = eq_properties.add_constants(constants); - + // this is for logical constant (for example: a = '1', then a could be marked as a constant) + // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) + eq_properties = + eq_properties.add_constants(Self::extend_constants(input, predicate)); Ok(PlanProperties::new( eq_properties, input.output_partitioning().clone(), // Output Partitioning diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt new file mode 100644 index 000000000000..05e622db8a02 --- /dev/null +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# prepare table +statement ok +CREATE UNBOUNDED EXTERNAL TABLE data ( + "date" VARCHAR, + "ticker" VARCHAR, + "time" VARCHAR, +) STORED AS CSV +WITH ORDER ("date", "ticker", "time") +LOCATION './a.parquet'; + + +# query +query TT +explain SELECT * FROM data +WHERE ticker = 'A' +ORDER BY "date", "time"; +---- +logical_plan +Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST +--Filter: data.ticker = Utf8("A") +----TableScan: data projection=[date, ticker, time] +physical_plan +SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: ticker@1 = A +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] + +# query +query TT +explain SELECT * FROM data +WHERE date = 'A' +ORDER BY "ticker", "time"; +---- +logical_plan +Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST +--Filter: data.date = Utf8("A") +----TableScan: data projection=[date, ticker, time] +physical_plan +SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: date@0 = A +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST]