From 4b93f5596e712c44e596b7c89c508388ab9348c7 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Sat, 16 Mar 2024 21:47:40 -0500 Subject: [PATCH 1/9] fix bugs in adding extra SortExec --- .../src/physical_optimizer/enforce_sorting.rs | 14 ++++- datafusion/physical-plan/src/filter.rs | 58 ++++++++++++++++++- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 5bf21c3dfab5..2fed96da10a8 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -131,7 +131,7 @@ fn update_coalesce_ctx_children( coalesce_context.data = if children.is_empty() { // Plan has no children, it cannot be a `CoalescePartitionsExec`. false - } else if is_coalesce_partitions(&coalesce_context.plan) { + } else if is_coalesce_partitions(&coalesce_context.plan) || coalesce_context.data { // Initiate a connection: true } else { @@ -162,8 +162,18 @@ impl PhysicalOptimizerRule for EnforceSorting { // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data; let new_plan = if config.optimizer.repartition_sorts { - let plan_with_coalesce_partitions = + let data_vec: Vec<_> = adjusted.children.iter().map(|x| x.data).collect(); + let mut plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); + plan_with_coalesce_partitions.children = plan_with_coalesce_partitions + .children + .into_iter() + .enumerate() + .map(|(id, mut child)| { + child.data = data_vec[id]; + child + }) + .collect(); let parallel = plan_with_coalesce_partitions .transform_up(¶llelize_sorts) .data()?; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 4155b00820f4..129c7ad65bd8 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -19,6 +19,7 @@ //! include in its output batches. use std::any::Any; +use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -37,10 +38,11 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ @@ -158,7 +160,53 @@ impl FilterExec { column_statistics, }) } + fn is_constant(expr: &Arc) -> bool { + expr.as_any().is::() + && expr.children().iter().all(|child| Self::is_constant(child)) + } + + fn extend_constants( + predicate: &Arc, + columns: &HashSet, + ) -> Vec> { + let mut res_constants = Vec::new(); + + let constant_match = |x: &Arc, + y: &Arc| + -> Option> { + if columns + .iter() + .any(|col| x.as_any().downcast_ref::() == Some(col)) + && Self::is_constant(y) + { + Some(x.clone()) + } else { + None + } + }; + + predicate + .apply(&mut |expr| { + if let Some(binary) = expr.as_any().downcast_ref::() { + if binary.op() == &Operator::Eq { + if let Some(constant_expr) = + constant_match(binary.left(), binary.right()) + { + res_constants.push(constant_expr) + } + if let Some(constant_expr) = + constant_match(binary.right(), binary.left()) + { + res_constants.push(constant_expr) + } + } + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("no way to return error during recursion"); + res_constants + } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, @@ -181,8 +229,14 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| Arc::new(column) as _); + // this is for statistics eq_properties = eq_properties.add_constants(constants); - + // this is for logical constant (for example: a = '1', then a could be marked as a constant) + eq_properties = eq_properties.add_constants(Self::extend_constants( + predicate, + &collect_columns(predicate), + )); + println!("eq_properties {:?}", eq_properties); Ok(PlanProperties::new( eq_properties, input.output_partitioning().clone(), // Output Partitioning From 83e10164a647ddadebb2cec9e029cb5d957d7e5c Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Sat, 16 Mar 2024 22:11:15 -0500 Subject: [PATCH 2/9] adding tests --- datafusion/physical-plan/src/filter.rs | 1 - .../test_files/filter_without_sort_exec.slt | 61 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 datafusion/sqllogictest/test_files/filter_without_sort_exec.slt diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 129c7ad65bd8..271a25312471 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -236,7 +236,6 @@ impl FilterExec { predicate, &collect_columns(predicate), )); - println!("eq_properties {:?}", eq_properties); Ok(PlanProperties::new( eq_properties, input.output_partitioning().clone(), // Output Partitioning diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt new file mode 100644 index 000000000000..05e622db8a02 --- /dev/null +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# prepare table +statement ok +CREATE UNBOUNDED EXTERNAL TABLE data ( + "date" VARCHAR, + "ticker" VARCHAR, + "time" VARCHAR, +) STORED AS CSV +WITH ORDER ("date", "ticker", "time") +LOCATION './a.parquet'; + + +# query +query TT +explain SELECT * FROM data +WHERE ticker = 'A' +ORDER BY "date", "time"; +---- +logical_plan +Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST +--Filter: data.ticker = Utf8("A") +----TableScan: data projection=[date, ticker, time] +physical_plan +SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: ticker@1 = A +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] + +# query +query TT +explain SELECT * FROM data +WHERE date = 'A' +ORDER BY "ticker", "time"; +---- +logical_plan +Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST +--Filter: data.date = Utf8("A") +----TableScan: data projection=[date, ticker, time] +physical_plan +SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: date@0 = A +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] From 9ffa6adfacf0a6ac09ca4dfd003d4b525885ce28 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Sun, 17 Mar 2024 21:06:10 -0500 Subject: [PATCH 3/9] optimize code --- datafusion/physical-plan/src/filter.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 271a25312471..b99c84e59256 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -161,8 +161,7 @@ impl FilterExec { }) } fn is_constant(expr: &Arc) -> bool { - expr.as_any().is::() - && expr.children().iter().all(|child| Self::is_constant(child)) + expr.as_any().is::() && expr.children().iter().all(Self::is_constant) } fn extend_constants( @@ -184,7 +183,7 @@ impl FilterExec { None } }; - + let mut contains_or = false; predicate .apply(&mut |expr| { if let Some(binary) = expr.as_any().downcast_ref::() { @@ -193,18 +192,21 @@ impl FilterExec { constant_match(binary.left(), binary.right()) { res_constants.push(constant_expr) - } - if let Some(constant_expr) = + } else if let Some(constant_expr) = constant_match(binary.right(), binary.left()) { res_constants.push(constant_expr) } + } else if binary.op() == &Operator::Or { + contains_or = true; } } Ok(TreeNodeRecursion::Continue) }) .expect("no way to return error during recursion"); - + if contains_or { + res_constants.clear(); + } res_constants } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -232,6 +234,7 @@ impl FilterExec { // this is for statistics eq_properties = eq_properties.add_constants(constants); // this is for logical constant (for example: a = '1', then a could be marked as a constant) + // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) eq_properties = eq_properties.add_constants(Self::extend_constants( predicate, &collect_columns(predicate), From eda1a0fa872b91c45d07f791478d7da6224d4735 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Mon, 18 Mar 2024 08:05:58 -0500 Subject: [PATCH 4/9] Update datafusion/physical-plan/src/filter.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> --- datafusion/physical-plan/src/filter.rs | 48 +++++++------------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b99c84e59256..9d353c488b6c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -165,47 +165,23 @@ impl FilterExec { } fn extend_constants( + input: &Arc, predicate: &Arc, - columns: &HashSet, ) -> Vec> { let mut res_constants = Vec::new(); - - let constant_match = |x: &Arc, - y: &Arc| - -> Option> { - if columns - .iter() - .any(|col| x.as_any().downcast_ref::() == Some(col)) - && Self::is_constant(y) - { - Some(x.clone()) - } else { - None - } - }; - let mut contains_or = false; - predicate - .apply(&mut |expr| { - if let Some(binary) = expr.as_any().downcast_ref::() { - if binary.op() == &Operator::Eq { - if let Some(constant_expr) = - constant_match(binary.left(), binary.right()) - { - res_constants.push(constant_expr) - } else if let Some(constant_expr) = - constant_match(binary.right(), binary.left()) - { - res_constants.push(constant_expr) - } - } else if binary.op() == &Operator::Or { - contains_or = true; + let input_eqs = input.equivalence_properties(); + + let conjunctions = split_conjunction(predicate); + for conjunction in conjunctions { + if let Some(binary) = conjunction.as_any().downcast_ref::() { + if binary.op() == &Operator::Eq { + if input_eqs.is_expr_constant(binary.left()) { + res_constants.push(binary.right().clone()) + } else if input_eqs.is_expr_constant(binary.right()) { + res_constants.push(binary.left().clone()) } } - Ok(TreeNodeRecursion::Continue) - }) - .expect("no way to return error during recursion"); - if contains_or { - res_constants.clear(); + } } res_constants } From 3bec3f3b487f799ac56eabfd71cdb60929e14523 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 18 Mar 2024 08:09:51 -0500 Subject: [PATCH 5/9] optimize code --- datafusion/core/src/physical_optimizer/enforce_sorting.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2fed96da10a8..05fcb1e97aaf 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -131,7 +131,7 @@ fn update_coalesce_ctx_children( coalesce_context.data = if children.is_empty() { // Plan has no children, it cannot be a `CoalescePartitionsExec`. false - } else if is_coalesce_partitions(&coalesce_context.plan) || coalesce_context.data { + } else if is_coalesce_partitions(&coalesce_context.plan) { // Initiate a connection: true } else { From a89929da1673c0d35b084e677ef6845bd12862ad Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 18 Mar 2024 08:33:44 -0500 Subject: [PATCH 6/9] optimize code --- datafusion/physical-plan/src/filter.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 9d353c488b6c..0a9db1c68934 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -211,10 +211,8 @@ impl FilterExec { eq_properties = eq_properties.add_constants(constants); // this is for logical constant (for example: a = '1', then a could be marked as a constant) // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) - eq_properties = eq_properties.add_constants(Self::extend_constants( - predicate, - &collect_columns(predicate), - )); + eq_properties = + eq_properties.add_constants(Self::extend_constants(input, predicate)); Ok(PlanProperties::new( eq_properties, input.output_partitioning().clone(), // Output Partitioning From 87cfdba39d73f6932c301411d838f12529fd3b08 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 18 Mar 2024 08:42:32 -0500 Subject: [PATCH 7/9] optimize code --- datafusion/physical-plan/src/filter.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0a9db1c68934..beda1f0f32e1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -160,9 +160,6 @@ impl FilterExec { column_statistics, }) } - fn is_constant(expr: &Arc) -> bool { - expr.as_any().is::() && expr.children().iter().all(Self::is_constant) - } fn extend_constants( input: &Arc, From 826c5aa7ca0be0d37a9a86430a61a1f2180da13c Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 18 Mar 2024 08:47:43 -0500 Subject: [PATCH 8/9] optimize code --- .../core/src/physical_optimizer/enforce_sorting.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 05fcb1e97aaf..5bf21c3dfab5 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -162,18 +162,8 @@ impl PhysicalOptimizerRule for EnforceSorting { // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data; let new_plan = if config.optimizer.repartition_sorts { - let data_vec: Vec<_> = adjusted.children.iter().map(|x| x.data).collect(); - let mut plan_with_coalesce_partitions = + let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); - plan_with_coalesce_partitions.children = plan_with_coalesce_partitions - .children - .into_iter() - .enumerate() - .map(|(id, mut child)| { - child.data = data_vec[id]; - child - }) - .collect(); let parallel = plan_with_coalesce_partitions .transform_up(¶llelize_sorts) .data()?; From b4aed3bd05cd3a66b4e2bb1102976bd18254a178 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 18 Mar 2024 08:59:00 -0500 Subject: [PATCH 9/9] fix clippy --- datafusion/physical-plan/src/filter.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index beda1f0f32e1..72f885a93962 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -19,7 +19,6 @@ //! include in its output batches. use std::any::Any; -use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -38,11 +37,10 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; +use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{